tenseleyflow/shithub / e659af3

Browse files

S14: jobs, push_events, webhook_events_pending tables + repos.default_branch_oid

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
e659af39ff97426dfdf25bd26148492080d367bf
Parents
f177633
Tree
009ff24

17 changed files

Status | File | Lines added | Lines removed
M internal/meta/sqlc/models.go 55 18
A internal/migrationsfs/migrations/0018_jobs_and_push_events.sql 109 0
M internal/repos/queries/repos.sql 39 3
M internal/repos/sqlc/models.go 55 18
M internal/repos/sqlc/querier.go 13 0
M internal/repos/sqlc/repos.sql.go 121 3
M internal/users/sqlc/models.go 55 18
A internal/worker/queries/jobs.sql 87 0
A internal/worker/queries/push_events.sql 20 0
A internal/worker/queries/webhook_events_pending.sql 10 0
A internal/worker/sqlc/db.go 25 0
A internal/worker/sqlc/jobs.sql.go 227 0
C internal/worker/sqlc/models.go 0 0
A internal/worker/sqlc/push_events.sql.go 95 0
A internal/worker/sqlc/querier.go 65 0
A internal/worker/sqlc/webhook_events_pending.sql.go 41 0
M sqlc.yaml 16 0
internal/meta/sqlc/models.gomodified
@@ -81,6 +81,21 @@ type EmailVerification struct {
81
 	CreatedAt   pgtype.Timestamptz
81
 	CreatedAt   pgtype.Timestamptz
82
 }
82
 }
83
 
83
 
