tenseleyflow/shithub / 105eb12

Browse files

actions/lifecycle: request workflow cancellations

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
105eb128d5d013f9b5f09ef0cfb8af691ce9149a
Parents
c782968
Tree
aa6c6b4

8 changed files

StatusFile+-
A internal/actions/lifecycle/cancel.go 316 0
A internal/actions/lifecycle/cancel_test.go 189 0
M internal/actions/queries/workflow_jobs.sql 60 1
M internal/actions/queries/workflow_steps.sql 16 0
M internal/actions/sqlc/querier.go 3 0
M internal/actions/sqlc/workflow_jobs.sql.go 146 14
M internal/actions/sqlc/workflow_steps.sql.go 59 0
M internal/infra/metrics/metrics.go 8 0
internal/actions/lifecycle/cancel.goadded
@@ -0,0 +1,316 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package lifecycle owns user-visible Actions run/job lifecycle mutations:
4
+// cancellation now, with re-runs and retention following in later S41g slices.
5
+package lifecycle
6
+
7
+import (
8
+	"context"
9
+	"errors"
10
+	"fmt"
11
+	"log/slog"
12
+	"strings"
13
+	"time"
14
+
15
+	"github.com/jackc/pgx/v5"
16
+	"github.com/jackc/pgx/v5/pgtype"
17
+	"github.com/jackc/pgx/v5/pgxpool"
18
+
19
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
20
+	"github.com/tenseleyFlow/shithub/internal/checks"
21
+	checksdb "github.com/tenseleyFlow/shithub/internal/checks/sqlc"
22
+	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
23
+)
24
+
25
+const (
26
+	CancelReasonUser        = "user"
27
+	CancelReasonConcurrency = "concurrency"
28
+	CancelReasonTimeout     = "timeout"
29
+)
30
+
31
+// Deps wires lifecycle operations to postgres and optional warning logs.
32
+type Deps struct {
33
+	Pool   *pgxpool.Pool
34
+	Logger *slog.Logger
35
+}
36
+
37
+// CancelResult summarizes the durable state changes from a cancel request.
38
+type CancelResult struct {
39
+	RunID         int64
40
+	ChangedJobs   []actionsdb.WorkflowJob
41
+	RunCompleted  bool
42
+	RunConclusion actionsdb.CheckConclusion
43
+}
44
+
45
+// CancelRun requests cancellation for every queued/running job in a workflow
46
+// run. Queued jobs become terminal immediately; running jobs keep running with
47
+// cancel_requested=true so the runner's cancel-check loop can kill them.
48
+func CancelRun(ctx context.Context, deps Deps, runID int64, reason string) (CancelResult, error) {
49
+	if deps.Pool == nil {
50
+		return CancelResult{}, errors.New("actions lifecycle: nil Pool")
51
+	}
52
+	q := actionsdb.New()
53
+	tx, err := deps.Pool.Begin(ctx)
54
+	if err != nil {
55
+		return CancelResult{}, err
56
+	}
57
+	committed := false
58
+	defer func() {
59
+		if !committed {
60
+			_ = tx.Rollback(ctx)
61
+		}
62
+	}()
63
+
64
+	if _, err := q.GetWorkflowRunByID(ctx, tx, runID); err != nil {
65
+		return CancelResult{}, err
66
+	}
67
+	changed, err := q.RequestWorkflowRunCancel(ctx, tx, runID)
68
+	if err != nil {
69
+		return CancelResult{}, err
70
+	}
71
+	for _, job := range changed {
72
+		if job.Status == actionsdb.WorkflowJobStatusCancelled {
73
+			if _, err := q.CancelOpenWorkflowStepsForJob(ctx, tx, job.ID); err != nil {
74
+				return CancelResult{}, err
75
+			}
76
+		}
77
+	}
78
+	var (
79
+		runCompleted  bool
80
+		runConclusion actionsdb.CheckConclusion
81
+	)
82
+	if len(changed) > 0 {
83
+		runCompleted, runConclusion, err = rollupRunAfterCancel(ctx, q, tx, runID)
84
+		if err != nil {
85
+			return CancelResult{}, err
86
+		}
87
+	}
88
+	if err := tx.Commit(ctx); err != nil {
89
+		return CancelResult{}, err
90
+	}
91
+	committed = true
92
+
93
+	recordCancelledJobs(changed, reason)
94
+	syncChangedJobChecks(ctx, deps, changed)
95
+	return CancelResult{
96
+		RunID:         runID,
97
+		ChangedJobs:   changed,
98
+		RunCompleted:  runCompleted,
99
+		RunConclusion: runConclusion,
100
+	}, nil
101
+}
102
+
103
+// CancelJob requests cancellation for one queued/running job. Terminal jobs are
104
+// a successful no-op so a cancel/complete race does not surface as an error.
105
+func CancelJob(ctx context.Context, deps Deps, jobID int64, reason string) (CancelResult, error) {
106
+	if deps.Pool == nil {
107
+		return CancelResult{}, errors.New("actions lifecycle: nil Pool")
108
+	}
109
+	q := actionsdb.New()
110
+	tx, err := deps.Pool.Begin(ctx)
111
+	if err != nil {
112
+		return CancelResult{}, err
113
+	}
114
+	committed := false
115
+	defer func() {
116
+		if !committed {
117
+			_ = tx.Rollback(ctx)
118
+		}
119
+	}()
120
+
121
+	changedJob, err := q.RequestWorkflowJobCancel(ctx, tx, jobID)
122
+	var changed []actionsdb.WorkflowJob
123
+	var runID int64
124
+	switch {
125
+	case err == nil:
126
+		changed = []actionsdb.WorkflowJob{changedJob}
127
+		runID = changedJob.RunID
128
+		if changedJob.Status == actionsdb.WorkflowJobStatusCancelled {
129
+			if _, err := q.CancelOpenWorkflowStepsForJob(ctx, tx, changedJob.ID); err != nil {
130
+				return CancelResult{}, err
131
+			}
132
+		}
133
+	case errors.Is(err, pgx.ErrNoRows):
134
+		existing, getErr := q.GetWorkflowJobByID(ctx, tx, jobID)
135
+		if getErr != nil {
136
+			return CancelResult{}, getErr
137
+		}
138
+		runID = existing.RunID
139
+	default:
140
+		return CancelResult{}, err
141
+	}
142
+
143
+	var (
144
+		runCompleted  bool
145
+		runConclusion actionsdb.CheckConclusion
146
+	)
147
+	if len(changed) > 0 {
148
+		runCompleted, runConclusion, err = rollupRunAfterCancel(ctx, q, tx, runID)
149
+		if err != nil {
150
+			return CancelResult{}, err
151
+		}
152
+	}
153
+	if err := tx.Commit(ctx); err != nil {
154
+		return CancelResult{}, err
155
+	}
156
+	committed = true
157
+
158
+	recordCancelledJobs(changed, reason)
159
+	syncChangedJobChecks(ctx, deps, changed)
160
+	return CancelResult{
161
+		RunID:         runID,
162
+		ChangedJobs:   changed,
163
+		RunCompleted:  runCompleted,
164
+		RunConclusion: runConclusion,
165
+	}, nil
166
+}
167
+
168
+func rollupRunAfterCancel(
169
+	ctx context.Context,
170
+	q *actionsdb.Queries,
171
+	tx pgx.Tx,
172
+	runID int64,
173
+) (bool, actionsdb.CheckConclusion, error) {
174
+	jobs, err := q.ListJobsForRun(ctx, tx, runID)
175
+	if err != nil {
176
+		return false, "", err
177
+	}
178
+	runConclusion, complete := deriveWorkflowRunConclusion(jobs)
179
+	if complete {
180
+		if _, err := q.CompleteWorkflowRun(ctx, tx, actionsdb.CompleteWorkflowRunParams{
181
+			ID:         runID,
182
+			Conclusion: runConclusion,
183
+		}); err != nil {
184
+			return false, "", err
185
+		}
186
+		return true, runConclusion, nil
187
+	}
188
+	if err := q.MarkWorkflowRunRunning(ctx, tx, runID); err != nil {
189
+		return false, "", err
190
+	}
191
+	return false, "", nil
192
+}
193
+
194
+func deriveWorkflowRunConclusion(jobs []actionsdb.ListJobsForRunRow) (actionsdb.CheckConclusion, bool) {
195
+	if len(jobs) == 0 {
196
+		return actionsdb.CheckConclusionFailure, true
197
+	}
198
+	worst := actionsdb.CheckConclusionSuccess
199
+	for _, job := range jobs {
200
+		switch job.Status {
201
+		case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusCancelled, actionsdb.WorkflowJobStatusSkipped:
202
+		default:
203
+			return "", false
204
+		}
205
+		if job.Status == actionsdb.WorkflowJobStatusCancelled {
206
+			worst = actionsdb.CheckConclusionCancelled
207
+			continue
208
+		}
209
+		if !job.Conclusion.Valid {
210
+			return actionsdb.CheckConclusionFailure, true
211
+		}
212
+		c := job.Conclusion.CheckConclusion
213
+		if c == actionsdb.CheckConclusionFailure ||
214
+			c == actionsdb.CheckConclusionTimedOut ||
215
+			c == actionsdb.CheckConclusionActionRequired {
216
+			return c, true
217
+		}
218
+		if c == actionsdb.CheckConclusionCancelled {
219
+			worst = actionsdb.CheckConclusionCancelled
220
+		}
221
+	}
222
+	return worst, true
223
+}
224
+
225
+func recordCancelledJobs(jobs []actionsdb.WorkflowJob, reason string) {
226
+	if len(jobs) == 0 {
227
+		return
228
+	}
229
+	metrics.ActionsJobsCancelledTotal.WithLabelValues(cancelReason(reason)).Add(float64(len(jobs)))
230
+}
231
+
232
+func cancelReason(reason string) string {
233
+	switch strings.TrimSpace(reason) {
234
+	case CancelReasonUser:
235
+		return CancelReasonUser
236
+	case CancelReasonConcurrency:
237
+		return CancelReasonConcurrency
238
+	case CancelReasonTimeout:
239
+		return CancelReasonTimeout
240
+	default:
241
+		return CancelReasonUser
242
+	}
243
+}
244
+
245
+func syncChangedJobChecks(ctx context.Context, deps Deps, jobs []actionsdb.WorkflowJob) {
246
+	for _, job := range jobs {
247
+		if job.Status != actionsdb.WorkflowJobStatusRunning &&
248
+			job.Status != actionsdb.WorkflowJobStatusCompleted &&
249
+			job.Status != actionsdb.WorkflowJobStatusCancelled {
250
+			continue
251
+		}
252
+		if err := SyncCheckRunForJob(ctx, deps, job); err != nil && deps.Logger != nil {
253
+			deps.Logger.WarnContext(ctx, "actions lifecycle: sync check_run", "job_id", job.ID, "error", err)
254
+		}
255
+	}
256
+}
257
+
258
+// SyncCheckRunForJob mirrors an Actions workflow_job row into its check_run
259
+// row. Missing check rows are non-fatal because check creation is already
260
+// best-effort in the trigger path and can be reconciled independently.
261
+func SyncCheckRunForJob(ctx context.Context, deps Deps, job actionsdb.WorkflowJob) error {
262
+	if deps.Pool == nil {
263
+		return errors.New("actions lifecycle: nil Pool")
264
+	}
265
+	run, err := actionsdb.New().GetWorkflowRunByID(ctx, deps.Pool, job.RunID)
266
+	if err != nil {
267
+		return err
268
+	}
269
+	name := strings.TrimSpace(job.JobName)
270
+	if name == "" {
271
+		name = job.JobKey
272
+	}
273
+	checkRun, err := checksdb.New().GetCheckRunByExternalID(ctx, deps.Pool, checksdb.GetCheckRunByExternalIDParams{
274
+		RepoID:     run.RepoID,
275
+		HeadSha:    run.HeadSha,
276
+		Name:       name,
277
+		ExternalID: pgtype.Text{String: fmt.Sprintf("workflow_run:%d:job:%s", job.RunID, job.JobKey), Valid: true},
278
+	})
279
+	if err != nil {
280
+		if errors.Is(err, pgx.ErrNoRows) {
281
+			return nil
282
+		}
283
+		return err
284
+	}
285
+	params := checks.UpdateParams{
286
+		RunID:        checkRun.ID,
287
+		HasStatus:    true,
288
+		HasStartedAt: true,
289
+		StartedAt:    timeFromPg(job.StartedAt),
290
+	}
291
+	switch job.Status {
292
+	case actionsdb.WorkflowJobStatusRunning:
293
+		params.Status = "in_progress"
294
+	case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusCancelled:
295
+		params.Status = "completed"
296
+		params.HasConclusion = true
297
+		if job.Conclusion.Valid {
298
+			params.Conclusion = string(job.Conclusion.CheckConclusion)
299
+		} else if job.Status == actionsdb.WorkflowJobStatusCancelled {
300
+			params.Conclusion = string(actionsdb.CheckConclusionCancelled)
301
+		}
302
+		params.HasCompletedAt = true
303
+		params.CompletedAt = timeFromPg(job.CompletedAt)
304
+	default:
305
+		return nil
306
+	}
307
+	_, err = checks.Update(ctx, checks.Deps{Pool: deps.Pool, Logger: deps.Logger}, params)
308
+	return err
309
+}
310
+
311
+func timeFromPg(ts pgtype.Timestamptz) time.Time {
312
+	if !ts.Valid {
313
+		return time.Time{}
314
+	}
315
+	return ts.Time
316
+}
internal/actions/lifecycle/cancel_test.goadded
@@ -0,0 +1,189 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package lifecycle
4
+
5
+import (
6
+	"context"
7
+	"strings"
8
+	"testing"
9
+
10
+	"github.com/jackc/pgx/v5/pgtype"
11
+
12
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
13
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
14
+	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
15
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
16
+)
17
+
18
+const fixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
19
+	"AAAAAAAAAAAAAAAA$" +
20
+	"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
21
+
22
+func TestCancelRunCancelsQueuedJobsAndCompletesRun(t *testing.T) {
23
+	ctx := context.Background()
24
+	pool := dbtest.NewTestDB(t)
25
+	repoID, userID := setupLifecycleRepo(t, pool)
26
+	q := actionsdb.New()
27
+	run := insertLifecycleRun(t, pool, repoID, userID, 1)
28
+	job, err := q.InsertWorkflowJob(ctx, pool, actionsdb.InsertWorkflowJobParams{
29
+		RunID:          run.ID,
30
+		JobIndex:       0,
31
+		JobKey:         "build",
32
+		JobName:        "Build",
33
+		RunsOn:         "ubuntu-latest",
34
+		NeedsJobs:      []string{},
35
+		TimeoutMinutes: 30,
36
+		Permissions:    []byte(`{}`),
37
+		JobEnv:         []byte(`{}`),
38
+	})
39
+	if err != nil {
40
+		t.Fatalf("InsertWorkflowJob: %v", err)
41
+	}
42
+	step, err := q.InsertWorkflowStep(ctx, pool, actionsdb.InsertWorkflowStepParams{
43
+		JobID:      job.ID,
44
+		StepIndex:  0,
45
+		RunCommand: "go test ./...",
46
+		StepEnv:    []byte(`{}`),
47
+		StepWith:   []byte(`{}`),
48
+	})
49
+	if err != nil {
50
+		t.Fatalf("InsertWorkflowStep: %v", err)
51
+	}
52
+
53
+	result, err := CancelRun(ctx, Deps{Pool: pool}, run.ID, CancelReasonUser)
54
+	if err != nil {
55
+		t.Fatalf("CancelRun: %v", err)
56
+	}
57
+	if len(result.ChangedJobs) != 1 || !result.RunCompleted || result.RunConclusion != actionsdb.CheckConclusionCancelled {
58
+		t.Fatalf("result: %+v", result)
59
+	}
60
+	gotJob, err := q.GetWorkflowJobByID(ctx, pool, job.ID)
61
+	if err != nil {
62
+		t.Fatalf("GetWorkflowJobByID: %v", err)
63
+	}
64
+	if gotJob.Status != actionsdb.WorkflowJobStatusCancelled || !gotJob.CancelRequested ||
65
+		!gotJob.Conclusion.Valid || gotJob.Conclusion.CheckConclusion != actionsdb.CheckConclusionCancelled {
66
+		t.Fatalf("job: %+v", gotJob)
67
+	}
68
+	gotStep, err := q.GetWorkflowStepByID(ctx, pool, step.ID)
69
+	if err != nil {
70
+		t.Fatalf("GetWorkflowStepByID: %v", err)
71
+	}
72
+	if gotStep.Status != actionsdb.WorkflowStepStatusCancelled ||
73
+		!gotStep.Conclusion.Valid || gotStep.Conclusion.CheckConclusion != actionsdb.CheckConclusionCancelled {
74
+		t.Fatalf("step: %+v", gotStep)
75
+	}
76
+	gotRun, err := q.GetWorkflowRunByID(ctx, pool, run.ID)
77
+	if err != nil {
78
+		t.Fatalf("GetWorkflowRunByID: %v", err)
79
+	}
80
+	if gotRun.Status != actionsdb.WorkflowRunStatusCompleted ||
81
+		!gotRun.Conclusion.Valid || gotRun.Conclusion.CheckConclusion != actionsdb.CheckConclusionCancelled {
82
+		t.Fatalf("run: %+v", gotRun)
83
+	}
84
+}
85
+
86
+func TestCancelJobRequestsRunningJobWithoutTerminalOverwrite(t *testing.T) {
87
+	ctx := context.Background()
88
+	pool := dbtest.NewTestDB(t)
89
+	repoID, userID := setupLifecycleRepo(t, pool)
90
+	q := actionsdb.New()
91
+	run := insertLifecycleRun(t, pool, repoID, userID, 2)
92
+	job, err := q.InsertWorkflowJob(ctx, pool, actionsdb.InsertWorkflowJobParams{
93
+		RunID:          run.ID,
94
+		JobIndex:       0,
95
+		JobKey:         "test",
96
+		JobName:        "Test",
97
+		RunsOn:         "ubuntu-latest",
98
+		NeedsJobs:      []string{},
99
+		TimeoutMinutes: 30,
100
+		Permissions:    []byte(`{}`),
101
+		JobEnv:         []byte(`{}`),
102
+	})
103
+	if err != nil {
104
+		t.Fatalf("InsertWorkflowJob: %v", err)
105
+	}
106
+	runner, err := q.InsertRunner(ctx, pool, actionsdb.InsertRunnerParams{
107
+		Name:     "runner-1",
108
+		Labels:   []string{"ubuntu-latest"},
109
+		Capacity: 1,
110
+	})
111
+	if err != nil {
112
+		t.Fatalf("InsertRunner: %v", err)
113
+	}
114
+	if _, err := pool.Exec(ctx, `UPDATE workflow_jobs SET runner_id = $1, status = 'running', started_at = now() WHERE id = $2`, runner.ID, job.ID); err != nil {
115
+		t.Fatalf("mark job running: %v", err)
116
+	}
117
+
118
+	result, err := CancelJob(ctx, Deps{Pool: pool}, job.ID, CancelReasonUser)
119
+	if err != nil {
120
+		t.Fatalf("CancelJob: %v", err)
121
+	}
122
+	if len(result.ChangedJobs) != 1 || result.RunCompleted {
123
+		t.Fatalf("result: %+v", result)
124
+	}
125
+	gotJob, err := q.GetWorkflowJobByID(ctx, pool, job.ID)
126
+	if err != nil {
127
+		t.Fatalf("GetWorkflowJobByID: %v", err)
128
+	}
129
+	if gotJob.Status != actionsdb.WorkflowJobStatusRunning || !gotJob.CancelRequested || gotJob.Conclusion.Valid {
130
+		t.Fatalf("job: %+v", gotJob)
131
+	}
132
+	gotRun, err := q.GetWorkflowRunByID(ctx, pool, run.ID)
133
+	if err != nil {
134
+		t.Fatalf("GetWorkflowRunByID: %v", err)
135
+	}
136
+	if gotRun.Status != actionsdb.WorkflowRunStatusRunning || gotRun.Conclusion.Valid {
137
+		t.Fatalf("run: %+v", gotRun)
138
+	}
139
+
140
+	again, err := CancelJob(ctx, Deps{Pool: pool}, job.ID, CancelReasonUser)
141
+	if err != nil {
142
+		t.Fatalf("CancelJob repeat: %v", err)
143
+	}
144
+	if len(again.ChangedJobs) != 0 {
145
+		t.Fatalf("repeat was not idempotent: %+v", again)
146
+	}
147
+}
148
+
149
+func setupLifecycleRepo(t *testing.T, db actionsdb.DBTX) (repoID, userID int64) {
150
+	t.Helper()
151
+	ctx := context.Background()
152
+	user, err := usersdb.New().CreateUser(ctx, db, usersdb.CreateUserParams{
153
+		Username:     "alice",
154
+		DisplayName:  "Alice",
155
+		PasswordHash: fixtureHash,
156
+	})
157
+	if err != nil {
158
+		t.Fatalf("CreateUser: %v", err)
159
+	}
160
+	repo, err := reposdb.New().CreateRepo(ctx, db, reposdb.CreateRepoParams{
161
+		OwnerUserID:   pgtype.Int8{Int64: user.ID, Valid: true},
162
+		Name:          "demo",
163
+		DefaultBranch: "trunk",
164
+		Visibility:    reposdb.RepoVisibilityPublic,
165
+	})
166
+	if err != nil {
167
+		t.Fatalf("CreateRepo: %v", err)
168
+	}
169
+	return repo.ID, user.ID
170
+}
171
+
172
+func insertLifecycleRun(t *testing.T, db actionsdb.DBTX, repoID, userID, runIndex int64) actionsdb.WorkflowRun {
173
+	t.Helper()
174
+	run, err := actionsdb.New().InsertWorkflowRun(context.Background(), db, actionsdb.InsertWorkflowRunParams{
175
+		RepoID:       repoID,
176
+		RunIndex:     runIndex,
177
+		WorkflowFile: ".shithub/workflows/ci.yml",
178
+		WorkflowName: "CI",
179
+		HeadSha:      strings.Repeat("a", 40),
180
+		HeadRef:      "trunk",
181
+		Event:        actionsdb.WorkflowRunEventPush,
182
+		EventPayload: []byte(`{}`),
183
+		ActorUserID:  pgtype.Int8{Int64: userID, Valid: true},
184
+	})
185
+	if err != nil {
186
+		t.Fatalf("InsertWorkflowRun: %v", err)
187
+	}
188
+	return run
189
+}
internal/actions/queries/workflow_jobs.sqlmodified
@@ -35,6 +35,64 @@ RETURNING id, run_id, job_index, job_key, job_name, runs_on,
3535
           job_env, status, conclusion, cancel_requested,
