| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package notif |
| 4 | |
| 5 | import ( |
| 6 | "context" |
| 7 | "encoding/json" |
| 8 | "errors" |
| 9 | "fmt" |
| 10 | |
| 11 | "github.com/jackc/pgx/v5" |
| 12 | "github.com/jackc/pgx/v5/pgtype" |
| 13 | |
| 14 | "github.com/tenseleyFlow/shithub/internal/auth/policy" |
| 15 | issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc" |
| 16 | notifdb "github.com/tenseleyFlow/shithub/internal/notif/sqlc" |
| 17 | reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc" |
| 18 | socialdb "github.com/tenseleyFlow/shithub/internal/social/sqlc" |
| 19 | ) |
| 20 | |
// FanoutConsumer is the consumer name used in
// `domain_events_processed`. The cursor row keyed by this name
// records the last domain_events.id this fan-out has handled.
// Other consumers (S33 webhooks) will use distinct names so
// cursors don't collide.
const FanoutConsumer = "notify_fanout"
| 25 | |
| 26 | // FanoutOnce drains up to FanoutBatch domain_events starting after |
| 27 | // the persisted cursor, computes recipient sets, and writes inbox |
| 28 | // rows + (optionally) emails. Returns the number of events |
| 29 | // processed so the caller can decide to loop. |
| 30 | // |
| 31 | // Designed to be invoked from a worker job (see |
| 32 | // `internal/worker/jobs/notify_fanout.go`); the cron framework's |
| 33 | // landing date will determine how it's scheduled (the job kind is |
| 34 | // already in the worker registry). |
| 35 | func FanoutOnce(ctx context.Context, deps Deps) (int, error) { |
| 36 | q := notifdb.New() |
| 37 | cur, err := q.GetEventCursor(ctx, deps.Pool, FanoutConsumer) |
| 38 | if err != nil && !errors.Is(err, pgx.ErrNoRows) { |
| 39 | return 0, fmt.Errorf("fanout: load cursor: %w", err) |
| 40 | } |
| 41 | last := int64(0) |
| 42 | if err == nil { |
| 43 | last = cur.LastEventID |
| 44 | } |
| 45 | |
| 46 | rows, err := q.ListUnprocessedDomainEvents(ctx, deps.Pool, notifdb.ListUnprocessedDomainEventsParams{ |
| 47 | ID: last, |
| 48 | Limit: int32(FanoutBatch), |
| 49 | }) |
| 50 | if err != nil { |
| 51 | return 0, fmt.Errorf("fanout: list events: %w", err) |
| 52 | } |
| 53 | |
| 54 | processed := 0 |
| 55 | for _, ev := range rows { |
| 56 | if err := dispatchEvent(ctx, deps, ev); err != nil { |
| 57 | if deps.Logger != nil { |
| 58 | deps.Logger.WarnContext(ctx, "fanout: dispatch failed", |
| 59 | "event_id", ev.ID, "kind", ev.Kind, "error", err) |
| 60 | } |
| 61 | // Don't advance the cursor past a failure — retry on |
| 62 | // the next tick. (A poison row would loop forever; the |
| 63 | // per-event error handling below catches the typical |
| 64 | // poison shapes and skips them.) |
| 65 | break |
| 66 | } |
| 67 | last = ev.ID |
| 68 | processed++ |
| 69 | } |
| 70 | |
| 71 | if processed > 0 { |
| 72 | if err := q.SetEventCursor(ctx, deps.Pool, notifdb.SetEventCursorParams{ |
| 73 | Consumer: FanoutConsumer, LastEventID: last, |
| 74 | }); err != nil { |
| 75 | return processed, fmt.Errorf("fanout: persist cursor: %w", err) |
| 76 | } |
| 77 | } |
| 78 | return processed, nil |
| 79 | } |
| 80 | |
// dispatchEvent routes a single domain_events row through the
// recipient computer + per-recipient action engine.
//
// Pipeline: resolve the event's thread (if any) → compute the
// recipient set → auto-subscribe rule-earned recipients → then,
// per recipient: self-suppression, visibility re-check,
// routing-matrix gate, ignore-level gate, inbox upsert, optional
// email. A non-nil return aborts the whole batch; the caller keeps
// the cursor before this event so it is retried next tick.
func dispatchEvent(ctx context.Context, deps Deps, ev notifdb.DomainEvent) error {
	// Unwrap the nullable FKs; 0 means "absent" by convention in
	// this package (intToPg maps 0 back to NULL on write).
	actorUserID := int64(0)
	if ev.ActorUserID.Valid {
		actorUserID = ev.ActorUserID.Int64
	}
	repoID := int64(0)
	if ev.RepoID.Valid {
		repoID = ev.RepoID.Int64
	}

	// ({Valid:false}, 0) for non-thread events (repo lifecycle etc.).
	threadKind, threadID, err := threadFromEvent(ctx, deps, ev)
	if err != nil {
		return err
	}

	// Compute recipient set. Each recipient is paired with the
	// relation(s) that earned them the slot; the routing matrix
	// decides what to do per (kind, relation).
	recipients, err := computeRecipients(ctx, deps, ev, threadKind, threadID, actorUserID, repoID)
	if err != nil {
		return err
	}

	// Auto-subscription: insert an "if-absent" thread row for
	// users whose participation rule earned them this notification
	// (author / assignee / reviewer / mentioned / commenter). Their
	// future notifications then route via the subscribed-thread
	// relation without re-deriving the rule.
	if threadID != 0 {
		applyAutoSubscriptions(ctx, deps, ev, threadKind, threadID, recipients)
	}

	q := notifdb.New()
	for recipID, rec := range recipients {
		// Self-suppression: never notify the actor about their own
		// action. Defense in depth — the recipient computer also
		// excludes them, but the duplicate guard is cheap.
		if recipID == actorUserID {
			continue
		}
		// Visibility re-check at fan-out time. The repo could have
		// flipped public→private between event-emit and fan-out;
		// a stale-read public event must not leak.
		if repoID != 0 {
			ok, err := canRecipientSeeRepo(ctx, deps, recipID, repoID)
			if err != nil {
				return err
			}
			if !ok {
				continue
			}
		}
		// Routing matrix: first relation whose Action notifies the
		// inbox wins; no-inbox means skip this recipient entirely.
		action := pickAction(ev.Kind, rec.Relations)
		if !action.NotifyInbox {
			continue
		}
		// `OverrideIgnore=false` AND user has watches.level=ignore
		// on the repo → skip. The OverrideIgnore=true case (i.e.
		// @mentions) bypasses the ignore check.
		if !action.OverrideIgnore && repoID != 0 {
			ignored, err := repoIgnoredByRecipient(ctx, deps, recipID, repoID)
			if err != nil {
				return err
			}
			if ignored {
				continue
			}
		}
		// Materialize the inbox row.
		notif, err := upsertInboxRow(ctx, deps, q, recipID, ev, action.Reason, threadKind, threadID, repoID, actorUserID)
		if err != nil {
			return err
		}
		// Email side. Pref + storm dampener gate. Email failure is
		// non-fatal: the inbox row already stands, so we log and
		// move on rather than failing the event.
		// NOTE(review): if deps.Logger is nil the email error is
		// dropped silently — confirm that's acceptable.
		if action.EmailDefault && deps.EmailSender != nil {
			if err := maybeSendEmail(ctx, deps, q, recipID, notif, action.Reason, threadKind, threadID); err != nil && deps.Logger != nil {
				deps.Logger.WarnContext(ctx, "fanout: email send",
					"recipient", recipID, "error", err)
			}
		}
	}
	return nil
}
| 166 | |
| 167 | // canRecipientSeeRepo runs the visibility predicate by hand for |
| 168 | // one recipient. Cheap — repo + collab-row lookup, always indexed. |
| 169 | func canRecipientSeeRepo(ctx context.Context, deps Deps, recipientID, repoID int64) (bool, error) { |
| 170 | rq := reposdb.New() |
| 171 | repo, err := rq.GetRepoByID(ctx, deps.Pool, repoID) |
| 172 | if err != nil { |
| 173 | if errors.Is(err, pgx.ErrNoRows) { |
| 174 | return false, nil |
| 175 | } |
| 176 | return false, err |
| 177 | } |
| 178 | actor := policy.UserActor(recipientID, "", false, false) |
| 179 | return policy.IsVisibleTo(ctx, policy.Deps{Pool: deps.Pool}, actor, policy.NewRepoRefFromRepo(repo)), nil |
| 180 | } |
| 181 | |
| 182 | // repoIgnoredByRecipient reports whether the recipient has set |
| 183 | // `watches.level = 'ignore'` for the repo. |
| 184 | func repoIgnoredByRecipient(ctx context.Context, deps Deps, recipientID, repoID int64) (bool, error) { |
| 185 | row, err := socialdb.New().GetWatch(ctx, deps.Pool, socialdb.GetWatchParams{ |
| 186 | UserID: recipientID, RepoID: repoID, |
| 187 | }) |
| 188 | if err != nil { |
| 189 | if errors.Is(err, pgx.ErrNoRows) { |
| 190 | return false, nil |
| 191 | } |
| 192 | return false, err |
| 193 | } |
| 194 | return row.Level == socialdb.WatchLevelIgnore, nil |
| 195 | } |
| 196 | |
| 197 | // upsertInboxRow either creates a fresh notifications row for a |
| 198 | // (recipient, thread) pair or coalesces onto the existing row by |
| 199 | // bumping last_event_at + last_actor + setting unread=true. |
| 200 | func upsertInboxRow( |
| 201 | ctx context.Context, deps Deps, q *notifdb.Queries, |
| 202 | recipientID int64, |
| 203 | ev notifdb.DomainEvent, |
| 204 | reason Reason, |
| 205 | threadKind notifdb.NullNotificationThreadKind, |
| 206 | threadID int64, |
| 207 | repoID int64, |
| 208 | actorUserID int64, |
| 209 | ) (notifdb.Notification, error) { |
| 210 | if threadID == 0 { |
| 211 | // Thread-less notification (e.g. lifecycle). Each event |
| 212 | // fires its own row; no coalesce. |
| 213 | return q.InsertThreadlessNotification(ctx, deps.Pool, notifdb.InsertThreadlessNotificationParams{ |
| 214 | RecipientUserID: recipientID, |
| 215 | Kind: ev.Kind, |
| 216 | Reason: string(reason), |
| 217 | RepoID: intToPg(repoID), |
| 218 | SourceEventID: pgtype.Int8{Int64: ev.ID, Valid: true}, |
| 219 | LastActorUserID: intToPg(actorUserID), |
| 220 | }) |
| 221 | } |
| 222 | return q.UpsertNotificationByThread(ctx, deps.Pool, notifdb.UpsertNotificationByThreadParams{ |
| 223 | RecipientUserID: recipientID, |
| 224 | Kind: ev.Kind, |
| 225 | Reason: string(reason), |
| 226 | RepoID: intToPg(repoID), |
| 227 | ThreadKind: threadKind, |
| 228 | ThreadID: pgtype.Int8{Int64: threadID, Valid: true}, |
| 229 | SourceEventID: pgtype.Int8{Int64: ev.ID, Valid: true}, |
| 230 | LastActorUserID: intToPg(actorUserID), |
| 231 | }) |
| 232 | } |
| 233 | |
| 234 | // pickAction iterates the recipient's relations in priority order |
| 235 | // and returns the first non-Skip Action. The `RelMention` slot is |
| 236 | // most permissive (override-ignore) so it's first. |
| 237 | func pickAction(kind string, rels []Relation) Action { |
| 238 | for _, rel := range rels { |
| 239 | if a := Routing(kind, rel); a.NotifyInbox { |
| 240 | return a |
| 241 | } |
| 242 | } |
| 243 | return Skip |
| 244 | } |
| 245 | |
| 246 | // threadFromEvent extracts the (thread_kind, thread_id) pair when |
| 247 | // the event is thread-shaped (issue or PR). For non-thread events |
| 248 | // (repo-admin lifecycle) returns ({Valid:false}, 0). |
| 249 | func threadFromEvent(ctx context.Context, deps Deps, ev notifdb.DomainEvent) (notifdb.NullNotificationThreadKind, int64, error) { |
| 250 | switch ev.SourceKind { |
| 251 | case "issue": |
| 252 | // payload may carry kind=issue|pr; we resolve via the |
| 253 | // issues table to be sure. |
| 254 | issue, err := issuesdb.New().GetIssueByID(ctx, deps.Pool, ev.SourceID) |
| 255 | if err != nil { |
| 256 | if errors.Is(err, pgx.ErrNoRows) { |
| 257 | return notifdb.NullNotificationThreadKind{}, 0, nil |
| 258 | } |
| 259 | return notifdb.NullNotificationThreadKind{}, 0, err |
| 260 | } |
| 261 | kind := notifdb.NotificationThreadKindIssue |
| 262 | if issue.Kind == issuesdb.IssueKindPr { |
| 263 | kind = notifdb.NotificationThreadKindPr |
| 264 | } |
| 265 | return notifdb.NullNotificationThreadKind{NotificationThreadKind: kind, Valid: true}, issue.ID, nil |
| 266 | } |
| 267 | return notifdb.NullNotificationThreadKind{}, 0, nil |
| 268 | } |
| 269 | |
| 270 | // recipient is one row in the per-event recipient set. Multiple |
| 271 | // relations may apply (e.g. user is both author + assignee); the |
| 272 | // pickAction step picks the highest-priority Action. |
| 273 | type recipient struct { |
| 274 | Relations []Relation |
| 275 | } |
| 276 | |
| 277 | func (r *recipient) add(rel Relation) { |
| 278 | for _, existing := range r.Relations { |
| 279 | if existing == rel { |
| 280 | return |
| 281 | } |
| 282 | } |
| 283 | r.Relations = append(r.Relations, rel) |
| 284 | } |
| 285 | |
// computeRecipients gathers users from every relevant slot:
//   - mention list from the event payload (when applicable),
//   - thread author / assignee (for thread-shaped events),
//   - explicit thread subscribers,
//   - watches at level=all on the repo (level=participating is not
//     queried here — presumably covered by the subscribed-thread
//     slot; TODO confirm),
//   - repo owner (for lifecycle events),
//   - requested reviewer (review_requested payload).
//
// The actor is excluded; visibility re-check happens later.
//
// NOTE(review): every DB lookup below is best-effort — a query
// error silently drops that slot instead of failing the event.
// Confirm that's intended (a transient DB blip under-notifies).
func computeRecipients(
	ctx context.Context, deps Deps,
	ev notifdb.DomainEvent,
	threadKind notifdb.NullNotificationThreadKind,
	threadID int64,
	actorUserID, repoID int64,
) (map[int64]*recipient, error) {
	out := map[int64]*recipient{}

	// Mentions live in the payload as a `mentions` array of user ids.
	// Malformed payload JSON simply yields no mentions.
	if ev.Payload != nil {
		var p struct {
			Mentions []int64 `json:"mentions"`
		}
		_ = json.Unmarshal(ev.Payload, &p)
		for _, uid := range p.Mentions {
			ensure(out, uid).add(RelMention)
		}
	}

	// Thread relations (author, assignee, commenter via auto-sub).
	if threadID != 0 {
		issue, err := issuesdb.New().GetIssueByID(ctx, deps.Pool, threadID)
		if err == nil {
			if issue.AuthorUserID.Valid {
				ensure(out, issue.AuthorUserID.Int64).add(RelAuthor)
			}
			// Assignees from issue_assignees. (Inner err shadows the
			// outer one on purpose — assignee failure doesn't undo
			// the author slot.)
			assignees, err := issuesdb.New().ListIssueAssignees(ctx, deps.Pool, threadID)
			if err == nil {
				for _, a := range assignees {
					ensure(out, a.UserID).add(RelAssignee)
				}
			}
		}
		// Explicit thread subscribers.
		subs, err := notifdb.New().ListSubscribersForThread(ctx, deps.Pool, notifdb.ListSubscribersForThreadParams{
			ThreadKind: threadKind.NotificationThreadKind,
			ThreadID:   threadID,
		})
		if err == nil {
			for _, s := range subs {
				ensure(out, s.RecipientUserID).add(RelSubscribedThread)
			}
		}
	}

	// Repo watchers at level=all (relevant for "new issue" /
	// "new PR" events). The fan-out matrix decides which kinds
	// actually consume this slot — so populating it for every
	// thread-shaped event is fine (skipped at routing time).
	if repoID != 0 {
		watchers, err := socialdb.New().ListRepoWatchersByLevel(ctx, deps.Pool, socialdb.ListRepoWatchersByLevelParams{
			RepoID: repoID, Level: socialdb.WatchLevelAll,
		})
		if err == nil {
			for _, uid := range watchers {
				ensure(out, uid).add(RelWatchingAll)
			}
		}
	}

	// Repo-owner slot for lifecycle events.
	if repoID != 0 && isLifecycleEvent(ev.Kind) {
		repo, err := reposdb.New().GetRepoByID(ctx, deps.Pool, repoID)
		if err == nil && repo.OwnerUserID.Valid {
			ensure(out, repo.OwnerUserID.Int64).add(RelRepoOwner)
		}
	}

	// Reviewer slot for review_requested events. Payload carries
	// the recipient id.
	if ev.Kind == "review_requested" && ev.Payload != nil {
		var p struct {
			ReviewerUserID int64 `json:"reviewer_user_id"`
		}
		_ = json.Unmarshal(ev.Payload, &p)
		if p.ReviewerUserID != 0 {
			ensure(out, p.ReviewerUserID).add(RelReviewer)
		}
	}

	// Self-suppression: drop the actor. We still defense-in-depth
	// it at the dispatch site, but trimming early saves per-
	// recipient policy lookups.
	if actorUserID != 0 {
		delete(out, actorUserID)
	}
	return out, nil
}
| 384 | |
// applyAutoSubscriptions inserts notification_threads rows for the
// recipients that just earned a slot via author / assignee /
// reviewer / mention / commenter rules. Non-destructive — the
// insert is if-absent, so explicit user choices are preserved.
//
// NOTE(review): the "strongest reason" loop below is order-
// dependent: the author/assignee/reviewer cases each overwrite
// `reason` unconditionally, so for a user holding several of those
// relations the winner is whichever appears LAST in rec.Relations
// (insertion order), not a fixed priority. Only mention/commenter
// defer to an existing reason. Confirm this is intended.
func applyAutoSubscriptions(
	ctx context.Context, deps Deps,
	ev notifdb.DomainEvent,
	threadKind notifdb.NullNotificationThreadKind,
	threadID int64,
	recipients map[int64]*recipient,
) {
	q := notifdb.New()
	for uid, rec := range recipients {
		var reason string
		var subscribed bool
		// Pick the strongest auto-sub reason that applies.
		for _, rel := range rec.Relations {
			switch rel {
			case RelAuthor:
				reason, subscribed = "author", true
			case RelAssignee:
				reason, subscribed = "assigned", true
			case RelReviewer:
				reason, subscribed = "review_requested", true
			case RelMention:
				// Mention only fills an otherwise-empty slot.
				if reason == "" {
					reason, subscribed = "mention", true
				}
			case RelCommenter:
				if reason == "" {
					reason, subscribed = "commenter", true
				}
			}
		}
		// Watcher/owner-only recipients carry no auto-sub rule.
		if reason == "" {
			continue
		}
		// Best-effort insert: a failure is dropped and the rule
		// simply re-derives on the next event for this thread.
		_ = q.InsertNotificationThreadIfAbsent(ctx, deps.Pool, notifdb.InsertNotificationThreadIfAbsentParams{
			RecipientUserID: uid,
			ThreadKind:      threadKind.NotificationThreadKind,
			ThreadID:        threadID,
			Subscribed:      subscribed,
			Reason:          reason,
		})
	}
}
| 431 | |
| 432 | // maybeSendEmail consults the user's per-kind email pref + the |
| 433 | // storm dampener, then either dispatches the email or skips. Logs |
| 434 | // every attempt to notification_email_log so the dampener is |
| 435 | // self-consistent across worker restarts. |
| 436 | func maybeSendEmail( |
| 437 | ctx context.Context, deps Deps, q *notifdb.Queries, |
| 438 | recipientID int64, |
| 439 | notif notifdb.Notification, |
| 440 | reason Reason, |
| 441 | threadKind notifdb.NullNotificationThreadKind, |
| 442 | threadID int64, |
| 443 | ) error { |
| 444 | prefKey := EmailPrefKey(reason) |
| 445 | if prefKey != "" { |
| 446 | on, err := emailPrefOn(ctx, deps.Pool, recipientID, prefKey) |
| 447 | if err != nil { |
| 448 | return err |
| 449 | } |
| 450 | if !on { |
| 451 | return nil |
| 452 | } |
| 453 | } |
| 454 | |
| 455 | // Per-thread dampener. |
| 456 | if threadID != 0 { |
| 457 | count, err := q.CountEmailsForRecipientThreadSince(ctx, deps.Pool, notifdb.CountEmailsForRecipientThreadSinceParams{ |
| 458 | RecipientUserID: recipientID, |
| 459 | ThreadKind: threadKind, |
| 460 | ThreadID: pgtype.Int8{Int64: threadID, Valid: true}, |
| 461 | Column4: StormPerThreadMins, |
| 462 | }) |
| 463 | if err != nil { |
| 464 | return err |
| 465 | } |
| 466 | if count >= StormPerThreadCap { |
| 467 | return nil |
| 468 | } |
| 469 | } |
| 470 | // Per-recipient absolute cap. |
| 471 | count, err := q.CountEmailsForRecipientSince(ctx, deps.Pool, notifdb.CountEmailsForRecipientSinceParams{ |
| 472 | RecipientUserID: recipientID, |
| 473 | Column2: StormAbsoluteMins, |
| 474 | }) |
| 475 | if err != nil { |
| 476 | return err |
| 477 | } |
| 478 | if count >= StormAbsoluteCap { |
| 479 | return nil |
| 480 | } |
| 481 | |
| 482 | // We don't actually render + send here — that's a separate |
| 483 | // helper (`sendNotificationEmail`). Logging the send here is |
| 484 | // a layering violation; the helper logs after the SMTP call |
| 485 | // succeeds. The helper isn't ready in this commit; the gate |
| 486 | // stays so the storm dampener is wired even if no template |
| 487 | // renders today. |
| 488 | return sendNotificationEmail(ctx, deps, recipientID, notif, reason, threadKind, threadID) |
| 489 | } |
| 490 | |
| 491 | // ensure returns the *recipient for uid, creating it on first add. |
| 492 | func ensure(m map[int64]*recipient, uid int64) *recipient { |
| 493 | if r, ok := m[uid]; ok { |
| 494 | return r |
| 495 | } |
| 496 | r := &recipient{} |
| 497 | m[uid] = r |
| 498 | return r |
| 499 | } |
| 500 | |
| 501 | func intToPg(v int64) pgtype.Int8 { |
| 502 | return pgtype.Int8{Int64: v, Valid: v != 0} |
| 503 | } |
| 504 | |
// isLifecycleEvent reports whether kind is one of the repo-admin
// lifecycle event kinds (archive, visibility, soft-delete, and the
// transfer state machine) that notify the repo owner.
func isLifecycleEvent(kind string) bool {
	switch kind {
	case "repo_archived",
		"repo_unarchived",
		"repo_visibility_changed",
		"repo_soft_deleted",
		"repo_restored",
		"repo_transfer_requested",
		"repo_transfer_accepted",
		"repo_transfer_declined",
		"repo_transfer_canceled",
		"repo_transfer_expired":
		return true
	default:
		return false
	}
}
| 516 |