tenseleyflow/shithub / 424bde8

Browse files

S29: notifications schema, sqlc, fan-out worker core

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
424bde8a72c6ee514f982066193bea1d74b8c454
Parents
708189e
Tree
610643c

16 changed files

StatusFile+-
A internal/migrationsfs/migrations/0033_notifications.sql 139 0
A internal/notif/email.go 255 0
A internal/notif/fanout.go 515 0
A internal/notif/notif.go 77 0
A internal/notif/queries/cursor.sql 27 0
A internal/notif/queries/email_log.sql 27 0
A internal/notif/queries/notifications.sql 90 0
A internal/notif/queries/threads.sql 37 0
A internal/notif/routing.go 165 0
A internal/notif/sqlc/cursor.sql.go 93 0
A internal/notif/sqlc/db.go 25 0
A internal/notif/sqlc/email_log.sql.go 93 0
A internal/notif/sqlc/models.go 1404 0
A internal/notif/sqlc/notifications.sql.go 332 0
A internal/notif/sqlc/querier.go 74 0
A internal/notif/sqlc/threads.sql.go 152 0
internal/migrationsfs/migrations/0033_notifications.sqladded
@@ -0,0 +1,139 @@
1
+-- SPDX-License-Identifier: AGPL-3.0-or-later
2
+--
3
+-- S29 notifications inbox + per-thread subscription + email send log.
4
+--
5
+-- `domain_events` was already shipped in S26 (per the S00–S25
6
+-- audit's forward-deferral). The fan-out worker this sprint lands
7
+-- consumes from there.
8
+--
9
+-- Schema decisions:
10
+--
11
+-- * `notifications` is the user-facing inbox. One row per
12
+--   (recipient, thread). Multiple events on the same thread coalesce
13
+--   into the same row (last_event_at + last_actor_user_id update).
14
+--
15
+-- * `notification_threads` is the per-user explicit subscription
16
+--   override on top of the per-repo `watches` rule. A row here lets
17
+--   the user pin themselves to a thread (or ignore it) regardless
18
+--   of the repo-level watch.
19
+--
20
+-- * `notification_email_log` is for de-dup, abuse triage, and the
21
+--   storm dampener's per-thread cap.
22
+--
23
+-- * `domain_events_processed` records the fan-out worker's cursor
24
+--   so a restart resumes where it stopped, and so the worker can
25
+--   detect drift if a row is somehow consumed twice. Keyed
26
+--   (consumer, last_event_id) so S33 webhooks shares the table
27
+--   shape with its own consumer name later.
28
+
29
+-- +goose Up
30
+
31
+CREATE TYPE notification_thread_kind AS ENUM ('issue', 'pr');
32
+
33
+CREATE TABLE notifications (
34
+    id                  bigserial   PRIMARY KEY,
35
+    recipient_user_id   bigint      NOT NULL REFERENCES users(id) ON DELETE CASCADE,
36
+    kind                text        NOT NULL,
37
+    reason              text        NOT NULL,
38
+    repo_id             bigint      REFERENCES repos(id) ON DELETE CASCADE,
39
+    thread_kind         notification_thread_kind,
40
+    thread_id           bigint,
41
+    -- source_event_id points at the most recent domain_events row
42
+    -- this notification was last touched by. Useful for debugging
43
+    -- + for the read-state UI ("which event did I last see?").
44
+    source_event_id     bigint,
45
+    unread              boolean     NOT NULL DEFAULT true,
46
+    last_event_at       timestamptz NOT NULL DEFAULT now(),
47
+    last_actor_user_id  bigint      REFERENCES users(id) ON DELETE SET NULL,
48
+    summary             jsonb       NOT NULL DEFAULT '{}'::jsonb,
49
+    created_at          timestamptz NOT NULL DEFAULT now(),
50
+    updated_at          timestamptz NOT NULL DEFAULT now(),
51
+
52
+    CONSTRAINT notifications_kind_length CHECK (char_length(kind) BETWEEN 1 AND 64),
53
+    CONSTRAINT notifications_reason_length CHECK (char_length(reason) BETWEEN 1 AND 32)
54
+);
55
+
56
+-- Inbox view: "all my notifications, recent first."
57
+CREATE INDEX notifications_recipient_recent_idx
58
+    ON notifications (recipient_user_id, last_event_at DESC);
59
+
60
+-- Unread badge count: filtered partial index keeps the count cheap
61
+-- even when the recipient has tens of thousands of read rows.
62
+CREATE INDEX notifications_recipient_unread_idx
63
+    ON notifications (recipient_user_id)
64
+    WHERE unread = true;
65
+
66
+-- Coalesce lookup: "do I already have an inbox row for this thread?"
67
+CREATE UNIQUE INDEX notifications_thread_coalesce_idx
68
+    ON notifications (recipient_user_id, thread_kind, thread_id)
69
+    WHERE thread_id IS NOT NULL;
70
+
71
+CREATE TRIGGER set_updated_at BEFORE UPDATE ON notifications
72
+    FOR EACH ROW EXECUTE FUNCTION tg_set_updated_at();
73
+
74
+-- ─── per-thread subscription ────────────────────────────────────────
75
+
76
+CREATE TABLE notification_threads (
77
+    recipient_user_id bigint                      NOT NULL REFERENCES users(id) ON DELETE CASCADE,
78
+    thread_kind       notification_thread_kind    NOT NULL,
79
+    thread_id         bigint                      NOT NULL,
80
+    subscribed        boolean                     NOT NULL,
81
+    reason            text                        NOT NULL,
82
+    updated_at        timestamptz                 NOT NULL DEFAULT now(),
83
+    PRIMARY KEY (recipient_user_id, thread_kind, thread_id)
84
+);
85
+
86
+CREATE INDEX notification_threads_thread_idx
87
+    ON notification_threads (thread_kind, thread_id);
88
+
89
+CREATE TRIGGER set_updated_at BEFORE UPDATE ON notification_threads
90
+    FOR EACH ROW EXECUTE FUNCTION tg_set_updated_at();
91
+
92
+-- ─── email send log (storm dampener + abuse triage) ─────────────────
93
+
94
+CREATE TABLE notification_email_log (
95
+    id                bigserial   PRIMARY KEY,
96
+    recipient_user_id bigint      NOT NULL REFERENCES users(id) ON DELETE CASCADE,
97
+    notification_id   bigint      REFERENCES notifications(id) ON DELETE SET NULL,
98
+    thread_kind       notification_thread_kind,
99
+    thread_id         bigint,
100
+    sent_at           timestamptz NOT NULL DEFAULT now(),
101
+    message_id        text
102
+);
103
+
104
+-- Storm dampener: "did I email this recipient about this thread in
105
+-- the last N minutes?" The partial-index shape isn't great because
106
+-- the predicate is time-relative; just key on the lookup columns
107
+-- and let the planner narrow.
108
+CREATE INDEX notification_email_log_recent_idx
109
+    ON notification_email_log (recipient_user_id, thread_kind, thread_id, sent_at DESC);
110
+
111
+-- Per-recipient absolute rate cap (e.g. "≤ 100 emails per hour"):
112
+-- recency-sorted by recipient.
113
+CREATE INDEX notification_email_log_recipient_idx
114
+    ON notification_email_log (recipient_user_id, sent_at DESC);
115
+
116
+-- ─── fan-out cursor ────────────────────────────────────────────────
117
+
118
+CREATE TABLE domain_events_processed (
119
+    consumer        text        NOT NULL,
120
+    last_event_id   bigint      NOT NULL,
121
+    updated_at      timestamptz NOT NULL DEFAULT now(),
122
+    PRIMARY KEY (consumer)
123
+);
124
+
125
+CREATE TRIGGER set_updated_at BEFORE UPDATE ON domain_events_processed
126
+    FOR EACH ROW EXECUTE FUNCTION tg_set_updated_at();
127
+
128
+-- Seed the notify cursor at 0 so a fresh DB doesn't trip the
129
+-- "first run" branch in the worker (which would otherwise have
130
+-- to handle the absent-row case).
131
+INSERT INTO domain_events_processed (consumer, last_event_id)
132
+VALUES ('notify_fanout', 0);
133
+
134
+-- +goose Down
135
+DROP TABLE IF EXISTS domain_events_processed;
136
+DROP TABLE IF EXISTS notification_email_log;
137
+DROP TABLE IF EXISTS notification_threads;
138
+DROP TABLE IF EXISTS notifications;
139
+DROP TYPE IF EXISTS notification_thread_kind;
internal/notif/email.goadded
@@ -0,0 +1,255 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package notif
4
+
5
+import (
6
+	"context"
7
+	"crypto/hmac"
8
+	"crypto/sha256"
9
+	"encoding/base64"
10
+	"encoding/json"
11
+	"errors"
12
+	"fmt"
13
+	"html"
14
+	"strconv"
15
+	"strings"
16
+
17
+	"github.com/jackc/pgx/v5"
18
+	"github.com/jackc/pgx/v5/pgtype"
19
+	"github.com/jackc/pgx/v5/pgxpool"
20
+
21
+	"github.com/tenseleyFlow/shithub/internal/auth/email"
22
+	notifdb "github.com/tenseleyFlow/shithub/internal/notif/sqlc"
23
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
24
+)
25
+
26
+// sendNotificationEmail renders + sends one notification email and
27
+// records it in notification_email_log. Idempotency is the
28
+// dampener's job, not ours; we send what we're told to send.
29
+//
30
+// The email body is intentionally minimal in v1: a plaintext line
31
+// summarising the reason + a link to the thread. Per-kind HTML
32
+// templates land in a follow-up commit (or the next sprint that
33
+// touches the email surface). The List-Unsubscribe URL is wired
34
+// up so the standards-compliant bits (RFC 8058) are in place from
35
+// day one.
36
+func sendNotificationEmail(
37
+	ctx context.Context, deps Deps,
38
+	recipientID int64,
39
+	notif notifdb.Notification,
40
+	reason Reason,
41
+	threadKind notifdb.NullNotificationThreadKind,
42
+	threadID int64,
43
+) error {
44
+	if deps.EmailSender == nil {
45
+		return nil
46
+	}
47
+	uq := usersdb.New()
48
+	user, err := uq.GetUserByID(ctx, deps.Pool, recipientID)
49
+	if err != nil {
50
+		return fmt.Errorf("load recipient: %w", err)
51
+	}
52
+	if !user.PrimaryEmailID.Valid {
53
+		// No verified primary; nothing to send to.
54
+		return nil
55
+	}
56
+	em, err := uq.GetUserEmailByID(ctx, deps.Pool, user.PrimaryEmailID.Int64)
57
+	if err != nil || !em.Verified {
58
+		return nil
59
+	}
60
+
61
+	threadURL := buildThreadURL(deps.BaseURL, ctx, deps.Pool, notif)
62
+	subject := buildSubject(deps.SiteName, notif, reason)
63
+	plain := buildPlainBody(deps.SiteName, notif, reason, threadURL)
64
+	htmlBody := buildHTMLBody(deps.SiteName, notif, reason, threadURL)
65
+
66
+	// The List-Unsubscribe URL is computed but not yet wired into
67
+	// the Sender — the existing email.Message shape (S05) doesn't
68
+	// carry arbitrary headers. Adding a `Headers map[string]string`
69
+	// is a follow-up that ripples through the SMTP / Postmark
70
+	// backends; tracked in the S29 status block as deferred. The
71
+	// URL is still useful as a body-footer fallback.
72
+	unsubURL := unsubscribeURL(deps.BaseURL, deps.UnsubscribeKey, recipientID,
73
+		threadKind.Valid, threadKindString(threadKind), threadID)
74
+	plain += "\nUnsubscribe: " + unsubURL + "\n"
75
+
76
+	if err := deps.EmailSender.Send(ctx, email.Message{
77
+		From:    deps.EmailFrom,
78
+		To:      string(em.Email),
79
+		Subject: subject,
80
+		HTML:    htmlBody,
81
+		Text:    plain,
82
+	}); err != nil {
83
+		return fmt.Errorf("send: %w", err)
84
+	}
85
+
86
+	// Log the send so the dampener can count it next time.
87
+	if err := notifdb.New().InsertEmailLog(ctx, deps.Pool, notifdb.InsertEmailLogParams{
88
+		RecipientUserID: recipientID,
89
+		NotificationID:  pgInt(notif.ID),
90
+		ThreadKind:      threadKind,
91
+		ThreadID:        pgInt(threadID),
92
+		MessageID:       pgString(""),
93
+	}); err != nil {
94
+		// Non-fatal — the email already went out. Log loudly so
95
+		// the operator notices a logging-side regression.
96
+		if deps.Logger != nil {
97
+			deps.Logger.WarnContext(ctx, "email log insert failed",
98
+				"recipient", recipientID, "error", err)
99
+		}
100
+	}
101
+	return nil
102
+}
103
+
104
+// emailPrefOn reads the user's per-key boolean preference. Defaults
105
+// to true when no row exists (matches the spec's "Email all
106
+// participating threads master toggle, default on").
107
+func emailPrefOn(ctx context.Context, pool *pgxpool.Pool, userID int64, key string) (bool, error) {
108
+	var raw json.RawMessage
109
+	err := pool.QueryRow(ctx,
110
+		`SELECT value FROM user_notification_prefs WHERE user_id = $1 AND key = $2`,
111
+		userID, key,
112
+	).Scan(&raw)
113
+	if err != nil {
114
+		if errors.Is(err, pgx.ErrNoRows) {
115
+			return true, nil
116
+		}
117
+		return true, err
118
+	}
119
+	// Stored as JSON `true` or `false`.
120
+	s := strings.TrimSpace(string(raw))
121
+	return s == "true", nil
122
+}
123
+
124
+// unsubscribeURL builds an HMAC-signed one-click unsubscribe link.
125
+// The URL embeds (recipient_id, thread_kind, thread_id, sig) so
126
+// the handler can verify and act without a session cookie. Key
127
+// rotation invalidates outstanding links — operators are expected
128
+// to keep the key stable.
129
+func unsubscribeURL(baseURL string, key []byte, recipientID int64, hasThread bool, threadKind string, threadID int64) string {
130
+	if !hasThread {
131
+		// No thread → can't unsubscribe at the per-thread level.
132
+		// The footer's link should point at the user's email-prefs
133
+		// page instead. Return that path; the email-template
134
+		// renderer can swap it in.
135
+		return strings.TrimRight(baseURL, "/") + "/settings/notifications"
136
+	}
137
+	rec := strconv.FormatInt(recipientID, 10)
138
+	tid := strconv.FormatInt(threadID, 10)
139
+	payload := rec + ":" + threadKind + ":" + tid
140
+	mac := hmac.New(sha256.New, key)
141
+	mac.Write([]byte(payload))
142
+	sig := base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
143
+	return strings.TrimRight(baseURL, "/") + "/notifications/unsubscribe?u=" + rec +
144
+		"&tk=" + threadKind + "&ti=" + tid + "&sig=" + sig
145
+}
146
+
147
+// VerifyUnsubscribe checks the HMAC. Returns true when the
148
+// signature matches the supplied (recipient, thread) tuple.
149
+func VerifyUnsubscribe(key []byte, recipientID int64, threadKind string, threadID int64, sig string) bool {
150
+	rec := strconv.FormatInt(recipientID, 10)
151
+	tid := strconv.FormatInt(threadID, 10)
152
+	payload := rec + ":" + threadKind + ":" + tid
153
+	mac := hmac.New(sha256.New, key)
154
+	mac.Write([]byte(payload))
155
+	want := base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
156
+	return hmac.Equal([]byte(sig), []byte(want))
157
+}
158
+
159
+func threadKindString(k notifdb.NullNotificationThreadKind) string {
160
+	if !k.Valid {
161
+		return ""
162
+	}
163
+	return string(k.NotificationThreadKind)
164
+}
165
+
166
+func buildSubject(site string, notif notifdb.Notification, reason Reason) string {
167
+	prefix := site
168
+	if prefix == "" {
169
+		prefix = "shithub"
170
+	}
171
+	return "[" + prefix + "] " + string(reason) + ": " + notif.Kind
172
+}
173
+
174
+func buildPlainBody(site string, notif notifdb.Notification, reason Reason, link string) string {
175
+	if site == "" {
176
+		site = "shithub"
177
+	}
178
+	var b strings.Builder
179
+	b.WriteString("You're being notified on " + site + ".\n\n")
180
+	b.WriteString("Reason: " + string(reason) + "\n")
181
+	b.WriteString("Event: " + notif.Kind + "\n")
182
+	if link != "" {
183
+		b.WriteString("\n" + link + "\n")
184
+	}
185
+	b.WriteString("\n— " + site + "\n")
186
+	return b.String()
187
+}
188
+
189
+// buildHTMLBody is the deliberately-minimal HTML mirror of the plain
190
+// body. Per-kind templates land in a follow-up; today the layout is
191
+// site / reason / event / link, all escaped. No CSS, no images — most
192
+// MUAs render this fine and the simpler the markup, the lower the
193
+// spam-score risk.
194
+func buildHTMLBody(site string, notif notifdb.Notification, reason Reason, link string) string {
195
+	if site == "" {
196
+		site = "shithub"
197
+	}
198
+	var b strings.Builder
199
+	b.WriteString("<p>You're being notified on ")
200
+	b.WriteString(html.EscapeString(site))
201
+	b.WriteString(".</p>\n")
202
+	b.WriteString("<p><strong>Reason:</strong> ")
203
+	b.WriteString(html.EscapeString(string(reason)))
204
+	b.WriteString("<br><strong>Event:</strong> ")
205
+	b.WriteString(html.EscapeString(notif.Kind))
206
+	b.WriteString("</p>\n")
207
+	if link != "" {
208
+		b.WriteString(`<p><a href="`)
209
+		b.WriteString(html.EscapeString(link))
210
+		b.WriteString(`">`)
211
+		b.WriteString(html.EscapeString(link))
212
+		b.WriteString("</a></p>\n")
213
+	}
214
+	b.WriteString("<p>— ")
215
+	b.WriteString(html.EscapeString(site))
216
+	b.WriteString("</p>\n")
217
+	return b.String()
218
+}
219
+
220
+// buildThreadURL composes a canonical URL for the thread the
221
+// notification points at. Best-effort: when we can't resolve the
222
+// repo / number we fall back to the notification's inbox row URL.
223
+func buildThreadURL(baseURL string, ctx context.Context, pool *pgxpool.Pool, notif notifdb.Notification) string {
224
+	if !notif.RepoID.Valid || !notif.ThreadID.Valid || !notif.ThreadKind.Valid {
225
+		return strings.TrimRight(baseURL, "/") + "/notifications"
226
+	}
227
+	row, err := notifdb.New().GetNotification(ctx, pool, notif.ID)
228
+	_ = row
229
+	if err != nil {
230
+		return strings.TrimRight(baseURL, "/") + "/notifications"
231
+	}
232
+	// Resolve owner + repo + number with a small ad-hoc query.
233
+	var ownerName, repoName string
234
+	var number int64
235
+	err = pool.QueryRow(ctx, `
236
+		SELECT u.username, r.name, i.number
237
+		FROM repos r
238
+		JOIN users u ON u.id = r.owner_user_id
239
+		LEFT JOIN issues i ON i.id = $1
240
+		WHERE r.id = $2
241
+	`, notif.ThreadID.Int64, notif.RepoID.Int64).Scan(&ownerName, &repoName, &number)
242
+	if err != nil || number == 0 {
243
+		return strings.TrimRight(baseURL, "/") + "/notifications"
244
+	}
245
+	segment := "issues"
246
+	if notif.ThreadKind.NotificationThreadKind == notifdb.NotificationThreadKindPr {
247
+		segment = "pulls"
248
+	}
249
+	return strings.TrimRight(baseURL, "/") + "/" + ownerName + "/" + repoName + "/" + segment + "/" + strconv.FormatInt(number, 10)
250
+}
251
+
252
+func pgInt(v int64) pgtype.Int8 { return pgtype.Int8{Int64: v, Valid: v != 0} }
253
+func pgString(s string) pgtype.Text {
254
+	return pgtype.Text{String: s, Valid: s != ""}
255
+}
internal/notif/fanout.goadded
@@ -0,0 +1,515 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package notif
4
+
5
+import (
6
+	"context"
7
+	"encoding/json"
8
+	"errors"
9
+	"fmt"
10
+
11
+	"github.com/jackc/pgx/v5"
12
+	"github.com/jackc/pgx/v5/pgtype"
13
+
14
+	"github.com/tenseleyFlow/shithub/internal/auth/policy"
15
+	issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc"
16
+	notifdb "github.com/tenseleyFlow/shithub/internal/notif/sqlc"
17
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
18
+	socialdb "github.com/tenseleyFlow/shithub/internal/social/sqlc"
19
+)
20
+
21
+// FanoutConsumer is the consumer name used in
22
+// `domain_events_processed`. Other consumers (S33 webhooks) will
23
+// use distinct names so cursors don't collide.
24
+const FanoutConsumer = "notify_fanout"
25
+
26
+// FanoutOnce drains up to FanoutBatch domain_events starting after
27
+// the persisted cursor, computes recipient sets, and writes inbox
28
+// rows + (optionally) emails. Returns the number of events
29
+// processed so the caller can decide to loop.
30
+//
31
+// Designed to be invoked from a worker job (see
32
+// `internal/worker/jobs/notify_fanout.go`); the cron framework's
33
+// landing date will determine how it's scheduled (the job kind is
34
+// already in the worker registry).
35
+func FanoutOnce(ctx context.Context, deps Deps) (int, error) {
36
+	q := notifdb.New()
37
+	cur, err := q.GetEventCursor(ctx, deps.Pool, FanoutConsumer)
38
+	if err != nil && !errors.Is(err, pgx.ErrNoRows) {
39
+		return 0, fmt.Errorf("fanout: load cursor: %w", err)
40
+	}
41
+	last := int64(0)
42
+	if err == nil {
43
+		last = cur.LastEventID
44
+	}
45
+
46
+	rows, err := q.ListUnprocessedDomainEvents(ctx, deps.Pool, notifdb.ListUnprocessedDomainEventsParams{
47
+		ID:    last,
48
+		Limit: int32(FanoutBatch),
49
+	})
50
+	if err != nil {
51
+		return 0, fmt.Errorf("fanout: list events: %w", err)
52
+	}
53
+
54
+	processed := 0
55
+	for _, ev := range rows {
56
+		if err := dispatchEvent(ctx, deps, ev); err != nil {
57
+			if deps.Logger != nil {
58
+				deps.Logger.WarnContext(ctx, "fanout: dispatch failed",
59
+					"event_id", ev.ID, "kind", ev.Kind, "error", err)
60
+			}
61
+			// Don't advance the cursor past a failure — retry on
62
+			// the next tick. (A poison row would loop forever; the
63
+			// per-event error handling below catches the typical
64
+			// poison shapes and skips them.)
65
+			break
66
+		}
67
+		last = ev.ID
68
+		processed++
69
+	}
70
+
71
+	if processed > 0 {
72
+		if err := q.SetEventCursor(ctx, deps.Pool, notifdb.SetEventCursorParams{
73
+			Consumer: FanoutConsumer, LastEventID: last,
74
+		}); err != nil {
75
+			return processed, fmt.Errorf("fanout: persist cursor: %w", err)
76
+		}
77
+	}
78
+	return processed, nil
79
+}
80
+
81
+// dispatchEvent routes a single domain_events row through the
82
+// recipient computer + per-recipient action engine.
83
+func dispatchEvent(ctx context.Context, deps Deps, ev notifdb.DomainEvent) error {
84
+	actorUserID := int64(0)
85
+	if ev.ActorUserID.Valid {
86
+		actorUserID = ev.ActorUserID.Int64
87
+	}
88
+	repoID := int64(0)
89
+	if ev.RepoID.Valid {
90
+		repoID = ev.RepoID.Int64
91
+	}
92
+
93
+	threadKind, threadID, err := threadFromEvent(ctx, deps, ev)
94
+	if err != nil {
95
+		return err
96
+	}
97
+
98
+	// Compute recipient set. Each recipient is paired with the
99
+	// relation that earned them the slot; the routing matrix
100
+	// decides what to do per (kind, relation).
101
+	recipients, err := computeRecipients(ctx, deps, ev, threadKind, threadID, actorUserID, repoID)
102
+	if err != nil {
103
+		return err
104
+	}
105
+
106
+	// Auto-subscription: insert an "if-absent" thread row for
107
+	// users whose participation rule earned them this notification
108
+	// (author / assignee / reviewer / mentioned / commenter). Their
109
+	// future notifications then route via the subscribed-thread
110
+	// relation without re-deriving the rule.
111
+	if threadID != 0 {
112
+		applyAutoSubscriptions(ctx, deps, ev, threadKind, threadID, recipients)
113
+	}
114
+
115
+	q := notifdb.New()
116
+	for recipID, rec := range recipients {
117
+		// Self-suppression: never notify the actor about their own
118
+		// action. Defense in depth — the recipient computer also
119
+		// excludes them, but the duplicate guard is cheap.
120
+		if recipID == actorUserID {
121
+			continue
122
+		}
123
+		// Visibility re-check at fan-out time. The repo could have
124
+		// flipped public→private between event-emit and fan-out;
125
+		// a stale-read public event must not leak.
126
+		if repoID != 0 {
127
+			ok, err := canRecipientSeeRepo(ctx, deps, recipID, repoID)
128
+			if err != nil {
129
+				return err
130
+			}
131
+			if !ok {
132
+				continue
133
+			}
134
+		}
135
+		action := pickAction(ev.Kind, rec.Relations)
136
+		if !action.NotifyInbox {
137
+			continue
138
+		}
139
+		// `OverrideIgnore=false` AND user has watches.level=ignore
140
+		// on the repo → skip. The OverrideIgnore=true case (i.e.
141
+		// @mentions) bypasses the ignore check.
142
+		if !action.OverrideIgnore && repoID != 0 {
143
+			ignored, err := repoIgnoredByRecipient(ctx, deps, recipID, repoID)
144
+			if err != nil {
145
+				return err
146
+			}
147
+			if ignored {
148
+				continue
149
+			}
150
+		}
151
+		// Materialize the inbox row.
152
+		notif, err := upsertInboxRow(ctx, deps, q, recipID, ev, action.Reason, threadKind, threadID, repoID, actorUserID)
153
+		if err != nil {
154
+			return err
155
+		}
156
+		// Email side. Pref + storm dampener gate.
157
+		if action.EmailDefault && deps.EmailSender != nil {
158
+			if err := maybeSendEmail(ctx, deps, q, recipID, notif, action.Reason, threadKind, threadID); err != nil && deps.Logger != nil {
159
+				deps.Logger.WarnContext(ctx, "fanout: email send",
160
+					"recipient", recipID, "error", err)
161
+			}
162
+		}
163
+	}
164
+	return nil
165
+}
166
+
167
+// canRecipientSeeRepo runs the visibility predicate by hand for
168
+// one recipient. Cheap — repo + collab-row lookup, always indexed.
169
+func canRecipientSeeRepo(ctx context.Context, deps Deps, recipientID, repoID int64) (bool, error) {
170
+	rq := reposdb.New()
171
+	repo, err := rq.GetRepoByID(ctx, deps.Pool, repoID)
172
+	if err != nil {
173
+		if errors.Is(err, pgx.ErrNoRows) {
174
+			return false, nil
175
+		}
176
+		return false, err
177
+	}
178
+	actor := policy.UserActor(recipientID, "", false, false)
179
+	return policy.IsVisibleTo(ctx, policy.Deps{Pool: deps.Pool}, actor, policy.NewRepoRefFromRepo(repo)), nil
180
+}
181
+
182
+// repoIgnoredByRecipient reports whether the recipient has set
183
+// `watches.level = 'ignore'` for the repo.
184
+func repoIgnoredByRecipient(ctx context.Context, deps Deps, recipientID, repoID int64) (bool, error) {
185
+	row, err := socialdb.New().GetWatch(ctx, deps.Pool, socialdb.GetWatchParams{
186
+		UserID: recipientID, RepoID: repoID,
187
+	})
188
+	if err != nil {
189
+		if errors.Is(err, pgx.ErrNoRows) {
190
+			return false, nil
191
+		}
192
+		return false, err
193
+	}
194
+	return row.Level == socialdb.WatchLevelIgnore, nil
195
+}
196
+
197
+// upsertInboxRow either creates a fresh notifications row for a
198
+// (recipient, thread) pair or coalesces onto the existing row by
199
+// bumping last_event_at + last_actor + setting unread=true.
200
+func upsertInboxRow(
201
+	ctx context.Context, deps Deps, q *notifdb.Queries,
202
+	recipientID int64,
203
+	ev notifdb.DomainEvent,
204
+	reason Reason,
205
+	threadKind notifdb.NullNotificationThreadKind,
206
+	threadID int64,
207
+	repoID int64,
208
+	actorUserID int64,
209
+) (notifdb.Notification, error) {
210
+	if threadID == 0 {
211
+		// Thread-less notification (e.g. lifecycle). Each event
212
+		// fires its own row; no coalesce.
213
+		return q.InsertThreadlessNotification(ctx, deps.Pool, notifdb.InsertThreadlessNotificationParams{
214
+			RecipientUserID: recipientID,
215
+			Kind:            ev.Kind,
216
+			Reason:          string(reason),
217
+			RepoID:          intToPg(repoID),
218
+			SourceEventID:   pgtype.Int8{Int64: ev.ID, Valid: true},
219
+			LastActorUserID: intToPg(actorUserID),
220
+		})
221
+	}
222
+	return q.UpsertNotificationByThread(ctx, deps.Pool, notifdb.UpsertNotificationByThreadParams{
223
+		RecipientUserID: recipientID,
224
+		Kind:            ev.Kind,
225
+		Reason:          string(reason),
226
+		RepoID:          intToPg(repoID),
227
+		ThreadKind:      threadKind,
228
+		ThreadID:        pgtype.Int8{Int64: threadID, Valid: true},
229
+		SourceEventID:   pgtype.Int8{Int64: ev.ID, Valid: true},
230
+		LastActorUserID: intToPg(actorUserID),
231
+	})
232
+}
233
+
234
+// pickAction iterates the recipient's relations in priority order
235
+// and returns the first non-Skip Action. The `RelMention` slot is
236
+// most permissive (override-ignore) so it's first.
237
+func pickAction(kind string, rels []Relation) Action {
238
+	for _, rel := range rels {
239
+		if a := Routing(kind, rel); a.NotifyInbox {
240
+			return a
241
+		}
242
+	}
243
+	return Skip
244
+}
245
+
246
+// threadFromEvent extracts the (thread_kind, thread_id) pair when
247
+// the event is thread-shaped (issue or PR). For non-thread events
248
+// (repo-admin lifecycle) returns ({Valid:false}, 0).
249
+func threadFromEvent(ctx context.Context, deps Deps, ev notifdb.DomainEvent) (notifdb.NullNotificationThreadKind, int64, error) {
250
+	switch ev.SourceKind {
251
+	case "issue":
252
+		// payload may carry kind=issue|pr; we resolve via the
253
+		// issues table to be sure.
254
+		issue, err := issuesdb.New().GetIssueByID(ctx, deps.Pool, ev.SourceID)
255
+		if err != nil {
256
+			if errors.Is(err, pgx.ErrNoRows) {
257
+				return notifdb.NullNotificationThreadKind{}, 0, nil
258
+			}
259
+			return notifdb.NullNotificationThreadKind{}, 0, err
260
+		}
261
+		kind := notifdb.NotificationThreadKindIssue
262
+		if issue.Kind == issuesdb.IssueKindPr {
263
+			kind = notifdb.NotificationThreadKindPr
264
+		}
265
+		return notifdb.NullNotificationThreadKind{NotificationThreadKind: kind, Valid: true}, issue.ID, nil
266
+	}
267
+	return notifdb.NullNotificationThreadKind{}, 0, nil
268
+}
269
+
270
+// recipient is one row in the per-event recipient set. Multiple
271
+// relations may apply (e.g. user is both author + assignee); the
272
+// pickAction step picks the highest-priority Action.
273
+type recipient struct {
274
+	Relations []Relation
275
+}
276
+
277
+func (r *recipient) add(rel Relation) {
278
+	for _, existing := range r.Relations {
279
+		if existing == rel {
280
+			return
281
+		}
282
+	}
283
+	r.Relations = append(r.Relations, rel)
284
+}
285
+
286
+// computeRecipients gathers users from every relevant slot:
287
+// * mention list from the event payload (when applicable),
288
+// * thread author / assignee / reviewer (for thread-shaped events),
289
+// * watches at level=all/participating on the repo,
290
+// * explicit thread subscribers,
291
+// * repo owner (for lifecycle events).
292
+//
293
+// The actor is excluded; visibility re-check happens later.
294
+func computeRecipients(
295
+	ctx context.Context, deps Deps,
296
+	ev notifdb.DomainEvent,
297
+	threadKind notifdb.NullNotificationThreadKind,
298
+	threadID int64,
299
+	actorUserID, repoID int64,
300
+) (map[int64]*recipient, error) {
301
+	out := map[int64]*recipient{}
302
+
303
+	// Mentions live in the payload as a `mentions` array of user ids.
304
+	if ev.Payload != nil {
305
+		var p struct {
306
+			Mentions []int64 `json:"mentions"`
307
+		}
308
+		_ = json.Unmarshal(ev.Payload, &p)
309
+		for _, uid := range p.Mentions {
310
+			ensure(out, uid).add(RelMention)
311
+		}
312
+	}
313
+
314
+	// Thread relations (author, assignee, commenter via auto-sub).
315
+	if threadID != 0 {
316
+		issue, err := issuesdb.New().GetIssueByID(ctx, deps.Pool, threadID)
317
+		if err == nil {
318
+			if issue.AuthorUserID.Valid {
319
+				ensure(out, issue.AuthorUserID.Int64).add(RelAuthor)
320
+			}
321
+			// Assignees from issue_assignees.
322
+			assignees, err := issuesdb.New().ListIssueAssignees(ctx, deps.Pool, threadID)
323
+			if err == nil {
324
+				for _, a := range assignees {
325
+					ensure(out, a.UserID).add(RelAssignee)
326
+				}
327
+			}
328
+		}
329
+		// Explicit thread subscribers.
330
+		subs, err := notifdb.New().ListSubscribersForThread(ctx, deps.Pool, notifdb.ListSubscribersForThreadParams{
331
+			ThreadKind: threadKind.NotificationThreadKind,
332
+			ThreadID:   threadID,
333
+		})
334
+		if err == nil {
335
+			for _, s := range subs {
336
+				ensure(out, s.RecipientUserID).add(RelSubscribedThread)
337
+			}
338
+		}
339
+	}
340
+
341
+	// Repo watchers at level=all (relevant for "new issue" /
342
+	// "new PR" events). The fan-out matrix decides which kinds
343
+	// actually consume this slot — so populating it for every
344
+	// thread-shaped event is fine (skipped at routing time).
345
+	if repoID != 0 {
346
+		watchers, err := socialdb.New().ListRepoWatchersByLevel(ctx, deps.Pool, socialdb.ListRepoWatchersByLevelParams{
347
+			RepoID: repoID, Level: socialdb.WatchLevelAll,
348
+		})
349
+		if err == nil {
350
+			for _, uid := range watchers {
351
+				ensure(out, uid).add(RelWatchingAll)
352
+			}
353
+		}
354
+	}
355
+
356
+	// Repo-owner slot for lifecycle events.
357
+	if repoID != 0 && isLifecycleEvent(ev.Kind) {
358
+		repo, err := reposdb.New().GetRepoByID(ctx, deps.Pool, repoID)
359
+		if err == nil && repo.OwnerUserID.Valid {
360
+			ensure(out, repo.OwnerUserID.Int64).add(RelRepoOwner)
361
+		}
362
+	}
363
+
364
+	// Reviewer slot for review_requested events. Payload carries
365
+	// the recipient id.
366
+	if ev.Kind == "review_requested" && ev.Payload != nil {
367
+		var p struct {
368
+			ReviewerUserID int64 `json:"reviewer_user_id"`
369
+		}
370
+		_ = json.Unmarshal(ev.Payload, &p)
371
+		if p.ReviewerUserID != 0 {
372
+			ensure(out, p.ReviewerUserID).add(RelReviewer)
373
+		}
374
+	}
375
+
376
+	// Self-suppression: drop the actor. We still defense-in-depth
377
+	// it at the dispatch site, but trimming early saves per-
378
+	// recipient policy lookups.
379
+	if actorUserID != 0 {
380
+		delete(out, actorUserID)
381
+	}
382
+	return out, nil
383
+}
384
+
385
+// applyAutoSubscriptions inserts notification_threads rows for the
386
+// recipients that just earned a slot via author / assignee /
387
+// reviewer / mention / first-commenter rules. Non-destructive —
388
+// preserves explicit user choices.
389
+func applyAutoSubscriptions(
390
+	ctx context.Context, deps Deps,
391
+	ev notifdb.DomainEvent,
392
+	threadKind notifdb.NullNotificationThreadKind,
393
+	threadID int64,
394
+	recipients map[int64]*recipient,
395
+) {
396
+	q := notifdb.New()
397
+	for uid, rec := range recipients {
398
+		var reason string
399
+		var subscribed bool
400
+		// Pick the strongest auto-sub reason that applies.
401
+		for _, rel := range rec.Relations {
402
+			switch rel {
403
+			case RelAuthor:
404
+				reason, subscribed = "author", true
405
+			case RelAssignee:
406
+				reason, subscribed = "assigned", true
407
+			case RelReviewer:
408
+				reason, subscribed = "review_requested", true
409
+			case RelMention:
410
+				if reason == "" {
411
+					reason, subscribed = "mention", true
412
+				}
413
+			case RelCommenter:
414
+				if reason == "" {
415
+					reason, subscribed = "commenter", true
416
+				}
417
+			}
418
+		}
419
+		if reason == "" {
420
+			continue
421
+		}
422
+		_ = q.InsertNotificationThreadIfAbsent(ctx, deps.Pool, notifdb.InsertNotificationThreadIfAbsentParams{
423
+			RecipientUserID: uid,
424
+			ThreadKind:      threadKind.NotificationThreadKind,
425
+			ThreadID:        threadID,
426
+			Subscribed:      subscribed,
427
+			Reason:          reason,
428
+		})
429
+	}
430
+}
431
+
432
+// maybeSendEmail consults the user's per-kind email pref + the
433
+// storm dampener, then either dispatches the email or skips. Logs
434
+// every attempt to notification_email_log so the dampener is
435
+// self-consistent across worker restarts.
436
+func maybeSendEmail(
437
+	ctx context.Context, deps Deps, q *notifdb.Queries,
438
+	recipientID int64,
439
+	notif notifdb.Notification,
440
+	reason Reason,
441
+	threadKind notifdb.NullNotificationThreadKind,
442
+	threadID int64,
443
+) error {
444
+	prefKey := EmailPrefKey(reason)
445
+	if prefKey != "" {
446
+		on, err := emailPrefOn(ctx, deps.Pool, recipientID, prefKey)
447
+		if err != nil {
448
+			return err
449
+		}
450
+		if !on {
451
+			return nil
452
+		}
453
+	}
454
+
455
+	// Per-thread dampener.
456
+	if threadID != 0 {
457
+		count, err := q.CountEmailsForRecipientThreadSince(ctx, deps.Pool, notifdb.CountEmailsForRecipientThreadSinceParams{
458
+			RecipientUserID: recipientID,
459
+			ThreadKind:      threadKind,
460
+			ThreadID:        pgtype.Int8{Int64: threadID, Valid: true},
461
+			Column4:         StormPerThreadMins,
462
+		})
463
+		if err != nil {
464
+			return err
465
+		}
466
+		if count >= StormPerThreadCap {
467
+			return nil
468
+		}
469
+	}
470
+	// Per-recipient absolute cap.
471
+	count, err := q.CountEmailsForRecipientSince(ctx, deps.Pool, notifdb.CountEmailsForRecipientSinceParams{
472
+		RecipientUserID: recipientID,
473
+		Column2:         StormAbsoluteMins,
474
+	})
475
+	if err != nil {
476
+		return err
477
+	}
478
+	if count >= StormAbsoluteCap {
479
+		return nil
480
+	}
481
+
482
+	// We don't actually render + send here — that's a separate
483
+	// helper (`sendNotificationEmail`). Logging the send here is
484
+	// a layering violation; the helper logs after the SMTP call
485
+	// succeeds. The helper isn't ready in this commit; the gate
486
+	// stays so the storm dampener is wired even if no template
487
+	// renders today.
488
+	return sendNotificationEmail(ctx, deps, recipientID, notif, reason, threadKind, threadID)
489
+}
490
+
491
+// ensure returns the *recipient for uid, creating it on first add.
492
+func ensure(m map[int64]*recipient, uid int64) *recipient {
493
+	if r, ok := m[uid]; ok {
494
+		return r
495
+	}
496
+	r := &recipient{}
497
+	m[uid] = r
498
+	return r
499
+}
500
+
501
+func intToPg(v int64) pgtype.Int8 {
502
+	return pgtype.Int8{Int64: v, Valid: v != 0}
503
+}
504
+
505
+func isLifecycleEvent(kind string) bool {
506
+	switch kind {
507
+	case "repo_archived", "repo_unarchived",
508
+		"repo_visibility_changed", "repo_soft_deleted",
509
+		"repo_restored", "repo_transfer_requested",
510
+		"repo_transfer_accepted", "repo_transfer_declined",
511
+		"repo_transfer_canceled", "repo_transfer_expired":
512
+		return true
513
+	}
514
+	return false
515
+}
internal/notif/notif.goadded
@@ -0,0 +1,77 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package notif owns S29's notification fan-out: consume from
4
+// `domain_events`, compute the recipient set per the routing
5
+// matrix, materialize inbox rows + email-side dispatch.
6
+//
7
+// The recipient computation is the only interesting code here. The
8
+// routing matrix is data (a Go map keyed on event-kind +
9
+// recipient-relation), the auto-subscription rules are a tiny set
10
+// of "if absent, insert", and the storm dampener is a per-recipient,
11
+// per-thread, time-window count.
12
+//
13
+// Visibility re-check at fan-out is the security boundary: a repo
14
+// can flip from public to private between event-emit and fan-out;
15
+// a stale-read public event must not produce a notification for a
16
+// recipient who can no longer see the underlying resource.
17
+package notif
18
+
19
+import (
20
+	"errors"
21
+	"log/slog"
22
+
23
+	"github.com/jackc/pgx/v5/pgxpool"
24
+
25
+	"github.com/tenseleyFlow/shithub/internal/auth/email"
26
+)
27
+
28
+// Deps wires the package against the rest of the runtime.
29
+//
30
+// EmailSender is optional: when nil, notifications still land in
31
+// the inbox but no email goes out. Useful in tests + during local
32
+// dev when the operator hasn't configured SMTP. EmailFrom is the
33
+// default From address; per-template overrides are post-MVP.
34
+//
35
+// SiteName is the human-readable site name used in subject lines /
36
+// list-unsubscribe footers ("shithub" by default; operators can
37
+// re-brand for self-hosted instances).
38
+//
39
+// BaseURL is the public-facing scheme+host (e.g.
40
+// "https://shithub.example") used to build canonical links in
41
+// email bodies + the unsubscribe URL.
42
+//
43
+// UnsubscribeKey is the HMAC-SHA256 key for the one-click
44
+// list-unsubscribe URL. Rotating the key invalidates outstanding
45
+// unsubscribe links — operators are expected to keep it stable.
46
+type Deps struct {
47
+	Pool           *pgxpool.Pool
48
+	Logger         *slog.Logger
49
+	EmailSender    email.Sender
50
+	EmailFrom      string
51
+	SiteName       string
52
+	BaseURL        string
53
+	UnsubscribeKey []byte
54
+}
55
+
56
+// Errors surfaced by the orchestrator.
57
+var (
58
+	ErrNotFound        = errors.New("notif: notification not found")
59
+	ErrUnauthorized    = errors.New("notif: not your notification")
60
+	ErrUnsubscribeBad  = errors.New("notif: invalid unsubscribe token")
61
+)
62
+
63
+// FanoutBatch is the per-tick cap on how many domain_events the
64
+// fan-out worker drains in one call. Bounded so a single tick can't
65
+// monopolize the worker pool when a backlog accumulates (e.g. after
66
+// a deploy that takes the worker offline for a moment).
67
+const FanoutBatch = 200
68
+
69
+// StormDampener defaults — one email per recipient per thread per
70
+// 10 minutes (the spec's day-1 lean). Per-recipient absolute cap
71
+// is 100 emails per hour.
72
+const (
73
+	StormPerThreadCap   = 1
74
+	StormPerThreadMins  = 10
75
+	StormAbsoluteCap    = 100
76
+	StormAbsoluteMins   = 60
77
+)
internal/notif/queries/cursor.sqladded
@@ -0,0 +1,27 @@
1
+-- ─── domain_events_processed ──────────────────────────────────────
2
+
3
+-- name: GetEventCursor :one
4
+SELECT consumer, last_event_id, updated_at
5
+FROM domain_events_processed
6
+WHERE consumer = $1;
7
+
8
+-- name: SetEventCursor :exec
9
+-- Always-write upsert so the worker doesn't have to special-case
10
+-- the missing-row branch (the migration seeds 'notify_fanout' at
11
+-- 0; future consumers like 'webhook_deliver' do the same on first
12
+-- run via this same call).
13
+INSERT INTO domain_events_processed (consumer, last_event_id)
14
+VALUES ($1, $2)
15
+ON CONFLICT (consumer)
16
+DO UPDATE SET last_event_id = EXCLUDED.last_event_id,
17
+              updated_at    = now();
18
+
19
+-- name: ListUnprocessedDomainEvents :many
20
+-- The fan-out worker's read cursor. Bounded so a single tick
21
+-- doesn't try to drain a million-row backlog.
22
+SELECT id, actor_user_id, kind, repo_id, source_kind, source_id,
23
+       public, payload, created_at
24
+FROM domain_events
25
+WHERE id > $1
26
+ORDER BY id
27
+LIMIT $2;
internal/notif/queries/email_log.sqladded
@@ -0,0 +1,27 @@
1
+-- ─── notification_email_log ────────────────────────────────────────
2
+
3
+-- name: InsertEmailLog :exec
4
+-- Records an email send. Caller decides what to bind for thread_id
5
+-- (NULL for thread-less notifications). MessageID is the SMTP /
6
+-- transactional-provider message id when available; empty when
7
+-- the sender doesn't surface one.
8
+INSERT INTO notification_email_log
9
+    (recipient_user_id, notification_id, thread_kind, thread_id, message_id)
10
+VALUES ($1, $2, $3, $4, $5);
11
+
12
+-- name: CountEmailsForRecipientThreadSince :one
13
+-- Storm dampener probe: how many emails for this thread did we
14
+-- send to this recipient in the last $4 minutes? Caller compares
15
+-- to the cap.
16
+SELECT count(*) FROM notification_email_log
17
+WHERE recipient_user_id = $1
18
+  AND thread_kind = $2
19
+  AND thread_id = $3
20
+  AND sent_at > now() - make_interval(mins => $4::int);
21
+
22
+-- name: CountEmailsForRecipientSince :one
23
+-- Per-recipient absolute rate cap: how many total emails to this
24
+-- recipient in the last $2 minutes?
25
+SELECT count(*) FROM notification_email_log
26
+WHERE recipient_user_id = $1
27
+  AND sent_at > now() - make_interval(mins => $2::int);
internal/notif/queries/notifications.sqladded
@@ -0,0 +1,90 @@
1
+-- ─── notifications ─────────────────────────────────────────────────
2
+
3
+-- name: UpsertNotificationByThread :one
4
+-- Coalesce-or-insert: if a row exists for (recipient, thread), bump
5
+-- last_event_at + last_actor + reason and re-flip unread=true so the
6
+-- inbox surfaces it again. Otherwise insert a fresh row.
7
+--
8
+-- Returns the resulting row (whether it was created or updated)
9
+-- so the caller can chain an email-enqueue without a re-read.
10
+INSERT INTO notifications (
11
+    recipient_user_id, kind, reason, repo_id,
12
+    thread_kind, thread_id, source_event_id,
13
+    last_event_at, last_actor_user_id
14
+) VALUES (
15
+    $1, $2, $3, $4, $5, $6, $7, now(), $8
16
+)
17
+ON CONFLICT (recipient_user_id, thread_kind, thread_id) WHERE thread_id IS NOT NULL
18
+DO UPDATE SET
19
+    kind               = EXCLUDED.kind,
20
+    reason             = EXCLUDED.reason,
21
+    source_event_id    = EXCLUDED.source_event_id,
22
+    last_event_at      = now(),
23
+    last_actor_user_id = EXCLUDED.last_actor_user_id,
24
+    unread             = true,
25
+    updated_at         = now()
26
+RETURNING id, recipient_user_id, kind, reason, repo_id,
27
+          thread_kind, thread_id, source_event_id, unread,
28
+          last_event_at, last_actor_user_id, summary, created_at, updated_at;
29
+
30
+-- name: InsertThreadlessNotification :one
31
+-- For events with no thread (e.g. repo-admin lifecycle: archived).
32
+-- These don't coalesce; each fires its own row. Used sparingly.
33
+INSERT INTO notifications (
34
+    recipient_user_id, kind, reason, repo_id,
35
+    source_event_id, last_actor_user_id
36
+) VALUES ($1, $2, $3, $4, $5, $6)
37
+RETURNING id, recipient_user_id, kind, reason, repo_id,
38
+          thread_kind, thread_id, source_event_id, unread,
39
+          last_event_at, last_actor_user_id, summary, created_at, updated_at;
40
+
41
+-- name: ListNotificationsForRecipient :many
42
+-- Inbox view, recency-sorted. `onlyUnread` toggles the inbox
43
+-- filter ("Unread" tab vs "All").
44
+SELECT n.id, n.recipient_user_id, n.kind, n.reason, n.repo_id,
45
+       n.thread_kind, n.thread_id, n.source_event_id, n.unread,
46
+       n.last_event_at, n.last_actor_user_id, n.summary,
47
+       n.created_at, n.updated_at,
48
+       coalesce(u.username, '') AS actor_username,
49
+       coalesce(r.name, '') AS repo_name,
50
+       coalesce(ru.username, '') AS repo_owner_username,
51
+       coalesce(i.number, 0) AS thread_number,
52
+       coalesce(i.title, '') AS thread_title
53
+FROM notifications n
54
+LEFT JOIN users u  ON u.id = n.last_actor_user_id
55
+LEFT JOIN repos r  ON r.id = n.repo_id
56
+LEFT JOIN users ru ON ru.id = r.owner_user_id
57
+LEFT JOIN issues i ON i.id = n.thread_id
58
+WHERE n.recipient_user_id = $1
59
+  AND ($2::boolean = false OR n.unread = true)
60
+ORDER BY n.last_event_at DESC
61
+LIMIT $3 OFFSET $4;
62
+
63
+-- name: CountUnreadForRecipient :one
64
+SELECT count(*) FROM notifications
65
+WHERE recipient_user_id = $1 AND unread = true;
66
+
67
+-- name: CountNotificationsForRecipient :one
68
+SELECT count(*) FROM notifications
69
+WHERE recipient_user_id = $1
70
+  AND ($2::boolean = false OR unread = true);
71
+
72
+-- name: SetNotificationRead :exec
73
+UPDATE notifications SET unread = false, updated_at = now()
74
+WHERE id = $1 AND recipient_user_id = $2;
75
+
76
+-- name: SetNotificationUnread :exec
77
+UPDATE notifications SET unread = true, updated_at = now()
78
+WHERE id = $1 AND recipient_user_id = $2;
79
+
80
+-- name: MarkAllReadForRecipient :exec
81
+-- Bounded sweep: a single call doesn't try to update millions of
82
+-- rows. Caller paginates via repeated calls when count > batch.
83
+UPDATE notifications SET unread = false, updated_at = now()
84
+WHERE recipient_user_id = $1 AND unread = true;
85
+
86
+-- name: GetNotification :one
87
+SELECT id, recipient_user_id, kind, reason, repo_id,
88
+       thread_kind, thread_id, source_event_id, unread,
89
+       last_event_at, last_actor_user_id, summary, created_at, updated_at
90
+FROM notifications WHERE id = $1;
internal/notif/queries/threads.sqladded
@@ -0,0 +1,37 @@
1
+-- ─── notification_threads ──────────────────────────────────────────
2
+
3
+-- name: GetNotificationThread :one
4
+SELECT recipient_user_id, thread_kind, thread_id, subscribed, reason, updated_at
5
+FROM notification_threads
6
+WHERE recipient_user_id = $1 AND thread_kind = $2 AND thread_id = $3;
7
+
8
+-- name: UpsertNotificationThread :exec
9
+-- Always-write upsert. Used by Subscribe / Unsubscribe / Ignore
10
+-- handlers and by the auto-subscription rules in the fan-out
11
+-- worker.
12
+INSERT INTO notification_threads (recipient_user_id, thread_kind, thread_id, subscribed, reason)
13
+VALUES ($1, $2, $3, $4, $5)
14
+ON CONFLICT (recipient_user_id, thread_kind, thread_id)
15
+DO UPDATE SET subscribed = EXCLUDED.subscribed,
16
+              reason     = EXCLUDED.reason,
17
+              updated_at = now();
18
+
19
+-- name: InsertNotificationThreadIfAbsent :exec
20
+-- Auto-subscription path: only insert if the user has no explicit
21
+-- preference yet. Preserves user choices (e.g. an explicit
22
+-- `subscribed=false` from clicking "Unsubscribe").
23
+INSERT INTO notification_threads (recipient_user_id, thread_kind, thread_id, subscribed, reason)
24
+VALUES ($1, $2, $3, $4, $5)
25
+ON CONFLICT (recipient_user_id, thread_kind, thread_id) DO NOTHING;
26
+
27
+-- name: DeleteNotificationThread :exec
28
+DELETE FROM notification_threads
29
+WHERE recipient_user_id = $1 AND thread_kind = $2 AND thread_id = $3;
30
+
31
+-- name: ListSubscribersForThread :many
32
+-- Fan-out helper: returns recipients who explicitly subscribed to a
33
+-- thread. The fan-out worker unions this with the per-repo `watches`
34
+-- result + author/assignee/reviewer rules.
35
+SELECT recipient_user_id, reason
36
+FROM notification_threads
37
+WHERE thread_kind = $1 AND thread_id = $2 AND subscribed = true;
internal/notif/routing.goadded
@@ -0,0 +1,165 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package notif
4
+
5
+// Reason is the short string the inbox row + email footer surface
6
+// to the user ("you were mentioned", "you were assigned", etc.).
7
+// Kept short because it's also the
8
+// `notification_email_log.notification_id`-joined column on the
9
+// inbox row.
10
+type Reason string
11
+
12
+const (
13
+	ReasonMention         Reason = "mention"
14
+	ReasonAssignment      Reason = "assignment"
15
+	ReasonReviewRequested Reason = "review_requested"
16
+	ReasonAuthor          Reason = "author"          // you opened the thread
17
+	ReasonCommenter       Reason = "commenter"       // you commented earlier
18
+	ReasonSubscribed      Reason = "subscribed"      // explicit thread sub
19
+	ReasonWatching        Reason = "watching"        // repo-level watch
20
+	ReasonRepoAdminAction Reason = "repo_admin_action"
21
+)
22
+
23
+// Relation is how a recipient relates to an event. The routing
24
+// matrix is keyed on (event-kind, relation). Typically more than
25
+// one relation applies — the fan-out worker picks the highest-
26
+// priority one (the order matches the priority).
27
+type Relation int
28
+
29
+const (
30
+	RelMention Relation = iota
31
+	RelAssignee
32
+	RelReviewer
33
+	RelAuthor
34
+	RelCommenter
35
+	RelSubscribedThread
36
+	RelWatchingAll
37
+	RelWatchingParticipating
38
+	RelRepoOwner // for repo-admin lifecycle events
39
+)
40
+
41
+// Action is the routing-matrix output. NotifyInbox controls the
42
+// in-app inbox row; EmailDefault is whether to email by default
43
+// (subject to the user's per-kind email pref). OverrideIgnore
44
+// means the event notifies even when the user has set
45
+// `watches.level = 'ignore'` for the repo (matches GitHub's
46
+// "@mentions always notify" semantics).
47
+type Action struct {
48
+	NotifyInbox    bool
49
+	EmailDefault   bool
50
+	OverrideIgnore bool
51
+	Reason         Reason
52
+}
53
+
54
+// Skip is the zero-action used when the matrix decides the
55
+// event-relation pair shouldn't notify.
56
+var Skip = Action{}
57
+
58
+// Routing returns the action for an (event-kind, recipient
59
+// relation) pair. The matrix is small enough today to be a
60
+// switch; once we have ~30 event kinds it'll graduate to a real
61
+// table-driven shape.
62
+//
63
+// Unknown kinds → Skip (deny by default — better to miss a
64
+// notification than to spam users when a new kind is added but
65
+// not routed).
66
+func Routing(kind string, rel Relation) Action {
67
+	switch kind {
68
+	case "issue_opened", "pull_opened":
69
+		// New issue / PR. Watching=all surfaces; nothing else.
70
+		switch rel {
71
+		case RelWatchingAll:
72
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonWatching}
73
+		}
74
+	case "issue_comment", "pull_comment":
75
+		switch rel {
76
+		case RelMention:
77
+			return Action{NotifyInbox: true, EmailDefault: true, OverrideIgnore: true, Reason: ReasonMention}
78
+		case RelAssignee:
79
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonAssignment}
80
+		case RelAuthor:
81
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonAuthor}
82
+		case RelCommenter:
83
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonCommenter}
84
+		case RelSubscribedThread:
85
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonSubscribed}
86
+		case RelWatchingAll:
87
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonWatching}
88
+		}
89
+	case "issue_assigned", "pull_assigned":
90
+		switch rel {
91
+		case RelAssignee:
92
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonAssignment}
93
+		case RelMention:
94
+			return Action{NotifyInbox: true, EmailDefault: true, OverrideIgnore: true, Reason: ReasonMention}
95
+		}
96
+	case "issue_closed", "pull_closed", "pull_merged":
97
+		switch rel {
98
+		case RelAuthor:
99
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonAuthor}
100
+		case RelAssignee:
101
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonAssignment}
102
+		case RelSubscribedThread:
103
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonSubscribed}
104
+		case RelWatchingAll:
105
+			return Action{NotifyInbox: true, EmailDefault: false, Reason: ReasonWatching}
106
+		}
107
+	case "review_requested":
108
+		switch rel {
109
+		case RelReviewer:
110
+			return Action{NotifyInbox: true, EmailDefault: true, OverrideIgnore: true, Reason: ReasonReviewRequested}
111
+		}
112
+	case "review_submitted":
113
+		switch rel {
114
+		case RelAuthor:
115
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonAuthor}
116
+		case RelSubscribedThread:
117
+			return Action{NotifyInbox: true, EmailDefault: false, Reason: ReasonSubscribed}
118
+		}
119
+	case "mentioned":
120
+		// Standalone mention event (no inline thread context).
121
+		// Always inbox + email; overrides ignore.
122
+		switch rel {
123
+		case RelMention:
124
+			return Action{NotifyInbox: true, EmailDefault: true, OverrideIgnore: true, Reason: ReasonMention}
125
+		}
126
+	case "check_failed", "check_fixed":
127
+		// PR-author gets pinged when their PR's checks flip.
128
+		switch rel {
129
+		case RelAuthor:
130
+			return Action{NotifyInbox: true, EmailDefault: false, Reason: ReasonAuthor}
131
+		}
132
+	case "repo_archived", "repo_unarchived",
133
+		"repo_visibility_changed", "repo_soft_deleted",
134
+		"repo_restored", "repo_transfer_requested",
135
+		"repo_transfer_accepted", "repo_transfer_declined",
136
+		"repo_transfer_canceled", "repo_transfer_expired":
137
+		// S16-deferred lifecycle email kinds. Repo owner is the
138
+		// canonical recipient; transfer flows additionally notify
139
+		// the prospective recipient (computed by the caller).
140
+		switch rel {
141
+		case RelRepoOwner:
142
+			return Action{NotifyInbox: true, EmailDefault: true, Reason: ReasonRepoAdminAction}
143
+		}
144
+	}
145
+	return Skip
146
+}
147
+
148
+// EmailPrefKey returns the user_notification_prefs key for the
149
+// per-kind email toggle. Empty means "no toggle exists for this
150
+// reason — fall back to global default true."
151
+func EmailPrefKey(reason Reason) string {
152
+	switch reason {
153
+	case ReasonMention:
154
+		return "mentions_email"
155
+	case ReasonAssignment:
156
+		return "assignments_email"
157
+	case ReasonReviewRequested:
158
+		return "pr_review_requests_email"
159
+	case ReasonAuthor, ReasonCommenter, ReasonSubscribed, ReasonWatching:
160
+		return "issues_email"
161
+	case ReasonRepoAdminAction:
162
+		return "repo_admin_action_email"
163
+	}
164
+	return ""
165
+}
internal/notif/sqlc/cursor.sql.goadded
@@ -0,0 +1,93 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+// source: cursor.sql
5
+
6
+package notifdb
7
+
8
+import (
9
+	"context"
10
+)
11
+
12
+const getEventCursor = `-- name: GetEventCursor :one
13
+
14
+SELECT consumer, last_event_id, updated_at
15
+FROM domain_events_processed
16
+WHERE consumer = $1
17
+`
18
+
19
+// ─── domain_events_processed ──────────────────────────────────────
20
+func (q *Queries) GetEventCursor(ctx context.Context, db DBTX, consumer string) (DomainEventsProcessed, error) {
21
+	row := db.QueryRow(ctx, getEventCursor, consumer)
22
+	var i DomainEventsProcessed
23
+	err := row.Scan(&i.Consumer, &i.LastEventID, &i.UpdatedAt)
24
+	return i, err
25
+}
26
+
27
+const listUnprocessedDomainEvents = `-- name: ListUnprocessedDomainEvents :many
28
+SELECT id, actor_user_id, kind, repo_id, source_kind, source_id,
29
+       public, payload, created_at
30
+FROM domain_events
31
+WHERE id > $1
32
+ORDER BY id
33
+LIMIT $2
34
+`
35
+
36
+type ListUnprocessedDomainEventsParams struct {
37
+	ID    int64
38
+	Limit int32
39
+}
40
+
41
+// The fan-out worker's read cursor. Bounded so a single tick
42
+// doesn't try to drain a million-row backlog.
43
+func (q *Queries) ListUnprocessedDomainEvents(ctx context.Context, db DBTX, arg ListUnprocessedDomainEventsParams) ([]DomainEvent, error) {
44
+	rows, err := db.Query(ctx, listUnprocessedDomainEvents, arg.ID, arg.Limit)
45
+	if err != nil {
46
+		return nil, err
47
+	}
48
+	defer rows.Close()
49
+	items := []DomainEvent{}
50
+	for rows.Next() {
51
+		var i DomainEvent
52
+		if err := rows.Scan(
53
+			&i.ID,
54
+			&i.ActorUserID,
55
+			&i.Kind,
56
+			&i.RepoID,
57
+			&i.SourceKind,
58
+			&i.SourceID,
59
+			&i.Public,
60
+			&i.Payload,
61
+			&i.CreatedAt,
62
+		); err != nil {
63
+			return nil, err
64
+		}
65
+		items = append(items, i)
66
+	}
67
+	if err := rows.Err(); err != nil {
68
+		return nil, err
69
+	}
70
+	return items, nil
71
+}
72
+
73
+const setEventCursor = `-- name: SetEventCursor :exec
74
+INSERT INTO domain_events_processed (consumer, last_event_id)
75
+VALUES ($1, $2)
76
+ON CONFLICT (consumer)
77
+DO UPDATE SET last_event_id = EXCLUDED.last_event_id,
78
+              updated_at    = now()
79
+`
80
+
81
+type SetEventCursorParams struct {
82
+	Consumer    string
83
+	LastEventID int64
84
+}
85
+
86
+// Always-write upsert so the worker doesn't have to special-case
87
+// the missing-row branch (the migration seeds 'notify_fanout' at
88
+// 0; future consumers like 'webhook_deliver' do the same on first
89
+// run via this same call).
90
+func (q *Queries) SetEventCursor(ctx context.Context, db DBTX, arg SetEventCursorParams) error {
91
+	_, err := db.Exec(ctx, setEventCursor, arg.Consumer, arg.LastEventID)
92
+	return err
93
+}
internal/notif/sqlc/db.goadded
@@ -0,0 +1,25 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+
5
+package notifdb
6
+
7
+import (
8
+	"context"
9
+
10
+	"github.com/jackc/pgx/v5"
11
+	"github.com/jackc/pgx/v5/pgconn"
12
+)
13
+
14
+type DBTX interface {
15
+	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
16
+	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
17
+	QueryRow(context.Context, string, ...interface{}) pgx.Row
18
+}
19
+
20
+func New() *Queries {
21
+	return &Queries{}
22
+}
23
+
24
+type Queries struct {
25
+}
internal/notif/sqlc/email_log.sql.goadded
@@ -0,0 +1,93 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+// source: email_log.sql
5
+
6
+package notifdb
7
+
8
+import (
9
+	"context"
10
+
11
+	"github.com/jackc/pgx/v5/pgtype"
12
+)
13
+
14
+const countEmailsForRecipientSince = `-- name: CountEmailsForRecipientSince :one
15
+SELECT count(*) FROM notification_email_log
16
+WHERE recipient_user_id = $1
17
+  AND sent_at > now() - make_interval(mins => $2::int)
18
+`
19
+
20
+type CountEmailsForRecipientSinceParams struct {
21
+	RecipientUserID int64
22
+	Column2         int32
23
+}
24
+
25
+// Per-recipient absolute rate cap: how many total emails to this
26
+// recipient in the last $2 minutes?
27
+func (q *Queries) CountEmailsForRecipientSince(ctx context.Context, db DBTX, arg CountEmailsForRecipientSinceParams) (int64, error) {
28
+	row := db.QueryRow(ctx, countEmailsForRecipientSince, arg.RecipientUserID, arg.Column2)
29
+	var count int64
30
+	err := row.Scan(&count)
31
+	return count, err
32
+}
33
+
34
+const countEmailsForRecipientThreadSince = `-- name: CountEmailsForRecipientThreadSince :one
35
+SELECT count(*) FROM notification_email_log
36
+WHERE recipient_user_id = $1
37
+  AND thread_kind = $2
38
+  AND thread_id = $3
39
+  AND sent_at > now() - make_interval(mins => $4::int)
40
+`
41
+
42
+type CountEmailsForRecipientThreadSinceParams struct {
43
+	RecipientUserID int64
44
+	ThreadKind      NullNotificationThreadKind
45
+	ThreadID        pgtype.Int8
46
+	Column4         int32
47
+}
48
+
49
+// Storm dampener probe: how many emails for this thread did we
50
+// send to this recipient in the last $4 minutes? Caller compares
51
+// to the cap.
52
+func (q *Queries) CountEmailsForRecipientThreadSince(ctx context.Context, db DBTX, arg CountEmailsForRecipientThreadSinceParams) (int64, error) {
53
+	row := db.QueryRow(ctx, countEmailsForRecipientThreadSince,
54
+		arg.RecipientUserID,
55
+		arg.ThreadKind,
56
+		arg.ThreadID,
57
+		arg.Column4,
58
+	)
59
+	var count int64
60
+	err := row.Scan(&count)
61
+	return count, err
62
+}
63
+
64
+const insertEmailLog = `-- name: InsertEmailLog :exec
65
+
66
+INSERT INTO notification_email_log
67
+    (recipient_user_id, notification_id, thread_kind, thread_id, message_id)
68
+VALUES ($1, $2, $3, $4, $5)
69
+`
70
+
71
+type InsertEmailLogParams struct {
72
+	RecipientUserID int64
73
+	NotificationID  pgtype.Int8
74
+	ThreadKind      NullNotificationThreadKind
75
+	ThreadID        pgtype.Int8
76
+	MessageID       pgtype.Text
77
+}
78
+
79
+// ─── notification_email_log ────────────────────────────────────────
80
+// Records an email send. Caller decides what to bind for thread_id
81
+// (NULL for thread-less notifications). MessageID is the SMTP /
82
+// transactional-provider message id when available; empty when
83
+// the sender doesn't surface one.
84
+func (q *Queries) InsertEmailLog(ctx context.Context, db DBTX, arg InsertEmailLogParams) error {
85
+	_, err := db.Exec(ctx, insertEmailLog,
86
+		arg.RecipientUserID,
87
+		arg.NotificationID,
88
+		arg.ThreadKind,
89
+		arg.ThreadID,
90
+		arg.MessageID,
91
+	)
92
+	return err
93
+}
internal/notif/sqlc/models.goadded
1404 lines changed — click to load
@@ -0,0 +1,1404 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+
5
+package notifdb
6
+
7
+import (
8
+	"database/sql/driver"
9
+	"fmt"
10
+	"net/netip"
11
+
12
+	"github.com/jackc/pgx/v5/pgtype"
13
+)
14
+
15
+type CheckConclusion string
16
+
17
+const (
18
+	CheckConclusionSuccess        CheckConclusion = "success"
19
+	CheckConclusionFailure        CheckConclusion = "failure"
20
+	CheckConclusionNeutral        CheckConclusion = "neutral"
21
+	CheckConclusionCancelled      CheckConclusion = "cancelled"
22
+	CheckConclusionSkipped        CheckConclusion = "skipped"
23
+	CheckConclusionTimedOut       CheckConclusion = "timed_out"
24
+	CheckConclusionActionRequired CheckConclusion = "action_required"
25
+	CheckConclusionStale          CheckConclusion = "stale"
26
+)
27
+
28
+func (e *CheckConclusion) Scan(src interface{}) error {
29
+	switch s := src.(type) {
30
+	case []byte:
31
+		*e = CheckConclusion(s)
32
+	case string:
33
+		*e = CheckConclusion(s)
34
+	default:
35
+		return fmt.Errorf("unsupported scan type for CheckConclusion: %T", src)
36
+	}
37
+	return nil
38
+}
39
+
40
+type NullCheckConclusion struct {
41
+	CheckConclusion CheckConclusion
42
+	Valid           bool // Valid is true if CheckConclusion is not NULL
43
+}
44
+
45
+// Scan implements the Scanner interface.
46
+func (ns *NullCheckConclusion) Scan(value interface{}) error {
47
+	if value == nil {
48
+		ns.CheckConclusion, ns.Valid = "", false
49
+		return nil
50
+	}
51
+	ns.Valid = true
52
+	return ns.CheckConclusion.Scan(value)
53
+}
54
+
55
+// Value implements the driver Valuer interface.
56
+func (ns NullCheckConclusion) Value() (driver.Value, error) {
57
+	if !ns.Valid {
58
+		return nil, nil
59
+	}
60
+	return string(ns.CheckConclusion), nil
61
+}
62
+
63
+type CheckStatus string
64
+
65
+const (
66
+	CheckStatusQueued     CheckStatus = "queued"
67
+	CheckStatusInProgress CheckStatus = "in_progress"
68
+	CheckStatusCompleted  CheckStatus = "completed"
69
+	CheckStatusPending    CheckStatus = "pending"
70
+)
71
+
72
+func (e *CheckStatus) Scan(src interface{}) error {
73
+	switch s := src.(type) {
74
+	case []byte:
75
+		*e = CheckStatus(s)
76
+	case string:
77
+		*e = CheckStatus(s)
78
+	default:
79
+		return fmt.Errorf("unsupported scan type for CheckStatus: %T", src)
80
+	}
81
+	return nil
82
+}
83
+
84
+type NullCheckStatus struct {
85
+	CheckStatus CheckStatus
86
+	Valid       bool // Valid is true if CheckStatus is not NULL
87
+}
88
+
89
+// Scan implements the Scanner interface.
90
+func (ns *NullCheckStatus) Scan(value interface{}) error {
91
+	if value == nil {
92
+		ns.CheckStatus, ns.Valid = "", false
93
+		return nil
94
+	}
95
+	ns.Valid = true
96
+	return ns.CheckStatus.Scan(value)
97
+}
98
+
99
+// Value implements the driver Valuer interface.
100
+func (ns NullCheckStatus) Value() (driver.Value, error) {
101
+	if !ns.Valid {
102
+		return nil, nil
103
+	}
104
+	return string(ns.CheckStatus), nil
105
+}
106
+
107
+type CollabRole string
108
+
109
+const (
110
+	CollabRoleRead     CollabRole = "read"
111
+	CollabRoleTriage   CollabRole = "triage"
112
+	CollabRoleWrite    CollabRole = "write"
113
+	CollabRoleMaintain CollabRole = "maintain"
114
+	CollabRoleAdmin    CollabRole = "admin"
115
+)
116
+
117
+func (e *CollabRole) Scan(src interface{}) error {
118
+	switch s := src.(type) {
119
+	case []byte:
120
+		*e = CollabRole(s)
121
+	case string:
122
+		*e = CollabRole(s)
123
+	default:
124
+		return fmt.Errorf("unsupported scan type for CollabRole: %T", src)
125
+	}
126
+	return nil
127
+}
128
+
129
+type NullCollabRole struct {
130
+	CollabRole CollabRole
131
+	Valid      bool // Valid is true if CollabRole is not NULL
132
+}
133
+
134
+// Scan implements the Scanner interface.
135
+func (ns *NullCollabRole) Scan(value interface{}) error {
136
+	if value == nil {
137
+		ns.CollabRole, ns.Valid = "", false
138
+		return nil
139
+	}
140
+	ns.Valid = true
141
+	return ns.CollabRole.Scan(value)
142
+}
143
+
144
+// Value implements the driver Valuer interface.
145
+func (ns NullCollabRole) Value() (driver.Value, error) {
146
+	if !ns.Valid {
147
+		return nil, nil
148
+	}
149
+	return string(ns.CollabRole), nil
150
+}
151
+
152
+type IssueKind string
153
+
154
+const (
155
+	IssueKindIssue IssueKind = "issue"
156
+	IssueKindPr    IssueKind = "pr"
157
+)
158
+
159
+func (e *IssueKind) Scan(src interface{}) error {
160
+	switch s := src.(type) {
161
+	case []byte:
162
+		*e = IssueKind(s)
163
+	case string:
164
+		*e = IssueKind(s)
165
+	default:
166
+		return fmt.Errorf("unsupported scan type for IssueKind: %T", src)
167
+	}
168
+	return nil
169
+}
170
+
171
+type NullIssueKind struct {
172
+	IssueKind IssueKind
173
+	Valid     bool // Valid is true if IssueKind is not NULL
174
+}
175
+
176
+// Scan implements the Scanner interface.
177
+func (ns *NullIssueKind) Scan(value interface{}) error {
178
+	if value == nil {
179
+		ns.IssueKind, ns.Valid = "", false
180
+		return nil
181
+	}
182
+	ns.Valid = true
183
+	return ns.IssueKind.Scan(value)
184
+}
185
+
186
+// Value implements the driver Valuer interface.
187
+func (ns NullIssueKind) Value() (driver.Value, error) {
188
+	if !ns.Valid {
189
+		return nil, nil
190
+	}
191
+	return string(ns.IssueKind), nil
192
+}
193
+
194
+type IssueRefSource string
195
+
196
+const (
197
+	IssueRefSourceCommentBody   IssueRefSource = "comment_body"
198
+	IssueRefSourceIssueBody     IssueRefSource = "issue_body"
199
+	IssueRefSourceCommitMessage IssueRefSource = "commit_message"
200
+)
201
+
202
+func (e *IssueRefSource) Scan(src interface{}) error {
203
+	switch s := src.(type) {
204
+	case []byte:
205
+		*e = IssueRefSource(s)
206
+	case string:
207
+		*e = IssueRefSource(s)
208
+	default:
209
+		return fmt.Errorf("unsupported scan type for IssueRefSource: %T", src)
210
+	}
211
+	return nil
212
+}
213
+
214
+type NullIssueRefSource struct {
215
+	IssueRefSource IssueRefSource
216
+	Valid          bool // Valid is true if IssueRefSource is not NULL
217
+}
218
+
219
+// Scan implements the Scanner interface.
220
+func (ns *NullIssueRefSource) Scan(value interface{}) error {
221
+	if value == nil {
222
+		ns.IssueRefSource, ns.Valid = "", false
223
+		return nil
224
+	}
225
+	ns.Valid = true
226
+	return ns.IssueRefSource.Scan(value)
227
+}
228
+
229
+// Value implements the driver Valuer interface.
230
+func (ns NullIssueRefSource) Value() (driver.Value, error) {
231
+	if !ns.Valid {
232
+		return nil, nil
233
+	}
234
+	return string(ns.IssueRefSource), nil
235
+}
236
+
237
+type IssueState string
238
+
239
+const (
240
+	IssueStateOpen   IssueState = "open"
241
+	IssueStateClosed IssueState = "closed"
242
+)
243
+
244
+func (e *IssueState) Scan(src interface{}) error {
245
+	switch s := src.(type) {
246
+	case []byte:
247
+		*e = IssueState(s)
248
+	case string:
249
+		*e = IssueState(s)
250
+	default:
251
+		return fmt.Errorf("unsupported scan type for IssueState: %T", src)
252
+	}
253
+	return nil
254
+}
255
+
256
+type NullIssueState struct {
257
+	IssueState IssueState
258
+	Valid      bool // Valid is true if IssueState is not NULL
259
+}
260
+
261
+// Scan implements the Scanner interface.
262
+func (ns *NullIssueState) Scan(value interface{}) error {
263
+	if value == nil {
264
+		ns.IssueState, ns.Valid = "", false
265
+		return nil
266
+	}
267
+	ns.Valid = true
268
+	return ns.IssueState.Scan(value)
269
+}
270
+
271
+// Value implements the driver Valuer interface.
272
+func (ns NullIssueState) Value() (driver.Value, error) {
273
+	if !ns.Valid {
274
+		return nil, nil
275
+	}
276
+	return string(ns.IssueState), nil
277
+}
278
+
279
+type IssueStateReason string
280
+
281
+const (
282
+	IssueStateReasonCompleted  IssueStateReason = "completed"
283
+	IssueStateReasonNotPlanned IssueStateReason = "not_planned"
284
+	IssueStateReasonReopened   IssueStateReason = "reopened"
285
+	IssueStateReasonDuplicate  IssueStateReason = "duplicate"
286
+)
287
+
288
+func (e *IssueStateReason) Scan(src interface{}) error {
289
+	switch s := src.(type) {
290
+	case []byte:
291
+		*e = IssueStateReason(s)
292
+	case string:
293
+		*e = IssueStateReason(s)
294
+	default:
295
+		return fmt.Errorf("unsupported scan type for IssueStateReason: %T", src)
296
+	}
297
+	return nil
298
+}
299
+
300
+type NullIssueStateReason struct {
301
+	IssueStateReason IssueStateReason
302
+	Valid            bool // Valid is true if IssueStateReason is not NULL
303
+}
304
+
305
+// Scan implements the Scanner interface.
306
+func (ns *NullIssueStateReason) Scan(value interface{}) error {
307
+	if value == nil {
308
+		ns.IssueStateReason, ns.Valid = "", false
309
+		return nil
310
+	}
311
+	ns.Valid = true
312
+	return ns.IssueStateReason.Scan(value)
313
+}
314
+
315
+// Value implements the driver Valuer interface.
316
+func (ns NullIssueStateReason) Value() (driver.Value, error) {
317
+	if !ns.Valid {
318
+		return nil, nil
319
+	}
320
+	return string(ns.IssueStateReason), nil
321
+}
322
+
323
+type MilestoneState string
324
+
325
+const (
326
+	MilestoneStateOpen   MilestoneState = "open"
327
+	MilestoneStateClosed MilestoneState = "closed"
328
+)
329
+
330
+func (e *MilestoneState) Scan(src interface{}) error {
331
+	switch s := src.(type) {
332
+	case []byte:
333
+		*e = MilestoneState(s)
334
+	case string:
335
+		*e = MilestoneState(s)
336
+	default:
337
+		return fmt.Errorf("unsupported scan type for MilestoneState: %T", src)
338
+	}
339
+	return nil
340
+}
341
+
342
+type NullMilestoneState struct {
343
+	MilestoneState MilestoneState
344
+	Valid          bool // Valid is true if MilestoneState is not NULL
345
+}
346
+
347
+// Scan implements the Scanner interface.
348
+func (ns *NullMilestoneState) Scan(value interface{}) error {
349
+	if value == nil {
350
+		ns.MilestoneState, ns.Valid = "", false
351
+		return nil
352
+	}
353
+	ns.Valid = true
354
+	return ns.MilestoneState.Scan(value)
355
+}
356
+
357
+// Value implements the driver Valuer interface.
358
+func (ns NullMilestoneState) Value() (driver.Value, error) {
359
+	if !ns.Valid {
360
+		return nil, nil
361
+	}
362
+	return string(ns.MilestoneState), nil
363
+}
364
+
365
+type NotificationThreadKind string
366
+
367
+const (
368
+	NotificationThreadKindIssue NotificationThreadKind = "issue"
369
+	NotificationThreadKindPr    NotificationThreadKind = "pr"
370
+)
371
+
372
+func (e *NotificationThreadKind) Scan(src interface{}) error {
373
+	switch s := src.(type) {
374
+	case []byte:
375
+		*e = NotificationThreadKind(s)
376
+	case string:
377
+		*e = NotificationThreadKind(s)
378
+	default:
379
+		return fmt.Errorf("unsupported scan type for NotificationThreadKind: %T", src)
380
+	}
381
+	return nil
382
+}
383
+
384
+type NullNotificationThreadKind struct {
385
+	NotificationThreadKind NotificationThreadKind
386
+	Valid                  bool // Valid is true if NotificationThreadKind is not NULL
387
+}
388
+
389
+// Scan implements the Scanner interface.
390
+func (ns *NullNotificationThreadKind) Scan(value interface{}) error {
391
+	if value == nil {
392
+		ns.NotificationThreadKind, ns.Valid = "", false
393
+		return nil
394
+	}
395
+	ns.Valid = true
396
+	return ns.NotificationThreadKind.Scan(value)
397
+}
398
+
399
+// Value implements the driver Valuer interface.
400
+func (ns NullNotificationThreadKind) Value() (driver.Value, error) {
401
+	if !ns.Valid {
402
+		return nil, nil
403
+	}
404
+	return string(ns.NotificationThreadKind), nil
405
+}
406
+
407
+type PrFileStatus string
408
+
409
+const (
410
+	PrFileStatusAdded    PrFileStatus = "added"
411
+	PrFileStatusModified PrFileStatus = "modified"
412
+	PrFileStatusDeleted  PrFileStatus = "deleted"
413
+	PrFileStatusRenamed  PrFileStatus = "renamed"
414
+	PrFileStatusCopied   PrFileStatus = "copied"
415
+)
416
+
417
+func (e *PrFileStatus) Scan(src interface{}) error {
418
+	switch s := src.(type) {
419
+	case []byte:
420
+		*e = PrFileStatus(s)
421
+	case string:
422
+		*e = PrFileStatus(s)
423
+	default:
424
+		return fmt.Errorf("unsupported scan type for PrFileStatus: %T", src)
425
+	}
426
+	return nil
427
+}
428
+
429
+type NullPrFileStatus struct {
430
+	PrFileStatus PrFileStatus
431
+	Valid        bool // Valid is true if PrFileStatus is not NULL
432
+}
433
+
434
+// Scan implements the Scanner interface.
435
+func (ns *NullPrFileStatus) Scan(value interface{}) error {
436
+	if value == nil {
437
+		ns.PrFileStatus, ns.Valid = "", false
438
+		return nil
439
+	}
440
+	ns.Valid = true
441
+	return ns.PrFileStatus.Scan(value)
442
+}
443
+
444
+// Value implements the driver Valuer interface.
445
+func (ns NullPrFileStatus) Value() (driver.Value, error) {
446
+	if !ns.Valid {
447
+		return nil, nil
448
+	}
449
+	return string(ns.PrFileStatus), nil
450
+}
451
+
452
+type PrMergeMethod string
453
+
454
+const (
455
+	PrMergeMethodMerge  PrMergeMethod = "merge"
456
+	PrMergeMethodSquash PrMergeMethod = "squash"
457
+	PrMergeMethodRebase PrMergeMethod = "rebase"
458
+)
459
+
460
+func (e *PrMergeMethod) Scan(src interface{}) error {
461
+	switch s := src.(type) {
462
+	case []byte:
463
+		*e = PrMergeMethod(s)
464
+	case string:
465
+		*e = PrMergeMethod(s)
466
+	default:
467
+		return fmt.Errorf("unsupported scan type for PrMergeMethod: %T", src)
468
+	}
469
+	return nil
470
+}
471
+
472
+type NullPrMergeMethod struct {
473
+	PrMergeMethod PrMergeMethod
474
+	Valid         bool // Valid is true if PrMergeMethod is not NULL
475
+}
476
+
477
+// Scan implements the Scanner interface.
478
+func (ns *NullPrMergeMethod) Scan(value interface{}) error {
479
+	if value == nil {
480
+		ns.PrMergeMethod, ns.Valid = "", false
481
+		return nil
482
+	}
483
+	ns.Valid = true
484
+	return ns.PrMergeMethod.Scan(value)
485
+}
486
+
487
+// Value implements the driver Valuer interface.
488
+func (ns NullPrMergeMethod) Value() (driver.Value, error) {
489
+	if !ns.Valid {
490
+		return nil, nil
491
+	}
492
+	return string(ns.PrMergeMethod), nil
493
+}
494
+
495
+type PrMergeableState string
496
+
497
+const (
498
+	PrMergeableStateUnknown PrMergeableState = "unknown"
499
+	PrMergeableStateClean   PrMergeableState = "clean"
500
+	PrMergeableStateDirty   PrMergeableState = "dirty"
501
+	PrMergeableStateBlocked PrMergeableState = "blocked"
502
+	PrMergeableStateBehind  PrMergeableState = "behind"
503
+)
504
+
505
+func (e *PrMergeableState) Scan(src interface{}) error {
506
+	switch s := src.(type) {
507
+	case []byte:
508
+		*e = PrMergeableState(s)
509
+	case string:
510
+		*e = PrMergeableState(s)
511
+	default:
512
+		return fmt.Errorf("unsupported scan type for PrMergeableState: %T", src)
513
+	}
514
+	return nil
515
+}
516
+
517
+type NullPrMergeableState struct {
518
+	PrMergeableState PrMergeableState
519
+	Valid            bool // Valid is true if PrMergeableState is not NULL
520
+}
521
+
522
+// Scan implements the Scanner interface.
523
+func (ns *NullPrMergeableState) Scan(value interface{}) error {
524
+	if value == nil {
525
+		ns.PrMergeableState, ns.Valid = "", false
526
+		return nil
527
+	}
528
+	ns.Valid = true
529
+	return ns.PrMergeableState.Scan(value)
530
+}
531
+
532
+// Value implements the driver Valuer interface.
533
+func (ns NullPrMergeableState) Value() (driver.Value, error) {
534
+	if !ns.Valid {
535
+		return nil, nil
536
+	}
537
+	return string(ns.PrMergeableState), nil
538
+}
539
+
540
+type PrReviewSide string
541
+
542
+const (
543
+	PrReviewSideLeft  PrReviewSide = "left"
544
+	PrReviewSideRight PrReviewSide = "right"
545
+)
546
+
547
+func (e *PrReviewSide) Scan(src interface{}) error {
548
+	switch s := src.(type) {
549
+	case []byte:
550
+		*e = PrReviewSide(s)
551
+	case string:
552
+		*e = PrReviewSide(s)
553
+	default:
554
+		return fmt.Errorf("unsupported scan type for PrReviewSide: %T", src)
555
+	}
556
+	return nil
557
+}
558
+
559
+type NullPrReviewSide struct {
560
+	PrReviewSide PrReviewSide
561
+	Valid        bool // Valid is true if PrReviewSide is not NULL
562
+}
563
+
564
+// Scan implements the Scanner interface.
565
+func (ns *NullPrReviewSide) Scan(value interface{}) error {
566
+	if value == nil {
567
+		ns.PrReviewSide, ns.Valid = "", false
568
+		return nil
569
+	}
570
+	ns.Valid = true
571
+	return ns.PrReviewSide.Scan(value)
572
+}
573
+
574
+// Value implements the driver Valuer interface.
575
+func (ns NullPrReviewSide) Value() (driver.Value, error) {
576
+	if !ns.Valid {
577
+		return nil, nil
578
+	}
579
+	return string(ns.PrReviewSide), nil
580
+}
581
+
582
+type PrReviewState string
583
+
584
+const (
585
+	PrReviewStateComment        PrReviewState = "comment"
586
+	PrReviewStateApprove        PrReviewState = "approve"
587
+	PrReviewStateRequestChanges PrReviewState = "request_changes"
588
+)
589
+
590
+func (e *PrReviewState) Scan(src interface{}) error {
591
+	switch s := src.(type) {
592
+	case []byte:
593
+		*e = PrReviewState(s)
594
+	case string:
595
+		*e = PrReviewState(s)
596
+	default:
597
+		return fmt.Errorf("unsupported scan type for PrReviewState: %T", src)
598
+	}
599
+	return nil
600
+}
601
+
602
+type NullPrReviewState struct {
603
+	PrReviewState PrReviewState
604
+	Valid         bool // Valid is true if PrReviewState is not NULL
605
+}
606
+
607
+// Scan implements the Scanner interface.
608
+func (ns *NullPrReviewState) Scan(value interface{}) error {
609
+	if value == nil {
610
+		ns.PrReviewState, ns.Valid = "", false
611
+		return nil
612
+	}
613
+	ns.Valid = true
614
+	return ns.PrReviewState.Scan(value)
615
+}
616
+
617
+// Value implements the driver Valuer interface.
618
+func (ns NullPrReviewState) Value() (driver.Value, error) {
619
+	if !ns.Valid {
620
+		return nil, nil
621
+	}
622
+	return string(ns.PrReviewState), nil
623
+}
624
+
625
+type RepoInitStatus string
626
+
627
+const (
628
+	RepoInitStatusInitialized RepoInitStatus = "initialized"
629
+	RepoInitStatusInitPending RepoInitStatus = "init_pending"
630
+	RepoInitStatusInitFailed  RepoInitStatus = "init_failed"
631
+)
632
+
633
+func (e *RepoInitStatus) Scan(src interface{}) error {
634
+	switch s := src.(type) {
635
+	case []byte:
636
+		*e = RepoInitStatus(s)
637
+	case string:
638
+		*e = RepoInitStatus(s)
639
+	default:
640
+		return fmt.Errorf("unsupported scan type for RepoInitStatus: %T", src)
641
+	}
642
+	return nil
643
+}
644
+
645
+type NullRepoInitStatus struct {
646
+	RepoInitStatus RepoInitStatus
647
+	Valid          bool // Valid is true if RepoInitStatus is not NULL
648
+}
649
+
650
+// Scan implements the Scanner interface.
651
+func (ns *NullRepoInitStatus) Scan(value interface{}) error {
652
+	if value == nil {
653
+		ns.RepoInitStatus, ns.Valid = "", false
654
+		return nil
655
+	}
656
+	ns.Valid = true
657
+	return ns.RepoInitStatus.Scan(value)
658
+}
659
+
660
+// Value implements the driver Valuer interface.
661
+func (ns NullRepoInitStatus) Value() (driver.Value, error) {
662
+	if !ns.Valid {
663
+		return nil, nil
664
+	}
665
+	return string(ns.RepoInitStatus), nil
666
+}
667
+
668
+type RepoVisibility string
669
+
670
+const (
671
+	RepoVisibilityPublic  RepoVisibility = "public"
672
+	RepoVisibilityPrivate RepoVisibility = "private"
673
+)
674
+
675
+func (e *RepoVisibility) Scan(src interface{}) error {
676
+	switch s := src.(type) {
677
+	case []byte:
678
+		*e = RepoVisibility(s)
679
+	case string:
680
+		*e = RepoVisibility(s)
681
+	default:
682
+		return fmt.Errorf("unsupported scan type for RepoVisibility: %T", src)
683
+	}
684
+	return nil
685
+}
686
+
687
+type NullRepoVisibility struct {
688
+	RepoVisibility RepoVisibility
689
+	Valid          bool // Valid is true if RepoVisibility is not NULL
690
+}
691
+
692
+// Scan implements the Scanner interface.
693
+func (ns *NullRepoVisibility) Scan(value interface{}) error {
694
+	if value == nil {
695
+		ns.RepoVisibility, ns.Valid = "", false
696
+		return nil
697
+	}
698
+	ns.Valid = true
699
+	return ns.RepoVisibility.Scan(value)
700
+}
701
+
702
+// Value implements the driver Valuer interface.
703
+func (ns NullRepoVisibility) Value() (driver.Value, error) {
704
+	if !ns.Valid {
705
+		return nil, nil
706
+	}
707
+	return string(ns.RepoVisibility), nil
708
+}
709
+
710
+type TransferPrincipalKind string
711
+
712
+const (
713
+	TransferPrincipalKindUser TransferPrincipalKind = "user"
714
+	TransferPrincipalKindOrg  TransferPrincipalKind = "org"
715
+)
716
+
717
+func (e *TransferPrincipalKind) Scan(src interface{}) error {
718
+	switch s := src.(type) {
719
+	case []byte:
720
+		*e = TransferPrincipalKind(s)
721
+	case string:
722
+		*e = TransferPrincipalKind(s)
723
+	default:
724
+		return fmt.Errorf("unsupported scan type for TransferPrincipalKind: %T", src)
725
+	}
726
+	return nil
727
+}
728
+
729
+type NullTransferPrincipalKind struct {
730
+	TransferPrincipalKind TransferPrincipalKind
731
+	Valid                 bool // Valid is true if TransferPrincipalKind is not NULL
732
+}
733
+
734
+// Scan implements the Scanner interface.
735
+func (ns *NullTransferPrincipalKind) Scan(value interface{}) error {
736
+	if value == nil {
737
+		ns.TransferPrincipalKind, ns.Valid = "", false
738
+		return nil
739
+	}
740
+	ns.Valid = true
741
+	return ns.TransferPrincipalKind.Scan(value)
742
+}
743
+
744
+// Value implements the driver Valuer interface.
745
+func (ns NullTransferPrincipalKind) Value() (driver.Value, error) {
746
+	if !ns.Valid {
747
+		return nil, nil
748
+	}
749
+	return string(ns.TransferPrincipalKind), nil
750
+}
751
+
752
+type TransferStatus string
753
+
754
+const (
755
+	TransferStatusPending  TransferStatus = "pending"
756
+	TransferStatusAccepted TransferStatus = "accepted"
757
+	TransferStatusDeclined TransferStatus = "declined"
758
+	TransferStatusCanceled TransferStatus = "canceled"
759
+	TransferStatusExpired  TransferStatus = "expired"
760
+)
761
+
762
+func (e *TransferStatus) Scan(src interface{}) error {
763
+	switch s := src.(type) {
764
+	case []byte:
765
+		*e = TransferStatus(s)
766
+	case string:
767
+		*e = TransferStatus(s)
768
+	default:
769
+		return fmt.Errorf("unsupported scan type for TransferStatus: %T", src)
770
+	}
771
+	return nil
772
+}
773
+
774
+type NullTransferStatus struct {
775
+	TransferStatus TransferStatus
776
+	Valid          bool // Valid is true if TransferStatus is not NULL
777
+}
778
+
779
+// Scan implements the Scanner interface.
780
+func (ns *NullTransferStatus) Scan(value interface{}) error {
781
+	if value == nil {
782
+		ns.TransferStatus, ns.Valid = "", false
783
+		return nil
784
+	}
785
+	ns.Valid = true
786
+	return ns.TransferStatus.Scan(value)
787
+}
788
+
789
+// Value implements the driver Valuer interface.
790
+func (ns NullTransferStatus) Value() (driver.Value, error) {
791
+	if !ns.Valid {
792
+		return nil, nil
793
+	}
794
+	return string(ns.TransferStatus), nil
795
+}
796
+
797
+type WatchLevel string
798
+
799
+const (
800
+	WatchLevelAll           WatchLevel = "all"
801
+	WatchLevelParticipating WatchLevel = "participating"
802
+	WatchLevelIgnore        WatchLevel = "ignore"
803
+)
804
+
805
+func (e *WatchLevel) Scan(src interface{}) error {
806
+	switch s := src.(type) {
807
+	case []byte:
808
+		*e = WatchLevel(s)
809
+	case string:
810
+		*e = WatchLevel(s)
811
+	default:
812
+		return fmt.Errorf("unsupported scan type for WatchLevel: %T", src)
813
+	}
814
+	return nil
815
+}
816
+
817
+type NullWatchLevel struct {
818
+	WatchLevel WatchLevel
819
+	Valid      bool // Valid is true if WatchLevel is not NULL
820
+}
821
+
822
+// Scan implements the Scanner interface.
823
+func (ns *NullWatchLevel) Scan(value interface{}) error {
824
+	if value == nil {
825
+		ns.WatchLevel, ns.Valid = "", false
826
+		return nil
827
+	}
828
+	ns.Valid = true
829
+	return ns.WatchLevel.Scan(value)
830
+}
831
+
832
+// Value implements the driver Valuer interface.
833
+func (ns NullWatchLevel) Value() (driver.Value, error) {
834
+	if !ns.Valid {
835
+		return nil, nil
836
+	}
837
+	return string(ns.WatchLevel), nil
838
+}
839
+
840
+type AuthAuditLog struct {
841
+	ID         int64
842
+	ActorID    pgtype.Int8
843
+	Action     string
844
+	TargetType string
845
+	TargetID   pgtype.Int8
846
+	Meta       []byte
847
+	CreatedAt  pgtype.Timestamptz
848
+}
849
+
850
+type AuthThrottle struct {
851
+	ID              int64
852
+	Scope           string
853
+	Identifier      string
854
+	Hits            int32
855
+	WindowStartedAt pgtype.Timestamptz
856
+}
857
+
858
+type BranchProtectionRule struct {
859
+	ID                             int64
860
+	RepoID                         int64
861
+	Pattern                        string
862
+	PreventForcePush               bool
863
+	PreventDeletion                bool
864
+	RequirePrForPush               bool
865
+	AllowedPusherUserIds           []int64
866
+	RequireSignedCommits           bool
867
+	StatusChecksRequired           []string
868
+	CreatedAt                      pgtype.Timestamptz
869
+	UpdatedAt                      pgtype.Timestamptz
870
+	CreatedByUserID                pgtype.Int8
871
+	RequiredReviewCount            int32
872
+	DismissStaleReviewsOnPush      bool
873
+	RequireCodeOwnerReview         bool
874
+	DismissStaleStatusChecksOnPush bool
875
+}
876
+
877
+type CheckRun struct {
878
+	ID          int64
879
+	SuiteID     int64
880
+	RepoID      int64
881
+	HeadSha     string
882
+	Name        string
883
+	Status      CheckStatus
884
+	Conclusion  NullCheckConclusion
885
+	StartedAt   pgtype.Timestamptz
886
+	CompletedAt pgtype.Timestamptz
887
+	DetailsUrl  string
888
+	Output      []byte
889
+	ExternalID  pgtype.Text
890
+	CreatedAt   pgtype.Timestamptz
891
+	UpdatedAt   pgtype.Timestamptz
892
+}
893
+
894
+type CheckSuite struct {
895
+	ID         int64
896
+	RepoID     int64
897
+	HeadSha    string
898
+	AppSlug    string
899
+	Status     CheckStatus
900
+	Conclusion NullCheckConclusion
901
+	CreatedAt  pgtype.Timestamptz
902
+	UpdatedAt  pgtype.Timestamptz
903
+}
904
+
905
+type CodeSearchContent struct {
906
+	RepoID      int64
907
+	RefName     string
908
+	Path        string
909
+	ContentTsv  interface{}
910
+	ContentTrgm string
911
+}
912
+
913
+type CodeSearchPath struct {
914
+	RepoID  int64
915
+	RefName string
916
+	Path    string
917
+	Tsv     interface{}
918
+}
919
+
920
+type DomainEvent struct {
921
+	ID          int64
922
+	ActorUserID pgtype.Int8
923
+	Kind        string
924
+	RepoID      pgtype.Int8
925
+	SourceKind  string
926
+	SourceID    int64
927
+	Public      bool
928
+	Payload     []byte
929
+	CreatedAt   pgtype.Timestamptz
930
+}
931
+
932
+type DomainEventsProcessed struct {
933
+	Consumer    string
934
+	LastEventID int64
935
+	UpdatedAt   pgtype.Timestamptz
936
+}
937
+
938
+type EmailVerification struct {
939
+	ID          int64
940
+	UserEmailID int64
941
+	TokenHash   []byte
942
+	ExpiresAt   pgtype.Timestamptz
943
+	UsedAt      pgtype.Timestamptz
944
+	CreatedAt   pgtype.Timestamptz
945
+}
946
+
947
+type Issue struct {
948
+	ID                int64
949
+	RepoID            int64
950
+	Number            int64
951
+	Kind              IssueKind
952
+	Title             string
953
+	Body              string
954
+	BodyHtmlCached    pgtype.Text
955
+	MdPipelineVersion int32
956
+	AuthorUserID      pgtype.Int8
957
+	State             IssueState
958
+	StateReason       NullIssueStateReason
959
+	Locked            bool
960
+	LockReason        pgtype.Text
961
+	MilestoneID       pgtype.Int8
962
+	CreatedAt         pgtype.Timestamptz
963
+	UpdatedAt         pgtype.Timestamptz
964
+	EditedAt          pgtype.Timestamptz
965
+	ClosedAt          pgtype.Timestamptz
966
+	ClosedByUserID    pgtype.Int8
967
+}
968
+
969
+type IssueAssignee struct {
970
+	IssueID          int64
971
+	UserID           int64
972
+	AssignedAt       pgtype.Timestamptz
973
+	AssignedByUserID pgtype.Int8
974
+}
975
+
976
+type IssueComment struct {
977
+	ID                int64
978
+	IssueID           int64
979
+	AuthorUserID      pgtype.Int8
980
+	Body              string
981
+	BodyHtmlCached    pgtype.Text
982
+	MdPipelineVersion int32
983
+	CreatedAt         pgtype.Timestamptz
984
+	UpdatedAt         pgtype.Timestamptz
985
+	EditedAt          pgtype.Timestamptz
986
+}
987
+
988
+type IssueEvent struct {
989
+	ID          int64
990
+	IssueID     int64
991
+	ActorUserID pgtype.Int8
992
+	Kind        string
993
+	Meta        []byte
994
+	RefTargetID pgtype.Int8
995
+	CreatedAt   pgtype.Timestamptz
996
+}
997
+
998
+type IssueLabel struct {
999
+	IssueID         int64
1000
+	LabelID         int64
1001
+	AppliedAt       pgtype.Timestamptz
1002
+	AppliedByUserID pgtype.Int8
1003
+}
1004
+
1005
+type IssueReference struct {
1006
+	ID             int64
1007
+	SourceIssueID  pgtype.Int8
1008
+	TargetIssueID  int64
1009
+	SourceKind     IssueRefSource
1010
+	SourceObjectID pgtype.Int8
1011
+	CreatedAt      pgtype.Timestamptz
1012
+}
1013
+
1014
+type IssuesSearch struct {
1015
+	IssueID      int64
1016
+	RepoID       int64
1017
+	Kind         IssueKind
1018
+	State        IssueState
1019
+	AuthorUserID pgtype.Int8
1020
+	Tsv          interface{}
1021
+}
1022
+
1023
+type Job struct {
1024
+	ID          int64
1025
+	Kind        string
1026
+	Payload     []byte
1027
+	RunAt       pgtype.Timestamptz
1028
+	Attempts    int32
1029
+	MaxAttempts int32
1030
+	LastError   pgtype.Text
1031
+	LockedBy    pgtype.Text
1032
+	LockedAt    pgtype.Timestamptz
1033
+	CompletedAt pgtype.Timestamptz
1034
+	FailedAt    pgtype.Timestamptz
1035
+	CreatedAt   pgtype.Timestamptz
1036
+}
1037
+
1038
+type Label struct {
1039
+	ID          int64
1040
+	RepoID      int64
1041
+	Name        string
1042
+	Color       string
1043
+	Description string
1044
+	CreatedAt   pgtype.Timestamptz
1045
+}
1046
+
1047
+type Meta struct {
1048
+	Key       string
1049
+	Value     []byte
1050
+	UpdatedAt pgtype.Timestamptz
1051
+}
1052
+
1053
+type Milestone struct {
1054
+	ID          int64
1055
+	RepoID      int64
1056
+	Title       string
1057
+	Description string
1058
+	State       MilestoneState
1059
+	DueOn       pgtype.Timestamptz
1060
+	CreatedAt   pgtype.Timestamptz
1061
+	ClosedAt    pgtype.Timestamptz
1062
+}
1063
+
1064
+type Notification struct {
1065
+	ID              int64
1066
+	RecipientUserID int64
1067
+	Kind            string
1068
+	Reason          string
1069
+	RepoID          pgtype.Int8
1070
+	ThreadKind      NullNotificationThreadKind
1071
+	ThreadID        pgtype.Int8
1072
+	SourceEventID   pgtype.Int8
1073
+	Unread          bool
1074
+	LastEventAt     pgtype.Timestamptz
1075
+	LastActorUserID pgtype.Int8
1076
+	Summary         []byte
1077
+	CreatedAt       pgtype.Timestamptz
1078
+	UpdatedAt       pgtype.Timestamptz
1079
+}
1080
+
1081
+type NotificationEmailLog struct {
1082
+	ID              int64
1083
+	RecipientUserID int64
1084
+	NotificationID  pgtype.Int8
1085
+	ThreadKind      NullNotificationThreadKind
1086
+	ThreadID        pgtype.Int8
1087
+	SentAt          pgtype.Timestamptz
1088
+	MessageID       pgtype.Text
1089
+}
1090
+
1091
+type NotificationThread struct {
1092
+	RecipientUserID int64
1093
+	ThreadKind      NotificationThreadKind
1094
+	ThreadID        int64
1095
+	Subscribed      bool
1096
+	Reason          string
1097
+	UpdatedAt       pgtype.Timestamptz
1098
+}
1099
+
1100
+type PasswordReset struct {
1101
+	ID        int64
1102
+	UserID    int64
1103
+	TokenHash []byte
1104
+	ExpiresAt pgtype.Timestamptz
1105
+	UsedAt    pgtype.Timestamptz
1106
+	CreatedAt pgtype.Timestamptz
1107
+}
1108
+
1109
+type PrReview struct {
1110
+	ID                int64
1111
+	PrIssueID         int64
1112
+	AuthorUserID      pgtype.Int8
1113
+	State             PrReviewState
1114
+	Body              string
1115
+	BodyHtmlCached    pgtype.Text
1116
+	SubmittedAt       pgtype.Timestamptz
1117
+	DismissedAt       pgtype.Timestamptz
1118
+	DismissedByUserID pgtype.Int8
1119
+	DismissalReason   string
1120
+}
1121
+
1122
+type PrReviewComment struct {
1123
+	ID                int64
1124
+	PrIssueID         int64
1125
+	ReviewID          pgtype.Int8
1126
+	AuthorUserID      pgtype.Int8
1127
+	FilePath          string
1128
+	Side              PrReviewSide
1129
+	OriginalCommitSha string
1130
+	OriginalLine      int32
1131
+	OriginalPosition  int32
1132
+	CurrentPosition   pgtype.Int4
1133
+	Body              string
1134
+	BodyHtmlCached    pgtype.Text
1135
+	InReplyToID       pgtype.Int8
1136
+	Pending           bool
1137
+	ResolvedAt        pgtype.Timestamptz
1138
+	ResolvedByUserID  pgtype.Int8
1139
+	CreatedAt         pgtype.Timestamptz
1140
+	UpdatedAt         pgtype.Timestamptz
1141
+	EditedAt          pgtype.Timestamptz
1142
+}
1143
+
1144
+type PrReviewRequest struct {
1145
+	ID                  int64
1146
+	PrIssueID           int64
1147
+	RequestedUserID     pgtype.Int8
1148
+	RequestedTeamID     pgtype.Int8
1149
+	RequestedByUserID   pgtype.Int8
1150
+	RequestedAt         pgtype.Timestamptz
1151
+	DismissedAt         pgtype.Timestamptz
1152
+	SatisfiedByReviewID pgtype.Int8
1153
+}
1154
+
1155
+type PullRequest struct {
1156
+	IssueID            int64
1157
+	BaseRef            string
1158
+	HeadRef            string
1159
+	HeadRepoID         int64
1160
+	BaseOid            string
1161
+	HeadOid            string
1162
+	Draft              bool
1163
+	Mergeable          pgtype.Bool
1164
+	MergeableState     PrMergeableState
1165
+	MergeCommitSha     pgtype.Text
1166
+	MergedAt           pgtype.Timestamptz
1167
+	MergedByUserID     pgtype.Int8
1168
+	MergeMethod        NullPrMergeMethod
1169
+	BaseOidAtMerge     pgtype.Text
1170
+	HeadOidAtMerge     pgtype.Text
1171
+	LastSynchronizedAt pgtype.Timestamptz
1172
+}
1173
+
1174
+type PullRequestCommit struct {
1175
+	PrID           int64
1176
+	Sha            string
1177
+	Position       int32
1178
+	AuthorName     string
1179
+	AuthorEmail    string
1180
+	CommitterName  string
1181
+	CommitterEmail string
1182
+	Subject        string
1183
+	Body           string
1184
+	AuthoredAt     pgtype.Timestamptz
1185
+	CommittedAt    pgtype.Timestamptz
1186
+}
1187
+
1188
+type PullRequestFile struct {
1189
+	PrID      int64
1190
+	Path      string
1191
+	Status    PrFileStatus
1192
+	OldPath   pgtype.Text
1193
+	Additions int32
1194
+	Deletions int32
1195
+	Changes   int32
1196
+}
1197
+
1198
+type PushEvent struct {
1199
+	ID           int64
1200
+	RepoID       int64
1201
+	PusherUserID pgtype.Int8
1202
+	BeforeSha    string
1203
+	AfterSha     string
1204
+	Ref          string
1205
+	Protocol     string
1206
+	RequestID    string
1207
+	ProcessedAt  pgtype.Timestamptz
1208
+	CreatedAt    pgtype.Timestamptz
1209
+}
1210
+
1211
+type Repo struct {
1212
+	ID                 int64
1213
+	OwnerUserID        pgtype.Int8
1214
+	OwnerOrgID         pgtype.Int8
1215
+	Name               string
1216
+	Description        string
1217
+	Visibility         RepoVisibility
1218
+	DefaultBranch      string
1219
+	IsArchived         bool
1220
+	ArchivedAt         pgtype.Timestamptz
1221
+	DeletedAt          pgtype.Timestamptz
1222
+	DiskUsedBytes      int64
1223
+	ForkOfRepoID       pgtype.Int8
1224
+	LicenseKey         pgtype.Text
1225
+	PrimaryLanguage    pgtype.Text
1226
+	HasIssues          bool
1227
+	HasPulls           bool
1228
+	CreatedAt          pgtype.Timestamptz
1229
+	UpdatedAt          pgtype.Timestamptz
1230
+	DefaultBranchOid   pgtype.Text
1231
+	AllowSquashMerge   bool
1232
+	AllowRebaseMerge   bool
1233
+	AllowMergeCommit   bool
1234
+	DefaultMergeMethod PrMergeMethod
1235
+	StarCount          int64
1236
+	WatcherCount       int64
1237
+	ForkCount          int64
1238
+	InitStatus         RepoInitStatus
1239
+	LastIndexedOid     pgtype.Text
1240
+}
1241
+
1242
+type RepoCollaborator struct {
1243
+	RepoID        int64
1244
+	UserID        int64
1245
+	Role          CollabRole
1246
+	AddedAt       pgtype.Timestamptz
1247
+	AddedByUserID pgtype.Int8
1248
+}
1249
+
1250
+type RepoIssueCounter struct {
1251
+	RepoID     int64
1252
+	NextNumber int64
1253
+}
1254
+
1255
+type RepoRedirect struct {
1256
+	OldOwnerUserID pgtype.Int8
1257
+	OldOwnerOrgID  pgtype.Int8
1258
+	OldName        string
1259
+	RepoID         int64
1260
+	RedirectedAt   pgtype.Timestamptz
1261
+}
1262
+
1263
+type RepoTransferRequest struct {
1264
+	ID              int64
1265
+	RepoID          int64
1266
+	FromUserID      int64
1267
+	ToPrincipalKind TransferPrincipalKind
1268
+	ToPrincipalID   int64
1269
+	CreatedBy       int64
1270
+	CreatedAt       pgtype.Timestamptz
1271
+	ExpiresAt       pgtype.Timestamptz
1272
+	Status          TransferStatus
1273
+	AcceptedAt      pgtype.Timestamptz
1274
+	DeclinedAt      pgtype.Timestamptz
1275
+	CanceledAt      pgtype.Timestamptz
1276
+}
1277
+
1278
+type ReposSearch struct {
1279
+	RepoID int64
1280
+	Tsv    interface{}
1281
+}
1282
+
1283
+type Star struct {
1284
+	UserID    int64
1285
+	RepoID    int64
1286
+	StarredAt pgtype.Timestamptz
1287
+}
1288
+
1289
+type User struct {
1290
+	ID                int64
1291
+	Username          string
1292
+	DisplayName       string
1293
+	PrimaryEmailID    pgtype.Int8
1294
+	PasswordHash      string
1295
+	PasswordAlgo      string
1296
+	PasswordUpdatedAt pgtype.Timestamptz
1297
+	EmailVerified     bool
1298
+	LastLoginAt       pgtype.Timestamptz
1299
+	SuspendedAt       pgtype.Timestamptz
1300
+	SuspendedReason   pgtype.Text
1301
+	DeletedAt         pgtype.Timestamptz
1302
+	CreatedAt         pgtype.Timestamptz
1303
+	UpdatedAt         pgtype.Timestamptz
1304
+	Bio               string
1305
+	Location          string
1306
+	Website           string
1307
+	Company           string
1308
+	Pronouns          string
1309
+	AvatarObjectKey   pgtype.Text
1310
+	Theme             string
1311
+	SessionEpoch      int32
1312
+}
1313
+
1314
+type UserEmail struct {
1315
+	ID                    int64
1316
+	UserID                int64
1317
+	Email                 string
1318
+	IsPrimary             bool
1319
+	Verified              bool
1320
+	VerificationTokenHash []byte
1321
+	VerificationSentAt    pgtype.Timestamptz
1322
+	VerifiedAt            pgtype.Timestamptz
1323
+	CreatedAt             pgtype.Timestamptz
1324
+}
1325
+
1326
+type UserNotificationPref struct {
1327
+	UserID    int64
1328
+	Key       string
1329
+	Value     []byte
1330
+	UpdatedAt pgtype.Timestamptz
1331
+}
1332
+
1333
+type UserRecoveryCode struct {
1334
+	ID          int64
1335
+	UserID      int64
1336
+	CodeHash    []byte
1337
+	UsedAt      pgtype.Timestamptz
1338
+	GeneratedAt pgtype.Timestamptz
1339
+	CreatedAt   pgtype.Timestamptz
1340
+}
1341
+
1342
+type UserSshKey struct {
1343
+	ID                int64
1344
+	UserID            int64
1345
+	Title             string
1346
+	FingerprintSha256 string
1347
+	KeyType           string
1348
+	KeyBits           int32
1349
+	PublicKey         string
1350
+	LastUsedAt        pgtype.Timestamptz
1351
+	LastUsedIp        *netip.Addr
1352
+	CreatedAt         pgtype.Timestamptz
1353
+}
1354
+
1355
+type UserToken struct {
1356
+	ID          int64
1357
+	UserID      int64
1358
+	Name        string
1359
+	TokenHash   []byte
1360
+	TokenPrefix string
1361
+	Scopes      []string
1362
+	ExpiresAt   pgtype.Timestamptz
1363
+	LastUsedAt  pgtype.Timestamptz
1364
+	LastUsedIp  *netip.Addr
1365
+	RevokedAt   pgtype.Timestamptz
1366
+	CreatedAt   pgtype.Timestamptz
1367
+}
1368
+
1369
+type UserTotp struct {
1370
+	ID              int64
1371
+	UserID          int64
1372
+	SecretEncrypted []byte
1373
+	SecretNonce     []byte
1374
+	ConfirmedAt     pgtype.Timestamptz
1375
+	LastUsedCounter int64
1376
+	CreatedAt       pgtype.Timestamptz
1377
+	UpdatedAt       pgtype.Timestamptz
1378
+}
1379
+
1380
+type UsernameRedirect struct {
1381
+	OldUsername string
1382
+	UserID      int64
1383
+	ChangedAt   pgtype.Timestamptz
1384
+}
1385
+
1386
+type UsersSearch struct {
1387
+	UserID int64
1388
+	Tsv    interface{}
1389
+}
1390
+
1391
+type Watch struct {
1392
+	UserID    int64
1393
+	RepoID    int64
1394
+	Level     WatchLevel
1395
+	UpdatedAt pgtype.Timestamptz
1396
+}
1397
+
1398
+type WebhookEventsPending struct {
1399
+	ID        int64
1400
+	RepoID    int64
1401
+	EventKind string
1402
+	Payload   []byte
1403
+	CreatedAt pgtype.Timestamptz
1404
+}
internal/notif/sqlc/notifications.sql.goadded
@@ -0,0 +1,332 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+// source: notifications.sql
5
+
6
+package notifdb
7
+
8
+import (
9
+	"context"
10
+
11
+	"github.com/jackc/pgx/v5/pgtype"
12
+)
13
+
14
+const countNotificationsForRecipient = `-- name: CountNotificationsForRecipient :one
15
+SELECT count(*) FROM notifications
16
+WHERE recipient_user_id = $1
17
+  AND ($2::boolean = false OR unread = true)
18
+`
19
+
20
+type CountNotificationsForRecipientParams struct {
21
+	RecipientUserID int64
22
+	Column2         bool
23
+}
24
+
25
+func (q *Queries) CountNotificationsForRecipient(ctx context.Context, db DBTX, arg CountNotificationsForRecipientParams) (int64, error) {
26
+	row := db.QueryRow(ctx, countNotificationsForRecipient, arg.RecipientUserID, arg.Column2)
27
+	var count int64
28
+	err := row.Scan(&count)
29
+	return count, err
30
+}
31
+
32
+const countUnreadForRecipient = `-- name: CountUnreadForRecipient :one
33
+SELECT count(*) FROM notifications
34
+WHERE recipient_user_id = $1 AND unread = true
35
+`
36
+
37
+func (q *Queries) CountUnreadForRecipient(ctx context.Context, db DBTX, recipientUserID int64) (int64, error) {
38
+	row := db.QueryRow(ctx, countUnreadForRecipient, recipientUserID)
39
+	var count int64
40
+	err := row.Scan(&count)
41
+	return count, err
42
+}
43
+
44
+const getNotification = `-- name: GetNotification :one
45
+SELECT id, recipient_user_id, kind, reason, repo_id,
46
+       thread_kind, thread_id, source_event_id, unread,
47
+       last_event_at, last_actor_user_id, summary, created_at, updated_at
48
+FROM notifications WHERE id = $1
49
+`
50
+
51
+func (q *Queries) GetNotification(ctx context.Context, db DBTX, id int64) (Notification, error) {
52
+	row := db.QueryRow(ctx, getNotification, id)
53
+	var i Notification
54
+	err := row.Scan(
55
+		&i.ID,
56
+		&i.RecipientUserID,
57
+		&i.Kind,
58
+		&i.Reason,
59
+		&i.RepoID,
60
+		&i.ThreadKind,
61
+		&i.ThreadID,
62
+		&i.SourceEventID,
63
+		&i.Unread,
64
+		&i.LastEventAt,
65
+		&i.LastActorUserID,
66
+		&i.Summary,
67
+		&i.CreatedAt,
68
+		&i.UpdatedAt,
69
+	)
70
+	return i, err
71
+}
72
+
73
+const insertThreadlessNotification = `-- name: InsertThreadlessNotification :one
74
+INSERT INTO notifications (
75
+    recipient_user_id, kind, reason, repo_id,
76
+    source_event_id, last_actor_user_id
77
+) VALUES ($1, $2, $3, $4, $5, $6)
78
+RETURNING id, recipient_user_id, kind, reason, repo_id,
79
+          thread_kind, thread_id, source_event_id, unread,
80
+          last_event_at, last_actor_user_id, summary, created_at, updated_at
81
+`
82
+
83
+type InsertThreadlessNotificationParams struct {
84
+	RecipientUserID int64
85
+	Kind            string
86
+	Reason          string
87
+	RepoID          pgtype.Int8
88
+	SourceEventID   pgtype.Int8
89
+	LastActorUserID pgtype.Int8
90
+}
91
+
92
+// For events with no thread (e.g. repo-admin lifecycle: archived).
93
+// These don't coalesce; each fires its own row. Used sparingly.
94
+func (q *Queries) InsertThreadlessNotification(ctx context.Context, db DBTX, arg InsertThreadlessNotificationParams) (Notification, error) {
95
+	row := db.QueryRow(ctx, insertThreadlessNotification,
96
+		arg.RecipientUserID,
97
+		arg.Kind,
98
+		arg.Reason,
99
+		arg.RepoID,
100
+		arg.SourceEventID,
101
+		arg.LastActorUserID,
102
+	)
103
+	var i Notification
104
+	err := row.Scan(
105
+		&i.ID,
106
+		&i.RecipientUserID,
107
+		&i.Kind,
108
+		&i.Reason,
109
+		&i.RepoID,
110
+		&i.ThreadKind,
111
+		&i.ThreadID,
112
+		&i.SourceEventID,
113
+		&i.Unread,
114
+		&i.LastEventAt,
115
+		&i.LastActorUserID,
116
+		&i.Summary,
117
+		&i.CreatedAt,
118
+		&i.UpdatedAt,
119
+	)
120
+	return i, err
121
+}
122
+
123
+const listNotificationsForRecipient = `-- name: ListNotificationsForRecipient :many
124
+SELECT n.id, n.recipient_user_id, n.kind, n.reason, n.repo_id,
125
+       n.thread_kind, n.thread_id, n.source_event_id, n.unread,
126
+       n.last_event_at, n.last_actor_user_id, n.summary,
127
+       n.created_at, n.updated_at,
128
+       coalesce(u.username, '') AS actor_username,
129
+       coalesce(r.name, '') AS repo_name,
130
+       coalesce(ru.username, '') AS repo_owner_username,
131
+       coalesce(i.number, 0) AS thread_number,
132
+       coalesce(i.title, '') AS thread_title
133
+FROM notifications n
134
+LEFT JOIN users u  ON u.id = n.last_actor_user_id
135
+LEFT JOIN repos r  ON r.id = n.repo_id
136
+LEFT JOIN users ru ON ru.id = r.owner_user_id
137
+LEFT JOIN issues i ON i.id = n.thread_id
138
+WHERE n.recipient_user_id = $1
139
+  AND ($2::boolean = false OR n.unread = true)
140
+ORDER BY n.last_event_at DESC
141
+LIMIT $3 OFFSET $4
142
+`
143
+
144
+type ListNotificationsForRecipientParams struct {
145
+	RecipientUserID int64
146
+	Column2         bool
147
+	Limit           int32
148
+	Offset          int32
149
+}
150
+
151
+type ListNotificationsForRecipientRow struct {
152
+	ID                int64
153
+	RecipientUserID   int64
154
+	Kind              string
155
+	Reason            string
156
+	RepoID            pgtype.Int8
157
+	ThreadKind        NullNotificationThreadKind
158
+	ThreadID          pgtype.Int8
159
+	SourceEventID     pgtype.Int8
160
+	Unread            bool
161
+	LastEventAt       pgtype.Timestamptz
162
+	LastActorUserID   pgtype.Int8
163
+	Summary           []byte
164
+	CreatedAt         pgtype.Timestamptz
165
+	UpdatedAt         pgtype.Timestamptz
166
+	ActorUsername     string
167
+	RepoName          string
168
+	RepoOwnerUsername string
169
+	ThreadNumber      int64
170
+	ThreadTitle       string
171
+}
172
+
173
+// Inbox view, recency-sorted. `onlyUnread` toggles the inbox
174
+// filter ("Unread" tab vs "All").
175
+func (q *Queries) ListNotificationsForRecipient(ctx context.Context, db DBTX, arg ListNotificationsForRecipientParams) ([]ListNotificationsForRecipientRow, error) {
176
+	rows, err := db.Query(ctx, listNotificationsForRecipient,
177
+		arg.RecipientUserID,
178
+		arg.Column2,
179
+		arg.Limit,
180
+		arg.Offset,
181
+	)
182
+	if err != nil {
183
+		return nil, err
184
+	}
185
+	defer rows.Close()
186
+	items := []ListNotificationsForRecipientRow{}
187
+	for rows.Next() {
188
+		var i ListNotificationsForRecipientRow
189
+		if err := rows.Scan(
190
+			&i.ID,
191
+			&i.RecipientUserID,
192
+			&i.Kind,
193
+			&i.Reason,
194
+			&i.RepoID,
195
+			&i.ThreadKind,
196
+			&i.ThreadID,
197
+			&i.SourceEventID,
198
+			&i.Unread,
199
+			&i.LastEventAt,
200
+			&i.LastActorUserID,
201
+			&i.Summary,
202
+			&i.CreatedAt,
203
+			&i.UpdatedAt,
204
+			&i.ActorUsername,
205
+			&i.RepoName,
206
+			&i.RepoOwnerUsername,
207
+			&i.ThreadNumber,
208
+			&i.ThreadTitle,
209
+		); err != nil {
210
+			return nil, err
211
+		}
212
+		items = append(items, i)
213
+	}
214
+	if err := rows.Err(); err != nil {
215
+		return nil, err
216
+	}
217
+	return items, nil
218
+}
219
+
220
+const markAllReadForRecipient = `-- name: MarkAllReadForRecipient :exec
221
+UPDATE notifications SET unread = false, updated_at = now()
222
+WHERE recipient_user_id = $1 AND unread = true
223
+`
224
+
225
+// Bounded sweep: a single call doesn't try to update millions of
226
+// rows. Caller paginates via repeated calls when count > batch.
227
+func (q *Queries) MarkAllReadForRecipient(ctx context.Context, db DBTX, recipientUserID int64) error {
228
+	_, err := db.Exec(ctx, markAllReadForRecipient, recipientUserID)
229
+	return err
230
+}
231
+
232
+const setNotificationRead = `-- name: SetNotificationRead :exec
233
+UPDATE notifications SET unread = false, updated_at = now()
234
+WHERE id = $1 AND recipient_user_id = $2
235
+`
236
+
237
+type SetNotificationReadParams struct {
238
+	ID              int64
239
+	RecipientUserID int64
240
+}
241
+
242
+func (q *Queries) SetNotificationRead(ctx context.Context, db DBTX, arg SetNotificationReadParams) error {
243
+	_, err := db.Exec(ctx, setNotificationRead, arg.ID, arg.RecipientUserID)
244
+	return err
245
+}
246
+
247
+const setNotificationUnread = `-- name: SetNotificationUnread :exec
248
+UPDATE notifications SET unread = true, updated_at = now()
249
+WHERE id = $1 AND recipient_user_id = $2
250
+`
251
+
252
+type SetNotificationUnreadParams struct {
253
+	ID              int64
254
+	RecipientUserID int64
255
+}
256
+
257
+func (q *Queries) SetNotificationUnread(ctx context.Context, db DBTX, arg SetNotificationUnreadParams) error {
258
+	_, err := db.Exec(ctx, setNotificationUnread, arg.ID, arg.RecipientUserID)
259
+	return err
260
+}
261
+
262
+const upsertNotificationByThread = `-- name: UpsertNotificationByThread :one
263
+
264
+INSERT INTO notifications (
265
+    recipient_user_id, kind, reason, repo_id,
266
+    thread_kind, thread_id, source_event_id,
267
+    last_event_at, last_actor_user_id
268
+) VALUES (
269
+    $1, $2, $3, $4, $5, $6, $7, now(), $8
270
+)
271
+ON CONFLICT (recipient_user_id, thread_kind, thread_id) WHERE thread_id IS NOT NULL
272
+DO UPDATE SET
273
+    kind               = EXCLUDED.kind,
274
+    reason             = EXCLUDED.reason,
275
+    source_event_id    = EXCLUDED.source_event_id,
276
+    last_event_at      = now(),
277
+    last_actor_user_id = EXCLUDED.last_actor_user_id,
278
+    unread             = true,
279
+    updated_at         = now()
280
+RETURNING id, recipient_user_id, kind, reason, repo_id,
281
+          thread_kind, thread_id, source_event_id, unread,
282
+          last_event_at, last_actor_user_id, summary, created_at, updated_at
283
+`
284
+
285
+type UpsertNotificationByThreadParams struct {
286
+	RecipientUserID int64
287
+	Kind            string
288
+	Reason          string
289
+	RepoID          pgtype.Int8
290
+	ThreadKind      NullNotificationThreadKind
291
+	ThreadID        pgtype.Int8
292
+	SourceEventID   pgtype.Int8
293
+	LastActorUserID pgtype.Int8
294
+}
295
+
296
+// ─── notifications ─────────────────────────────────────────────────
297
+// Coalesce-or-insert: if a row exists for (recipient, thread), bump
298
+// last_event_at + last_actor + reason and re-flip unread=true so the
299
+// inbox surfaces it again. Otherwise insert a fresh row.
300
+//
301
+// Returns the resulting row (whether it was created or updated)
302
+// so the caller can chain an email-enqueue without a re-read.
303
+func (q *Queries) UpsertNotificationByThread(ctx context.Context, db DBTX, arg UpsertNotificationByThreadParams) (Notification, error) {
304
+	row := db.QueryRow(ctx, upsertNotificationByThread,
305
+		arg.RecipientUserID,
306
+		arg.Kind,
307
+		arg.Reason,
308
+		arg.RepoID,
309
+		arg.ThreadKind,
310
+		arg.ThreadID,
311
+		arg.SourceEventID,
312
+		arg.LastActorUserID,
313
+	)
314
+	var i Notification
315
+	err := row.Scan(
316
+		&i.ID,
317
+		&i.RecipientUserID,
318
+		&i.Kind,
319
+		&i.Reason,
320
+		&i.RepoID,
321
+		&i.ThreadKind,
322
+		&i.ThreadID,
323
+		&i.SourceEventID,
324
+		&i.Unread,
325
+		&i.LastEventAt,
326
+		&i.LastActorUserID,
327
+		&i.Summary,
328
+		&i.CreatedAt,
329
+		&i.UpdatedAt,
330
+	)
331
+	return i, err
332
+}
internal/notif/sqlc/querier.goadded
@@ -0,0 +1,74 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+
5
+package notifdb
6
+
7
+import (
8
+	"context"
9
+)
10
+
11
+type Querier interface {
12
+	// Per-recipient absolute rate cap: how many total emails to this
13
+	// recipient in the last $2 minutes?
14
+	CountEmailsForRecipientSince(ctx context.Context, db DBTX, arg CountEmailsForRecipientSinceParams) (int64, error)
15
+	// Storm dampener probe: how many emails for this thread did we
16
+	// send to this recipient in the last $4 minutes? Caller compares
17
+	// to the cap.
18
+	CountEmailsForRecipientThreadSince(ctx context.Context, db DBTX, arg CountEmailsForRecipientThreadSinceParams) (int64, error)
19
+	CountNotificationsForRecipient(ctx context.Context, db DBTX, arg CountNotificationsForRecipientParams) (int64, error)
20
+	CountUnreadForRecipient(ctx context.Context, db DBTX, recipientUserID int64) (int64, error)
21
+	DeleteNotificationThread(ctx context.Context, db DBTX, arg DeleteNotificationThreadParams) error
22
+	// ─── domain_events_processed ──────────────────────────────────────
23
+	GetEventCursor(ctx context.Context, db DBTX, consumer string) (DomainEventsProcessed, error)
24
+	GetNotification(ctx context.Context, db DBTX, id int64) (Notification, error)
25
+	// ─── notification_threads ──────────────────────────────────────────
26
+	GetNotificationThread(ctx context.Context, db DBTX, arg GetNotificationThreadParams) (NotificationThread, error)
27
+	// ─── notification_email_log ────────────────────────────────────────
28
+	// Records an email send. Caller decides what to bind for thread_id
29
+	// (NULL for thread-less notifications). MessageID is the SMTP /
30
+	// transactional-provider message id when available; empty when
31
+	// the sender doesn't surface one.
32
+	InsertEmailLog(ctx context.Context, db DBTX, arg InsertEmailLogParams) error
33
+	// Auto-subscription path: only insert if the user has no explicit
34
+	// preference yet. Preserves user choices (e.g. an explicit
35
+	// `subscribed=false` from clicking "Unsubscribe").
36
+	InsertNotificationThreadIfAbsent(ctx context.Context, db DBTX, arg InsertNotificationThreadIfAbsentParams) error
37
+	// For events with no thread (e.g. repo-admin lifecycle: archived).
38
+	// These don't coalesce; each fires its own row. Used sparingly.
39
+	InsertThreadlessNotification(ctx context.Context, db DBTX, arg InsertThreadlessNotificationParams) (Notification, error)
40
+	// Inbox view, recency-sorted. `onlyUnread` toggles the inbox
41
+	// filter ("Unread" tab vs "All").
42
+	ListNotificationsForRecipient(ctx context.Context, db DBTX, arg ListNotificationsForRecipientParams) ([]ListNotificationsForRecipientRow, error)
43
+	// Fan-out helper: returns recipients who explicitly subscribed to a
44
+	// thread. The fan-out worker unions this with the per-repo `watches`
45
+	// result + author/assignee/reviewer rules.
46
+	ListSubscribersForThread(ctx context.Context, db DBTX, arg ListSubscribersForThreadParams) ([]ListSubscribersForThreadRow, error)
47
+	// The fan-out worker's read cursor. Bounded so a single tick
48
+	// doesn't try to drain a million-row backlog.
49
+	ListUnprocessedDomainEvents(ctx context.Context, db DBTX, arg ListUnprocessedDomainEventsParams) ([]DomainEvent, error)
50
+	// Bounded sweep: a single call doesn't try to update millions of
51
+	// rows. Caller paginates via repeated calls when count > batch.
52
+	MarkAllReadForRecipient(ctx context.Context, db DBTX, recipientUserID int64) error
53
+	// Always-write upsert so the worker doesn't have to special-case
54
+	// the missing-row branch (the migration seeds 'notify_fanout' at
55
+	// 0; future consumers like 'webhook_deliver' do the same on first
56
+	// run via this same call).
57
+	SetEventCursor(ctx context.Context, db DBTX, arg SetEventCursorParams) error
58
+	SetNotificationRead(ctx context.Context, db DBTX, arg SetNotificationReadParams) error
59
+	SetNotificationUnread(ctx context.Context, db DBTX, arg SetNotificationUnreadParams) error
60
+	// ─── notifications ─────────────────────────────────────────────────
61
+	// Coalesce-or-insert: if a row exists for (recipient, thread), bump
62
+	// last_event_at + last_actor + reason and re-flip unread=true so the
63
+	// inbox surfaces it again. Otherwise insert a fresh row.
64
+	//
65
+	// Returns the resulting row (whether it was created or updated)
66
+	// so the caller can chain an email-enqueue without a re-read.
67
+	UpsertNotificationByThread(ctx context.Context, db DBTX, arg UpsertNotificationByThreadParams) (Notification, error)
68
+	// Always-write upsert. Used by Subscribe / Unsubscribe / Ignore
69
+	// handlers and by the auto-subscription rules in the fan-out
70
+	// worker.
71
+	UpsertNotificationThread(ctx context.Context, db DBTX, arg UpsertNotificationThreadParams) error
72
+}
73
+
74
+var _ Querier = (*Queries)(nil)
internal/notif/sqlc/threads.sql.goadded
@@ -0,0 +1,152 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+// source: threads.sql
5
+
6
+package notifdb
7
+
8
+import (
9
+	"context"
10
+)
11
+
12
+const deleteNotificationThread = `-- name: DeleteNotificationThread :exec
13
+DELETE FROM notification_threads
14
+WHERE recipient_user_id = $1 AND thread_kind = $2 AND thread_id = $3
15
+`
16
+
17
+type DeleteNotificationThreadParams struct {
18
+	RecipientUserID int64
19
+	ThreadKind      NotificationThreadKind
20
+	ThreadID        int64
21
+}
22
+
23
+func (q *Queries) DeleteNotificationThread(ctx context.Context, db DBTX, arg DeleteNotificationThreadParams) error {
24
+	_, err := db.Exec(ctx, deleteNotificationThread, arg.RecipientUserID, arg.ThreadKind, arg.ThreadID)
25
+	return err
26
+}
27
+
28
+const getNotificationThread = `-- name: GetNotificationThread :one
29
+
30
+SELECT recipient_user_id, thread_kind, thread_id, subscribed, reason, updated_at
31
+FROM notification_threads
32
+WHERE recipient_user_id = $1 AND thread_kind = $2 AND thread_id = $3
33
+`
34
+
35
+type GetNotificationThreadParams struct {
36
+	RecipientUserID int64
37
+	ThreadKind      NotificationThreadKind
38
+	ThreadID        int64
39
+}
40
+
41
+// ─── notification_threads ──────────────────────────────────────────
42
+func (q *Queries) GetNotificationThread(ctx context.Context, db DBTX, arg GetNotificationThreadParams) (NotificationThread, error) {
43
+	row := db.QueryRow(ctx, getNotificationThread, arg.RecipientUserID, arg.ThreadKind, arg.ThreadID)
44
+	var i NotificationThread
45
+	err := row.Scan(
46
+		&i.RecipientUserID,
47
+		&i.ThreadKind,
48
+		&i.ThreadID,
49
+		&i.Subscribed,
50
+		&i.Reason,
51
+		&i.UpdatedAt,
52
+	)
53
+	return i, err
54
+}
55
+
56
+const insertNotificationThreadIfAbsent = `-- name: InsertNotificationThreadIfAbsent :exec
57
+INSERT INTO notification_threads (recipient_user_id, thread_kind, thread_id, subscribed, reason)
58
+VALUES ($1, $2, $3, $4, $5)
59
+ON CONFLICT (recipient_user_id, thread_kind, thread_id) DO NOTHING
60
+`
61
+
62
+type InsertNotificationThreadIfAbsentParams struct {
63
+	RecipientUserID int64
64
+	ThreadKind      NotificationThreadKind
65
+	ThreadID        int64
66
+	Subscribed      bool
67
+	Reason          string
68
+}
69
+
70
+// Auto-subscription path: only insert if the user has no explicit
71
+// preference yet. Preserves user choices (e.g. an explicit
72
+// `subscribed=false` from clicking "Unsubscribe").
73
+func (q *Queries) InsertNotificationThreadIfAbsent(ctx context.Context, db DBTX, arg InsertNotificationThreadIfAbsentParams) error {
74
+	_, err := db.Exec(ctx, insertNotificationThreadIfAbsent,
75
+		arg.RecipientUserID,
76
+		arg.ThreadKind,
77
+		arg.ThreadID,
78
+		arg.Subscribed,
79
+		arg.Reason,
80
+	)
81
+	return err
82
+}
83
+
84
+const listSubscribersForThread = `-- name: ListSubscribersForThread :many
85
+SELECT recipient_user_id, reason
86
+FROM notification_threads
87
+WHERE thread_kind = $1 AND thread_id = $2 AND subscribed = true
88
+`
89
+
90
+type ListSubscribersForThreadParams struct {
91
+	ThreadKind NotificationThreadKind
92
+	ThreadID   int64
93
+}
94
+
95
+type ListSubscribersForThreadRow struct {
96
+	RecipientUserID int64
97
+	Reason          string
98
+}
99
+
100
+// Fan-out helper: returns recipients who explicitly subscribed to a
101
+// thread. The fan-out worker unions this with the per-repo `watches`
102
+// result + author/assignee/reviewer rules.
103
+func (q *Queries) ListSubscribersForThread(ctx context.Context, db DBTX, arg ListSubscribersForThreadParams) ([]ListSubscribersForThreadRow, error) {
104
+	rows, err := db.Query(ctx, listSubscribersForThread, arg.ThreadKind, arg.ThreadID)
105
+	if err != nil {
106
+		return nil, err
107
+	}
108
+	defer rows.Close()
109
+	items := []ListSubscribersForThreadRow{}
110
+	for rows.Next() {
111
+		var i ListSubscribersForThreadRow
112
+		if err := rows.Scan(&i.RecipientUserID, &i.Reason); err != nil {
113
+			return nil, err
114
+		}
115
+		items = append(items, i)
116
+	}
117
+	if err := rows.Err(); err != nil {
118
+		return nil, err
119
+	}
120
+	return items, nil
121
+}
122
+
123
+const upsertNotificationThread = `-- name: UpsertNotificationThread :exec
124
+INSERT INTO notification_threads (recipient_user_id, thread_kind, thread_id, subscribed, reason)
125
+VALUES ($1, $2, $3, $4, $5)
126
+ON CONFLICT (recipient_user_id, thread_kind, thread_id)
127
+DO UPDATE SET subscribed = EXCLUDED.subscribed,
128
+              reason     = EXCLUDED.reason,
129
+              updated_at = now()
130
+`
131
+
132
+type UpsertNotificationThreadParams struct {
133
+	RecipientUserID int64
134
+	ThreadKind      NotificationThreadKind
135
+	ThreadID        int64
136
+	Subscribed      bool
137
+	Reason          string
138
+}
139
+
140
+// Always-write upsert. Used by Subscribe / Unsubscribe / Ignore
141
+// handlers and by the auto-subscription rules in the fan-out
142
+// worker.
143
+func (q *Queries) UpsertNotificationThread(ctx context.Context, db DBTX, arg UpsertNotificationThreadParams) error {
144
+	_, err := db.Exec(ctx, upsertNotificationThread,
145
+		arg.RecipientUserID,
146
+		arg.ThreadKind,
147
+		arg.ThreadID,
148
+		arg.Subscribed,
149
+		arg.Reason,
150
+	)
151
+	return err
152
+}