tenseleyflow/shithub / b70f1f2

Browse files

api/runners: accept job logs status and artifacts (S41c)

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
b70f1f25a51a91af1751e373c9453e3471eea784
Parents
e85a1c4
Tree
1579a12

11 changed files

StatusFile+-
M internal/actions/queries/workflow_jobs.sql 14 0
M internal/actions/queries/workflow_runs.sql 15 0
M internal/actions/queries/workflow_step_log_chunks.sql 1 0
M internal/actions/queries/workflow_steps.sql 11 0
M internal/actions/sqlc/querier.go 3 0
M internal/actions/sqlc/workflow_jobs.sql.go 57 0
M internal/actions/sqlc/workflow_runs.sql.go 52 0
M internal/actions/sqlc/workflow_step_log_chunks.sql.go 1 0
M internal/actions/sqlc/workflow_steps.sql.go 41 0
M internal/web/handlers/api/runners.go 516 0
M internal/web/handlers/api/runners_test.go 119 10
internal/actions/queries/workflow_jobs.sqlmodified
@@ -21,6 +21,20 @@ SELECT id, run_id, job_index, job_key, job_name, runs_on,
21
 FROM workflow_jobs
21
 FROM workflow_jobs
22
 WHERE id = $1;
22
 WHERE id = $1;
23
 
23
 
24
+-- name: UpdateWorkflowJobStatus :one
25
+UPDATE workflow_jobs
26
+SET status = $2,
27
+    conclusion = sqlc.narg(conclusion)::check_conclusion,
28
+    started_at = sqlc.narg(started_at)::timestamptz,
29
+    completed_at = sqlc.narg(completed_at)::timestamptz,
30
+    version = version + 1,
31
+    updated_at = now()
32
+WHERE id = $1
33
+RETURNING id, run_id, job_index, job_key, job_name, runs_on,
34
+          runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
35
+          job_env, status, conclusion, cancel_requested,
36
+          started_at, completed_at, version, created_at, updated_at;
37
+
24
 -- name: CountRunningJobsForRunner :one
38
 -- name: CountRunningJobsForRunner :one
25
 SELECT COUNT(*)::integer
39
 SELECT COUNT(*)::integer
26
 FROM workflow_jobs
40
 FROM workflow_jobs
internal/actions/queries/workflow_runs.sqlmodified
@@ -70,6 +70,21 @@ SET status = 'running',
70
     updated_at = now()
70
     updated_at = now()
71
 WHERE id = $1 AND status = 'queued';
71
 WHERE id = $1 AND status = 'queued';
72
 
72
 
73
+-- name: CompleteWorkflowRun :one
74
+UPDATE workflow_runs
75
+SET status = 'completed',
76
+    conclusion = sqlc.arg(conclusion)::check_conclusion,
77
+    started_at = COALESCE(started_at, now()),
78
+    completed_at = COALESCE(completed_at, now()),
79
+    version = version + 1,
80
+    updated_at = now()
81
+WHERE id = $1
82
+RETURNING id, repo_id, run_index, workflow_file, workflow_name,
83
+          head_sha, head_ref, event, event_payload,
84
+          actor_user_id, parent_run_id, concurrency_group,
85
+          status, conclusion, pinned, need_approval, approved_by_user_id,
86
+          started_at, completed_at, version, created_at, updated_at, trigger_event_id;
87
+
73
 -- name: NextRunIndexForRepo :one
88
 -- name: NextRunIndexForRepo :one
74
 -- Atomic next-index emitter: take the max + 1 for this repo. Pairs
89
 -- Atomic next-index emitter: take the max + 1 for this repo. Pairs
75
 -- with the (repo_id, run_index) UNIQUE so concurrent inserts that
90
 -- with the (repo_id, run_index) UNIQUE so concurrent inserts that
internal/actions/queries/workflow_step_log_chunks.sqlmodified
@@ -3,6 +3,7 @@
3
 -- name: AppendStepLogChunk :one
3
 -- name: AppendStepLogChunk :one
4
 INSERT INTO workflow_step_log_chunks (step_id, seq, chunk)
4
 INSERT INTO workflow_step_log_chunks (step_id, seq, chunk)
5
 VALUES ($1, $2, $3)
5
 VALUES ($1, $2, $3)
6
+ON CONFLICT (step_id, seq) DO NOTHING
6
 RETURNING id, step_id, seq, created_at;
7
 RETURNING id, step_id, seq, created_at;
7
 
8
 
8
 -- name: ListStepLogChunks :many
9
 -- name: ListStepLogChunks :many
internal/actions/queries/workflow_steps.sqlmodified
@@ -32,6 +32,17 @@ FROM workflow_steps
32
 WHERE job_id = $1
32
 WHERE job_id = $1
33
 ORDER BY step_index ASC;
33
 ORDER BY step_index ASC;
34
 
34
 
35
+-- name: GetFirstStepForJob :one
36
+SELECT id, job_id, step_index, step_id, step_name, if_expr,
37
+       run_command, uses_alias, working_directory, step_env,
38
+       continue_on_error, status, conclusion, log_object_key,
39
+       log_byte_count, started_at, completed_at, version,
40
+       created_at, updated_at, step_with
41
+FROM workflow_steps
42
+WHERE job_id = $1
43
+ORDER BY step_index ASC
44
+LIMIT 1;
45
+
35
 -- name: ListStepsForJob :many
46
 -- name: ListStepsForJob :many
36
 SELECT id, job_id, step_index, step_id, step_name, run_command,
47
 SELECT id, job_id, step_index, step_id, step_name, run_command,
37
        uses_alias, status, conclusion, log_byte_count,
48
        uses_alias, status, conclusion, log_byte_count,
internal/actions/sqlc/querier.gomodified
@@ -14,6 +14,7 @@ type Querier interface {
14
 	// SPDX-License-Identifier: AGPL-3.0-or-later
14
 	// SPDX-License-Identifier: AGPL-3.0-or-later
15
 	AppendStepLogChunk(ctx context.Context, db DBTX, arg AppendStepLogChunkParams) (AppendStepLogChunkRow, error)
15
 	AppendStepLogChunk(ctx context.Context, db DBTX, arg AppendStepLogChunkParams) (AppendStepLogChunkRow, error)
16
 	ClaimQueuedWorkflowJob(ctx context.Context, db DBTX, arg ClaimQueuedWorkflowJobParams) (ClaimQueuedWorkflowJobRow, error)
16
 	ClaimQueuedWorkflowJob(ctx context.Context, db DBTX, arg ClaimQueuedWorkflowJobParams) (ClaimQueuedWorkflowJobRow, error)
17
+	CompleteWorkflowRun(ctx context.Context, db DBTX, arg CompleteWorkflowRunParams) (WorkflowRun, error)
17
 	CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error)
18
 	CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error)
18
 	DeleteExpiredArtifacts(ctx context.Context, db DBTX) ([]DeleteExpiredArtifactsRow, error)
