Go · 1874 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package worker
4
5 import (
6 "context"
7 "encoding/json"
8 "fmt"
9
10 "github.com/jackc/pgx/v5/pgtype"
11
12 workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
13 )
14
// DBTX is an alias for the sqlc-generated workerdb.DBTX interface: the pgx
// handle that sqlc-generated methods accept (anything providing
// Exec/Query/QueryRow). The pool, a tx, and the helpers in dbtest all
// satisfy it. Because it is an alias rather than a new type, worker.DBTX
// and workerdb.DBTX are interchangeable.
type DBTX = workerdb.DBTX
19
// EnqueueOptions tunes a single enqueue. The zero value is usable and
// selects the defaults described on each field.
type EnqueueOptions struct {
	// RunAt is when the job should first run. Zero means "now"; pass a
	// future time to delay the first run.
	RunAt pgtype.Timestamptz

	// MaxAttempts is the attempt limit for the job. Zero means use the
	// table default (5).
	MaxAttempts int32
}
27
28 // Enqueue inserts a job row and returns its id. Callers running inside a
29 // transaction should pass the tx as db so the enqueue is rolled back
30 // alongside any related state changes; same goes for the NOTIFY (issued
31 // separately by the caller via Notify).
32 func Enqueue(ctx context.Context, db DBTX, kind Kind, payload any, opts EnqueueOptions) (int64, error) {
33 body, err := json.Marshal(payload)
34 if err != nil {
35 return 0, fmt.Errorf("worker: marshal payload: %w", err)
36 }
37 q := workerdb.New()
38 row, err := q.EnqueueJob(ctx, db, workerdb.EnqueueJobParams{
39 Kind: string(kind),
40 Payload: body,
41 RunAt: opts.RunAt,
42 MaxAttempts: pgtype.Int4{Int32: opts.MaxAttempts, Valid: opts.MaxAttempts > 0},
43 })
44 if err != nil {
45 return 0, fmt.Errorf("worker: enqueue %s: %w", kind, err)
46 }
47 return row.ID, nil
48 }
49
50 // Notify wakes any LISTENing workers. Safe to call after a successful
51 // commit; if called inside a tx, the NOTIFY only delivers when the tx
52 // commits (Postgres semantics). Errors are non-fatal — workers also poll
53 // at a slow interval as a backstop.
54 func Notify(ctx context.Context, db DBTX) error {
55 _, err := db.Exec(ctx, "SELECT pg_notify($1, '')", NotifyChannel)
56 return err
57 }
58