84
// Job is one row of the Postgres-backed work queue (jobs table,
// migration 0018), dispatched with FOR UPDATE SKIP LOCKED.
// Generated by sqlc; fields mirror column names.
type Job struct {
	ID          int64
	Kind        string             // job type, e.g. "push:process"
	Payload     []byte             // jsonb; schema default is '{}'
	RunAt       pgtype.Timestamptz // earliest time the job may run
	Attempts    int32              // incremented by ClaimJob on every claim
	MaxAttempts int32              // terminal failure once attempts reach this
	LastError   pgtype.Text        // most recent failure message, if any
	LockedBy    pgtype.Text        // worker instance ID currently holding the job
	LockedAt    pgtype.Timestamptz // when the current lock was taken
	CompletedAt pgtype.Timestamptz // set on success; NULL while runnable
	FailedAt    pgtype.Timestamptz // set on terminal failure
	CreatedAt   pgtype.Timestamptz
}
98
+
84
 type Meta struct {
99
 type Meta struct {
85
 	Key       string
100
 	Key       string
86
 	Value     []byte
101
 	Value     []byte
@@ -96,25 +111,39 @@ type PasswordReset struct {
96
 	CreatedAt pgtype.Timestamptz
111
 	CreatedAt pgtype.Timestamptz
97
 }
112
 }
98
 
113
 
114
// PushEvent is one row of push_events (migration 0018): one record per
// ref pushed, written by the post-receive hook and consumed by the
// push:process job. Generated by sqlc.
type PushEvent struct {
	ID           int64
	RepoID       int64
	PusherUserID pgtype.Int8 // nullable; FK is ON DELETE SET NULL
	BeforeSha    string
	AfterSha     string
	Ref          string
	Protocol     string             // schema CHECK restricts to 'http' or 'ssh'
	RequestID    string             // schema default is ''
	ProcessedAt  pgtype.Timestamptz // NULL until push:process handles the row
	CreatedAt    pgtype.Timestamptz
}
126
+
99
// Repo mirrors the repos table. S14 adds DefaultBranchOid: the OID at
// HEAD of the repo's default branch, NULL until the first push lands;
// the repo home view reads it to choose between the empty and populated
// layouts. Generated by sqlc.
type Repo struct {
	ID               int64
	OwnerUserID      pgtype.Int8 // nullable: repo may be org-owned instead
	OwnerOrgID       pgtype.Int8
	Name             string
	Description      string
	Visibility       RepoVisibility
	DefaultBranch    string
	IsArchived       bool
	ArchivedAt       pgtype.Timestamptz
	DeletedAt        pgtype.Timestamptz // soft-delete marker; list queries filter IS NULL
	DiskUsedBytes    int64
	ForkOfRepoID     pgtype.Int8
	LicenseKey       pgtype.Text
	PrimaryLanguage  pgtype.Text
	HasIssues        bool
	HasPulls         bool
	CreatedAt        pgtype.Timestamptz
	UpdatedAt        pgtype.Timestamptz
	DefaultBranchOid pgtype.Text // added by migration 0018 (default_branch_oid)
}
119
 
148
 
120
 type User struct {
149
 type User struct {
@@ -213,3 +242,11 @@ type UsernameRedirect struct {
213
 	UserID      int64
242
 	UserID      int64
214
 	ChangedAt   pgtype.Timestamptz
243
 	ChangedAt   pgtype.Timestamptz
215
 }
244
 }
245
+
246
// WebhookEventsPending is one queued webhook event (migration 0018).
// S14 only inserts; the S33 deliverer drains the table on its own
// cadence, separate from the generic jobs queue. Generated by sqlc.
type WebhookEventsPending struct {
	ID        int64
	RepoID    int64
	EventKind string
	Payload   []byte // jsonb
	CreatedAt pgtype.Timestamptz
}
internal/migrationsfs/migrations/0018_jobs_and_push_events.sqladded
@@ -0,0 +1,109 @@
1
-- SPDX-License-Identifier: AGPL-3.0-or-later
--
-- S14 push processing pipeline.
--
-- Three tables + one column:
--   * jobs                     — Postgres-backed work queue with
--                                FOR UPDATE SKIP LOCKED dispatch.
--   * push_events              — one row per ref pushed, written by the
--                                post-receive hook; consumed by the
--                                push:process job.
--   * webhook_events_pending   — accumulator drained by the S33 webhook
--                                deliverer; kept separate from the
--                                generic job table so S33 controls its
--                                own delivery cadence.
--   * repos.default_branch_oid — set by push:process the first time the
--                                repo's default branch receives a
--                                commit; the home page reads it to
--                                decide whether to render the empty or
--                                populated view (S11/S17).
--
-- Why a Postgres queue instead of Redis: keeps the runtime dependency
-- surface to one engine; FOR UPDATE SKIP LOCKED gives us safe concurrent
-- dispatch out of the box; LISTEN/NOTIFY gives us idle wake-ups without
-- polling. If we ever need cross-region or millions-of-jobs throughput
-- we'll revisit, but that's well past MVP.

-- +goose Up

CREATE TABLE jobs (
    id              bigserial   PRIMARY KEY,
    kind            text        NOT NULL,
    payload         jsonb       NOT NULL DEFAULT '{}',
    run_at          timestamptz NOT NULL DEFAULT now(),
    attempts        int         NOT NULL DEFAULT 0,
    max_attempts    int         NOT NULL DEFAULT 5,
    last_error      text,
    locked_by       text,
    locked_at       timestamptz,
    completed_at    timestamptz,
    failed_at       timestamptz,
    created_at      timestamptz NOT NULL DEFAULT now(),

    CONSTRAINT jobs_kind_length        CHECK (char_length(kind) BETWEEN 1 AND 100),
    CONSTRAINT jobs_attempts_nonneg    CHECK (attempts >= 0),
    CONSTRAINT jobs_max_attempts_pos   CHECK (max_attempts > 0)
);

-- The dispatch index: workers query for the oldest runnable row of a given
-- kind. Partial because only un-completed and un-failed rows are dispatchable.
-- Matches ClaimJob's predicate (kind equality, run_at ordering, both
-- completed_at and failed_at still NULL) in internal/worker/queries/jobs.sql.
CREATE INDEX jobs_dispatch_idx
    ON jobs (kind, run_at)
    WHERE completed_at IS NULL AND failed_at IS NULL;

-- Visibility: which jobs are currently held by which worker.
CREATE INDEX jobs_locked_idx
    ON jobs (locked_by, locked_at)
    WHERE locked_by IS NOT NULL;


CREATE TABLE push_events (
    id              bigserial   PRIMARY KEY,
    repo_id         bigint      NOT NULL REFERENCES repos(id) ON DELETE CASCADE,
    pusher_user_id  bigint      REFERENCES users(id) ON DELETE SET NULL,
    before_sha      text        NOT NULL,
    after_sha       text        NOT NULL,
    ref             text        NOT NULL,
    protocol        text        NOT NULL,
    request_id      text        NOT NULL DEFAULT '',
    processed_at    timestamptz,
    created_at      timestamptz NOT NULL DEFAULT now(),

    CONSTRAINT push_events_protocol      CHECK (protocol IN ('http', 'ssh')),
    CONSTRAINT push_events_ref_length    CHECK (char_length(ref) BETWEEN 1 AND 255),
    -- 64 accommodates SHA-256 object IDs; 40-char SHA-1 fits too.
    CONSTRAINT push_events_sha_length    CHECK (
        char_length(before_sha) BETWEEN 1 AND 64 AND
        char_length(after_sha)  BETWEEN 1 AND 64
    )
);

-- Recent-pushes listing per repo.
CREATE INDEX push_events_repo_id_idx ON push_events (repo_id, created_at DESC);
-- Backlog scan for the consumer: only rows not yet processed.
CREATE INDEX push_events_unprocessed_idx
    ON push_events (created_at)
    WHERE processed_at IS NULL;


CREATE TABLE webhook_events_pending (
    id              bigserial   PRIMARY KEY,
    repo_id         bigint      NOT NULL REFERENCES repos(id) ON DELETE CASCADE,
    event_kind      text        NOT NULL,
    payload         jsonb       NOT NULL,
    created_at      timestamptz NOT NULL DEFAULT now(),

    CONSTRAINT webhook_events_pending_kind_length CHECK (char_length(event_kind) BETWEEN 1 AND 100)
);

CREATE INDEX webhook_events_pending_repo_id_idx
    ON webhook_events_pending (repo_id, created_at);


-- default_branch_oid is the OID at HEAD of repos.default_branch. NULL
-- until the first push lands; the repo home view checks this to fork
-- between empty and populated layouts (refs-on-disk fallback in S11).
ALTER TABLE repos ADD COLUMN default_branch_oid text;

-- +goose Down
ALTER TABLE repos DROP COLUMN IF EXISTS default_branch_oid;
DROP TABLE IF EXISTS webhook_events_pending;
DROP TABLE IF EXISTS push_events;
DROP TABLE IF EXISTS jobs;
internal/repos/queries/repos.sqlmodified
@@ -10,13 +10,30 @@ INSERT INTO repos (
10
 RETURNING id, owner_user_id, owner_org_id, name, description, visibility,
10
 RETURNING id, owner_user_id, owner_org_id, name, description, visibility,
11
           default_branch, is_archived, archived_at, deleted_at,
11
           default_branch, is_archived, archived_at, deleted_at,
12
           disk_used_bytes, fork_of_repo_id, license_key, primary_language,
12
           disk_used_bytes, fork_of_repo_id, license_key, primary_language,
13
-          has_issues, has_pulls, created_at, updated_at;
13
+          has_issues, has_pulls, created_at, updated_at, default_branch_oid;
14
+
15
-- name: GetRepoByID :one
-- Fetch one repo row by primary key. Note: unlike the owner/name and
-- list queries, this does NOT filter deleted_at, so soft-deleted repos
-- are returned too — callers must check DeletedAt if that matters.
SELECT id, owner_user_id, owner_org_id, name, description, visibility,
       default_branch, is_archived, archived_at, deleted_at,
       disk_used_bytes, fork_of_repo_id, license_key, primary_language,
       has_issues, has_pulls, created_at, updated_at, default_branch_oid
FROM repos
WHERE id = $1;
22
+
23
-- name: GetRepoOwnerUsernameByID :one
-- Returns the owner_username for a repo. Used by size-recalc and other
-- jobs that need to derive the bare-repo on-disk path without round-
-- tripping through the full user row.
-- NOTE(review): the inner JOIN on owner_user_id means org-owned repos
-- (owner_user_id NULL) return no row — confirm callers only pass
-- user-owned repo IDs.
SELECT u.username AS owner_username, r.name AS repo_name
FROM repos r
JOIN users u ON u.id = r.owner_user_id
WHERE r.id = $1;
14
 
31
 
15
 -- name: GetRepoByOwnerUserAndName :one
32
 -- name: GetRepoByOwnerUserAndName :one
16
 SELECT id, owner_user_id, owner_org_id, name, description, visibility,
33
 SELECT id, owner_user_id, owner_org_id, name, description, visibility,
17
        default_branch, is_archived, archived_at, deleted_at,
34
        default_branch, is_archived, archived_at, deleted_at,
18
        disk_used_bytes, fork_of_repo_id, license_key, primary_language,
35
        disk_used_bytes, fork_of_repo_id, license_key, primary_language,
19
-       has_issues, has_pulls, created_at, updated_at
36
+       has_issues, has_pulls, created_at, updated_at, default_branch_oid
20
 FROM repos
37
 FROM repos
21
 WHERE owner_user_id = $1 AND name = $2 AND deleted_at IS NULL;
38
 WHERE owner_user_id = $1 AND name = $2 AND deleted_at IS NULL;
22
 
39
 
@@ -30,7 +47,7 @@ SELECT EXISTS(
30
 SELECT id, owner_user_id, owner_org_id, name, description, visibility,
47
 SELECT id, owner_user_id, owner_org_id, name, description, visibility,
31
        default_branch, is_archived, archived_at, deleted_at,
48
        default_branch, is_archived, archived_at, deleted_at,
32
        disk_used_bytes, fork_of_repo_id, license_key, primary_language,
49
        disk_used_bytes, fork_of_repo_id, license_key, primary_language,
33
-       has_issues, has_pulls, created_at, updated_at
50
+       has_issues, has_pulls, created_at, updated_at, default_branch_oid
34
 FROM repos
51
 FROM repos
35
 WHERE owner_user_id = $1 AND deleted_at IS NULL
52
 WHERE owner_user_id = $1 AND deleted_at IS NULL
36
 ORDER BY updated_at DESC;
53
 ORDER BY updated_at DESC;
@@ -44,3 +61,22 @@ UPDATE repos SET deleted_at = now() WHERE id = $1;
44
 
61
 
45
 -- name: UpdateRepoDiskUsed :exec
62
 -- name: UpdateRepoDiskUsed :exec
46
 UPDATE repos SET disk_used_bytes = $2 WHERE id = $1;
63
 UPDATE repos SET disk_used_bytes = $2 WHERE id = $1;
64
+
65
-- name: UpdateRepoDefaultBranchOID :exec
-- Set when push:process detects a commit on the repo's default branch.
-- Pass NULL to clear (e.g. when the branch is force-deleted in a future
-- sprint). The repo home view reads this to decide between empty and
-- populated layouts. Deliberately does not touch updated_at.
UPDATE repos SET default_branch_oid = sqlc.narg(default_branch_oid)::text WHERE id = $1;
71
+
72
-- name: ListAllRepoFullNames :many
-- Used by `shithubd hooks reinstall --all` to enumerate every active
-- bare repo on disk and re-link its hooks.
-- NOTE(review): inner JOIN on owner_user_id silently skips org-owned
-- repos (owner_user_id NULL) — confirm that is intended.
SELECT
    r.id,
    r.name,
    u.username AS owner_username
FROM repos r
JOIN users u ON u.id = r.owner_user_id
WHERE r.deleted_at IS NULL
ORDER BY r.id;
internal/repos/sqlc/models.gomodified
@@ -81,6 +81,21 @@ type EmailVerification struct {
81
 	CreatedAt   pgtype.Timestamptz
81
 	CreatedAt   pgtype.Timestamptz
82
 }
82
 }
83
 
83
 
84
// Job is one row of the Postgres-backed work queue (jobs table,
// migration 0018), dispatched with FOR UPDATE SKIP LOCKED.
// Generated by sqlc; fields mirror column names.
type Job struct {
	ID          int64
	Kind        string             // job type, e.g. "push:process"
	Payload     []byte             // jsonb; schema default is '{}'
	RunAt       pgtype.Timestamptz // earliest time the job may run
	Attempts    int32              // incremented by ClaimJob on every claim
	MaxAttempts int32              // terminal failure once attempts reach this
	LastError   pgtype.Text        // most recent failure message, if any
	LockedBy    pgtype.Text        // worker instance ID currently holding the job
	LockedAt    pgtype.Timestamptz // when the current lock was taken
	CompletedAt pgtype.Timestamptz // set on success; NULL while runnable
	FailedAt    pgtype.Timestamptz // set on terminal failure
	CreatedAt   pgtype.Timestamptz
}
98
+
84
 type Meta struct {
99
 type Meta struct {
85
 	Key       string
100
 	Key       string
86
 	Value     []byte
101
 	Value     []byte
@@ -96,25 +111,39 @@ type PasswordReset struct {
96
 	CreatedAt pgtype.Timestamptz
111
 	CreatedAt pgtype.Timestamptz
97
 }
112
 }
98
 
113
 
114
// PushEvent is one row of push_events (migration 0018): one record per
// ref pushed, written by the post-receive hook and consumed by the
// push:process job. Generated by sqlc.
type PushEvent struct {
	ID           int64
	RepoID       int64
	PusherUserID pgtype.Int8 // nullable; FK is ON DELETE SET NULL
	BeforeSha    string
	AfterSha     string
	Ref          string
	Protocol     string             // schema CHECK restricts to 'http' or 'ssh'
	RequestID    string             // schema default is ''
	ProcessedAt  pgtype.Timestamptz // NULL until push:process handles the row
	CreatedAt    pgtype.Timestamptz
}
126
+
99
// Repo mirrors the repos table. S14 adds DefaultBranchOid: the OID at
// HEAD of the repo's default branch, NULL until the first push lands;
// the repo home view reads it to choose between the empty and populated
// layouts. Generated by sqlc.
type Repo struct {
	ID               int64
	OwnerUserID      pgtype.Int8 // nullable: repo may be org-owned instead
	OwnerOrgID       pgtype.Int8
	Name             string
	Description      string
	Visibility       RepoVisibility
	DefaultBranch    string
	IsArchived       bool
	ArchivedAt       pgtype.Timestamptz
	DeletedAt        pgtype.Timestamptz // soft-delete marker; list queries filter IS NULL
	DiskUsedBytes    int64
	ForkOfRepoID     pgtype.Int8
	LicenseKey       pgtype.Text
	PrimaryLanguage  pgtype.Text
	HasIssues        bool
	HasPulls         bool
	CreatedAt        pgtype.Timestamptz
	UpdatedAt        pgtype.Timestamptz
	DefaultBranchOid pgtype.Text // added by migration 0018 (default_branch_oid)
}
119
 
148
 
120
 type User struct {
149
 type User struct {
@@ -213,3 +242,11 @@ type UsernameRedirect struct {
213
 	UserID      int64
242
 	UserID      int64
214
 	ChangedAt   pgtype.Timestamptz
243
 	ChangedAt   pgtype.Timestamptz
215
 }
244
 }
245
+
246
// WebhookEventsPending is one queued webhook event (migration 0018).
// S14 only inserts; the S33 deliverer drains the table on its own
// cadence, separate from the generic jobs queue. Generated by sqlc.
type WebhookEventsPending struct {
	ID        int64
	RepoID    int64
	EventKind string
	Payload   []byte // jsonb
	CreatedAt pgtype.Timestamptz
}
internal/repos/sqlc/querier.gomodified
@@ -15,9 +15,22 @@ type Querier interface {
15
 	// SPDX-License-Identifier: AGPL-3.0-or-later
15
 	// SPDX-License-Identifier: AGPL-3.0-or-later
16
 	CreateRepo(ctx context.Context, db DBTX, arg CreateRepoParams) (Repo, error)
16
 	CreateRepo(ctx context.Context, db DBTX, arg CreateRepoParams) (Repo, error)
17
 	ExistsRepoForOwnerUser(ctx context.Context, db DBTX, arg ExistsRepoForOwnerUserParams) (bool, error)
17
 	ExistsRepoForOwnerUser(ctx context.Context, db DBTX, arg ExistsRepoForOwnerUserParams) (bool, error)
18
+	GetRepoByID(ctx context.Context, db DBTX, id int64) (Repo, error)
18
 	GetRepoByOwnerUserAndName(ctx context.Context, db DBTX, arg GetRepoByOwnerUserAndNameParams) (Repo, error)
19
 	GetRepoByOwnerUserAndName(ctx context.Context, db DBTX, arg GetRepoByOwnerUserAndNameParams) (Repo, error)
20
+	// Returns the owner_username for a repo. Used by size-recalc and other
21
+	// jobs that need to derive the bare-repo on-disk path without round-
22
+	// tripping through the full user row.
23
+	GetRepoOwnerUsernameByID(ctx context.Context, db DBTX, id int64) (GetRepoOwnerUsernameByIDRow, error)
24
+	// Used by `shithubd hooks reinstall --all` to enumerate every active
25
+	// bare repo on disk and re-link its hooks.
26
+	ListAllRepoFullNames(ctx context.Context, db DBTX) ([]ListAllRepoFullNamesRow, error)
19
 	ListReposForOwnerUser(ctx context.Context, db DBTX, ownerUserID pgtype.Int8) ([]Repo, error)
27
 	ListReposForOwnerUser(ctx context.Context, db DBTX, ownerUserID pgtype.Int8) ([]Repo, error)
20
 	SoftDeleteRepo(ctx context.Context, db DBTX, id int64) error
28
 	SoftDeleteRepo(ctx context.Context, db DBTX, id int64) error
29
+	// Set when push:process detects a commit on the repo's default branch.
30
+	// Pass NULL to clear (e.g. when the branch is force-deleted in a future
31
+	// sprint). The repo home view reads this to decide between empty and
32
+	// populated layouts.
33
+	UpdateRepoDefaultBranchOID(ctx context.Context, db DBTX, arg UpdateRepoDefaultBranchOIDParams) error
21
 	UpdateRepoDiskUsed(ctx context.Context, db DBTX, arg UpdateRepoDiskUsedParams) error
34
 	UpdateRepoDiskUsed(ctx context.Context, db DBTX, arg UpdateRepoDiskUsedParams) error
22
 }
35
 }
23
 
36
 
internal/repos/sqlc/repos.sql.gomodified
@@ -34,7 +34,7 @@ INSERT INTO repos (
34
 RETURNING id, owner_user_id, owner_org_id, name, description, visibility,
34
 RETURNING id, owner_user_id, owner_org_id, name, description, visibility,
35
           default_branch, is_archived, archived_at, deleted_at,
35
           default_branch, is_archived, archived_at, deleted_at,
36
           disk_used_bytes, fork_of_repo_id, license_key, primary_language,
36
           disk_used_bytes, fork_of_repo_id, license_key, primary_language,
37
-          has_issues, has_pulls, created_at, updated_at
37
+          has_issues, has_pulls, created_at, updated_at, default_branch_oid
38
 `
38
 `
39
 
39
 
40
 type CreateRepoParams struct {
40
 type CreateRepoParams struct {
@@ -80,6 +80,7 @@ func (q *Queries) CreateRepo(ctx context.Context, db DBTX, arg CreateRepoParams)
80
 		&i.HasPulls,
80
 		&i.HasPulls,
81
 		&i.CreatedAt,
81
 		&i.CreatedAt,
82
 		&i.UpdatedAt,
82
 		&i.UpdatedAt,
83
+		&i.DefaultBranchOid,
83
 	)
84
 	)
84
 	return i, err
85
 	return i, err
85
 }
86
 }
@@ -103,11 +104,47 @@ func (q *Queries) ExistsRepoForOwnerUser(ctx context.Context, db DBTX, arg Exist
103
 	return exists, err
104
 	return exists, err
104
 }
105
 }
105
 
106
 
107
const getRepoByID = `-- name: GetRepoByID :one
SELECT id, owner_user_id, owner_org_id, name, description, visibility,
       default_branch, is_archived, archived_at, deleted_at,
       disk_used_bytes, fork_of_repo_id, license_key, primary_language,
       has_issues, has_pulls, created_at, updated_at, default_branch_oid
FROM repos
WHERE id = $1
`

// GetRepoByID fetches one repo row by primary key. Note: the query has
// no deleted_at filter, so soft-deleted repos are returned too.
func (q *Queries) GetRepoByID(ctx context.Context, db DBTX, id int64) (Repo, error) {
	row := db.QueryRow(ctx, getRepoByID, id)
	var i Repo
	err := row.Scan(
		&i.ID,
		&i.OwnerUserID,
		&i.OwnerOrgID,
		&i.Name,
		&i.Description,
		&i.Visibility,
		&i.DefaultBranch,
		&i.IsArchived,
		&i.ArchivedAt,
		&i.DeletedAt,
		&i.DiskUsedBytes,
		&i.ForkOfRepoID,
		&i.LicenseKey,
		&i.PrimaryLanguage,
		&i.HasIssues,
		&i.HasPulls,
		&i.CreatedAt,
		&i.UpdatedAt,
		&i.DefaultBranchOid,
	)
	return i, err
}
142
+
106
 const getRepoByOwnerUserAndName = `-- name: GetRepoByOwnerUserAndName :one
143
 const getRepoByOwnerUserAndName = `-- name: GetRepoByOwnerUserAndName :one
107
 SELECT id, owner_user_id, owner_org_id, name, description, visibility,
144
 SELECT id, owner_user_id, owner_org_id, name, description, visibility,
108
        default_branch, is_archived, archived_at, deleted_at,
145
        default_branch, is_archived, archived_at, deleted_at,
109
        disk_used_bytes, fork_of_repo_id, license_key, primary_language,
146
        disk_used_bytes, fork_of_repo_id, license_key, primary_language,
110
-       has_issues, has_pulls, created_at, updated_at
147
+       has_issues, has_pulls, created_at, updated_at, default_branch_oid
111
 FROM repos
148
 FROM repos
112
 WHERE owner_user_id = $1 AND name = $2 AND deleted_at IS NULL
149
 WHERE owner_user_id = $1 AND name = $2 AND deleted_at IS NULL
113
 `
150
 `
@@ -139,15 +176,77 @@ func (q *Queries) GetRepoByOwnerUserAndName(ctx context.Context, db DBTX, arg Ge
139
 		&i.HasPulls,
176
 		&i.HasPulls,
140
 		&i.CreatedAt,
177
 		&i.CreatedAt,
141
 		&i.UpdatedAt,
178
 		&i.UpdatedAt,
179
+		&i.DefaultBranchOid,
142
 	)
180
 	)
