// Package db contains the PostgreSQL persistence layer of the notification
// service, implemented on top of a pgx connection pool.
package db

import (
	"context"
	"fmt"
	"time"

	"github.com/flatrender/notification-svc/internal/models"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// notificationColumns is the SELECT list for notification.notifications.
// Its order must match the Scan destinations in scanNotifications.
const notificationColumns = `id, tenant_id, user_id, notification_type::text, priority::text,
	title, message, label, signature, icon, image, animation_demo, design,
	action_url, action_text, render_job_id, export_id, payment_id,
	gift_id, earned_gift_id, is_emergency, seen, seen_at, clicked, clicked_at,
	gift_used, expire_date, created_at, updated_at`

// templateColumns is the SELECT list for notification.notification_templates.
// Its order must match the Scan destinations in scanTemplates.
const templateColumns = `id, tenant_id, code, channel::text, locale, subject, body_text, body_html,
	push_title, push_body, push_icon, variables_schema::text, is_active, created_at, updated_at`

// Store groups the notification-service queries that run against a pgx
// connection pool.
type Store struct {
	pool *pgxpool.Pool
}

// NewStore returns a Store that executes its queries on pool.
func NewStore(pool *pgxpool.Pool) *Store {
	return &Store{pool: pool}
}

// ── Notifications ─────────────────────────────────────────────────────────────

// ListNotifications returns one page of the user's non-deleted notifications,
// newest first, together with the total number of rows matching the filter.
//
// page is 1-based; values below 1 are treated as 1 and a negative pageSize is
// treated as 0, because PostgreSQL rejects negative LIMIT/OFFSET values.
// When onlyUnread is true only rows with seen = FALSE are counted and listed.
func (s *Store) ListNotifications(ctx context.Context, userID uuid.UUID, onlyUnread bool, page, pageSize int) ([]*models.Notification, int64, error) {
	if page < 1 {
		page = 1
	}
	if pageSize < 0 {
		pageSize = 0
	}

	// The filter only ever references $1, so LIMIT/OFFSET are always $2/$3.
	where := "user_id = $1 AND deleted_at IS NULL"
	if onlyUnread {
		where += " AND seen = FALSE"
	}

	var total int64
	countSQL := "SELECT COUNT(*) FROM notification.notifications WHERE " + where
	if err := s.pool.QueryRow(ctx, countSQL, userID).Scan(&total); err != nil {
		return nil, 0, fmt.Errorf("counting notifications: %w", err)
	}

	listSQL := "SELECT " + notificationColumns + `
		FROM notification.notifications
		WHERE ` + where + `
		ORDER BY created_at DESC LIMIT $2 OFFSET $3`
	rows, err := s.pool.Query(ctx, listSQL, userID, pageSize, (page-1)*pageSize)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()

	notifs, err := scanNotifications(rows)
	if err != nil {
		return nil, 0, err
	}
	return notifs, total, nil
}

// GetNotificationByID returns the non-deleted notification with the given id
// that belongs to userID. It returns a "notification not found" error when no
// such row exists.
func (s *Store) GetNotificationByID(ctx context.Context, id, userID uuid.UUID) (*models.Notification, error) {
	rows, err := s.pool.Query(ctx, "SELECT "+notificationColumns+`
		FROM notification.notifications
		WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	notifs, err := scanNotifications(rows)
	if err != nil {
		return nil, err
	}
	if len(notifs) == 0 {
		return nil, fmt.Errorf("notification not found")
	}
	return notifs[0], nil
}

// CreateNotification inserts a notification built from req and returns the
// stored row as read back by GetNotificationByID. When req.Priority is nil
// the priority "Normal" is used.
func (s *Store) CreateNotification(ctx context.Context, req *models.CreateNotificationRequest) (*models.Notification, error) {
	priority := "Normal"
	if req.Priority != nil {
		priority = *req.Priority
	}

	var id uuid.UUID
	err := s.pool.QueryRow(ctx, `
		INSERT INTO notification.notifications
			(tenant_id, user_id, notification_type, priority, title, message, label,
			 icon, image, action_url, action_text,
			 render_job_id, export_id, payment_id, gift_id, earned_gift_id,
			 is_emergency, expire_date)
		VALUES ($1,$2,$3::notification_kind,$4::notification_priority,$5,$6,$7,
		        $8,$9,$10,$11,
		        $12,$13,$14,$15,$16,
		        $17,$18)
		RETURNING id`,
		req.TenantID, req.UserID, req.NotificationType, priority,
		req.Title, req.Message, req.Label,
		req.Icon, req.Image, req.ActionURL, req.ActionText,
		req.RenderJobID, req.ExportID, req.PaymentID, req.GiftID, req.EarnedGiftID,
		req.IsEmergency, req.ExpireDate,
	).Scan(&id)
	if err != nil {
		return nil, err
	}
	return s.GetNotificationByID(ctx, id, req.UserID)
}

// MarkSeen flags one of the user's notifications as seen and stamps seen_at.
// Rows that are already seen or soft-deleted are left untouched; matching no
// row is not reported as an error.
func (s *Store) MarkSeen(ctx context.Context, id, userID uuid.UUID) error {
	_, err := s.pool.Exec(ctx, `
		UPDATE notification.notifications
		SET seen = TRUE, seen_at = NOW(), updated_at = NOW()
		WHERE id = $1 AND user_id = $2 AND seen = FALSE AND deleted_at IS NULL`, id, userID)
	return err
}

// MarkAllSeen flags every unseen, non-deleted notification of the user as
// seen and returns the number of rows updated.
func (s *Store) MarkAllSeen(ctx context.Context, userID uuid.UUID) (int64, error) {
	tag, err := s.pool.Exec(ctx, `
		UPDATE notification.notifications
		SET seen = TRUE, seen_at = NOW(), updated_at = NOW()
		WHERE user_id = $1 AND seen = FALSE AND deleted_at IS NULL`, userID)
	if err != nil {
		return 0, err
	}
	return tag.RowsAffected(), nil
}

// MarkClicked flags one of the user's notifications as clicked. It also marks
// the row as seen, keeping an existing seen_at timestamp if one is set.
// Soft-deleted rows are left untouched; matching no row is not reported as an
// error.
func (s *Store) MarkClicked(ctx context.Context, id, userID uuid.UUID) error {
	_, err := s.pool.Exec(ctx, `
		UPDATE notification.notifications
		SET clicked = TRUE, clicked_at = NOW(),
		    seen = TRUE, seen_at = COALESCE(seen_at, NOW()),
		    updated_at = NOW()
		WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID)
	return err
}

// SoftDeleteNotification sets deleted_at on one of the user's notifications.
// It returns a "notification not found" error when no non-deleted row matches.
func (s *Store) SoftDeleteNotification(ctx context.Context, id, userID uuid.UUID) error {
	tag, err := s.pool.Exec(ctx, `
		UPDATE notification.notifications
		SET deleted_at = NOW()
		WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID)
	if err != nil {
		return err
	}
	if tag.RowsAffected() == 0 {
		return fmt.Errorf("notification not found")
	}
	return nil
}

// CountUnread returns the number of unseen, non-deleted notifications of the
// user.
func (s *Store) CountUnread(ctx context.Context, userID uuid.UUID) (int64, error) {
	var count int64
	err := s.pool.QueryRow(ctx, `
		SELECT COUNT(*)
		FROM notification.notifications
		WHERE user_id = $1 AND seen = FALSE AND deleted_at IS NULL`, userID).Scan(&count)
	if err != nil {
		return 0, err
	}
	return count, nil
}

// ── Preferences ───────────────────────────────────────────────────────────────

// ListPreferences returns the user's notification preferences ordered by
// notification type and channel.
func (s *Store) ListPreferences(ctx context.Context, userID uuid.UUID) ([]*models.NotificationPreference, error) {
	rows, err := s.pool.Query(ctx, `
		SELECT id, user_id, notification_type::text, channel::text, enabled, updated_at
		FROM notification.notification_preferences
		WHERE user_id = $1
		ORDER BY notification_type, channel`, userID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []*models.NotificationPreference
	for rows.Next() {
		p := &models.NotificationPreference{}
		if err := rows.Scan(&p.ID, &p.UserID, &p.NotificationType, &p.Channel, &p.Enabled, &p.UpdatedAt); err != nil {
			return nil, err
		}
		out = append(out, p)
	}
	return out, rows.Err()
}

// UpsertPreference inserts the preference described by req for the user, or
// updates enabled and updated_at when a row for the same
// (user_id, notification_type, channel) already exists.
func (s *Store) UpsertPreference(ctx context.Context, userID uuid.UUID, req *models.UpdatePreferenceRequest) error {
	_, err := s.pool.Exec(ctx, `
		INSERT INTO notification.notification_preferences
			(user_id, notification_type, channel, enabled)
		VALUES ($1, $2::notification_kind, $3::delivery_channel, $4)
		ON CONFLICT (user_id, notification_type, channel)
		DO UPDATE SET enabled = EXCLUDED.enabled, updated_at = NOW()`,
		userID, req.NotificationType, req.Channel, req.Enabled)
	return err
}

// ── Templates (admin) ─────────────────────────────────────────────────────────

// ListTemplates returns notification templates ordered by code, channel and
// locale. With a non-nil tenantID it returns only active templates that belong
// to that tenant or have no tenant; with a nil tenantID it returns every
// template, active or not.
func (s *Store) ListTemplates(ctx context.Context, tenantID *uuid.UUID) ([]*models.NotificationTemplate, error) {
	var (
		rows pgx.Rows
		err  error
	)
	if tenantID != nil {
		rows, err = s.pool.Query(ctx, "SELECT "+templateColumns+`
			FROM notification.notification_templates
			WHERE (tenant_id = $1 OR tenant_id IS NULL) AND is_active = TRUE
			ORDER BY code, channel, locale`, *tenantID)
	} else {
		rows, err = s.pool.Query(ctx, "SELECT "+templateColumns+`
			FROM notification.notification_templates
			ORDER BY code, channel, locale`)
	}
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	return scanTemplates(rows)
}

// UpsertTemplate inserts the template described by req, or updates the content
// fields of the existing row with the same (tenant_id, code, channel, locale),
// and returns the stored row. When req.IsActive is nil the template is stored
// as active.
//
// NOTE(review): with a nil tenantID the conflict target contains a NULL. Under
// a default PostgreSQL unique index NULLs never conflict, so each call would
// insert a new row — confirm how the table's constraint is declared.
func (s *Store) UpsertTemplate(ctx context.Context, tenantID *uuid.UUID, req *models.TemplateUpsertRequest) (*models.NotificationTemplate, error) {
	isActive := true
	if req.IsActive != nil {
		isActive = *req.IsActive
	}

	var id uuid.UUID
	err := s.pool.QueryRow(ctx, `
		INSERT INTO notification.notification_templates
			(tenant_id, code, channel, locale, subject, body_text, body_html,
			 push_title, push_body, push_icon, is_active)
		VALUES ($1,$2,$3::delivery_channel,$4,$5,$6,$7,$8,$9,$10,$11)
		ON CONFLICT (tenant_id, code, channel, locale) DO UPDATE SET
			subject = EXCLUDED.subject,
			body_text = EXCLUDED.body_text,
			body_html = EXCLUDED.body_html,
			push_title = EXCLUDED.push_title,
			push_body = EXCLUDED.push_body,
			push_icon = EXCLUDED.push_icon,
			is_active = EXCLUDED.is_active,
			updated_at = NOW()
		RETURNING id`,
		tenantID, req.Code, req.Channel, req.Locale,
		req.Subject, req.BodyText, req.BodyHTML,
		req.PushTitle, req.PushBody, req.PushIcon, isActive,
	).Scan(&id)
	if err != nil {
		return nil, err
	}

	rows, err := s.pool.Query(ctx, "SELECT "+templateColumns+`
		FROM notification.notification_templates
		WHERE id = $1`, id)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	tpls, err := scanTemplates(rows)
	if err != nil {
		return nil, err
	}
	if len(tpls) == 0 {
		// Never hand back (nil, nil): callers would dereference a nil template.
		return nil, fmt.Errorf("template not found")
	}
	return tpls[0], nil
}

// ── Deliveries ────────────────────────────────────────────────────────────────

// CreateDelivery records a delivery attempt for a notification on the given
// channel and recipient, with the initial status 'Pending'.
func (s *Store) CreateDelivery(ctx context.Context, notifID uuid.UUID, userID, tenantID uuid.UUID, channel, recipient string) error {
	_, err := s.pool.Exec(ctx, `
		INSERT INTO notification.notification_deliveries
			(tenant_id, user_id, notification_id, channel, recipient, status)
		VALUES ($1,$2,$3,$4::delivery_channel,$5,'Pending')`,
		tenantID, userID, notifID, channel, recipient)
	return err
}

// UpdateDeliveryStatus overwrites the status, error message and provider
// message id of a delivery. It stamps sent_at, delivered_at or failed_at with
// the current time when the new status is 'Sent', 'Delivered', or
// 'Failed'/'Bounced' respectively; the other timestamps keep their value.
// Matching no row is not reported as an error.
func (s *Store) UpdateDeliveryStatus(ctx context.Context, id uuid.UUID, status, errMsg *string, providerMsgID *string) error {
	_, err := s.pool.Exec(ctx, `
		UPDATE notification.notification_deliveries
		SET status = $1::delivery_status_kind,
		    error_message = $2,
		    provider_message_id = $3,
		    sent_at = CASE WHEN $1 = 'Sent' THEN NOW() ELSE sent_at END,
		    delivered_at = CASE WHEN $1 = 'Delivered' THEN NOW() ELSE delivered_at END,
		    failed_at = CASE WHEN $1 IN ('Failed','Bounced') THEN NOW() ELSE failed_at END,
		    updated_at = NOW()
		WHERE id = $4`,
		status, errMsg, providerMsgID, id)
	return err
}

// ── Scanners ─────────────────────────────────────────────────────────────────

// scanNotifications reads every remaining row of rows into a Notification.
// The rows must carry the columns of notificationColumns, in that order.
// It does not close rows; that is left to the caller.
func scanNotifications(rows pgx.Rows) ([]*models.Notification, error) {
	var out []*models.Notification
	for rows.Next() {
		n := &models.Notification{}
		if err := rows.Scan(
			&n.ID, &n.TenantID, &n.UserID, &n.NotificationType, &n.Priority,
			&n.Title, &n.Message, &n.Label, &n.Signature, &n.Icon, &n.Image,
			&n.AnimationDemo, &n.Design,
			&n.ActionURL, &n.ActionText,
			&n.RenderJobID, &n.ExportID, &n.PaymentID, &n.GiftID, &n.EarnedGiftID,
			&n.IsEmergency, &n.Seen, &n.SeenAt, &n.Clicked, &n.ClickedAt,
			&n.GiftUsed, &n.ExpireDate, &n.CreatedAt, &n.UpdatedAt,
		); err != nil {
			return nil, err
		}
		out = append(out, n)
	}
	return out, rows.Err()
}

// scanTemplates reads every remaining row of rows into a NotificationTemplate.
// The rows must carry the columns of templateColumns, in that order.
// It does not close rows; that is left to the caller.
func scanTemplates(rows pgx.Rows) ([]*models.NotificationTemplate, error) {
	var out []*models.NotificationTemplate
	for rows.Next() {
		t := &models.NotificationTemplate{}
		// variables_schema is selected as text and may be NULL.
		var varsSchema *string
		if err := rows.Scan(
			&t.ID, &t.TenantID, &t.Code, &t.Channel, &t.Locale,
			&t.Subject, &t.BodyText, &t.BodyHTML,
			&t.PushTitle, &t.PushBody, &t.PushIcon,
			&varsSchema, &t.IsActive, &t.CreatedAt, &t.UpdatedAt,
		); err != nil {
			return nil, err
		}
		t.VariablesSchema = varsSchema
		out = append(out, t)
	}
	return out, rows.Err()
}

// ── Cleanup helpers ───────────────────────────────────────────────────────────

// PurgeExpiredNotifications soft-deletes every non-deleted notification whose
// expire_date is earlier than the application's current time and returns the
// number of rows affected.
func (s *Store) PurgeExpiredNotifications(ctx context.Context) (int64, error) {
	tag, err := s.pool.Exec(ctx, `
		UPDATE notification.notifications
		SET deleted_at = NOW()
		WHERE expire_date < $1 AND deleted_at IS NULL`, time.Now())
	if err != nil {
		return 0, err
	}
	return tag.RowsAffected(), nil
}