90ac0b81d1
Add full V2 architecture: identity, content, studio (.NET 10) and file, render, notification, gateway (Go) services with vendored deps, plus DB migrations, event/API contracts, and an init-db script. Wire the Next.js frontend to the gateway: server-side JWT auth routes (login/register/refresh/logout/me), gateway fetch helper, and session/ cookie/jwt helpers under src/lib. Containerize the stack via docker-compose.v2.yml and per-service Dockerfiles. Base images resolve through a Nexus mirror (Docker Hub) and MCR directly; npm/NuGet pull from Nexus groups. Self-host fonts via next/font/local to avoid Google Fonts (geo-blocked). Add CI workflow and ignore .env.v2, *.stackdump, and .NET bin/obj. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
318 lines
12 KiB
Go
318 lines
12 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/flatrender/notification-svc/internal/models"
|
|
"github.com/google/uuid"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
type Store struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
func NewStore(pool *pgxpool.Pool) *Store {
|
|
return &Store{pool: pool}
|
|
}
|
|
|
|
// ── Notifications ─────────────────────────────────────────────────────────────
|
|
|
|
func (s *Store) ListNotifications(ctx context.Context, userID uuid.UUID, onlyUnread bool, page, pageSize int) ([]*models.Notification, int64, error) {
|
|
where := "user_id = $1 AND deleted_at IS NULL"
|
|
args := []any{userID}
|
|
if onlyUnread {
|
|
where += " AND seen = FALSE"
|
|
}
|
|
|
|
var total int64
|
|
_ = s.pool.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM notification.notifications WHERE %s", where), args...).Scan(&total)
|
|
|
|
args = append(args, pageSize, (page-1)*pageSize)
|
|
rows, err := s.pool.Query(ctx,
|
|
fmt.Sprintf(`SELECT id, tenant_id, user_id,
|
|
notification_type::text, priority::text,
|
|
title, message, label, signature, icon, image, animation_demo, design,
|
|
action_url, action_text,
|
|
render_job_id, export_id, payment_id, gift_id, earned_gift_id,
|
|
is_emergency, seen, seen_at, clicked, clicked_at, gift_used,
|
|
expire_date, created_at, updated_at
|
|
FROM notification.notifications
|
|
WHERE %s
|
|
ORDER BY created_at DESC
|
|
LIMIT $%d OFFSET $%d`, where, len(args)-1, len(args)), args...)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
defer rows.Close()
|
|
notifs, err := scanNotifications(rows)
|
|
return notifs, total, err
|
|
}
|
|
|
|
func (s *Store) GetNotificationByID(ctx context.Context, id, userID uuid.UUID) (*models.Notification, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, tenant_id, user_id,
|
|
notification_type::text, priority::text,
|
|
title, message, label, signature, icon, image, animation_demo, design,
|
|
action_url, action_text,
|
|
render_job_id, export_id, payment_id, gift_id, earned_gift_id,
|
|
is_emergency, seen, seen_at, clicked, clicked_at, gift_used,
|
|
expire_date, created_at, updated_at
|
|
FROM notification.notifications
|
|
WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
notifs, err := scanNotifications(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(notifs) == 0 {
|
|
return nil, fmt.Errorf("notification not found")
|
|
}
|
|
return notifs[0], nil
|
|
}
|
|
|
|
func (s *Store) CreateNotification(ctx context.Context, req *models.CreateNotificationRequest) (*models.Notification, error) {
|
|
priority := "Normal"
|
|
if req.Priority != nil {
|
|
priority = *req.Priority
|
|
}
|
|
var id uuid.UUID
|
|
err := s.pool.QueryRow(ctx, `
|
|
INSERT INTO notification.notifications
|
|
(tenant_id, user_id, notification_type, priority, title, message, label,
|
|
icon, image, action_url, action_text,
|
|
render_job_id, export_id, payment_id, gift_id, earned_gift_id,
|
|
is_emergency, expire_date)
|
|
VALUES ($1,$2,$3::notification_kind,$4::notification_priority,$5,$6,$7,
|
|
$8,$9,$10,$11,
|
|
$12,$13,$14,$15,$16,
|
|
$17,$18)
|
|
RETURNING id`,
|
|
req.TenantID, req.UserID, req.NotificationType, priority,
|
|
req.Title, req.Message, req.Label,
|
|
req.Icon, req.Image, req.ActionURL, req.ActionText,
|
|
req.RenderJobID, req.ExportID, req.PaymentID, req.GiftID, req.EarnedGiftID,
|
|
req.IsEmergency, req.ExpireDate,
|
|
).Scan(&id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.GetNotificationByID(ctx, id, req.UserID)
|
|
}
|
|
|
|
func (s *Store) MarkSeen(ctx context.Context, id, userID uuid.UUID) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE notification.notifications SET seen = TRUE, seen_at = NOW(), updated_at = NOW()
|
|
WHERE id = $1 AND user_id = $2 AND seen = FALSE`, id, userID)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) MarkAllSeen(ctx context.Context, userID uuid.UUID) (int64, error) {
|
|
tag, err := s.pool.Exec(ctx, `
|
|
UPDATE notification.notifications SET seen = TRUE, seen_at = NOW(), updated_at = NOW()
|
|
WHERE user_id = $1 AND seen = FALSE AND deleted_at IS NULL`, userID)
|
|
return tag.RowsAffected(), err
|
|
}
|
|
|
|
func (s *Store) MarkClicked(ctx context.Context, id, userID uuid.UUID) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE notification.notifications
|
|
SET clicked = TRUE, clicked_at = NOW(), seen = TRUE, seen_at = COALESCE(seen_at, NOW()), updated_at = NOW()
|
|
WHERE id = $1 AND user_id = $2`, id, userID)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) SoftDeleteNotification(ctx context.Context, id, userID uuid.UUID) error {
|
|
tag, err := s.pool.Exec(ctx, `
|
|
UPDATE notification.notifications SET deleted_at = NOW()
|
|
WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return fmt.Errorf("notification not found")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) CountUnread(ctx context.Context, userID uuid.UUID) (int64, error) {
|
|
var count int64
|
|
err := s.pool.QueryRow(ctx,
|
|
`SELECT COUNT(*) FROM notification.notifications WHERE user_id = $1 AND seen = FALSE AND deleted_at IS NULL`, userID).Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
// ── Preferences ───────────────────────────────────────────────────────────────
|
|
|
|
func (s *Store) ListPreferences(ctx context.Context, userID uuid.UUID) ([]*models.NotificationPreference, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, user_id, notification_type::text, channel::text, enabled, updated_at
|
|
FROM notification.notification_preferences WHERE user_id = $1 ORDER BY notification_type, channel`, userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var out []*models.NotificationPreference
|
|
for rows.Next() {
|
|
p := &models.NotificationPreference{}
|
|
if err := rows.Scan(&p.ID, &p.UserID, &p.NotificationType, &p.Channel, &p.Enabled, &p.UpdatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, p)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (s *Store) UpsertPreference(ctx context.Context, userID uuid.UUID, req *models.UpdatePreferenceRequest) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
INSERT INTO notification.notification_preferences (user_id, notification_type, channel, enabled)
|
|
VALUES ($1, $2::notification_kind, $3::delivery_channel, $4)
|
|
ON CONFLICT (user_id, notification_type, channel) DO UPDATE
|
|
SET enabled = EXCLUDED.enabled, updated_at = NOW()`,
|
|
userID, req.NotificationType, req.Channel, req.Enabled)
|
|
return err
|
|
}
|
|
|
|
// ── Templates (admin) ─────────────────────────────────────────────────────────
|
|
|
|
func (s *Store) ListTemplates(ctx context.Context, tenantID *uuid.UUID) ([]*models.NotificationTemplate, error) {
|
|
var rows pgx.Rows
|
|
var err error
|
|
if tenantID != nil {
|
|
rows, err = s.pool.Query(ctx, `
|
|
SELECT id, tenant_id, code, channel::text, locale, subject, body_text, body_html,
|
|
push_title, push_body, push_icon, variables_schema::text, is_active, created_at, updated_at
|
|
FROM notification.notification_templates
|
|
WHERE (tenant_id = $1 OR tenant_id IS NULL) AND is_active = TRUE
|
|
ORDER BY code, channel, locale`, *tenantID)
|
|
} else {
|
|
rows, err = s.pool.Query(ctx, `
|
|
SELECT id, tenant_id, code, channel::text, locale, subject, body_text, body_html,
|
|
push_title, push_body, push_icon, variables_schema::text, is_active, created_at, updated_at
|
|
FROM notification.notification_templates
|
|
ORDER BY code, channel, locale`)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanTemplates(rows)
|
|
}
|
|
|
|
func (s *Store) UpsertTemplate(ctx context.Context, tenantID *uuid.UUID, req *models.TemplateUpsertRequest) (*models.NotificationTemplate, error) {
|
|
isActive := true
|
|
if req.IsActive != nil {
|
|
isActive = *req.IsActive
|
|
}
|
|
var id uuid.UUID
|
|
err := s.pool.QueryRow(ctx, `
|
|
INSERT INTO notification.notification_templates
|
|
(tenant_id, code, channel, locale, subject, body_text, body_html,
|
|
push_title, push_body, push_icon, is_active)
|
|
VALUES ($1,$2,$3::delivery_channel,$4,$5,$6,$7,$8,$9,$10,$11)
|
|
ON CONFLICT (tenant_id, code, channel, locale) DO UPDATE SET
|
|
subject = EXCLUDED.subject, body_text = EXCLUDED.body_text, body_html = EXCLUDED.body_html,
|
|
push_title = EXCLUDED.push_title, push_body = EXCLUDED.push_body, push_icon = EXCLUDED.push_icon,
|
|
is_active = EXCLUDED.is_active, updated_at = NOW()
|
|
RETURNING id`,
|
|
tenantID, req.Code, req.Channel, req.Locale, req.Subject, req.BodyText, req.BodyHTML,
|
|
req.PushTitle, req.PushBody, req.PushIcon, isActive,
|
|
).Scan(&id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, tenant_id, code, channel::text, locale, subject, body_text, body_html,
|
|
push_title, push_body, push_icon, variables_schema::text, is_active, created_at, updated_at
|
|
FROM notification.notification_templates WHERE id = $1`, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
tpls, err := scanTemplates(rows)
|
|
if err != nil || len(tpls) == 0 {
|
|
return nil, err
|
|
}
|
|
return tpls[0], nil
|
|
}
|
|
|
|
// ── Deliveries ────────────────────────────────────────────────────────────────
|
|
|
|
func (s *Store) CreateDelivery(ctx context.Context, notifID uuid.UUID, userID, tenantID uuid.UUID, channel, recipient string) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
INSERT INTO notification.notification_deliveries
|
|
(tenant_id, user_id, notification_id, channel, recipient, status)
|
|
VALUES ($1,$2,$3,$4::delivery_channel,$5,'Pending')`,
|
|
tenantID, userID, notifID, channel, recipient)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) UpdateDeliveryStatus(ctx context.Context, id uuid.UUID, status, errMsg *string, providerMsgID *string) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE notification.notification_deliveries SET
|
|
status = $1::delivery_status_kind,
|
|
error_message = $2,
|
|
provider_message_id = $3,
|
|
sent_at = CASE WHEN $1 = 'Sent' THEN NOW() ELSE sent_at END,
|
|
delivered_at = CASE WHEN $1 = 'Delivered' THEN NOW() ELSE delivered_at END,
|
|
failed_at = CASE WHEN $1 IN ('Failed','Bounced') THEN NOW() ELSE failed_at END,
|
|
updated_at = NOW()
|
|
WHERE id = $4`,
|
|
status, errMsg, providerMsgID, id)
|
|
return err
|
|
}
|
|
|
|
// ── Scanners ─────────────────────────────────────────────────────────────────
|
|
|
|
func scanNotifications(rows pgx.Rows) ([]*models.Notification, error) {
|
|
var out []*models.Notification
|
|
for rows.Next() {
|
|
n := &models.Notification{}
|
|
if err := rows.Scan(
|
|
&n.ID, &n.TenantID, &n.UserID,
|
|
&n.NotificationType, &n.Priority,
|
|
&n.Title, &n.Message, &n.Label, &n.Signature, &n.Icon, &n.Image, &n.AnimationDemo, &n.Design,
|
|
&n.ActionURL, &n.ActionText,
|
|
&n.RenderJobID, &n.ExportID, &n.PaymentID, &n.GiftID, &n.EarnedGiftID,
|
|
&n.IsEmergency, &n.Seen, &n.SeenAt, &n.Clicked, &n.ClickedAt, &n.GiftUsed,
|
|
&n.ExpireDate, &n.CreatedAt, &n.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, n)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func scanTemplates(rows pgx.Rows) ([]*models.NotificationTemplate, error) {
|
|
var out []*models.NotificationTemplate
|
|
for rows.Next() {
|
|
t := &models.NotificationTemplate{}
|
|
var varsSchema *string
|
|
if err := rows.Scan(&t.ID, &t.TenantID, &t.Code, &t.Channel, &t.Locale,
|
|
&t.Subject, &t.BodyText, &t.BodyHTML, &t.PushTitle, &t.PushBody, &t.PushIcon,
|
|
&varsSchema, &t.IsActive, &t.CreatedAt, &t.UpdatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
t.VariablesSchema = varsSchema
|
|
out = append(out, t)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// ── Cleanup helpers ───────────────────────────────────────────────────────────
|
|
|
|
func (s *Store) PurgeExpiredNotifications(ctx context.Context) (int64, error) {
|
|
tag, err := s.pool.Exec(ctx, `
|
|
UPDATE notification.notifications
|
|
SET deleted_at = NOW()
|
|
WHERE expire_date < $1 AND deleted_at IS NULL`, time.Now())
|
|
return tag.RowsAffected(), err
|
|
}
|