tenseleyflow/shithub / e85a1c4

Browse files

api/runners: heartbeat claims queued jobs (S41c)

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
e85a1c4184ee3c8580997298ce9724573eac4400
Parents
ce48194
Tree
613dae7

17 changed files

Status | File | + | -
M cmd/shithubd/admin_runner.go 2 25
M internal/actions/queries/workflow_jobs.sql 48 0
M internal/actions/queries/workflow_runners.sql 18 0
M internal/actions/queries/workflow_runs.sql 8 0
M internal/actions/queries/workflow_steps.sql 9 0
A internal/actions/runnerlabels/labels.go 47 0
A internal/actions/runnerlabels/labels_test.go 33 0
M internal/actions/sqlc/querier.go 6 0
M internal/actions/sqlc/workflow_jobs.sql.go 127 0
M internal/actions/sqlc/workflow_runners.sql.go 66 0
M internal/actions/sqlc/workflow_runs.sql.go 14 0
M internal/actions/sqlc/workflow_steps.sql.go 70 0
M internal/web/auth_wiring.go 14 3
M internal/web/handlers/api/api.go 21 2
A internal/web/handlers/api/runners.go 323 0
A internal/web/handlers/api/runners_test.go 240 0
M internal/web/server.go 12 1
cmd/shithubd/admin_runner.go (modified)
@@ -6,7 +6,6 @@ import (
66
 	"context"
77
 	"errors"
88
 	"fmt"
9
-	"regexp"
109
 	"strconv"
1110
 	"strings"
1211
 	"text/tabwriter"
@@ -16,14 +15,13 @@ import (
1615
 	"github.com/jackc/pgx/v5/pgxpool"
1716
 	"github.com/spf13/cobra"
1817
 
18
+	"github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
1919
 	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
2020
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
2121
 	"github.com/tenseleyFlow/shithub/internal/infra/config"
2222
 	"github.com/tenseleyFlow/shithub/internal/infra/db"
2323
 )
2424
 
25
-var runnerLabelRE = regexp.MustCompile(`^[A-Za-z0-9_.-]+$`)
26
-
2725
 func newAdminRunnerCmd() *cobra.Command {
2826
 	cmd := &cobra.Command{
2927
 		Use:   "runner",
@@ -206,28 +204,7 @@ func openAdminRunnerPool(ctx context.Context, cfg config.Config, op string) (*pg
206204
 }
207205
 
208206
 func parseRunnerLabels(raw string) ([]string, error) {
209
-	raw = strings.TrimSpace(raw)
210
-	if raw == "" {
211
-		return []string{}, nil
212
-	}
213
-	parts := strings.Split(raw, ",")
214
-	seen := make(map[string]struct{}, len(parts))
215
-	labels := make([]string, 0, len(parts))
216
-	for _, part := range parts {
217
-		label := strings.TrimSpace(part)
218
-		if label == "" {
219
-			return nil, errors.New("admin runner: labels must not contain empty entries")
220
-		}
221
-		if len(label) > 100 || !runnerLabelRE.MatchString(label) {
222
-			return nil, fmt.Errorf("admin runner: invalid label %q", label)
223
-		}
224
-		if _, ok := seen[label]; ok {
225
-			continue
226
-		}
227
-		seen[label] = struct{}{}
228
-		labels = append(labels, label)
229
-	}
230
-	return labels, nil
207
+	return runnerlabels.ParseCSV(raw)
231208
 }
232209
 
233210
 func init() {
internal/actions/queries/workflow_jobs.sql (modified)
@@ -21,6 +21,54 @@ SELECT id, run_id, job_index, job_key, job_name, runs_on,
2121
 FROM workflow_jobs
2222
 WHERE id = $1;
2323
 
24
+-- name: CountRunningJobsForRunner :one
25
+SELECT COUNT(*)::integer
26
+FROM workflow_jobs
27
+WHERE runner_id = sqlc.arg(runner_id)::bigint AND status = 'running';
28
+
29
+-- name: ClaimQueuedWorkflowJob :one
30
+WITH candidate AS (
31
+    SELECT j.id
32
+    FROM workflow_jobs j
33
+    WHERE j.status = 'queued'
34
+      AND j.runner_id IS NULL
35
+      AND (j.runs_on = '' OR j.runs_on = ANY(sqlc.arg(labels)::text[]))
36
+      AND NOT EXISTS (
37
+          SELECT 1
38
+          FROM workflow_jobs dep
39
+          WHERE dep.run_id = j.run_id
40
+            AND dep.job_key = ANY(j.needs_jobs)
41
+            AND (dep.status <> 'completed' OR dep.conclusion <> 'success')
42
+      )
43
+    ORDER BY j.created_at ASC, j.id ASC
44
+    FOR UPDATE OF j SKIP LOCKED
45
+    LIMIT 1
46
+),
47
+claimed AS (
48
+    UPDATE workflow_jobs j
49
+    SET runner_id = sqlc.arg(runner_id)::bigint,
50
+        status = 'running',
51
+        started_at = COALESCE(j.started_at, now()),
52
+        version = j.version + 1,
53
+        updated_at = now()
54
+    FROM candidate c
55
+    WHERE j.id = c.id
56
+    RETURNING j.id, j.run_id, j.job_index, j.job_key, j.job_name, j.runs_on,
57
+              j.runner_id, j.needs_jobs, j.if_expr, j.timeout_minutes,
58
+              j.permissions, j.job_env, j.status, j.conclusion,
59
+              j.cancel_requested, j.started_at, j.completed_at, j.version,
60
+              j.created_at, j.updated_at
61
+)
62
+SELECT c.id, c.run_id, c.job_index, c.job_key, c.job_name, c.runs_on,
63
+       c.runner_id, c.needs_jobs, c.if_expr, c.timeout_minutes,
64
+       c.permissions, c.job_env, c.status, c.conclusion,
65
+       c.cancel_requested, c.started_at, c.completed_at, c.version,
66
+       c.created_at, c.updated_at,
67
+       r.repo_id, r.run_index, r.workflow_file, r.workflow_name,
68
+       r.head_sha, r.head_ref, r.event
69
+FROM claimed c
70
+JOIN workflow_runs r ON r.id = c.run_id;
71
+
2472
 -- name: ListJobsForRun :many
2573
 SELECT id, run_id, job_index, job_key, job_name, runs_on, status,
2674
        conclusion, started_at, completed_at, created_at
internal/actions/queries/workflow_runners.sql (modified)
@@ -23,6 +23,24 @@ SELECT id, name, labels, capacity, status, last_heartbeat_at, created_at
2323
 FROM workflow_runners
2424
 ORDER BY name ASC;
2525
 
26
+-- name: LockRunnerByID :one
27
+SELECT id, name, labels, capacity, status, last_heartbeat_at,
28
+       registered_by_user_id, created_at, updated_at
29
+FROM workflow_runners
30
+WHERE id = $1
31
+FOR UPDATE;
32
+
33
+-- name: HeartbeatRunner :one
34
+UPDATE workflow_runners
35
+SET labels = $2,
36
+    capacity = $3,
37
+    last_heartbeat_at = now(),
38
+    status = $4,
39
+    updated_at = now()
40
+WHERE id = $1
41
+RETURNING id, name, labels, capacity, status, last_heartbeat_at,
42
+          registered_by_user_id, created_at, updated_at;
43
+
2644
 -- name: TouchRunnerHeartbeat :exec
2745
 UPDATE workflow_runners
2846
 SET last_heartbeat_at = now(),
internal/actions/queries/workflow_runs.sql (modified)
@@ -62,6 +62,14 @@ SELECT id, repo_id, run_index, workflow_file, workflow_name,
6262
 FROM workflow_runs
6363
 WHERE id = $1;
6464
 
65
+-- name: MarkWorkflowRunRunning :exec
66
+UPDATE workflow_runs
67
+SET status = 'running',
68
+    started_at = COALESCE(started_at, now()),
69
+    version = version + 1,
70
+    updated_at = now()
71
+WHERE id = $1 AND status = 'queued';
72
+
6573
 -- name: NextRunIndexForRepo :one
6674
 -- Atomic next-index emitter: take the max + 1 for this repo. Pairs
6775
 -- with the (repo_id, run_index) UNIQUE so concurrent inserts that
internal/actions/queries/workflow_steps.sql (modified)
@@ -23,6 +23,15 @@ SELECT id, job_id, step_index, step_id, step_name, if_expr,
2323
 FROM workflow_steps
2424
 WHERE id = $1;
2525
 
26
+-- name: ListRunnerStepsForJob :many
27
+SELECT id, job_id, step_index, step_id, step_name, if_expr,
28
+       run_command, uses_alias, working_directory, step_env,
29
+       continue_on_error, status, conclusion, log_byte_count,
30
+       started_at, completed_at, created_at, step_with
31
+FROM workflow_steps
32
+WHERE job_id = $1
33
+ORDER BY step_index ASC;
34
+
2635
 -- name: ListStepsForJob :many
2736
 SELECT id, job_id, step_index, step_id, step_name, run_command,
2837
        uses_alias, status, conclusion, log_byte_count,
internal/actions/runnerlabels/labels.go (added)
@@ -0,0 +1,47 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package runnerlabels normalizes Actions runner labels for registration and
4
+// heartbeat matching.
5
+package runnerlabels
6
+
7
+import (
8
+	"errors"
9
+	"fmt"
10
+	"regexp"
11
+	"strings"
12
+)
13
+
14
+var labelRE = regexp.MustCompile(`^[A-Za-z0-9_.-]+$`)
15
+
16
+// ParseCSV parses a comma-separated label list.
17
+func ParseCSV(raw string) ([]string, error) {
18
+	raw = strings.TrimSpace(raw)
19
+	if raw == "" {
20
+		return []string{}, nil
21
+	}
22
+	return Normalize(strings.Split(raw, ","))
23
+}
24
+
25
+// Normalize trims, validates, and de-duplicates labels while preserving order.
26
+func Normalize(labels []string) ([]string, error) {
27
+	if len(labels) == 0 {
28
+		return []string{}, nil
29
+	}
30
+	seen := make(map[string]struct{}, len(labels))
31
+	out := make([]string, 0, len(labels))
32
+	for _, label := range labels {
33
+		label = strings.TrimSpace(label)
34
+		if label == "" {
35
+			return nil, errors.New("runner labels must not contain empty entries")
36
+		}
37
+		if len(label) > 100 || !labelRE.MatchString(label) {
38
+			return nil, fmt.Errorf("invalid runner label %q", label)
39
+		}
40
+		if _, ok := seen[label]; ok {
41
+			continue
42
+		}
43
+		seen[label] = struct{}{}
44
+		out = append(out, label)
45
+	}
46
+	return out, nil
47
+}
internal/actions/runnerlabels/labels_test.go (added)
@@ -0,0 +1,33 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package runnerlabels_test
4
+
5
+import (
6
+	"reflect"
7
+	"testing"
8
+
9
+	"github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
10
+)
11
+
12
+func TestParseCSV(t *testing.T) {
13
+	got, err := runnerlabels.ParseCSV(" self-hosted, linux,linux,ubuntu-24.04 ")
14
+	if err != nil {
15
+		t.Fatalf("ParseCSV: %v", err)
16
+	}
17
+	want := []string{"self-hosted", "linux", "ubuntu-24.04"}
18
+	if !reflect.DeepEqual(got, want) {
19
+		t.Fatalf("labels: got %#v, want %#v", got, want)
20
+	}
21
+}
22
+
23
+func TestNormalizeRejectsInvalidLabels(t *testing.T) {
24
+	for _, labels := range [][]string{
25
+		{"linux", ""},
26
+		{"has space"},
27
+		{"semi;colon"},
28
+	} {
29
+		if _, err := runnerlabels.Normalize(labels); err == nil {
30
+			t.Fatalf("Normalize(%#v) returned nil error", labels)
31
+		}
32
+	}
33
+}
internal/actions/sqlc/querier.go (modified)
@@ -13,6 +13,8 @@ import (
1313
 type Querier interface {
1414
 	// SPDX-License-Identifier: AGPL-3.0-or-later
1515
 	AppendStepLogChunk(ctx context.Context, db DBTX, arg AppendStepLogChunkParams) (AppendStepLogChunkRow, error)
16
+	ClaimQueuedWorkflowJob(ctx context.Context, db DBTX, arg ClaimQueuedWorkflowJobParams) (ClaimQueuedWorkflowJobRow, error)
17
+	CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error)
1618
 	DeleteExpiredArtifacts(ctx context.Context, db DBTX) ([]DeleteExpiredArtifactsRow, error)
1719
 	DeleteExpiredRunnerJWTUses(ctx context.Context, db DBTX) error
1820
 	DeleteOrgSecret(ctx context.Context, db DBTX, arg DeleteOrgSecretParams) error
@@ -41,6 +43,7 @@ type Querier interface {
4143
 	GetWorkflowJobByID(ctx context.Context, db DBTX, id int64) (WorkflowJob, error)
4244
 	GetWorkflowRunByID(ctx context.Context, db DBTX, id int64) (WorkflowRun, error)
4345
 	GetWorkflowStepByID(ctx context.Context, db DBTX, id int64) (WorkflowStep, error)
46
+	HeartbeatRunner(ctx context.Context, db DBTX, arg HeartbeatRunnerParams) (WorkflowRunner, error)
4447
 	// SPDX-License-Identifier: AGPL-3.0-or-later
4548
 	InsertArtifact(ctx context.Context, db DBTX, arg InsertArtifactParams) (WorkflowArtifact, error)
4649
 	// SPDX-License-Identifier: AGPL-3.0-or-later
@@ -58,10 +61,12 @@ type Querier interface {
5861
 	ListOrgVariables(ctx context.Context, db DBTX, orgID pgtype.Int8) ([]ListOrgVariablesRow, error)
5962
 	ListRepoSecrets(ctx context.Context, db DBTX, repoID pgtype.Int8) ([]ListRepoSecretsRow, error)
6063
 	ListRepoVariables(ctx context.Context, db DBTX, repoID pgtype.Int8) ([]ListRepoVariablesRow, error)
64
+	ListRunnerStepsForJob(ctx context.Context, db DBTX, jobID int64) ([]ListRunnerStepsForJobRow, error)
6165
 	ListRunners(ctx context.Context, db DBTX) ([]ListRunnersRow, error)
6266
 	ListStepLogChunks(ctx context.Context, db DBTX, arg ListStepLogChunksParams) ([]WorkflowStepLogChunk, error)
6367
 	ListStepsForJob(ctx context.Context, db DBTX, jobID int64) ([]ListStepsForJobRow, error)
6468
 	ListWorkflowRunsForRepo(ctx context.Context, db DBTX, arg ListWorkflowRunsForRepoParams) ([]ListWorkflowRunsForRepoRow, error)
69
+	LockRunnerByID(ctx context.Context, db DBTX, id int64) (WorkflowRunner, error)
6570
 	// Companion to EnqueueWorkflowRun for the conflict path: when an
6671
 	// INSERT ... ON CONFLICT DO NOTHING returns no rows, the trigger
6772
 	// handler uses this to find the existing row so it can surface a
@@ -69,6 +74,7 @@ type Querier interface {
6974
 	LookupWorkflowRunByTriggerEvent(ctx context.Context, db DBTX, arg LookupWorkflowRunByTriggerEventParams) (WorkflowRun, error)
7075
 	// SPDX-License-Identifier: AGPL-3.0-or-later
7176
 	MarkRunnerJWTUsed(ctx context.Context, db DBTX, arg MarkRunnerJWTUsedParams) (RunnerJwtUsed, error)
77
+	MarkWorkflowRunRunning(ctx context.Context, db DBTX, id int64) error
7278
 	// Atomic next-index emitter: take the max + 1 for this repo. Pairs
7379
 	// with the (repo_id, run_index) UNIQUE so concurrent inserts that
7480
 	// race here will catch a unique-violation and the caller retries.
internal/actions/sqlc/workflow_jobs.sql.go (modified)
@@ -11,6 +11,133 @@ import (
1111
 	"github.com/jackc/pgx/v5/pgtype"
1212
 )
1313
 
14
+const claimQueuedWorkflowJob = `-- name: ClaimQueuedWorkflowJob :one
15
+WITH candidate AS (
16
+    SELECT j.id
17
+    FROM workflow_jobs j
18
+    WHERE j.status = 'queued'
19
+      AND j.runner_id IS NULL
20
+      AND (j.runs_on = '' OR j.runs_on = ANY($1::text[]))
21
+      AND NOT EXISTS (
22
+          SELECT 1
23
+          FROM workflow_jobs dep
24
+          WHERE dep.run_id = j.run_id
25
+            AND dep.job_key = ANY(j.needs_jobs)
26
+            AND (dep.status <> 'completed' OR dep.conclusion <> 'success')
27
+      )
28
+    ORDER BY j.created_at ASC, j.id ASC
29
+    FOR UPDATE OF j SKIP LOCKED
30
+    LIMIT 1
31
+),
32
+claimed AS (
33
+    UPDATE workflow_jobs j
34
+    SET runner_id = $2::bigint,
35
+        status = 'running',
36
+        started_at = COALESCE(j.started_at, now()),
37
+        version = j.version + 1,
38
+        updated_at = now()
39
+    FROM candidate c
40
+    WHERE j.id = c.id
41
+    RETURNING j.id, j.run_id, j.job_index, j.job_key, j.job_name, j.runs_on,
42
+              j.runner_id, j.needs_jobs, j.if_expr, j.timeout_minutes,
43
+              j.permissions, j.job_env, j.status, j.conclusion,
44
+              j.cancel_requested, j.started_at, j.completed_at, j.version,
45
+              j.created_at, j.updated_at
46
+)
47
+SELECT c.id, c.run_id, c.job_index, c.job_key, c.job_name, c.runs_on,
48
+       c.runner_id, c.needs_jobs, c.if_expr, c.timeout_minutes,
49
+       c.permissions, c.job_env, c.status, c.conclusion,
50
+       c.cancel_requested, c.started_at, c.completed_at, c.version,
51
+       c.created_at, c.updated_at,
52
+       r.repo_id, r.run_index, r.workflow_file, r.workflow_name,
53
+       r.head_sha, r.head_ref, r.event
54
+FROM claimed c
55
+JOIN workflow_runs r ON r.id = c.run_id
56
+`
57
+
58
+type ClaimQueuedWorkflowJobParams struct {
59
+	Labels   []string
60
+	RunnerID int64
61
+}
62
+
63
+type ClaimQueuedWorkflowJobRow struct {
64
+	ID              int64
65
+	RunID           int64
66
+	JobIndex        int32
67
+	JobKey          string
68
+	JobName         string
69
+	RunsOn          string
70
+	RunnerID        pgtype.Int8
71
+	NeedsJobs       []string
72
+	IfExpr          string
73
+	TimeoutMinutes  int32
74
+	Permissions     []byte
75
+	JobEnv          []byte
76
+	Status          WorkflowJobStatus
77
+	Conclusion      NullCheckConclusion
78
+	CancelRequested bool
79
+	StartedAt       pgtype.Timestamptz
80
+	CompletedAt     pgtype.Timestamptz
81
+	Version         int32
82
+	CreatedAt       pgtype.Timestamptz
83
+	UpdatedAt       pgtype.Timestamptz
84
+	RepoID          int64
85
+	RunIndex        int64
86
+	WorkflowFile    string
87
+	WorkflowName    string
88
+	HeadSha         string
89
+	HeadRef         string
90
+	Event           WorkflowRunEvent
91
+}
92
+
93
+func (q *Queries) ClaimQueuedWorkflowJob(ctx context.Context, db DBTX, arg ClaimQueuedWorkflowJobParams) (ClaimQueuedWorkflowJobRow, error) {
94
+	row := db.QueryRow(ctx, claimQueuedWorkflowJob, arg.Labels, arg.RunnerID)
95
+	var i ClaimQueuedWorkflowJobRow
96
+	err := row.Scan(
97
+		&i.ID,
98
+		&i.RunID,
99
+		&i.JobIndex,
100
+		&i.JobKey,
101
+		&i.JobName,
102
+		&i.RunsOn,
103
+		&i.RunnerID,
104
+		&i.NeedsJobs,
105
+		&i.IfExpr,
106
+		&i.TimeoutMinutes,
107
+		&i.Permissions,
108
+		&i.JobEnv,
109
+		&i.Status,
110
+		&i.Conclusion,
111
+		&i.CancelRequested,
112
+		&i.StartedAt,
113
+		&i.CompletedAt,
114
+		&i.Version,
115
+		&i.CreatedAt,
116
+		&i.UpdatedAt,
117
+		&i.RepoID,
118
+		&i.RunIndex,
119
+		&i.WorkflowFile,
120
+		&i.WorkflowName,
121
+		&i.HeadSha,
122
+		&i.HeadRef,
123
+		&i.Event,
124
+	)
125
+	return i, err
126
+}
127
+
128
+const countRunningJobsForRunner = `-- name: CountRunningJobsForRunner :one
129
+SELECT COUNT(*)::integer
130
+FROM workflow_jobs
131
+WHERE runner_id = $1::bigint AND status = 'running'
132
+`
133
+
134
+func (q *Queries) CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error) {
135
+	row := db.QueryRow(ctx, countRunningJobsForRunner, runnerID)
136
+	var column_1 int32
137
+	err := row.Scan(&column_1)
138
+	return column_1, err
139
+}
140
+
14141
 const getWorkflowJobByID = `-- name: GetWorkflowJobByID :one
15142
 SELECT id, run_id, job_index, job_key, job_name, runs_on,
16143
        runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
internal/actions/sqlc/workflow_runners.sql.go (modified)
@@ -94,6 +94,47 @@ func (q *Queries) GetRunnerByTokenHash(ctx context.Context, db DBTX, tokenHash [
9494
 	return i, err
9595
 }
9696
 
97
+const heartbeatRunner = `-- name: HeartbeatRunner :one
98
+UPDATE workflow_runners
99
+SET labels = $2,
100
+    capacity = $3,
101
+    last_heartbeat_at = now(),
102
+    status = $4,
103
+    updated_at = now()
104
+WHERE id = $1
105
+RETURNING id, name, labels, capacity, status, last_heartbeat_at,
106
+          registered_by_user_id, created_at, updated_at
107
+`
108
+
109
+type HeartbeatRunnerParams struct {
110
+	ID       int64
111
+	Labels   []string
112
+	Capacity int32
113
+	Status   WorkflowRunnerStatus
114
+}
115
+
116
+func (q *Queries) HeartbeatRunner(ctx context.Context, db DBTX, arg HeartbeatRunnerParams) (WorkflowRunner, error) {
117
+	row := db.QueryRow(ctx, heartbeatRunner,
118
+		arg.ID,
119
+		arg.Labels,
120
+		arg.Capacity,
121
+		arg.Status,
122
+	)
123
+	var i WorkflowRunner
124
+	err := row.Scan(
125
+		&i.ID,
126
+		&i.Name,
127
+		&i.Labels,
128
+		&i.Capacity,
129
+		&i.Status,
130
+		&i.LastHeartbeatAt,
131
+		&i.RegisteredByUserID,
132
+		&i.CreatedAt,
133
+		&i.UpdatedAt,
134
+	)
135
+	return i, err
136
+}
137
+
97138
 const insertRunner = `-- name: InsertRunner :one
98139
 
99140
 INSERT INTO workflow_runners (name, labels, capacity, registered_by_user_id)
@@ -202,6 +243,31 @@ func (q *Queries) ListRunners(ctx context.Context, db DBTX) ([]ListRunnersRow, e
202243
 	return items, nil
203244
 }
204245
 
246
+const lockRunnerByID = `-- name: LockRunnerByID :one
247
+SELECT id, name, labels, capacity, status, last_heartbeat_at,
248
+       registered_by_user_id, created_at, updated_at
249
+FROM workflow_runners
250
+WHERE id = $1
251
+FOR UPDATE
252
+`
253
+
254
+func (q *Queries) LockRunnerByID(ctx context.Context, db DBTX, id int64) (WorkflowRunner, error) {
255
+	row := db.QueryRow(ctx, lockRunnerByID, id)
256
+	var i WorkflowRunner
257
+	err := row.Scan(
258
+		&i.ID,
259
+		&i.Name,
260
+		&i.Labels,
261
+		&i.Capacity,
262
+		&i.Status,
263
+		&i.LastHeartbeatAt,
264
+		&i.RegisteredByUserID,
265
+		&i.CreatedAt,
266
+		&i.UpdatedAt,
267
+	)
268
+	return i, err
269
+}
270
+
205271
 const revokeAllTokensForRunner = `-- name: RevokeAllTokensForRunner :exec
206272
 UPDATE runner_tokens
207273
 SET revoked_at = now()
internal/actions/sqlc/workflow_runs.sql.go (modified)
@@ -335,6 +335,20 @@ func (q *Queries) LookupWorkflowRunByTriggerEvent(ctx context.Context, db DBTX,
335335
 	return i, err
336336
 }
337337
 
338
+const markWorkflowRunRunning = `-- name: MarkWorkflowRunRunning :exec
339
+UPDATE workflow_runs
340
+SET status = 'running',
341
+    started_at = COALESCE(started_at, now()),
342
+    version = version + 1,
343
+    updated_at = now()
344
+WHERE id = $1 AND status = 'queued'
345
+`
346
+
347
+func (q *Queries) MarkWorkflowRunRunning(ctx context.Context, db DBTX, id int64) error {
348
+	_, err := db.Exec(ctx, markWorkflowRunRunning, id)
349
+	return err
350
+}
351
+
338352
 const nextRunIndexForRepo = `-- name: NextRunIndexForRepo :one
339353
 SELECT (COALESCE(MAX(run_index), 0) + 1)::bigint AS next_index
340354
 FROM workflow_runs
internal/actions/sqlc/workflow_steps.sql.go (modified)
@@ -122,6 +122,76 @@ func (q *Queries) InsertWorkflowStep(ctx context.Context, db DBTX, arg InsertWor
122122
 	return i, err
123123
 }
124124
 
125
+const listRunnerStepsForJob = `-- name: ListRunnerStepsForJob :many
126
+SELECT id, job_id, step_index, step_id, step_name, if_expr,
127
+       run_command, uses_alias, working_directory, step_env,
128
+       continue_on_error, status, conclusion, log_byte_count,
129
+       started_at, completed_at, created_at, step_with
130
+FROM workflow_steps
131
+WHERE job_id = $1
132
+ORDER BY step_index ASC
133
+`
134
+
135
+type ListRunnerStepsForJobRow struct {
136
+	ID               int64
137
+	JobID            int64
138
+	StepIndex        int32
139
+	StepID           string
140
+	StepName         string
141
+	IfExpr           string
142
+	RunCommand       string
143
+	UsesAlias        string
144
+	WorkingDirectory string
145
+	StepEnv          []byte
146
+	ContinueOnError  bool
147
+	Status           WorkflowStepStatus
148
+	Conclusion       NullCheckConclusion
149
+	LogByteCount     int64
150
+	StartedAt        pgtype.Timestamptz
151
+	CompletedAt      pgtype.Timestamptz
152
+	CreatedAt        pgtype.Timestamptz
153
+	StepWith         []byte
154
+}
155
+
156
+func (q *Queries) ListRunnerStepsForJob(ctx context.Context, db DBTX, jobID int64) ([]ListRunnerStepsForJobRow, error) {
157
+	rows, err := db.Query(ctx, listRunnerStepsForJob, jobID)
158
+	if err != nil {
159
+		return nil, err
160
+	}
161
+	defer rows.Close()
162
+	items := []ListRunnerStepsForJobRow{}
163
+	for rows.Next() {
164
+		var i ListRunnerStepsForJobRow
165
+		if err := rows.Scan(
166
+			&i.ID,
167
+			&i.JobID,
168
+			&i.StepIndex,
169
+			&i.StepID,
170
+			&i.StepName,
171
+			&i.IfExpr,
172
+			&i.RunCommand,
173
+			&i.UsesAlias,
174
+			&i.WorkingDirectory,
175
+			&i.StepEnv,
176
+			&i.ContinueOnError,
177
+			&i.Status,
178
+			&i.Conclusion,
179
+			&i.LogByteCount,
180
+			&i.StartedAt,
181
+			&i.CompletedAt,
182
+			&i.CreatedAt,
183
+			&i.StepWith,
184
+		); err != nil {
185
+			return nil, err
186
+		}
187
+		items = append(items, i)
188
+	}
189
+	if err := rows.Err(); err != nil {
190
+		return nil, err
191
+	}
192
+	return items, nil
193
+}
194
+
125195
 const listStepsForJob = `-- name: ListStepsForJob :many
126196
 SELECT id, job_id, step_index, step_id, step_name, run_command,
127197
        uses_alias, status, conclusion, log_byte_count,
internal/web/auth_wiring.go (modified)
@@ -17,6 +17,7 @@ import (
1717
 	"github.com/tenseleyFlow/shithub/internal/auth/email"
1818
 	"github.com/tenseleyFlow/shithub/internal/auth/password"
1919
 	"github.com/tenseleyFlow/shithub/internal/auth/pat"
20
+	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
2021
 	"github.com/tenseleyFlow/shithub/internal/auth/secretbox"
2122
 	"github.com/tenseleyFlow/shithub/internal/auth/session"
2223
 	"github.com/tenseleyFlow/shithub/internal/auth/throttle"
@@ -36,10 +37,20 @@ import (
3637
 var sharedPATDebouncer = pat.NewDebouncer(0)
3738
 
3839
 // buildAPIHandlers wires the PAT-authenticated API surface.
39
-func buildAPIHandlers(pool *pgxpool.Pool) (*apih.Handlers, error) {
40
+func buildAPIHandlers(
41
+	pool *pgxpool.Pool,
42
+	objectStore storage.ObjectStore,
43
+	runnerJWT *runnerjwt.Signer,
44
+	rateLimiter *ratelimit.Limiter,
45
+	logger *slog.Logger,
46
+) (*apih.Handlers, error) {
4047
 	return apih.New(apih.Deps{
41
-		Pool:      pool,
42
-		Debouncer: sharedPATDebouncer,
48
+		Pool:        pool,
49
+		Debouncer:   sharedPATDebouncer,
50
+		Logger:      logger,
51
+		ObjectStore: objectStore,
52
+		RunnerJWT:   runnerJWT,
53
+		RateLimiter: rateLimiter,
4354
 	})
4455
 }
4556
 
internal/web/handlers/api/api.go (modified)
@@ -12,12 +12,16 @@ package api
1212
 import (
1313
 	"encoding/json"
1414
 	"errors"
15
+	"log/slog"
1516
 	"net/http"
1617
 
1718
 	"github.com/go-chi/chi/v5"
1819
 	"github.com/jackc/pgx/v5/pgxpool"
1920
 
2021
 	"github.com/tenseleyFlow/shithub/internal/auth/pat"
22
+	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
23
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
24
+	"github.com/tenseleyFlow/shithub/internal/ratelimit"
2125
 	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
2226
 	"github.com/tenseleyFlow/shithub/internal/web/middleware"
2327
 )
@@ -25,8 +29,12 @@ import (
2529
 // Deps is the wiring the API handlers need. Constructed by the web
2630
 // package and injected at registration time.
2731
 type Deps struct {
28
-	Pool      *pgxpool.Pool
29
-	Debouncer *pat.Debouncer
32
+	Pool        *pgxpool.Pool
33
+	Debouncer   *pat.Debouncer
34
+	Logger      *slog.Logger
35
+	ObjectStore storage.ObjectStore
36
+	RunnerJWT   *runnerjwt.Signer
37
+	RateLimiter *ratelimit.Limiter
3038
 }
3139
 
3240
 // Handlers is the registered API handler set. Construct with New.
@@ -43,6 +51,9 @@ func New(d Deps) (*Handlers, error) {
4351
 	if d.Debouncer == nil {
4452
 		d.Debouncer = pat.NewDebouncer(0)
4553
 	}
54
+	if d.Logger == nil {
55
+		d.Logger = slog.Default()
56
+	}
4657
 	return &Handlers{d: d, q: usersdb.New()}, nil
4758
 }
4859
 
@@ -57,9 +68,17 @@ func New(d Deps) (*Handlers, error) {
5768
 // a 50 MB JSON blob to weaponize the parser.
5869
 const apiMaxBodyBytes = 256 * 1024
5970
 
71
+// runnerAPIMaxBodyBytes must fit a 512 KiB raw log chunk after base64
72
+// expansion plus JSON framing.
73
+const runnerAPIMaxBodyBytes = 768 * 1024
74
+
6075
 // Mount registers /api/v1/* on r. Caller is responsible for putting r
6176
 // in a CSRF-exempt group.
6277
 func (h *Handlers) Mount(r chi.Router) {
78
+	r.Group(func(r chi.Router) {
79
+		r.Use(middleware.MaxBodySize(runnerAPIMaxBodyBytes))
80
+		h.mountRunners(r)
81
+	})
6382
 	r.Group(func(r chi.Router) {
6483
 		r.Use(middleware.MaxBodySize(apiMaxBodyBytes))
6584
 		r.Use(middleware.PATAuthMiddleware(middleware.PATConfig{
internal/web/handlers/api/runners.go (added)
@@ -0,0 +1,323 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package api
4
+
5
+import (
6
+	"context"
7
+	"encoding/json"
8
+	"errors"
9
+	"fmt"
10
+	"io"
11
+	"net/http"
12
+	"strings"
13
+	"time"
14
+
15
+	"github.com/go-chi/chi/v5"
16
+	"github.com/jackc/pgx/v5"
17
+
18
+	"github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
19
+	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
20
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
21
+	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
22
+	"github.com/tenseleyFlow/shithub/internal/ratelimit"
23
+)
24
+
25
+var runnerHeartbeatLimit = ratelimit.Policy{
26
+	Scope:  "actions:runner_dispatch",
27
+	Max:    60,
28
+	Window: time.Minute,
29
+}
30
+
31
+func (h *Handlers) mountRunners(r chi.Router) {
32
+	r.Post("/api/v1/runners/heartbeat", h.runnerHeartbeat)
33
+}
34
+
35
+type runnerHeartbeatRequest struct {
36
+	Labels   []string `json:"labels"`
37
+	Capacity int      `json:"capacity"`
38
+}
39
+
40
+func (h *Handlers) runnerHeartbeat(w http.ResponseWriter, r *http.Request) {
41
+	if h.d.RunnerJWT == nil {
42
+		writeAPIError(w, http.StatusServiceUnavailable, "runner API is not configured")
43
+		return
44
+	}
45
+	runner, ok := h.authenticateRunner(w, r)
46
+	if !ok {
47
+		return
48
+	}
49
+	if !h.allowRunnerHeartbeat(w, r, runner.ID) {
50
+		return
51
+	}
52
+
53
+	var body runnerHeartbeatRequest
54
+	dec := json.NewDecoder(r.Body)
55
+	dec.DisallowUnknownFields()
56
+	if err := dec.Decode(&body); err != nil && !errors.Is(err, io.EOF) {
57
+		writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
58
+		return
59
+	}
60
+	labels := runner.Labels
61
+	if body.Labels != nil {
62
+		var err error
63
+		labels, err = runnerlabels.Normalize(body.Labels)
64
+		if err != nil {
65
+			writeAPIError(w, http.StatusBadRequest, err.Error())
66
+			return
67
+		}
68
+	}
69
+	capacity := int(runner.Capacity)
70
+	if body.Capacity != 0 {
71
+		capacity = body.Capacity
72
+	}
73
+	if capacity < 1 || capacity > 64 {
74
+		writeAPIError(w, http.StatusBadRequest, "capacity must be between 1 and 64")
75
+		return
76
+	}
77
+
78
+	job, steps, claimed, err := h.claimRunnerJob(r.Context(), runner.ID, labels, int32(capacity))
79
+	if err != nil {
80
+		h.d.Logger.ErrorContext(r.Context(), "runner heartbeat claim failed", "runner_id", runner.ID, "error", err)
81
+		writeAPIError(w, http.StatusInternalServerError, "runner heartbeat failed")
82
+		return
83
+	}
84
+	if !claimed {
85
+		w.WriteHeader(http.StatusNoContent)
86
+		return
87
+	}
88
+
89
+	token, claims, err := h.d.RunnerJWT.Mint(runnerjwt.MintParams{
90
+		RunnerID: runner.ID,
91
+		JobID:    job.ID,
92
+		RunID:    job.RunID,
93
+		RepoID:   job.RepoID,
94
+	})
95
+	if err != nil {
96
+		h.d.Logger.ErrorContext(r.Context(), "runner jwt mint failed", "runner_id", runner.ID, "job_id", job.ID, "error", err)
97
+		writeAPIError(w, http.StatusInternalServerError, "runner token mint failed")
98
+		return
99
+	}
100
+	writeJSON(w, http.StatusOK, presentRunnerClaim(job, steps, token, time.Unix(claims.Exp, 0)))
101
+}
102
+
103
+func (h *Handlers) authenticateRunner(w http.ResponseWriter, r *http.Request) (actionsdb.GetRunnerByTokenHashRow, bool) {
104
+	const prefix = "Bearer "
105
+	authz := r.Header.Get("Authorization")
106
+	if !strings.HasPrefix(authz, prefix) {
107
+		writeAPIError(w, http.StatusUnauthorized, "runner token required")
108
+		return actionsdb.GetRunnerByTokenHashRow{}, false
109
+	}
110
+	hash, err := runnertoken.HashOf(strings.TrimSpace(strings.TrimPrefix(authz, prefix)))
111
+	if err != nil {
112
+		writeAPIError(w, http.StatusUnauthorized, "runner token invalid")
113
+		return actionsdb.GetRunnerByTokenHashRow{}, false
114
+	}
115
+	runner, err := actionsdb.New().GetRunnerByTokenHash(r.Context(), h.d.Pool, hash)
116
+	if err != nil {
117
+		writeAPIError(w, http.StatusUnauthorized, "runner token invalid")
118
+		return actionsdb.GetRunnerByTokenHashRow{}, false
119
+	}
120
+	return runner, true
121
+}
122
+
123
+func (h *Handlers) allowRunnerHeartbeat(w http.ResponseWriter, r *http.Request, runnerID int64) bool {
124
+	if h.d.RateLimiter == nil {
125
+		return true
126
+	}
127
+	decision, err := h.d.RateLimiter.Allow(r.Context(), runnerHeartbeatLimit, fmt.Sprintf("runner:%d", runnerID))
128
+	if err != nil {
129
+		h.d.Logger.WarnContext(r.Context(), "runner heartbeat rate-limit failed", "runner_id", runnerID, "error", err)
130
+	}
131
+	ratelimit.StampHeaders(w, decision)
132
+	if !decision.Allowed {
133
+		w.Header().Set("Retry-After", fmt.Sprintf("%d", int(decision.RetryAfter/time.Second)))
134
+		writeAPIError(w, http.StatusTooManyRequests, "rate limit exceeded")
135
+		return false
136
+	}
137
+	return true
138
+}
139
+
140
+func (h *Handlers) claimRunnerJob(
141
+	ctx context.Context,
142
+	runnerID int64,
143
+	labels []string,
144
+	capacity int32,
145
+) (actionsdb.ClaimQueuedWorkflowJobRow, []actionsdb.ListRunnerStepsForJobRow, bool, error) {
146
+	q := actionsdb.New()
147
+	tx, err := h.d.Pool.Begin(ctx)
148
+	if err != nil {
149
+		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
150
+	}
151
+	committed := false
152
+	defer func() {
153
+		if !committed {
154
+			_ = tx.Rollback(ctx)
155
+		}
156
+	}()
157
+
158
+	if _, err := q.LockRunnerByID(ctx, tx, runnerID); err != nil {
159
+		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
160
+	}
161
+	running, err := q.CountRunningJobsForRunner(ctx, tx, runnerID)
162
+	if err != nil {
163
+		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
164
+	}
165
+	if running >= capacity {
166
+		if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{
167
+			ID:       runnerID,
168
+			Labels:   labels,
169
+			Capacity: capacity,
170
+			Status:   actionsdb.WorkflowRunnerStatusBusy,
171
+		}); err != nil {
172
+			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
173
+		}
174
+		if err := tx.Commit(ctx); err != nil {
175
+			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
176
+		}
177
+		committed = true
178
+		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, nil
179
+	}
180
+
181
+	job, err := q.ClaimQueuedWorkflowJob(ctx, tx, actionsdb.ClaimQueuedWorkflowJobParams{
182
+		RunnerID: runnerID,
183
+		Labels:   labels,
184
+	})
185
+	if err != nil {
186
+		if !errors.Is(err, pgx.ErrNoRows) {
187
+			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
188
+		}
189
+		if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{
190
+			ID:       runnerID,
191
+			Labels:   labels,
192
+			Capacity: capacity,
193
+			Status:   actionsdb.WorkflowRunnerStatusIdle,
194
+		}); err != nil {
195
+			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
196
+		}
197
+		if err := tx.Commit(ctx); err != nil {
198
+			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
199
+		}
200
+		committed = true
201
+		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, nil
202
+	}
203
+	if err := q.MarkWorkflowRunRunning(ctx, tx, job.RunID); err != nil {
204
+		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
205
+	}
206
+	steps, err := q.ListRunnerStepsForJob(ctx, tx, job.ID)
207
+	if err != nil {
208
+		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
209
+	}
210
+	status := actionsdb.WorkflowRunnerStatusIdle
211
+	if running+1 >= capacity {
212
+		status = actionsdb.WorkflowRunnerStatusBusy
213
+	}
214
+	if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{
215
+		ID:       runnerID,
216
+		Labels:   labels,
217
+		Capacity: capacity,
218
+		Status:   status,
219
+	}); err != nil {
220
+		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
221
+	}
222
+	if err := tx.Commit(ctx); err != nil {
223
+		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, false, err
224
+	}
225
+	committed = true
226
+	return job, steps, true, nil
227
+}
228
+
229
+type runnerClaimResponse struct {
230
+	Token     string           `json:"token"`
231
+	ExpiresAt string           `json:"expires_at"`
232
+	Job       runnerJobPayload `json:"job"`
233
+}
234
+
235
+type runnerJobPayload struct {
236
+	ID             int64           `json:"id"`
237
+	RunID          int64           `json:"run_id"`
238
+	RepoID         int64           `json:"repo_id"`
239
+	RunIndex       int64           `json:"run_index"`
240
+	WorkflowFile   string          `json:"workflow_file"`
241
+	WorkflowName   string          `json:"workflow_name"`
242
+	HeadSHA        string          `json:"head_sha"`
243
+	HeadRef        string          `json:"head_ref"`
244
+	Event          string          `json:"event"`
245
+	JobKey         string          `json:"job_key"`
246
+	JobName        string          `json:"job_name"`
247
+	RunsOn         string          `json:"runs_on"`
248
+	Needs          []string        `json:"needs"`
249
+	If             string          `json:"if"`
250
+	TimeoutMinutes int32           `json:"timeout_minutes"`
251
+	Permissions    json.RawMessage `json:"permissions"`
252
+	Env            json.RawMessage `json:"env"`
253
+	Steps          []runnerStep    `json:"steps"`
254
+}
255
+
256
+type runnerStep struct {
257
+	ID               int64           `json:"id"`
258
+	Index            int32           `json:"index"`
259
+	StepID           string          `json:"step_id"`
260
+	Name             string          `json:"name"`
261
+	If               string          `json:"if"`
262
+	Run              string          `json:"run"`
263
+	Uses             string          `json:"uses"`
264
+	WorkingDirectory string          `json:"working_directory"`
265
+	Env              json.RawMessage `json:"env"`
266
+	With             json.RawMessage `json:"with"`
267
+	ContinueOnError  bool            `json:"continue_on_error"`
268
+}
269
+
270
+func presentRunnerClaim(
271
+	job actionsdb.ClaimQueuedWorkflowJobRow,
272
+	steps []actionsdb.ListRunnerStepsForJobRow,
273
+	token string,
274
+	expiresAt time.Time,
275
+) runnerClaimResponse {
276
+	outSteps := make([]runnerStep, 0, len(steps))
277
+	for _, step := range steps {
278
+		outSteps = append(outSteps, runnerStep{
279
+			ID:               step.ID,
280
+			Index:            step.StepIndex,
281
+			StepID:           step.StepID,
282
+			Name:             step.StepName,
283
+			If:               step.IfExpr,
284
+			Run:              step.RunCommand,
285
+			Uses:             step.UsesAlias,
286
+			WorkingDirectory: step.WorkingDirectory,
287
+			Env:              rawJSONOrObject(step.StepEnv),
288
+			With:             rawJSONOrObject(step.StepWith),
289
+			ContinueOnError:  step.ContinueOnError,
290
+		})
291
+	}
292
+	return runnerClaimResponse{
293
+		Token:     token,
294
+		ExpiresAt: expiresAt.UTC().Format(time.RFC3339),
295
+		Job: runnerJobPayload{
296
+			ID:             job.ID,
297
+			RunID:          job.RunID,
298
+			RepoID:         job.RepoID,
299
+			RunIndex:       job.RunIndex,
300
+			WorkflowFile:   job.WorkflowFile,
301
+			WorkflowName:   job.WorkflowName,
302
+			HeadSHA:        job.HeadSha,
303
+			HeadRef:        job.HeadRef,
304
+			Event:          string(job.Event),
305
+			JobKey:         job.JobKey,
306
+			JobName:        job.JobName,
307
+			RunsOn:         job.RunsOn,
308
+			Needs:          job.NeedsJobs,
309
+			If:             job.IfExpr,
310
+			TimeoutMinutes: job.TimeoutMinutes,
311
+			Permissions:    rawJSONOrObject(job.Permissions),
312
+			Env:            rawJSONOrObject(job.JobEnv),
313
+			Steps:          outSteps,
314
+		},
315
+	}
316
+}
317
+
318
// rawJSONOrObject returns b as a raw JSON value, substituting an empty
// JSON object when b is empty or not syntactically valid JSON. This
// keeps fields like "env" and "permissions" always-present objects in
// responses instead of null or garbage.
func rawJSONOrObject(b []byte) json.RawMessage {
	if len(b) > 0 && json.Valid(b) {
		return json.RawMessage(b)
	}
	return json.RawMessage(`{}`)
}
internal/web/handlers/api/runners_test.goadded
@@ -0,0 +1,240 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package api_test
4
+
5
+import (
6
+	"bytes"
7
+	"context"
8
+	"encoding/json"
9
+	"io"
10
+	"log/slog"
11
+	"net/http"
12
+	"net/http/httptest"
13
+	"strings"
14
+	"testing"
15
+	"time"
16
+
17
+	"github.com/go-chi/chi/v5"
18
+	"github.com/jackc/pgx/v5/pgtype"
19
+	"github.com/jackc/pgx/v5/pgxpool"
20
+
21
+	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
22
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
23
+	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
24
+	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
25
+	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
26
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
27
+	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
28
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
29
+	apih "github.com/tenseleyFlow/shithub/internal/web/handlers/api"
30
+)
31
+
32
+const runnerAPIFixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
33
+	"AAAAAAAAAAAAAAAA$" +
34
+	"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
35
+
36
+func TestRunnerHeartbeatClaimsQueuedJob(t *testing.T) {
37
+	ctx := context.Background()
38
+	pool := dbtest.NewTestDB(t)
39
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
40
+	repoID, userID := setupRunnerAPIRepo(t, pool)
41
+	runID := enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
42
+
43
+	token, runnerID := registerRunnerForTest(t, pool, []string{"ubuntu-latest", "linux"}, 1)
44
+	signer := runnerAPISigner(t, time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC))
45
+	router := newRunnerAPIRouter(t, pool, logger, signer)
46
+
47
+	req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
48
+		strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
49
+	req.Header.Set("Authorization", "Bearer "+token)
50
+	rr := httptest.NewRecorder()
51
+	router.ServeHTTP(rr, req)
52
+
53
+	if rr.Code != http.StatusOK {
54
+		t.Fatalf("status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
55
+	}
56
+	var resp struct {
57
+		Token string `json:"token"`
58
+		Job   struct {
59
+			ID     int64 `json:"id"`
60
+			RunID  int64 `json:"run_id"`
61
+			RepoID int64 `json:"repo_id"`
62
+			Steps  []struct {
63
+				Run  string `json:"run"`
64
+				Uses string `json:"uses"`
65
+			} `json:"steps"`
66
+		} `json:"job"`
67
+	}
68
+	if err := json.Unmarshal(rr.Body.Bytes(), &resp); err != nil {
69
+		t.Fatalf("decode response: %v", err)
70
+	}
71
+	if resp.Token == "" {
72
+		t.Fatal("response token is empty")
73
+	}
74
+	if resp.Job.RunID != runID || resp.Job.RepoID != repoID || len(resp.Job.Steps) != 2 {
75
+		t.Fatalf("unexpected job payload: %+v", resp.Job)
76
+	}
77
+	claims, err := signer.Verify(resp.Token)
78
+	if err != nil {
79
+		t.Fatalf("verify runner JWT: %v", err)
80
+	}
81
+	if claims.JobID != resp.Job.ID || claims.RunID != runID || claims.RepoID != repoID {
82
+		t.Fatalf("claims/job mismatch: claims=%+v job=%+v", claims, resp.Job)
83
+	}
84
+	claimRunnerID, err := claims.RunnerID()
85
+	if err != nil {
86
+		t.Fatalf("claims RunnerID: %v", err)
87
+	}
88
+	if claimRunnerID != runnerID {
89
+		t.Fatalf("claims runner_id: got %d, want %d", claimRunnerID, runnerID)
90
+	}
91
+
92
+	q := actionsdb.New()
93
+	job, err := q.GetWorkflowJobByID(ctx, pool, resp.Job.ID)
94
+	if err != nil {
95
+		t.Fatalf("GetWorkflowJobByID: %v", err)
96
+	}
97
+	if job.Status != actionsdb.WorkflowJobStatusRunning || !job.RunnerID.Valid || job.RunnerID.Int64 != runnerID {
98
+		t.Fatalf("job not claimed by runner: %+v", job)
99
+	}
100
+	run, err := q.GetWorkflowRunByID(ctx, pool, runID)
101
+	if err != nil {
102
+		t.Fatalf("GetWorkflowRunByID: %v", err)
103
+	}
104
+	if run.Status != actionsdb.WorkflowRunStatusRunning {
105
+		t.Fatalf("run status: got %s, want running", run.Status)
106
+	}
107
+
108
+	// Capacity is enforced server-side: a second heartbeat from the same
109
+	// runner sees one running job and receives no additional claim.
110
+	req = httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
111
+		strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
112
+	req.Header.Set("Authorization", "Bearer "+token)
113
+	rr = httptest.NewRecorder()
114
+	router.ServeHTTP(rr, req)
115
+	if rr.Code != http.StatusNoContent {
116
+		t.Fatalf("second heartbeat status: got %d, want 204; body=%s", rr.Code, rr.Body.String())
117
+	}
118
+}
119
+
120
+func TestRunnerHeartbeatRejectsBadToken(t *testing.T) {
121
+	pool := dbtest.NewTestDB(t)
122
+	router := newRunnerAPIRouter(t, pool, slog.New(slog.NewTextHandler(io.Discard, nil)), runnerAPISigner(t, time.Now()))
123
+	req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat", bytes.NewReader([]byte(`{}`)))
124
+	req.Header.Set("Authorization", "Bearer not-hex")
125
+	rr := httptest.NewRecorder()
126
+	router.ServeHTTP(rr, req)
127
+	if rr.Code != http.StatusUnauthorized {
128
+		t.Fatalf("status: got %d, want 401; body=%s", rr.Code, rr.Body.String())
129
+	}
130
+}
131
+
132
+func newRunnerAPIRouter(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, signer *runnerjwt.Signer) http.Handler {
133
+	t.Helper()
134
+	h, err := apih.New(apih.Deps{
135
+		Pool:      pool,
136
+		Logger:    logger,
137
+		RunnerJWT: signer,
138
+	})
139
+	if err != nil {
140
+		t.Fatalf("api.New: %v", err)
141
+	}
142
+	r := chi.NewRouter()
143
+	h.Mount(r)
144
+	return r
145
+}
146
+
147
+func setupRunnerAPIRepo(t *testing.T, pool *pgxpool.Pool) (repoID, userID int64) {
148
+	t.Helper()
149
+	ctx := context.Background()
150
+	user, err := usersdb.New().CreateUser(ctx, pool, usersdb.CreateUserParams{
151
+		Username:     "alice",
152
+		DisplayName:  "Alice",
153
+		PasswordHash: runnerAPIFixtureHash,
154
+	})
155
+	if err != nil {
156
+		t.Fatalf("CreateUser: %v", err)
157
+	}
158
+	repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
159
+		OwnerUserID:   pgtype.Int8{Int64: user.ID, Valid: true},
160
+		Name:          "demo",
161
+		DefaultBranch: "trunk",
162
+		Visibility:    reposdb.RepoVisibilityPublic,
163
+	})
164
+	if err != nil {
165
+		t.Fatalf("CreateRepo: %v", err)
166
+	}
167
+	return repo.ID, user.ID
168
+}
169
+
170
+func enqueueRunnerAPIRun(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, repoID, userID int64) int64 {
171
+	t.Helper()
172
+	wf, diags, err := workflow.Parse([]byte(`name: ci
173
+on: push
174
+jobs:
175
+  build:
176
+    runs-on: ubuntu-latest
177
+    steps:
178
+      - uses: actions/checkout@v4
179
+      - run: go test ./...
180
+`))
181
+	if err != nil {
182
+		t.Fatalf("workflow.Parse: %v", err)
183
+	}
184
+	for _, d := range diags {
185
+		if d.Severity == workflow.Error {
186
+			t.Fatalf("workflow diagnostic: %v", d)
187
+		}
188
+	}
189
+	res, err := trigger.Enqueue(context.Background(), trigger.Deps{Pool: pool, Logger: logger}, trigger.EnqueueParams{
190
+		RepoID:         repoID,
191
+		WorkflowFile:   ".shithub/workflows/ci.yml",
192
+		HeadSHA:        strings.Repeat("a", 40),
193
+		HeadRef:        "refs/heads/trunk",
194
+		EventKind:      trigger.EventPush,
195
+		EventPayload:   map[string]any{"ref": "refs/heads/trunk"},
196
+		ActorUserID:    userID,
197
+		TriggerEventID: "push:test",
198
+		Workflow:       wf,
199
+	})
200
+	if err != nil {
201
+		t.Fatalf("trigger.Enqueue: %v", err)
202
+	}
203
+	return res.RunID
204
+}
205
+
206
+func registerRunnerForTest(t *testing.T, pool *pgxpool.Pool, labels []string, capacity int32) (token string, runnerID int64) {
207
+	t.Helper()
208
+	token, tokenHash, err := runnertoken.New()
209
+	if err != nil {
210
+		t.Fatalf("runnertoken.New: %v", err)
211
+	}
212
+	q := actionsdb.New()
213
+	runner, err := q.InsertRunner(context.Background(), pool, actionsdb.InsertRunnerParams{
214
+		Name:     "runner1",
215
+		Labels:   labels,
216
+		Capacity: capacity,
217
+	})
218
+	if err != nil {
219
+		t.Fatalf("InsertRunner: %v", err)
220
+	}
221
+	if _, err := q.InsertRunnerToken(context.Background(), pool, actionsdb.InsertRunnerTokenParams{
222
+		RunnerID:  runner.ID,
223
+		TokenHash: tokenHash,
224
+	}); err != nil {
225
+		t.Fatalf("InsertRunnerToken: %v", err)
226
+	}
227
+	return token, runner.ID
228
+}
229
+
230
+func runnerAPISigner(t *testing.T, now time.Time) *runnerjwt.Signer {
231
+	t.Helper()
232
+	signer, err := runnerjwt.NewFromKey(
233
+		bytes.Repeat([]byte{0x7c}, 32),
234
+		runnerjwt.WithClock(func() time.Time { return now }),
235
+	)
236
+	if err != nil {
237
+		t.Fatalf("runnerjwt.NewFromKey: %v", err)
238
+	}
239
+	return signer
240
+}
internal/web/server.gomodified
@@ -23,6 +23,7 @@ import (
2323
 	"github.com/jackc/pgx/v5/pgxpool"
2424
 	"golang.org/x/crypto/chacha20poly1305"
2525
 
26
+	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
2627
 	"github.com/tenseleyFlow/shithub/internal/auth/session"
2728
 	"github.com/tenseleyFlow/shithub/internal/infra/config"
2829
 	"github.com/tenseleyFlow/shithub/internal/infra/db"
@@ -164,7 +165,17 @@ func Run(ctx context.Context, opts Options) error {
164165
 		}
165166
 		deps.AuthMounter = auth.Mount
166167
 
167
-		api, err := buildAPIHandlers(pool)
168
+		var runnerJWT *runnerjwt.Signer
169
+		if cfg.Auth.TOTPKeyB64 != "" {
170
+			runnerJWT, err = runnerjwt.NewFromTOTPKeyB64(cfg.Auth.TOTPKeyB64)
171
+			if err != nil {
172
+				return fmt.Errorf("runner jwt: %w", err)
173
+			}
174
+		} else {
175
+			logger.Warn("actions runner API disabled: auth.totp_key_b64 is not configured",
176
+				"hint", "set SHITHUB_TOTP_KEY=$(openssl rand -base64 32) to enable runner job JWTs")
177
+		}
178
+		api, err := buildAPIHandlers(pool, objectStore, runnerJWT, ratelimit.New(pool), logger)
168179
 		if err != nil {
169180
 			return fmt.Errorf("api handlers: %w", err)
170181
 		}