| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | // Package worker drives the Postgres-backed job queue introduced in |
| 4 | // S14. The pool dispatches one Job per goroutine, claiming rows via |
| 5 | // FOR UPDATE SKIP LOCKED so concurrent workers don't double-process. |
| 6 | // |
| 7 | // Job kinds, their payload schema, and their handlers live alongside |
| 8 | // this package in sub-packages (jobs/<kind>.go). The pool itself is |
| 9 | // kind-agnostic: a Handler is just a func that takes a payload and |
| 10 | // returns nil-or-error. |
| 11 | package worker |
| 12 | |
| 13 | import ( |
| 14 | "context" |
| 15 | "encoding/json" |
| 16 | "errors" |
| 17 | "fmt" |
| 18 | "time" |
| 19 | ) |
| 20 | |
// Kind names a job type. By convention it is written in lowercase with
// colon-separated namespaces, for example "push:process" or
// "repo:size_recalc". The value is also what workers dispatch on: they
// query `WHERE kind = $1`, so introducing a new kind leaves the
// existing ones undisturbed.
type Kind string

// Job kinds known to the worker, grouped by the milestone that
// introduced them.
const (
	// S14: built-in kinds.
	KindPushProcess    Kind = "push:process"
	KindRepoSizeRecalc Kind = "repo:size_recalc"
	KindJobsPurge      Kind = "jobs:purge_completed"

	// S16: lifecycle housekeeping.
	KindLifecycleSweep Kind = "lifecycle:sweep"

	// S22: pull requests. Synchronize refreshes commits + files after a
	// head-side push; mergeability runs the merge-tree probe.
	//
	// There is deliberately no async-merge kind: the POST .../merge
	// handler performs the merge synchronously so that the redirect
	// lands on the merged state. Should async merging be added later
	// (for very large repos), re-introduce a KindPRMerge here together
	// with a matching jobs.PRMerge handler.
	KindPRSynchronize  Kind = "pr:synchronize"
	KindPRMergeability Kind = "pr:mergeability"
)
| 50 | |
// NotifyChannel names the Postgres LISTEN/NOTIFY channel the pool
// listens on. A notification wakes the pool as soon as a job is
// enqueued, so it does not have to poll. When the enqueue happens
// inside a transaction, issue the NOTIFY in that same transaction so
// the notification is delivered only once the transaction commits.
const NotifyChannel = "shithub_jobs"
| 56 | |
// Handler executes a single job. It receives the job's payload as raw
// JSON and is responsible for unmarshalling it into its own typed
// schema.
//
// Returning nil reports success, and the job is marked completed.
// Returning any non-nil error sends the job down the backoff/retry
// path. To signal explicitly that a job must not be retried (for
// example because the input is malformed and another attempt cannot
// help), return ErrPoison.
type Handler func(ctx context.Context, payload json.RawMessage) error
| 63 | |
// ErrPoison marks a handler error that should NOT be retried. The pool
// jumps the job straight to MarkJobFailed instead of rescheduling.
// Detect it with errors.Is(err, ErrPoison).
var ErrPoison = errors.New("worker: poison job")

// PoisonError wraps cause as a poison error. The returned error
// satisfies errors.Is(err, ErrPoison), and its message is the ErrPoison
// text followed by ": " and cause's text, so the cause is preserved in
// last_error for operator inspection.
//
// Only ErrPoison is wrapped. The cause is flattened into the message
// with %v, so errors.Is / errors.As cannot reach it through the
// returned error.
//
// A nil cause yields ErrPoison itself, so the message never ends in a
// formatted nil value.
func PoisonError(cause error) error {
	if cause == nil {
		return ErrPoison
	}
	return fmt.Errorf("%w: %v", ErrPoison, cause)
}
| 73 | |
// Backoff returns the delay before retrying a job that is about to be
// rescheduled.
//
// The un-jittered delay is `30s * 2^(attempts-1)`, capped at 1 hour:
// 30s, 1m, 2m, 4m, ... and 1h from the 8th attempt onward.
//
// `attempts` is the number of attempts already made (1-indexed: the
// just-failed attempt counts as 1). Values below 1 are treated as 1.
//
// `jitter`, when non-nil, is called once and is expected to return a
// value in [0,1). The delay is then scaled by a factor in [0.8, 1.2),
// i.e. ±20%, so a fleet doesn't synchronize retries on a sibling
// dependency outage. Jitter is applied after the cap, so the returned
// delay can exceed 1 hour by up to 20%. A nil jitter returns the exact
// un-jittered delay.
func Backoff(attempts int, jitter func() float64) time.Duration {
	const (
		base     = 30 * time.Second
		maxDelay = time.Hour
	)

	// Double once per attempt beyond the first. Clamping inside the loop
	// keeps delay from overflowing and bounds the loop to a handful of
	// iterations however large attempts is. For attempts <= 1 the loop
	// body never runs, which yields base.
	delay := base
	for i := 1; i < attempts; i++ {
		delay *= 2
		if delay >= maxDelay {
			delay = maxDelay
			break
		}
	}

	if jitter != nil {
		// jitter() in [0,1) → multiplier in [0.8, 1.2)
		mult := 0.8 + 0.4*jitter()
		delay = time.Duration(float64(delay) * mult)
	}
	return delay
}
| 108 |