| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | // Package worker drives the Postgres-backed job queue introduced in |
| 4 | // S14. The pool dispatches one Job per goroutine, claiming rows via |
| 5 | // FOR UPDATE SKIP LOCKED so concurrent workers don't double-process. |
| 6 | // |
// Job kind names are declared in this package; each kind's payload
// schema and handler live in the jobs sub-package (jobs/<kind>.go).
// The pool itself is kind-agnostic: a Handler is just a func that takes
// a payload and returns nil or an error.
| 11 | package worker |
| 12 | |
| 13 | import ( |
| 14 | "context" |
| 15 | "encoding/json" |
| 16 | "errors" |
| 17 | "fmt" |
| 18 | "time" |
| 19 | ) |
| 20 | |
// Kind is a job's canonical name: lowercase words under a colon-separated
// namespace, e.g. "push:process" or "repo:size_recalc".
//
// Kind is also the dispatch index. Workers claim rows with
// `WHERE kind = $1`, so registering a new kind never disturbs the
// existing ones.
type Kind string

// Registered job kinds, grouped by the sprint that introduced them.
const (
	// S14 built-ins.
	KindPushProcess    Kind = "push:process"
	KindRepoSizeRecalc Kind = "repo:size_recalc"
	KindJobsPurge      Kind = "jobs:purge_completed"

	// S16 lifecycle housekeeping.
	KindLifecycleSweep Kind = "lifecycle:sweep"

	// S22 pull requests. pr:synchronize refreshes a PR's commits and
	// files after a push to its head; pr:mergeability runs the
	// merge-tree probe.
	//
	// There is deliberately no async-merge kind: the POST .../merge
	// handler performs the merge synchronously so its redirect lands on
	// the merged state. If async merging is ever needed (e.g. for very
	// large repos), reintroduce a KindPRMerge here together with a
	// matching jobs.PRMerge handler.
	KindPRSynchronize  Kind = "pr:synchronize"
	KindPRMergeability Kind = "pr:mergeability"

	// S27 forks. repo:fork_clone runs `git clone --bare --shared` into a
	// freshly created fork shell; the fork's init_status then moves from
	// init_pending to initialized on success, or init_failed on failure.
	KindRepoForkClone Kind = "repo:fork_clone"

	// S28 code search. repo:index_code re-indexes a repo's default branch
	// (paths and content). push:process enqueues it when the default
	// branch advances, and repo:index_reconcile enqueues it when drift
	// between default_branch_oid and last_indexed_oid is detected.
	KindRepoIndexCode      Kind = "repo:index_code"
	KindRepoIndexReconcile Kind = "repo:index_reconcile"

	// S29 notifications. notify:fanout drains the domain_events table past
	// the persisted cursor and materializes inbox rows plus (optional)
	// emails. It self-throttles per FanoutBatch and is meant to be
	// re-enqueued when a drain does not catch up.
	// NOTE(review): Handler returns only an error, so how a processed
	// count reaches the pool is not visible here — confirm.
	KindNotifyFanout Kind = "notify:fanout"

	// S42 social feed. trending:compute refreshes the denormalized
	// day/week/month Explore rankings.
	KindTrendingCompute Kind = "trending:compute"

	// Organization import. The discover job lists an organization's GitHub
	// repositories and fans out one repo job per repository, so large
	// organizations can progress incrementally.
	KindOrgGitHubImportDiscover Kind = "org:github_import_discover"
	KindOrgGitHubImportRepo     Kind = "org:github_import_repo"
)
| 89 | |
// NotifyChannel names the Postgres LISTEN/NOTIFY channel the pool listens
// on, so it wakes as soon as a job is enqueued rather than polling.
//
// An enqueue performed inside a transaction must issue its NOTIFY inside
// that same transaction, so that the notification only fires once the
// transaction commits.
const (
	NotifyChannel = "shithub_jobs"
)
| 95 | |
// Handler runs a single job. The pool supplies the job's raw JSON payload
// and the handler unmarshals it into its own typed schema.
//
// Returning nil reports success and the job is marked completed. Any
// non-nil error sends the job down the backoff/retry path (see Backoff).
// ErrPoison is the explicit "do not retry" signal: build such errors with
// PoisonError when the input is malformed and retrying cannot help.
// NOTE(review): the pool presumably detects poison errors with
// errors.Is(err, ErrPoison) — confirm against the pool implementation.
type Handler func(ctx context.Context, payload json.RawMessage) error
| 102 | |
| 103 | // ErrPoison wraps a handler error that should NOT be retried. The pool |
| 104 | // jumps the job straight to MarkJobFailed instead of rescheduling. |
| 105 | var ErrPoison = errors.New("worker: poison job") |
| 106 | |
| 107 | // PoisonError wraps cause as a poison error. The cause is preserved in |
| 108 | // last_error for operator inspection. |
| 109 | func PoisonError(cause error) error { |
| 110 | return fmt.Errorf("%w: %v", ErrPoison, cause) |
| 111 | } |
| 112 | |
// Backoff returns the delay before retrying a job that is about to be
// rescheduled.
//
// attempts is the number of attempts already made, 1-indexed: the attempt
// that just failed counts as 1. Values below 1 are treated as 1.
//
// The base delay is 30s * 2^(attempts-1) (30s, 1m, 2m, 4m, ...), capped at
// 1 hour. When jitter is non-nil, the capped delay is then scaled by a
// factor in [0.8, 1.2] (±20%) so a fleet doesn't synchronize retries on a
// sibling dependency outage. Because jitter is applied after the cap, a
// capped delay lands anywhere in [48m, 72m].
//
// jitter should return a value in [0, 1) (e.g. rand.Float64). Negative
// values and NaN are clamped to 0 and values above 1 are clamped to 1, so
// a misbehaving source can never produce a negative delay or exceed +20%.
func Backoff(attempts int, jitter func() float64) time.Duration {
	if attempts < 1 {
		attempts = 1
	}
	const (
		base     = 30 * time.Second
		maxDelay = time.Hour
	)
	// Double once per prior attempt, stopping at the cap. Breaking early
	// also keeps huge attempt counts from overflowing time.Duration.
	d := base
	for i := 1; i < attempts; i++ {
		d *= 2
		if d >= maxDelay {
			d = maxDelay
			break
		}
	}
	if jitter != nil {
		j := jitter()
		switch {
		case !(j >= 0): // negative, or NaN (all comparisons with NaN are false)
			j = 0
		case j > 1:
			j = 1
		}
		// j in [0,1] → multiplier in [0.8, 1.2].
		d = time.Duration(float64(d) * (0.8 + 0.4*j))
	}
	return d
}
| 147 |