Go · 6173 bytes Raw Blame History
1 // Code generated by sqlc. DO NOT EDIT.
2 // versions:
3 // sqlc v1.31.1
4 // source: jobs.sql
5
6 package workerdb
7
8 import (
9 "context"
10
11 "github.com/jackc/pgx/v5/pgtype"
12 )
13
14 const claimJob = `-- name: ClaimJob :one
15 UPDATE jobs
16 SET locked_by = $2,
17 locked_at = now(),
18 attempts = jobs.attempts + 1
19 WHERE id = (
20 SELECT j.id FROM jobs j
21 WHERE j.kind = $1
22 AND j.completed_at IS NULL
23 AND j.failed_at IS NULL
24 AND j.run_at <= now()
25 AND (j.locked_by IS NULL OR j.locked_at < now() - interval '5 minutes')
26 ORDER BY j.run_at ASC, j.id ASC
27 FOR UPDATE SKIP LOCKED
28 LIMIT 1
29 )
30 RETURNING id, kind, payload, run_at, attempts, max_attempts, last_error, locked_by, locked_at, completed_at, failed_at, created_at
31 `
32
33 type ClaimJobParams struct {
34 Kind string
35 LockedBy pgtype.Text
36 }
37
38 // Atomically claim the oldest runnable job of a given kind. Workers
39 // supply their own instance ID as locked_by so admins can see who's
40 // holding what. Returns pgx.ErrNoRows when the queue is empty for
41 // that kind.
42 func (q *Queries) ClaimJob(ctx context.Context, db DBTX, arg ClaimJobParams) (Job, error) {
43 row := db.QueryRow(ctx, claimJob, arg.Kind, arg.LockedBy)
44 var i Job
45 err := row.Scan(
46 &i.ID,
47 &i.Kind,
48 &i.Payload,
49 &i.RunAt,
50 &i.Attempts,
51 &i.MaxAttempts,
52 &i.LastError,
53 &i.LockedBy,
54 &i.LockedAt,
55 &i.CompletedAt,
56 &i.FailedAt,
57 &i.CreatedAt,
58 )
59 return i, err
60 }
61
62 const enqueueJob = `-- name: EnqueueJob :one
63
64 INSERT INTO jobs (kind, payload, run_at, max_attempts)
65 VALUES ($1, $2, COALESCE($3::timestamptz, now()), COALESCE($4::int, 5))
66 RETURNING id, kind, payload, run_at, attempts, max_attempts, last_error, locked_by, locked_at, completed_at, failed_at, created_at
67 `
68
69 type EnqueueJobParams struct {
70 Kind string
71 Payload []byte
72 RunAt pgtype.Timestamptz
73 MaxAttempts pgtype.Int4
74 }
75
76 // SPDX-License-Identifier: AGPL-3.0-or-later
77 //
78 // Jobs queue. The dispatch query uses FOR UPDATE SKIP LOCKED — the
79 // canonical Postgres pattern for safe concurrent dequeue. Multiple
80 // workers can run ClaimJob concurrently; each gets a different row.
81 // Insert a new job. run_at defaults to now() so the job is immediately
82 // runnable; pass a future timestamp for delayed work. The returned row
83 // carries the assigned ID, which callers persist when the enqueue is
84 // done inside a wider transaction.
85 func (q *Queries) EnqueueJob(ctx context.Context, db DBTX, arg EnqueueJobParams) (Job, error) {
86 row := db.QueryRow(ctx, enqueueJob,
87 arg.Kind,
88 arg.Payload,
89 arg.RunAt,
90 arg.MaxAttempts,
91 )
92 var i Job
93 err := row.Scan(
94 &i.ID,
95 &i.Kind,
96 &i.Payload,
97 &i.RunAt,
98 &i.Attempts,
99 &i.MaxAttempts,
100 &i.LastError,
101 &i.LockedBy,
102 &i.LockedAt,
103 &i.CompletedAt,
104 &i.FailedAt,
105 &i.CreatedAt,
106 )
107 return i, err
108 }
109
110 const getJob = `-- name: GetJob :one
111 SELECT id, kind, payload, run_at, attempts, max_attempts, last_error, locked_by, locked_at, completed_at, failed_at, created_at FROM jobs WHERE id = $1
112 `
113
114 // Single-row lookup by id. Used in tests and the admin panel.
115 func (q *Queries) GetJob(ctx context.Context, db DBTX, id int64) (Job, error) {
116 row := db.QueryRow(ctx, getJob, id)
117 var i Job
118 err := row.Scan(
119 &i.ID,
120 &i.Kind,
121 &i.Payload,
122 &i.RunAt,
123 &i.Attempts,
124 &i.MaxAttempts,
125 &i.LastError,
126 &i.LockedBy,
127 &i.LockedAt,
128 &i.CompletedAt,
129 &i.FailedAt,
130 &i.CreatedAt,
131 )
132 return i, err
133 }
134
135 const markJobCompleted = `-- name: MarkJobCompleted :exec
136 UPDATE jobs
137 SET completed_at = now(),
138 locked_by = NULL,
139 locked_at = NULL
140 WHERE id = $1
141 `
142
143 // On success: clear lock + set completed_at.
144 func (q *Queries) MarkJobCompleted(ctx context.Context, db DBTX, id int64) error {
145 _, err := db.Exec(ctx, markJobCompleted, id)
146 return err
147 }
148
149 const markJobFailed = `-- name: MarkJobFailed :exec
150 UPDATE jobs
151 SET failed_at = now(),
152 locked_by = NULL,
153 locked_at = NULL,
154 last_error = $2
155 WHERE id = $1
156 `
157
158 type MarkJobFailedParams struct {
159 ID int64
160 LastError pgtype.Text
161 }
162
163 // Terminal failure (attempts hit max). Holds locked_by NULL so the
164 // row is no longer "in flight" to dashboards; failed_at marks it
165 // visible to a future poison-job inspector (S34 admin panel).
166 func (q *Queries) MarkJobFailed(ctx context.Context, db DBTX, arg MarkJobFailedParams) error {
167 _, err := db.Exec(ctx, markJobFailed, arg.ID, arg.LastError)
168 return err
169 }
170
171 const purgeCompletedJobs = `-- name: PurgeCompletedJobs :execrows
172 DELETE FROM jobs
173 WHERE completed_at IS NOT NULL
174 AND completed_at < $1
175 `
176
177 // Delete completed jobs older than the supplied cutoff. Used by the
178 // jobs:purge_completed maintenance job. Returns the number of rows
179 // deleted so the cron handler can log progress.
180 func (q *Queries) PurgeCompletedJobs(ctx context.Context, db DBTX, completedAt pgtype.Timestamptz) (int64, error) {
181 result, err := db.Exec(ctx, purgeCompletedJobs, completedAt)
182 if err != nil {
183 return 0, err
184 }
185 return result.RowsAffected(), nil
186 }
187
188 const purgeFailedJobs = `-- name: PurgeFailedJobs :execrows
189 DELETE FROM jobs
190 WHERE failed_at IS NOT NULL
191 AND failed_at < $1
192 `
193
194 // Same as PurgeCompletedJobs but for terminally failed rows. Kept
195 // separate because operators usually want a longer retention on
196 // failures to inspect what blew up.
197 func (q *Queries) PurgeFailedJobs(ctx context.Context, db DBTX, failedAt pgtype.Timestamptz) (int64, error) {
198 result, err := db.Exec(ctx, purgeFailedJobs, failedAt)
199 if err != nil {
200 return 0, err
201 }
202 return result.RowsAffected(), nil
203 }
204
205 const rescheduleJob = `-- name: RescheduleJob :exec
206 UPDATE jobs
207 SET locked_by = NULL,
208 locked_at = NULL,
209 last_error = $2,
210 run_at = $3
211 WHERE id = $1
212 `
213
214 type RescheduleJobParams struct {
215 ID int64
216 LastError pgtype.Text
217 RunAt pgtype.Timestamptz
218 }
219
220 // Transient failure: clear the lock, record the error, push run_at
221 // forward by the caller-computed backoff. attempts was incremented
222 // by ClaimJob, so checking attempts >= max_attempts elsewhere decides
223 // between Reschedule and MarkJobFailed.
224 func (q *Queries) RescheduleJob(ctx context.Context, db DBTX, arg RescheduleJobParams) error {
225 _, err := db.Exec(ctx, rescheduleJob, arg.ID, arg.LastError, arg.RunAt)
226 return err
227 }
228