19
 	DeleteExpiredArtifacts(ctx context.Context, db DBTX) ([]DeleteExpiredArtifactsRow, error)
19
 	DeleteExpiredRunnerJWTUses(ctx context.Context, db DBTX) error
20
 	DeleteExpiredRunnerJWTUses(ctx context.Context, db DBTX) error
@@ -33,6 +34,7 @@ type Querier interface {
33
 	// target.
34
 	// target.
34
 	EnqueueWorkflowRun(ctx context.Context, db DBTX, arg EnqueueWorkflowRunParams) (WorkflowRun, error)
35
 	EnqueueWorkflowRun(ctx context.Context, db DBTX, arg EnqueueWorkflowRunParams) (WorkflowRun, error)
35
 	GetArtifactByID(ctx context.Context, db DBTX, id int64) (WorkflowArtifact, error)
36
 	GetArtifactByID(ctx context.Context, db DBTX, id int64) (WorkflowArtifact, error)
37
+	GetFirstStepForJob(ctx context.Context, db DBTX, jobID int64) (WorkflowStep, error)
36
 	GetOrgSecret(ctx context.Context, db DBTX, arg GetOrgSecretParams) (GetOrgSecretRow, error)
38
 	GetOrgSecret(ctx context.Context, db DBTX, arg GetOrgSecretParams) (GetOrgSecretRow, error)
37
 	GetOrgVariable(ctx context.Context, db DBTX, arg GetOrgVariableParams) (GetOrgVariableRow, error)
39
 	GetOrgVariable(ctx context.Context, db DBTX, arg GetOrgVariableParams) (GetOrgVariableRow, error)
38
 	GetRepoSecret(ctx context.Context, db DBTX, arg GetRepoSecretParams) (GetRepoSecretRow, error)
40
 	GetRepoSecret(ctx context.Context, db DBTX, arg GetRepoSecretParams) (GetRepoSecretRow, error)
@@ -83,6 +85,7 @@ type Querier interface {
83
 	NextRunIndexForRepo(ctx context.Context, db DBTX, repoID int64) (int64, error)
85
 	NextRunIndexForRepo(ctx context.Context, db DBTX, repoID int64) (int64, error)
84
 	RevokeAllTokensForRunner(ctx context.Context, db DBTX, runnerID int64) error
86
 	RevokeAllTokensForRunner(ctx context.Context, db DBTX, runnerID int64) error
85
 	TouchRunnerHeartbeat(ctx context.Context, db DBTX, arg TouchRunnerHeartbeatParams) error
87
 	TouchRunnerHeartbeat(ctx context.Context, db DBTX, arg TouchRunnerHeartbeatParams) error
88
+	UpdateWorkflowJobStatus(ctx context.Context, db DBTX, arg UpdateWorkflowJobStatusParams) (WorkflowJob, error)
86
 	UpsertOrgSecret(ctx context.Context, db DBTX, arg UpsertOrgSecretParams) (WorkflowSecret, error)
89
 	UpsertOrgSecret(ctx context.Context, db DBTX, arg UpsertOrgSecretParams) (WorkflowSecret, error)
87
 	UpsertOrgVariable(ctx context.Context, db DBTX, arg UpsertOrgVariableParams) (ActionsVariable, error)
90
 	UpsertOrgVariable(ctx context.Context, db DBTX, arg UpsertOrgVariableParams) (ActionsVariable, error)
88
 	// SPDX-License-Identifier: AGPL-3.0-or-later
91
 	// SPDX-License-Identifier: AGPL-3.0-or-later
internal/actions/sqlc/workflow_jobs.sql.gomodified
@@ -296,3 +296,60 @@ func (q *Queries) ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]L
296
 	}
296
 	}
297
 	return items, nil
297
 	return items, nil
298
 }
298
 }
299
+
300
+const updateWorkflowJobStatus = `-- name: UpdateWorkflowJobStatus :one
301
+UPDATE workflow_jobs
302
+SET status = $2,
303
+    conclusion = $3::check_conclusion,
304
+    started_at = $4::timestamptz,
305
+    completed_at = $5::timestamptz,
306
+    version = version + 1,
307
+    updated_at = now()
308
+WHERE id = $1
309
+RETURNING id, run_id, job_index, job_key, job_name, runs_on,
310
+          runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
311
+          job_env, status, conclusion, cancel_requested,
312
+          started_at, completed_at, version, created_at, updated_at
313
+`
314
+
315
+type UpdateWorkflowJobStatusParams struct {
316
+	ID          int64
317
+	Status      WorkflowJobStatus
318
+	Conclusion  NullCheckConclusion
319
+	StartedAt   pgtype.Timestamptz
320
+	CompletedAt pgtype.Timestamptz
321
+}
322
+
323
+func (q *Queries) UpdateWorkflowJobStatus(ctx context.Context, db DBTX, arg UpdateWorkflowJobStatusParams) (WorkflowJob, error) {
324
+	row := db.QueryRow(ctx, updateWorkflowJobStatus,
325
+		arg.ID,
326
+		arg.Status,
327
+		arg.Conclusion,
328
+		arg.StartedAt,
329
+		arg.CompletedAt,
330
+	)
331
+	var i WorkflowJob
332
+	err := row.Scan(
333
+		&i.ID,
334
+		&i.RunID,
335
+		&i.JobIndex,
336
+		&i.JobKey,
337
+		&i.JobName,
338
+		&i.RunsOn,
339
+		&i.RunnerID,
340
+		&i.NeedsJobs,
341
+		&i.IfExpr,
342
+		&i.TimeoutMinutes,
343
+		&i.Permissions,
344
+		&i.JobEnv,
345
+		&i.Status,
346
+		&i.Conclusion,
347
+		&i.CancelRequested,
348
+		&i.StartedAt,
349
+		&i.CompletedAt,
350
+		&i.Version,
351
+		&i.CreatedAt,
352
+		&i.UpdatedAt,
353
+	)
354
+	return i, err
355
+}
internal/actions/sqlc/workflow_runs.sql.gomodified
@@ -11,6 +11,58 @@ import (
11
 	"github.com/jackc/pgx/v5/pgtype"
11
 	"github.com/jackc/pgx/v5/pgtype"
12
 )
12
 )
13
 
13
 