143
 	return i, err
181
 	return i, err
144
 }
182
 }
145
 
183
 
184
const getRepoOwnerUsernameByID = `-- name: GetRepoOwnerUsernameByID :one
SELECT u.username AS owner_username, r.name AS repo_name
FROM repos r
JOIN users u ON u.id = r.owner_user_id
WHERE r.id = $1
`

type GetRepoOwnerUsernameByIDRow struct {
	OwnerUsername string
	RepoName      string
}

// Returns the owner_username for a repo. Used by size-recalc and other
// jobs that need to derive the bare-repo on-disk path without round-
// tripping through the full user row.
// NOTE(review): the inner JOIN on owner_user_id means an org-owned repo
// (owner_user_id NULL) scans no row — confirm callers only pass
// user-owned repo IDs.
func (q *Queries) GetRepoOwnerUsernameByID(ctx context.Context, db DBTX, id int64) (GetRepoOwnerUsernameByIDRow, error) {
	row := db.QueryRow(ctx, getRepoOwnerUsernameByID, id)
	var i GetRepoOwnerUsernameByIDRow
	err := row.Scan(&i.OwnerUsername, &i.RepoName)
	return i, err
}
205
+
206
const listAllRepoFullNames = `-- name: ListAllRepoFullNames :many
SELECT
    r.id,
    r.name,
    u.username AS owner_username
FROM repos r
JOIN users u ON u.id = r.owner_user_id
WHERE r.deleted_at IS NULL
ORDER BY r.id
`

