tenseleyflow/shithub / 44f779f

Browse files

actions/admin: cancel active runs in bulk

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
44f779fe536a2e49f56fce1a288a3e2a601bd0d0
Parents
f1e40ac
Tree
3d2beab

5 changed files

StatusFile+-
M cmd/shithubd/admin_actions.go 116 0
M internal/actions/lifecycle/cancel_test.go 70 3
M internal/actions/queries/workflow_runs.sql 12 0
M internal/actions/sqlc/querier.go 1 0
M internal/actions/sqlc/workflow_runs.sql.go 62 0
cmd/shithubd/admin_actions.gomodified
@@ -3,13 +3,20 @@
33
 package main
44
 
55
 import (
6
+	"context"
67
 	"encoding/json"
8
+	"errors"
79
 	"fmt"
810
 	"os"
11
+	"time"
912
 
1013
 	"github.com/spf13/cobra"
1114
 
15
+	actionslifecycle "github.com/tenseleyFlow/shithub/internal/actions/lifecycle"
16
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
1217
 	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
18
+	"github.com/tenseleyFlow/shithub/internal/infra/config"
19
+	"github.com/tenseleyFlow/shithub/internal/infra/db"
1320
 )
1421
 
1522
 // adminActionsCmd is the parent group for actions-related operator
@@ -80,7 +87,116 @@ Exit code 0 = clean parse, 2 = Error-severity diagnostics produced,
8087
 	},
8188
 }
8289
 
90
+func newAdminActionsCancelAllCmd() *cobra.Command {
91
+	var repoID int64
92
+	var limit int
93
+	var dryRun bool
94
+	var confirm bool
95
+
96
+	cmd := &cobra.Command{
97
+		Use:   "cancel-all",
98
+		Short: "Request cancellation for active Actions workflow runs",
99
+		Long: `Requests cancellation for queued/running Actions workflow runs.
100
+
101
+By default this scans all repositories, oldest first. Use --repo-id to scope
102
+the operation to one repository. Running jobs receive cancel_requested=true and
103
+are killed by their runner's cancel-check loop; queued jobs become terminal
104
+immediately.
105
+
106
+This is an operator break-glass command. Run with --dry-run first, then repeat
107
+with --confirm to mutate state.`,
108
+		Args: cobra.NoArgs,
109
+		RunE: func(cmd *cobra.Command, _ []string) error {
110
+			if repoID < 0 {
111
+				return errors.New("admin actions cancel-all: --repo-id must be zero or positive")
112
+			}
113
+			if limit < 1 || limit > 5000 {
114
+				return errors.New("admin actions cancel-all: --limit must be between 1 and 5000")
115
+			}
116
+			if !dryRun && !confirm {
117
+				return errors.New("admin actions cancel-all: refusing to mutate without --confirm; use --dry-run to inspect")
118
+			}
119
+
120
+			cfg, err := config.Load(nil)
121
+			if err != nil {
122
+				return err
123
+			}
124
+			if cfg.DB.URL == "" {
125
+				return errors.New("admin actions cancel-all: DB not configured (set SHITHUB_DATABASE_URL)")
126
+			}
127
+			ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Minute)
128
+			defer cancel()
129
+
130
+			pool, err := db.Open(ctx, db.Config{
131
+				URL:            cfg.DB.URL,
132
+				MaxConns:       2,
133
+				MinConns:       0,
134
+				ConnectTimeout: cfg.DB.ConnectTimeout,
135
+			})
136
+			if err != nil {
137
+				return fmt.Errorf("admin actions cancel-all: db open: %w", err)
138
+			}
139
+			defer pool.Close()
140
+
141
+			q := actionsdb.New()
142
+			runs, err := q.ListActiveWorkflowRunsForAdmin(ctx, pool, actionsdb.ListActiveWorkflowRunsForAdminParams{
143
+				RepoID:     repoID,
144
+				LimitCount: int32(limit),
145
+			})
146
+			if err != nil {
147
+				return fmt.Errorf("admin actions cancel-all: list active runs: %w", err)
148
+			}
149
+
150
+			out := cmd.OutOrStdout()
151
+			scope := "all repositories"
152
+			if repoID != 0 {
153
+				scope = fmt.Sprintf("repo_id=%d", repoID)
154
+			}
155
+			if dryRun {
156
+				for _, run := range runs {
157
+					_, _ = fmt.Fprintf(out,
158
+						"would cancel: run_id=%d repo_id=%d run_index=%d status=%s workflow=%s ref=%s sha=%s\n",
159
+						run.ID, run.RepoID, run.RunIndex, run.Status, run.WorkflowFile, run.HeadRef, run.HeadSha)
160
+				}
161
+				_, _ = fmt.Fprintf(out, "cancel-all dry-run: found %d active run(s) in %s (limit=%d)\n",
162
+					len(runs), scope, limit)
163
+				return nil
164
+			}
165
+
166
+			cancelledRuns := 0
167
+			cancelledJobs := 0
168
+			completedRuns := 0
169
+			for _, run := range runs {
170
+				result, err := actionslifecycle.CancelRun(ctx, actionslifecycle.Deps{Pool: pool}, run.ID, actionslifecycle.CancelReasonUser)
171
+				if err != nil {
172
+					return fmt.Errorf("admin actions cancel-all: cancel run %d: %w", run.ID, err)
173
+				}
174
+				if len(result.ChangedJobs) > 0 {
175
+					cancelledRuns++
176
+				}
177
+				cancelledJobs += len(result.ChangedJobs)
178
+				if result.RunCompleted {
179
+					completedRuns++
180
+				}
181
+				_, _ = fmt.Fprintf(out,
182
+					"cancelled: run_id=%d repo_id=%d run_index=%d changed_jobs=%d run_completed=%t\n",
183
+					run.ID, run.RepoID, run.RunIndex, len(result.ChangedJobs), result.RunCompleted)
184
+			}
185
+			_, _ = fmt.Fprintf(out,
186
+				"cancel-all: scanned %d active run(s) in %s; changed_runs=%d changed_jobs=%d completed_runs=%d\n",
187
+				len(runs), scope, cancelledRuns, cancelledJobs, completedRuns)
188
+			return nil
189
+		},
190
+	}
191
+	cmd.Flags().Int64Var(&repoID, "repo-id", 0, "Repository id to scope cancellation to; 0 scans all repositories")
192
+	cmd.Flags().IntVar(&limit, "limit", 500, "Maximum active runs to scan")
193
+	cmd.Flags().BoolVar(&dryRun, "dry-run", false, "Print matching active runs without mutating state")
194
+	cmd.Flags().BoolVar(&confirm, "confirm", false, "Confirm cancellation of matching active runs")
195
+	return cmd
196
+}
197
+
83198
 func init() {
84199
 	adminActionsCmd.AddCommand(adminActionsParseCmd)
200
+	adminActionsCmd.AddCommand(newAdminActionsCancelAllCmd())
85201
 	adminCmd.AddCommand(adminActionsCmd)
86202
 }
