tenseleyflow/shithub / 74d1b46

Browse files

worker/jobs: gpg:backfill handler + KindGPGBackfill registration

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
74d1b46f33df22476bb8aebec3b91f2eb4f64a76
Parents
61261e7
Tree
bb47310

3 changed files

Status | File | Added | Removed
M cmd/shithubd/worker.go 4 0
A internal/worker/jobs/gpg_backfill.go 220 0
M internal/worker/types.go 11 0
cmd/shithubd/worker.go (modified)
@@ -150,6 +150,10 @@ var workerCmd = &cobra.Command{
150
 			Pool: pool, Logger: logger, Stripe: stripeRemote,
150
 			Pool: pool, Logger: logger, Stripe: stripeRemote,
151
 		}))
151
 		}))
152
 
152
 
153
+		p.Register(worker.KindGPGBackfill, jobs.GPGBackfill(jobs.GPGBackfillDeps{
154
+			Pool: pool, RepoFS: rfs, Logger: logger,
155
+		}))
156
+
153
 		notifSender, _ := pickNotifEmailSender(cfg)
157
 		notifSender, _ := pickNotifEmailSender(cfg)
154
 		p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{
158
 		p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{
155
 			Pool:           pool,
159
 			Pool:           pool,
internal/worker/jobs/gpg_backfill.go (added)
@@ -0,0 +1,220 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package jobs
4
+
5
+import (
6
+	"bufio"
7
+	"bytes"
8
+	"context"
9
+	"encoding/json"
10
+	"errors"
11
+	"fmt"
12
+	"log/slog"
13
+	"os/exec"
14
+	"strings"
15
+	"time"
16
+
17
+	"github.com/jackc/pgx/v5"
18
+	"github.com/jackc/pgx/v5/pgxpool"
19
+
20
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
21
+	"github.com/tenseleyFlow/shithub/internal/repos/sigverify"
22
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
23
+	"github.com/tenseleyFlow/shithub/internal/worker"
24
+)
25
+
26
// GPGBackfillDeps wires the gpg:backfill handler.
//
// Pool serves the repo lookup and the verification-cache writes;
// RepoFS resolves an owner/name pair to the bare repo's on-disk git
// directory; Logger receives per-object skip warnings and the final
// completion line.
type GPGBackfillDeps struct {
	Pool   *pgxpool.Pool
	RepoFS *storage.RepoFS
	Logger *slog.Logger
}
32
+
33
// GPGBackfillPayload mirrors sigverify.BackfillPayload — duplicated
// here so the jobs package doesn't pull sigverify in just for the
// type definition. JSON wire shape MUST stay identical to
// sigverify.BackfillPayload; both forms unmarshal the same bytes.
type GPGBackfillPayload struct {
	// RepoID identifies the target repo. The handler treats the zero
	// value as a malformed (poison) payload.
	RepoID int64 `json:"repo_id"`
}
40
+
41
// perCommitTimeout bounds a single object's verification. A
// pathological commit object (huge gpgsig with deep continuation
// lines) shouldn't stall the whole queue. Applied per object via
// context.WithTimeout in both the commit and tag walks.
const perCommitTimeout = 5 * time.Second
45
+
46
+// GPGBackfill is the worker.Handler for KindGPGBackfill. One job per
47
+// repo; the handler enumerates every commit on the default branch
48
+// and every annotated tag, then runs sigverify.Verify / VerifyTag
49
+// and writes the result to commit_verification_cache.
50
+//
51
+// The handler is idempotent thanks to UpsertCommitVerification's
52
+// ON CONFLICT clause — re-running this job is safe and is in fact
53
+// the documented recovery path for a partially-completed backfill.
54
+//
55
+// Failure semantics: any per-commit cat-file failure is logged and
56
+// SKIPPED (not retried at the job level) so one corrupted object
57
+// doesn't poison the whole repo's backfill. The job itself returns
58
+// nil unless the repo lookup or git env is unreachable; those
59
+// surface as retryable errors so the worker pool's backoff kicks
60
+// in.
61
+func GPGBackfill(deps GPGBackfillDeps) worker.Handler {
62
+	return func(ctx context.Context, raw json.RawMessage) error {
63
+		var p GPGBackfillPayload
64
+		if err := json.Unmarshal(raw, &p); err != nil {
65
+			return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
66
+		}
67
+		if p.RepoID == 0 {
68
+			return worker.PoisonError(errors.New("missing repo_id"))
69
+		}
70
+
71
+		rq := reposdb.New()
72
+		row, err := rq.GetRepoForBackfill(ctx, deps.Pool, p.RepoID)
73
+		if err != nil {
74
+			if errors.Is(err, pgx.ErrNoRows) {
75
+				// Repo was deleted between enqueue and dispatch.
76
+				// Poison so we don't retry a deleted target.
77
+				return worker.PoisonError(fmt.Errorf("repo %d not found", p.RepoID))
78
+			}
79
+			return fmt.Errorf("load repo: %w", err)
80
+		}
81
+
82
+		gitDir, err := deps.RepoFS.RepoPath(row.Owner, row.Name)
83
+		if err != nil {
84
+			return worker.PoisonError(fmt.Errorf("repo path: %w", err))
85
+		}
86
+
87
+		lookups := sigverify.NewSQLCLookups(deps.Pool)
88
+
89
+		commitsProcessed, err := backfillCommits(ctx, deps, gitDir, p.RepoID, row.DefaultBranch, lookups)
90
+		if err != nil {
91
+			return fmt.Errorf("backfill commits: %w", err)
92
+		}
93
+		tagsProcessed, err := backfillTags(ctx, deps, gitDir, p.RepoID, lookups)
94
+		if err != nil {
95
+			return fmt.Errorf("backfill tags: %w", err)
96
+		}
97
+
98
+		deps.Logger.InfoContext(ctx, "gpg backfill completed",
99
+			"repo_id", p.RepoID,
100
+			"commits", commitsProcessed,
101
+			"tags", tagsProcessed,
102
+		)
103
+		return nil
104
+	}
105
+}
106
+
107
+// backfillCommits walks every commit on the default branch and
108
+// verifies each. Returns the number processed (signed + unsigned;
109
+// the cache stamps both so future "is this verified" reads don't
110
+// re-walk).
111
+func backfillCommits(
112
+	ctx context.Context,
113
+	deps GPGBackfillDeps,
114
+	gitDir string,
115
+	repoID int64,
116
+	defaultBranch string,
117
+	lookups sigverify.Lookups,
118
+) (int, error) {
119
+	// Empty default branch (uninitialized repo) → nothing to walk.
120
+	if defaultBranch == "" {
121
+		return 0, nil
122
+	}
123
+
124
+	cmd := exec.CommandContext(ctx, "git", "-C", gitDir, "rev-list", defaultBranch)
125
+	stdout, err := cmd.StdoutPipe()
126
+	if err != nil {
127
+		return 0, fmt.Errorf("rev-list pipe: %w", err)
128
+	}
129
+	var stderr bytes.Buffer
130
+	cmd.Stderr = &stderr
131
+	if err := cmd.Start(); err != nil {
132
+		return 0, fmt.Errorf("rev-list start: %w", err)
133
+	}
134
+
135
+	rq := reposdb.New()
136
+	scanner := bufio.NewScanner(stdout)
137
+	count := 0
138
+	for scanner.Scan() {
139
+		oid := strings.TrimSpace(scanner.Text())
140
+		if len(oid) != 40 {
141
+			continue
142
+		}
143
+		if err := ctx.Err(); err != nil {
144
+			return count, err
145
+		}
146
+		verifyCtx, cancel := context.WithTimeout(ctx, perCommitTimeout)
147
+		result, vErr := sigverify.Verify(verifyCtx, gitDir, oid, lookups)
148
+		cancel()
149
+		if vErr != nil {
150
+			deps.Logger.WarnContext(ctx, "verify commit failed; skipping",
151
+				"oid", oid, "err", vErr)
152
+			continue
153
+		}
154
+		if wErr := sigverify.WriteResult(ctx, rq, deps.Pool, repoID, oid, sigverify.KindCommit, result); wErr != nil {
155
+			deps.Logger.WarnContext(ctx, "cache write failed; skipping",
156
+				"oid", oid, "err", wErr)
157
+			continue
158
+		}
159
+		count++
160
+	}
161
+	if err := scanner.Err(); err != nil {
162
+		return count, fmt.Errorf("scan rev-list: %w", err)
163
+	}
164
+	if err := cmd.Wait(); err != nil {
165
+		return count, fmt.Errorf("rev-list: %w: %s", err, stderr.String())
166
+	}
167
+	return count, nil
168
+}
169
+
170
+// backfillTags walks every annotated tag in the repo and verifies
171
+// each. Lightweight tags (which carry no signature) are skipped.
172
+// Returns the number processed.
173
+func backfillTags(
174
+	ctx context.Context,
175
+	deps GPGBackfillDeps,
176
+	gitDir string,
177
+	repoID int64,
178
+	lookups sigverify.Lookups,
179
+) (int, error) {
180
+	// for-each-ref filters to refs/tags and emits 'oid type'; we
181
+	// only want annotated tags (type=tag).
182
+	cmd := exec.CommandContext(ctx, "git", "-C", gitDir,
183
+		"for-each-ref", "--format=%(objectname) %(objecttype)", "refs/tags",
184
+	)
185
+	out, err := cmd.Output()
186
+	if err != nil {
187
+		return 0, fmt.Errorf("for-each-ref tags: %w", err)
188
+	}
189
+
190
+	rq := reposdb.New()
191
+	count := 0
192
+	for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
193
+		if line == "" {
194
+			continue
195
+		}
196
+		fields := strings.Fields(line)
197
+		if len(fields) != 2 || fields[1] != "tag" {
198
+			continue
199
+		}
200
+		oid := fields[0]
201
+		if err := ctx.Err(); err != nil {
202
+			return count, err
203
+		}
204
+		verifyCtx, cancel := context.WithTimeout(ctx, perCommitTimeout)
205
+		result, vErr := sigverify.VerifyTag(verifyCtx, gitDir, oid, lookups)
206
+		cancel()
207
+		if vErr != nil {
208
+			deps.Logger.WarnContext(ctx, "verify tag failed; skipping",
209
+				"oid", oid, "err", vErr)
210
+			continue
211
+		}
212
+		if wErr := sigverify.WriteResult(ctx, rq, deps.Pool, repoID, oid, sigverify.KindTag, result); wErr != nil {
213
+			deps.Logger.WarnContext(ctx, "cache write failed; skipping",
214
+				"oid", oid, "err", wErr)
215
+			continue
216
+		}
217
+		count++
218
+	}
219
+	return count, nil
220
+}
internal/worker/types.go (modified)
@@ -94,6 +94,17 @@ const (
94
 	KindOrgBillingSeatSync Kind = "org:billing_seat_sync"
94
 	KindOrgBillingSeatSync Kind = "org:billing_seat_sync"
95
 )
95
 )
96
 
96
 
97
// KindGPGBackfill — S51 GPG signature verification backfill. One job
// per repo; payload {repo_id}. The handler walks the repo's default
// branch + annotated tags and writes commit_verification_cache rows
// for every signed object. Dispatched both eagerly (one job per repo
// when a user adds a GPG key — DispatchForKey) and as a bulk admin
// command (shithubd gpg-backfill-all — DispatchAll). The handler is
// idempotent thanks to UpsertCommitVerification's ON CONFLICT.
const (
	KindGPGBackfill Kind = "gpg:backfill"
)
107
+
97
 // NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes
108
 // NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes
98
 // to so it wakes up immediately when a job is enqueued, instead of
109
 // to so it wakes up immediately when a job is enqueued, instead of
99
 // polling. Callers wrapping enqueue in a tx must NOTIFY inside the
110
 // polling. Callers wrapping enqueue in a tx must NOTIFY inside the