type ListAllRepoFullNamesRow struct {
	ID            int64
	Name          string
	OwnerUsername string
}

// Used by `shithubd hooks reinstall --all` to enumerate every active
// bare repo on disk and re-link its hooks.
// NOTE(review): the inner JOIN on owner_user_id silently omits
// org-owned repos (owner_user_id NULL) — confirm that is intended.
func (q *Queries) ListAllRepoFullNames(ctx context.Context, db DBTX) ([]ListAllRepoFullNamesRow, error) {
	rows, err := db.Query(ctx, listAllRepoFullNames)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	items := []ListAllRepoFullNamesRow{}
	for rows.Next() {
		var i ListAllRepoFullNamesRow
		if err := rows.Scan(&i.ID, &i.Name, &i.OwnerUsername); err != nil {
			return nil, err
		}
		items = append(items, i)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
244
+
146
 const listReposForOwnerUser = `-- name: ListReposForOwnerUser :many
245
 const listReposForOwnerUser = `-- name: ListReposForOwnerUser :many
147
 SELECT id, owner_user_id, owner_org_id, name, description, visibility,
246
 SELECT id, owner_user_id, owner_org_id, name, description, visibility,
148
        default_branch, is_archived, archived_at, deleted_at,
247
        default_branch, is_archived, archived_at, deleted_at,
149
        disk_used_bytes, fork_of_repo_id, license_key, primary_language,
248
        disk_used_bytes, fork_of_repo_id, license_key, primary_language,
150
-       has_issues, has_pulls, created_at, updated_at
249
+       has_issues, has_pulls, created_at, updated_at, default_branch_oid
151
 FROM repos
250
 FROM repos
152
 WHERE owner_user_id = $1 AND deleted_at IS NULL
251
 WHERE owner_user_id = $1 AND deleted_at IS NULL
153
 ORDER BY updated_at DESC
252
 ORDER BY updated_at DESC
@@ -181,6 +280,7 @@ func (q *Queries) ListReposForOwnerUser(ctx context.Context, db DBTX, ownerUserI
181
 			&i.HasPulls,
280
 			&i.HasPulls,
182
 			&i.CreatedAt,
281
 			&i.CreatedAt,
183
 			&i.UpdatedAt,
282
 			&i.UpdatedAt,
283
+			&i.DefaultBranchOid,
184
 		); err != nil {
284
 		); err != nil {
185
 			return nil, err
285
 			return nil, err
186
 		}
286
 		}
@@ -201,6 +301,24 @@ func (q *Queries) SoftDeleteRepo(ctx context.Context, db DBTX, id int64) error {
201
 	return err
301
 	return err
202
 }
302
 }
203
 
303
 
304
const updateRepoDefaultBranchOID = `-- name: UpdateRepoDefaultBranchOID :exec
UPDATE repos SET default_branch_oid = $2::text WHERE id = $1
`

type UpdateRepoDefaultBranchOIDParams struct {
	ID               int64
	DefaultBranchOid pgtype.Text
}

// Set when push:process detects a commit on the repo's default branch.
// Pass NULL (a pgtype.Text with Valid=false) to clear (e.g. when the
// branch is force-deleted in a future sprint). The repo home view reads
// this to decide between empty and populated layouts. Does not touch
// updated_at.
func (q *Queries) UpdateRepoDefaultBranchOID(ctx context.Context, db DBTX, arg UpdateRepoDefaultBranchOIDParams) error {
	_, err := db.Exec(ctx, updateRepoDefaultBranchOID, arg.ID, arg.DefaultBranchOid)
	return err
}
321
+
204
 const updateRepoDiskUsed = `-- name: UpdateRepoDiskUsed :exec
322
 const updateRepoDiskUsed = `-- name: UpdateRepoDiskUsed :exec
205
 UPDATE repos SET disk_used_bytes = $2 WHERE id = $1
323
 UPDATE repos SET disk_used_bytes = $2 WHERE id = $1
206
 `
324
 `
internal/users/sqlc/models.gomodified
@@ -81,6 +81,21 @@ type EmailVerification struct {
81
 	CreatedAt   pgtype.Timestamptz
81
 	CreatedAt   pgtype.Timestamptz
82
 }
82
 }
83
 
83
 
