tenseleyflow/shithub / 8523373

Browse files

actions/concurrency: enforce workflow concurrency groups

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
8523373ced0d73fe66f109dab20e8230e119ef2a
Parents
18171f0
Tree
b72f8e1

11 changed files

Status | File | + | -
A internal/actions/concurrency/concurrency.go 139 0
A internal/actions/concurrency/resolve.go 78 0
A internal/actions/concurrency/resolve_test.go 49 0
M internal/actions/queries/workflow_jobs.sql 19 0
M internal/actions/queries/workflow_runs.sql 28 0
M internal/actions/sqlc/querier.go 5 0
M internal/actions/sqlc/workflow_jobs.sql.go 19 0
M internal/actions/sqlc/workflow_runs.sql.go 79 0
M internal/actions/trigger/enqueue.go 29 6
M internal/actions/trigger/enqueue_test.go 191 2
M internal/infra/metrics/metrics.go 7 0
internal/actions/concurrency/concurrency.go (added)
@@ -0,0 +1,139 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package concurrency owns workflow-level Actions concurrency groups.
4
+package concurrency
5
+
6
+import (
7
+	"context"
8
+	"errors"
9
+	"fmt"
10
+	"strings"
11
+	"unicode/utf8"
12
+
13
+	"github.com/tenseleyFlow/shithub/internal/actions/runstate"
14
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
15
+	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
16
+)
17
+
18
+const (
19
+	// MaxGroupChars mirrors workflow_runs.concurrency_group's CHECK
20
+	// constraint. Enforcing it before insert gives workflow authors a useful
21
+	// error instead of a generic constraint failure.
22
+	MaxGroupChars = 256
23
+
24
+	// CancelReason is the metrics label used when a newer run cancels older
25
+	// group occupants.
26
+	CancelReason = "concurrency"
27
+)
28
+
29
+// ResolveInput carries the trigger-time context available to concurrency.group
30
+// expression evaluation. Secrets are intentionally not populated here.
31
+type ResolveInput struct {
32
+	Workflow     *workflow.Workflow
33
+	EventPayload map[string]any
34
+	HeadSHA      string
35
+	HeadRef      string
36
+}
37
+
38
+// Resolution is the trigger-time concurrency policy for one workflow run.
39
+type Resolution struct {
40
+	Group            string
41
+	CancelInProgress bool
42
+}
43
+
44
+// Resolve evaluates workflow.concurrency.group against the trigger context.
45
+func Resolve(in ResolveInput) (Resolution, error) {
46
+	if in.Workflow == nil {
47
+		return Resolution{}, errors.New("actions concurrency: nil Workflow")
48
+	}
49
+	c := in.Workflow.Concurrency
50
+	raw := strings.TrimSpace(c.Group.Raw)
51
+	if raw == "" {
52
+		return Resolution{}, nil
53
+	}
54
+	group, err := ResolveGroup(raw, EvalContext{
55
+		EventPayload: in.EventPayload,
56
+		HeadSHA:      in.HeadSHA,
57
+		HeadRef:      in.HeadRef,
58
+	})
59
+	if err != nil {
60
+		return Resolution{}, err
61
+	}
62
+	if group == "" {
63
+		return Resolution{}, nil
64
+	}
65
+	return Resolution{Group: group, CancelInProgress: c.CancelInProgress}, nil
66
+}
67
+
68
+// EnforceParams identifies a newly enqueued run whose concurrency group should
69
+// be checked against older active runs in the same repo.
70
+type EnforceParams struct {
71
+	Run              actionsdb.WorkflowRun
72
+	CancelInProgress bool
73
+}
74
+
75
+// EnforceResult summarizes what the slot manager observed and changed.
76
+type EnforceResult struct {
77
+	BlockingRuns  []actionsdb.WorkflowRun
78
+	CancelledJobs []actionsdb.WorkflowJob
79
+}
80
+
81
+// Enforce applies workflow-level concurrency rules. With cancel-in-progress it
82
+// requests cancellation for older active occupants. Without it, this only locks
83
+// and reports blockers; ClaimQueuedWorkflowJob keeps the newer run pending.
84
+func Enforce(
85
+	ctx context.Context,
86
+	q *actionsdb.Queries,
87
+	db actionsdb.DBTX,
88
+	p EnforceParams,
89
+) (EnforceResult, error) {
90
+	if q == nil {
91
+		return EnforceResult{}, errors.New("actions concurrency: nil Queries")
92
+	}
93
+	if strings.TrimSpace(p.Run.ConcurrencyGroup) == "" {
94
+		return EnforceResult{}, nil
95
+	}
96
+	blockers, err := q.ListBlockingConcurrencyRunsForUpdate(ctx, db, actionsdb.ListBlockingConcurrencyRunsForUpdateParams{
97
+		RepoID:           p.Run.RepoID,
98
+		ConcurrencyGroup: p.Run.ConcurrencyGroup,
99
+		RunID:            p.Run.ID,
100
+	})
101
+	if err != nil {
102
+		return EnforceResult{}, err
103
+	}
104
+	out := EnforceResult{BlockingRuns: blockers}
105
+	if !p.CancelInProgress || len(blockers) == 0 {
106
+		return out, nil
107
+	}
108
+	for _, blocker := range blockers {
109
+		changed, err := q.RequestWorkflowRunCancel(ctx, db, blocker.ID)
110
+		if err != nil {
111
+			return EnforceResult{}, err
112
+		}
113
+		for _, job := range changed {
114
+			if job.Status == actionsdb.WorkflowJobStatusCancelled {
115
+				if _, err := q.CancelOpenWorkflowStepsForJob(ctx, db, job.ID); err != nil {
116
+					return EnforceResult{}, err
117
+				}
118
+			}
119
+		}
120
+		if len(changed) > 0 {
121
+			if _, _, err := runstate.RollupAfterCancel(ctx, q, db, blocker.ID); err != nil {
122
+				return EnforceResult{}, err
123
+			}
124
+			out.CancelledJobs = append(out.CancelledJobs, changed...)
125
+		}
126
+	}
127
+	return out, nil
128
+}
129
+
130
+func validateGroup(group string) (string, error) {
131
+	group = strings.TrimSpace(group)
132
+	if group == "" {
133
+		return "", nil
134
+	}
135
+	if utf8.RuneCountInString(group) > MaxGroupChars {
136
+		return "", fmt.Errorf("actions concurrency: group exceeds %d characters", MaxGroupChars)
137
+	}
138
+	return group, nil
139
+}
internal/actions/concurrency/resolve.go (added)
@@ -0,0 +1,78 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package concurrency
4
+
5
+import (
6
+	"fmt"
7
+	"strings"
8
+
9
+	"github.com/tenseleyFlow/shithub/internal/actions/expr"
10
+)
11
+
12
+// EvalContext is the limited trigger-time expression context for a
13
+// concurrency.group value. It deliberately excludes secrets.
14
+type EvalContext struct {
15
+	EventPayload map[string]any
16
+	HeadSHA      string
17
+	HeadRef      string
18
+}
19
+
20
+// ResolveGroup evaluates `${{ ... }}` fragments in a concurrency group value.
21
+// Literal text outside expressions is preserved.
22
+func ResolveGroup(raw string, in EvalContext) (string, error) {
23
+	ctx := expr.Context{
24
+		Shithub: expr.ShithubContext{
25
+			Event: in.EventPayload,
26
+			SHA:   in.HeadSHA,
27
+			Ref:   in.HeadRef,
28
+		},
29
+		Untrusted: expr.DefaultUntrusted(),
30
+	}
31
+	var out strings.Builder
32
+	if err := walkExpressions(raw, func(literal, body string) error {
33
+		if body == "" {
34
+			out.WriteString(literal)
35
+			return nil
36
+		}
37
+		out.WriteString(literal)
38
+		v, err := eval(body, &ctx)
39
+		if err != nil {
40
+			return err
41
+		}
42
+		out.WriteString(v.String())
43
+		return nil
44
+	}); err != nil {
45
+		return "", fmt.Errorf("actions concurrency: resolve group: %w", err)
46
+	}
47
+	return validateGroup(out.String())
48
+}
49
+
50
+func eval(body string, ctx *expr.Context) (expr.Value, error) {
51
+	toks, err := expr.Lex(strings.TrimSpace(body))
52
+	if err != nil {
53
+		return expr.Value{}, err
54
+	}
55
+	ast, err := expr.Parse(toks)
56
+	if err != nil {
57
+		return expr.Value{}, err
58
+	}
59
+	return expr.Eval(ast, ctx)
60
+}
61
+
62
+func walkExpressions(raw string, fn func(literal, body string) error) error {
63
+	for {
64
+		start := strings.Index(raw, "${{")
65
+		if start < 0 {
66
+			return fn(raw, "")
67
+		}
68
+		end := strings.Index(raw[start+3:], "}}")
69
+		if end < 0 {
70
+			return fmt.Errorf("render expression: missing closing }}")
71
+		}
72
+		end += start + 3
73
+		if err := fn(raw[:start], raw[start+3:end]); err != nil {
74
+			return err
75
+		}
76
+		raw = raw[end+2:]
77
+	}
78
+}
internal/actions/concurrency/resolve_test.go (added)
@@ -0,0 +1,49 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package concurrency_test
4
+
5
+import (
6
+	"strings"
7
+	"testing"
8
+
9
+	"github.com/tenseleyFlow/shithub/internal/actions/concurrency"
10
+)
11
+
12
+func TestResolveGroup_EvaluatesTriggerContext(t *testing.T) {
13
+	t.Parallel()
14
+	got, err := concurrency.ResolveGroup("branch-${{ shithub.ref }}-${{ shithub.event.head_commit.id }}", concurrency.EvalContext{
15
+		EventPayload: map[string]any{
16
+			"head_commit": map[string]any{"id": "abc123"},
17
+		},
18
+		HeadSHA: "abc123",
19
+		HeadRef: "refs/heads/trunk",
20
+	})
21
+	if err != nil {
22
+		t.Fatalf("ResolveGroup: %v", err)
23
+	}
24
+	want := "branch-refs/heads/trunk-abc123"
25
+	if got != want {
26
+		t.Fatalf("group: got %q want %q", got, want)
27
+	}
28
+}
29
+
30
+func TestResolveGroup_GithubAliasAndMissingEventPath(t *testing.T) {
31
+	t.Parallel()
32
+	got, err := concurrency.ResolveGroup("${{ github.ref }}-${{ shithub.event.missing }}", concurrency.EvalContext{
33
+		HeadRef: "refs/heads/main",
34
+	})
35
+	if err != nil {
36
+		t.Fatalf("ResolveGroup: %v", err)
37
+	}
38
+	if got != "refs/heads/main-" {
39
+		t.Fatalf("group: got %q", got)
40
+	}
41
+}
42
+
43
+func TestResolveGroup_RejectsTooLongGroup(t *testing.T) {
44
+	t.Parallel()
45
+	_, err := concurrency.ResolveGroup(strings.Repeat("x", concurrency.MaxGroupChars+1), concurrency.EvalContext{})
46
+	if err == nil {
47
+		t.Fatal("ResolveGroup returned nil error for oversized group")
48
+	}
49
+}
internal/actions/queries/workflow_jobs.sql (modified)
@@ -102,7 +102,9 @@ WHERE runner_id = sqlc.arg(runner_id)::bigint AND status = 'running';
102102
 WITH candidate AS (
103103
     SELECT j.id
104104
     FROM workflow_jobs j
105
+    JOIN workflow_runs r ON r.id = j.run_id
105106
     WHERE j.status = 'queued'
107
+      AND r.status IN ('queued', 'running')
106108
       AND j.cancel_requested = false
107109
       AND j.runner_id IS NULL
108110
       AND (j.runs_on = '' OR j.runs_on = ANY(sqlc.arg(labels)::text[]))
@@ -113,6 +115,23 @@ WITH candidate AS (
113115
             AND dep.job_key = ANY(j.needs_jobs)
114116
             AND (dep.status <> 'completed' OR dep.conclusion <> 'success')
115117
       )
118
+      AND NOT EXISTS (
119
+          SELECT 1
120
+          FROM workflow_runs blocker
121
+          WHERE r.concurrency_group <> ''
122
+            AND blocker.repo_id = r.repo_id
123
+            AND blocker.concurrency_group = r.concurrency_group
124
+            AND blocker.id <> r.id
125
+            AND blocker.status IN ('queued', 'running')
126
+            AND (blocker.created_at, blocker.id) < (r.created_at, r.id)
127
+            AND EXISTS (
128
+                SELECT 1
129
+                FROM workflow_jobs blocker_job
130
+                WHERE blocker_job.run_id = blocker.id
131
+                  AND blocker_job.status IN ('queued', 'running')
132
+                  AND blocker_job.cancel_requested = false
133
+            )
134
+      )
116135
     ORDER BY j.created_at ASC, j.id ASC
117136
     FOR UPDATE OF j SKIP LOCKED
118137
     LIMIT 1
internal/actions/queries/workflow_runs.sql (modified)
@@ -62,6 +62,34 @@ SELECT id, repo_id, run_index, workflow_file, workflow_name,
6262
 FROM workflow_runs
6363
 WHERE id = $1;
6464
 
65
+-- name: ListBlockingConcurrencyRunsForUpdate :many
66
+-- Older queued/running runs with the same group block the new run while they
67
+-- still have at least one queued/running job that has not already received a
68
+-- cancel request. cancel-in-progress releases the slot by flipping that job
69
+-- flag even if the runner is still draining the old container.
70
+SELECT r.id, r.repo_id, r.run_index, r.workflow_file, r.workflow_name,
71
+       r.head_sha, r.head_ref, r.event, r.event_payload,
72
+       r.actor_user_id, r.parent_run_id, r.concurrency_group,
73
+       r.status, r.conclusion, r.pinned, r.need_approval, r.approved_by_user_id,
74
+       r.started_at, r.completed_at, r.version, r.created_at, r.updated_at, r.trigger_event_id
75
+FROM workflow_runs r
76
+JOIN workflow_runs current_run ON current_run.id = sqlc.arg(run_id)::bigint
77
+WHERE r.repo_id = sqlc.arg(repo_id)::bigint
78
+  AND r.concurrency_group = sqlc.arg(concurrency_group)::text
79
+  AND r.concurrency_group <> ''
80
+  AND r.id <> current_run.id
81
+  AND r.status IN ('queued', 'running')
82
+  AND (r.created_at, r.id) < (current_run.created_at, current_run.id)
83
+  AND EXISTS (
84
+      SELECT 1
85
+      FROM workflow_jobs j
86
+      WHERE j.run_id = r.id
87
+        AND j.status IN ('queued', 'running')
88
+        AND j.cancel_requested = false
89
+  )
90
+ORDER BY r.created_at ASC, r.id ASC
91
+FOR UPDATE OF r;
92
+
6593
 -- name: GetWorkflowRunForRepoByIndex :one
6694
 SELECT r.id, r.repo_id, r.run_index, r.workflow_file, r.workflow_name,
6795
        r.head_sha, r.head_ref, r.event, r.event_payload,
internal/actions/sqlc/querier.go (modified)
@@ -67,6 +67,11 @@ type Querier interface {
6767
 	InsertWorkflowStep(ctx context.Context, db DBTX, arg InsertWorkflowStepParams) (WorkflowStep, error)
6868
 	ListAllStepLogChunksForStep(ctx context.Context, db DBTX, stepID int64) ([]WorkflowStepLogChunk, error)
6969
 	ListArtifactsForRun(ctx context.Context, db DBTX, runID int64) ([]ListArtifactsForRunRow, error)
70
+	// Older queued/running runs with the same group block the new run while they
71
+	// still have at least one queued/running job that has not already received a
72
+	// cancel request. cancel-in-progress releases the slot by flipping that job
73
+	// flag even if the runner is still draining the old container.
74
+	ListBlockingConcurrencyRunsForUpdate(ctx context.Context, db DBTX, arg ListBlockingConcurrencyRunsForUpdateParams) ([]WorkflowRun, error)
7075
 	ListExpiredWorkflowArtifactsForCleanup(ctx context.Context, db DBTX, arg ListExpiredWorkflowArtifactsForCleanupParams) ([]ListExpiredWorkflowArtifactsForCleanupRow, error)
7176
 	ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]ListJobsForRunRow, error)
