Go · 3122 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package notif
4
5 import (
6 "context"
7 "encoding/json"
8 "fmt"
9
10 "github.com/jackc/pgx/v5"
11 "github.com/jackc/pgx/v5/pgtype"
12 "github.com/jackc/pgx/v5/pgxpool"
13
14 socialdb "github.com/tenseleyFlow/shithub/internal/social/sqlc"
15 "github.com/tenseleyFlow/shithub/internal/worker"
16 )
17
18 // Event is the call-site shape for an emitter. The notif package
19 // owns the schema-side details (which columns are NULL, payload
20 // shape, JSON marshal); callers fill in the semantic fields.
21 //
22 // Mentions, when present, are surfaced to the fan-out worker via the
23 // `mentions` JSON key in payload — kept here so callers don't have
24 // to remember the wire convention.
25 type Event struct {
26 ActorUserID int64 // 0 → system event (NULL in DB).
27 Kind string // e.g. "issue_comment_created", "pr_review_requested".
28 RepoID int64 // 0 → user-scoped event (NULL).
29 SourceKind string // "issue", "pull", "repo", "user", …
30 SourceID int64
31 Public bool // matches repo visibility (caller decides).
32 Mentions []int64 // resolved user-ids to fan out @-mentions to.
33 Extra map[string]any
34 }
35
36 // Emit inserts one row into domain_events and wakes the worker pool.
37 // Use within a tx via EmitTx when the emit must atomically commit
38 // with the source change (the typical case — issue.Create + the
39 // matching event row land together or not at all).
40 //
41 // The emit side is intentionally thin: the routing matrix and
42 // recipient computation live entirely in the fan-out worker so
43 // callers don't grow a per-kind switch.
44 func Emit(ctx context.Context, pool *pgxpool.Pool, ev Event) error {
45 if err := emitInto(ctx, pool, ev); err != nil {
46 return err
47 }
48 // Wake the worker pool. Best-effort — the cron-driven schedule
49 // catches up if NOTIFY is dropped (LISTEN reconnects on its own).
50 _ = worker.Notify(ctx, pool)
51 return nil
52 }
53
54 // EmitTx is the in-tx variant. Use it when the source mutation and
55 // the event row must commit together (almost always). The NOTIFY is
56 // deferred to commit by Postgres semantics; this matches the rest of
57 // the worker's enqueue patterns.
58 func EmitTx(ctx context.Context, tx pgx.Tx, ev Event) error {
59 return emitInto(ctx, tx, ev)
60 }
61
62 func emitInto(ctx context.Context, db socialdb.DBTX, ev Event) error {
63 payload, err := buildPayload(ev)
64 if err != nil {
65 return fmt.Errorf("notif emit: payload: %w", err)
66 }
67 if _, err := socialdb.New().InsertDomainEvent(ctx, db, socialdb.InsertDomainEventParams{
68 ActorUserID: pgInt8(ev.ActorUserID),
69 Kind: ev.Kind,
70 RepoID: pgInt8(ev.RepoID),
71 SourceKind: ev.SourceKind,
72 SourceID: ev.SourceID,
73 Public: ev.Public,
74 Payload: payload,
75 }); err != nil {
76 return fmt.Errorf("notif emit: insert: %w", err)
77 }
78 return nil
79 }
80
81 func buildPayload(ev Event) ([]byte, error) {
82 out := map[string]any{}
83 for k, v := range ev.Extra {
84 out[k] = v
85 }
86 if len(ev.Mentions) > 0 {
87 out["mentions"] = ev.Mentions
88 }
89 if len(out) == 0 {
90 return []byte("{}"), nil
91 }
92 return json.Marshal(out)
93 }
94
95 func pgInt8(v int64) pgtype.Int8 {
96 return pgtype.Int8{Int64: v, Valid: v != 0}
97 }
98