Go · 16202 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package notif
4
5 import (
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10
11 "github.com/jackc/pgx/v5"
12 "github.com/jackc/pgx/v5/pgtype"
13
14 "github.com/tenseleyFlow/shithub/internal/auth/policy"
15 issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc"
16 notifdb "github.com/tenseleyFlow/shithub/internal/notif/sqlc"
17 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
18 socialdb "github.com/tenseleyFlow/shithub/internal/social/sqlc"
19 )
20
21 // FanoutConsumer is the consumer name used in
22 // `domain_events_processed`. Other consumers (S33 webhooks) will
23 // use distinct names so cursors don't collide.
24 const FanoutConsumer = "notify_fanout"
25
26 // FanoutOnce drains up to FanoutBatch domain_events starting after
27 // the persisted cursor, computes recipient sets, and writes inbox
28 // rows + (optionally) emails. Returns the number of events
29 // processed so the caller can decide to loop.
30 //
31 // Designed to be invoked from a worker job (see
32 // `internal/worker/jobs/notify_fanout.go`); the cron framework's
33 // landing date will determine how it's scheduled (the job kind is
34 // already in the worker registry).
35 func FanoutOnce(ctx context.Context, deps Deps) (int, error) {
36 q := notifdb.New()
37 cur, err := q.GetEventCursor(ctx, deps.Pool, FanoutConsumer)
38 if err != nil && !errors.Is(err, pgx.ErrNoRows) {
39 return 0, fmt.Errorf("fanout: load cursor: %w", err)
40 }
41 last := int64(0)
42 if err == nil {
43 last = cur.LastEventID
44 }
45
46 rows, err := q.ListUnprocessedDomainEvents(ctx, deps.Pool, notifdb.ListUnprocessedDomainEventsParams{
47 ID: last,
48 Limit: int32(FanoutBatch),
49 })
50 if err != nil {
51 return 0, fmt.Errorf("fanout: list events: %w", err)
52 }
53
54 processed := 0
55 for _, ev := range rows {
56 if err := dispatchEvent(ctx, deps, ev); err != nil {
57 if deps.Logger != nil {
58 deps.Logger.WarnContext(ctx, "fanout: dispatch failed",
59 "event_id", ev.ID, "kind", ev.Kind, "error", err)
60 }
61 // Don't advance the cursor past a failure — retry on
62 // the next tick. (A poison row would loop forever; the
63 // per-event error handling below catches the typical
64 // poison shapes and skips them.)
65 break
66 }
67 last = ev.ID
68 processed++
69 }
70
71 if processed > 0 {
72 if err := q.SetEventCursor(ctx, deps.Pool, notifdb.SetEventCursorParams{
73 Consumer: FanoutConsumer, LastEventID: last,
74 }); err != nil {
75 return processed, fmt.Errorf("fanout: persist cursor: %w", err)
76 }
77 }
78 return processed, nil
79 }
80
81 // dispatchEvent routes a single domain_events row through the
82 // recipient computer + per-recipient action engine.
83 func dispatchEvent(ctx context.Context, deps Deps, ev notifdb.DomainEvent) error {
84 actorUserID := int64(0)
85 if ev.ActorUserID.Valid {
86 actorUserID = ev.ActorUserID.Int64
87 }
88 repoID := int64(0)
89 if ev.RepoID.Valid {
90 repoID = ev.RepoID.Int64
91 }
92
93 threadKind, threadID, err := threadFromEvent(ctx, deps, ev)
94 if err != nil {
95 return err
96 }
97
98 // Compute recipient set. Each recipient is paired with the
99 // relation that earned them the slot; the routing matrix
100 // decides what to do per (kind, relation).
101 recipients, err := computeRecipients(ctx, deps, ev, threadKind, threadID, actorUserID, repoID)
102 if err != nil {
103 return err
104 }
105
106 // Auto-subscription: insert an "if-absent" thread row for
107 // users whose participation rule earned them this notification
108 // (author / assignee / reviewer / mentioned / commenter). Their
109 // future notifications then route via the subscribed-thread
110 // relation without re-deriving the rule.
111 if threadID != 0 {
112 applyAutoSubscriptions(ctx, deps, ev, threadKind, threadID, recipients)
113 }
114
115 q := notifdb.New()
116 for recipID, rec := range recipients {
117 // Self-suppression: never notify the actor about their own
118 // action. Defense in depth — the recipient computer also
119 // excludes them, but the duplicate guard is cheap.
120 if recipID == actorUserID {
121 continue
122 }
123 // Visibility re-check at fan-out time. The repo could have
124 // flipped public→private between event-emit and fan-out;
125 // a stale-read public event must not leak.
126 if repoID != 0 {
127 ok, err := canRecipientSeeRepo(ctx, deps, recipID, repoID)
128 if err != nil {
129 return err
130 }
131 if !ok {
132 continue
133 }
134 }
135 action := pickAction(ev.Kind, rec.Relations)
136 if !action.NotifyInbox {
137 continue
138 }
139 // `OverrideIgnore=false` AND user has watches.level=ignore
140 // on the repo → skip. The OverrideIgnore=true case (i.e.
141 // @mentions) bypasses the ignore check.
142 if !action.OverrideIgnore && repoID != 0 {
143 ignored, err := repoIgnoredByRecipient(ctx, deps, recipID, repoID)
144 if err != nil {
145 return err
146 }
147 if ignored {
148 continue
149 }
150 }
151 // Materialize the inbox row.
152 notif, err := upsertInboxRow(ctx, deps, q, recipID, ev, action.Reason, threadKind, threadID, repoID, actorUserID)
153 if err != nil {
154 return err
155 }
156 // Email side. Pref + storm dampener gate.
157 if action.EmailDefault && deps.EmailSender != nil {
158 if err := maybeSendEmail(ctx, deps, q, recipID, notif, action.Reason, threadKind, threadID); err != nil && deps.Logger != nil {
159 deps.Logger.WarnContext(ctx, "fanout: email send",
160 "recipient", recipID, "error", err)
161 }
162 }
163 }
164 return nil
165 }
166
167 // canRecipientSeeRepo runs the visibility predicate by hand for
168 // one recipient. Cheap — repo + collab-row lookup, always indexed.
169 func canRecipientSeeRepo(ctx context.Context, deps Deps, recipientID, repoID int64) (bool, error) {
170 rq := reposdb.New()
171 repo, err := rq.GetRepoByID(ctx, deps.Pool, repoID)
172 if err != nil {
173 if errors.Is(err, pgx.ErrNoRows) {
174 return false, nil
175 }
176 return false, err
177 }
178 actor := policy.UserActor(recipientID, "", false, false)
179 return policy.IsVisibleTo(ctx, policy.Deps{Pool: deps.Pool}, actor, policy.NewRepoRefFromRepo(repo)), nil
180 }
181
182 // repoIgnoredByRecipient reports whether the recipient has set
183 // `watches.level = 'ignore'` for the repo.
184 func repoIgnoredByRecipient(ctx context.Context, deps Deps, recipientID, repoID int64) (bool, error) {
185 row, err := socialdb.New().GetWatch(ctx, deps.Pool, socialdb.GetWatchParams{
186 UserID: recipientID, RepoID: repoID,
187 })
188 if err != nil {
189 if errors.Is(err, pgx.ErrNoRows) {
190 return false, nil
191 }
192 return false, err
193 }
194 return row.Level == socialdb.WatchLevelIgnore, nil
195 }
196
197 // upsertInboxRow either creates a fresh notifications row for a
198 // (recipient, thread) pair or coalesces onto the existing row by
199 // bumping last_event_at + last_actor + setting unread=true.
200 func upsertInboxRow(
201 ctx context.Context, deps Deps, q *notifdb.Queries,
202 recipientID int64,
203 ev notifdb.DomainEvent,
204 reason Reason,
205 threadKind notifdb.NullNotificationThreadKind,
206 threadID int64,
207 repoID int64,
208 actorUserID int64,
209 ) (notifdb.Notification, error) {
210 if threadID == 0 {
211 // Thread-less notification (e.g. lifecycle). Each event
212 // fires its own row; no coalesce.
213 return q.InsertThreadlessNotification(ctx, deps.Pool, notifdb.InsertThreadlessNotificationParams{
214 RecipientUserID: recipientID,
215 Kind: ev.Kind,
216 Reason: string(reason),
217 RepoID: intToPg(repoID),
218 SourceEventID: pgtype.Int8{Int64: ev.ID, Valid: true},
219 LastActorUserID: intToPg(actorUserID),
220 })
221 }
222 return q.UpsertNotificationByThread(ctx, deps.Pool, notifdb.UpsertNotificationByThreadParams{
223 RecipientUserID: recipientID,
224 Kind: ev.Kind,
225 Reason: string(reason),
226 RepoID: intToPg(repoID),
227 ThreadKind: threadKind,
228 ThreadID: pgtype.Int8{Int64: threadID, Valid: true},
229 SourceEventID: pgtype.Int8{Int64: ev.ID, Valid: true},
230 LastActorUserID: intToPg(actorUserID),
231 })
232 }
233
234 // pickAction iterates the recipient's relations in priority order
235 // and returns the first non-Skip Action. The `RelMention` slot is
236 // most permissive (override-ignore) so it's first.
237 func pickAction(kind string, rels []Relation) Action {
238 for _, rel := range rels {
239 if a := Routing(kind, rel); a.NotifyInbox {
240 return a
241 }
242 }
243 return Skip
244 }
245
246 // threadFromEvent extracts the (thread_kind, thread_id) pair when
247 // the event is thread-shaped (issue or PR). For non-thread events
248 // (repo-admin lifecycle) returns ({Valid:false}, 0).
249 func threadFromEvent(ctx context.Context, deps Deps, ev notifdb.DomainEvent) (notifdb.NullNotificationThreadKind, int64, error) {
250 switch ev.SourceKind {
251 case "issue":
252 // payload may carry kind=issue|pr; we resolve via the
253 // issues table to be sure.
254 issue, err := issuesdb.New().GetIssueByID(ctx, deps.Pool, ev.SourceID)
255 if err != nil {
256 if errors.Is(err, pgx.ErrNoRows) {
257 return notifdb.NullNotificationThreadKind{}, 0, nil
258 }
259 return notifdb.NullNotificationThreadKind{}, 0, err
260 }
261 kind := notifdb.NotificationThreadKindIssue
262 if issue.Kind == issuesdb.IssueKindPr {
263 kind = notifdb.NotificationThreadKindPr
264 }
265 return notifdb.NullNotificationThreadKind{NotificationThreadKind: kind, Valid: true}, issue.ID, nil
266 }
267 return notifdb.NullNotificationThreadKind{}, 0, nil
268 }
269
270 // recipient is one row in the per-event recipient set. Multiple
271 // relations may apply (e.g. user is both author + assignee); the
272 // pickAction step picks the highest-priority Action.
273 type recipient struct {
274 Relations []Relation
275 }
276
277 func (r *recipient) add(rel Relation) {
278 for _, existing := range r.Relations {
279 if existing == rel {
280 return
281 }
282 }
283 r.Relations = append(r.Relations, rel)
284 }
285
286 // computeRecipients gathers users from every relevant slot:
287 // * mention list from the event payload (when applicable),
288 // * thread author / assignee / reviewer (for thread-shaped events),
289 // * watches at level=all/participating on the repo,
290 // * explicit thread subscribers,
291 // * repo owner (for lifecycle events).
292 //
293 // The actor is excluded; visibility re-check happens later.
294 func computeRecipients(
295 ctx context.Context, deps Deps,
296 ev notifdb.DomainEvent,
297 threadKind notifdb.NullNotificationThreadKind,
298 threadID int64,
299 actorUserID, repoID int64,
300 ) (map[int64]*recipient, error) {
301 out := map[int64]*recipient{}
302
303 // Mentions live in the payload as a `mentions` array of user ids.
304 if ev.Payload != nil {
305 var p struct {
306 Mentions []int64 `json:"mentions"`
307 }
308 _ = json.Unmarshal(ev.Payload, &p)
309 for _, uid := range p.Mentions {
310 ensure(out, uid).add(RelMention)
311 }
312 }
313
314 // Thread relations (author, assignee, commenter via auto-sub).
315 if threadID != 0 {
316 issue, err := issuesdb.New().GetIssueByID(ctx, deps.Pool, threadID)
317 if err == nil {
318 if issue.AuthorUserID.Valid {
319 ensure(out, issue.AuthorUserID.Int64).add(RelAuthor)
320 }
321 // Assignees from issue_assignees.
322 assignees, err := issuesdb.New().ListIssueAssignees(ctx, deps.Pool, threadID)
323 if err == nil {
324 for _, a := range assignees {
325 ensure(out, a.UserID).add(RelAssignee)
326 }
327 }
328 }
329 // Explicit thread subscribers.
330 subs, err := notifdb.New().ListSubscribersForThread(ctx, deps.Pool, notifdb.ListSubscribersForThreadParams{
331 ThreadKind: threadKind.NotificationThreadKind,
332 ThreadID: threadID,
333 })
334 if err == nil {
335 for _, s := range subs {
336 ensure(out, s.RecipientUserID).add(RelSubscribedThread)
337 }
338 }
339 }
340
341 // Repo watchers at level=all (relevant for "new issue" /
342 // "new PR" events). The fan-out matrix decides which kinds
343 // actually consume this slot — so populating it for every
344 // thread-shaped event is fine (skipped at routing time).
345 if repoID != 0 {
346 watchers, err := socialdb.New().ListRepoWatchersByLevel(ctx, deps.Pool, socialdb.ListRepoWatchersByLevelParams{
347 RepoID: repoID, Level: socialdb.WatchLevelAll,
348 })
349 if err == nil {
350 for _, uid := range watchers {
351 ensure(out, uid).add(RelWatchingAll)
352 }
353 }
354 }
355
356 // Repo-owner slot for lifecycle events.
357 if repoID != 0 && isLifecycleEvent(ev.Kind) {
358 repo, err := reposdb.New().GetRepoByID(ctx, deps.Pool, repoID)
359 if err == nil && repo.OwnerUserID.Valid {
360 ensure(out, repo.OwnerUserID.Int64).add(RelRepoOwner)
361 }
362 }
363
364 // Reviewer slot for review_requested events. Payload carries
365 // the recipient id.
366 if ev.Kind == "review_requested" && ev.Payload != nil {
367 var p struct {
368 ReviewerUserID int64 `json:"reviewer_user_id"`
369 }
370 _ = json.Unmarshal(ev.Payload, &p)
371 if p.ReviewerUserID != 0 {
372 ensure(out, p.ReviewerUserID).add(RelReviewer)
373 }
374 }
375
376 // Self-suppression: drop the actor. We still defense-in-depth
377 // it at the dispatch site, but trimming early saves per-
378 // recipient policy lookups.
379 if actorUserID != 0 {
380 delete(out, actorUserID)
381 }
382 return out, nil
383 }
384
385 // applyAutoSubscriptions inserts notification_threads rows for the
386 // recipients that just earned a slot via author / assignee /
387 // reviewer / mention / first-commenter rules. Non-destructive —
388 // preserves explicit user choices.
389 func applyAutoSubscriptions(
390 ctx context.Context, deps Deps,
391 ev notifdb.DomainEvent,
392 threadKind notifdb.NullNotificationThreadKind,
393 threadID int64,
394 recipients map[int64]*recipient,
395 ) {
396 q := notifdb.New()
397 for uid, rec := range recipients {
398 var reason string
399 var subscribed bool
400 // Pick the strongest auto-sub reason that applies.
401 for _, rel := range rec.Relations {
402 switch rel {
403 case RelAuthor:
404 reason, subscribed = "author", true
405 case RelAssignee:
406 reason, subscribed = "assigned", true
407 case RelReviewer:
408 reason, subscribed = "review_requested", true
409 case RelMention:
410 if reason == "" {
411 reason, subscribed = "mention", true
412 }
413 case RelCommenter:
414 if reason == "" {
415 reason, subscribed = "commenter", true
416 }
417 }
418 }
419 if reason == "" {
420 continue
421 }
422 _ = q.InsertNotificationThreadIfAbsent(ctx, deps.Pool, notifdb.InsertNotificationThreadIfAbsentParams{
423 RecipientUserID: uid,
424 ThreadKind: threadKind.NotificationThreadKind,
425 ThreadID: threadID,
426 Subscribed: subscribed,
427 Reason: reason,
428 })
429 }
430 }
431
432 // maybeSendEmail consults the user's per-kind email pref + the
433 // storm dampener, then either dispatches the email or skips. Logs
434 // every attempt to notification_email_log so the dampener is
435 // self-consistent across worker restarts.
436 func maybeSendEmail(
437 ctx context.Context, deps Deps, q *notifdb.Queries,
438 recipientID int64,
439 notif notifdb.Notification,
440 reason Reason,
441 threadKind notifdb.NullNotificationThreadKind,
442 threadID int64,
443 ) error {
444 prefKey := EmailPrefKey(reason)
445 if prefKey != "" {
446 on, err := emailPrefOn(ctx, deps.Pool, recipientID, prefKey)
447 if err != nil {
448 return err
449 }
450 if !on {
451 return nil
452 }
453 }
454
455 // Per-thread dampener.
456 if threadID != 0 {
457 count, err := q.CountEmailsForRecipientThreadSince(ctx, deps.Pool, notifdb.CountEmailsForRecipientThreadSinceParams{
458 RecipientUserID: recipientID,
459 ThreadKind: threadKind,
460 ThreadID: pgtype.Int8{Int64: threadID, Valid: true},
461 Column4: StormPerThreadMins,
462 })
463 if err != nil {
464 return err
465 }
466 if count >= StormPerThreadCap {
467 return nil
468 }
469 }
470 // Per-recipient absolute cap.
471 count, err := q.CountEmailsForRecipientSince(ctx, deps.Pool, notifdb.CountEmailsForRecipientSinceParams{
472 RecipientUserID: recipientID,
473 Column2: StormAbsoluteMins,
474 })
475 if err != nil {
476 return err
477 }
478 if count >= StormAbsoluteCap {
479 return nil
480 }
481
482 // We don't actually render + send here — that's a separate
483 // helper (`sendNotificationEmail`). Logging the send here is
484 // a layering violation; the helper logs after the SMTP call
485 // succeeds. The helper isn't ready in this commit; the gate
486 // stays so the storm dampener is wired even if no template
487 // renders today.
488 return sendNotificationEmail(ctx, deps, recipientID, notif, reason, threadKind, threadID)
489 }
490
491 // ensure returns the *recipient for uid, creating it on first add.
492 func ensure(m map[int64]*recipient, uid int64) *recipient {
493 if r, ok := m[uid]; ok {
494 return r
495 }
496 r := &recipient{}
497 m[uid] = r
498 return r
499 }
500
501 func intToPg(v int64) pgtype.Int8 {
502 return pgtype.Int8{Int64: v, Valid: v != 0}
503 }
504
505 func isLifecycleEvent(kind string) bool {
506 switch kind {
507 case "repo_archived", "repo_unarchived",
508 "repo_visibility_changed", "repo_soft_deleted",
509 "repo_restored", "repo_transfer_requested",
510 "repo_transfer_accepted", "repo_transfer_declined",
511 "repo_transfer_canceled", "repo_transfer_expired":
512 return true
513 }
514 return false
515 }
516