3636
           started_at, completed_at, version, created_at, updated_at;
3737
 
38
+-- name: RequestWorkflowJobCancel :one
39
+UPDATE workflow_jobs
40
+SET cancel_requested = true,
41
+    status = CASE
42
+        WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
43
+        ELSE status
44
+    END,
45
+    conclusion = CASE
46
+        WHEN status = 'queued' THEN 'cancelled'::check_conclusion
47
+        ELSE conclusion
48
+    END,
49
+    started_at = CASE
50
+        WHEN status = 'queued' THEN COALESCE(started_at, now())
51
+        ELSE started_at
52
+    END,
53
+    completed_at = CASE
54
+        WHEN status = 'queued' THEN COALESCE(completed_at, now())
55
+        ELSE completed_at
56
+    END,
57
+    version = version + 1,
58
+    updated_at = now()
59
+WHERE id = $1
60
+  AND status IN ('queued', 'running')
61
+  AND (status = 'queued' OR cancel_requested = false)
62
+RETURNING id, run_id, job_index, job_key, job_name, runs_on,
63
+          runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
64
+          job_env, status, conclusion, cancel_requested,
65
+          started_at, completed_at, version, created_at, updated_at;
66
+
67
+-- name: RequestWorkflowRunCancel :many
68
+UPDATE workflow_jobs
69
+SET cancel_requested = true,
70
+    status = CASE
71
+        WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
72
+        ELSE status
73
+    END,
74
+    conclusion = CASE
75
+        WHEN status = 'queued' THEN 'cancelled'::check_conclusion
76
+        ELSE conclusion
77
+    END,
78
+    started_at = CASE
79
+        WHEN status = 'queued' THEN COALESCE(started_at, now())
80
+        ELSE started_at
81
+    END,
82
+    completed_at = CASE
83
+        WHEN status = 'queued' THEN COALESCE(completed_at, now())
84
+        ELSE completed_at
85
+    END,
86
+    version = version + 1,
87
+    updated_at = now()
88
+WHERE run_id = $1
89
+  AND status IN ('queued', 'running')
90
+  AND (status = 'queued' OR cancel_requested = false)
91
+RETURNING id, run_id, job_index, job_key, job_name, runs_on,
92
+          runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
93
+          job_env, status, conclusion, cancel_requested,
94
+          started_at, completed_at, version, created_at, updated_at;
95
+
3896
 -- name: CountRunningJobsForRunner :one
