| 1 | // Code generated by sqlc. DO NOT EDIT. |
| 2 | // versions: |
| 3 | // sqlc v1.31.1 |
| 4 | // source: jobs.sql |
| 5 | |
| 6 | package workerdb |
| 7 | |
| 8 | import ( |
| 9 | "context" |
| 10 | |
| 11 | "github.com/jackc/pgx/v5/pgtype" |
| 12 | ) |
| 13 | |
| 14 | const claimJob = `-- name: ClaimJob :one |
| 15 | UPDATE jobs |
| 16 | SET locked_by = $2, |
| 17 | locked_at = now(), |
| 18 | attempts = jobs.attempts + 1 |
| 19 | WHERE id = ( |
| 20 | SELECT j.id FROM jobs j |
| 21 | WHERE j.kind = $1 |
| 22 | AND j.completed_at IS NULL |
| 23 | AND j.failed_at IS NULL |
| 24 | AND j.run_at <= now() |
| 25 | AND (j.locked_by IS NULL OR j.locked_at < now() - interval '5 minutes') |
| 26 | ORDER BY j.run_at ASC, j.id ASC |
| 27 | FOR UPDATE SKIP LOCKED |
| 28 | LIMIT 1 |
| 29 | ) |
| 30 | RETURNING id, kind, payload, run_at, attempts, max_attempts, last_error, locked_by, locked_at, completed_at, failed_at, created_at |
| 31 | ` |
| 32 | |
| 33 | type ClaimJobParams struct { |
| 34 | Kind string |
| 35 | LockedBy pgtype.Text |
| 36 | } |
| 37 | |
| 38 | // Atomically claim the oldest runnable job of a given kind. Workers |
| 39 | // supply their own instance ID as locked_by so admins can see who's |
| 40 | // holding what. Returns pgx.ErrNoRows when the queue is empty for |
| 41 | // that kind. |
| 42 | func (q *Queries) ClaimJob(ctx context.Context, db DBTX, arg ClaimJobParams) (Job, error) { |
| 43 | row := db.QueryRow(ctx, claimJob, arg.Kind, arg.LockedBy) |
| 44 | var i Job |
| 45 | err := row.Scan( |
| 46 | &i.ID, |
| 47 | &i.Kind, |
| 48 | &i.Payload, |
| 49 | &i.RunAt, |
| 50 | &i.Attempts, |
| 51 | &i.MaxAttempts, |
| 52 | &i.LastError, |
| 53 | &i.LockedBy, |
| 54 | &i.LockedAt, |
| 55 | &i.CompletedAt, |
| 56 | &i.FailedAt, |
| 57 | &i.CreatedAt, |
| 58 | ) |
| 59 | return i, err |
| 60 | } |
| 61 | |
| 62 | const enqueueJob = `-- name: EnqueueJob :one |
| 63 | |
| 64 | INSERT INTO jobs (kind, payload, run_at, max_attempts) |
| 65 | VALUES ($1, $2, COALESCE($3::timestamptz, now()), COALESCE($4::int, 5)) |
| 66 | RETURNING id, kind, payload, run_at, attempts, max_attempts, last_error, locked_by, locked_at, completed_at, failed_at, created_at |
| 67 | ` |
| 68 | |
| 69 | type EnqueueJobParams struct { |
| 70 | Kind string |
| 71 | Payload []byte |
| 72 | RunAt pgtype.Timestamptz |
| 73 | MaxAttempts pgtype.Int4 |
| 74 | } |
| 75 | |
| 76 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 77 | // |
| 78 | // Jobs queue. The dispatch query uses FOR UPDATE SKIP LOCKED — the |
| 79 | // canonical Postgres pattern for safe concurrent dequeue. Multiple |
| 80 | // workers can run ClaimJob concurrently; each gets a different row. |
| 81 | // Insert a new job. run_at defaults to now() so the job is immediately |
| 82 | // runnable; pass a future timestamp for delayed work. The returned row |
| 83 | // carries the assigned ID, which callers persist when the enqueue is |
| 84 | // done inside a wider transaction. |
| 85 | func (q *Queries) EnqueueJob(ctx context.Context, db DBTX, arg EnqueueJobParams) (Job, error) { |
| 86 | row := db.QueryRow(ctx, enqueueJob, |
| 87 | arg.Kind, |
| 88 | arg.Payload, |
| 89 | arg.RunAt, |
| 90 | arg.MaxAttempts, |
| 91 | ) |
| 92 | var i Job |
| 93 | err := row.Scan( |
| 94 | &i.ID, |
| 95 | &i.Kind, |
| 96 | &i.Payload, |
| 97 | &i.RunAt, |
| 98 | &i.Attempts, |
| 99 | &i.MaxAttempts, |
| 100 | &i.LastError, |
| 101 | &i.LockedBy, |
| 102 | &i.LockedAt, |
| 103 | &i.CompletedAt, |
| 104 | &i.FailedAt, |
| 105 | &i.CreatedAt, |
| 106 | ) |
| 107 | return i, err |
| 108 | } |
| 109 | |
| 110 | const getJob = `-- name: GetJob :one |
| 111 | SELECT id, kind, payload, run_at, attempts, max_attempts, last_error, locked_by, locked_at, completed_at, failed_at, created_at FROM jobs WHERE id = $1 |
| 112 | ` |
| 113 | |
| 114 | // Single-row lookup by id. Used in tests and the admin panel. |
| 115 | func (q *Queries) GetJob(ctx context.Context, db DBTX, id int64) (Job, error) { |
| 116 | row := db.QueryRow(ctx, getJob, id) |
| 117 | var i Job |
| 118 | err := row.Scan( |
| 119 | &i.ID, |
| 120 | &i.Kind, |
| 121 | &i.Payload, |
| 122 | &i.RunAt, |
| 123 | &i.Attempts, |
| 124 | &i.MaxAttempts, |
| 125 | &i.LastError, |
| 126 | &i.LockedBy, |
| 127 | &i.LockedAt, |
| 128 | &i.CompletedAt, |
| 129 | &i.FailedAt, |
| 130 | &i.CreatedAt, |
| 131 | ) |
| 132 | return i, err |
| 133 | } |
| 134 | |
| 135 | const markJobCompleted = `-- name: MarkJobCompleted :exec |
| 136 | UPDATE jobs |
| 137 | SET completed_at = now(), |
| 138 | locked_by = NULL, |
| 139 | locked_at = NULL |
| 140 | WHERE id = $1 |
| 141 | ` |
| 142 | |
| 143 | // On success: clear lock + set completed_at. |
| 144 | func (q *Queries) MarkJobCompleted(ctx context.Context, db DBTX, id int64) error { |
| 145 | _, err := db.Exec(ctx, markJobCompleted, id) |
| 146 | return err |
| 147 | } |
| 148 | |
| 149 | const markJobFailed = `-- name: MarkJobFailed :exec |
| 150 | UPDATE jobs |
| 151 | SET failed_at = now(), |
| 152 | locked_by = NULL, |
| 153 | locked_at = NULL, |
| 154 | last_error = $2 |
| 155 | WHERE id = $1 |
| 156 | ` |
| 157 | |
| 158 | type MarkJobFailedParams struct { |
| 159 | ID int64 |
| 160 | LastError pgtype.Text |
| 161 | } |
| 162 | |
| 163 | // Terminal failure (attempts hit max). Holds locked_by NULL so the |
| 164 | // row is no longer "in flight" to dashboards; failed_at marks it |
| 165 | // visible to a future poison-job inspector (S34 admin panel). |
| 166 | func (q *Queries) MarkJobFailed(ctx context.Context, db DBTX, arg MarkJobFailedParams) error { |
| 167 | _, err := db.Exec(ctx, markJobFailed, arg.ID, arg.LastError) |
| 168 | return err |
| 169 | } |
| 170 | |
| 171 | const purgeCompletedJobs = `-- name: PurgeCompletedJobs :execrows |
| 172 | DELETE FROM jobs |
| 173 | WHERE completed_at IS NOT NULL |
| 174 | AND completed_at < $1 |
| 175 | ` |
| 176 | |
| 177 | // Delete completed jobs older than the supplied cutoff. Used by the |
| 178 | // jobs:purge_completed maintenance job. Returns the number of rows |
| 179 | // deleted so the cron handler can log progress. |
| 180 | func (q *Queries) PurgeCompletedJobs(ctx context.Context, db DBTX, completedAt pgtype.Timestamptz) (int64, error) { |
| 181 | result, err := db.Exec(ctx, purgeCompletedJobs, completedAt) |
| 182 | if err != nil { |
| 183 | return 0, err |
| 184 | } |
| 185 | return result.RowsAffected(), nil |
| 186 | } |
| 187 | |
| 188 | const purgeFailedJobs = `-- name: PurgeFailedJobs :execrows |
| 189 | DELETE FROM jobs |
| 190 | WHERE failed_at IS NOT NULL |
| 191 | AND failed_at < $1 |
| 192 | ` |
| 193 | |
| 194 | // Same as PurgeCompletedJobs but for terminally failed rows. Kept |
| 195 | // separate because operators usually want a longer retention on |
| 196 | // failures to inspect what blew up. |
| 197 | func (q *Queries) PurgeFailedJobs(ctx context.Context, db DBTX, failedAt pgtype.Timestamptz) (int64, error) { |
| 198 | result, err := db.Exec(ctx, purgeFailedJobs, failedAt) |
| 199 | if err != nil { |
| 200 | return 0, err |
| 201 | } |
| 202 | return result.RowsAffected(), nil |
| 203 | } |
| 204 | |
| 205 | const rescheduleJob = `-- name: RescheduleJob :exec |
| 206 | UPDATE jobs |
| 207 | SET locked_by = NULL, |
| 208 | locked_at = NULL, |
| 209 | last_error = $2, |
| 210 | run_at = $3 |
| 211 | WHERE id = $1 |
| 212 | ` |
| 213 | |
| 214 | type RescheduleJobParams struct { |
| 215 | ID int64 |
| 216 | LastError pgtype.Text |
| 217 | RunAt pgtype.Timestamptz |
| 218 | } |
| 219 | |
| 220 | // Transient failure: clear the lock, record the error, push run_at |
| 221 | // forward by the caller-computed backoff. attempts was incremented |
| 222 | // by ClaimJob, so checking attempts >= max_attempts elsewhere decides |
| 223 | // between Reschedule and MarkJobFailed. |
| 224 | func (q *Queries) RescheduleJob(ctx context.Context, db DBTX, arg RescheduleJobParams) error { |
| 225 | _, err := db.Exec(ctx, rescheduleJob, arg.ID, arg.LastError, arg.RunAt) |
| 226 | return err |
| 227 | } |
| 228 |