internal/actions/lifecycle/cancel_test.gomodified
@@ -146,12 +146,67 @@ func TestCancelJobRequestsRunningJobWithoutTerminalOverwrite(t *testing.T) {
146146
 	}
147147
 }
148148
 
149
+func TestListActiveWorkflowRunsForAdminFiltersActiveRuns(t *testing.T) {
150
+	ctx := context.Background()
151
+	pool := dbtest.NewTestDB(t)
152
+	repoID, userID := setupLifecycleRepo(t, pool)
153
+	q := actionsdb.New()
154
+
155
+	queued := insertLifecycleRun(t, pool, repoID, userID, 1)
156
+	running := insertLifecycleRun(t, pool, repoID, userID, 2)
157
+	running, err := q.StartWorkflowRun(ctx, pool, running.ID)
158
+	if err != nil {
159
+		t.Fatalf("StartWorkflowRun: %v", err)
160
+	}
161
+	completed := insertLifecycleRun(t, pool, repoID, userID, 3)
162
+	if _, err := q.CompleteWorkflowRun(ctx, pool, actionsdb.CompleteWorkflowRunParams{
163
+		ID:         completed.ID,
164
+		Conclusion: actionsdb.CheckConclusionSuccess,
165
+	}); err != nil {
166
+		t.Fatalf("CompleteWorkflowRun: %v", err)
167
+	}
168
+	otherRepoID, otherUserID := setupNamedLifecycleRepo(t, pool, "bob", "other")
169
+	otherRepoRun := insertLifecycleRun(t, pool, otherRepoID, otherUserID, 1)
170
+
171
+	all, err := q.ListActiveWorkflowRunsForAdmin(ctx, pool, actionsdb.ListActiveWorkflowRunsForAdminParams{
172
+		RepoID:     0,
173
+		LimitCount: 10,
174
+	})
175
+	if err != nil {
176
+		t.Fatalf("ListActiveWorkflowRunsForAdmin all: %v", err)
177
+	}
178
+	assertRunIDs(t, all, queued.ID, running.ID, otherRepoRun.ID)
179
+
180
+	repoOnly, err := q.ListActiveWorkflowRunsForAdmin(ctx, pool, actionsdb.ListActiveWorkflowRunsForAdminParams{
181
+		RepoID:     repoID,
182
+		LimitCount: 10,
183
+	})
184
+	if err != nil {
185
+		t.Fatalf("ListActiveWorkflowRunsForAdmin repo: %v", err)
186
+	}
187
+	assertRunIDs(t, repoOnly, queued.ID, running.ID)
188
+
189
+	limited, err := q.ListActiveWorkflowRunsForAdmin(ctx, pool, actionsdb.ListActiveWorkflowRunsForAdminParams{
190
+		RepoID:     0,
191
+		LimitCount: 1,
192
+	})
193
+	if err != nil {
194
+		t.Fatalf("ListActiveWorkflowRunsForAdmin limited: %v", err)
195
+	}
196
+	assertRunIDs(t, limited, queued.ID)
197
+}
198
+
149199
 func setupLifecycleRepo(t *testing.T, db actionsdb.DBTX) (repoID, userID int64) {
200
+	t.Helper()
201
+	return setupNamedLifecycleRepo(t, db, "alice", "demo")
202
+}
203
+
204
+func setupNamedLifecycleRepo(t *testing.T, db actionsdb.DBTX, username, repoName string) (repoID, userID int64) {
150205
 	t.Helper()
151206
 	ctx := context.Background()
152207
 	user, err := usersdb.New().CreateUser(ctx, db, usersdb.CreateUserParams{
153
-		Username:     "alice",
154
-		DisplayName:  "Alice",
208
+		Username:     username,
209
+		DisplayName:  username,
155210
 		PasswordHash: fixtureHash,
156211
 	})
157212
 	if err != nil {
@@ -159,7 +214,7 @@ func setupLifecycleRepo(t *testing.T, db actionsdb.DBTX) (repoID, userID int64)
159214
 	}
160215
 	repo, err := reposdb.New().CreateRepo(ctx, db, reposdb.CreateRepoParams{
161216
 		OwnerUserID:   pgtype.Int8{Int64: user.ID, Valid: true},
162
-		Name:          "demo",
217
+		Name:          repoName,
163218
 		DefaultBranch: "trunk",
164219
 		Visibility:    reposdb.RepoVisibilityPublic,
165220
 	})
@@ -169,6 +224,18 @@ func setupLifecycleRepo(t *testing.T, db actionsdb.DBTX) (repoID, userID int64)
169224
 	return repo.ID, user.ID
170225
 }