3997
 SELECT COUNT(*)::integer
4098
 FROM workflow_jobs
@@ -45,6 +103,7 @@ WITH candidate AS (
45103
     SELECT j.id
46104
     FROM workflow_jobs j
47105
     WHERE j.status = 'queued'
106
+      AND j.cancel_requested = false
48107
       AND j.runner_id IS NULL
49108
       AND (j.runs_on = '' OR j.runs_on = ANY(sqlc.arg(labels)::text[]))
50109
       AND NOT EXISTS (
@@ -85,7 +144,7 @@ JOIN workflow_runs r ON r.id = c.run_id;
85144
 
86145
 -- name: ListJobsForRun :many
87146
 SELECT id, run_id, job_index, job_key, job_name, runs_on, status,
88
-       conclusion, needs_jobs, started_at, completed_at, created_at, updated_at
147
+       conclusion, cancel_requested, needs_jobs, started_at, completed_at, created_at, updated_at
89148
 FROM workflow_jobs
90149
 WHERE run_id = $1
91150
 ORDER BY job_index ASC;
internal/actions/queries/workflow_steps.sqlmodified
@@ -38,6 +38,22 @@ RETURNING id, job_id, step_index, step_id, step_name, if_expr,
3838
           log_byte_count, started_at, completed_at, version,
3939
           created_at, updated_at, step_with;
4040
 
41
+-- name: CancelOpenWorkflowStepsForJob :many
42
+UPDATE workflow_steps
43
+SET status = 'cancelled',
44
+    conclusion = 'cancelled',
45
+    started_at = COALESCE(started_at, now()),
46
+    completed_at = COALESCE(completed_at, now()),
47
+    version = version + 1,
48
+    updated_at = now()
49
+WHERE job_id = $1
50
+  AND status IN ('queued', 'running')
51
+RETURNING id, job_id, step_index, step_id, step_name, if_expr,
52
+          run_command, uses_alias, working_directory, step_env,
53
+          continue_on_error, status, conclusion, log_object_key,
54
+          log_byte_count, started_at, completed_at, version,
55
+          created_at, updated_at, step_with;
56
+
4157
 -- name: UpdateWorkflowStepLogObject :one
4258
 UPDATE workflow_steps
4359
 SET log_object_key = sqlc.narg(log_object_key)::text,
internal/actions/sqlc/querier.gomodified
@@ -13,6 +13,7 @@ import (
1313
 type Querier interface {
1414
 	// SPDX-License-Identifier: AGPL-3.0-or-later
1515
 	AppendStepLogChunk(ctx context.Context, db DBTX, arg AppendStepLogChunkParams) (AppendStepLogChunkRow, error)
16
+	CancelOpenWorkflowStepsForJob(ctx context.Context, db DBTX, jobID int64) ([]WorkflowStep, error)
1617
 	ClaimQueuedWorkflowJob(ctx context.Context, db DBTX, arg ClaimQueuedWorkflowJobParams) (ClaimQueuedWorkflowJobRow, error)
1718
 	CompleteWorkflowRun(ctx context.Context, db DBTX, arg CompleteWorkflowRunParams) (WorkflowRun, error)
1819
 	CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error)
@@ -90,6 +91,8 @@ type Querier interface {
9091
 	// Cast to bigint so sqlc generates int64 (the column type) rather
9192
 	// than int32 (the type the +1 literal would default to).
9293
 	NextRunIndexForRepo(ctx context.Context, db DBTX, repoID int64) (int64, error)
94
+	RequestWorkflowJobCancel(ctx context.Context, db DBTX, id int64) (WorkflowJob, error)
95
+	RequestWorkflowRunCancel(ctx context.Context, db DBTX, runID int64) ([]WorkflowJob, error)
9396
 	RevokeAllTokensForRunner(ctx context.Context, db DBTX, runnerID int64) error
9497
 	TouchRunnerHeartbeat(ctx context.Context, db DBTX, arg TouchRunnerHeartbeatParams) error
9598
 	UpdateStepLogChunk(ctx context.Context, db DBTX, arg UpdateStepLogChunkParams) error
internal/actions/sqlc/workflow_jobs.sql.gomodified
@@ -16,6 +16,7 @@ WITH candidate AS (
1616
     SELECT j.id
1717
     FROM workflow_jobs j
1818
     WHERE j.status = 'queued'
19
+      AND j.cancel_requested = false
1920
       AND j.runner_id IS NULL
2021
       AND (j.runs_on = '' OR j.runs_on = ANY($1::text[]))
2122
       AND NOT EXISTS (
@@ -247,26 +248,27 @@ func (q *Queries) InsertWorkflowJob(ctx context.Context, db DBTX, arg InsertWork
247248
 
248249
 const listJobsForRun = `-- name: ListJobsForRun :many
249250
 SELECT id, run_id, job_index, job_key, job_name, runs_on, status,
250
-       conclusion, needs_jobs, started_at, completed_at, created_at, updated_at
251
+       conclusion, cancel_requested, needs_jobs, started_at, completed_at, created_at, updated_at
251252
 FROM workflow_jobs
252253
 WHERE run_id = $1
253254
 ORDER BY job_index ASC
254255
 `
255256
 
256257
 type ListJobsForRunRow struct {
257
-	ID          int64
258
-	RunID       int64
259
-	JobIndex    int32
260
-	JobKey      string
261
-	JobName     string
262
-	RunsOn      string
263
-	Status      WorkflowJobStatus
264
-	Conclusion  NullCheckConclusion
265
-	NeedsJobs   []string
266
-	StartedAt   pgtype.Timestamptz
267
-	CompletedAt pgtype.Timestamptz
268
-	CreatedAt   pgtype.Timestamptz
269
-	UpdatedAt   pgtype.Timestamptz
258
+	ID              int64
259
+	RunID           int64
260
+	JobIndex        int32
261
+	JobKey          string
262
+	JobName         string
263
+	RunsOn          string
264
+	Status          WorkflowJobStatus
265
+	Conclusion      NullCheckConclusion
266
+	CancelRequested bool
267
+	NeedsJobs       []string
268
+	StartedAt       pgtype.Timestamptz
269
+	CompletedAt     pgtype.Timestamptz
270
+	CreatedAt       pgtype.Timestamptz
271
+	UpdatedAt       pgtype.Timestamptz
270272
 }
271273
 
272274
 func (q *Queries) ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]ListJobsForRunRow, error) {
@@ -287,6 +289,7 @@ func (q *Queries) ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]L
287289
 			&i.RunsOn,
288290
 			&i.Status,
289291
 			&i.Conclusion,
292
+			&i.CancelRequested,
290293
 			&i.NeedsJobs,
291294
 			&i.StartedAt,
292295
 			&i.CompletedAt,
@@ -303,6 +306,135 @@ func (q *Queries) ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]L
303306
 	return items, nil
304307
 }
305308
 
309
+const requestWorkflowJobCancel = `-- name: RequestWorkflowJobCancel :one
310
+UPDATE workflow_jobs
311
+SET cancel_requested = true,
312
+    status = CASE
313
+        WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
314
+        ELSE status
315
+    END,
316
+    conclusion = CASE
317
+        WHEN status = 'queued' THEN 'cancelled'::check_conclusion
318
+        ELSE conclusion
319
+    END,
320
+    started_at = CASE
321
+        WHEN status = 'queued' THEN COALESCE(started_at, now())
322
+        ELSE started_at
323
+    END,
324
+    completed_at = CASE
325
+        WHEN status = 'queued' THEN COALESCE(completed_at, now())
326
+        ELSE completed_at
327
+    END,
328
+    version = version + 1,
329
+    updated_at = now()
330
+WHERE id = $1
331
+  AND status IN ('queued', 'running')
332
+  AND (status = 'queued' OR cancel_requested = false)
333
+RETURNING id, run_id, job_index, job_key, job_name, runs_on,
334
+          runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
335
+          job_env, status, conclusion, cancel_requested,
336
+          started_at, completed_at, version, created_at, updated_at
337
+`
338
+
339
+func (q *Queries) RequestWorkflowJobCancel(ctx context.Context, db DBTX, id int64) (WorkflowJob, error) {
340
+	row := db.QueryRow(ctx, requestWorkflowJobCancel, id)
341
+	var i WorkflowJob
342
+	err := row.Scan(
343
+		&i.ID,
344
+		&i.RunID,
345
+		&i.JobIndex,
346
+		&i.JobKey,
347
+		&i.JobName,
348
+		&i.RunsOn,
349
+		&i.RunnerID,
350
+		&i.NeedsJobs,
351
+		&i.IfExpr,
352
+		&i.TimeoutMinutes,
353
+		&i.Permissions,
354
+		&i.JobEnv,
355
+		&i.Status,
356
+		&i.Conclusion,
357
+		&i.CancelRequested,
358
+		&i.StartedAt,
359
+		&i.CompletedAt,
360
+		&i.Version,
361
+		&i.CreatedAt,
362
+		&i.UpdatedAt,
363
+	)
364
+	return i, err
365
+}
366
+
367
+const requestWorkflowRunCancel = `-- name: RequestWorkflowRunCancel :many
368
+UPDATE workflow_jobs
369
+SET cancel_requested = true,
370
+    status = CASE
371
+        WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
372
+        ELSE status
373
+    END,
374
+    conclusion = CASE
375
+        WHEN status = 'queued' THEN 'cancelled'::check_conclusion
376
+        ELSE conclusion
377
+    END,
378
+    started_at = CASE
379
+        WHEN status = 'queued' THEN COALESCE(started_at, now())
380
+        ELSE started_at
381
+    END,
382
+    completed_at = CASE
383
+        WHEN status = 'queued' THEN COALESCE(completed_at, now())
384
+        ELSE completed_at
385
+    END,
386
+    version = version + 1,
387
+    updated_at = now()
388
+WHERE run_id = $1
389
+  AND status IN ('queued', 'running')
390
+  AND (status = 'queued' OR cancel_requested = false)
391
+RETURNING id, run_id, job_index, job_key, job_name, runs_on,
392
+          runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
393
+          job_env, status, conclusion, cancel_requested,
394
+          started_at, completed_at, version, created_at, updated_at
395
+`
396
+
397
+func (q *Queries) RequestWorkflowRunCancel(ctx context.Context, db DBTX, runID int64) ([]WorkflowJob, error) {
398
+	rows, err := db.Query(ctx, requestWorkflowRunCancel, runID)
399
+	if err != nil {
400
+		return nil, err
401
+	}
402
+	defer rows.Close()
403
+	items := []WorkflowJob{}
404
+	for rows.Next() {
405
+		var i WorkflowJob
406
+		if err := rows.Scan(
407
+			&i.ID,
408
+			&i.RunID,
409
+			&i.JobIndex,
410
+			&i.JobKey,
411
+			&i.JobName,
412
+			&i.RunsOn,
413
+			&i.RunnerID,
414
+			&i.NeedsJobs,
415
+			&i.IfExpr,
416
+			&i.TimeoutMinutes,
417
+			&i.Permissions,
418
+			&i.JobEnv,
419
+			&i.Status,
420
+			&i.Conclusion,
421
+			&i.CancelRequested,
422
+			&i.StartedAt,
423
+			&i.CompletedAt,
424
+			&i.Version,
425
+			&i.CreatedAt,
426
+			&i.UpdatedAt,
427
+		); err != nil {
428
+			return nil, err
429
+		}
430
+		items = append(items, i)
431
+	}
432
+	if err := rows.Err(); err != nil {
433
+		return nil, err
434
+	}
435
+	return items, nil
436
+}
437
+
306438
 const updateWorkflowJobStatus = `-- name: UpdateWorkflowJobStatus :one
307439
 UPDATE workflow_jobs
308440
 SET status = $2,
internal/actions/sqlc/workflow_steps.sql.gomodified
@@ -11,6 +11,65 @@ import (
1111
 	"github.com/jackc/pgx/v5/pgtype"
1212
 )
1313
 
14
+const cancelOpenWorkflowStepsForJob = `-- name: CancelOpenWorkflowStepsForJob :many
15
+UPDATE workflow_steps
16
+SET status = 'cancelled',
17
+    conclusion = 'cancelled',
18
+    started_at = COALESCE(started_at, now()),
19
+    completed_at = COALESCE(completed_at, now()),
20
+    version = version + 1,
21
+    updated_at = now()
22
+WHERE job_id = $1
23
+  AND status IN ('queued', 'running')
24
+RETURNING id, job_id, step_index, step_id, step_name, if_expr,
25
+          run_command, uses_alias, working_directory, step_env,
26
+          continue_on_error, status, conclusion, log_object_key,
27
+          log_byte_count, started_at, completed_at, version,
28
+          created_at, updated_at, step_with
29
+`
30
+
31
+func (q *Queries) CancelOpenWorkflowStepsForJob(ctx context.Context, db DBTX, jobID int64) ([]WorkflowStep, error) {
32
+	rows, err := db.Query(ctx, cancelOpenWorkflowStepsForJob, jobID)
33
+	if err != nil {
34
+		return nil, err
35
+	}
36
+	defer rows.Close()
37
+	items := []WorkflowStep{}
38
+	for rows.Next() {
39
+		var i WorkflowStep
40
+		if err := rows.Scan(
41
+			&i.ID,
42
+			&i.JobID,
43
+			&i.StepIndex,
44
+			&i.StepID,
45
+			&i.StepName,
46
+			&i.IfExpr,
47
+			&i.RunCommand,
48
+			&i.UsesAlias,
49
+			&i.WorkingDirectory,
50
+			&i.StepEnv,
51
+			&i.ContinueOnError,
52
+			&i.Status,
53
+			&i.Conclusion,
54
+			&i.LogObjectKey,
55
+			&i.LogByteCount,
56
+			&i.StartedAt,
57
+			&i.CompletedAt,
58
+			&i.Version,
59
+			&i.CreatedAt,
60
+			&i.UpdatedAt,
61
+			&i.StepWith,
62
+		); err != nil {
63
+			return nil, err
64
+		}
65
+		items = append(items, i)
66
+	}
67
+	if err := rows.Err(); err != nil {
68
+		return nil, err
69
+	}
70
+	return items, nil
71
+}
72
+
1473
 const getFirstStepForJob = `-- name: GetFirstStepForJob :one
1574
 SELECT id, job_id, step_index, step_id, step_name, if_expr,
1675
        run_command, uses_alias, working_directory, step_env,
internal/infra/metrics/metrics.gomodified
@@ -149,6 +149,13 @@ var (
149149
 		},
150150
 		[]string{"result"},
151151
 	)
152
+	ActionsJobsCancelledTotal = prometheus.NewCounterVec(
153
+		prometheus.CounterOpts{
154
+			Name: "shithub_actions_jobs_cancelled_total",
155
+			Help: "Total Actions job cancellation requests by reason (user, concurrency, timeout).",
156
+		},
157
+		[]string{"reason"},
158
+	)
152159
 	ActionsLogScrubReplacementsTotal = prometheus.NewCounterVec(
153160
 		prometheus.CounterOpts{
154161
 			Name: "shithub_actions_log_scrub_replacements_total",
@@ -176,6 +183,7 @@ func init() {
176183
 		ActionsRunnerRegistrationsTotal,
177184
 		ActionsRunnerHeartbeatsTotal,
178185
 		ActionsRunnerJWTTotal,
186
+		ActionsJobsCancelledTotal,
179187
 		ActionsLogScrubReplacementsTotal,
180188
 	)
181189
 }