tenseleyflow/shithub / 85b6823

Browse files

actions: make site disable a hard runner gate

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
85b68237ff92ce0aad81b01cc5c24ee72fa996e6
Parents
0514c27
Tree
4405700

6 changed files

StatusFile+-
M internal/actions/policy/policy_test.go 24 0
M internal/actions/queries/actions_policy.sql 1 0
M internal/actions/queries/workflow_jobs.sql 1 0
M internal/actions/sqlc/actions_policy.sql.go 1 0
M internal/actions/sqlc/workflow_jobs.sql.go 1 0
M internal/web/handlers/api/runners_test.go 202 2
internal/actions/policy/policy_test.gomodified
@@ -213,6 +213,30 @@ func TestEvaluateTrigger_DeniesArchivedAndDisabledRepos(t *testing.T) {
213213
 	}
214214
 }
215215
 
216
+func TestEvaluateTrigger_SiteDisableOverridesRepoEnable(t *testing.T) {
217
+	t.Parallel()
218
+	f := setupPolicyFx(t)
219
+	q := actionsdb.New()
220
+	if _, err := q.UpsertActionsRepoPolicy(f.ctx, f.pool, actionsdb.UpsertActionsRepoPolicyParams{
221
+		RepoID:         f.repo.ID,
222
+		ActionsEnabled: actionsdb.ActionsPolicyStateEnabled,
223
+	}); err != nil {
224
+		t.Fatalf("UpsertActionsRepoPolicy: %v", err)
225
+	}
226
+	if _, err := f.pool.Exec(f.ctx, `UPDATE actions_site_policy SET actions_enabled = false WHERE id = true`); err != nil {
227
+		t.Fatalf("disable site policy: %v", err)
228
+	}
229
+
230
+	dec, err := actionspolicy.EvaluateTrigger(f.ctx, actionspolicy.Deps{Pool: f.pool}, actionspolicy.TriggerRequest{
231
+		Repo:        f.repo,
232
+		EventKind:   string(trigger.EventPush),
233
+		ActorUserID: f.owner.ID,
234
+	})
235
+	if !errors.Is(err, actionspolicy.ErrActionsDisabled) || dec.Allow || dec.Policy.ActionsEnabled {
236
+		t.Fatalf("site-disabled decision=%+v err=%v", dec, err)
237
+	}
238
+}
239
+
216240
 func TestEvaluateTrigger_EnforcesQueueAndActorCaps(t *testing.T) {
217241
 	t.Parallel()
218242
 	f := setupPolicyFx(t)
internal/actions/queries/actions_policy.sqlmodified
@@ -7,6 +7,7 @@ SELECT
77
     COALESCE(op.actions_enabled, 'inherit'::actions_policy_state)::actions_policy_state AS org_actions_enabled,
88
     COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state)::actions_policy_state AS repo_actions_enabled,
99
     CASE
10
+        WHEN COALESCE(sp.actions_enabled, true) = false THEN false
1011
         WHEN COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state) = 'enabled' THEN true
1112
         WHEN COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state) = 'disabled' THEN false
1213
         WHEN COALESCE(op.actions_enabled, 'inherit'::actions_policy_state) = 'enabled' THEN true
internal/actions/queries/workflow_jobs.sqlmodified
@@ -113,6 +113,7 @@ WITH candidate AS (
113113
       AND j.cancel_requested = false
114114
       AND j.runner_id IS NULL
115115
       AND CASE
116
+          WHEN COALESCE(sp.actions_enabled, true) = false THEN false
116117
           WHEN COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state) = 'enabled' THEN true
117118
           WHEN COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state) = 'disabled' THEN false
118119
           WHEN COALESCE(op.actions_enabled, 'inherit'::actions_policy_state) = 'enabled' THEN true
internal/actions/sqlc/actions_policy.sql.gomodified
@@ -111,6 +111,7 @@ SELECT
111111
     COALESCE(op.actions_enabled, 'inherit'::actions_policy_state)::actions_policy_state AS org_actions_enabled,
112112
     COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state)::actions_policy_state AS repo_actions_enabled,
113113
     CASE
114
+        WHEN COALESCE(sp.actions_enabled, true) = false THEN false
114115
         WHEN COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state) = 'enabled' THEN true
115116
         WHEN COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state) = 'disabled' THEN false
116117
         WHEN COALESCE(op.actions_enabled, 'inherit'::actions_policy_state) = 'enabled' THEN true