171226
 
227
+func assertRunIDs(t *testing.T, runs []actionsdb.WorkflowRun, want ...int64) {
228
+	t.Helper()
229
+	if len(runs) != len(want) {
230
+		t.Fatalf("got %d runs, want %d: %+v", len(runs), len(want), runs)
231
+	}
232
+	for i := range want {
233
+		if runs[i].ID != want[i] {
234
+			t.Fatalf("run[%d] id=%d, want %d; runs=%+v", i, runs[i].ID, want[i], runs)
235
+		}
236
+	}
237
+}
238
+
172239
 func insertLifecycleRun(t *testing.T, db actionsdb.DBTX, repoID, userID, runIndex int64) actionsdb.WorkflowRun {
173240
 	t.Helper()
174241
 	run, err := actionsdb.New().InsertWorkflowRun(context.Background(), db, actionsdb.InsertWorkflowRunParams{
internal/actions/queries/workflow_runs.sqlmodified
@@ -176,6 +176,18 @@ WHERE r.repo_id = sqlc.arg(repo_id)::bigint
176176
   AND (sqlc.narg(conclusion)::check_conclusion IS NULL OR r.conclusion = sqlc.narg(conclusion)::check_conclusion)
177177
   AND (sqlc.narg(actor_username)::text IS NULL OR u.username = sqlc.narg(actor_username)::citext);
178178
 
179
+-- name: ListActiveWorkflowRunsForAdmin :many
180
+SELECT id, repo_id, run_index, workflow_file, workflow_name,
181
+       head_sha, head_ref, event, event_payload,
182
+       actor_user_id, parent_run_id, concurrency_group,
183
+       status, conclusion, pinned, need_approval, approved_by_user_id,
184
+       started_at, completed_at, version, created_at, updated_at, trigger_event_id
185
+FROM workflow_runs
186
+WHERE status IN ('queued', 'running')
187
+  AND (sqlc.arg(repo_id)::bigint = 0 OR repo_id = sqlc.arg(repo_id)::bigint)
188
+ORDER BY created_at ASC, id ASC
189
+LIMIT sqlc.arg(limit_count)::int;
190
+
179191
 -- name: ListWorkflowRunWorkflowsForRepo :many
180192
 WITH ranked AS (
181193
     SELECT workflow_file,
internal/actions/sqlc/querier.gomodified
@@ -65,6 +65,7 @@ type Querier interface {
6565
 	InsertWorkflowRun(ctx context.Context, db DBTX, arg InsertWorkflowRunParams) (WorkflowRun, error)
6666
 	// SPDX-License-Identifier: AGPL-3.0-or-later
6767
 	InsertWorkflowStep(ctx context.Context, db DBTX, arg InsertWorkflowStepParams) (WorkflowStep, error)
68
+	ListActiveWorkflowRunsForAdmin(ctx context.Context, db DBTX, arg ListActiveWorkflowRunsForAdminParams) ([]WorkflowRun, error)
6869
 	ListAllStepLogChunksForStep(ctx context.Context, db DBTX, stepID int64) ([]WorkflowStepLogChunk, error)
6970
 	ListArtifactsForRun(ctx context.Context, db DBTX, runID int64) ([]ListArtifactsForRunRow, error)
7071
 	// Older queued/running runs with the same group block the new run while they
internal/actions/sqlc/workflow_runs.sql.gomodified
@@ -398,6 +398,68 @@ func (q *Queries) InsertWorkflowRun(ctx context.Context, db DBTX, arg InsertWork
398398
 	return i, err
399399
 }
400400
 
401
+const listActiveWorkflowRunsForAdmin = `-- name: ListActiveWorkflowRunsForAdmin :many
402
+SELECT id, repo_id, run_index, workflow_file, workflow_name,
403
+       head_sha, head_ref, event, event_payload,
404
+       actor_user_id, parent_run_id, concurrency_group,
405
+       status, conclusion, pinned, need_approval, approved_by_user_id,
406
+       started_at, completed_at, version, created_at, updated_at, trigger_event_id
407
+FROM workflow_runs
408
+WHERE status IN ('queued', 'running')
409
+  AND ($1::bigint = 0 OR repo_id = $1::bigint)
410
+ORDER BY created_at ASC, id ASC
411
+LIMIT $2::int
412
+`
413
+
414
+type ListActiveWorkflowRunsForAdminParams struct {
415
+	RepoID     int64
416
+	LimitCount int32
417
+}
418
+
419
+func (q *Queries) ListActiveWorkflowRunsForAdmin(ctx context.Context, db DBTX, arg ListActiveWorkflowRunsForAdminParams) ([]WorkflowRun, error) {
420
+	rows, err := db.Query(ctx, listActiveWorkflowRunsForAdmin, arg.RepoID, arg.LimitCount)
421
+	if err != nil {
422
+		return nil, err
423
+	}
424
+	defer rows.Close()
425
+	items := []WorkflowRun{}
426
+	for rows.Next() {
427
+		var i WorkflowRun
428
+		if err := rows.Scan(
429
+			&i.ID,
430
+			&i.RepoID,
431
+			&i.RunIndex,
432
+			&i.WorkflowFile,
433
+			&i.WorkflowName,
434
+			&i.HeadSha,
435
+			&i.HeadRef,
436
+			&i.Event,
437
+			&i.EventPayload,
438
+			&i.ActorUserID,
439
+			&i.ParentRunID,
440
+			&i.ConcurrencyGroup,
441
+			&i.Status,
442
+			&i.Conclusion,
443
+			&i.Pinned,
444
+			&i.NeedApproval,
445
+			&i.ApprovedByUserID,
446
+			&i.StartedAt,
447
+			&i.CompletedAt,
448
+			&i.Version,
449
+			&i.CreatedAt,
450
+			&i.UpdatedAt,
451
+			&i.TriggerEventID,
452
+		); err != nil {
453
+			return nil, err
454
+		}
455
+		items = append(items, i)
456
+	}
457
+	if err := rows.Err(); err != nil {
458
+		return nil, err
459
+	}
460
+	return items, nil
461
+}
462
+
401463
 const listBlockingConcurrencyRunsForUpdate = `-- name: ListBlockingConcurrencyRunsForUpdate :many
402464
 SELECT r.id, r.repo_id, r.run_index, r.workflow_file, r.workflow_name,
403465
        r.head_sha, r.head_ref, r.event, r.event_payload,