14
+const completeWorkflowRun = `-- name: CompleteWorkflowRun :one
15
+UPDATE workflow_runs
16
+SET status = 'completed',
17
+    conclusion = $2::check_conclusion,
18
+    started_at = COALESCE(started_at, now()),
19
+    completed_at = COALESCE(completed_at, now()),
20
+    version = version + 1,
21
+    updated_at = now()
22
+WHERE id = $1
23
+RETURNING id, repo_id, run_index, workflow_file, workflow_name,
24
+          head_sha, head_ref, event, event_payload,
25
+          actor_user_id, parent_run_id, concurrency_group,
26
+          status, conclusion, pinned, need_approval, approved_by_user_id,
27
+          started_at, completed_at, version, created_at, updated_at, trigger_event_id
28
+`
29
+
30
+type CompleteWorkflowRunParams struct {
31
+	ID         int64
32
+	Conclusion CheckConclusion
33
+}
34
+
35
+func (q *Queries) CompleteWorkflowRun(ctx context.Context, db DBTX, arg CompleteWorkflowRunParams) (WorkflowRun, error) {
36
+	row := db.QueryRow(ctx, completeWorkflowRun, arg.ID, arg.Conclusion)
37
+	var i WorkflowRun
38
+	err := row.Scan(
39
+		&i.ID,
40
+		&i.RepoID,
41
+		&i.RunIndex,
42
+		&i.WorkflowFile,
43
+		&i.WorkflowName,
44
+		&i.HeadSha,
45
+		&i.HeadRef,
46
+		&i.Event,
47
+		&i.EventPayload,
48
+		&i.ActorUserID,
49
+		&i.ParentRunID,
50
+		&i.ConcurrencyGroup,
51
+		&i.Status,
52
+		&i.Conclusion,
53
+		&i.Pinned,
54
+		&i.NeedApproval,
55
+		&i.ApprovedByUserID,
56
+		&i.StartedAt,
57
+		&i.CompletedAt,
58
+		&i.Version,
59
+		&i.CreatedAt,
60
+		&i.UpdatedAt,
61
+		&i.TriggerEventID,
62
+	)
63
+	return i, err
64
+}
65
+
14
 const enqueueWorkflowRun = `-- name: EnqueueWorkflowRun :one
66
 const enqueueWorkflowRun = `-- name: EnqueueWorkflowRun :one
