| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package notif |
| 4 | |
| 5 | import ( |
| 6 | "context" |
| 7 | "encoding/json" |
| 8 | "fmt" |
| 9 | |
| 10 | "github.com/jackc/pgx/v5" |
| 11 | "github.com/jackc/pgx/v5/pgtype" |
| 12 | "github.com/jackc/pgx/v5/pgxpool" |
| 13 | |
| 14 | socialdb "github.com/tenseleyFlow/shithub/internal/social/sqlc" |
| 15 | "github.com/tenseleyFlow/shithub/internal/worker" |
| 16 | ) |
| 17 | |
| 18 | // Event is the call-site shape for an emitter. The notif package |
| 19 | // owns the schema-side details (which columns are NULL, payload |
| 20 | // shape, JSON marshal); callers fill in the semantic fields. |
| 21 | // |
| 22 | // Mentions, when present, are surfaced to the fan-out worker via the |
| 23 | // `mentions` JSON key in payload — kept here so callers don't have |
| 24 | // to remember the wire convention. |
| 25 | type Event struct { |
| 26 | ActorUserID int64 // 0 → system event (NULL in DB). |
| 27 | Kind string // e.g. "issue_comment_created", "pr_review_requested". |
| 28 | RepoID int64 // 0 → user-scoped event (NULL). |
| 29 | SourceKind string // "issue", "pull", "repo", "user", … |
| 30 | SourceID int64 |
| 31 | Public bool // matches repo visibility (caller decides). |
| 32 | Mentions []int64 // resolved user-ids to fan out @-mentions to. |
| 33 | Extra map[string]any |
| 34 | } |
| 35 | |
| 36 | // Emit inserts one row into domain_events and wakes the worker pool. |
| 37 | // Use within a tx via EmitTx when the emit must atomically commit |
| 38 | // with the source change (the typical case — issue.Create + the |
| 39 | // matching event row land together or not at all). |
| 40 | // |
| 41 | // The emit side is intentionally thin: the routing matrix and |
| 42 | // recipient computation live entirely in the fan-out worker so |
| 43 | // callers don't grow a per-kind switch. |
| 44 | func Emit(ctx context.Context, pool *pgxpool.Pool, ev Event) error { |
| 45 | if err := emitInto(ctx, pool, ev); err != nil { |
| 46 | return err |
| 47 | } |
| 48 | // Wake the worker pool. Best-effort — the cron-driven schedule |
| 49 | // catches up if NOTIFY is dropped (LISTEN reconnects on its own). |
| 50 | _ = worker.Notify(ctx, pool) |
| 51 | return nil |
| 52 | } |
| 53 | |
| 54 | // EmitTx is the in-tx variant. Use it when the source mutation and |
| 55 | // the event row must commit together (almost always). The NOTIFY is |
| 56 | // deferred to commit by Postgres semantics; this matches the rest of |
| 57 | // the worker's enqueue patterns. |
| 58 | func EmitTx(ctx context.Context, tx pgx.Tx, ev Event) error { |
| 59 | return emitInto(ctx, tx, ev) |
| 60 | } |
| 61 | |
| 62 | func emitInto(ctx context.Context, db socialdb.DBTX, ev Event) error { |
| 63 | payload, err := buildPayload(ev) |
| 64 | if err != nil { |
| 65 | return fmt.Errorf("notif emit: payload: %w", err) |
| 66 | } |
| 67 | if _, err := socialdb.New().InsertDomainEvent(ctx, db, socialdb.InsertDomainEventParams{ |
| 68 | ActorUserID: pgInt8(ev.ActorUserID), |
| 69 | Kind: ev.Kind, |
| 70 | RepoID: pgInt8(ev.RepoID), |
| 71 | SourceKind: ev.SourceKind, |
| 72 | SourceID: ev.SourceID, |
| 73 | Public: ev.Public, |
| 74 | Payload: payload, |
| 75 | }); err != nil { |
| 76 | return fmt.Errorf("notif emit: insert: %w", err) |
| 77 | } |
| 78 | return nil |
| 79 | } |
| 80 | |
| 81 | func buildPayload(ev Event) ([]byte, error) { |
| 82 | out := map[string]any{} |
| 83 | for k, v := range ev.Extra { |
| 84 | out[k] = v |
| 85 | } |
| 86 | if len(ev.Mentions) > 0 { |
| 87 | out["mentions"] = ev.Mentions |
| 88 | } |
| 89 | if len(out) == 0 { |
| 90 | return []byte("{}"), nil |
| 91 | } |
| 92 | return json.Marshal(out) |
| 93 | } |
| 94 | |
| 95 | func pgInt8(v int64) pgtype.Int8 { |
| 96 | return pgtype.Int8{Int64: v, Valid: v != 0} |
| 97 | } |
| 98 |