tenseleyflow/shithub / e95a7a7


S14: worker pool, job handlers, prometheus metrics

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA: e95a7a71164bf2ad4cf5dc488dad845417801608
Parents: e659af3
Tree: 3926ee8

10 changed files

Status  File  Added  Removed
M internal/infra/metrics/metrics.go 29 0
A internal/worker/backoff_test.go 59 0
A internal/worker/enqueue.go 57 0
A internal/worker/jobs/jobs_purge.go 64 0
A internal/worker/jobs/push_process.go 153 0
A internal/worker/jobs/push_process_test.go 292 0
A internal/worker/jobs/repo_size_recalc.go 105 0
A internal/worker/pool.go 317 0
A internal/worker/pool_integration_test.go 234 0
A internal/worker/types.go 90 0
internal/infra/metrics/metrics.go  (modified)
@@ -86,6 +86,32 @@ var (
 	)
 )
 
+// Worker metrics. The pool updates these on every dispatch.
+var (
+	WorkerJobsProcessedTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "shithub_worker_jobs_processed_total",
+			Help: "Worker jobs processed by kind and outcome (ok, retry, failed, poison).",
+		},
+		[]string{"kind", "outcome"},
+	)
+	WorkerJobDurationSeconds = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Name:    "shithub_worker_job_duration_seconds",
+			Help:    "Worker handler latency by kind.",
+			Buckets: prometheus.ExponentialBuckets(0.005, 2.5, 12),
+		},
+		[]string{"kind"},
+	)
+	WorkerInFlight = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "shithub_worker_in_flight",
+			Help: "Worker handler invocations currently in flight by kind.",
+		},
+		[]string{"kind"},
+	)
+)
+
 func init() {
 	Registry.MustRegister(
 		HTTPRequestsTotal,
@@ -96,6 +122,9 @@ func init() {
 		DBConnsIdle,
 		DBConnsTotal,
 		DBAcquireWaitDurationTotal,
+		WorkerJobsProcessedTotal,
+		WorkerJobDurationSeconds,
+		WorkerInFlight,
 	)
 }
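For reference, the new collectors surface through the package's shared Registry (only its MustRegister call appears in this diff). A minimal sketch of exposing them, assuming metrics.Registry is a plain *prometheus.Registry and that the real server already mounts it elsewhere:

package metricsexample

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
)

func serveMetrics() error {
	// HandlerFor gathers only what was registered on this Registry, so the
	// worker series added above show up alongside the existing HTTP/DB ones.
	http.Handle("/metrics", promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{}))
	return http.ListenAndServe(":2112", nil)
}

From there, something like rate(shithub_worker_jobs_processed_total{outcome="retry"}[5m]) broken out by kind is a reasonable first dashboard panel.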
 
internal/worker/backoff_test.go  (added)
@@ -0,0 +1,59 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+package worker_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/tenseleyFlow/shithub/internal/worker"
+)
+
+func TestBackoff_Doubles(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		attempts int
+		want     time.Duration
+	}{
+		{1, 30 * time.Second},
+		{2, 60 * time.Second},
+		{3, 120 * time.Second},
+		{4, 240 * time.Second},
+		{5, 480 * time.Second},
+		{6, 960 * time.Second},
+		{7, 1920 * time.Second},
+		{8, time.Hour},  // capped
+		{10, time.Hour}, // still capped
+	}
+	for _, c := range cases {
+		got := worker.Backoff(c.attempts, nil)
+		if got != c.want {
+			t.Errorf("Backoff(%d) = %v, want %v", c.attempts, got, c.want)
+		}
+	}
+}
+
+func TestBackoff_JitterStaysInBand(t *testing.T) {
+	t.Parallel()
+	const attempts = 4
+	base := worker.Backoff(attempts, nil)
+	low := time.Duration(float64(base) * 0.8)
+	high := time.Duration(float64(base) * 1.2)
+	for i := 0; i < 100; i++ {
+		j := float64(i) / 100.0
+		got := worker.Backoff(attempts, func() float64 { return j })
+		if got < low || got >= high {
+			t.Fatalf("jitter %v: got %v, want in [%v, %v)", j, got, low, high)
+		}
+	}
+}
+
+func TestBackoff_NonPositiveAttemptsClampsToOne(t *testing.T) {
+	t.Parallel()
+	if got := worker.Backoff(0, nil); got != 30*time.Second {
+		t.Errorf("Backoff(0) = %v, want 30s", got)
+	}
+	if got := worker.Backoff(-5, nil); got != 30*time.Second {
+		t.Errorf("Backoff(-5) = %v, want 30s", got)
+	}
+}
internal/worker/enqueue.go  (added)
@@ -0,0 +1,57 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+package worker
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+
+	"github.com/jackc/pgx/v5/pgtype"
+
+	workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
+)
+
+// DBTX matches the pgx interface that sqlc-generated methods accept
+// (anything providing Exec/Query/QueryRow). The pool, a tx, and the
+// helpers in dbtest all satisfy it.
+type DBTX = workerdb.DBTX
+
+// EnqueueOptions tunes a single enqueue. RunAt zero means "now"; pass a
+// future time to delay the first run. MaxAttempts zero means use the
+// table default (5).
+type EnqueueOptions struct {
+	RunAt       pgtype.Timestamptz
+	MaxAttempts int32
+}
+
+// Enqueue inserts a job row and returns its id. Callers running inside a
+// transaction should pass the tx as db so the enqueue is rolled back
+// alongside any related state changes; same goes for the NOTIFY (issued
+// separately by the caller via Notify).
+func Enqueue(ctx context.Context, db DBTX, kind Kind, payload any, opts EnqueueOptions) (int64, error) {
+	body, err := json.Marshal(payload)
+	if err != nil {
+		return 0, fmt.Errorf("worker: marshal payload: %w", err)
+	}
+	q := workerdb.New()
+	row, err := q.EnqueueJob(ctx, db, workerdb.EnqueueJobParams{
+		Kind:        string(kind),
+		Payload:     body,
+		RunAt:       opts.RunAt,
+		MaxAttempts: pgtype.Int4{Int32: opts.MaxAttempts, Valid: opts.MaxAttempts > 0},
+	})
+	if err != nil {
+		return 0, fmt.Errorf("worker: enqueue %s: %w", kind, err)
+	}
+	return row.ID, nil
+}
+
+// Notify wakes any LISTENing workers. Safe to call after a successful
+// commit; if called inside a tx, the NOTIFY only delivers when the tx
+// commits (Postgres semantics). Errors are non-fatal — workers also poll
+// at a slow interval as a backstop.
+func Notify(ctx context.Context, db DBTX) error {
+	_, err := db.Exec(ctx, "SELECT pg_notify($1, '')", NotifyChannel)
+	return err
+}
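The doc comments above spell out the transactional contract; a caller sketch (not part of this commit, the function name and eventID parameter are hypothetical) would enqueue on the tx so the job row commits or rolls back with the related writes, then Notify on the pool after a successful commit:

package callerexample

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/tenseleyFlow/shithub/internal/worker"
	"github.com/tenseleyFlow/shithub/internal/worker/jobs"
)

func enqueuePushProcess(ctx context.Context, pool *pgxpool.Pool, eventID int64) error {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) // harmless after a successful Commit

	// ... related domain writes on tx go here ...

	// The job row only becomes visible if the surrounding tx commits.
	if _, err := worker.Enqueue(ctx, tx, worker.KindPushProcess,
		jobs.PushProcessPayload{PushEventID: eventID},
		worker.EnqueueOptions{}); err != nil {
		return err
	}
	if err := tx.Commit(ctx); err != nil {
		return err
	}
	// A lost wake-up is tolerable: the pool also polls every IdlePoll.
	_ = worker.Notify(ctx, pool)
	return nil
}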
internal/worker/jobs/jobs_purge.go  (added)
@@ -0,0 +1,64 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+package jobs
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"log/slog"
+	"time"
+
+	"github.com/jackc/pgx/v5/pgtype"
+	"github.com/jackc/pgx/v5/pgxpool"
+
+	"github.com/tenseleyFlow/shithub/internal/worker"
+	workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
+)
+
+// JobsPurgeDeps wires the purge handler.
+type JobsPurgeDeps struct {
+	Pool   *pgxpool.Pool
+	Logger *slog.Logger
+}
+
+// JobsPurgePayload — empty object is fine; defaults below.
+type JobsPurgePayload struct {
+	CompletedOlderThanDays int `json:"completed_older_than_days,omitempty"`
+	FailedOlderThanDays    int `json:"failed_older_than_days,omitempty"`
+}
+
+// JobsPurge deletes completed/failed jobs older than the configured
+// retention. Retention defaults: 14 days completed, 30 days failed.
+// Designed to run as a cron job (S26 ships scheduling); for now it can
+// be enqueued ad-hoc by the operator.
+func JobsPurge(deps JobsPurgeDeps) worker.Handler {
+	return func(ctx context.Context, raw json.RawMessage) error {
+		p := JobsPurgePayload{}
+		if len(raw) > 0 {
+			_ = json.Unmarshal(raw, &p) // tolerant of empty/malformed; we have defaults
+		}
+		if p.CompletedOlderThanDays <= 0 {
+			p.CompletedOlderThanDays = 14
+		}
+		if p.FailedOlderThanDays <= 0 {
+			p.FailedOlderThanDays = 30
+		}
+		now := time.Now()
+		q := workerdb.New()
+		completedCutoff := pgtype.Timestamptz{Time: now.Add(-time.Duration(p.CompletedOlderThanDays) * 24 * time.Hour), Valid: true}
+		failedCutoff := pgtype.Timestamptz{Time: now.Add(-time.Duration(p.FailedOlderThanDays) * 24 * time.Hour), Valid: true}
+
+		nC, err := q.PurgeCompletedJobs(ctx, deps.Pool, completedCutoff)
+		if err != nil {
+			return fmt.Errorf("purge completed: %w", err)
+		}
+		nF, err := q.PurgeFailedJobs(ctx, deps.Pool, failedCutoff)
+		if err != nil {
+			return fmt.Errorf("purge failed: %w", err)
+		}
+		deps.Logger.InfoContext(ctx, "jobs:purge",
+			"completed_deleted", nC, "failed_deleted", nF)
+		return nil
+	}
+}
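Until S26 lands scheduling, the ad-hoc operator trigger mentioned above could look like this sketch (the function name and retention values are invented for illustration):

package purgeexample

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/tenseleyFlow/shithub/internal/worker"
	"github.com/tenseleyFlow/shithub/internal/worker/jobs"
)

// enqueuePurge queues one jobs:purge_completed run, overriding the
// 14/30-day defaults with a tighter retention, then wakes the pool.
func enqueuePurge(ctx context.Context, pool *pgxpool.Pool) error {
	if _, err := worker.Enqueue(ctx, pool, worker.KindJobsPurge, jobs.JobsPurgePayload{
		CompletedOlderThanDays: 7,
		FailedOlderThanDays:    14,
	}, worker.EnqueueOptions{}); err != nil {
		return err
	}
	return worker.Notify(ctx, pool)
}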
internal/worker/jobs/push_process.go  (added)
@@ -0,0 +1,153 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+// Package jobs holds the concrete worker handlers wired into the pool
+// at boot. Each file is one kind; handlers stay short and idempotent.
+package jobs
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"log/slog"
+	"strings"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgtype"
+	"github.com/jackc/pgx/v5/pgxpool"
+
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
+	"github.com/tenseleyFlow/shithub/internal/worker"
+	workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
+)
+
+// PushProcessDeps wires the data this handler needs.
+type PushProcessDeps struct {
+	Pool   *pgxpool.Pool
+	RepoFS *storage.RepoFS
+	Logger *slog.Logger
+}
+
+// PushProcessPayload is the JSON shape post-receive enqueues.
+type PushProcessPayload struct {
+	PushEventID int64 `json:"push_event_id"`
+}
+
+// PushProcess returns a handler that:
+//
+//  1. Loads the push_event by id.
+//  2. Updates repos.default_branch_oid if the ref matches the default
+//     branch and the after_sha is non-zero.
+//  3. Enqueues a repo:size_recalc job (separate handler — du is
+//     potentially slow, isolate it).
+//  4. Inserts a webhook_events_pending row carrying the push payload
+//     (S33 deliverer drains).
+//  5. Marks the push_event processed.
+//
+// The handler is idempotent on processed_at: re-runs after the first
+// successful run are no-ops.
+func PushProcess(deps PushProcessDeps) worker.Handler {
+	return func(ctx context.Context, raw json.RawMessage) error {
+		var p PushProcessPayload
+		if err := json.Unmarshal(raw, &p); err != nil {
+			return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
+		}
+		if p.PushEventID == 0 {
+			return worker.PoisonError(errors.New("missing push_event_id"))
+		}
+
+		wq := workerdb.New()
+		event, err := wq.GetPushEvent(ctx, deps.Pool, p.PushEventID)
+		if err != nil {
+			if errors.Is(err, pgx.ErrNoRows) {
+				return worker.PoisonError(fmt.Errorf("push_event %d not found", p.PushEventID))
+			}
+			return fmt.Errorf("load push_event: %w", err)
+		}
+		if event.ProcessedAt.Valid {
+			return nil // idempotent: already done.
+		}
+
+		rq := reposdb.New()
+		repo, err := rq.GetRepoByID(ctx, deps.Pool, event.RepoID)
+		if err != nil {
+			return fmt.Errorf("load repo: %w", err)
+		}
+
+		// 2: derive default-branch OID. The ref looks like "refs/heads/<name>".
+		const refPrefix = "refs/heads/"
+		if strings.HasPrefix(event.Ref, refPrefix) {
+			branch := event.Ref[len(refPrefix):]
+			if branch == repo.DefaultBranch {
+				newOID := event.AfterSha
+				if isZeroSHA(newOID) {
+					// branch deleted — clear oid.
+					_ = rq.UpdateRepoDefaultBranchOID(ctx, deps.Pool, reposdb.UpdateRepoDefaultBranchOIDParams{
+						ID:               repo.ID,
+						DefaultBranchOid: pgtype.Text{Valid: false},
+					})
+				} else {
+					_ = rq.UpdateRepoDefaultBranchOID(ctx, deps.Pool, reposdb.UpdateRepoDefaultBranchOIDParams{
+						ID:               repo.ID,
+						DefaultBranchOid: pgtype.Text{String: newOID, Valid: true},
+					})
+				}
+			}
+		}
+
+		// 3: enqueue size recalc — separate kind, runs independently.
+		if _, err := worker.Enqueue(ctx, deps.Pool, worker.KindRepoSizeRecalc,
+			map[string]any{"repo_id": repo.ID},
+			worker.EnqueueOptions{}); err != nil {
+			deps.Logger.WarnContext(ctx, "push:process: enqueue size_recalc",
+				"push_event_id", event.ID, "error", err)
+		}
+
+		// 4: stash the payload for S33 to drain.
+		body, _ := json.Marshal(map[string]any{
+			"push_event_id":  event.ID,
+			"repo_id":        event.RepoID,
+			"pusher_user_id": int64ValueOrZero(event.PusherUserID),
+			"before_sha":     event.BeforeSha,
+			"after_sha":      event.AfterSha,
+			"ref":            event.Ref,
+			"protocol":       event.Protocol,
+			"request_id":     event.RequestID,
+		})
+		if _, err := wq.InsertWebhookEventPending(ctx, deps.Pool, workerdb.InsertWebhookEventPendingParams{
+			RepoID:    event.RepoID,
+			EventKind: "push",
+			Payload:   body,
+		}); err != nil {
+			return fmt.Errorf("insert webhook pending: %w", err)
+		}
+
+		// 5: mark processed last so a partial failure earlier triggers a
+		// retry of the whole pipeline. Idempotency is via the
+		// processed_at guard at the top.
+		if err := wq.MarkPushEventProcessed(ctx, deps.Pool, event.ID); err != nil {
+			return fmt.Errorf("mark processed: %w", err)
+		}
+
+		// Wake any size_recalc workers waiting on LISTEN.
+		_ = worker.Notify(ctx, deps.Pool)
+		return nil
+	}
+}
+
+func isZeroSHA(s string) bool {
+	for _, c := range s {
+		if c != '0' {
+			return false
+		}
+	}
+	return s != ""
+}
+
+func int64ValueOrZero(p pgtype.Int8) int64 {
+	if p.Valid {
+		return p.Int64
+	}
+	return 0
+}
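For context, the producer side (the post-receive path, which is not part of this commit) would look roughly like the sketch below, reusing InsertPushEvent the same way the tests in the next file do; the function name is hypothetical:

package hookexample

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/tenseleyFlow/shithub/internal/worker"
	"github.com/tenseleyFlow/shithub/internal/worker/jobs"
	workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
)

// recordPush persists the push_event, enqueues push:process pointing at
// it, and wakes the pool so the handler above runs promptly.
func recordPush(ctx context.Context, pool *pgxpool.Pool, params workerdb.InsertPushEventParams) error {
	event, err := workerdb.New().InsertPushEvent(ctx, pool, params)
	if err != nil {
		return err
	}
	if _, err := worker.Enqueue(ctx, pool, worker.KindPushProcess,
		jobs.PushProcessPayload{PushEventID: event.ID},
		worker.EnqueueOptions{}); err != nil {
		return err
	}
	return worker.Notify(ctx, pool)
}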
internal/worker/jobs/push_process_test.go  (added)
@@ -0,0 +1,292 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+package jobs_test
+
+import (
+	"context"
+	"encoding/json"
+	"log/slog"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/jackc/pgx/v5/pgtype"
+
+	"github.com/tenseleyFlow/shithub/internal/auth/audit"
+	"github.com/tenseleyFlow/shithub/internal/auth/throttle"
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
+	"github.com/tenseleyFlow/shithub/internal/repos"
+	repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
+	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
+	"github.com/tenseleyFlow/shithub/internal/worker/jobs"
+	workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
+)
+
+const fixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
+	"AAAAAAAAAAAAAAAA$" +
+	"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
+
+// TestPushProcess_HappyPath exercises the full push:process pipeline
+// against real Postgres + real bare repo. Verifies that the handler:
+//   - sets repos.default_branch_oid when the ref is the default branch,
+//   - inserts a webhook_events_pending row,
+//   - marks the push_event processed_at,
+//   - enqueues a follow-up repo:size_recalc job.
+func TestPushProcess_HappyPath(t *testing.T) {
+	t.Parallel()
+	pool := dbtest.NewTestDB(t)
+	root := t.TempDir()
+	rfs, err := storage.NewRepoFS(root)
+	if err != nil {
+		t.Fatalf("NewRepoFS: %v", err)
+	}
+
+	// User + verified email so repos.Create accepts a templated initial
+	// commit (the create path needs author identity for plumbing).
+	uq := usersdb.New()
+	user, err := uq.CreateUser(context.Background(), pool, usersdb.CreateUserParams{
+		Username: "alice", DisplayName: "alice", PasswordHash: fixtureHash,
+	})
+	if err != nil {
+		t.Fatalf("CreateUser: %v", err)
+	}
+	em, err := uq.CreateUserEmail(context.Background(), pool, usersdb.CreateUserEmailParams{
+		UserID: user.ID, Email: "alice@example.com", IsPrimary: true, Verified: true,
+	})
+	if err != nil {
+		t.Fatalf("CreateUserEmail: %v", err)
+	}
+	_ = uq.LinkUserPrimaryEmail(context.Background(), pool, usersdb.LinkUserPrimaryEmailParams{
+		ID: user.ID, PrimaryEmailID: pgtype.Int8{Int64: em.ID, Valid: true},
+	})
+
+	res, err := repos.Create(context.Background(), repos.Deps{
+		Pool: pool, RepoFS: rfs, Audit: audit.NewRecorder(), Limiter: throttle.NewLimiter(),
+	}, repos.Params{
+		OwnerUserID: user.ID, OwnerUsername: "alice",
+		Name: "demo", Visibility: "public", InitReadme: true,
+	})
+	if err != nil {
+		t.Fatalf("repos.Create: %v", err)
+	}
+
+	// Insert a push_event covering the initial commit on refs/heads/trunk.
+	wq := workerdb.New()
+	event, err := wq.InsertPushEvent(context.Background(), pool, workerdb.InsertPushEventParams{
+		RepoID:       res.Repo.ID,
+		BeforeSha:    strings.Repeat("0", 40),
+		AfterSha:     res.InitialCommitOID,
+		Ref:          "refs/heads/trunk",
+		Protocol:     "ssh",
+		PusherUserID: pgtype.Int8{Int64: user.ID, Valid: true},
+		RequestID:    pgtype.Text{String: "test-req", Valid: true},
+	})
+	if err != nil {
+		t.Fatalf("InsertPushEvent: %v", err)
+	}
+
+	// Run the handler directly.
+	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
+	handler := jobs.PushProcess(jobs.PushProcessDeps{Pool: pool, RepoFS: rfs, Logger: logger})
+	payload, _ := json.Marshal(jobs.PushProcessPayload{PushEventID: event.ID})
+	if err := handler(context.Background(), payload); err != nil {
+		t.Fatalf("push:process: %v", err)
+	}
+
+	// Default branch OID should now match the initial commit.
+	rq := reposdb.New()
+	repo, err := rq.GetRepoByID(context.Background(), pool, res.Repo.ID)
+	if err != nil {
+		t.Fatalf("GetRepoByID: %v", err)
+	}
+	if !repo.DefaultBranchOid.Valid || repo.DefaultBranchOid.String != res.InitialCommitOID {
+		t.Errorf("default_branch_oid = %v, want %q", repo.DefaultBranchOid, res.InitialCommitOID)
+	}
+
+	// Push event marked processed.
+	got, err := wq.GetPushEvent(context.Background(), pool, event.ID)
+	if err != nil {
+		t.Fatalf("GetPushEvent: %v", err)
+	}
+	if !got.ProcessedAt.Valid {
+		t.Errorf("processed_at not set")
+	}
+
+	// Webhook event row exists.
+	var webhookCount int
+	row := pool.QueryRow(context.Background(),
+		`SELECT count(*) FROM webhook_events_pending WHERE repo_id = $1 AND event_kind = 'push'`,
+		res.Repo.ID)
+	if err := row.Scan(&webhookCount); err != nil {
+		t.Fatalf("count webhook_events_pending: %v", err)
+	}
+	if webhookCount != 1 {
+		t.Errorf("webhook_events_pending count = %d, want 1", webhookCount)
+	}
+
+	// repo:size_recalc job enqueued.
+	var sizeJobCount int
+	row = pool.QueryRow(context.Background(),
+		`SELECT count(*) FROM jobs WHERE kind = 'repo:size_recalc' AND completed_at IS NULL AND failed_at IS NULL`)
+	_ = row.Scan(&sizeJobCount)
+	if sizeJobCount < 1 {
+		t.Errorf("repo:size_recalc not enqueued (count=%d)", sizeJobCount)
+	}
+
+	// Sanity: re-running is a no-op (idempotent on processed_at).
+	if err := handler(context.Background(), payload); err != nil {
+		t.Fatalf("re-run: %v", err)
+	}
+}
+
+// TestRepoSizeRecalc_UpdatesDiskUsedBytes drives the size recalc end-to-
+// end against a real repo and verifies disk_used_bytes is non-zero.
+func TestRepoSizeRecalc_UpdatesDiskUsedBytes(t *testing.T) {
+	t.Parallel()
+	pool := dbtest.NewTestDB(t)
+	root := t.TempDir()
+	rfs, err := storage.NewRepoFS(root)
+	if err != nil {
+		t.Fatalf("NewRepoFS: %v", err)
+	}
+
+	uq := usersdb.New()
+	user, _ := uq.CreateUser(context.Background(), pool, usersdb.CreateUserParams{
+		Username: "bob", DisplayName: "bob", PasswordHash: fixtureHash,
+	})
+	em, _ := uq.CreateUserEmail(context.Background(), pool, usersdb.CreateUserEmailParams{
+		UserID: user.ID, Email: "bob@example.com", IsPrimary: true, Verified: true,
+	})
+	_ = uq.LinkUserPrimaryEmail(context.Background(), pool, usersdb.LinkUserPrimaryEmailParams{
+		ID: user.ID, PrimaryEmailID: pgtype.Int8{Int64: em.ID, Valid: true},
+	})
+	res, err := repos.Create(context.Background(), repos.Deps{
+		Pool: pool, RepoFS: rfs, Audit: audit.NewRecorder(), Limiter: throttle.NewLimiter(),
+	}, repos.Params{
+		OwnerUserID: user.ID, OwnerUsername: "bob",
+		Name: "demo", Visibility: "public", InitReadme: true,
+	})
+	if err != nil {
+		t.Fatalf("repos.Create: %v", err)
+	}
+
+	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
+	handler := jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{Pool: pool, RepoFS: rfs, Logger: logger})
+	payload, _ := json.Marshal(jobs.RepoSizeRecalcPayload{RepoID: res.Repo.ID})
+	if err := handler(context.Background(), payload); err != nil {
+		t.Fatalf("repo:size_recalc: %v", err)
+	}
+
+	rq := reposdb.New()
+	repo, _ := rq.GetRepoByID(context.Background(), pool, res.Repo.ID)
+	if repo.DiskUsedBytes <= 0 {
+		t.Errorf("disk_used_bytes = %d, want > 0", repo.DiskUsedBytes)
+	}
+}
+
+// TestPushProcess_BranchNotDefault: a push to refs/heads/feat shouldn't
+// overwrite default_branch_oid on a repo whose default_branch is trunk.
+func TestPushProcess_BranchNotDefault(t *testing.T) {
+	t.Parallel()
+	pool := dbtest.NewTestDB(t)
+	root := t.TempDir()
+	rfs, _ := storage.NewRepoFS(root)
+	uq := usersdb.New()
+	user, _ := uq.CreateUser(context.Background(), pool, usersdb.CreateUserParams{
+		Username: "carol", DisplayName: "carol", PasswordHash: fixtureHash,
+	})
+	em, _ := uq.CreateUserEmail(context.Background(), pool, usersdb.CreateUserEmailParams{
+		UserID: user.ID, Email: "carol@example.com", IsPrimary: true, Verified: true,
+	})
+	_ = uq.LinkUserPrimaryEmail(context.Background(), pool, usersdb.LinkUserPrimaryEmailParams{
+		ID: user.ID, PrimaryEmailID: pgtype.Int8{Int64: em.ID, Valid: true},
+	})
+	res, _ := repos.Create(context.Background(), repos.Deps{
+		Pool: pool, RepoFS: rfs, Audit: audit.NewRecorder(), Limiter: throttle.NewLimiter(),
+	}, repos.Params{
+		OwnerUserID: user.ID, OwnerUsername: "carol",
+		Name: "demo", Visibility: "public", InitReadme: true,
+	})
+
+	wq := workerdb.New()
+	event, err := wq.InsertPushEvent(context.Background(), pool, workerdb.InsertPushEventParams{
+		RepoID:       res.Repo.ID,
+		BeforeSha:    strings.Repeat("0", 40),
+		AfterSha:     "deadbeef" + strings.Repeat("0", 32),
+		Ref:          "refs/heads/feat",
+		Protocol:     "ssh",
+		PusherUserID: pgtype.Int8{Int64: user.ID, Valid: true},
+	})
+	if err != nil {
+		t.Fatalf("InsertPushEvent: %v", err)
+	}
+	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
+	handler := jobs.PushProcess(jobs.PushProcessDeps{Pool: pool, RepoFS: rfs, Logger: logger})
+	payload, _ := json.Marshal(jobs.PushProcessPayload{PushEventID: event.ID})
+	if err := handler(context.Background(), payload); err != nil {
+		t.Fatalf("push:process: %v", err)
+	}
+	rq := reposdb.New()
+	repo, _ := rq.GetRepoByID(context.Background(), pool, res.Repo.ID)
+	if repo.DefaultBranchOid.Valid {
+		t.Errorf("default_branch_oid set to %q for non-default ref", repo.DefaultBranchOid.String)
+	}
+}
+
+// Sanity check that the package's core helpers don't break under nil
+// payload (defensive — production hooks always send populated payloads).
+func TestPushProcess_RejectsBadPayload(t *testing.T) {
+	t.Parallel()
+	pool := dbtest.NewTestDB(t)
+	rfs, _ := storage.NewRepoFS(t.TempDir())
+	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
+	handler := jobs.PushProcess(jobs.PushProcessDeps{Pool: pool, RepoFS: rfs, Logger: logger})
+
+	// Empty payload → poison.
+	if err := handler(context.Background(), json.RawMessage(`{}`)); err == nil {
+		t.Errorf("empty payload: want error, got nil")
+	}
+	// Reference unknown event → poison.
+	if err := handler(context.Background(), json.RawMessage(`{"push_event_id": 99999}`)); err == nil {
+		t.Errorf("missing event: want error, got nil")
+	}
+}
+
+// Belt + braces: when InitReadme=true the initial commit has a real OID
+// matching the on-disk HEAD, which validates our test fixtures match
+// reality.
+func TestRepoFixture_HeadMatchesInitialCommit(t *testing.T) {
+	t.Parallel()
+	pool := dbtest.NewTestDB(t)
+	root := t.TempDir()
+	rfs, _ := storage.NewRepoFS(root)
+	uq := usersdb.New()
+	user, _ := uq.CreateUser(context.Background(), pool, usersdb.CreateUserParams{
+		Username: "dave", DisplayName: "dave", PasswordHash: fixtureHash,
+	})
+	em, _ := uq.CreateUserEmail(context.Background(), pool, usersdb.CreateUserEmailParams{
+		UserID: user.ID, Email: "dave@example.com", IsPrimary: true, Verified: true,
+	})
+	_ = uq.LinkUserPrimaryEmail(context.Background(), pool, usersdb.LinkUserPrimaryEmailParams{
+		ID: user.ID, PrimaryEmailID: pgtype.Int8{Int64: em.ID, Valid: true},
+	})
+	res, _ := repos.Create(context.Background(), repos.Deps{
+		Pool: pool, RepoFS: rfs, Audit: audit.NewRecorder(), Limiter: throttle.NewLimiter(),
+	}, repos.Params{
+		OwnerUserID: user.ID, OwnerUsername: "dave",
+		Name: "demo", Visibility: "public", InitReadme: true,
+	})
+	gitDir, _ := rfs.RepoPath("dave", "demo")
+	head, found, err := repogit.HeadOf(context.Background(), gitDir, "trunk")
+	if err != nil || !found {
+		t.Fatalf("HeadOf trunk: found=%v err=%v", found, err)
+	}
+	if head.OID != res.InitialCommitOID {
+		t.Errorf("HeadOf.OID = %q, want %q", head.OID, res.InitialCommitOID)
+	}
+	// brevity check so the linter is happy with imports.
+	_ = time.Second
+}
internal/worker/jobs/repo_size_recalc.go  (added)
@@ -0,0 +1,105 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+package jobs
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/fs"
+	"log/slog"
+	"path/filepath"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
+	"github.com/tenseleyFlow/shithub/internal/worker"
+)
+
+// RepoSizeRecalcDeps wires the size-recalc handler.
+type RepoSizeRecalcDeps struct {
+	Pool   *pgxpool.Pool
+	RepoFS *storage.RepoFS
+	Logger *slog.Logger
+}
+
+// RepoSizeRecalcPayload — { "repo_id": <int> }.
+type RepoSizeRecalcPayload struct {
+	RepoID int64 `json:"repo_id"`
+}
+
+// RepoSizeRecalc walks the bare-repo tree and updates
+// repos.disk_used_bytes. The walk is done in pure Go (no shelling out
+// to du) so we get a portable sum and don't have to wrangle stderr
+// from a blocked subprocess.
+//
+// Concurrent runs may compute slightly different sizes if a push lands
+// mid-walk; that's acceptable — the *last* one wins, and quotas (post-
+// MVP) tolerate small drift.
+func RepoSizeRecalc(deps RepoSizeRecalcDeps) worker.Handler {
+	return func(ctx context.Context, raw json.RawMessage) error {
+		var p RepoSizeRecalcPayload
+		if err := json.Unmarshal(raw, &p); err != nil {
+			return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
+		}
+		if p.RepoID == 0 {
+			return worker.PoisonError(errors.New("missing repo_id"))
+		}
+
+		rq := reposdb.New()
+		ownerRow, err := rq.GetRepoOwnerUsernameByID(ctx, deps.Pool, p.RepoID)
+		if err != nil {
+			if errors.Is(err, pgx.ErrNoRows) {
+				return worker.PoisonError(fmt.Errorf("repo %d not found", p.RepoID))
+			}
+			return fmt.Errorf("load repo: %w", err)
+		}
+
+		gitDir, err := deps.RepoFS.RepoPath(ownerRow.OwnerUsername, ownerRow.RepoName)
+		if err != nil {
+			return worker.PoisonError(fmt.Errorf("repo path: %w", err))
+		}
+		size, err := walkSize(ctx, gitDir)
+		if err != nil {
+			return fmt.Errorf("walk size: %w", err)
+		}
+		if err := rq.UpdateRepoDiskUsed(ctx, deps.Pool, reposdb.UpdateRepoDiskUsedParams{
+			ID:            p.RepoID,
+			DiskUsedBytes: size,
+		}); err != nil {
+			return fmt.Errorf("update disk_used: %w", err)
+		}
+		return nil
+	}
+}
+
+// walkSize sums the byte size of every regular file under root. Walks
+// once; doesn't follow symlinks (we never create any inside a bare
+// repo). Honors ctx so a long-running walk on a giant repo can be
+// cancelled by graceful shutdown.
+func walkSize(ctx context.Context, root string) (int64, error) {
+	var total int64
+	err := filepath.WalkDir(root, func(path string, d fs.DirEntry, walkErr error) error {
+		if walkErr != nil {
+			return walkErr
+		}
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+		if d.IsDir() {
+			return nil
+		}
+		info, err := d.Info()
+		if err != nil {
+			return err
+		}
+		if info.Mode().IsRegular() {
+			total += info.Size()
+		}
+		return nil
+	})
+	return total, err
+}
internal/worker/pool.go  (added)
@@ -0,0 +1,317 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+package worker
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"log/slog"
+	mrand "math/rand"
+	"os"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgtype"
+	"github.com/jackc/pgx/v5/pgxpool"
+
+	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
+	workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
+)
+
+// PoolConfig configures Pool. Leave fields zero for the documented
+// defaults.
+type PoolConfig struct {
+	Workers    int           // default 4
+	IdlePoll   time.Duration // default 5s — backstop when LISTEN drops a wake
+	JobTimeout time.Duration // default 5min, applied per-job via context
+	InstanceID string        // default "<hostname>:<pid>"
+	Logger     *slog.Logger  // default: info-level text logger on stderr
+}
+
+// Pool dispatches jobs from the queue. Construct via NewPool, register
+// handlers via Register, run via Run.
+type Pool struct {
+	cfg      PoolConfig
+	db       *pgxpool.Pool
+	q        *workerdb.Queries
+	handlers map[Kind]Handler
+	rng      *mrand.Rand
+	mu       sync.Mutex // guards handlers + rng
+}
+
+// NewPool wires a pool against an open pgx pool. Callers register
+// handlers before calling Run.
+func NewPool(db *pgxpool.Pool, cfg PoolConfig) *Pool {
+	if cfg.Workers <= 0 {
+		cfg.Workers = 4
+	}
+	if cfg.IdlePoll <= 0 {
+		cfg.IdlePoll = 5 * time.Second
+	}
+	if cfg.JobTimeout <= 0 {
+		cfg.JobTimeout = 5 * time.Minute
+	}
+	if cfg.InstanceID == "" {
+		host, _ := os.Hostname()
+		cfg.InstanceID = host + ":" + strconv.Itoa(os.Getpid())
+	}
+	if cfg.Logger == nil {
+		cfg.Logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
+	}
+	return &Pool{
+		cfg:      cfg,
+		db:       db,
+		q:        workerdb.New(),
+		handlers: make(map[Kind]Handler),
+		// nolint:gosec // G404: jitter is non-cryptographic by design.
+		rng: mrand.New(mrand.NewSource(time.Now().UnixNano())),
+	}
+}
+
+// Register associates a handler with a kind. Re-registering replaces
+// the previous handler. Registration is goroutine-safe so test harnesses
+// can swap handlers between runs.
+func (p *Pool) Register(kind Kind, h Handler) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.handlers[kind] = h
+}
+
+// Run blocks until ctx is cancelled. Spawns cfg.Workers worker goroutines
+// plus one LISTEN goroutine that fans out wake-ups, and returns nil once
+// they have all exited.
+func (p *Pool) Run(ctx context.Context) error {
+	p.cfg.Logger.InfoContext(ctx, "worker: starting",
+		"workers", p.cfg.Workers,
+		"instance_id", p.cfg.InstanceID,
+		"kinds", p.kindList())
+
+	wake := make(chan struct{}, p.cfg.Workers)
+	var wg sync.WaitGroup
+
+	// LISTEN goroutine. Holds a dedicated conn for the lifetime of Run.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		p.listenLoop(ctx, wake)
+	}()
+
+	for i := 0; i < p.cfg.Workers; i++ {
+		wg.Add(1)
+		go func(id int) {
+			defer wg.Done()
+			p.workerLoop(ctx, id, wake)
+		}(i)
+	}
+
+	wg.Wait()
+	p.cfg.Logger.InfoContext(ctx, "worker: stopped")
+	return nil
+}
+
+func (p *Pool) kindList() []string {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	out := make([]string, 0, len(p.handlers))
+	for k := range p.handlers {
+		out = append(out, string(k))
+	}
+	return out
+}
+
+// listenLoop maintains a LISTEN on NotifyChannel. On each NOTIFY, fan
+// out to wake. On reconnect-required errors, sleeps briefly and retries.
+func (p *Pool) listenLoop(ctx context.Context, wake chan<- struct{}) {
+	for {
+		if err := ctx.Err(); err != nil {
+			return
+		}
+		if err := p.listenOnce(ctx, wake); err != nil && !errors.Is(err, context.Canceled) {
+			p.cfg.Logger.WarnContext(ctx, "worker: listen restart", "error", err)
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(2 * time.Second):
+			}
+		}
+	}
+}
+
+func (p *Pool) listenOnce(ctx context.Context, wake chan<- struct{}) error {
+	conn, err := p.db.Acquire(ctx)
+	if err != nil {
+		return fmt.Errorf("acquire: %w", err)
+	}
+	defer conn.Release()
+
+	if _, err := conn.Exec(ctx, "LISTEN "+NotifyChannel); err != nil {
+		return fmt.Errorf("LISTEN: %w", err)
+	}
+	for {
+		_, err := conn.Conn().WaitForNotification(ctx)
+		if err != nil {
+			return err
+		}
+		// Fan out to as many workers as are idle. Non-blocking sends so
+		// we don't stall on a saturated pool.
+		for i := 0; i < p.cfg.Workers; i++ {
+			select {
+			case wake <- struct{}{}:
+			default:
+			}
+		}
+	}
+}
+
+func (p *Pool) workerLoop(ctx context.Context, id int, wake <-chan struct{}) {
+	logger := p.cfg.Logger.With("worker_id", id)
+	ticker := time.NewTicker(p.cfg.IdlePoll)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-wake:
+		case <-ticker.C:
+		}
+		// Drain: try every registered kind; if any kind returned a job,
+		// loop back immediately without waiting on wake/tick.
+		for {
+			any, err := p.tryClaimAndRun(ctx, logger)
+			if err != nil {
+				logger.WarnContext(ctx, "worker: claim cycle error", "error", err)
+				break
+			}
+			if !any {
+				break
+			}
+		}
+	}
+}
+
+// tryClaimAndRun walks every registered kind and attempts to claim one
+// job. Returns true if any kind produced work this pass.
+func (p *Pool) tryClaimAndRun(ctx context.Context, logger *slog.Logger) (bool, error) {
+	p.mu.Lock()
+	kinds := make([]Kind, 0, len(p.handlers))
+	for k := range p.handlers {
+		kinds = append(kinds, k)
+	}
+	p.mu.Unlock()
+
+	any := false
+	for _, kind := range kinds {
+		if ctx.Err() != nil {
+			return any, ctx.Err()
+		}
+		ran, err := p.runOne(ctx, kind, logger)
+		if err != nil {
+			return any, err
+		}
+		if ran {
+			any = true
+		}
+	}
+	return any, nil
+}
+
+// runOne claims one job of the given kind, runs it, and records the
+// outcome. Returns ran=true when a job was claimed (regardless of
+// success), false when the queue had nothing for this kind.
+func (p *Pool) runOne(ctx context.Context, kind Kind, logger *slog.Logger) (bool, error) {
+	job, err := p.q.ClaimJob(ctx, p.db, workerdb.ClaimJobParams{
+		Kind:     string(kind),
+		LockedBy: pgtype.Text{String: p.cfg.InstanceID, Valid: true},
+	})
+	if errors.Is(err, pgx.ErrNoRows) {
+		return false, nil
+	}
+	if err != nil {
+		return false, fmt.Errorf("claim %s: %w", kind, err)
+	}
+
+	p.mu.Lock()
+	h, ok := p.handlers[Kind(job.Kind)]
+	p.mu.Unlock()
+	if !ok {
+		// Registered handler vanished between claim and dispatch; fail
+		// the job rather than block the queue. Should never happen in
+		// practice — Register is one-shot at boot.
+		_ = p.q.MarkJobFailed(ctx, p.db, workerdb.MarkJobFailedParams{
+			ID:        job.ID,
+			LastError: pgtype.Text{String: "no handler registered", Valid: true},
+		})
+		return true, nil
+	}
+
+	jobCtx, cancel := context.WithTimeout(ctx, p.cfg.JobTimeout)
+	start := time.Now()
+	metrics.WorkerInFlight.WithLabelValues(job.Kind).Inc()
+	runErr := safeRun(jobCtx, h, job.Payload)
+	metrics.WorkerInFlight.WithLabelValues(job.Kind).Dec()
+	cancel()
+	metrics.WorkerJobDurationSeconds.WithLabelValues(job.Kind).Observe(time.Since(start).Seconds())
+
+	logger.InfoContext(ctx, "worker: dispatched",
+		"job_id", job.ID,
+		"kind", job.Kind,
+		"attempt", job.Attempts,
+		"duration_ms", time.Since(start).Milliseconds(),
+		"ok", runErr == nil,
+	)
+
+	if runErr == nil {
+		if err := p.q.MarkJobCompleted(ctx, p.db, job.ID); err != nil {
+			logger.ErrorContext(ctx, "worker: mark completed", "job_id", job.ID, "error", err)
+		}
+		metrics.WorkerJobsProcessedTotal.WithLabelValues(job.Kind, "ok").Inc()
+		return true, nil
+	}
+
+	// Failure path. Poison errors skip retry.
+	if errors.Is(runErr, ErrPoison) {
+		_ = p.q.MarkJobFailed(ctx, p.db, workerdb.MarkJobFailedParams{
+			ID:        job.ID,
+			LastError: pgtype.Text{String: runErr.Error(), Valid: true},
+		})
+		metrics.WorkerJobsProcessedTotal.WithLabelValues(job.Kind, "poison").Inc()
+		return true, nil
+	}
+
+	if int(job.Attempts) >= int(job.MaxAttempts) {
+		_ = p.q.MarkJobFailed(ctx, p.db, workerdb.MarkJobFailedParams{
+			ID:        job.ID,
+			LastError: pgtype.Text{String: runErr.Error(), Valid: true},
+		})
+		metrics.WorkerJobsProcessedTotal.WithLabelValues(job.Kind, "failed").Inc()
+		return true, nil
+	}
+
+	p.mu.Lock()
+	delay := Backoff(int(job.Attempts), p.rng.Float64)
+	p.mu.Unlock()
+	_ = p.q.RescheduleJob(ctx, p.db, workerdb.RescheduleJobParams{
+		ID:        job.ID,
+		LastError: pgtype.Text{String: runErr.Error(), Valid: true},
+		RunAt:     pgtype.Timestamptz{Time: time.Now().Add(delay), Valid: true},
+	})
+	metrics.WorkerJobsProcessedTotal.WithLabelValues(job.Kind, "retry").Inc()
+	return true, nil
+}
+
+// safeRun wraps the handler in a recover so a panicking handler doesn't
+// take the worker goroutine with it; the job is rescheduled like any
+// other failure.
+func safeRun(ctx context.Context, h Handler, payload json.RawMessage) (err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			err = fmt.Errorf("worker: handler panic: %v", r)
+		}
+	}()
+	return h(ctx, payload)
+}
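Putting the pieces together, boot code might wire the pool as in the sketch below. The real server wiring is not part of this commit; the function name and worker count are illustrative.

package bootexample

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/tenseleyFlow/shithub/internal/infra/storage"
	"github.com/tenseleyFlow/shithub/internal/worker"
	"github.com/tenseleyFlow/shithub/internal/worker/jobs"
)

// runWorkers registers every built-in kind, then blocks in Run until
// shutdown is signalled.
func runWorkers(pool *pgxpool.Pool, rfs *storage.RepoFS, logger *slog.Logger) error {
	p := worker.NewPool(pool, worker.PoolConfig{Workers: 4, Logger: logger})
	p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{Pool: pool, RepoFS: rfs, Logger: logger}))
	p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{Pool: pool, RepoFS: rfs, Logger: logger}))
	p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{Pool: pool, Logger: logger}))

	// Cancelling the context stops the LISTEN loop and the worker loops;
	// Run returns once they have all exited.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	return p.Run(ctx)
}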
internal/worker/pool_integration_test.go  (added)
@@ -0,0 +1,234 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+package worker_test
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/jackc/pgx/v5/pgtype"
+
+	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
+	"github.com/tenseleyFlow/shithub/internal/worker"
+	workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
+)
+
+// Each test uses its own kind so handlers don't bleed across parallel
+// tests sharing the worker package's runtime state.
+const (
+	testKindHappy   worker.Kind = "test:happy"
+	testKindRetry   worker.Kind = "test:retry"
+	testKindPoison  worker.Kind = "test:poison"
+	testKindFanIn50 worker.Kind = "test:fanin50"
+)
+
+// runPool starts the pool in a goroutine and returns a stop func that
+// cancels the context and waits for clean exit.
+func runPool(t *testing.T, p *worker.Pool) (cancel func()) {
+	t.Helper()
+	ctx, c := context.WithCancel(context.Background())
+	done := make(chan struct{})
+	go func() {
+		_ = p.Run(ctx)
+		close(done)
+	}()
+	return func() {
+		c()
+		select {
+		case <-done:
+		case <-time.After(10 * time.Second):
+			t.Fatal("pool did not stop in 10s")
+		}
+	}
+}
+
+func TestPool_HappyPath(t *testing.T) {
+	t.Parallel()
+	pool := dbtest.NewTestDB(t)
+
+	var seen atomic.Int64
+	p := worker.NewPool(pool, worker.PoolConfig{Workers: 2, IdlePoll: 200 * time.Millisecond})
+	p.Register(testKindHappy, func(_ context.Context, _ json.RawMessage) error {
+		seen.Add(1)
+		return nil
+	})
+	stop := runPool(t, p)
+	defer stop()
+
+	id, err := worker.Enqueue(context.Background(), pool, testKindHappy, map[string]any{"x": 1}, worker.EnqueueOptions{})
+	if err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	if err := worker.Notify(context.Background(), pool); err != nil {
+		t.Fatalf("Notify: %v", err)
+	}
+
+	waitFor(t, 5*time.Second, func() bool { return seen.Load() == 1 })
+
+	q := workerdb.New()
+	job, err := q.GetJob(context.Background(), pool, id)
+	if err != nil {
+		t.Fatalf("GetJob: %v", err)
+	}
+	if !job.CompletedAt.Valid {
+		t.Errorf("job %d: completed_at unset; last_error=%v", id, job.LastError.String)
+	}
+}
+
+func TestPool_RetryThenSucceed(t *testing.T) {
+	t.Parallel()
+	pool := dbtest.NewTestDB(t)
+
+	var attempts atomic.Int64
+	p := worker.NewPool(pool, worker.PoolConfig{Workers: 2, IdlePoll: 100 * time.Millisecond})
+	p.Register(testKindRetry, func(_ context.Context, _ json.RawMessage) error {
+		if attempts.Add(1) < 2 {
+			return errors.New("transient")
+		}
+		return nil
+	})
+	stop := runPool(t, p)
+	defer stop()
+
+	id, err := worker.Enqueue(context.Background(), pool, testKindRetry, map[string]any{}, worker.EnqueueOptions{MaxAttempts: 5})
+	if err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	_ = worker.Notify(context.Background(), pool)
+
+	// Don't sit out the backoff: wait (by polling) for the first attempt
+	// to fail, then force the rescheduled run_at back to now() with a
+	// direct UPDATE.
+	q := workerdb.New()
+	waitFor(t, 5*time.Second, func() bool { return attempts.Load() >= 1 })
+	if _, err := pool.Exec(context.Background(), `UPDATE jobs SET run_at = now() WHERE id = $1`, id); err != nil {
+		t.Fatalf("force run_at: %v", err)
+	}
+	_ = worker.Notify(context.Background(), pool)
+
+	waitFor(t, 5*time.Second, func() bool { return attempts.Load() >= 2 })
+	waitFor(t, 5*time.Second, func() bool {
+		j, err := q.GetJob(context.Background(), pool, id)
+		return err == nil && j.CompletedAt.Valid
+	})
+}
+
+func TestPool_PoisonGoesStraightToFailed(t *testing.T) {
+	t.Parallel()
+	pool := dbtest.NewTestDB(t)
+
+	var calls atomic.Int64
+	p := worker.NewPool(pool, worker.PoolConfig{Workers: 1, IdlePoll: 100 * time.Millisecond})
+	p.Register(testKindPoison, func(_ context.Context, _ json.RawMessage) error {
+		calls.Add(1)
+		return worker.PoisonError(errors.New("nope"))
+	})
+	stop := runPool(t, p)
+	defer stop()
+
+	id, err := worker.Enqueue(context.Background(), pool, testKindPoison, map[string]any{}, worker.EnqueueOptions{})
+	if err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	_ = worker.Notify(context.Background(), pool)
+
+	q := workerdb.New()
+	waitFor(t, 5*time.Second, func() bool {
+		j, err := q.GetJob(context.Background(), pool, id)
+		return err == nil && j.FailedAt.Valid
+	})
+	if got := calls.Load(); got != 1 {
+		t.Errorf("calls = %d, want 1 (no retry on poison)", got)
+	}
+	j, _ := q.GetJob(context.Background(), pool, id)
+	if !j.LastError.Valid || j.LastError.String == "" {
+		t.Errorf("last_error not recorded on poison")
+	}
+}
+
+func TestPool_ConcurrentClaimsExactlyOnce(t *testing.T) {
+	t.Parallel()
+	pool := dbtest.NewTestDB(t)
+
+	const total = 50
+	processed := make(map[int64]int) // job_id → times processed
+	var mu sync.Mutex
+	p := worker.NewPool(pool, worker.PoolConfig{Workers: 4, IdlePoll: 50 * time.Millisecond})
+	p.Register(testKindFanIn50, func(_ context.Context, raw json.RawMessage) error {
+		var payload struct {
+			ID int64 `json:"id"`
+		}
+		_ = json.Unmarshal(raw, &payload)
+		mu.Lock()
+		processed[payload.ID]++
+		mu.Unlock()
+		return nil
+	})
+	stop := runPool(t, p)
+	defer stop()
+
+	for i := 0; i < total; i++ {
+		_, err := worker.Enqueue(context.Background(), pool, testKindFanIn50,
+			map[string]any{"id": i}, worker.EnqueueOptions{})
+		if err != nil {
+			t.Fatalf("Enqueue: %v", err)
+		}
+	}
+	_ = worker.Notify(context.Background(), pool)
+
+	waitFor(t, 10*time.Second, func() bool {
+		mu.Lock()
+		defer mu.Unlock()
+		return len(processed) == total
+	})
+
+	mu.Lock()
+	defer mu.Unlock()
+	for id, count := range processed {
+		if count != 1 {
+			t.Errorf("job %d processed %d times, want 1", id, count)
+		}
+	}
+}
+
+func TestEnqueue_DelayedRunAt(t *testing.T) {
+	t.Parallel()
+	pool := dbtest.NewTestDB(t)
+	future := time.Now().Add(1 * time.Hour)
+	id, err := worker.Enqueue(context.Background(), pool, "test:delayed", map[string]any{}, worker.EnqueueOptions{
+		RunAt: pgtype.Timestamptz{Time: future, Valid: true},
+	})
+	if err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	q := workerdb.New()
+	job, err := q.GetJob(context.Background(), pool, id)
+	if err != nil {
+		t.Fatalf("GetJob: %v", err)
+	}
+	if !job.RunAt.Time.Equal(future.UTC().Truncate(time.Microsecond)) {
+		// pg truncates to microseconds; allow a tiny delta.
+		if d := job.RunAt.Time.Sub(future); d > time.Second || d < -time.Second {
+			t.Errorf("run_at = %v, want %v", job.RunAt.Time, future)
+		}
+	}
+}
+
+// waitFor polls cond every 50ms up to limit. Fails the test on timeout.
+func waitFor(t *testing.T, limit time.Duration, cond func() bool) {
+	t.Helper()
+	deadline := time.Now().Add(limit)
+	for time.Now().Before(deadline) {
+		if cond() {
+			return
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
+	t.Fatalf("waitFor: condition not met within %v", limit)
+}
internal/worker/types.go  (added)
@@ -0,0 +1,90 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+// Package worker drives the Postgres-backed job queue introduced in
+// S14. The pool dispatches one Job per goroutine, claiming rows via
+// FOR UPDATE SKIP LOCKED so concurrent workers don't double-process.
+//
+// Job kinds, their payload schema, and their handlers live alongside
+// this package in sub-packages (jobs/<kind>.go). The pool itself is
+// kind-agnostic: a Handler is just a func that takes a payload and
+// returns nil-or-error.
+package worker
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"time"
+)
+
+// Kind is the canonical name of a job. Use lowercase letters and
+// colon-separated namespaces (e.g. "push:process", "repo:size_recalc").
+// Kind doubles as the dispatch index — workers query
+// `WHERE kind = $1` so adding new kinds doesn't disturb existing ones.
+type Kind string
+
+// Built-in kinds shipped in S14.
+const (
+	KindPushProcess    Kind = "push:process"
+	KindRepoSizeRecalc Kind = "repo:size_recalc"
+	KindJobsPurge      Kind = "jobs:purge_completed"
+)
+
+// NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes
+// to so it wakes up immediately when a job is enqueued, instead of
+// polling. Callers wrapping enqueue in a tx must NOTIFY inside the
+// same tx so the notification only fires on commit.
+const NotifyChannel = "shithub_jobs"
+
+// Handler runs one job. The framework supplies the raw JSON payload;
+// handlers Unmarshal into their own typed schema. A nil error reports
+// success and the job is marked completed; any non-nil error triggers
+// the backoff/retry path. ErrPoison is the explicit "do not retry" signal
+// — useful when the input is malformed and retrying can't help.
+type Handler func(ctx context.Context, payload json.RawMessage) error
+
+// ErrPoison wraps a handler error that should NOT be retried. The pool
+// jumps the job straight to MarkJobFailed instead of rescheduling.
+var ErrPoison = errors.New("worker: poison job")
+
+// PoisonError wraps cause as a poison error. The cause is preserved in
+// last_error for operator inspection.
+func PoisonError(cause error) error {
+	return fmt.Errorf("%w: %v", ErrPoison, cause)
+}
+
+// Backoff returns the delay before retrying a job that is about to be
+// rescheduled. The formula is `30s * 2^(attempts-1)` capped at 1 hour,
+// with ±20% jitter so a fleet doesn't synchronize retries on a sibling
+// dependency outage.
+//
+// `attempts` is the number of attempts already made (1-indexed: the
+// just-failed attempt counts as 1).
+func Backoff(attempts int, jitter func() float64) time.Duration {
+	if attempts < 1 {
+		attempts = 1
+	}
+	const (
+		base = 30 * time.Second
+		cap_ = time.Hour
+	)
+	// Compute base * 2^(attempts-1), guarding against overflow.
+	d := base
+	for i := 1; i < attempts; i++ {
+		d *= 2
+		if d >= cap_ {
+			d = cap_
+			break
+		}
+	}
+	if d > cap_ {
+		d = cap_
+	}
+	if jitter != nil {
+		// jitter() in [0,1) → multiplier in [0.8, 1.2)
+		mult := 0.8 + 0.4*jitter()
+		d = time.Duration(float64(d) * mult)
+	}
+	return d
+}
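To make the handler contract concrete, a hypothetical new kind could be defined as below. Only the retry/poison split matters here; the kind name and payload are invented for illustration.

package kindexample

import (
	"context"
	"encoding/json"
	"errors"

	"github.com/tenseleyFlow/shithub/internal/worker"
)

// A made-up kind following the naming convention above.
const kindEmailSend worker.Kind = "email:send"

type emailSendPayload struct {
	To string `json:"to"`
}

// emailSend satisfies worker.Handler: poison errors for input that can
// never succeed, plain errors for transient failures so the pool
// reschedules with Backoff, nil to mark the job completed.
func emailSend(ctx context.Context, raw json.RawMessage) error {
	var p emailSendPayload
	if err := json.Unmarshal(raw, &p); err != nil {
		return worker.PoisonError(err) // malformed payload: retrying can't help
	}
	if p.To == "" {
		return worker.PoisonError(errors.New("missing to"))
	}
	// ... attempt delivery here; return a transient SMTP error as-is so
	// the backoff/retry path kicks in ...
	return nil
}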