Go · 5842 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 // Package worker drives the Postgres-backed job queue introduced in
4 // S14. The pool dispatches one Job per goroutine, claiming rows via
5 // FOR UPDATE SKIP LOCKED so concurrent workers don't double-process.
6 //
7 // Job kinds, their payload schema, and their handlers live alongside
8 // this package in sub-packages (jobs/<kind>.go). The pool itself is
9 // kind-agnostic: a Handler is just a func that takes a payload and
10 // returns nil-or-error.
11 package worker
12
13 import (
14 "context"
15 "encoding/json"
16 "errors"
17 "fmt"
18 "time"
19 )
20
// Kind is the canonical name of a job. Use lowercase letters and
// colon-separated namespaces (e.g. "push:process", "repo:size_recalc").
// Kind doubles as the dispatch index — workers query
// `WHERE kind = $1` so adding new kinds doesn't disturb existing ones.
// The string value is stored verbatim in the queue's Postgres rows, so
// renaming an existing Kind is a data migration, not just a code change.
type Kind string
26
// Built-in kinds shipped in S14.
//
// NOTE(review): handler semantics live in jobs/<kind>.go, not here.
// push:process also enqueues repo:index_code when the default branch
// advances (see the S28 kinds further down).
const (
	KindPushProcess    Kind = "push:process"
	KindRepoSizeRecalc Kind = "repo:size_recalc"
	KindJobsPurge      Kind = "jobs:purge_completed"
)

// S16 lifecycle housekeeping kind.
const (
	KindLifecycleSweep Kind = "lifecycle:sweep"
)

// S22 pull-request kinds. Synchronize refreshes commits + files after
// a head-side push; mergeability runs the merge-tree probe.
//
// (No async-merge kind: the POST .../merge handler executes the merge
// synchronously so the redirect lands on the merged state. If we add
// async merging later — for very large repos — re-introduce a
// KindPRMerge here and a matching jobs.PRMerge handler.)
const (
	KindPRSynchronize  Kind = "pr:synchronize"
	KindPRMergeability Kind = "pr:mergeability"
)

// S27 fork kinds. fork_clone runs `git clone --bare --shared` for a
// freshly created fork shell; the fork's init_status flips from
// init_pending → initialized (or init_failed) on success/failure.
const (
	KindRepoForkClone Kind = "repo:fork_clone"
)
57
// S28 code-search kinds. index_code re-indexes a repo's default
// branch (paths + content). Enqueued by push:process when the
// default branch advances; also by index_reconcile when drift
// between default_branch_oid and last_indexed_oid is detected.
const (
	KindRepoIndexCode      Kind = "repo:index_code"
	KindRepoIndexReconcile Kind = "repo:index_reconcile"
)

// S29 notification kinds. notify:fanout drains the domain_events
// table past the persisted cursor and materializes inbox rows +
// (optional) emails. Self-throttling per FanoutBatch; loop-friendly
// (the handler returns the count so the pool can re-enqueue when a
// drain didn't catch up).
const (
	KindNotifyFanout Kind = "notify:fanout"
)

// S42 social feed kinds. trending:compute refreshes the denormalized
// day/week/month Explore rankings.
const (
	KindTrendingCompute Kind = "trending:compute"
)

// Organization import kinds. The discovery job lists GitHub repositories
// and fans out one child job per repository so large organizations can
// progress incrementally.
const (
	KindOrgGitHubImportDiscover Kind = "org:github_import_discover"
	KindOrgGitHubImportRepo     Kind = "org:github_import_repo"
)

// Organization billing kinds. seat_sync recomputes active org members,
// records a local billing snapshot, and updates Stripe subscription-item
// quantity when hosted Team billing is active.
//
// NOTE(review): presumably dispatched both on membership changes and on
// a periodic schedule — confirm against the enqueue sites.
const (
	KindOrgBillingSeatSync Kind = "org:billing_seat_sync"
)

// S51 GPG signature verification backfill. One job per repo;
// payload {repo_id}. The handler walks the repo's default branch +
// annotated tags and writes commit_verification_cache rows for every
// signed object. Dispatched both eagerly (one job per repo when a
// user adds a GPG key — DispatchForKey) and as a bulk admin command
// (shithubd gpg-backfill-all — DispatchAll). The handler is
// idempotent thanks to UpsertCommitVerification's ON CONFLICT.
const (
	KindGPGBackfill Kind = "gpg:backfill"
)
107
// NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes
// to so it wakes up immediately when a job is enqueued, instead of
// polling. Callers wrapping enqueue in a tx must NOTIFY inside the
// same tx so the notification only fires on commit — Postgres defers
// NOTIFY delivery until the transaction commits, so a rolled-back
// enqueue never wakes a worker for a row that doesn't exist.
const NotifyChannel = "shithub_jobs"
113
// Handler runs one job. The framework supplies the raw JSON payload;
// handlers Unmarshal into their own typed schema. A nil error reports
// success and the job is marked completed; any non-nil error triggers
// the backoff/retry path. ErrPoison is the explicit "do not retry" signal
// — useful when the input is malformed and retrying can't help.
//
// The pool dispatches one goroutine per claimed job, so handlers for
// different jobs run concurrently and must not share mutable state
// unguarded. Handlers should honor ctx cancellation for clean shutdown
// (NOTE(review): assumed — confirm the pool cancels ctx on drain).
type Handler func(ctx context.Context, payload json.RawMessage) error
120
121 // ErrPoison wraps a handler error that should NOT be retried. The pool
122 // jumps the job straight to MarkJobFailed instead of rescheduling.
123 var ErrPoison = errors.New("worker: poison job")
124
125 // PoisonError wraps cause as a poison error. The cause is preserved in
126 // last_error for operator inspection.
127 func PoisonError(cause error) error {
128 return fmt.Errorf("%w: %v", ErrPoison, cause)
129 }
130
131 // Backoff returns the delay before retrying a job that is about to be
132 // rescheduled. The formula is `30s * 2^attempts` capped at 1 hour, with
133 // ±20% jitter so a fleet doesn't synchronize retries on a sibling
134 // dependency outage.
135 //
136 // `attempts` is the number of attempts already made (1-indexed: the
137 // just-failed attempt counts as 1).
138 func Backoff(attempts int, jitter func() float64) time.Duration {
139 if attempts < 1 {
140 attempts = 1
141 }
142 const (
143 base = 30 * time.Second
144 maxDelay = time.Hour
145 )
146 // Compute base * 2^(attempts-1), guarding against overflow.
147 d := base
148 for i := 1; i < attempts; i++ {
149 d *= 2
150 if d >= maxDelay {
151 d = maxDelay
152 break
153 }
154 }
155 if d > maxDelay {
156 d = maxDelay
157 }
158 if jitter != nil {
159 // jitter() in [0,1) → multiplier in [0.8, 1.2)
160 mult := 0.8 + 0.4*jitter()
161 d = time.Duration(float64(d) * mult)
162 }
163 return d
164 }
165