84
// Job is one row of the Postgres-backed work queue (jobs table,
// migration 0018), dispatched with FOR UPDATE SKIP LOCKED.
// Generated by sqlc; fields mirror column names.
type Job struct {
	ID          int64
	Kind        string             // job type, e.g. "push:process"
	Payload     []byte             // jsonb; schema default is '{}'
	RunAt       pgtype.Timestamptz // earliest time the job may run
	Attempts    int32              // incremented by ClaimJob on every claim
	MaxAttempts int32              // terminal failure once attempts reach this
	LastError   pgtype.Text        // most recent failure message, if any
	LockedBy    pgtype.Text        // worker instance ID currently holding the job
	LockedAt    pgtype.Timestamptz // when the current lock was taken
	CompletedAt pgtype.Timestamptz // set on success; NULL while runnable
	FailedAt    pgtype.Timestamptz // set on terminal failure
	CreatedAt   pgtype.Timestamptz
}
98
+
84
 type Meta struct {
99
 type Meta struct {
85
 	Key       string
100
 	Key       string
86
 	Value     []byte
101
 	Value     []byte
@@ -96,25 +111,39 @@ type PasswordReset struct {
96
 	CreatedAt pgtype.Timestamptz
111
 	CreatedAt pgtype.Timestamptz
97
 }
112
 }
98
 
113
 
114
// PushEvent is one row of push_events (migration 0018): one record per
// ref pushed, written by the post-receive hook and consumed by the
// push:process job. Generated by sqlc.
type PushEvent struct {
	ID           int64
	RepoID       int64
	PusherUserID pgtype.Int8 // nullable; FK is ON DELETE SET NULL
	BeforeSha    string
	AfterSha     string
	Ref          string
	Protocol     string             // schema CHECK restricts to 'http' or 'ssh'
	RequestID    string             // schema default is ''
	ProcessedAt  pgtype.Timestamptz // NULL until push:process handles the row
	CreatedAt    pgtype.Timestamptz
}
126
+
99
// Repo mirrors the repos table. S14 adds DefaultBranchOid: the OID at
// HEAD of the repo's default branch, NULL until the first push lands;
// the repo home view reads it to choose between the empty and populated
// layouts. Generated by sqlc.
type Repo struct {
	ID               int64
	OwnerUserID      pgtype.Int8 // nullable: repo may be org-owned instead
	OwnerOrgID       pgtype.Int8
	Name             string
	Description      string
	Visibility       RepoVisibility
	DefaultBranch    string
	IsArchived       bool
	ArchivedAt       pgtype.Timestamptz
	DeletedAt        pgtype.Timestamptz // soft-delete marker; list queries filter IS NULL
	DiskUsedBytes    int64
	ForkOfRepoID     pgtype.Int8
	LicenseKey       pgtype.Text
	PrimaryLanguage  pgtype.Text
	HasIssues        bool
	HasPulls         bool
	CreatedAt        pgtype.Timestamptz
	UpdatedAt        pgtype.Timestamptz
	DefaultBranchOid pgtype.Text // added by migration 0018 (default_branch_oid)
}
119
 
148
 
120
 type User struct {
149
 type User struct {
@@ -213,3 +242,11 @@ type UsernameRedirect struct {
213
 	UserID      int64
242
 	UserID      int64
214
 	ChangedAt   pgtype.Timestamptz
243
 	ChangedAt   pgtype.Timestamptz
215
 }
244
 }
245
+
246
// WebhookEventsPending is one queued webhook event (migration 0018).
// S14 only inserts; the S33 deliverer drains the table on its own
// cadence, separate from the generic jobs queue. Generated by sqlc.
type WebhookEventsPending struct {
	ID        int64
	RepoID    int64
	EventKind string
	Payload   []byte // jsonb
	CreatedAt pgtype.Timestamptz
}
internal/worker/queries/jobs.sqladded
@@ -0,0 +1,87 @@
1
-- SPDX-License-Identifier: AGPL-3.0-or-later
--
-- Jobs queue. The dispatch query uses FOR UPDATE SKIP LOCKED — the
-- canonical Postgres pattern for safe concurrent dequeue. Multiple
-- workers can run ClaimJob concurrently; each gets a different row.

-- name: EnqueueJob :one
-- Insert a new job. run_at defaults to now() so the job is immediately
-- runnable; pass a future timestamp for delayed work. The returned row
-- carries the assigned ID, which callers persist when the enqueue is
-- done inside a wider transaction.
INSERT INTO jobs (kind, payload, run_at, max_attempts)
VALUES ($1, $2, COALESCE(sqlc.narg(run_at)::timestamptz, now()), COALESCE(sqlc.narg(max_attempts)::int, 5))
RETURNING *;

-- name: ClaimJob :one
-- Atomically claim the oldest runnable job of a given kind. Workers
-- supply their own instance ID as locked_by so admins can see who's
-- holding what. Returns pgx.ErrNoRows when the queue is empty for
-- that kind. A lock older than 5 minutes is treated as abandoned and
-- the row becomes claimable again (attempts still increments, so a
-- repeatedly-abandoned job eventually exhausts max_attempts).
UPDATE jobs
SET locked_by = $2,
    locked_at = now(),
    attempts  = jobs.attempts + 1
WHERE id = (
    SELECT j.id FROM jobs j
    WHERE j.kind         = $1
      AND j.completed_at IS NULL
      AND j.failed_at    IS NULL
      AND j.run_at       <= now()
      AND (j.locked_by IS NULL OR j.locked_at < now() - interval '5 minutes')
    ORDER BY j.run_at ASC, j.id ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING *;

-- name: MarkJobCompleted :exec
-- On success: clear lock + set completed_at.
UPDATE jobs
SET completed_at = now(),
    locked_by    = NULL,
    locked_at    = NULL
WHERE id = $1;

-- name: RescheduleJob :exec
-- Transient failure: clear the lock, record the error, push run_at
-- forward by the caller-computed backoff. attempts was incremented
-- by ClaimJob, so checking attempts >= max_attempts elsewhere decides
-- between Reschedule and MarkJobFailed.
UPDATE jobs
SET locked_by  = NULL,
    locked_at  = NULL,
    last_error = $2,
    run_at     = $3
WHERE id = $1;

-- name: MarkJobFailed :exec
-- Terminal failure (attempts hit max). Sets locked_by to NULL so the
-- row is no longer "in flight" to dashboards; failed_at marks it
-- visible to a future poison-job inspector (S34 admin panel).
UPDATE jobs
SET failed_at  = now(),
    locked_by  = NULL,
    locked_at  = NULL,
    last_error = $2
WHERE id = $1;

-- name: PurgeCompletedJobs :execrows
-- Delete completed jobs older than the supplied cutoff. Used by the
-- jobs:purge_completed maintenance job. Returns the number of rows
-- deleted so the cron handler can log progress.
DELETE FROM jobs
WHERE completed_at IS NOT NULL
  AND completed_at < $1;

-- name: PurgeFailedJobs :execrows
-- Same as PurgeCompletedJobs but for terminally failed rows. Kept
-- separate because operators usually want a longer retention on
-- failures to inspect what blew up.
DELETE FROM jobs
WHERE failed_at IS NOT NULL
  AND failed_at < $1;

-- name: GetJob :one
-- Single-row lookup by id. Used in tests and the admin panel.
SELECT * FROM jobs WHERE id = $1;
internal/worker/queries/push_events.sqladded
@@ -0,0 +1,20 @@
1
+-- SPDX-License-Identifier: AGPL-3.0-or-later
2
+--
3
+-- push_events records what landed; the post-receive hook writes; the
4
+-- push:process job consumes.
5
+
6
-- name: InsertPushEvent :one
-- Record one ref update. pusher_user_id is nullable (sqlc.narg) for
-- pushes with no resolved user; request_id collapses NULL to '' via
-- COALESCE. Returns the full inserted row, including the assigned id.
INSERT INTO push_events (
    repo_id, pusher_user_id, before_sha, after_sha, ref, protocol, request_id
) VALUES (
    $1, sqlc.narg(pusher_user_id)::bigint, $2, $3, $4, $5, COALESCE(sqlc.narg(request_id)::text, '')
)
RETURNING *;
13
+
14
-- name: GetPushEvent :one
-- Single-row lookup by id; errors (pgx.ErrNoRows) when absent.
SELECT * FROM push_events WHERE id = $1;
16
+
17
-- name: MarkPushEventProcessed :exec
-- Stamp processed_at once the push:process job has consumed the
-- event (see file header).
UPDATE push_events
SET processed_at = now()
WHERE id = $1;
internal/worker/queries/webhook_events_pending.sqladded
@@ -0,0 +1,10 @@
1
+-- SPDX-License-Identifier: AGPL-3.0-or-later
2
+--
3
+-- The S33 webhook deliverer drains this. S14 only inserts; the
4
+-- accumulator is intentionally separate from the generic jobs table so
5
+-- S33 controls its own retry / batching / fan-out shape.
6
+
7
-- name: InsertWebhookEventPending :one
-- Append one pending event for the S33 deliverer. payload is opaque
-- bytes; no retry/lock bookkeeping lives on this table (see file
-- header). Returns the inserted row.
INSERT INTO webhook_events_pending (repo_id, event_kind, payload)
VALUES ($1, $2, $3)
RETURNING *;
internal/worker/sqlc/db.goadded
@@ -0,0 +1,25 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+
5
+package workerdb
6
+
7
+import (
8
+	"context"
9
+
10
+	"github.com/jackc/pgx/v5"
11
+	"github.com/jackc/pgx/v5/pgconn"
12
+)
13
+
14
// DBTX is the minimal query surface the generated code needs; any pgx
// value providing these three methods (connection, pool, or
// transaction) satisfies it, letting callers pick the execution scope
// per call.
type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
}

// New returns a Queries value. No DBTX is stored here: with
// emit_methods_with_db_argument (see sqlc.yaml), every query method
// takes the DBTX explicitly.
func New() *Queries {
	return &Queries{}
}

// Queries is intentionally stateless; all methods take a DBTX.
type Queries struct {
}
internal/worker/sqlc/jobs.sql.goadded
@@ -0,0 +1,227 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+// source: jobs.sql
5
+
6
+package workerdb
7
+
8
+import (
9
+	"context"
10
+
11
+	"github.com/jackc/pgx/v5/pgtype"
12
+)
13
+
14
const claimJob = `-- name: ClaimJob :one
UPDATE jobs
SET locked_by = $2,
    locked_at = now(),
    attempts  = jobs.attempts + 1
WHERE id = (
    SELECT j.id FROM jobs j
    WHERE j.kind         = $1
      AND j.completed_at IS NULL
      AND j.failed_at    IS NULL
      AND j.run_at       <= now()
      AND (j.locked_by IS NULL OR j.locked_at < now() - interval '5 minutes')
    ORDER BY j.run_at ASC, j.id ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING id, kind, payload, run_at, attempts, max_attempts, last_error, locked_by, locked_at, completed_at, failed_at, created_at
`

// ClaimJobParams carries the two query arguments of ClaimJob.
type ClaimJobParams struct {
	Kind     string      // $1: only jobs of this kind are considered
	LockedBy pgtype.Text // $2: written into jobs.locked_by as the claimer's ID
}

// Atomically claim the oldest runnable job of a given kind. Workers
// supply their own instance ID as locked_by so admins can see who's
// holding what. Returns pgx.ErrNoRows when the queue is empty for
// that kind.
func (q *Queries) ClaimJob(ctx context.Context, db DBTX, arg ClaimJobParams) (Job, error) {
	row := db.QueryRow(ctx, claimJob, arg.Kind, arg.LockedBy)
	var i Job
	// Scan order matches the RETURNING column list above.
	err := row.Scan(
		&i.ID,
		&i.Kind,
		&i.Payload,
		&i.RunAt,
		&i.Attempts,
		&i.MaxAttempts,
		&i.LastError,
		&i.LockedBy,
		&i.LockedAt,
		&i.CompletedAt,
		&i.FailedAt,
		&i.CreatedAt,
	)
	return i, err
}
61
+
62
const enqueueJob = `-- name: EnqueueJob :one

INSERT INTO jobs (kind, payload, run_at, max_attempts)
VALUES ($1, $2, COALESCE($3::timestamptz, now()), COALESCE($4::int, 5))
RETURNING id, kind, payload, run_at, attempts, max_attempts, last_error, locked_by, locked_at, completed_at, failed_at, created_at
`

// EnqueueJobParams carries the four EnqueueJob arguments. RunAt and
// MaxAttempts are nullable: the SQL COALESCEs NULL to now() and 5
// respectively.
type EnqueueJobParams struct {
	Kind        string
	Payload     []byte
	RunAt       pgtype.Timestamptz // NULL => now() (immediately runnable)
	MaxAttempts pgtype.Int4        // NULL => 5
}

// SPDX-License-Identifier: AGPL-3.0-or-later
//
// Jobs queue. The dispatch query uses FOR UPDATE SKIP LOCKED — the
// canonical Postgres pattern for safe concurrent dequeue. Multiple
// workers can run ClaimJob concurrently; each gets a different row.
// Insert a new job. run_at defaults to now() so the job is immediately
// runnable; pass a future timestamp for delayed work. The returned row
// carries the assigned ID, which callers persist when the enqueue is
// done inside a wider transaction.
//
// (The SPDX banner above is jobs.sql's file header, which sqlc folds
// into the first query's doc comment.)
func (q *Queries) EnqueueJob(ctx context.Context, db DBTX, arg EnqueueJobParams) (Job, error) {
	row := db.QueryRow(ctx, enqueueJob,
		arg.Kind,
		arg.Payload,
		arg.RunAt,
		arg.MaxAttempts,
	)
	var i Job
	err := row.Scan(
		&i.ID,
		&i.Kind,
		&i.Payload,
		&i.RunAt,
		&i.Attempts,
		&i.MaxAttempts,
		&i.LastError,
		&i.LockedBy,
		&i.LockedAt,
		&i.CompletedAt,
		&i.FailedAt,
		&i.CreatedAt,
	)
	return i, err
}
109
+
110
const getJob = `-- name: GetJob :one
SELECT id, kind, payload, run_at, attempts, max_attempts, last_error, locked_by, locked_at, completed_at, failed_at, created_at FROM jobs WHERE id = $1
`

// Single-row lookup by id. Used in tests and the admin panel.
// Returns pgx.ErrNoRows when no job has the given id.
func (q *Queries) GetJob(ctx context.Context, db DBTX, id int64) (Job, error) {
	row := db.QueryRow(ctx, getJob, id)
	var i Job
	err := row.Scan(
		&i.ID,
		&i.Kind,
		&i.Payload,
		&i.RunAt,
		&i.Attempts,
		&i.MaxAttempts,
		&i.LastError,
		&i.LockedBy,
		&i.LockedAt,
		&i.CompletedAt,
		&i.FailedAt,
		&i.CreatedAt,
	)
	return i, err
}
134
+
135
const markJobCompleted = `-- name: MarkJobCompleted :exec
UPDATE jobs
SET completed_at = now(),
    locked_by    = NULL,
    locked_at    = NULL
WHERE id = $1
`

// On success: clear lock + set completed_at.
// :exec semantics — the affected-row count is discarded, so a missing
// id is not an error.
func (q *Queries) MarkJobCompleted(ctx context.Context, db DBTX, id int64) error {
	_, err := db.Exec(ctx, markJobCompleted, id)
	return err
}
148
+
149
const markJobFailed = `-- name: MarkJobFailed :exec
UPDATE jobs
SET failed_at  = now(),
    locked_by  = NULL,
    locked_at  = NULL,
    last_error = $2
WHERE id = $1
`

// MarkJobFailedParams carries the two MarkJobFailed arguments.
type MarkJobFailedParams struct {
	ID        int64       // $1
	LastError pgtype.Text // $2: human-readable cause, stored in jobs.last_error
}

// Terminal failure (attempts hit max). Holds locked_by NULL so the
// row is no longer "in flight" to dashboards; failed_at marks it
// visible to a future poison-job inspector (S34 admin panel).
func (q *Queries) MarkJobFailed(ctx context.Context, db DBTX, arg MarkJobFailedParams) error {
	_, err := db.Exec(ctx, markJobFailed, arg.ID, arg.LastError)
	return err
}
170
+
171
const purgeCompletedJobs = `-- name: PurgeCompletedJobs :execrows
DELETE FROM jobs
WHERE completed_at IS NOT NULL
  AND completed_at < $1
`

// Delete completed jobs older than the supplied cutoff. Used by the
// jobs:purge_completed maintenance job. Returns the number of rows
// deleted so the cron handler can log progress.
func (q *Queries) PurgeCompletedJobs(ctx context.Context, db DBTX, completedAt pgtype.Timestamptz) (int64, error) {
	result, err := db.Exec(ctx, purgeCompletedJobs, completedAt)
	if err != nil {
		return 0, err
	}
	// :execrows — surface the DELETE's row count from the command tag.
	return result.RowsAffected(), nil
}
187
+
188
const purgeFailedJobs = `-- name: PurgeFailedJobs :execrows
DELETE FROM jobs
WHERE failed_at IS NOT NULL
  AND failed_at < $1
`

// Same as PurgeCompletedJobs but for terminally failed rows. Kept
// separate because operators usually want a longer retention on
// failures to inspect what blew up.
func (q *Queries) PurgeFailedJobs(ctx context.Context, db DBTX, failedAt pgtype.Timestamptz) (int64, error) {
	result, err := db.Exec(ctx, purgeFailedJobs, failedAt)
	if err != nil {
		return 0, err
	}
	// :execrows — surface the DELETE's row count from the command tag.
	return result.RowsAffected(), nil
}
204
+
205
const rescheduleJob = `-- name: RescheduleJob :exec
UPDATE jobs
SET locked_by  = NULL,
    locked_at  = NULL,
    last_error = $2,
    run_at     = $3
WHERE id = $1
`

// RescheduleJobParams carries the three RescheduleJob arguments.
type RescheduleJobParams struct {
	ID        int64              // $1
	LastError pgtype.Text        // $2: cause of the transient failure
	RunAt     pgtype.Timestamptz // $3: next attempt time (caller-computed backoff)
}

// Transient failure: clear the lock, record the error, push run_at
// forward by the caller-computed backoff. attempts was incremented
// by ClaimJob, so checking attempts >= max_attempts elsewhere decides
// between Reschedule and MarkJobFailed.
func (q *Queries) RescheduleJob(ctx context.Context, db DBTX, arg RescheduleJobParams) error {
	_, err := db.Exec(ctx, rescheduleJob, arg.ID, arg.LastError, arg.RunAt)
	return err
}
internal/meta/sqlc/models.go → internal/worker/sqlc/models.gocopied (76% similarity)
@@ -2,7 +2,7 @@
2
 // versions:
2
 // versions:
3
 //   sqlc v1.31.1
3
 //   sqlc v1.31.1
4
 
4
 
5
-package metadb
5
+package workerdb
6
 
6
 
7
 import (
7
 import (
8
 	"database/sql/driver"
8
 	"database/sql/driver"
@@ -81,6 +81,21 @@ type EmailVerification struct {
81
 	CreatedAt   pgtype.Timestamptz
81
 	CreatedAt   pgtype.Timestamptz
82
 }
82
 }
83
 
83
 
84
// Job is one row of the jobs table: a queued unit of work. The
// lifecycle columns (locked_*, completed_at, failed_at) are driven by
// the ClaimJob / MarkJobCompleted / RescheduleJob / MarkJobFailed
// queries in jobs.sql.
type Job struct {
	ID          int64
	Kind        string             // dispatch key workers claim by
	Payload     []byte             // opaque job arguments
	RunAt       pgtype.Timestamptz // earliest time the job is claimable
	Attempts    int32              // incremented by each ClaimJob
	MaxAttempts int32              // defaults to 5 on enqueue (see EnqueueJob SQL)
	LastError   pgtype.Text
	LockedBy    pgtype.Text        // claiming worker's instance ID; NULL when unclaimed
	LockedAt    pgtype.Timestamptz
	CompletedAt pgtype.Timestamptz // set by MarkJobCompleted
	FailedAt    pgtype.Timestamptz // set by MarkJobFailed (terminal)
	CreatedAt   pgtype.Timestamptz
}
98
+
84
 type Meta struct {
99
 type Meta struct {
85
 	Key       string
100
 	Key       string
86
 	Value     []byte
101
 	Value     []byte
@@ -96,25 +111,39 @@ type PasswordReset struct {
96
 	CreatedAt pgtype.Timestamptz
111
 	CreatedAt pgtype.Timestamptz
97
 }
112
 }
98
 
113
 
114
// PushEvent is one row of push_events: a single recorded ref update,
// written by the post-receive hook and consumed by the push:process
// job (see push_events.sql).
type PushEvent struct {
	ID           int64
	RepoID       int64
	PusherUserID pgtype.Int8 // nullable: NULL when no user was resolved
	BeforeSha    string
	AfterSha     string
	Ref          string
	Protocol     string
	RequestID    string             // '' when the insert passed NULL (COALESCE in SQL)
	ProcessedAt  pgtype.Timestamptz // set by MarkPushEventProcessed
	CreatedAt    pgtype.Timestamptz
}
126
+
99
// Repo is one row of the repos table. DefaultBranchOid is new in this
// change (S14) — presumably the object ID the default branch currently
// points at, cached alongside the branch name; confirm against
// repos.sql.
type Repo struct {
	ID               int64
	OwnerUserID      pgtype.Int8 // exactly one of OwnerUserID / OwnerOrgID is expected set — TODO confirm constraint
	OwnerOrgID       pgtype.Int8
	Name             string
	Description      string
	Visibility       RepoVisibility
	DefaultBranch    string
	IsArchived       bool
	ArchivedAt       pgtype.Timestamptz
	DeletedAt        pgtype.Timestamptz // soft delete marker
	DiskUsedBytes    int64
	ForkOfRepoID     pgtype.Int8
	LicenseKey       pgtype.Text
	PrimaryLanguage  pgtype.Text
	HasIssues        bool
	HasPulls         bool
	CreatedAt        pgtype.Timestamptz
	UpdatedAt        pgtype.Timestamptz
	DefaultBranchOid pgtype.Text // added in S14 (0018 migration)
}
119
 
148
 
120
 type User struct {
149
 type User struct {
@@ -213,3 +242,11 @@ type UsernameRedirect struct {
213
 	UserID      int64
242
 	UserID      int64
214
 	ChangedAt   pgtype.Timestamptz
243
 	ChangedAt   pgtype.Timestamptz
215
 }
244
 }
245
+
246
// WebhookEventsPending is one row of webhook_events_pending: an event
// accumulated for the S33 webhook deliverer. S14 only inserts rows
// (see webhook_events_pending.sql).
type WebhookEventsPending struct {
	ID        int64
	RepoID    int64
	EventKind string
	Payload   []byte // opaque event body
	CreatedAt pgtype.Timestamptz
}
internal/worker/sqlc/push_events.sql.goadded
@@ -0,0 +1,95 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+// source: push_events.sql
5
+
6
+package workerdb
7
+
8
+import (
9
+	"context"
10
+
11
+	"github.com/jackc/pgx/v5/pgtype"
12
+)
13
+
14
const getPushEvent = `-- name: GetPushEvent :one
SELECT id, repo_id, pusher_user_id, before_sha, after_sha, ref, protocol, request_id, processed_at, created_at FROM push_events WHERE id = $1
`

// GetPushEvent looks up a single push event by id; returns
// pgx.ErrNoRows when absent.
func (q *Queries) GetPushEvent(ctx context.Context, db DBTX, id int64) (PushEvent, error) {
	row := db.QueryRow(ctx, getPushEvent, id)
	var i PushEvent
	err := row.Scan(
		&i.ID,
		&i.RepoID,
		&i.PusherUserID,
		&i.BeforeSha,
		&i.AfterSha,
		&i.Ref,
		&i.Protocol,
		&i.RequestID,
		&i.ProcessedAt,
		&i.CreatedAt,
	)
	return i, err
}
35
+
36
const insertPushEvent = `-- name: InsertPushEvent :one

INSERT INTO push_events (
    repo_id, pusher_user_id, before_sha, after_sha, ref, protocol, request_id
) VALUES (
    $1, $6::bigint, $2, $3, $4, $5, COALESCE($7::text, '')
)
RETURNING id, repo_id, pusher_user_id, before_sha, after_sha, ref, protocol, request_id, processed_at, created_at
`

// InsertPushEventParams carries the seven InsertPushEvent arguments.
// Note the sqlc.narg placeholders are renumbered last ($6, $7), so the
// field order here differs from the column order in the SQL.
type InsertPushEventParams struct {
	RepoID       int64
	BeforeSha    string
	AfterSha     string
	Ref          string
	Protocol     string
	PusherUserID pgtype.Int8 // nullable ($6)
	RequestID    pgtype.Text // nullable ($7); NULL becomes '' via COALESCE
}

// SPDX-License-Identifier: AGPL-3.0-or-later
//
// push_events records what landed; the post-receive hook writes; the
// push:process job consumes.
//
// (The SPDX banner above is push_events.sql's file header, which sqlc
// folds into the first query's doc comment.)
func (q *Queries) InsertPushEvent(ctx context.Context, db DBTX, arg InsertPushEventParams) (PushEvent, error) {
	row := db.QueryRow(ctx, insertPushEvent,
		arg.RepoID,
		arg.BeforeSha,
		arg.AfterSha,
		arg.Ref,
		arg.Protocol,
		arg.PusherUserID,
		arg.RequestID,
	)
	var i PushEvent
	err := row.Scan(
		&i.ID,
		&i.RepoID,
		&i.PusherUserID,
		&i.BeforeSha,
		&i.AfterSha,
		&i.Ref,
		&i.Protocol,
		&i.RequestID,
		&i.ProcessedAt,
		&i.CreatedAt,
	)
	return i, err
}
85
+
86
const markPushEventProcessed = `-- name: MarkPushEventProcessed :exec
UPDATE push_events
SET processed_at = now()
WHERE id = $1
`

// MarkPushEventProcessed stamps processed_at on the event. :exec —
// the affected-row count is discarded, so a missing id is not an
// error.
func (q *Queries) MarkPushEventProcessed(ctx context.Context, db DBTX, id int64) error {
	_, err := db.Exec(ctx, markPushEventProcessed, id)
	return err
}
internal/worker/sqlc/querier.goadded
@@ -0,0 +1,65 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+
5
+package workerdb
6
+
7
+import (
8
+	"context"
9
+
10
+	"github.com/jackc/pgx/v5/pgtype"
11
+)
12
+
13
// Querier is the generated interface over every query in this package
// (emit_interface: true in sqlc.yaml). All methods take an explicit
// DBTX (emit_methods_with_db_argument), so one implementation serves
// pools, connections, and transactions alike.
type Querier interface {
	// Atomically claim the oldest runnable job of a given kind. Workers
	// supply their own instance ID as locked_by so admins can see who's
	// holding what. Returns pgx.ErrNoRows when the queue is empty for
	// that kind.
	ClaimJob(ctx context.Context, db DBTX, arg ClaimJobParams) (Job, error)
	// SPDX-License-Identifier: AGPL-3.0-or-later
	//
	// Jobs queue. The dispatch query uses FOR UPDATE SKIP LOCKED — the
	// canonical Postgres pattern for safe concurrent dequeue. Multiple
	// workers can run ClaimJob concurrently; each gets a different row.
	// Insert a new job. run_at defaults to now() so the job is immediately
	// runnable; pass a future timestamp for delayed work. The returned row
	// carries the assigned ID, which callers persist when the enqueue is
	// done inside a wider transaction.
	EnqueueJob(ctx context.Context, db DBTX, arg EnqueueJobParams) (Job, error)
	// Single-row lookup by id. Used in tests and the admin panel.
	GetJob(ctx context.Context, db DBTX, id int64) (Job, error)
	GetPushEvent(ctx context.Context, db DBTX, id int64) (PushEvent, error)
	// SPDX-License-Identifier: AGPL-3.0-or-later
	//
	// push_events records what landed; the post-receive hook writes; the
	// push:process job consumes.
	InsertPushEvent(ctx context.Context, db DBTX, arg InsertPushEventParams) (PushEvent, error)
	// SPDX-License-Identifier: AGPL-3.0-or-later
	//
	// The S33 webhook deliverer drains this. S14 only inserts; the
	// accumulator is intentionally separate from the generic jobs table so
	// S33 controls its own retry / batching / fan-out shape.
	InsertWebhookEventPending(ctx context.Context, db DBTX, arg InsertWebhookEventPendingParams) (WebhookEventsPending, error)
	// On success: clear lock + set completed_at.
	MarkJobCompleted(ctx context.Context, db DBTX, id int64) error
	// Terminal failure (attempts hit max). Holds locked_by NULL so the
	// row is no longer "in flight" to dashboards; failed_at marks it
	// visible to a future poison-job inspector (S34 admin panel).
	MarkJobFailed(ctx context.Context, db DBTX, arg MarkJobFailedParams) error
	MarkPushEventProcessed(ctx context.Context, db DBTX, id int64) error
	// Delete completed jobs older than the supplied cutoff. Used by the
	// jobs:purge_completed maintenance job. Returns the number of rows
	// deleted so the cron handler can log progress.
	PurgeCompletedJobs(ctx context.Context, db DBTX, completedAt pgtype.Timestamptz) (int64, error)
	// Same as PurgeCompletedJobs but for terminally failed rows. Kept
	// separate because operators usually want a longer retention on
	// failures to inspect what blew up.
	PurgeFailedJobs(ctx context.Context, db DBTX, failedAt pgtype.Timestamptz) (int64, error)
	// Transient failure: clear the lock, record the error, push run_at
	// forward by the caller-computed backoff. attempts was incremented
	// by ClaimJob, so checking attempts >= max_attempts elsewhere decides
	// between Reschedule and MarkJobFailed.
	RescheduleJob(ctx context.Context, db DBTX, arg RescheduleJobParams) error
}

// Compile-time assertion that *Queries implements Querier.
var _ Querier = (*Queries)(nil)
internal/worker/sqlc/webhook_events_pending.sql.goadded
@@ -0,0 +1,41 @@
1
+// Code generated by sqlc. DO NOT EDIT.
2
+// versions:
3
+//   sqlc v1.31.1
4
+// source: webhook_events_pending.sql
5
+
6
+package workerdb
7
+
8
+import (
9
+	"context"
10
+)
11
+
12
const insertWebhookEventPending = `-- name: InsertWebhookEventPending :one

INSERT INTO webhook_events_pending (repo_id, event_kind, payload)
VALUES ($1, $2, $3)
RETURNING id, repo_id, event_kind, payload, created_at
`

// InsertWebhookEventPendingParams carries the three insert arguments.
type InsertWebhookEventPendingParams struct {
	RepoID    int64
	EventKind string
	Payload   []byte // opaque event body for the deliverer
}

// SPDX-License-Identifier: AGPL-3.0-or-later
//
// The S33 webhook deliverer drains this. S14 only inserts; the
// accumulator is intentionally separate from the generic jobs table so
// S33 controls its own retry / batching / fan-out shape.
//
// (The SPDX banner above is webhook_events_pending.sql's file header,
// which sqlc folds into the first query's doc comment.)
func (q *Queries) InsertWebhookEventPending(ctx context.Context, db DBTX, arg InsertWebhookEventPendingParams) (WebhookEventsPending, error) {
	row := db.QueryRow(ctx, insertWebhookEventPending, arg.RepoID, arg.EventKind, arg.Payload)
	var i WebhookEventsPending
	err := row.Scan(
		&i.ID,
		&i.RepoID,
		&i.EventKind,
		&i.Payload,
		&i.CreatedAt,
	)
	return i, err
}
sqlc.yamlmodified
@@ -49,3 +49,19 @@ sql:
49
         emit_exact_table_names: false
49
         emit_exact_table_names: false
50
         emit_empty_slices: true
50
         emit_empty_slices: true
51
         emit_methods_with_db_argument: true
51
         emit_methods_with_db_argument: true
52
+
53
+  - engine: postgresql
54
+    schema: internal/migrationsfs/migrations
55
+    queries: internal/worker/queries
56
+    gen:
57
+      go:
58
+        package: workerdb
59
+        out: internal/worker/sqlc
60
+        sql_package: pgx/v5
61
+        emit_json_tags: false
62
+        emit_pointers_for_null_types: false
63
+        emit_prepared_queries: false
64
+        emit_interface: true
65
+        emit_exact_table_names: false
66
+        emit_empty_slices: true
67
+        emit_methods_with_db_argument: true