7277
 	ListOrgSecrets(ctx context.Context, db DBTX, orgID pgtype.Int8) ([]ListOrgSecretsRow, error)
internal/actions/sqlc/workflow_jobs.sql.go (modified)
@@ -15,7 +15,9 @@ const claimQueuedWorkflowJob = `-- name: ClaimQueuedWorkflowJob :one
1515
 WITH candidate AS (
1616
     SELECT j.id
1717
     FROM workflow_jobs j
18
+    JOIN workflow_runs r ON r.id = j.run_id
1819
     WHERE j.status = 'queued'
20
+      AND r.status IN ('queued', 'running')
1921
       AND j.cancel_requested = false
2022
       AND j.runner_id IS NULL
2123
       AND (j.runs_on = '' OR j.runs_on = ANY($1::text[]))
@@ -26,6 +28,23 @@ WITH candidate AS (
2628
             AND dep.job_key = ANY(j.needs_jobs)
2729
             AND (dep.status <> 'completed' OR dep.conclusion <> 'success')
2830
       )
31
+      AND NOT EXISTS (
32
+          SELECT 1
33
+          FROM workflow_runs blocker
34
+          WHERE r.concurrency_group <> ''
35
+            AND blocker.repo_id = r.repo_id
36
+            AND blocker.concurrency_group = r.concurrency_group
37
+            AND blocker.id <> r.id
38
+            AND blocker.status IN ('queued', 'running')
39
+            AND (blocker.created_at, blocker.id) < (r.created_at, r.id)
40
+            AND EXISTS (
41
+                SELECT 1
42
+                FROM workflow_jobs blocker_job
43
+                WHERE blocker_job.run_id = blocker.id
44
+                  AND blocker_job.status IN ('queued', 'running')
45
+                  AND blocker_job.cancel_requested = false
46
+            )
47
+      )
2948
     ORDER BY j.created_at ASC, j.id ASC
3049
     FOR UPDATE OF j SKIP LOCKED
3150
     LIMIT 1
internal/actions/sqlc/workflow_runs.sql.go (modified)
@@ -398,6 +398,85 @@ func (q *Queries) InsertWorkflowRun(ctx context.Context, db DBTX, arg InsertWork
398398
 	return i, err
399399
 }
400400
 
401
+const listBlockingConcurrencyRunsForUpdate = `-- name: ListBlockingConcurrencyRunsForUpdate :many
402
+SELECT r.id, r.repo_id, r.run_index, r.workflow_file, r.workflow_name,
403
+       r.head_sha, r.head_ref, r.event, r.event_payload,
404
+       r.actor_user_id, r.parent_run_id, r.concurrency_group,
405
+       r.status, r.conclusion, r.pinned, r.need_approval, r.approved_by_user_id,
406
+       r.started_at, r.completed_at, r.version, r.created_at, r.updated_at, r.trigger_event_id
407
+FROM workflow_runs r
408
+JOIN workflow_runs current_run ON current_run.id = $1::bigint
409
+WHERE r.repo_id = $2::bigint
410
+  AND r.concurrency_group = $3::text
411
+  AND r.concurrency_group <> ''
412
+  AND r.id <> current_run.id
413
+  AND r.status IN ('queued', 'running')
414
+  AND (r.created_at, r.id) < (current_run.created_at, current_run.id)
415
+  AND EXISTS (
416
+      SELECT 1
417
+      FROM workflow_jobs j
418
+      WHERE j.run_id = r.id
419
+        AND j.status IN ('queued', 'running')
420
+        AND j.cancel_requested = false
421
+  )
422
+ORDER BY r.created_at ASC, r.id ASC
423
+FOR UPDATE OF r
424
+`
425
+
426
+type ListBlockingConcurrencyRunsForUpdateParams struct {
427
+	RunID            int64
428
+	RepoID           int64
429
+	ConcurrencyGroup string
430
+}
431
+
432
+// Older queued/running runs with the same group block the new run while they
433
+// still have at least one queued/running job that has not already received a
434
+// cancel request. cancel-in-progress releases the slot by flipping that job
435
+// flag even if the runner is still draining the old container.
436
+func (q *Queries) ListBlockingConcurrencyRunsForUpdate(ctx context.Context, db DBTX, arg ListBlockingConcurrencyRunsForUpdateParams) ([]WorkflowRun, error) {
437
+	rows, err := db.Query(ctx, listBlockingConcurrencyRunsForUpdate, arg.RunID, arg.RepoID, arg.ConcurrencyGroup)
438
+	if err != nil {
439
+		return nil, err
440
+	}
441
+	defer rows.Close()
442
+	items := []WorkflowRun{}
443
+	for rows.Next() {
444
+		var i WorkflowRun
445
+		if err := rows.Scan(
446
+			&i.ID,
447
+			&i.RepoID,
448
+			&i.RunIndex,
449
+			&i.WorkflowFile,
450
+			&i.WorkflowName,
451
+			&i.HeadSha,
452
+			&i.HeadRef,
453
+			&i.Event,
454
+			&i.EventPayload,
455
+			&i.ActorUserID,
456
+			&i.ParentRunID,
457
+			&i.ConcurrencyGroup,
458
+			&i.Status,
459
+			&i.Conclusion,
460
+			&i.Pinned,
461
+			&i.NeedApproval,
462
+			&i.ApprovedByUserID,
463
+			&i.StartedAt,
464
+			&i.CompletedAt,
465
+			&i.Version,
466
+			&i.CreatedAt,
467
+			&i.UpdatedAt,
468
+			&i.TriggerEventID,
469
+		); err != nil {
470
+			return nil, err
471
+		}
472
+		items = append(items, i)
473
+	}
474
+	if err := rows.Err(); err != nil {
475
+		return nil, err
476
+	}
477
+	return items, nil
478
+}
479
+
401480
 const listWorkflowRunWorkflowsForRepo = `-- name: ListWorkflowRunWorkflowsForRepo :many