internal/actions/sqlc/workflow_jobs.sql.gomodified
@@ -26,6 +26,7 @@ WITH candidate AS (
2626
       AND j.cancel_requested = false
2727
       AND j.runner_id IS NULL
2828
       AND CASE
29
+          WHEN COALESCE(sp.actions_enabled, true) = false THEN false
2930
           WHEN COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state) = 'enabled' THEN true
3031
           WHEN COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state) = 'disabled' THEN false
3132
           WHEN COALESCE(op.actions_enabled, 'inherit'::actions_policy_state) = 'enabled' THEN true
internal/web/handlers/api/runners_test.gomodified
@@ -196,6 +196,168 @@ func TestRunnerHeartbeatClaimsQueuedJob(t *testing.T) {
196196
 	}
197197
 }
198198
 
199
+func TestRunnerHeartbeatRespectsRepoConcurrencyCap(t *testing.T) {
200
+	ctx := context.Background()
201
+	pool := dbtest.NewTestDB(t)
202
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
203
+	repoID, userID := setupRunnerAPIRepo(t, pool)
204
+	q := actionsdb.New()
205
+	if _, err := q.UpsertActionsRepoPolicy(ctx, pool, actionsdb.UpsertActionsRepoPolicyParams{
206
+		RepoID:                repoID,
207
+		ActionsEnabled:        actionsdb.ActionsPolicyStateInherit,
208
+		MaxRepoConcurrentJobs: pgtype.Int4{Int32: 1, Valid: true},
209
+	}); err != nil {
210
+		t.Fatalf("UpsertActionsRepoPolicy: %v", err)
211
+	}
212
+
213
+	firstRunID := enqueueRunnerAPIRunWithTriggerID(t, pool, logger, repoID, userID, "repo-cap-1")
214
+	token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 2)
215
+	router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
216
+	firstClaim := claimRunnerHeartbeat(t, router, token, 2)
217
+	if firstClaim.Job.RunID != firstRunID {
218
+		t.Fatalf("first claim run_id=%d want %d", firstClaim.Job.RunID, firstRunID)
219
+	}
220
+
221
+	secondRunID := enqueueRunnerAPIRunWithTriggerID(t, pool, logger, repoID, userID, "repo-cap-2")
222
+	req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
223
+		strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":2}`))
224
+	req.Header.Set("Authorization", "Bearer "+token)
225
+	rr := httptest.NewRecorder()
226
+	router.ServeHTTP(rr, req)
227
+	if rr.Code != http.StatusNoContent {
228
+		t.Fatalf("capped heartbeat status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
229
+	}
230
+	jobs, err := q.ListJobsForRun(ctx, pool, secondRunID)
231
+	if err != nil {
232
+		t.Fatalf("ListJobsForRun: %v", err)
233
+	}
234
+	if len(jobs) != 1 || jobs[0].Status != actionsdb.WorkflowJobStatusQueued {
235
+		t.Fatalf("second run job should remain queued/unclaimed: %+v", jobs)
236
+	}
237
+	secondJob, err := q.GetWorkflowJobByID(ctx, pool, jobs[0].ID)
238
+	if err != nil {
239
+		t.Fatalf("GetWorkflowJobByID: %v", err)
240
+	}
241
+	if secondJob.RunnerID.Valid {
242
+		t.Fatalf("second run job should not have a runner: %+v", secondJob)
243
+	}
244
+
245
+	if _, err := q.UpdateWorkflowJobStatus(ctx, pool, actionsdb.UpdateWorkflowJobStatusParams{
246
+		ID:         firstClaim.Job.ID,
247
+		Status:     actionsdb.WorkflowJobStatusCompleted,
248
+		Conclusion: actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusionSuccess, Valid: true},
249
+		CompletedAt: pgtype.Timestamptz{
250
+			Time:  time.Now(),
251
+			Valid: true,
252
+		},
253
+	}); err != nil {
254
+		t.Fatalf("UpdateWorkflowJobStatus: %v", err)
255
+	}
256
+	secondClaim := claimRunnerHeartbeat(t, router, token, 2)
257
+	if secondClaim.Job.RunID != secondRunID {
258
+		t.Fatalf("second claim run_id=%d want %d", secondClaim.Job.RunID, secondRunID)
259
+	}
260
+}
261
+
262
+func TestRunnerHeartbeatRespectsOwnerConcurrencyCap(t *testing.T) {
263
+	ctx := context.Background()
264
+	pool := dbtest.NewTestDB(t)
265
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
266
+	firstRepoID, userID := setupRunnerAPIRepo(t, pool)
267
+	secondRepo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
268
+		OwnerUserID:   pgtype.Int8{Int64: userID, Valid: true},
269
+		Name:          "second",
270
+		DefaultBranch: "trunk",
271
+		Visibility:    reposdb.RepoVisibilityPublic,
272
+	})
273
+	if err != nil {
274
+		t.Fatalf("CreateRepo second: %v", err)
275
+	}
276
+	q := actionsdb.New()
277
+	if _, err := q.UpsertActionsRepoPolicy(ctx, pool, actionsdb.UpsertActionsRepoPolicyParams{
278
+		RepoID:                 secondRepo.ID,
279
+		ActionsEnabled:         actionsdb.ActionsPolicyStateInherit,
280
+		MaxOwnerConcurrentJobs: pgtype.Int4{Int32: 1, Valid: true},
281
+	}); err != nil {
282
+		t.Fatalf("UpsertActionsRepoPolicy: %v", err)
283
+	}
284
+
285
+	firstRunID := enqueueRunnerAPIRunWithTriggerID(t, pool, logger, firstRepoID, userID, "owner-cap-1")
286
+	token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 2)
287
+	router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
288
+	firstClaim := claimRunnerHeartbeat(t, router, token, 2)
289
+	if firstClaim.Job.RunID != firstRunID {
290
+		t.Fatalf("first claim run_id=%d want %d", firstClaim.Job.RunID, firstRunID)
291
+	}
292
+
293
+	secondRunID := enqueueRunnerAPIRunWithTriggerID(t, pool, logger, secondRepo.ID, userID, "owner-cap-2")
294
+	req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
295
+		strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":2}`))
296
+	req.Header.Set("Authorization", "Bearer "+token)
297
+	rr := httptest.NewRecorder()
298
+	router.ServeHTTP(rr, req)
299
+	if rr.Code != http.StatusNoContent {
300
+		t.Fatalf("owner-capped heartbeat status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
301
+	}
302
+	jobs, err := q.ListJobsForRun(ctx, pool, secondRunID)
303
+	if err != nil {
304
+		t.Fatalf("ListJobsForRun: %v", err)
305
+	}
306
+	if len(jobs) != 1 || jobs[0].Status != actionsdb.WorkflowJobStatusQueued {
307
+		t.Fatalf("second owner job should remain queued/unclaimed: %+v", jobs)
308
+	}
309
+	secondJob, err := q.GetWorkflowJobByID(ctx, pool, jobs[0].ID)
310
+	if err != nil {
311
+		t.Fatalf("GetWorkflowJobByID: %v", err)
312
+	}
313
+	if secondJob.RunnerID.Valid {
314
+		t.Fatalf("second owner job should not have a runner: %+v", secondJob)
315
+	}
316
+}
317
+
318
+func TestRunnerHeartbeatSiteDisableOverridesRepoEnable(t *testing.T) {
319
+	ctx := context.Background()
320
+	pool := dbtest.NewTestDB(t)
321
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
322
+	repoID, userID := setupRunnerAPIRepo(t, pool)
323
+	q := actionsdb.New()
324
+	if _, err := q.UpsertActionsRepoPolicy(ctx, pool, actionsdb.UpsertActionsRepoPolicyParams{
325
+		RepoID:         repoID,
326
+		ActionsEnabled: actionsdb.ActionsPolicyStateEnabled,
327
+	}); err != nil {
328
+		t.Fatalf("UpsertActionsRepoPolicy: %v", err)
329
+	}
330
+	runID := enqueueRunnerAPIRunWithTriggerID(t, pool, logger, repoID, userID, "site-disable-claim")
331
+	if _, err := pool.Exec(ctx, `UPDATE actions_site_policy SET actions_enabled = false WHERE id = true`); err != nil {
332
+		t.Fatalf("disable site policy: %v", err)
333
+	}
334
+
335
+	token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
336
+	router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
337
+	req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
338
+		strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
339
+	req.Header.Set("Authorization", "Bearer "+token)
340
+	rr := httptest.NewRecorder()
341
+	router.ServeHTTP(rr, req)
342
+	if rr.Code != http.StatusNoContent {
343
+		t.Fatalf("site-disabled heartbeat status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
344
+	}
345
+	jobs, err := q.ListJobsForRun(ctx, pool, runID)
346
+	if err != nil {
347
+		t.Fatalf("ListJobsForRun: %v", err)
348
+	}
349
+	if len(jobs) != 1 || jobs[0].Status != actionsdb.WorkflowJobStatusQueued {
350
+		t.Fatalf("site-disabled job should remain queued: %+v", jobs)
351
+	}
352
+	job, err := q.GetWorkflowJobByID(ctx, pool, jobs[0].ID)
353
+	if err != nil {
354
+		t.Fatalf("GetWorkflowJobByID: %v", err)
355
+	}
356
+	if job.RunnerID.Valid {
357
+		t.Fatalf("site-disabled job should not have a runner: %+v", job)
358
+	}
359
+}
360
+
199361
 func TestRunnerHeartbeatDoesNotClaimWhenDraining(t *testing.T) {
200362
 	ctx := context.Background()
201363
 	pool := dbtest.NewTestDB(t)
@@ -1099,10 +1261,20 @@ func setupRunnerAPIRepo(t *testing.T, pool *pgxpool.Pool) (repoID, userID int64)
10991261
 
11001262
 func enqueueRunnerAPIRun(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, repoID, userID int64) int64 {
11011263
 	t.Helper()
1102
-	return enqueueRunnerAPIEventRun(t, pool, logger, repoID, userID, trigger.EventPush, map[string]any{"ref": "refs/heads/trunk"})
1264
+	return enqueueRunnerAPIRunWithTriggerID(t, pool, logger, repoID, userID, "runner-api-test:push")
1265
+}
1266
+
1267
+func enqueueRunnerAPIRunWithTriggerID(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, repoID, userID int64, triggerID string) int64 {
1268
+	t.Helper()
1269
+	return enqueueRunnerAPIEventRunWithTriggerID(t, pool, logger, repoID, userID, trigger.EventPush, map[string]any{"ref": "refs/heads/trunk"}, triggerID)
11031270
 }
11041271
 
11051272
 func enqueueRunnerAPIEventRun(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, repoID, userID int64, event trigger.EventKind, payload map[string]any) int64 {
1273
+	t.Helper()
1274
+	return enqueueRunnerAPIEventRunWithTriggerID(t, pool, logger, repoID, userID, event, payload, "runner-api-test:"+string(event))
1275
+}
1276
+
1277
+func enqueueRunnerAPIEventRunWithTriggerID(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, repoID, userID int64, event trigger.EventKind, payload map[string]any, triggerID string) int64 {
11061278
 	t.Helper()
11071279
 	wf, diags, err := workflow.Parse([]byte(`name: ci
11081280
 on: push
@@ -1129,7 +1301,7 @@ jobs:
11291301
 		EventKind:      event,
11301302
 		EventPayload:   payload,
11311303
 		ActorUserID:    userID,
1132
-		TriggerEventID: "runner-api-test:" + string(event),
1304
+		TriggerEventID: triggerID,
11331305
 		Workflow:       wf,
11341306
 	})
11351307
 	if err != nil {
@@ -1138,6 +1310,34 @@ jobs:
11381310
 	return res.RunID
11391311
 }
11401312
 
1313
+type runnerHeartbeatClaim struct {
1314
+	Token string `json:"token"`
1315
+	Job   struct {
1316
+		ID    int64 `json:"id"`
1317
+		RunID int64 `json:"run_id"`
1318
+	} `json:"job"`
1319
+}
1320
+
1321
+func claimRunnerHeartbeat(t *testing.T, router http.Handler, token string, capacity int) runnerHeartbeatClaim {
1322
+	t.Helper()
1323
+	body := fmt.Sprintf(`{"labels":["ubuntu-latest","linux"],"capacity":%d}`, capacity)
1324
+	req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat", strings.NewReader(body))
1325
+	req.Header.Set("Authorization", "Bearer "+token)
1326
+	rr := httptest.NewRecorder()
1327
+	router.ServeHTTP(rr, req)
1328
+	if rr.Code != http.StatusOK {
1329
+		t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
1330
+	}
1331
+	var claim runnerHeartbeatClaim
1332
+	if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
1333
+		t.Fatalf("decode heartbeat claim: %v", err)
1334
+	}
1335
+	if claim.Token == "" || claim.Job.ID == 0 || claim.Job.RunID == 0 {
1336
+		t.Fatalf("incomplete heartbeat claim: %+v", claim)
1337
+	}
1338
+	return claim
1339
+}
1340
+
11411341
 func registerRunnerForTest(t *testing.T, pool *pgxpool.Pool, labels []string, capacity int32) (token string, runnerID int64) {
11421342
 	t.Helper()
11431343
 	token, tokenHash, err := runnertoken.New()