15
 INSERT INTO workflow_runs (
67
 INSERT INTO workflow_runs (
16
     repo_id, run_index, workflow_file, workflow_name,
68
     repo_id, run_index, workflow_file, workflow_name,
internal/actions/sqlc/workflow_step_log_chunks.sql.gomodified
@@ -15,6 +15,7 @@ const appendStepLogChunk = `-- name: AppendStepLogChunk :one
15
 
15
 
16
 INSERT INTO workflow_step_log_chunks (step_id, seq, chunk)
16
 INSERT INTO workflow_step_log_chunks (step_id, seq, chunk)
17
 VALUES ($1, $2, $3)
17
 VALUES ($1, $2, $3)
18
+ON CONFLICT (step_id, seq) DO NOTHING
18
 RETURNING id, step_id, seq, created_at
19
 RETURNING id, step_id, seq, created_at
19
 `
20
 `
20
 
21
 
internal/actions/sqlc/workflow_steps.sql.gomodified
@@ -11,6 +11,47 @@ import (
11
 	"github.com/jackc/pgx/v5/pgtype"
11
 	"github.com/jackc/pgx/v5/pgtype"
12
 )
12
 )
13
 
13
 
14
+const getFirstStepForJob = `-- name: GetFirstStepForJob :one
15
+SELECT id, job_id, step_index, step_id, step_name, if_expr,
16
+       run_command, uses_alias, working_directory, step_env,
17
+       continue_on_error, status, conclusion, log_object_key,
18
+       log_byte_count, started_at, completed_at, version,
19
+       created_at, updated_at, step_with
20
+FROM workflow_steps
21
+WHERE job_id = $1
22
+ORDER BY step_index ASC
23
+LIMIT 1
24
+`
25
+
26
+func (q *Queries) GetFirstStepForJob(ctx context.Context, db DBTX, jobID int64) (WorkflowStep, error) {
27
+	row := db.QueryRow(ctx, getFirstStepForJob, jobID)
28
+	var i WorkflowStep
29
+	err := row.Scan(
30
+		&i.ID,
31
+		&i.JobID,
32
+		&i.StepIndex,
33
+		&i.StepID,
34
+		&i.StepName,
35
+		&i.IfExpr,
36
+		&i.RunCommand,
37
+		&i.UsesAlias,
38
+		&i.WorkingDirectory,
39
+		&i.StepEnv,
40
+		&i.ContinueOnError,
41
+		&i.Status,
42
+		&i.Conclusion,
43
+		&i.LogObjectKey,
44
+		&i.LogByteCount,
45
+		&i.StartedAt,
46
+		&i.CompletedAt,
47
+		&i.Version,
48
+		&i.CreatedAt,
49
+		&i.UpdatedAt,
50
+		&i.StepWith,
51
+	)
52
+	return i, err
53
+}
54
+
14
 const getWorkflowStepByID = `-- name: GetWorkflowStepByID :one
55
 const getWorkflowStepByID = `-- name: GetWorkflowStepByID :one
15
 SELECT id, job_id, step_index, step_id, step_name, if_expr,
56
 SELECT id, job_id, step_index, step_id, step_name, if_expr,
16
        run_command, uses_alias, working_directory, step_env,
57
        run_command, uses_alias, working_directory, step_env,
internal/web/handlers/api/runners.gomodified
@@ -4,21 +4,27 @@ package api
4
 
4
 
5
 import (
5
 import (
6
 	"context"
6
 	"context"
7
+	"encoding/base64"
7
 	"encoding/json"
8
 	"encoding/json"
8
 	"errors"
9
 	"errors"
9
 	"fmt"
10
 	"fmt"
10
 	"io"
11
 	"io"
11
 	"net/http"
12
 	"net/http"
13
+	"regexp"
14
+	"strconv"
12
 	"strings"
15
 	"strings"
13
 	"time"
16
 	"time"
14
 
17
 
15
 	"github.com/go-chi/chi/v5"
18
 	"github.com/go-chi/chi/v5"
16
 	"github.com/jackc/pgx/v5"
19
 	"github.com/jackc/pgx/v5"
20
+	"github.com/jackc/pgx/v5/pgtype"
17
 
21
 
18
 	"github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
22
 	"github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
19
 	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
23
 	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
20
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
24
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
21
 	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
25
 	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
26
+	"github.com/tenseleyFlow/shithub/internal/checks"
27
+	checksdb "github.com/tenseleyFlow/shithub/internal/checks/sqlc"
22
 	"github.com/tenseleyFlow/shithub/internal/ratelimit"
28
 	"github.com/tenseleyFlow/shithub/internal/ratelimit"
23
 )
29
 )
24
 
30
 
@@ -30,6 +36,10 @@ var runnerHeartbeatLimit = ratelimit.Policy{
30
 
36
 
31
 func (h *Handlers) mountRunners(r chi.Router) {
37
 func (h *Handlers) mountRunners(r chi.Router) {
32
 	r.Post("/api/v1/runners/heartbeat", h.runnerHeartbeat)
38
 	r.Post("/api/v1/runners/heartbeat", h.runnerHeartbeat)
39
+	r.Post("/api/v1/jobs/{id}/logs", h.runnerJobLogs)
40
+	r.Post("/api/v1/jobs/{id}/status", h.runnerJobStatus)
41
+	r.Post("/api/v1/jobs/{id}/artifacts/upload", h.runnerJobArtifactUpload)
42
+	r.Post("/api/v1/jobs/{id}/cancel-check", h.runnerJobCancelCheck)
33
 }
43
 }
34
 
44
 
35
 type runnerHeartbeatRequest struct {
45
 type runnerHeartbeatRequest struct {
@@ -226,6 +236,470 @@ func (h *Handlers) claimRunnerJob(
226
 	return job, steps, true, nil
236
 	return job, steps, true, nil
227
 }
237
 }
228
 
238
 
239
+type runnerJobAuth struct {
240
+	Claims   runnerjwt.Claims
241
+	RunnerID int64
242
+	Job      actionsdb.WorkflowJob
243
+}
244
+
245
+func (h *Handlers) authenticateRunnerJob(w http.ResponseWriter, r *http.Request) (runnerJobAuth, bool) {
246
+	if h.d.RunnerJWT == nil {
247
+		writeAPIError(w, http.StatusServiceUnavailable, "runner API is not configured")
248
+		return runnerJobAuth{}, false
249
+	}
250
+	pathJobID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
251
+	if err != nil || pathJobID <= 0 {
252
+		writeAPIError(w, http.StatusNotFound, "job not found")
253
+		return runnerJobAuth{}, false
254
+	}
255
+	const prefix = "Bearer "
256
+	authz := r.Header.Get("Authorization")
257
+	if !strings.HasPrefix(authz, prefix) {
258
+		writeAPIError(w, http.StatusUnauthorized, "job token required")
259
+		return runnerJobAuth{}, false
260
+	}
261
+	claims, err := h.d.RunnerJWT.Verify(strings.TrimSpace(strings.TrimPrefix(authz, prefix)))
262
+	if err != nil {
263
+		writeAPIError(w, http.StatusUnauthorized, "job token invalid")
264
+		return runnerJobAuth{}, false
265
+	}
266
+	if claims.JobID != pathJobID {
267
+		writeAPIError(w, http.StatusNotFound, "job not found")
268
+		return runnerJobAuth{}, false
269
+	}
270
+	runnerID, err := claims.RunnerID()
271
+	if err != nil {
272
+		writeAPIError(w, http.StatusUnauthorized, "job token invalid")
273
+		return runnerJobAuth{}, false
274
+	}
275
+	job, err := actionsdb.New().GetWorkflowJobByID(r.Context(), h.d.Pool, pathJobID)
276
+	if err != nil {
277
+		if errors.Is(err, pgx.ErrNoRows) {
278
+			writeAPIError(w, http.StatusNotFound, "job not found")
279
+		} else {
280
+			writeAPIError(w, http.StatusInternalServerError, "job lookup failed")
281
+		}
282
+		return runnerJobAuth{}, false
283
+	}
284
+	if job.RunID != claims.RunID || !job.RunnerID.Valid || job.RunnerID.Int64 != runnerID {
285
+		writeAPIError(w, http.StatusNotFound, "job not found")
286
+		return runnerJobAuth{}, false
287
+	}
288
+	if err := runnerjwt.Consume(r.Context(), h.d.Pool, claims); err != nil {
289
+		if errors.Is(err, runnerjwt.ErrReplay) {
290
+			writeAPIError(w, http.StatusUnauthorized, "job token replayed")
291
+		} else {
292
+			h.d.Logger.ErrorContext(r.Context(), "runner jwt consume failed", "job_id", pathJobID, "error", err)
293
+			writeAPIError(w, http.StatusUnauthorized, "job token invalid")
294
+		}
295
+		return runnerJobAuth{}, false
296
+	}
297
+	return runnerJobAuth{Claims: claims, RunnerID: runnerID, Job: job}, true
298
+}
299
+
300
+type runnerLogRequest struct {
301
+	Seq    int32  `json:"seq"`
302
+	Chunk  string `json:"chunk"`
303
+	StepID int64  `json:"step_id,omitempty"`
304
+}
305
+
306
+func (h *Handlers) runnerJobLogs(w http.ResponseWriter, r *http.Request) {
307
+	auth, ok := h.authenticateRunnerJob(w, r)
308
+	if !ok {
309
+		return
310
+	}
311
+	var body runnerLogRequest
312
+	if err := decodeJSONBody(r.Body, &body); err != nil {
313
+		writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
314
+		return
315
+	}
316
+	if body.Seq < 0 {
317
+		writeAPIError(w, http.StatusBadRequest, "seq must be non-negative")
318
+		return
319
+	}
320
+	chunk, err := decodeBase64(body.Chunk)
321
+	if err != nil {
322
+		writeAPIError(w, http.StatusBadRequest, "chunk must be base64")
323
+		return
324
+	}
325
+	if len(chunk) == 0 || len(chunk) > 512*1024 {
326
+		writeAPIError(w, http.StatusBadRequest, "chunk must be between 1 and 524288 bytes")
327
+		return
328
+	}
329
+	stepID, ok := h.resolveLogStep(w, r, auth.Job.ID, body.StepID)
330
+	if !ok {
331
+		return
332
+	}
333
+	if _, err := actionsdb.New().AppendStepLogChunk(r.Context(), h.d.Pool, actionsdb.AppendStepLogChunkParams{
334
+		StepID: stepID,
335
+		Seq:    body.Seq,
336
+		Chunk:  chunk,
337
+	}); err != nil && !errors.Is(err, pgx.ErrNoRows) {
338
+		writeAPIError(w, http.StatusInternalServerError, "append log failed")
339
+		return
340
+	}
341
+	h.writeNextTokenResponse(w, r, http.StatusAccepted, auth, map[string]any{"accepted": true})
342
+}
343
+
344
+func (h *Handlers) resolveLogStep(w http.ResponseWriter, r *http.Request, jobID, stepID int64) (int64, bool) {
345
+	q := actionsdb.New()
346
+	if stepID == 0 {
347
+		step, err := q.GetFirstStepForJob(r.Context(), h.d.Pool, jobID)
348
+		if err != nil {
349
+			writeAPIError(w, http.StatusNotFound, "step not found")
350
+			return 0, false
351
+		}
352
+		return step.ID, true
353
+	}
354
+	step, err := q.GetWorkflowStepByID(r.Context(), h.d.Pool, stepID)
355
+	if err != nil || step.JobID != jobID {
356
+		writeAPIError(w, http.StatusNotFound, "step not found")
357
+		return 0, false
358
+	}
359
+	return step.ID, true
360
+}
361
+
362
+type runnerStatusRequest struct {
363
+	Status      string `json:"status"`
364
+	Conclusion  string `json:"conclusion,omitempty"`
365
+	StartedAt   string `json:"started_at,omitempty"`
366
+	CompletedAt string `json:"completed_at,omitempty"`
367
+}
368
+
369
+func (h *Handlers) runnerJobStatus(w http.ResponseWriter, r *http.Request) {
370
+	auth, ok := h.authenticateRunnerJob(w, r)
371
+	if !ok {
372
+		return
373
+	}
374
+	var body runnerStatusRequest
375
+	if err := decodeJSONBody(r.Body, &body); err != nil {
376
+		writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
377
+		return
378
+	}
379
+	update, terminal, err := normalizeJobStatusUpdate(auth.Job, body)
380
+	if err != nil {
381
+		writeAPIError(w, http.StatusBadRequest, err.Error())
382
+		return
383
+	}
384
+	updated, runCompleted, runConclusion, err := h.applyJobStatus(r.Context(), auth.Job, update)
385
+	if err != nil {
386
+		writeAPIError(w, http.StatusInternalServerError, "status update failed")
387
+		return
388
+	}
389
+	if err := h.updateCheckRunForJob(r.Context(), updated); err != nil {
390
+		h.d.Logger.WarnContext(r.Context(), "runner check_run update failed", "job_id", updated.ID, "error", err)
391
+	}
392
+
393
+	bodyMap := map[string]any{
394
+		"status":     string(updated.Status),
395
+		"conclusion": nullableConclusion(updated.Conclusion),
396
+	}
397
+	if runCompleted {
398
+		bodyMap["run_status"] = "completed"
399
+		bodyMap["run_conclusion"] = string(runConclusion)
400
+	}
401
+	if terminal {
402
+		writeJSON(w, http.StatusOK, bodyMap)
403
+		return
404
+	}
405
+	h.writeNextTokenResponse(w, r, http.StatusOK, auth, bodyMap)
406
+}
407
+
408
+type normalizedJobStatusUpdate struct {
409
+	Status      actionsdb.WorkflowJobStatus
410
+	Conclusion  actionsdb.NullCheckConclusion
411
+	StartedAt   pgtype.Timestamptz
412
+	CompletedAt pgtype.Timestamptz
413
+}
414
+
415
+func normalizeJobStatusUpdate(job actionsdb.WorkflowJob, body runnerStatusRequest) (normalizedJobStatusUpdate, bool, error) {
416
+	now := time.Now().UTC()
417
+	status := actionsdb.WorkflowJobStatus(strings.TrimSpace(body.Status))
418
+	if status == "" {
419
+		return normalizedJobStatusUpdate{}, false, errors.New("status is required")
420
+	}
421
+	if !validWorkflowJobTransition(job.Status, status) {
422
+		return normalizedJobStatusUpdate{}, false, fmt.Errorf("invalid status transition %s -> %s", job.Status, status)
423
+	}
424
+	startedAt := job.StartedAt
425
+	if body.StartedAt != "" {
426
+		t, err := parseTimeOptional(body.StartedAt)
427
+		if err != nil {
428
+			return normalizedJobStatusUpdate{}, false, fmt.Errorf("started_at: %w", err)
429
+		}
430
+		startedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()}
431
+	}
432
+	if !startedAt.Valid && (status == actionsdb.WorkflowJobStatusRunning ||
433
+		status == actionsdb.WorkflowJobStatusCompleted ||
434
+		status == actionsdb.WorkflowJobStatusCancelled) {
435
+		startedAt = pgtype.Timestamptz{Time: now, Valid: true}
436
+	}
437
+	completedAt := job.CompletedAt
438
+	terminal := status == actionsdb.WorkflowJobStatusCompleted || status == actionsdb.WorkflowJobStatusCancelled
439
+	if body.CompletedAt != "" {
440
+		t, err := parseTimeOptional(body.CompletedAt)
441
+		if err != nil {
442
+			return normalizedJobStatusUpdate{}, false, fmt.Errorf("completed_at: %w", err)
443
+		}
444
+		completedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()}
445
+	}
446
+	if terminal && !completedAt.Valid {
447
+		completedAt = pgtype.Timestamptz{Time: now, Valid: true}
448
+	}
449
+	conclusion := actionsdb.NullCheckConclusion{}
450
+	if terminal {
451
+		c := strings.TrimSpace(body.Conclusion)
452
+		if c == "" && status == actionsdb.WorkflowJobStatusCancelled {
453
+			c = "cancelled"
454
+		}
455
+		if !validRunnerConclusion(c) {
456
+			return normalizedJobStatusUpdate{}, false, errors.New("invalid or missing conclusion")
457
+		}
458
+		conclusion = actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusion(c), Valid: true}
459
+	} else if strings.TrimSpace(body.Conclusion) != "" {
460
+		return normalizedJobStatusUpdate{}, false, errors.New("conclusion is only valid for terminal statuses")
461
+	}
462
+	return normalizedJobStatusUpdate{
463
+		Status:      status,
464
+		Conclusion:  conclusion,
465
+		StartedAt:   startedAt,
466
+		CompletedAt: completedAt,
467
+	}, terminal, nil
468
+}
469
+
470
+func validWorkflowJobTransition(from, to actionsdb.WorkflowJobStatus) bool {
471
+	switch to {
472
+	case actionsdb.WorkflowJobStatusRunning:
473
+		return from == actionsdb.WorkflowJobStatusQueued || from == actionsdb.WorkflowJobStatusRunning
474
+	case actionsdb.WorkflowJobStatusCompleted:
475
+		return from == actionsdb.WorkflowJobStatusQueued || from == actionsdb.WorkflowJobStatusRunning || from == actionsdb.WorkflowJobStatusCompleted
476
+	case actionsdb.WorkflowJobStatusCancelled:
477
+		return from == actionsdb.WorkflowJobStatusQueued || from == actionsdb.WorkflowJobStatusRunning || from == actionsdb.WorkflowJobStatusCancelled
478
+	default:
479
+		return false
480
+	}
481
+}
482
+
483
+func (h *Handlers) applyJobStatus(
484
+	ctx context.Context,
485
+	job actionsdb.WorkflowJob,
486
+	update normalizedJobStatusUpdate,
487
+) (actionsdb.WorkflowJob, bool, actionsdb.CheckConclusion, error) {
488
+	q := actionsdb.New()
489
+	tx, err := h.d.Pool.Begin(ctx)
490
+	if err != nil {
491
+		return actionsdb.WorkflowJob{}, false, "", err
492
+	}
493
+	committed := false
494
+	defer func() {
495
+		if !committed {
496
+			_ = tx.Rollback(ctx)
497
+		}
498
+	}()
499
+	updated, err := q.UpdateWorkflowJobStatus(ctx, tx, actionsdb.UpdateWorkflowJobStatusParams{
500
+		ID:          job.ID,
501
+		Status:      update.Status,
502
+		Conclusion:  update.Conclusion,
503
+		StartedAt:   update.StartedAt,
504
+		CompletedAt: update.CompletedAt,
505
+	})
506
+	if err != nil {
507
+		return actionsdb.WorkflowJob{}, false, "", err
508
+	}
509
+	jobs, err := q.ListJobsForRun(ctx, tx, updated.RunID)
510
+	if err != nil {
511
+		return actionsdb.WorkflowJob{}, false, "", err
512
+	}
513
+	runConclusion, complete := deriveWorkflowRunConclusion(jobs)
514
+	if complete {
515
+		if _, err := q.CompleteWorkflowRun(ctx, tx, actionsdb.CompleteWorkflowRunParams{
516
+			ID:         updated.RunID,
517
+			Conclusion: runConclusion,
518
+		}); err != nil {
519
+			return actionsdb.WorkflowJob{}, false, "", err
520
+		}
521
+	} else if err := q.MarkWorkflowRunRunning(ctx, tx, updated.RunID); err != nil {
522
+		return actionsdb.WorkflowJob{}, false, "", err
523
+	}
524
+	if err := tx.Commit(ctx); err != nil {
525
+		return actionsdb.WorkflowJob{}, false, "", err
526
+	}
527
+	committed = true
528
+	return updated, complete, runConclusion, nil
529
+}
530
+
531
+func deriveWorkflowRunConclusion(jobs []actionsdb.ListJobsForRunRow) (actionsdb.CheckConclusion, bool) {
532
+	if len(jobs) == 0 {
533
+		return actionsdb.CheckConclusionFailure, true
534
+	}
535
+	worst := actionsdb.CheckConclusionSuccess
536
+	for _, job := range jobs {
537
+		switch job.Status {
538
+		case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusCancelled, actionsdb.WorkflowJobStatusSkipped:
539
+		default:
540
+			return "", false
541
+		}
542
+		if job.Status == actionsdb.WorkflowJobStatusCancelled {
543
+			worst = actionsdb.CheckConclusionCancelled
544
+			continue
545
+		}
546
+		if !job.Conclusion.Valid {
547
+			return actionsdb.CheckConclusionFailure, true
548
+		}
549
+		c := job.Conclusion.CheckConclusion
550
+		if c == actionsdb.CheckConclusionFailure ||
551
+			c == actionsdb.CheckConclusionTimedOut ||
552
+			c == actionsdb.CheckConclusionActionRequired {
553
+			return c, true
554
+		}
555
+		if c == actionsdb.CheckConclusionCancelled {
556
+			worst = actionsdb.CheckConclusionCancelled
557
+		}
558
+	}
559
+	return worst, true
560
+}
561
+
562
+func (h *Handlers) updateCheckRunForJob(ctx context.Context, job actionsdb.WorkflowJob) error {
563
+	run, err := actionsdb.New().GetWorkflowRunByID(ctx, h.d.Pool, job.RunID)
564
+	if err != nil {
565
+		return err
566
+	}
567
+	name := job.JobName
568
+	if name == "" {
569
+		name = job.JobKey
570
+	}
571
+	checkRun, err := checksdb.New().GetCheckRunByExternalID(ctx, h.d.Pool, checksdb.GetCheckRunByExternalIDParams{
572
+		RepoID:     run.RepoID,
573
+		HeadSha:    run.HeadSha,
574
+		Name:       name,
575
+		ExternalID: pgtype.Text{String: fmt.Sprintf("workflow_run:%d:job:%s", job.RunID, job.JobKey), Valid: true},
576
+	})
577
+	if err != nil {
578
+		return err
579
+	}
580
+	params := checks.UpdateParams{
581
+		RunID:        checkRun.ID,
582
+		HasStatus:    true,
583
+		HasStartedAt: true,
584
+		StartedAt:    timeFromPg(job.StartedAt),
585
+	}
586
+	switch job.Status {
587
+	case actionsdb.WorkflowJobStatusRunning:
588
+		params.Status = "in_progress"
589
+	case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusCancelled:
590
+		params.Status = "completed"
591
+		params.HasConclusion = true
592
+		if job.Conclusion.Valid {
593
+			params.Conclusion = string(job.Conclusion.CheckConclusion)
594
+		} else if job.Status == actionsdb.WorkflowJobStatusCancelled {
595
+			params.Conclusion = "cancelled"
596
+		}
597
+		params.HasCompletedAt = true
598
+		params.CompletedAt = timeFromPg(job.CompletedAt)
599
+	default:
600
+		return nil
601
+	}
602
+	_, err = checks.Update(ctx, checks.Deps{Pool: h.d.Pool, Logger: h.d.Logger}, params)
603
+	return err
604
+}
605
+
606
+type runnerArtifactUploadRequest struct {
607
+	Name      string `json:"name"`
608
+	SizeBytes int64  `json:"size_bytes"`
609
+}
610
+
611
+var artifactNameRE = regexp.MustCompile(`^[A-Za-z0-9._-]+$`)
612
+
613
+func (h *Handlers) runnerJobArtifactUpload(w http.ResponseWriter, r *http.Request) {
614
+	if h.d.ObjectStore == nil {
615
+		writeAPIError(w, http.StatusServiceUnavailable, "object storage is not configured")
616
+		return
617
+	}
618
+	auth, ok := h.authenticateRunnerJob(w, r)
619
+	if !ok {
620
+		return
621
+	}
622
+	var body runnerArtifactUploadRequest
623
+	if err := decodeJSONBody(r.Body, &body); err != nil {
624
+		writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
625
+		return
626
+	}
627
+	body.Name = strings.TrimSpace(body.Name)
628
+	if !validArtifactName(body.Name) {
629
+		writeAPIError(w, http.StatusBadRequest, "invalid artifact name")
630
+		return
631
+	}
632
+	if body.SizeBytes < 0 {
633
+		writeAPIError(w, http.StatusBadRequest, "size_bytes must be non-negative")
634
+		return
635
+	}
636
+	objectKey := fmt.Sprintf("actions/runs/%d/artifacts/%s", auth.Claims.RunID, body.Name)
637
+	artifact, err := actionsdb.New().InsertArtifact(r.Context(), h.d.Pool, actionsdb.InsertArtifactParams{
638
+		RunID:     auth.Claims.RunID,
639
+		Name:      body.Name,
640
+		ObjectKey: objectKey,
641
+		ByteCount: body.SizeBytes,
642
+		ExpiresAt: pgtype.Timestamptz{
643
+			Time:  time.Now().UTC().Add(90 * 24 * time.Hour),
644
+			Valid: true,
645
+		},
646
+	})
647
+	if err != nil {
648
+		writeAPIError(w, http.StatusInternalServerError, "artifact create failed")
649
+		return
650
+	}
651
+	uploadURL, err := h.d.ObjectStore.SignedURL(r.Context(), objectKey, 15*time.Minute, http.MethodPut)
652
+	if err != nil {
653
+		writeAPIError(w, http.StatusInternalServerError, "artifact upload url failed")
654
+		return
655
+	}
656
+	h.writeNextTokenResponse(w, r, http.StatusCreated, auth, map[string]any{
657
+		"artifact_id": artifact.ID,
658
+		"upload_url":  uploadURL,
659
+	})
660
+}
661
+
662
+func validArtifactName(name string) bool {
663
+	return len(name) >= 1 &&
664
+		len(name) <= 100 &&
665
+		artifactNameRE.MatchString(name) &&
666
+		!strings.HasPrefix(name, "..") &&
667
+		!strings.Contains(name, "/")
668
+}
669
+
670
+func (h *Handlers) runnerJobCancelCheck(w http.ResponseWriter, r *http.Request) {
671
+	auth, ok := h.authenticateRunnerJob(w, r)
672
+	if !ok {
673
+		return
674
+	}
675
+	h.writeNextTokenResponse(w, r, http.StatusOK, auth, map[string]any{
676
+		"cancelled": auth.Job.CancelRequested,
677
+	})
678
+}
679
+
680
+func (h *Handlers) writeNextTokenResponse(
681
+	w http.ResponseWriter,
682
+	r *http.Request,
683
+	status int,
684
+	auth runnerJobAuth,
685
+	body map[string]any,
686
+) {
687
+	token, claims, err := h.d.RunnerJWT.Mint(runnerjwt.MintParams{
688
+		RunnerID: auth.RunnerID,
689
+		JobID:    auth.Claims.JobID,
690
+		RunID:    auth.Claims.RunID,
691
+		RepoID:   auth.Claims.RepoID,
692
+	})
693
+	if err != nil {
694
+		h.d.Logger.ErrorContext(r.Context(), "runner next-token mint failed", "job_id", auth.Claims.JobID, "error", err)
695
+		writeAPIError(w, http.StatusInternalServerError, "runner token mint failed")
696
+		return
697
+	}
698
+	body["next_token"] = token
699
+	body["next_token_expires_at"] = time.Unix(claims.Exp, 0).UTC().Format(time.RFC3339)
700
+	writeJSON(w, status, body)
701
+}
702
+
229
 type runnerClaimResponse struct {
703
 type runnerClaimResponse struct {
230
 	Token     string           `json:"token"`
704
 	Token     string           `json:"token"`
231
 	ExpiresAt string           `json:"expires_at"`
705
 	ExpiresAt string           `json:"expires_at"`
@@ -321,3 +795,45 @@ func rawJSONOrObject(b []byte) json.RawMessage {
321
 	}
795
 	}
322
 	return json.RawMessage(b)
796
 	return json.RawMessage(b)
323
 }
797
 }
798
+
799
+func decodeJSONBody(r io.Reader, v any) error {
800
+	dec := json.NewDecoder(r)
801
+	dec.DisallowUnknownFields()
802
+	return dec.Decode(v)
803
+}
804
+
805
+func decodeBase64(s string) ([]byte, error) {
806
+	if out, err := base64.StdEncoding.DecodeString(s); err == nil {
807
+		return out, nil
808
+	}
809
+	return base64.RawStdEncoding.DecodeString(s)
810
+}
811
+
812
+func validRunnerConclusion(c string) bool {
813
+	switch actionsdb.CheckConclusion(c) {
814
+	case actionsdb.CheckConclusionSuccess,
815
+		actionsdb.CheckConclusionFailure,
816
+		actionsdb.CheckConclusionNeutral,
817
+		actionsdb.CheckConclusionCancelled,
818
+		actionsdb.CheckConclusionSkipped,
819
+		actionsdb.CheckConclusionTimedOut,
820
+		actionsdb.CheckConclusionActionRequired:
821
+		return true
822
+	default:
823
+		return false
824
+	}
825
+}
826
+
827
+func nullableConclusion(c actionsdb.NullCheckConclusion) any {
828
+	if !c.Valid {
829
+		return nil
830
+	}
831
+	return string(c.CheckConclusion)
832
+}
833
+
834
+func timeFromPg(t pgtype.Timestamptz) time.Time {
835
+	if !t.Valid {
836
+		return time.Time{}
837
+	}
838
+	return t.Time
839
+}
internal/web/handlers/api/runners_test.gomodified
@@ -5,7 +5,9 @@ package api_test
5
 import (
5
 import (
6
 	"bytes"
6
 	"bytes"
7
 	"context"
7
 	"context"
8
+	"encoding/base64"
8
 	"encoding/json"
9
 	"encoding/json"
10
+	"fmt"
9
 	"io"
11
 	"io"
10
 	"log/slog"
12
 	"log/slog"
11
 	"net/http"
13
 	"net/http"
@@ -23,6 +25,7 @@ import (
23
 	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
25
 	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
24
 	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
26
 	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
25
 	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
27
 	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
28
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
26
 	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
29
 	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
27
 	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
30
 	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
28
 	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
31
 	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
@@ -89,24 +92,62 @@ func TestRunnerHeartbeatClaimsQueuedJob(t *testing.T) {
89
 		t.Fatalf("claims runner_id: got %d, want %d", claimRunnerID, runnerID)
92
 		t.Fatalf("claims runner_id: got %d, want %d", claimRunnerID, runnerID)
90
 	}
93
 	}
91
 
94
 
95
+	var logResp struct {
96
+		Accepted  bool   `json:"accepted"`
97
+		NextToken string `json:"next_token"`
98
+	}
99
+	logBody := fmt.Sprintf(`{"seq":0,"chunk":%q}`, base64.StdEncoding.EncodeToString([]byte("hello\n")))
100
+	req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/logs", resp.Job.ID), strings.NewReader(logBody))
101
+	req.Header.Set("Authorization", "Bearer "+resp.Token)
102
+	rr = httptest.NewRecorder()
103
+	router.ServeHTTP(rr, req)
104
+	if rr.Code != http.StatusAccepted {
105
+		t.Fatalf("logs status: got %d, want 202; body=%s", rr.Code, rr.Body.String())
106
+	}
107
+	if err := json.Unmarshal(rr.Body.Bytes(), &logResp); err != nil {
108
+		t.Fatalf("decode log response: %v", err)
109
+	}
110
+	if !logResp.Accepted || logResp.NextToken == "" {
111
+		t.Fatalf("unexpected log response: %+v", logResp)
112
+	}
113
+
114
+	req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/logs", resp.Job.ID), strings.NewReader(logBody))
115
+	req.Header.Set("Authorization", "Bearer "+resp.Token)
116
+	rr = httptest.NewRecorder()
117
+	router.ServeHTTP(rr, req)
118
+	if rr.Code != http.StatusUnauthorized {
119
+		t.Fatalf("replay status: got %d, want 401; body=%s", rr.Code, rr.Body.String())
120
+	}
121
+
122
+	req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/status", resp.Job.ID),
123
+		strings.NewReader(`{"status":"completed","conclusion":"success"}`))
124
+	req.Header.Set("Authorization", "Bearer "+logResp.NextToken)
125
+	rr = httptest.NewRecorder()
126
+	router.ServeHTTP(rr, req)
127
+	if rr.Code != http.StatusOK {
128
+		t.Fatalf("complete status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
129
+	}
130
+
92
 	q := actionsdb.New()
131
 	q := actionsdb.New()
93
 	job, err := q.GetWorkflowJobByID(ctx, pool, resp.Job.ID)
132
 	job, err := q.GetWorkflowJobByID(ctx, pool, resp.Job.ID)
94
 	if err != nil {
133
 	if err != nil {
95
 		t.Fatalf("GetWorkflowJobByID: %v", err)
134
 		t.Fatalf("GetWorkflowJobByID: %v", err)
96
 	}
135
 	}
97
-	if job.Status != actionsdb.WorkflowJobStatusRunning || !job.RunnerID.Valid || job.RunnerID.Int64 != runnerID {
136
+	if job.Status != actionsdb.WorkflowJobStatusCompleted || !job.RunnerID.Valid || job.RunnerID.Int64 != runnerID ||
98
-		t.Fatalf("job not claimed by runner: %+v", job)
137
+		!job.Conclusion.Valid || job.Conclusion.CheckConclusion != actionsdb.CheckConclusionSuccess {
138
+		t.Fatalf("job not completed by runner: %+v", job)
99
 	}
139
 	}
100
 	run, err := q.GetWorkflowRunByID(ctx, pool, runID)
140
 	run, err := q.GetWorkflowRunByID(ctx, pool, runID)
101
 	if err != nil {
141
 	if err != nil {
102
 		t.Fatalf("GetWorkflowRunByID: %v", err)
142
 		t.Fatalf("GetWorkflowRunByID: %v", err)
103
 	}
143
 	}
104
-	if run.Status != actionsdb.WorkflowRunStatusRunning {
144
+	if run.Status != actionsdb.WorkflowRunStatusCompleted ||
105
-		t.Fatalf("run status: got %s, want running", run.Status)
145
+		!run.Conclusion.Valid || run.Conclusion.CheckConclusion != actionsdb.CheckConclusionSuccess {
146
+		t.Fatalf("run not completed successfully: %+v", run)
106
 	}
147
 	}
107
 
148
 
108
-	// Capacity is enforced server-side: a second heartbeat from the same
149
+	// The completed job is no longer counted against capacity, and no other
109
-	// runner sees one running job and receives no additional claim.
150
+	// queued job exists, so the heartbeat is an empty 204.
110
 	req = httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
151
 	req = httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
111
 		strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
152
 		strings.NewReader(`{"labels":["ubuntu-latest","linux"],"capacity":1}`))
112
 	req.Header.Set("Authorization", "Bearer "+token)
153
 	req.Header.Set("Authorization", "Bearer "+token)
@@ -129,12 +170,80 @@ func TestRunnerHeartbeatRejectsBadToken(t *testing.T) {
129
 	}
170
 	}
130
 }
171
 }
131
 
172
 
132
-func newRunnerAPIRouter(t *testing.T, pool *pgxpool.Pool, logger *slog.Logger, signer *runnerjwt.Signer) http.Handler {
173
+func TestRunnerArtifactUploadReturnsSignedURL(t *testing.T) {
174
+	ctx := context.Background()
175
+	pool := dbtest.NewTestDB(t)
176
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
177
+	repoID, userID := setupRunnerAPIRepo(t, pool)
178
+	enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
179
+	token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest"}, 1)
180
+	router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()), storage.NewMemoryStore())
181
+
182
+	req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
183
+		strings.NewReader(`{"labels":["ubuntu-latest"],"capacity":1}`))
184
+	req.Header.Set("Authorization", "Bearer "+token)
185
+	rr := httptest.NewRecorder()
186
+	router.ServeHTTP(rr, req)
187
+	if rr.Code != http.StatusOK {
188
+		t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
189
+	}
190
+	var claim struct {
191
+		Token string `json:"token"`
192
+		Job   struct {
193
+			ID    int64 `json:"id"`
194
+			RunID int64 `json:"run_id"`
195
+		} `json:"job"`
196
+	}
197
+	if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
198
+		t.Fatalf("decode claim: %v", err)
199
+	}
200
+
201
+	req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/artifacts/upload", claim.Job.ID),
202
+		strings.NewReader(`{"name":"test-results.tgz","size_bytes":123}`))
203
+	req.Header.Set("Authorization", "Bearer "+claim.Token)
204
+	rr = httptest.NewRecorder()
205
+	router.ServeHTTP(rr, req)
206
+	if rr.Code != http.StatusCreated {
207
+		t.Fatalf("artifact status: got %d, want 201; body=%s", rr.Code, rr.Body.String())
208
+	}
209
+	var upload struct {
210
+		ArtifactID int64  `json:"artifact_id"`
211
+		UploadURL  string `json:"upload_url"`
212
+		NextToken  string `json:"next_token"`
213
+	}
214
+	if err := json.Unmarshal(rr.Body.Bytes(), &upload); err != nil {
215
+		t.Fatalf("decode upload: %v", err)
216
+	}
217
+	if upload.ArtifactID == 0 || upload.NextToken == "" ||
218
+		!strings.HasPrefix(upload.UploadURL, "mem://actions/runs/") {
219
+		t.Fatalf("unexpected upload response: %+v", upload)
220
+	}
221
+	artifacts, err := actionsdb.New().ListArtifactsForRun(ctx, pool, claim.Job.RunID)
222
+	if err != nil {
223
+		t.Fatalf("ListArtifactsForRun: %v", err)
224
+	}
225
+	if len(artifacts) != 1 || artifacts[0].Name != "test-results.tgz" || artifacts[0].ByteCount != 123 {
226
+		t.Fatalf("unexpected artifacts: %+v", artifacts)
227
+	}
228
+}
229
+
230
+func newRunnerAPIRouter(
231
+	t *testing.T,
232
+	pool *pgxpool.Pool,
233
+	logger *slog.Logger,
234
+	signer *runnerjwt.Signer,
235
+	stores ...storage.ObjectStore,
236
+) http.Handler {
133
 	t.Helper()
237
 	t.Helper()
238
+	var store storage.ObjectStore
239
+	if len(stores) > 0 {
240
+		store = stores[0]
241
+	}
134
 	h, err := apih.New(apih.Deps{
242
 	h, err := apih.New(apih.Deps{
135
-		Pool:      pool,
243
+		Pool:        pool,
136
-		Logger:    logger,
244
+		Logger:      logger,
137
-		RunnerJWT: signer,
245
+		RunnerJWT:   signer,
246
+		ObjectStore: store,
138
 	})
247
 	})
139
 	if err != nil {
248
 	if err != nil {
140
 		t.Fatalf("api.New: %v", err)
249
 		t.Fatalf("api.New: %v", err)