Go · 51004 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package api_test
4
5 import (
6 "bytes"
7 "context"
8 "encoding/base64"
9 "encoding/json"
10 "fmt"
11 "io"
12 "log/slog"
13 "net/http"
14 "net/http/httptest"
15 "strings"
16 "testing"
17 "time"
18
19 "github.com/go-chi/chi/v5"
20 "github.com/jackc/pgx/v5/pgtype"
21 "github.com/jackc/pgx/v5/pgxpool"
22 dto "github.com/prometheus/client_model/go"
23
24 "github.com/tenseleyFlow/shithub/internal/actions/finalize"
25 actionslifecycle "github.com/tenseleyFlow/shithub/internal/actions/lifecycle"
26 "github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
27 actionsecrets "github.com/tenseleyFlow/shithub/internal/actions/secrets"
28 actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
29 "github.com/tenseleyFlow/shithub/internal/actions/trigger"
30 "github.com/tenseleyFlow/shithub/internal/actions/workflow"
31 "github.com/tenseleyFlow/shithub/internal/auth/pat"
32 "github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
33 "github.com/tenseleyFlow/shithub/internal/auth/secretbox"
34 "github.com/tenseleyFlow/shithub/internal/infra/metrics"
35 "github.com/tenseleyFlow/shithub/internal/infra/storage"
36 "github.com/tenseleyFlow/shithub/internal/ratelimit"
37 repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
38 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
39 "github.com/tenseleyFlow/shithub/internal/testing/dbtest"
40 usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
41 apih "github.com/tenseleyFlow/shithub/internal/web/handlers/api"
42 "github.com/tenseleyFlow/shithub/internal/web/handlers/api/apilimit"
43 workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
44 )
45
// runnerAPIFixtureHash is an argon2id-formatted hash string (v=19, m=16384,
// t=1, p=1) whose salt and digest sections are filled with the letter "A".
// NOTE(review): presumably stored as a placeholder credential hash for fixture
// rows; the call sites are outside this part of the file — confirm.
const (
	runnerAPIFixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
		"AAAAAAAAAAAAAAAA$" +
		"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
)
49
50 func TestRunnerHeartbeatClaimsQueuedJob(t *testing.T) {
51 ctx := context.Background()
52 pool := dbtest.NewTestDB(t)
53 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
54 repoID, userID := setupRunnerAPIRepo(t, pool)
55 runID := enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
56
57 token, runnerID := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
58 signer := runnerAPISigner(t, time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC))
59 router := newRunnerAPIRouter(t, pool, logger, signer)
60
61 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
62 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1,"host_name":"runner-host","version":"dev-test"}`))
63 req.Header.Set("Authorization", "Bearer "+token)
64 rr := httptest.NewRecorder()
65 router.ServeHTTP(rr, req)
66
67 if rr.Code != http.StatusOK {
68 t.Fatalf("status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
69 }
70 var resp struct {
71 Token string `json:"token"`
72 Job struct {
73 ID int64 `json:"id"`
74 RunID int64 `json:"run_id"`
75 RepoID int64 `json:"repo_id"`
76 CheckoutURL string `json:"checkout_url"`
77 CheckoutToken string `json:"checkout_token"`
78 EventPayload map[string]any `json:"event_payload"`
79 Steps []struct {
80 Run string `json:"run"`
81 Uses string `json:"uses"`
82 } `json:"steps"`
83 } `json:"job"`
84 }
85 if err := json.Unmarshal(rr.Body.Bytes(), &resp); err != nil {
86 t.Fatalf("decode response: %v", err)
87 }
88 if resp.Token == "" {
89 t.Fatal("response token is empty")
90 }
91 if resp.Job.RunID != runID || resp.Job.RepoID != repoID || len(resp.Job.Steps) != 2 {
92 t.Fatalf("unexpected job payload: %+v", resp.Job)
93 }
94 if resp.Job.CheckoutURL != "https://shithub.test/alice/demo.git" || resp.Job.CheckoutToken == "" {
95 t.Fatalf("checkout payload: url=%q token_empty=%t", resp.Job.CheckoutURL, resp.Job.CheckoutToken == "")
96 }
97 if resp.Job.EventPayload["ref"] != "refs/heads/trunk" {
98 t.Fatalf("event payload not returned to runner: %#v", resp.Job.EventPayload)
99 }
100 claims, err := signer.Verify(resp.Token)
101 if err != nil {
102 t.Fatalf("verify runner JWT: %v", err)
103 }
104 if claims.JobID != resp.Job.ID || claims.RunID != runID || claims.RepoID != repoID {
105 t.Fatalf("claims/job mismatch: claims=%+v job=%+v", claims, resp.Job)
106 }
107 claimRunnerID, err := claims.RunnerID()
108 if err != nil {
109 t.Fatalf("claims RunnerID: %v", err)
110 }
111 if claimRunnerID != runnerID {
112 t.Fatalf("claims runner_id: got %d, want %d", claimRunnerID, runnerID)
113 }
114 if claims.Purpose != runnerjwt.PurposeAPI {
115 t.Fatalf("api token purpose: got %q", claims.Purpose)
116 }
117 checkoutClaims, err := signer.Verify(resp.Job.CheckoutToken)
118 if err != nil {
119 t.Fatalf("verify checkout token: %v", err)
120 }
121 if checkoutClaims.JobID != resp.Job.ID || checkoutClaims.RunID != runID || checkoutClaims.RepoID != repoID ||
122 checkoutClaims.Purpose != runnerjwt.PurposeCheckout {
123 t.Fatalf("checkout claims/job mismatch: claims=%+v job=%+v", checkoutClaims, resp.Job)
124 }
125 runnerRow, err := actionsdb.New().GetRunnerByID(ctx, pool, runnerID)
126 if err != nil {
127 t.Fatalf("GetRunnerByID: %v", err)
128 }
129 if runnerRow.HostName != "runner-host" || runnerRow.Version != "dev-test" {
130 t.Fatalf("runner metadata: host=%q version=%q", runnerRow.HostName, runnerRow.Version)
131 }
132
133 var logResp struct {
134 Accepted bool `json:"accepted"`
135 NextToken string `json:"next_token"`
136 }
137 logBody := fmt.Sprintf(`{"seq":0,"chunk":%q}`, base64.StdEncoding.EncodeToString([]byte("hello\n")))
138 req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/logs", resp.Job.ID), strings.NewReader(logBody))
139 req.Header.Set("Authorization", "Bearer "+resp.Token)
140 rr = httptest.NewRecorder()
141 router.ServeHTTP(rr, req)
142 if rr.Code != http.StatusAccepted {
143 t.Fatalf("logs status: got %d, want 202; body=%s", rr.Code, rr.Body.String())
144 }
145 if err := json.Unmarshal(rr.Body.Bytes(), &logResp); err != nil {
146 t.Fatalf("decode log response: %v", err)
147 }
148 if !logResp.Accepted || logResp.NextToken == "" {
149 t.Fatalf("unexpected log response: %+v", logResp)
150 }
151
152 req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/logs", resp.Job.ID), strings.NewReader(logBody))
153 req.Header.Set("Authorization", "Bearer "+resp.Token)
154 rr = httptest.NewRecorder()
155 router.ServeHTTP(rr, req)
156 if rr.Code != http.StatusUnauthorized {
157 t.Fatalf("replay status: got %d, want 401; body=%s", rr.Code, rr.Body.String())
158 }
159
160 req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/status", resp.Job.ID),
161 strings.NewReader(`{"status":"completed","conclusion":"success"}`))
162 req.Header.Set("Authorization", "Bearer "+logResp.NextToken)
163 rr = httptest.NewRecorder()
164 router.ServeHTTP(rr, req)
165 if rr.Code != http.StatusOK {
166 t.Fatalf("complete status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
167 }
168
169 q := actionsdb.New()
170 job, err := q.GetWorkflowJobByID(ctx, pool, resp.Job.ID)
171 if err != nil {
172 t.Fatalf("GetWorkflowJobByID: %v", err)
173 }
174 if job.Status != actionsdb.WorkflowJobStatusCompleted || !job.RunnerID.Valid || job.RunnerID.Int64 != runnerID ||
175 !job.Conclusion.Valid || job.Conclusion.CheckConclusion != actionsdb.CheckConclusionSuccess {
176 t.Fatalf("job not completed by runner: %+v", job)
177 }
178 run, err := q.GetWorkflowRunByID(ctx, pool, runID)
179 if err != nil {
180 t.Fatalf("GetWorkflowRunByID: %v", err)
181 }
182 if run.Status != actionsdb.WorkflowRunStatusCompleted ||
183 !run.Conclusion.Valid || run.Conclusion.CheckConclusion != actionsdb.CheckConclusionSuccess {
184 t.Fatalf("run not completed successfully: %+v", run)
185 }
186
187 // The completed job is no longer counted against capacity, and no other
188 // queued job exists, so the heartbeat is an empty 204.
189 req = httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
190 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
191 req.Header.Set("Authorization", "Bearer "+token)
192 rr = httptest.NewRecorder()
193 router.ServeHTTP(rr, req)
194 if rr.Code != http.StatusNoContent {
195 t.Fatalf("second heartbeat status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
196 }
197 }
198
199 func TestRunnerHeartbeatRespectsRepoConcurrencyCap(t *testing.T) {
200 ctx := context.Background()
201 pool := dbtest.NewTestDB(t)
202 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
203 repoID, userID := setupRunnerAPIRepo(t, pool)
204 q := actionsdb.New()
205 if _, err := q.UpsertActionsRepoPolicy(ctx, pool, actionsdb.UpsertActionsRepoPolicyParams{
206 RepoID: repoID,
207 ActionsEnabled: actionsdb.ActionsPolicyStateInherit,
208 MaxRepoConcurrentJobs: pgtype.Int4{Int32: 1, Valid: true},
209 }); err != nil {
210 t.Fatalf("UpsertActionsRepoPolicy: %v", err)
211 }
212
213 firstRunID := enqueueRunnerAPIRunWithTriggerID(t, pool, logger, repoID, userID, "repo-cap-1")
214 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 2)
215 router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
216 firstClaim := claimRunnerHeartbeat(t, router, token, 2)
217 if firstClaim.Job.RunID != firstRunID {
218 t.Fatalf("first claim run_id=%d want %d", firstClaim.Job.RunID, firstRunID)
219 }
220
221 secondRunID := enqueueRunnerAPIRunWithTriggerID(t, pool, logger, repoID, userID, "repo-cap-2")
222 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
223 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":2}`))
224 req.Header.Set("Authorization", "Bearer "+token)
225 rr := httptest.NewRecorder()
226 router.ServeHTTP(rr, req)
227 if rr.Code != http.StatusNoContent {
228 t.Fatalf("capped heartbeat status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
229 }
230 jobs, err := q.ListJobsForRun(ctx, pool, secondRunID)
231 if err != nil {
232 t.Fatalf("ListJobsForRun: %v", err)
233 }
234 if len(jobs) != 1 || jobs[0].Status != actionsdb.WorkflowJobStatusQueued {
235 t.Fatalf("second run job should remain queued/unclaimed: %+v", jobs)
236 }
237 secondJob, err := q.GetWorkflowJobByID(ctx, pool, jobs[0].ID)
238 if err != nil {
239 t.Fatalf("GetWorkflowJobByID: %v", err)
240 }
241 if secondJob.RunnerID.Valid {
242 t.Fatalf("second run job should not have a runner: %+v", secondJob)
243 }
244
245 if _, err := q.UpdateWorkflowJobStatus(ctx, pool, actionsdb.UpdateWorkflowJobStatusParams{
246 ID: firstClaim.Job.ID,
247 Status: actionsdb.WorkflowJobStatusCompleted,
248 Conclusion: actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusionSuccess, Valid: true},
249 CompletedAt: pgtype.Timestamptz{
250 Time: time.Now(),
251 Valid: true,
252 },
253 }); err != nil {
254 t.Fatalf("UpdateWorkflowJobStatus: %v", err)
255 }
256 secondClaim := claimRunnerHeartbeat(t, router, token, 2)
257 if secondClaim.Job.RunID != secondRunID {
258 t.Fatalf("second claim run_id=%d want %d", secondClaim.Job.RunID, secondRunID)
259 }
260 }
261
262 func TestRunnerHeartbeatRespectsOwnerConcurrencyCap(t *testing.T) {
263 ctx := context.Background()
264 pool := dbtest.NewTestDB(t)
265 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
266 firstRepoID, userID := setupRunnerAPIRepo(t, pool)
267 secondRepo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
268 OwnerUserID: pgtype.Int8{Int64: userID, Valid: true},
269 Name: "second",
270 DefaultBranch: "trunk",
271 Visibility: reposdb.RepoVisibilityPublic,
272 })
273 if err != nil {
274 t.Fatalf("CreateRepo second: %v", err)
275 }
276 q := actionsdb.New()
277 if _, err := q.UpsertActionsRepoPolicy(ctx, pool, actionsdb.UpsertActionsRepoPolicyParams{
278 RepoID: secondRepo.ID,
279 ActionsEnabled: actionsdb.ActionsPolicyStateInherit,
280 MaxOwnerConcurrentJobs: pgtype.Int4{Int32: 1, Valid: true},
281 }); err != nil {
282 t.Fatalf("UpsertActionsRepoPolicy: %v", err)
283 }
284
285 firstRunID := enqueueRunnerAPIRunWithTriggerID(t, pool, logger, firstRepoID, userID, "owner-cap-1")
286 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 2)
287 router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
288 firstClaim := claimRunnerHeartbeat(t, router, token, 2)
289 if firstClaim.Job.RunID != firstRunID {
290 t.Fatalf("first claim run_id=%d want %d", firstClaim.Job.RunID, firstRunID)
291 }
292
293 secondRunID := enqueueRunnerAPIRunWithTriggerID(t, pool, logger, secondRepo.ID, userID, "owner-cap-2")
294 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
295 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":2}`))
296 req.Header.Set("Authorization", "Bearer "+token)
297 rr := httptest.NewRecorder()
298 router.ServeHTTP(rr, req)
299 if rr.Code != http.StatusNoContent {
300 t.Fatalf("owner-capped heartbeat status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
301 }
302 jobs, err := q.ListJobsForRun(ctx, pool, secondRunID)
303 if err != nil {
304 t.Fatalf("ListJobsForRun: %v", err)
305 }
306 if len(jobs) != 1 || jobs[0].Status != actionsdb.WorkflowJobStatusQueued {
307 t.Fatalf("second owner job should remain queued/unclaimed: %+v", jobs)
308 }
309 secondJob, err := q.GetWorkflowJobByID(ctx, pool, jobs[0].ID)
310 if err != nil {
311 t.Fatalf("GetWorkflowJobByID: %v", err)
312 }
313 if secondJob.RunnerID.Valid {
314 t.Fatalf("second owner job should not have a runner: %+v", secondJob)
315 }
316 }
317
318 func TestRunnerHeartbeatSiteDisableOverridesRepoEnable(t *testing.T) {
319 ctx := context.Background()
320 pool := dbtest.NewTestDB(t)
321 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
322 repoID, userID := setupRunnerAPIRepo(t, pool)
323 q := actionsdb.New()
324 if _, err := q.UpsertActionsRepoPolicy(ctx, pool, actionsdb.UpsertActionsRepoPolicyParams{
325 RepoID: repoID,
326 ActionsEnabled: actionsdb.ActionsPolicyStateEnabled,
327 }); err != nil {
328 t.Fatalf("UpsertActionsRepoPolicy: %v", err)
329 }
330 runID := enqueueRunnerAPIRunWithTriggerID(t, pool, logger, repoID, userID, "site-disable-claim")
331 if _, err := pool.Exec(ctx, `UPDATE actions_site_policy SET actions_enabled = false WHERE id = true`); err != nil {
332 t.Fatalf("disable site policy: %v", err)
333 }
334
335 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
336 router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
337 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
338 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
339 req.Header.Set("Authorization", "Bearer "+token)
340 rr := httptest.NewRecorder()
341 router.ServeHTTP(rr, req)
342 if rr.Code != http.StatusNoContent {
343 t.Fatalf("site-disabled heartbeat status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
344 }
345 jobs, err := q.ListJobsForRun(ctx, pool, runID)
346 if err != nil {
347 t.Fatalf("ListJobsForRun: %v", err)
348 }
349 if len(jobs) != 1 || jobs[0].Status != actionsdb.WorkflowJobStatusQueued {
350 t.Fatalf("site-disabled job should remain queued: %+v", jobs)
351 }
352 job, err := q.GetWorkflowJobByID(ctx, pool, jobs[0].ID)
353 if err != nil {
354 t.Fatalf("GetWorkflowJobByID: %v", err)
355 }
356 if job.RunnerID.Valid {
357 t.Fatalf("site-disabled job should not have a runner: %+v", job)
358 }
359 }
360
361 func TestRunnerHeartbeatDoesNotClaimWhenDraining(t *testing.T) {
362 ctx := context.Background()
363 pool := dbtest.NewTestDB(t)
364 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
365 repoID, userID := setupRunnerAPIRepo(t, pool)
366 runID := enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
367 token, runnerID := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
368 q := actionsdb.New()
369 if _, err := q.SetRunnerDraining(ctx, pool, actionsdb.SetRunnerDrainingParams{
370 ID: runnerID,
371 DrainReason: "maintenance",
372 }); err != nil {
373 t.Fatalf("SetRunnerDraining: %v", err)
374 }
375 router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
376
377 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
378 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1,"host_name":"draining-host","version":"dev-test"}`))
379 req.Header.Set("Authorization", "Bearer "+token)
380 rr := httptest.NewRecorder()
381 router.ServeHTTP(rr, req)
382
383 if rr.Code != http.StatusNoContent {
384 t.Fatalf("status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
385 }
386 jobs, err := q.ListJobsForRun(ctx, pool, runID)
387 if err != nil {
388 t.Fatalf("ListJobsForRun: %v", err)
389 }
390 if len(jobs) != 1 || jobs[0].Status != actionsdb.WorkflowJobStatusQueued {
391 t.Fatalf("job was claimed while runner drained: %#v", jobs)
392 }
393 job, err := q.GetWorkflowJobByID(ctx, pool, jobs[0].ID)
394 if err != nil {
395 t.Fatalf("GetWorkflowJobByID: %v", err)
396 }
397 if job.RunnerID.Valid {
398 t.Fatalf("job was assigned to runner while drained: %+v", job)
399 }
400 runnerRow, err := q.GetRunnerByID(ctx, pool, runnerID)
401 if err != nil {
402 t.Fatalf("GetRunnerByID: %v", err)
403 }
404 if !runnerRow.DrainingAt.Valid || runnerRow.HostName != "draining-host" {
405 t.Fatalf("runner drain/metadata not preserved: %+v", runnerRow)
406 }
407 }
408
409 func TestRunnerJobTokenRejectedAfterRunnerRevoked(t *testing.T) {
410 ctx := context.Background()
411 pool := dbtest.NewTestDB(t)
412 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
413 repoID, userID := setupRunnerAPIRepo(t, pool)
414 enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
415 token, runnerID := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
416 router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
417
418 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
419 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
420 req.Header.Set("Authorization", "Bearer "+token)
421 rr := httptest.NewRecorder()
422 router.ServeHTTP(rr, req)
423 if rr.Code != http.StatusOK {
424 t.Fatalf("claim status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
425 }
426 var claim struct {
427 Token string `json:"token"`
428 Job struct {
429 ID int64 `json:"id"`
430 } `json:"job"`
431 }
432 if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
433 t.Fatalf("decode claim: %v", err)
434 }
435 q := actionsdb.New()
436 if _, err := q.RevokeRunner(ctx, pool, actionsdb.RevokeRunnerParams{
437 ID: runnerID,
438 RevokedReason: "compromised",
439 }); err != nil {
440 t.Fatalf("RevokeRunner: %v", err)
441 }
442 if err := q.RevokeAllTokensForRunner(ctx, pool, runnerID); err != nil {
443 t.Fatalf("RevokeAllTokensForRunner: %v", err)
444 }
445
446 statusReq := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/status", claim.Job.ID),
447 strings.NewReader(`{"status":"running"}`))
448 statusReq.Header.Set("Authorization", "Bearer "+claim.Token)
449 statusRR := httptest.NewRecorder()
450 router.ServeHTTP(statusRR, statusReq)
451 if statusRR.Code != http.StatusUnauthorized {
452 t.Fatalf("status: got %d, want 401; body=%s", statusRR.Code, statusRR.Body.String())
453 }
454 }
455
456 func TestRunnerHeartbeatBypassesGlobalAnonAPILimit(t *testing.T) {
457 pool := dbtest.NewTestDB(t)
458 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
459 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
460 signer := runnerAPISigner(t, time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC))
461
462 h, err := apih.New(apih.Deps{
463 Pool: pool,
464 Logger: logger,
465 BaseURL: "https://shithub.test",
466 RunnerJWT: signer,
467 RateLimiter: ratelimit.New(pool),
468 APILimit: apilimit.Config{
469 AuthedPerHour: 1,
470 AnonPerHour: 1,
471 Logger: logger,
472 },
473 })
474 if err != nil {
475 t.Fatalf("api.New: %v", err)
476 }
477 router := chi.NewRouter()
478 h.Mount(router)
479
480 req := httptest.NewRequest(http.MethodGet, "/api/v1/meta", nil)
481 req.RemoteAddr = "10.0.0.77:12345"
482 rr := httptest.NewRecorder()
483 router.ServeHTTP(rr, req)
484 if rr.Code != http.StatusOK {
485 t.Fatalf("meta status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
486 }
487
488 req = httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
489 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
490 req.Header.Set("Authorization", "Bearer "+token)
491 req.RemoteAddr = "10.0.0.77:12346"
492 rr = httptest.NewRecorder()
493 router.ServeHTTP(rr, req)
494
495 if rr.Code != http.StatusNoContent {
496 t.Fatalf("heartbeat status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
497 }
498 if got := rr.Header().Get("X-RateLimit-Limit"); got != "60" {
499 t.Errorf("runner heartbeat limit header: got %q, want 60", got)
500 }
501 }
502
503 func TestRunnerSecretsAreClaimedAndServerScrubsLogs(t *testing.T) {
504 ctx := context.Background()
505 pool := dbtest.NewTestDB(t)
506 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
507 repoID, userID := setupRunnerAPIRepo(t, pool)
508 runID := enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
509 box := testRunnerAPISecretBox(t)
510 if err := (actionsecrets.Deps{Pool: pool, Box: box}).Set(ctx, actionsecrets.RepoScope(repoID), "TOKEN", []byte("hunter2"), userID); err != nil {
511 t.Fatalf("Set secret: %v", err)
512 }
513
514 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
515 signer := runnerAPISigner(t, time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC))
516 router := newRunnerAPIRouterWithSecretBox(t, pool, logger, signer, box)
517
518 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
519 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
520 req.Header.Set("Authorization", "Bearer "+token)
521 rr := httptest.NewRecorder()
522 router.ServeHTTP(rr, req)
523 if rr.Code != http.StatusOK {
524 t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
525 }
526 var claim struct {
527 Token string `json:"token"`
528 Job struct {
529 ID int64 `json:"id"`
530 RunID int64 `json:"run_id"`
531 Secrets map[string]string `json:"secrets"`
532 MaskValues []string `json:"mask_values"`
533 } `json:"job"`
534 }
535 if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
536 t.Fatalf("decode claim: %v", err)
537 }
538 if claim.Job.RunID != runID || claim.Job.Secrets["TOKEN"] != "hunter2" || !containsString(claim.Job.MaskValues, "hunter2") {
539 t.Fatalf("claim did not include masked secret context: %+v", claim.Job)
540 }
541 if _, err := actionsdb.New().GetWorkflowJobSecretMask(ctx, pool, claim.Job.ID); err != nil {
542 t.Fatalf("GetWorkflowJobSecretMask: %v", err)
543 }
544 if err := (actionsecrets.Deps{Pool: pool, Box: box}).Set(ctx, actionsecrets.RepoScope(repoID), "TOKEN", []byte("rotated"), userID); err != nil {
545 t.Fatalf("rotate secret after claim: %v", err)
546 }
547
548 rawLog := []byte("before hunter2 after\n")
549 logBody := fmt.Sprintf(`{"seq":0,"chunk":%q}`, base64.StdEncoding.EncodeToString(rawLog))
550 req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/logs", claim.Job.ID), strings.NewReader(logBody))
551 req.Header.Set("Authorization", "Bearer "+claim.Token)
552 rr = httptest.NewRecorder()
553 router.ServeHTTP(rr, req)
554 if rr.Code != http.StatusAccepted {
555 t.Fatalf("logs status: got %d, want 202; body=%s", rr.Code, rr.Body.String())
556 }
557 step, err := actionsdb.New().GetFirstStepForJob(ctx, pool, claim.Job.ID)
558 if err != nil {
559 t.Fatalf("GetFirstStepForJob: %v", err)
560 }
561 chunks, err := actionsdb.New().ListStepLogChunks(ctx, pool, actionsdb.ListStepLogChunksParams{
562 StepID: step.ID,
563 Seq: -1,
564 Limit: 10,
565 })
566 if err != nil {
567 t.Fatalf("ListStepLogChunks: %v", err)
568 }
569 if len(chunks) != 1 {
570 t.Fatalf("chunks: %#v", chunks)
571 }
572 got := string(chunks[0].Chunk)
573 if strings.Contains(got, "hunter2") || got != "before *** after\n" {
574 t.Fatalf("stored log chunk was not scrubbed: %q", got)
575 }
576 }
577
578 func TestRunnerHeartbeatDoesNotClaimApprovalPendingRun(t *testing.T) {
579 ctx := context.Background()
580 pool := dbtest.NewTestDB(t)
581 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
582 repoID, userID := setupRunnerAPIRepo(t, pool)
583 runID := enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
584 q := actionsdb.New()
585 if _, err := pool.Exec(ctx, `UPDATE workflow_runs SET need_approval = true WHERE id = $1`, runID); err != nil {
586 t.Fatalf("mark run approval pending: %v", err)
587 }
588 if _, err := q.InsertWorkflowRunApproval(ctx, pool, actionsdb.InsertWorkflowRunApprovalParams{
589 RunID: runID,
590 RequestedReason: "test approval",
591 }); err != nil {
592 t.Fatalf("InsertWorkflowRunApproval: %v", err)
593 }
594
595 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
596 router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
597
598 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
599 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
600 req.Header.Set("Authorization", "Bearer "+token)
601 rr := httptest.NewRecorder()
602 router.ServeHTTP(rr, req)
603 if rr.Code != http.StatusNoContent {
604 t.Fatalf("pending heartbeat status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
605 }
606 jobs, err := q.ListJobsForRun(ctx, pool, runID)
607 if err != nil {
608 t.Fatalf("ListJobsForRun: %v", err)
609 }
610 if len(jobs) != 1 || jobs[0].Status != actionsdb.WorkflowJobStatusQueued {
611 t.Fatalf("approval-pending job changed: %+v", jobs)
612 }
613
614 if _, err := actionslifecycle.ApproveRun(ctx, actionslifecycle.Deps{Pool: pool, Logger: logger}, runID, userID); err != nil {
615 t.Fatalf("ApproveRun: %v", err)
616 }
617 req = httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
618 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
619 req.Header.Set("Authorization", "Bearer "+token)
620 rr = httptest.NewRecorder()
621 router.ServeHTTP(rr, req)
622 if rr.Code != http.StatusOK {
623 t.Fatalf("approved heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
624 }
625 }
626
627 func TestRunnerDoesNotInjectSecretsIntoPullRequestRuns(t *testing.T) {
628 ctx := context.Background()
629 pool := dbtest.NewTestDB(t)
630 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
631 repoID, userID := setupRunnerAPIRepo(t, pool)
632 runID := enqueueRunnerAPIEventRun(t, pool, logger, repoID, userID, trigger.EventPullRequest, map[string]any{"action": "opened"})
633 box := testRunnerAPISecretBox(t)
634 if err := (actionsecrets.Deps{Pool: pool, Box: box}).Set(ctx, actionsecrets.RepoScope(repoID), "TOKEN", []byte("hunter2"), userID); err != nil {
635 t.Fatalf("Set secret: %v", err)
636 }
637
638 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
639 router := newRunnerAPIRouterWithSecretBox(t, pool, logger, runnerAPISigner(t, time.Now()), box)
640 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
641 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
642 req.Header.Set("Authorization", "Bearer "+token)
643 rr := httptest.NewRecorder()
644 router.ServeHTTP(rr, req)
645 if rr.Code != http.StatusOK {
646 t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
647 }
648 var claim struct {
649 Job struct {
650 RunID int64 `json:"run_id"`
651 Secrets map[string]string `json:"secrets"`
652 MaskValues []string `json:"mask_values"`
653 } `json:"job"`
654 }
655 if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
656 t.Fatalf("decode claim: %v", err)
657 }
658 if claim.Job.RunID != runID {
659 t.Fatalf("claimed wrong run: %+v", claim.Job)
660 }
661 if len(claim.Job.Secrets) != 0 || len(claim.Job.MaskValues) != 0 {
662 t.Fatalf("pull_request run received secrets: %+v", claim.Job)
663 }
664 }
665
666 func TestRunnerServerScrubsSecretSplitAcrossLogPosts(t *testing.T) {
667 ctx := context.Background()
668 pool := dbtest.NewTestDB(t)
669 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
670 repoID, userID := setupRunnerAPIRepo(t, pool)
671 enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
672 box := testRunnerAPISecretBox(t)
673 if err := (actionsecrets.Deps{Pool: pool, Box: box}).Set(ctx, actionsecrets.RepoScope(repoID), "TOKEN", []byte("hunter2"), userID); err != nil {
674 t.Fatalf("Set secret: %v", err)
675 }
676
677 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
678 signer := runnerAPISigner(t, time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC))
679 router := newRunnerAPIRouterWithSecretBox(t, pool, logger, signer, box)
680
681 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
682 strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
683 req.Header.Set("Authorization", "Bearer "+token)
684 rr := httptest.NewRecorder()
685 router.ServeHTTP(rr, req)
686 if rr.Code != http.StatusOK {
687 t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
688 }
689 var claim struct {
690 Token string `json:"token"`
691 Job struct {
692 ID int64 `json:"id"`
693 } `json:"job"`
694 }
695 if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
696 t.Fatalf("decode claim: %v", err)
697 }
698
699 next := postRunnerLogChunk(t, router, claim.Job.ID, claim.Token, 0, []byte("before hun"))
700 next = postRunnerLogChunk(t, router, claim.Job.ID, next, 1, []byte("ter2 after\n"))
701
702 step, err := actionsdb.New().GetFirstStepForJob(ctx, pool, claim.Job.ID)
703 if err != nil {
704 t.Fatalf("GetFirstStepForJob: %v", err)
705 }
706 chunks, err := actionsdb.New().ListStepLogChunks(ctx, pool, actionsdb.ListStepLogChunksParams{
707 StepID: step.ID,
708 Seq: -1,
709 Limit: 10,
710 })
711 if err != nil {
712 t.Fatalf("ListStepLogChunks: %v", err)
713 }
714 var combined strings.Builder
715 for _, chunk := range chunks {
716 combined.Write(chunk.Chunk)
717 }
718 got := combined.String()
719 if strings.Contains(got, "hunter2") || got != "before *** after\n" {
720 t.Fatalf("stored log chunks were not scrubbed across boundary: chunks=%#v combined=%q next=%q", chunks, got, next)
721 }
722 }
723
724 func TestRunnerHeartbeatRejectsBadToken(t *testing.T) {
725 pool := dbtest.NewTestDB(t)
726 router := newRunnerAPIRouter(t, pool, slog.New(slog.NewTextHandler(io.Discard, nil)), runnerAPISigner(t, time.Now()))
727 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat", bytes.NewReader([]byte(`{}`)))
728 req.Header.Set("Authorization", "Bearer not-hex")
729 rr := httptest.NewRecorder()
730 router.ServeHTTP(rr, req)
731 if rr.Code != http.StatusUnauthorized {
732 t.Fatalf("status: got %d, want 401; body=%s", rr.Code, rr.Body.String())
733 }
734 }
735
736 func TestRunnerArtifactUploadReturnsSignedURL(t *testing.T) {
737 ctx := context.Background()
738 pool := dbtest.NewTestDB(t)
739 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
740 repoID, userID := setupRunnerAPIRepo(t, pool)
741 enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
742 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest"}, 1)
743 router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()), storage.NewMemoryStore())
744
745 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
746 strings.NewReader(`{"labels":["ubuntu-latest"],"capacity":1}`))
747 req.Header.Set("Authorization", "Bearer "+token)
748 rr := httptest.NewRecorder()
749 router.ServeHTTP(rr, req)
750 if rr.Code != http.StatusOK {
751 t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
752 }
753 var claim struct {
754 Token string `json:"token"`
755 Job struct {
756 ID int64 `json:"id"`
757 RunID int64 `json:"run_id"`
758 } `json:"job"`
759 }
760 if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
761 t.Fatalf("decode claim: %v", err)
762 }
763
764 req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/artifacts/upload", claim.Job.ID),
765 strings.NewReader(`{"name":"test-results.tgz","size_bytes":123}`))
766 req.Header.Set("Authorization", "Bearer "+claim.Token)
767 rr = httptest.NewRecorder()
768 router.ServeHTTP(rr, req)
769 if rr.Code != http.StatusCreated {
770 t.Fatalf("artifact status: got %d, want 201; body=%s", rr.Code, rr.Body.String())
771 }
772 var upload struct {
773 ArtifactID int64 `json:"artifact_id"`
774 UploadURL string `json:"upload_url"`
775 NextToken string `json:"next_token"`
776 }
777 if err := json.Unmarshal(rr.Body.Bytes(), &upload); err != nil {
778 t.Fatalf("decode upload: %v", err)
779 }
780 if upload.ArtifactID == 0 || upload.NextToken == "" ||
781 !strings.HasPrefix(upload.UploadURL, "mem://actions/runs/") {
782 t.Fatalf("unexpected upload response: %+v", upload)
783 }
784 artifacts, err := actionsdb.New().ListArtifactsForRun(ctx, pool, claim.Job.RunID)
785 if err != nil {
786 t.Fatalf("ListArtifactsForRun: %v", err)
787 }
788 if len(artifacts) != 1 || artifacts[0].Name != "test-results.tgz" || artifacts[0].ByteCount != 123 {
789 t.Fatalf("unexpected artifacts: %+v", artifacts)
790 }
791 }
792
793 func TestRunnerStepStatusEnqueuesFinalizeWorker(t *testing.T) {
794 ctx := context.Background()
795 pool := dbtest.NewTestDB(t)
796 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
797 repoID, userID := setupRunnerAPIRepo(t, pool)
798 enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
799 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest"}, 1)
800 router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()), storage.NewMemoryStore())
801
802 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
803 strings.NewReader(`{"labels":["ubuntu-latest"],"capacity":1}`))
804 req.Header.Set("Authorization", "Bearer "+token)
805 rr := httptest.NewRecorder()
806 router.ServeHTTP(rr, req)
807 if rr.Code != http.StatusOK {
808 t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
809 }
810 var claim struct {
811 Token string `json:"token"`
812 Job struct {
813 ID int64 `json:"id"`
814 Steps []struct {
815 ID int64 `json:"id"`
816 Run string `json:"run"`
817 } `json:"steps"`
818 } `json:"job"`
819 }
820 if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
821 t.Fatalf("decode claim: %v", err)
822 }
823 if len(claim.Job.Steps) < 2 {
824 t.Fatalf("claim steps: %+v", claim.Job.Steps)
825 }
826 stepID := claim.Job.Steps[1].ID
827
828 req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/steps/%d/status", claim.Job.ID, stepID),
829 strings.NewReader(`{"status":"completed","conclusion":"success"}`))
830 req.Header.Set("Authorization", "Bearer "+claim.Token)
831 rr = httptest.NewRecorder()
832 router.ServeHTTP(rr, req)
833 if rr.Code != http.StatusOK {
834 t.Fatalf("step status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
835 }
836 var statusResp struct {
837 Status string `json:"status"`
838 Conclusion string `json:"conclusion"`
839 NextToken string `json:"next_token"`
840 }
841 if err := json.Unmarshal(rr.Body.Bytes(), &statusResp); err != nil {
842 t.Fatalf("decode status response: %v", err)
843 }
844 if statusResp.Status != "completed" || statusResp.Conclusion != "success" || statusResp.NextToken == "" {
845 t.Fatalf("unexpected step status response: %+v", statusResp)
846 }
847 step, err := actionsdb.New().GetWorkflowStepByID(ctx, pool, stepID)
848 if err != nil {
849 t.Fatalf("GetWorkflowStepByID: %v", err)
850 }
851 if step.Status != actionsdb.WorkflowStepStatusCompleted ||
852 !step.Conclusion.Valid || step.Conclusion.CheckConclusion != actionsdb.CheckConclusionSuccess {
853 t.Fatalf("step was not completed: %+v", step)
854 }
855 job, err := workerdb.New().ClaimJob(ctx, pool, workerdb.ClaimJobParams{
856 Kind: string(finalize.KindWorkflowFinalizeStep),
857 LockedBy: pgtype.Text{String: "test", Valid: true},
858 })
859 if err != nil {
860 t.Fatalf("ClaimJob finalize: %v", err)
861 }
862 var payload finalize.Payload
863 if err := json.Unmarshal(job.Payload, &payload); err != nil {
864 t.Fatalf("decode worker payload: %v", err)
865 }
866 if payload.StepID != stepID {
867 t.Fatalf("worker payload: %+v, want step_id %d", payload, stepID)
868 }
869 }
870
871 func TestRunnerStepStatusRecordsTimeoutMetricOnce(t *testing.T) {
872 pool := dbtest.NewTestDB(t)
873 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
874 repoID, userID := setupRunnerAPIRepo(t, pool)
875 enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
876 token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest"}, 1)
877 router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
878
879 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
880 strings.NewReader(`{"labels":["ubuntu-latest"],"capacity":1}`))
881 req.Header.Set("Authorization", "Bearer "+token)
882 rr := httptest.NewRecorder()
883 router.ServeHTTP(rr, req)
884 if rr.Code != http.StatusOK {
885 t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
886 }
887 var claim struct {
888 Token string `json:"token"`
889 Job struct {
890 ID int64 `json:"id"`
891 Steps []struct {
892 ID int64 `json:"id"`
893 } `json:"steps"`
894 } `json:"job"`
895 }
896 if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
897 t.Fatalf("decode claim: %v", err)
898 }
899 if len(claim.Job.Steps) == 0 {
900 t.Fatalf("claim steps: %+v", claim.Job.Steps)
901 }
902 stepID := claim.Job.Steps[0].ID
903 before := actionsStepTimeoutsValue(t)
904
905 req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/steps/%d/status", claim.Job.ID, stepID),
906 strings.NewReader(`{"status":"completed","conclusion":"timed_out"}`))
907 req.Header.Set("Authorization", "Bearer "+claim.Token)
908 rr = httptest.NewRecorder()
909 router.ServeHTTP(rr, req)
910 if rr.Code != http.StatusOK {
911 t.Fatalf("step status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
912 }
913 var statusResp struct {
914 NextToken string `json:"next_token"`
915 }
916 if err := json.Unmarshal(rr.Body.Bytes(), &statusResp); err != nil {
917 t.Fatalf("decode status response: %v", err)
918 }
919 if statusResp.NextToken == "" {
920 t.Fatalf("missing next token: %s", rr.Body.String())
921 }
922 if got := actionsStepTimeoutsValue(t); got != before+1 {
923 t.Fatalf("timeout metric after first report: got %v, want %v", got, before+1)
924 }
925
926 req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/steps/%d/status", claim.Job.ID, stepID),
927 strings.NewReader(`{"status":"completed","conclusion":"timed_out"}`))
928 req.Header.Set("Authorization", "Bearer "+statusResp.NextToken)
929 rr = httptest.NewRecorder()
930 router.ServeHTTP(rr, req)
931 if rr.Code != http.StatusOK {
932 t.Fatalf("duplicate step status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
933 }
934 if got := actionsStepTimeoutsValue(t); got != before+1 {
935 t.Fatalf("timeout metric after duplicate report: got %v, want still %v", got, before+1)
936 }
937 }
938
939 func TestWorkflowJobCancelAPIRequestsCancellation(t *testing.T) {
940 ctx := context.Background()
941 pool := dbtest.NewTestDB(t)
942 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
943 repoID, userID := setupRunnerAPIRepo(t, pool)
944 runID := enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
945 jobs, err := actionsdb.New().ListJobsForRun(ctx, pool, runID)
946 if err != nil {
947 t.Fatalf("ListJobsForRun: %v", err)
948 }
949 if len(jobs) != 1 {
950 t.Fatalf("jobs: %+v", jobs)
951 }
952 rawPAT := mintRunnerAPIPAT(t, pool, userID, string(pat.ScopeRepoWrite))
953 router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
954
955 req := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/cancel", jobs[0].ID), nil)
956 req.Header.Set("Authorization", "Bearer "+rawPAT)
957 rr := httptest.NewRecorder()
958 router.ServeHTTP(rr, req)
959 if rr.Code != http.StatusAccepted {
960 t.Fatalf("cancel status: got %d, want 202; body=%s", rr.Code, rr.Body.String())
961 }
962 var body struct {
963 ChangedJobs int `json:"changed_jobs"`
964 RunCompleted bool `json:"run_completed"`
965 }
966 if err := json.Unmarshal(rr.Body.Bytes(), &body); err != nil {
967 t.Fatalf("decode response: %v", err)
968 }
969 if body.ChangedJobs != 1 || !body.RunCompleted {
970 t.Fatalf("response: %+v", body)
971 }
972 job, err := actionsdb.New().GetWorkflowJobByID(ctx, pool, jobs[0].ID)
973 if err != nil {
974 t.Fatalf("GetWorkflowJobByID: %v", err)
975 }
976 if job.Status != actionsdb.WorkflowJobStatusCancelled || !job.CancelRequested ||
977 !job.Conclusion.Valid || job.Conclusion.CheckConclusion != actionsdb.CheckConclusionCancelled {
978 t.Fatalf("job: %+v", job)
979 }
980 run, err := actionsdb.New().GetWorkflowRunByID(ctx, pool, runID)
981 if err != nil {
982 t.Fatalf("GetWorkflowRunByID: %v", err)
983 }
984 if run.Status != actionsdb.WorkflowRunStatusCompleted ||
985 !run.Conclusion.Valid || run.Conclusion.CheckConclusion != actionsdb.CheckConclusionCancelled {
986 t.Fatalf("run: %+v", run)
987 }
988 }
989
990 func TestWorkflowRunRerunAPIQueuesOriginalCommitWorkflow(t *testing.T) {
991 ctx := context.Background()
992 pool := dbtest.NewTestDB(t)
993 logger := slog.New(slog.NewTextHandler(io.Discard, nil))
994 repoID, userID := setupRunnerAPIRepo(t, pool)
995 rfs, err := storage.NewRepoFS(t.TempDir())
996 if err != nil {
997 t.Fatalf("NewRepoFS: %v", err)
998 }
999 gitDir, err := rfs.RepoPath("alice", "demo")
1000 if err != nil {
1001 t.Fatalf("RepoPath: %v", err)
1002 }
1003 if err := rfs.InitBare(ctx, gitDir); err != nil {
1004 t.Fatalf("InitBare: %v", err)
1005 }
1006 oldSHA := commitRunnerAPIWorkflow(t, gitDir, runnerAPIOldWorkflow, time.Date(2026, 5, 11, 12, 0, 0, 0, time.UTC))
1007 wf := parseRunnerAPIWorkflow(t, runnerAPIOldWorkflow)
1008 original, err := trigger.Enqueue(ctx, trigger.Deps{Pool: pool, Logger: logger}, trigger.EnqueueParams{
1009 RepoID: repoID,
1010 WorkflowFile: ".shithub/workflows/ci.yml",
1011 HeadSHA: oldSHA,
1012 HeadRef: "refs/heads/trunk",
1013 EventKind: trigger.EventPush,
1014 EventPayload: map[string]any{"ref": "refs/heads/trunk"},
1015 ActorUserID: userID,
1016 TriggerEventID: "push:api-rerun",
1017 Workflow: wf,
1018 })
1019 if err != nil {
1020 t.Fatalf("trigger.Enqueue original: %v", err)
1021 }
1022 if _, err := actionsdb.New().CompleteWorkflowRun(ctx, pool, actionsdb.CompleteWorkflowRunParams{
1023 ID: original.RunID,
1024 Conclusion: actionsdb.CheckConclusionFailure,
1025 }); err != nil {
1026 t.Fatalf("CompleteWorkflowRun: %v", err)
1027 }
1028 _ = commitRunnerAPIWorkflow(t, gitDir, runnerAPINewWorkflow, time.Date(2026, 5, 11, 12, 5, 0, 0, time.UTC))
1029
1030 rawPAT := mintRunnerAPIPAT(t, pool, userID, string(pat.ScopeRepoWrite))
1031 router := newRunnerAPIRouterWithRepoFS(t, pool, logger, runnerAPISigner(t, time.Now()), rfs)
1032 req := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/runs/%d/rerun", original.RunID), nil)
1033 req.Header.Set("Authorization", "Bearer "+rawPAT)
1034 rr := httptest.NewRecorder()
1035 router.ServeHTTP(rr, req)
1036 if rr.Code != http.StatusCreated {
1037 t.Fatalf("rerun status: got %d, want 201; body=%s", rr.Code, rr.Body.String())
1038 }
1039 var body struct {
1040 RunID int64 `json:"run_id"`
1041 RunIndex int64 `json:"run_index"`
1042 ParentRunID int64 `json:"parent_run_id"`
1043 }
1044 if err := json.Unmarshal(rr.Body.Bytes(), &body); err != nil {
1045 t.Fatalf("decode response: %v", err)
1046 }
1047 if body.RunID == 0 || body.RunID == original.RunID || body.ParentRunID != original.RunID {
1048 t.Fatalf("response: %+v original=%+v", body, original)
1049 }
1050 rerun, err := actionsdb.New().GetWorkflowRunByID(ctx, pool, body.RunID)
1051 if err != nil {
1052 t.Fatalf("GetWorkflowRunByID rerun: %v", err)
1053 }
1054 if rerun.HeadSha != oldSHA || !rerun.ParentRunID.Valid || rerun.ParentRunID.Int64 != original.RunID {
1055 t.Fatalf("rerun row: %+v oldSHA=%s", rerun, oldSHA)
1056 }
1057 jobs, err := actionsdb.New().ListJobsForRun(ctx, pool, body.RunID)
1058 if err != nil {
1059 t.Fatalf("ListJobsForRun: %v", err)
1060 }
1061 if len(jobs) != 1 || jobs[0].JobKey != "old_job" {
1062 t.Fatalf("rerun jobs came from wrong workflow: %+v", jobs)
1063 }
1064 }
1065
// postRunnerLogChunk POSTs one log chunk for jobID to /api/v1/jobs/{id}/logs.
// The chunk is sent base64-encoded (standard alphabet) next to its sequence
// number, authenticated with token as a bearer credential. The helper fails
// the test unless the response is 202 with a non-empty "next_token", which it
// returns.
func postRunnerLogChunk(t *testing.T, router http.Handler, jobID int64, token string, seq int32, chunk []byte) string {
	t.Helper()

	encoded := base64.StdEncoding.EncodeToString(chunk)
	payload := fmt.Sprintf(`{"seq":%d,"chunk":%q}`, seq, encoded)
	target := fmt.Sprintf("/api/v1/jobs/%d/logs", jobID)

	request := httptest.NewRequest(http.MethodPost, target, strings.NewReader(payload))
	request.Header.Set("Authorization", "Bearer "+token)
	recorder := httptest.NewRecorder()
	router.ServeHTTP(recorder, request)

	if got := recorder.Code; got != http.StatusAccepted {
		t.Fatalf("logs status: got %d, want 202; body=%s", got, recorder.Body.String())
	}

	var decoded struct {
		NextToken string `json:"next_token"`
	}
	if err := json.Unmarshal(recorder.Body.Bytes(), &decoded); err != nil {
		t.Fatalf("decode log response: %v", err)
	}
	next := decoded.NextToken
	if next == "" {
		t.Fatalf("empty next token in log response: %s", recorder.Body.String())
	}
	return next
}
1087
1088 func newRunnerAPIRouter(
1089 t *testing.T,
1090 pool *pgxpool.Pool,
1091 logger *slog.Logger,
1092 signer *runnerjwt.Signer,
1093 stores ...storage.ObjectStore,
1094 ) http.Handler {
1095 t.Helper()
1096 var store storage.ObjectStore
1097 if len(stores) > 0 {
1098 store = stores[0]
1099 }
1100 h, err := apih.New(apih.Deps{
1101 Pool: pool,
1102 Logger: logger,
1103 BaseURL: "https://shithub.test",
1104 RunnerJWT: signer,
1105 ObjectStore: store,
1106 })
1107 if err != nil {
1108 t.Fatalf("api.New: %v", err)
1109 }
1110 r := chi.NewRouter()
1111 h.Mount(r)
1112 return r
1113 }
1114
1115 func newRunnerAPIRouterWithRepoFS(
1116 t *testing.T,
1117 pool *pgxpool.Pool,
1118 logger *slog.Logger,
1119 signer *runnerjwt.Signer,
1120 rfs *storage.RepoFS,
1121 ) http.Handler {
1122 t.Helper()
1123 h, err := apih.New(apih.Deps{
1124 Pool: pool,
1125 Logger: logger,
1126 BaseURL: "https://shithub.test",
1127 RunnerJWT: signer,
1128 RepoFS: rfs,
1129 })
1130 if err != nil {
1131 t.Fatalf("api.New: %v", err)
1132 }
1133 r := chi.NewRouter()
1134 h.Mount(r)
1135 return r
1136 }
1137
1138 func newRunnerAPIRouterWithSecretBox(
1139 t *testing.T,
1140 pool *pgxpool.Pool,
1141 logger *slog.Logger,
1142 signer *runnerjwt.Signer,
1143 box *secretbox.Box,
1144 ) http.Handler {
1145 t.Helper()
1146 h, err := apih.New(apih.Deps{
1147 Pool: pool,
1148 Logger: logger,
1149 BaseURL: "https://shithub.test",
1150 RunnerJWT: signer,
1151 SecretBox: box,
1152 })
1153 if err != nil {
1154 t.Fatalf("api.New: %v", err)
1155 }
1156 r := chi.NewRouter()
1157 h.Mount(r)
1158 return r
1159 }
1160
1161 func actionsStepTimeoutsValue(t *testing.T) float64 {
1162 t.Helper()
1163 var metric dto.Metric
1164 if err := metrics.ActionsStepTimeoutsTotal.Write(&metric); err != nil {
1165 t.Fatalf("read timeout metric: %v", err)
1166 }
1167 if metric.Counter == nil {
1168 return 0
1169 }
1170 return metric.Counter.GetValue()
1171 }
1172
// runnerAPIOldWorkflow is the workflow source committed first in the rerun
// test. It defines a single job keyed "old_job". The indentation inside the
// literal is significant YAML structure: the job must be nested under jobs
// and its step under steps.
const runnerAPIOldWorkflow = `name: CI
on: push
jobs:
  old_job:
    name: Old job
    runs-on: ubuntu-latest
    steps:
      - run: echo old
`
1182
// runnerAPINewWorkflow is the workflow source committed second in the rerun
// test. It defines a single job keyed "new_job". The indentation inside the
// literal is significant YAML structure: the job must be nested under jobs
// and its step under steps.
const runnerAPINewWorkflow = `name: CI
on: push
jobs:
  new_job:
    name: New job
    runs-on: ubuntu-latest
    steps:
      - run: echo new
`
1192
1193 func commitRunnerAPIWorkflow(t *testing.T, gitDir, body string, when time.Time) string {
1194 t.Helper()
1195 commit, err := (repogit.InitialCommit{
1196 GitDir: gitDir,
1197 AuthorName: "Alice",
1198 AuthorEmail: "alice@example.test",
1199 Branch: "trunk",
1200 Message: "Update workflow",
1201 When: when,
1202 Files: []repogit.FileEntry{
1203 {Path: ".shithub/workflows/ci.yml", Body: []byte(body)},
1204 },
1205 }).Build(context.Background())
1206 if err != nil {
1207 t.Fatalf("InitialCommit.Build: %v", err)
1208 }
1209 return commit
1210 }
1211
1212 func parseRunnerAPIWorkflow(t *testing.T, body string) *workflow.Workflow {
1213 t.Helper()
1214 wf, diags, err := workflow.Parse([]byte(body))
1215 if err != nil {
1216 t.Fatalf("workflow.Parse: %v", err)
1217 }
1218 for _, d := range diags {
1219 if d.Severity == workflow.Error {
1220 t.Fatalf("workflow diagnostic: %v", d)
1221 }
1222 }
1223 return wf
1224 }
1225
1226 func testRunnerAPISecretBox(t *testing.T) *secretbox.Box {
1227 t.Helper()
1228 key, err := secretbox.GenerateKey()
1229 if err != nil {
1230 t.Fatalf("GenerateKey: %v", err)
1231 }
1232 box, err := secretbox.FromBytes(key)
1233 if err != nil {
1234 t.Fatalf("secretbox.FromBytes: %v", err)
1235 }
1236 return box
1237 }
1238
1239 func setupRunnerAPIRepo(t *testing.T, pool *pgxpool.Pool) (repoID, userID int64) {
1240 t.Helper()
1241 ctx := context.Background()
1242 user, err := usersdb.New().CreateUser(ctx, pool, usersdb.CreateUserParams{
1243 Username: "alice",
1244 DisplayName: "Alice",
1245 PasswordHash: runnerAPIFixtureHash,
1246 })
1247 if err != nil {
1248 t.Fatalf("CreateUser: %v", err)
1249 }
1250 repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
1251 OwnerUserID: pgtype.Int8{Int64: user.ID, Valid: true},
1252 Name: "demo",
1253 DefaultBranch: "trunk",
1254 Visibility: reposdb.RepoVisibilityPublic,
1255 })
1256 if err != nil {
1257 t.Fatalf("CreateRepo: %v", err)
1258 }
1259 return repo.ID, user.ID
1260 }
1261
1262 func enqueueRunnerAPIRun(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, repoID, userID int64) int64 {
1263 t.Helper()
1264 return enqueueRunnerAPIRunWithTriggerID(t, pool, logger, repoID, userID, "runner-api-test:push")
1265 }
1266
1267 func enqueueRunnerAPIRunWithTriggerID(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, repoID, userID int64, triggerID string) int64 {
1268 t.Helper()
1269 return enqueueRunnerAPIEventRunWithTriggerID(t, pool, logger, repoID, userID, trigger.EventPush, map[string]any{"ref": "refs/heads/trunk"}, triggerID)
1270 }
1271
1272 func enqueueRunnerAPIEventRun(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, repoID, userID int64, event trigger.EventKind, payload map[string]any) int64 {
1273 t.Helper()
1274 return enqueueRunnerAPIEventRunWithTriggerID(t, pool, logger, repoID, userID, event, payload, "runner-api-test:"+string(event))
1275 }
1276
1277 func enqueueRunnerAPIEventRunWithTriggerID(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, repoID, userID int64, event trigger.EventKind, payload map[string]any, triggerID string) int64 {
1278 t.Helper()
1279 wf, diags, err := workflow.Parse([]byte(`name: ci
1280 on: push
1281 jobs:
1282 build:
1283 runs-on: ubuntu-latest
1284 steps:
1285 - uses: actions/checkout@v4
1286 - run: go test ./...
1287 `))
1288 if err != nil {
1289 t.Fatalf("workflow.Parse: %v", err)
1290 }
1291 for _, d := range diags {
1292 if d.Severity == workflow.Error {
1293 t.Fatalf("workflow diagnostic: %v", d)
1294 }
1295 }
1296 res, err := trigger.Enqueue(context.Background(), trigger.Deps{Pool: pool, Logger: logger}, trigger.EnqueueParams{
1297 RepoID: repoID,
1298 WorkflowFile: ".shithub/workflows/ci.yml",
1299 HeadSHA: strings.Repeat("a", 40),
1300 HeadRef: "refs/heads/trunk",
1301 EventKind: event,
1302 EventPayload: payload,
1303 ActorUserID: userID,
1304 TriggerEventID: triggerID,
1305 Workflow: wf,
1306 })
1307 if err != nil {
1308 t.Fatalf("trigger.Enqueue: %v", err)
1309 }
1310 return res.RunID
1311 }
1312
// runnerHeartbeatClaim is the subset of the heartbeat response body that
// claimRunnerHeartbeat decodes and validates.
type runnerHeartbeatClaim struct {
	// Token is read from the "token" field of the response.
	Token string `json:"token"`
	// Job carries the identifiers read from the "job" object.
	Job struct {
		ID    int64 `json:"id"`
		RunID int64 `json:"run_id"`
	} `json:"job"`
}
1320
1321 func claimRunnerHeartbeat(t *testing.T, router http.Handler, token string, capacity int) runnerHeartbeatClaim {
1322 t.Helper()
1323 body := fmt.Sprintf(`{"labels":["ubuntu-latest","linux"],"capacity":%d}`, capacity)
1324 req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat", strings.NewReader(body))
1325 req.Header.Set("Authorization", "Bearer "+token)
1326 rr := httptest.NewRecorder()
1327 router.ServeHTTP(rr, req)
1328 if rr.Code != http.StatusOK {
1329 t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
1330 }
1331 var claim runnerHeartbeatClaim
1332 if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
1333 t.Fatalf("decode heartbeat claim: %v", err)
1334 }
1335 if claim.Token == "" || claim.Job.ID == 0 || claim.Job.RunID == 0 {
1336 t.Fatalf("incomplete heartbeat claim: %+v", claim)
1337 }
1338 return claim
1339 }
1340
1341 func registerRunnerForTest(t *testing.T, pool *pgxpool.Pool, labels []string, capacity int32) (token string, runnerID int64) {
1342 t.Helper()
1343 token, tokenHash, err := runnertoken.New()
1344 if err != nil {
1345 t.Fatalf("runnertoken.New: %v", err)
1346 }
1347 q := actionsdb.New()
1348 runner, err := q.InsertRunner(context.Background(), pool, actionsdb.InsertRunnerParams{
1349 Name: "runner1",
1350 Labels: labels,
1351 Capacity: capacity,
1352 })
1353 if err != nil {
1354 t.Fatalf("InsertRunner: %v", err)
1355 }
1356 if _, err := q.InsertRunnerToken(context.Background(), pool, actionsdb.InsertRunnerTokenParams{
1357 RunnerID: runner.ID,
1358 TokenHash: tokenHash,
1359 }); err != nil {
1360 t.Fatalf("InsertRunnerToken: %v", err)
1361 }
1362 return token, runner.ID
1363 }
1364
1365 func mintRunnerAPIPAT(t *testing.T, pool *pgxpool.Pool, userID int64, scopes ...string) string {
1366 t.Helper()
1367 raw, hash, prefix, err := pat.Mint()
1368 if err != nil {
1369 t.Fatalf("pat.Mint: %v", err)
1370 }
1371 if _, err := usersdb.New().InsertUserToken(context.Background(), pool, usersdb.InsertUserTokenParams{
1372 UserID: userID,
1373 Name: "api test",
1374 TokenHash: hash,
1375 TokenPrefix: prefix,
1376 Scopes: scopes,
1377 }); err != nil {
1378 t.Fatalf("InsertUserToken: %v", err)
1379 }
1380 return raw
1381 }
1382
// containsString reports whether want is equal to at least one element of
// items. A nil or empty slice never contains anything.
func containsString(items []string, want string) bool {
	for i := 0; i < len(items); i++ {
		if items[i] != want {
			continue
		}
		return true
	}
	return false
}
1391
1392 func runnerAPISigner(t *testing.T, now time.Time) *runnerjwt.Signer {
1393 t.Helper()
1394 signer, err := runnerjwt.NewFromKey(
1395 bytes.Repeat([]byte{0x7c}, 32),
1396 runnerjwt.WithClock(func() time.Time { return now }),
1397 )
1398 if err != nil {
1399 t.Fatalf("runnerjwt.NewFromKey: %v", err)
1400 }
1401 return signer
1402 }
1403