402481
 WITH ranked AS (
403482
     SELECT workflow_file,
internal/actions/trigger/enqueue.go (modified)
@@ -13,6 +13,8 @@ import (
1313
 	"github.com/jackc/pgx/v5/pgtype"
1414
 	"github.com/jackc/pgx/v5/pgxpool"
1515
 
16
+	"github.com/tenseleyFlow/shithub/internal/actions/checksync"
17
+	"github.com/tenseleyFlow/shithub/internal/actions/concurrency"
1618
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
1719
 	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
1820
 	"github.com/tenseleyFlow/shithub/internal/checks"
@@ -72,9 +74,8 @@ type EnqueueParams struct {
7274
 	TriggerEventID string
7375
 
7476
 	// Workflow is the parsed workflow. The matching jobs/steps are
75
-	// persisted; concurrency.group is captured at trigger time as a
76
-	// string (expression resolution against the event context lands
77
-	// in S41g when the slot manager actually consumes it).
77
+	// persisted; concurrency.group is resolved against the trigger
78
+	// context and enforced before runners can claim younger jobs.
7879
 	Workflow *workflow.Workflow
7980
 }
8081
 
@@ -110,6 +111,15 @@ func Enqueue(ctx context.Context, deps Deps, p EnqueueParams) (Result, error) {
110111
 	if err := validateParams(&p); err != nil {
111112
 		return Result{}, err
112113
 	}
114
+	concurrencyResolution, err := concurrency.Resolve(concurrency.ResolveInput{
115
+		Workflow:     p.Workflow,
116
+		EventPayload: p.EventPayload,
117
+		HeadSHA:      p.HeadSHA,
118
+		HeadRef:      p.HeadRef,
119
+	})
120
+	if err != nil {
121
+		return Result{}, fmt.Errorf("trigger: concurrency: %w", err)
122
+	}
113123
 
114124
 	q := actionsdb.New()
115125
 
@@ -138,8 +148,6 @@ func Enqueue(ctx context.Context, deps Deps, p EnqueueParams) (Result, error) {
138148
 		return Result{}, fmt.Errorf("trigger: marshal event payload: %w", err)
139149
 	}
140150
 
141
-	concurrencyGroup := p.Workflow.Concurrency.Group.Raw
142
-
143151
 	run, err := q.EnqueueWorkflowRun(ctx, tx, actionsdb.EnqueueWorkflowRunParams{
144152
 		RepoID:           p.RepoID,
145153
 		RunIndex:         runIndex,
@@ -151,7 +159,7 @@ func Enqueue(ctx context.Context, deps Deps, p EnqueueParams) (Result, error) {
151159
 		EventPayload:     payloadBytes,
152160
 		ActorUserID:      pgInt8(p.ActorUserID),
153161
 		ParentRunID:      pgInt8(p.ParentRunID),
154
-		ConcurrencyGroup: concurrencyGroup,
162
+		ConcurrencyGroup: concurrencyResolution.Group,
155163
 		NeedApproval:     false,
156164
 		TriggerEventID:   p.TriggerEventID,
157165
 	})
@@ -238,11 +246,26 @@ func Enqueue(ctx context.Context, deps Deps, p EnqueueParams) (Result, error) {
238246
 		}
239247
 	}
240248
 
249
+	concurrencyResult, err := concurrency.Enforce(ctx, q, tx, concurrency.EnforceParams{
250
+		Run:              run,
251
+		CancelInProgress: concurrencyResolution.CancelInProgress,
252
+	})
253
+	if err != nil {
254
+		return Result{}, fmt.Errorf("trigger: enforce concurrency: %w", err)
255
+	}
256
+
241257
 	if err := tx.Commit(ctx); err != nil {
242258
 		return Result{}, fmt.Errorf("trigger: commit run tx: %w", err)
243259
 	}
244260
 	committed = true
245261
 
262
+	if len(concurrencyResult.CancelledJobs) > 0 {
263
+		metrics.ActionsJobsCancelledTotal.WithLabelValues(concurrency.CancelReason).Add(float64(len(concurrencyResult.CancelledJobs)))
264
+		checksync.ChangedJobs(ctx, checksync.Deps{Pool: deps.Pool, Logger: deps.Logger}, concurrencyResult.CancelledJobs)
265
+	} else if !concurrencyResolution.CancelInProgress && len(concurrencyResult.BlockingRuns) > 0 {
266
+		metrics.ActionsConcurrencyQueuedTotal.Inc()
267
+	}
268
+
246269
 	// check_run rows: separate concern, post-commit so the run+jobs+
247270
 	// steps are durable before we touch a different subsystem. ExternalID
248271
 	// idempotency means a retry of just this phase converges cleanly.
internal/actions/trigger/enqueue_test.go (modified)
@@ -5,6 +5,7 @@ package trigger_test
55
 import (
66
 	"context"
77
 	"errors"
8
+	"fmt"
89
 	"io"
910
 	"log/slog"
1011
 	"strings"
@@ -64,7 +65,7 @@ func setupEnq(t *testing.T) enqFx {
6465
 // steps. Used by every enqueue test.
6566
 func fixtureWorkflow(t *testing.T) *workflow.Workflow {
6667
 	t.Helper()
67
-	src := []byte(`name: ci
68
+	return workflowFromYAML(t, `name: ci
6869
 on: push
6970
 jobs:
7071
   build:
@@ -73,7 +74,27 @@ jobs:
7374
       - uses: actions/checkout@v4
7475
       - run: echo hello
7576
 `)
76
-	w, diags, err := workflow.Parse(src)
77
+}
78
+
79
+func concurrencyWorkflow(t *testing.T, group string, cancelInProgress bool) *workflow.Workflow {
80
+	t.Helper()
81
+	return workflowFromYAML(t, fmt.Sprintf(`name: ci
82
+on: push
83
+concurrency:
84
+  group: "%s"
85
+  cancel-in-progress: %t
86
+jobs:
87
+  build:
88
+    runs-on: ubuntu-latest
89
+    steps:
90
+      - uses: actions/checkout@v4
91
+      - run: echo hello
92
+`, group, cancelInProgress))
93
+}
94
+
95
+func workflowFromYAML(t *testing.T, src string) *workflow.Workflow {
96
+	t.Helper()
97
+	w, diags, err := workflow.Parse([]byte(src))
7798
 	if err != nil {
7899
 		t.Fatalf("parse fixture: %v", err)
79100
 	}
@@ -124,6 +145,174 @@ func TestEnqueue_HappyPath(t *testing.T) {
124145
 	}
125146
 }
126147
 
148
+func TestEnqueue_ResolvesConcurrencyGroupExpression(t *testing.T) {
149
+	f := setupEnq(t)
150
+	ctx := context.Background()
151
+	res, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
152
+		RepoID:         f.repoID,
153
+		WorkflowFile:   ".shithub/workflows/ci.yml",
154
+		HeadSHA:        strings.Repeat("a", 40),
155
+		HeadRef:        "refs/heads/feature",
156
+		EventKind:      trigger.EventPush,
157
+		EventPayload:   map[string]any{"ref": "refs/heads/feature"},
158
+		ActorUserID:    f.userID,
159
+		TriggerEventID: "push:concurrency-expr",
160
+		Workflow:       concurrencyWorkflow(t, "branch-${{ shithub.ref }}", false),
161
+	})
162
+	if err != nil {
163
+		t.Fatalf("Enqueue: %v", err)
164
+	}
165
+	run, err := actionsdb.New().GetWorkflowRunByID(ctx, f.pool, res.RunID)
166
+	if err != nil {
167
+		t.Fatalf("GetWorkflowRunByID: %v", err)
168
+	}
169
+	if run.ConcurrencyGroup != "branch-refs/heads/feature" {
170
+		t.Fatalf("concurrency_group: got %q", run.ConcurrencyGroup)
171
+	}
172
+}
173
+
174
+func TestEnqueue_CancelInProgressCancelsOlderQueuedRun(t *testing.T) {
175
+	f := setupEnq(t)
176
+	ctx := context.Background()
177
+	q := actionsdb.New()
178
+	first, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
179
+		RepoID:         f.repoID,
180
+		WorkflowFile:   ".shithub/workflows/ci.yml",
181
+		HeadSHA:        strings.Repeat("a", 40),
182
+		HeadRef:        "refs/heads/trunk",
183
+		EventKind:      trigger.EventPush,
184
+		EventPayload:   map[string]any{"ref": "refs/heads/trunk"},
185
+		ActorUserID:    f.userID,
186
+		TriggerEventID: "push:concurrency-cancel-1",
187
+		Workflow:       concurrencyWorkflow(t, "${{ shithub.ref }}", false),
188
+	})
189
+	if err != nil {
190
+		t.Fatalf("first Enqueue: %v", err)
191
+	}
192
+	second, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
193
+		RepoID:         f.repoID,
194
+		WorkflowFile:   ".shithub/workflows/ci.yml",
195
+		HeadSHA:        strings.Repeat("b", 40),
196
+		HeadRef:        "refs/heads/trunk",
197
+		EventKind:      trigger.EventPush,
198
+		EventPayload:   map[string]any{"ref": "refs/heads/trunk"},
199
+		ActorUserID:    f.userID,
200
+		TriggerEventID: "push:concurrency-cancel-2",
201
+		Workflow:       concurrencyWorkflow(t, "${{ shithub.ref }}", true),
202
+	})
203
+	if err != nil {
204
+		t.Fatalf("second Enqueue: %v", err)
205
+	}
206
+	oldRun, err := q.GetWorkflowRunByID(ctx, f.pool, first.RunID)
207
+	if err != nil {
208
+		t.Fatalf("GetWorkflowRunByID old: %v", err)
209
+	}
210
+	if oldRun.Status != actionsdb.WorkflowRunStatusCompleted ||
211
+		!oldRun.Conclusion.Valid ||
212
+		oldRun.Conclusion.CheckConclusion != actionsdb.CheckConclusionCancelled {
213
+		t.Fatalf("old run not cancelled: %+v", oldRun)
214
+	}
215
+	oldJobs, err := q.ListJobsForRun(ctx, f.pool, first.RunID)
216
+	if err != nil {
217
+		t.Fatalf("ListJobsForRun old: %v", err)
218
+	}
219
+	if len(oldJobs) != 1 || oldJobs[0].Status != actionsdb.WorkflowJobStatusCancelled {
220
+		t.Fatalf("old jobs not cancelled: %+v", oldJobs)
221
+	}
222
+	oldSteps, err := q.ListStepsForJob(ctx, f.pool, oldJobs[0].ID)
223
+	if err != nil {
224
+		t.Fatalf("ListStepsForJob old: %v", err)
225
+	}
226
+	for _, step := range oldSteps {
227
+		if step.Status != actionsdb.WorkflowStepStatusCancelled {
228
+			t.Fatalf("step %d status: got %s want cancelled", step.ID, step.Status)
229
+		}
230
+	}
231
+	newRun, err := q.GetWorkflowRunByID(ctx, f.pool, second.RunID)
232
+	if err != nil {
233
+		t.Fatalf("GetWorkflowRunByID new: %v", err)
234
+	}
235
+	if newRun.Status != actionsdb.WorkflowRunStatusQueued {
236
+		t.Fatalf("new run status: got %s want queued", newRun.Status)
237
+	}
238
+}
239
+
240
+func TestClaimQueuedWorkflowJob_BlocksYoungerConcurrencyRun(t *testing.T) {
241
+	f := setupEnq(t)
242
+	ctx := context.Background()
243
+	q := actionsdb.New()
244
+	first, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
245
+		RepoID:         f.repoID,
246
+		WorkflowFile:   ".shithub/workflows/ci.yml",
247
+		HeadSHA:        strings.Repeat("c", 40),
248
+		HeadRef:        "refs/heads/trunk",
249
+		EventKind:      trigger.EventPush,
250
+		EventPayload:   map[string]any{"ref": "refs/heads/trunk"},
251
+		ActorUserID:    f.userID,
252
+		TriggerEventID: "push:concurrency-block-1",
253
+		Workflow:       concurrencyWorkflow(t, "${{ shithub.ref }}", false),
254
+	})
255
+	if err != nil {
256
+		t.Fatalf("first Enqueue: %v", err)
257
+	}
258
+	second, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
259
+		RepoID:         f.repoID,
260
+		WorkflowFile:   ".shithub/workflows/ci.yml",
261
+		HeadSHA:        strings.Repeat("d", 40),
262
+		HeadRef:        "refs/heads/trunk",
263
+		EventKind:      trigger.EventPush,
264
+		EventPayload:   map[string]any{"ref": "refs/heads/trunk"},
265
+		ActorUserID:    f.userID,
266
+		TriggerEventID: "push:concurrency-block-2",
267
+		Workflow:       concurrencyWorkflow(t, "${{ shithub.ref }}", false),
268
+	})
269
+	if err != nil {
270
+		t.Fatalf("second Enqueue: %v", err)
271
+	}
272
+	runner, err := q.InsertRunner(ctx, f.pool, actionsdb.InsertRunnerParams{
273
+		Name:     "runner-block",
274
+		Labels:   []string{"ubuntu-latest"},
275
+		Capacity: 2,
276
+	})
277
+	if err != nil {
278
+		t.Fatalf("InsertRunner: %v", err)
279
+	}
280
+	claimed, err := q.ClaimQueuedWorkflowJob(ctx, f.pool, actionsdb.ClaimQueuedWorkflowJobParams{
281
+		Labels:   []string{"ubuntu-latest"},
282
+		RunnerID: runner.ID,
283
+	})
284
+	if err != nil {
285
+		t.Fatalf("first ClaimQueuedWorkflowJob: %v", err)
286
+	}
287
+	if claimed.RunID != first.RunID {
288
+		t.Fatalf("claimed run: got %d want first run %d", claimed.RunID, first.RunID)
289
+	}
290
+	_, err = q.ClaimQueuedWorkflowJob(ctx, f.pool, actionsdb.ClaimQueuedWorkflowJobParams{
291
+		Labels:   []string{"ubuntu-latest"},
292
+		RunnerID: runner.ID,
293
+	})
294
+	if !errors.Is(err, pgx.ErrNoRows) {
295
+		t.Fatalf("second claim error: got %v want pgx.ErrNoRows", err)
296
+	}
297
+	changed, err := q.RequestWorkflowRunCancel(ctx, f.pool, first.RunID)
298
+	if err != nil {
299
+		t.Fatalf("RequestWorkflowRunCancel: %v", err)
300
+	}
301
+	if len(changed) != 1 || !changed[0].CancelRequested {
302
+		t.Fatalf("cancel request did not release blocker: %+v", changed)
303
+	}
304
+	released, err := q.ClaimQueuedWorkflowJob(ctx, f.pool, actionsdb.ClaimQueuedWorkflowJobParams{
305
+		Labels:   []string{"ubuntu-latest"},
306
+		RunnerID: runner.ID,
307
+	})
308
+	if err != nil {
309
+		t.Fatalf("claim after cancel request: %v", err)
310
+	}
311
+	if released.RunID != second.RunID {
312
+		t.Fatalf("released claim run: got %d want second run %d", released.RunID, second.RunID)
313
+	}
314
+}
315
+
127316
 func TestEnqueue_IdempotentSecondCall(t *testing.T) {
128317
 	f := setupEnq(t)
129318
 	ctx := context.Background()
internal/infra/metrics/metrics.go (modified)
@@ -156,6 +156,12 @@ var (
156156
 		},
157157
 		[]string{"reason"},
158158
 	)
159
+	ActionsConcurrencyQueuedTotal = prometheus.NewCounter(
160
+		prometheus.CounterOpts{
161
+			Name: "shithub_actions_concurrency_queued_total",
162
+			Help: "Total Actions workflow runs queued behind an older active run in the same concurrency group.",
163
+		},
164
+	)
159165
 	ActionsLogScrubReplacementsTotal = prometheus.NewCounterVec(
160166
 		prometheus.CounterOpts{
161167
 			Name: "shithub_actions_log_scrub_replacements_total",
@@ -197,6 +203,7 @@ func init() {
197203
 		ActionsRunnerHeartbeatsTotal,
198204
 		ActionsRunnerJWTTotal,
199205
 		ActionsJobsCancelledTotal,
206
+		ActionsConcurrencyQueuedTotal,
200207
 		ActionsLogScrubReplacementsTotal,
201208
 		ActionsRunsPrunedTotal,
202209
 		ActionsStepTimeoutsTotal,