Files
soroush.asadi 90ac0b81d1 feat: V2 microservices stack — backend services, gateway, JWT auth
Add full V2 architecture: identity, content, studio (.NET 10) and file,
render, notification, gateway (Go) services with vendored deps, plus DB
migrations, event/API contracts, and an init-db script.

Wire the Next.js frontend to the gateway: server-side JWT auth routes
(login/register/refresh/logout/me), gateway fetch helper, and session/
cookie/jwt helpers under src/lib.

Containerize the stack via docker-compose.v2.yml and per-service
Dockerfiles. Base images resolve through a Nexus mirror (Docker Hub) and
MCR directly; npm/NuGet pull from Nexus groups. Self-host fonts via
next/font/local to avoid Google Fonts (geo-blocked).

Add CI workflow and ignore .env.v2, *.stackdump, and .NET bin/obj.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 23:29:31 +03:30

318 lines
12 KiB
Go

package db
import (
"context"
"fmt"
"time"
"github.com/flatrender/notification-svc/internal/models"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
type Store struct {
pool *pgxpool.Pool
}
func NewStore(pool *pgxpool.Pool) *Store {
return &Store{pool: pool}
}
// ── Notifications ─────────────────────────────────────────────────────────────
func (s *Store) ListNotifications(ctx context.Context, userID uuid.UUID, onlyUnread bool, page, pageSize int) ([]*models.Notification, int64, error) {
where := "user_id = $1 AND deleted_at IS NULL"
args := []any{userID}
if onlyUnread {
where += " AND seen = FALSE"
}
var total int64
_ = s.pool.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM notification.notifications WHERE %s", where), args...).Scan(&total)
args = append(args, pageSize, (page-1)*pageSize)
rows, err := s.pool.Query(ctx,
fmt.Sprintf(`SELECT id, tenant_id, user_id,
notification_type::text, priority::text,
title, message, label, signature, icon, image, animation_demo, design,
action_url, action_text,
render_job_id, export_id, payment_id, gift_id, earned_gift_id,
is_emergency, seen, seen_at, clicked, clicked_at, gift_used,
expire_date, created_at, updated_at
FROM notification.notifications
WHERE %s
ORDER BY created_at DESC
LIMIT $%d OFFSET $%d`, where, len(args)-1, len(args)), args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
notifs, err := scanNotifications(rows)
return notifs, total, err
}
func (s *Store) GetNotificationByID(ctx context.Context, id, userID uuid.UUID) (*models.Notification, error) {
rows, err := s.pool.Query(ctx, `
SELECT id, tenant_id, user_id,
notification_type::text, priority::text,
title, message, label, signature, icon, image, animation_demo, design,
action_url, action_text,
render_job_id, export_id, payment_id, gift_id, earned_gift_id,
is_emergency, seen, seen_at, clicked, clicked_at, gift_used,
expire_date, created_at, updated_at
FROM notification.notifications
WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID)
if err != nil {
return nil, err
}
defer rows.Close()
notifs, err := scanNotifications(rows)
if err != nil {
return nil, err
}
if len(notifs) == 0 {
return nil, fmt.Errorf("notification not found")
}
return notifs[0], nil
}
func (s *Store) CreateNotification(ctx context.Context, req *models.CreateNotificationRequest) (*models.Notification, error) {
priority := "Normal"
if req.Priority != nil {
priority = *req.Priority
}
var id uuid.UUID
err := s.pool.QueryRow(ctx, `
INSERT INTO notification.notifications
(tenant_id, user_id, notification_type, priority, title, message, label,
icon, image, action_url, action_text,
render_job_id, export_id, payment_id, gift_id, earned_gift_id,
is_emergency, expire_date)
VALUES ($1,$2,$3::notification_kind,$4::notification_priority,$5,$6,$7,
$8,$9,$10,$11,
$12,$13,$14,$15,$16,
$17,$18)
RETURNING id`,
req.TenantID, req.UserID, req.NotificationType, priority,
req.Title, req.Message, req.Label,
req.Icon, req.Image, req.ActionURL, req.ActionText,
req.RenderJobID, req.ExportID, req.PaymentID, req.GiftID, req.EarnedGiftID,
req.IsEmergency, req.ExpireDate,
).Scan(&id)
if err != nil {
return nil, err
}
return s.GetNotificationByID(ctx, id, req.UserID)
}
func (s *Store) MarkSeen(ctx context.Context, id, userID uuid.UUID) error {
_, err := s.pool.Exec(ctx, `
UPDATE notification.notifications SET seen = TRUE, seen_at = NOW(), updated_at = NOW()
WHERE id = $1 AND user_id = $2 AND seen = FALSE`, id, userID)
return err
}
func (s *Store) MarkAllSeen(ctx context.Context, userID uuid.UUID) (int64, error) {
tag, err := s.pool.Exec(ctx, `
UPDATE notification.notifications SET seen = TRUE, seen_at = NOW(), updated_at = NOW()
WHERE user_id = $1 AND seen = FALSE AND deleted_at IS NULL`, userID)
return tag.RowsAffected(), err
}
func (s *Store) MarkClicked(ctx context.Context, id, userID uuid.UUID) error {
_, err := s.pool.Exec(ctx, `
UPDATE notification.notifications
SET clicked = TRUE, clicked_at = NOW(), seen = TRUE, seen_at = COALESCE(seen_at, NOW()), updated_at = NOW()
WHERE id = $1 AND user_id = $2`, id, userID)
return err
}
func (s *Store) SoftDeleteNotification(ctx context.Context, id, userID uuid.UUID) error {
tag, err := s.pool.Exec(ctx, `
UPDATE notification.notifications SET deleted_at = NOW()
WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID)
if err != nil {
return err
}
if tag.RowsAffected() == 0 {
return fmt.Errorf("notification not found")
}
return nil
}
func (s *Store) CountUnread(ctx context.Context, userID uuid.UUID) (int64, error) {
var count int64
err := s.pool.QueryRow(ctx,
`SELECT COUNT(*) FROM notification.notifications WHERE user_id = $1 AND seen = FALSE AND deleted_at IS NULL`, userID).Scan(&count)
return count, err
}
// ── Preferences ───────────────────────────────────────────────────────────────
func (s *Store) ListPreferences(ctx context.Context, userID uuid.UUID) ([]*models.NotificationPreference, error) {
rows, err := s.pool.Query(ctx, `
SELECT id, user_id, notification_type::text, channel::text, enabled, updated_at
FROM notification.notification_preferences WHERE user_id = $1 ORDER BY notification_type, channel`, userID)
if err != nil {
return nil, err
}
defer rows.Close()
var out []*models.NotificationPreference
for rows.Next() {
p := &models.NotificationPreference{}
if err := rows.Scan(&p.ID, &p.UserID, &p.NotificationType, &p.Channel, &p.Enabled, &p.UpdatedAt); err != nil {
return nil, err
}
out = append(out, p)
}
return out, rows.Err()
}
func (s *Store) UpsertPreference(ctx context.Context, userID uuid.UUID, req *models.UpdatePreferenceRequest) error {
_, err := s.pool.Exec(ctx, `
INSERT INTO notification.notification_preferences (user_id, notification_type, channel, enabled)
VALUES ($1, $2::notification_kind, $3::delivery_channel, $4)
ON CONFLICT (user_id, notification_type, channel) DO UPDATE
SET enabled = EXCLUDED.enabled, updated_at = NOW()`,
userID, req.NotificationType, req.Channel, req.Enabled)
return err
}
// ── Templates (admin) ─────────────────────────────────────────────────────────
func (s *Store) ListTemplates(ctx context.Context, tenantID *uuid.UUID) ([]*models.NotificationTemplate, error) {
var rows pgx.Rows
var err error
if tenantID != nil {
rows, err = s.pool.Query(ctx, `
SELECT id, tenant_id, code, channel::text, locale, subject, body_text, body_html,
push_title, push_body, push_icon, variables_schema::text, is_active, created_at, updated_at
FROM notification.notification_templates
WHERE (tenant_id = $1 OR tenant_id IS NULL) AND is_active = TRUE
ORDER BY code, channel, locale`, *tenantID)
} else {
rows, err = s.pool.Query(ctx, `
SELECT id, tenant_id, code, channel::text, locale, subject, body_text, body_html,
push_title, push_body, push_icon, variables_schema::text, is_active, created_at, updated_at
FROM notification.notification_templates
ORDER BY code, channel, locale`)
}
if err != nil {
return nil, err
}
defer rows.Close()
return scanTemplates(rows)
}
func (s *Store) UpsertTemplate(ctx context.Context, tenantID *uuid.UUID, req *models.TemplateUpsertRequest) (*models.NotificationTemplate, error) {
isActive := true
if req.IsActive != nil {
isActive = *req.IsActive
}
var id uuid.UUID
err := s.pool.QueryRow(ctx, `
INSERT INTO notification.notification_templates
(tenant_id, code, channel, locale, subject, body_text, body_html,
push_title, push_body, push_icon, is_active)
VALUES ($1,$2,$3::delivery_channel,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (tenant_id, code, channel, locale) DO UPDATE SET
subject = EXCLUDED.subject, body_text = EXCLUDED.body_text, body_html = EXCLUDED.body_html,
push_title = EXCLUDED.push_title, push_body = EXCLUDED.push_body, push_icon = EXCLUDED.push_icon,
is_active = EXCLUDED.is_active, updated_at = NOW()
RETURNING id`,
tenantID, req.Code, req.Channel, req.Locale, req.Subject, req.BodyText, req.BodyHTML,
req.PushTitle, req.PushBody, req.PushIcon, isActive,
).Scan(&id)
if err != nil {
return nil, err
}
rows, err := s.pool.Query(ctx, `
SELECT id, tenant_id, code, channel::text, locale, subject, body_text, body_html,
push_title, push_body, push_icon, variables_schema::text, is_active, created_at, updated_at
FROM notification.notification_templates WHERE id = $1`, id)
if err != nil {
return nil, err
}
defer rows.Close()
tpls, err := scanTemplates(rows)
if err != nil || len(tpls) == 0 {
return nil, err
}
return tpls[0], nil
}
// ── Deliveries ────────────────────────────────────────────────────────────────
func (s *Store) CreateDelivery(ctx context.Context, notifID uuid.UUID, userID, tenantID uuid.UUID, channel, recipient string) error {
_, err := s.pool.Exec(ctx, `
INSERT INTO notification.notification_deliveries
(tenant_id, user_id, notification_id, channel, recipient, status)
VALUES ($1,$2,$3,$4::delivery_channel,$5,'Pending')`,
tenantID, userID, notifID, channel, recipient)
return err
}
func (s *Store) UpdateDeliveryStatus(ctx context.Context, id uuid.UUID, status, errMsg *string, providerMsgID *string) error {
_, err := s.pool.Exec(ctx, `
UPDATE notification.notification_deliveries SET
status = $1::delivery_status_kind,
error_message = $2,
provider_message_id = $3,
sent_at = CASE WHEN $1 = 'Sent' THEN NOW() ELSE sent_at END,
delivered_at = CASE WHEN $1 = 'Delivered' THEN NOW() ELSE delivered_at END,
failed_at = CASE WHEN $1 IN ('Failed','Bounced') THEN NOW() ELSE failed_at END,
updated_at = NOW()
WHERE id = $4`,
status, errMsg, providerMsgID, id)
return err
}
// ── Scanners ─────────────────────────────────────────────────────────────────
func scanNotifications(rows pgx.Rows) ([]*models.Notification, error) {
var out []*models.Notification
for rows.Next() {
n := &models.Notification{}
if err := rows.Scan(
&n.ID, &n.TenantID, &n.UserID,
&n.NotificationType, &n.Priority,
&n.Title, &n.Message, &n.Label, &n.Signature, &n.Icon, &n.Image, &n.AnimationDemo, &n.Design,
&n.ActionURL, &n.ActionText,
&n.RenderJobID, &n.ExportID, &n.PaymentID, &n.GiftID, &n.EarnedGiftID,
&n.IsEmergency, &n.Seen, &n.SeenAt, &n.Clicked, &n.ClickedAt, &n.GiftUsed,
&n.ExpireDate, &n.CreatedAt, &n.UpdatedAt,
); err != nil {
return nil, err
}
out = append(out, n)
}
return out, rows.Err()
}
func scanTemplates(rows pgx.Rows) ([]*models.NotificationTemplate, error) {
var out []*models.NotificationTemplate
for rows.Next() {
t := &models.NotificationTemplate{}
var varsSchema *string
if err := rows.Scan(&t.ID, &t.TenantID, &t.Code, &t.Channel, &t.Locale,
&t.Subject, &t.BodyText, &t.BodyHTML, &t.PushTitle, &t.PushBody, &t.PushIcon,
&varsSchema, &t.IsActive, &t.CreatedAt, &t.UpdatedAt); err != nil {
return nil, err
}
t.VariablesSchema = varsSchema
out = append(out, t)
}
return out, rows.Err()
}
// ── Cleanup helpers ───────────────────────────────────────────────────────────
func (s *Store) PurgeExpiredNotifications(ctx context.Context) (int64, error) {
tag, err := s.pool.Exec(ctx, `
UPDATE notification.notifications
SET deleted_at = NOW()
WHERE expire_date < $1 AND deleted_at IS NULL`, time.Now())
return tag.RowsAffected(), err
}