Go · 46627 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package api
4
5 import (
6 "context"
7 "encoding/base64"
8 "encoding/json"
9 "errors"
10 "fmt"
11 "io"
12 "net/http"
13 "net/url"
14 "regexp"
15 "sort"
16 "strconv"
17 "strings"
18 "time"
19
20 "github.com/go-chi/chi/v5"
21 "github.com/jackc/pgx/v5"
22 "github.com/jackc/pgx/v5/pgtype"
23
24 actionsevents "github.com/tenseleyFlow/shithub/internal/actions/events"
25 "github.com/tenseleyFlow/shithub/internal/actions/finalize"
26 actionslifecycle "github.com/tenseleyFlow/shithub/internal/actions/lifecycle"
27 "github.com/tenseleyFlow/shithub/internal/actions/logstream"
28 "github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
29 "github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
30 actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
31 "github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
32 "github.com/tenseleyFlow/shithub/internal/infra/metrics"
33 "github.com/tenseleyFlow/shithub/internal/ratelimit"
34 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
35 "github.com/tenseleyFlow/shithub/internal/runner/scrub"
36 "github.com/tenseleyFlow/shithub/internal/worker"
37 )
38
// runnerHeartbeatLimit is the rate-limit policy passed to the rate limiter by
// allowRunnerHeartbeat: at most 60 requests per one-minute window, evaluated
// against a per-runner key.
var runnerHeartbeatLimit = ratelimit.Policy{
	Scope:  "actions:runner_dispatch",
	Max:    60,
	Window: time.Minute,
}
44
45 func (h *Handlers) mountRunners(r chi.Router) {
46 r.Post("/api/v1/runners/heartbeat", h.runnerHeartbeat)
47 r.Post("/api/v1/jobs/{id}/logs", h.runnerJobLogs)
48 r.Post("/api/v1/jobs/{id}/steps/{step_id}/status", h.runnerStepStatus)
49 r.Post("/api/v1/jobs/{id}/status", h.runnerJobStatus)
50 r.Post("/api/v1/jobs/{id}/artifacts/upload", h.runnerJobArtifactUpload)
51 r.Post("/api/v1/jobs/{id}/cancel-check", h.runnerJobCancelCheck)
52 }
53
// runnerHeartbeatRequest is the JSON body of a runner heartbeat. Both fields
// are optional: runnerHeartbeat falls back to the values stored on the runner
// row when Labels is nil or Capacity is zero.
type runnerHeartbeatRequest struct {
	// Labels, when present, is normalized with runnerlabels.Normalize.
	Labels []string `json:"labels"`
	// Capacity, when non-zero, must be between 1 and 64.
	Capacity int `json:"capacity"`
}
58
59 func (h *Handlers) runnerHeartbeat(w http.ResponseWriter, r *http.Request) {
60 if h.d.RunnerJWT == nil {
61 writeAPIError(w, http.StatusServiceUnavailable, "runner API is not configured")
62 return
63 }
64 runner, ok := h.authenticateRunner(w, r)
65 if !ok {
66 return
67 }
68 if !h.allowRunnerHeartbeat(w, r, runner.ID) {
69 return
70 }
71
72 var body runnerHeartbeatRequest
73 dec := json.NewDecoder(r.Body)
74 dec.DisallowUnknownFields()
75 if err := dec.Decode(&body); err != nil && !errors.Is(err, io.EOF) {
76 writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
77 return
78 }
79 labels := runner.Labels
80 if body.Labels != nil {
81 var err error
82 labels, err = runnerlabels.Normalize(body.Labels)
83 if err != nil {
84 writeAPIError(w, http.StatusBadRequest, err.Error())
85 return
86 }
87 }
88 capacity := int(runner.Capacity)
89 if body.Capacity != 0 {
90 capacity = body.Capacity
91 }
92 if capacity < 1 || capacity > 64 {
93 writeAPIError(w, http.StatusBadRequest, "capacity must be between 1 and 64")
94 return
95 }
96
97 job, steps, resolvedSecrets, claimed, err := h.claimRunnerJob(r.Context(), runner.ID, labels, int32(capacity))
98 if err != nil {
99 h.d.Logger.ErrorContext(r.Context(), "runner heartbeat claim failed", "runner_id", runner.ID, "error", err)
100 writeAPIError(w, http.StatusInternalServerError, "runner heartbeat failed")
101 return
102 }
103 if !claimed {
104 metrics.ActionsRunnerHeartbeatsTotal.WithLabelValues("no_job").Inc()
105 w.WriteHeader(http.StatusNoContent)
106 return
107 }
108
109 token, claims, err := h.d.RunnerJWT.Mint(runnerjwt.MintParams{
110 RunnerID: runner.ID,
111 JobID: job.ID,
112 RunID: job.RunID,
113 RepoID: job.RepoID,
114 Purpose: runnerjwt.PurposeAPI,
115 })
116 if err != nil {
117 h.d.Logger.ErrorContext(r.Context(), "runner jwt mint failed", "runner_id", runner.ID, "job_id", job.ID, "error", err)
118 writeAPIError(w, http.StatusInternalServerError, "runner token mint failed")
119 return
120 }
121 checkoutToken, _, err := h.d.RunnerJWT.Mint(runnerjwt.MintParams{
122 RunnerID: runner.ID,
123 JobID: job.ID,
124 RunID: job.RunID,
125 RepoID: job.RepoID,
126 Purpose: runnerjwt.PurposeCheckout,
127 })
128 if err != nil {
129 h.d.Logger.ErrorContext(r.Context(), "runner checkout token mint failed", "runner_id", runner.ID, "job_id", job.ID, "error", err)
130 writeAPIError(w, http.StatusInternalServerError, "runner checkout token mint failed")
131 return
132 }
133 metrics.ActionsRunnerHeartbeatsTotal.WithLabelValues("claimed").Inc()
134 metrics.ActionsRunnerJWTTotal.WithLabelValues("issued").Add(2)
135 writeJSON(w, http.StatusOK, h.presentRunnerClaim(job, steps, resolvedSecrets, token, checkoutToken, time.Unix(claims.Exp, 0)))
136 }
137
138 func (h *Handlers) authenticateRunner(w http.ResponseWriter, r *http.Request) (actionsdb.GetRunnerByTokenHashRow, bool) {
139 const prefix = "Bearer "
140 authz := r.Header.Get("Authorization")
141 if !strings.HasPrefix(authz, prefix) {
142 writeAPIError(w, http.StatusUnauthorized, "runner token required")
143 return actionsdb.GetRunnerByTokenHashRow{}, false
144 }
145 hash, err := runnertoken.HashOf(strings.TrimSpace(strings.TrimPrefix(authz, prefix)))
146 if err != nil {
147 writeAPIError(w, http.StatusUnauthorized, "runner token invalid")
148 return actionsdb.GetRunnerByTokenHashRow{}, false
149 }
150 runner, err := actionsdb.New().GetRunnerByTokenHash(r.Context(), h.d.Pool, hash)
151 if err != nil {
152 writeAPIError(w, http.StatusUnauthorized, "runner token invalid")
153 return actionsdb.GetRunnerByTokenHashRow{}, false
154 }
155 return runner, true
156 }
157
158 func (h *Handlers) allowRunnerHeartbeat(w http.ResponseWriter, r *http.Request, runnerID int64) bool {
159 if h.d.RateLimiter == nil {
160 return true
161 }
162 decision, err := h.d.RateLimiter.Allow(r.Context(), runnerHeartbeatLimit, fmt.Sprintf("runner:%d", runnerID))
163 if err != nil {
164 h.d.Logger.WarnContext(r.Context(), "runner heartbeat rate-limit failed", "runner_id", runnerID, "error", err)
165 }
166 ratelimit.StampHeaders(w, decision)
167 if !decision.Allowed {
168 w.Header().Set("Retry-After", fmt.Sprintf("%d", int(decision.RetryAfter/time.Second)))
169 writeAPIError(w, http.StatusTooManyRequests, "rate limit exceeded")
170 return false
171 }
172 return true
173 }
174
175 func (h *Handlers) claimRunnerJob(
176 ctx context.Context,
177 runnerID int64,
178 labels []string,
179 capacity int32,
180 ) (actionsdb.ClaimQueuedWorkflowJobRow, []actionsdb.ListRunnerStepsForJobRow, map[string]string, bool, error) {
181 q := actionsdb.New()
182 tx, err := h.d.Pool.Begin(ctx)
183 if err != nil {
184 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
185 }
186 committed := false
187 defer func() {
188 if !committed {
189 _ = tx.Rollback(ctx)
190 }
191 }()
192
193 if _, err := q.LockRunnerByID(ctx, tx, runnerID); err != nil {
194 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
195 }
196 running, err := q.CountRunningJobsForRunner(ctx, tx, runnerID)
197 if err != nil {
198 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
199 }
200 if running >= capacity {
201 if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{
202 ID: runnerID,
203 Labels: labels,
204 Capacity: capacity,
205 Status: actionsdb.WorkflowRunnerStatusBusy,
206 }); err != nil {
207 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
208 }
209 if err := tx.Commit(ctx); err != nil {
210 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
211 }
212 committed = true
213 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, nil
214 }
215
216 job, err := q.ClaimQueuedWorkflowJob(ctx, tx, actionsdb.ClaimQueuedWorkflowJobParams{
217 RunnerID: runnerID,
218 Labels: labels,
219 })
220 if err != nil {
221 if !errors.Is(err, pgx.ErrNoRows) {
222 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
223 }
224 if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{
225 ID: runnerID,
226 Labels: labels,
227 Capacity: capacity,
228 Status: actionsdb.WorkflowRunnerStatusIdle,
229 }); err != nil {
230 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
231 }
232 if err := tx.Commit(ctx); err != nil {
233 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
234 }
235 committed = true
236 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, nil
237 }
238 run, err := q.StartWorkflowRun(ctx, tx, job.RunID)
239 switch {
240 case err == nil:
241 if err := actionsevents.EmitRunTx(ctx, tx, run, actionsevents.ActionRunning); err != nil {
242 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
243 }
244 case errors.Is(err, pgx.ErrNoRows):
245 run, err = q.GetWorkflowRunByID(ctx, tx, job.RunID)
246 if err != nil {
247 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
248 }
249 default:
250 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
251 }
252 if err := actionsevents.EmitJobTx(ctx, tx, run, claimRowWorkflowJob(job), actionsevents.ActionRunning); err != nil {
253 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
254 }
255 steps, err := q.ListRunnerStepsForJob(ctx, tx, job.ID)
256 if err != nil {
257 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
258 }
259 resolvedSecrets, err := h.resolveVisibleSecretsFromDB(ctx, tx, job.RepoID)
260 if err != nil {
261 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
262 }
263 if err := h.storeJobSecretMaskSnapshot(ctx, tx, job.ID, secretMaskValues(resolvedSecrets)); err != nil {
264 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
265 }
266 status := actionsdb.WorkflowRunnerStatusIdle
267 if running+1 >= capacity {
268 status = actionsdb.WorkflowRunnerStatusBusy
269 }
270 if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{
271 ID: runnerID,
272 Labels: labels,
273 Capacity: capacity,
274 Status: status,
275 }); err != nil {
276 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
277 }
278 if err := tx.Commit(ctx); err != nil {
279 return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
280 }
281 committed = true
282 return job, steps, resolvedSecrets, true, nil
283 }
284
// runnerJobAuth is what authenticateRunnerJob returns on success.
type runnerJobAuth struct {
	// Claims are the verified claims of the job token.
	Claims runnerjwt.Claims
	// RunnerID is the runner ID extracted from Claims.
	RunnerID int64
	// Job is the job row loaded for the job ID in the request path.
	Job actionsdb.WorkflowJob
}
290
291 func (h *Handlers) authenticateRunnerJob(w http.ResponseWriter, r *http.Request) (runnerJobAuth, bool) {
292 if h.d.RunnerJWT == nil {
293 writeAPIError(w, http.StatusServiceUnavailable, "runner API is not configured")
294 return runnerJobAuth{}, false
295 }
296 pathJobID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
297 if err != nil || pathJobID <= 0 {
298 writeAPIError(w, http.StatusNotFound, "job not found")
299 return runnerJobAuth{}, false
300 }
301 const prefix = "Bearer "
302 authz := r.Header.Get("Authorization")
303 if !strings.HasPrefix(authz, prefix) {
304 writeAPIError(w, http.StatusUnauthorized, "job token required")
305 return runnerJobAuth{}, false
306 }
307 claims, err := h.d.RunnerJWT.Verify(strings.TrimSpace(strings.TrimPrefix(authz, prefix)))
308 if err != nil {
309 metrics.ActionsRunnerJWTTotal.WithLabelValues("rejected").Inc()
310 writeAPIError(w, http.StatusUnauthorized, "job token invalid")
311 return runnerJobAuth{}, false
312 }
313 if claims.Purpose != "" && claims.Purpose != runnerjwt.PurposeAPI {
314 metrics.ActionsRunnerJWTTotal.WithLabelValues("rejected").Inc()
315 writeAPIError(w, http.StatusUnauthorized, "job token invalid")
316 return runnerJobAuth{}, false
317 }
318 if claims.JobID != pathJobID {
319 writeAPIError(w, http.StatusNotFound, "job not found")
320 return runnerJobAuth{}, false
321 }
322 runnerID, err := claims.RunnerID()
323 if err != nil {
324 metrics.ActionsRunnerJWTTotal.WithLabelValues("rejected").Inc()
325 writeAPIError(w, http.StatusUnauthorized, "job token invalid")
326 return runnerJobAuth{}, false
327 }
328 job, err := actionsdb.New().GetWorkflowJobByID(r.Context(), h.d.Pool, pathJobID)
329 if err != nil {
330 if errors.Is(err, pgx.ErrNoRows) {
331 writeAPIError(w, http.StatusNotFound, "job not found")
332 } else {
333 writeAPIError(w, http.StatusInternalServerError, "job lookup failed")
334 }
335 return runnerJobAuth{}, false
336 }
337 if job.RunID != claims.RunID || !job.RunnerID.Valid || job.RunnerID.Int64 != runnerID {
338 writeAPIError(w, http.StatusNotFound, "job not found")
339 return runnerJobAuth{}, false
340 }
341 if err := runnerjwt.Consume(r.Context(), h.d.Pool, claims); err != nil {
342 if errors.Is(err, runnerjwt.ErrReplay) {
343 metrics.ActionsRunnerJWTTotal.WithLabelValues("replay").Inc()
344 writeAPIError(w, http.StatusUnauthorized, "job token replayed")
345 } else {
346 metrics.ActionsRunnerJWTTotal.WithLabelValues("rejected").Inc()
347 h.d.Logger.ErrorContext(r.Context(), "runner jwt consume failed", "job_id", pathJobID, "error", err)
348 writeAPIError(w, http.StatusUnauthorized, "job token invalid")
349 }
350 return runnerJobAuth{}, false
351 }
352 return runnerJobAuth{Claims: claims, RunnerID: runnerID, Job: job}, true
353 }
354
// runnerLogRequest is the JSON body accepted by runnerJobLogs.
type runnerLogRequest struct {
	// Seq is the chunk sequence number; negative values are rejected.
	Seq int32 `json:"seq"`
	// Chunk is the base64-encoded log data; it must decode to between 1 and
	// 524288 bytes.
	Chunk string `json:"chunk"`
	// StepID selects the step the chunk belongs to. Zero selects the job's
	// first step.
	StepID int64 `json:"step_id,omitempty"`
}
360
361 func (h *Handlers) runnerJobLogs(w http.ResponseWriter, r *http.Request) {
362 auth, ok := h.authenticateRunnerJob(w, r)
363 if !ok {
364 return
365 }
366 var body runnerLogRequest
367 if err := decodeJSONBody(r.Body, &body); err != nil {
368 writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
369 return
370 }
371 if body.Seq < 0 {
372 writeAPIError(w, http.StatusBadRequest, "seq must be non-negative")
373 return
374 }
375 chunk, err := decodeBase64(body.Chunk)
376 if err != nil {
377 writeAPIError(w, http.StatusBadRequest, "chunk must be base64")
378 return
379 }
380 if len(chunk) == 0 || len(chunk) > 512*1024 {
381 writeAPIError(w, http.StatusBadRequest, "chunk must be between 1 and 524288 bytes")
382 return
383 }
384 values, err := h.jobSecretMaskValues(r.Context(), auth.Job.ID, auth.Claims.RepoID)
385 if err != nil {
386 h.d.Logger.ErrorContext(r.Context(), "runner log mask resolution failed", "repo_id", auth.Claims.RepoID, "job_id", auth.Claims.JobID, "error", err)
387 writeAPIError(w, http.StatusInternalServerError, "log mask resolution failed")
388 return
389 }
390 stepID, ok := h.resolveLogStep(w, r, auth.Job.ID, body.StepID)
391 if !ok {
392 return
393 }
394 if err := h.appendScrubbedLogChunk(r.Context(), stepID, body.Seq, chunk, values); err != nil {
395 writeAPIError(w, http.StatusInternalServerError, "append log failed")
396 return
397 }
398 h.writeNextTokenResponse(w, r, http.StatusAccepted, auth, map[string]any{"accepted": true})
399 }
400
401 func (h *Handlers) resolveLogStep(w http.ResponseWriter, r *http.Request, jobID, stepID int64) (int64, bool) {
402 q := actionsdb.New()
403 if stepID == 0 {
404 step, err := q.GetFirstStepForJob(r.Context(), h.d.Pool, jobID)
405 if err != nil {
406 writeAPIError(w, http.StatusNotFound, "step not found")
407 return 0, false
408 }
409 return step.ID, true
410 }
411 step, err := q.GetWorkflowStepByID(r.Context(), h.d.Pool, stepID)
412 if err != nil || step.JobID != jobID {
413 writeAPIError(w, http.StatusNotFound, "step not found")
414 return 0, false
415 }
416 return step.ID, true
417 }
418
// runnerStatusRequest is the JSON body shared by the job and step status
// endpoints (runnerJobStatus and runnerStepStatus).
type runnerStatusRequest struct {
	// Status is the requested new status; it is required.
	Status string `json:"status"`
	// Conclusion is only accepted together with a terminal status.
	Conclusion string `json:"conclusion,omitempty"`
	// StartedAt and CompletedAt are optional timestamps parsed with
	// parseTimeOptional (accepted format is defined by that helper).
	StartedAt   string `json:"started_at,omitempty"`
	CompletedAt string `json:"completed_at,omitempty"`
}
425
426 func (h *Handlers) runnerJobStatus(w http.ResponseWriter, r *http.Request) {
427 auth, ok := h.authenticateRunnerJob(w, r)
428 if !ok {
429 return
430 }
431 var body runnerStatusRequest
432 if err := decodeJSONBody(r.Body, &body); err != nil {
433 writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
434 return
435 }
436 update, terminal, err := normalizeJobStatusUpdate(auth.Job, body)
437 if err != nil {
438 writeAPIError(w, http.StatusBadRequest, err.Error())
439 return
440 }
441 updated, runCompleted, runConclusion, err := h.applyJobStatus(r.Context(), auth.Job, update)
442 if err != nil {
443 writeAPIError(w, http.StatusInternalServerError, "status update failed")
444 return
445 }
446 if err := h.updateCheckRunForJob(r.Context(), updated); err != nil {
447 h.d.Logger.WarnContext(r.Context(), "runner check_run update failed", "job_id", updated.ID, "error", err)
448 }
449
450 bodyMap := map[string]any{
451 "status": string(updated.Status),
452 "conclusion": nullableConclusion(updated.Conclusion),
453 }
454 if runCompleted {
455 bodyMap["run_status"] = "completed"
456 bodyMap["run_conclusion"] = string(runConclusion)
457 }
458 if terminal {
459 writeJSON(w, http.StatusOK, bodyMap)
460 return
461 }
462 h.writeNextTokenResponse(w, r, http.StatusOK, auth, bodyMap)
463 }
464
465 func (h *Handlers) runnerStepStatus(w http.ResponseWriter, r *http.Request) {
466 auth, ok := h.authenticateRunnerJob(w, r)
467 if !ok {
468 return
469 }
470 stepID, err := strconv.ParseInt(chi.URLParam(r, "step_id"), 10, 64)
471 if err != nil || stepID <= 0 {
472 writeAPIError(w, http.StatusNotFound, "step not found")
473 return
474 }
475 q := actionsdb.New()
476 step, err := q.GetWorkflowStepByID(r.Context(), h.d.Pool, stepID)
477 if err != nil || step.JobID != auth.Job.ID {
478 writeAPIError(w, http.StatusNotFound, "step not found")
479 return
480 }
481 var body runnerStatusRequest
482 if err := decodeJSONBody(r.Body, &body); err != nil {
483 writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
484 return
485 }
486 update, terminal, err := normalizeStepStatusUpdate(step, body)
487 if err != nil {
488 writeAPIError(w, http.StatusBadRequest, err.Error())
489 return
490 }
491 updated, err := h.applyStepStatus(r.Context(), step, update, terminal)
492 if err != nil {
493 writeAPIError(w, http.StatusInternalServerError, "step status update failed")
494 return
495 }
496 recordStepTimeout(step, updated)
497 h.writeNextTokenResponse(w, r, http.StatusOK, auth, map[string]any{
498 "status": string(updated.Status),
499 "conclusion": nullableConclusion(updated.Conclusion),
500 })
501 }
502
// normalizedJobStatusUpdate is the validated form of a runnerStatusRequest
// for a job, as produced by normalizeJobStatusUpdate and consumed by
// applyJobStatus.
type normalizedJobStatusUpdate struct {
	Status actionsdb.WorkflowJobStatus
	// Conclusion is valid only for terminal statuses.
	Conclusion  actionsdb.NullCheckConclusion
	StartedAt   pgtype.Timestamptz
	CompletedAt pgtype.Timestamptz
}
509
510 func normalizeJobStatusUpdate(job actionsdb.WorkflowJob, body runnerStatusRequest) (normalizedJobStatusUpdate, bool, error) {
511 now := time.Now().UTC()
512 status := actionsdb.WorkflowJobStatus(strings.TrimSpace(body.Status))
513 if status == "" {
514 return normalizedJobStatusUpdate{}, false, errors.New("status is required")
515 }
516 if !validWorkflowJobTransition(job.Status, status) {
517 return normalizedJobStatusUpdate{}, false, fmt.Errorf("invalid status transition %s -> %s", job.Status, status)
518 }
519 startedAt := job.StartedAt
520 if body.StartedAt != "" {
521 t, err := parseTimeOptional(body.StartedAt)
522 if err != nil {
523 return normalizedJobStatusUpdate{}, false, fmt.Errorf("started_at: %w", err)
524 }
525 startedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()}
526 }
527 if !startedAt.Valid && (status == actionsdb.WorkflowJobStatusRunning ||
528 status == actionsdb.WorkflowJobStatusCompleted ||
529 status == actionsdb.WorkflowJobStatusCancelled) {
530 startedAt = pgtype.Timestamptz{Time: now, Valid: true}
531 }
532 completedAt := job.CompletedAt
533 terminal := status == actionsdb.WorkflowJobStatusCompleted || status == actionsdb.WorkflowJobStatusCancelled
534 if body.CompletedAt != "" {
535 t, err := parseTimeOptional(body.CompletedAt)
536 if err != nil {
537 return normalizedJobStatusUpdate{}, false, fmt.Errorf("completed_at: %w", err)
538 }
539 completedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()}
540 }
541 if terminal && !completedAt.Valid {
542 completedAt = pgtype.Timestamptz{Time: now, Valid: true}
543 }
544 conclusion := actionsdb.NullCheckConclusion{}
545 if terminal {
546 c := strings.TrimSpace(body.Conclusion)
547 if c == "" && status == actionsdb.WorkflowJobStatusCancelled {
548 c = "cancelled"
549 }
550 if !validRunnerConclusion(c) {
551 return normalizedJobStatusUpdate{}, false, errors.New("invalid or missing conclusion")
552 }
553 conclusion = actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusion(c), Valid: true}
554 } else if strings.TrimSpace(body.Conclusion) != "" {
555 return normalizedJobStatusUpdate{}, false, errors.New("conclusion is only valid for terminal statuses")
556 }
557 return normalizedJobStatusUpdate{
558 Status: status,
559 Conclusion: conclusion,
560 StartedAt: startedAt,
561 CompletedAt: completedAt,
562 }, terminal, nil
563 }
564
565 func validWorkflowJobTransition(from, to actionsdb.WorkflowJobStatus) bool {
566 switch to {
567 case actionsdb.WorkflowJobStatusRunning:
568 return from == actionsdb.WorkflowJobStatusQueued || from == actionsdb.WorkflowJobStatusRunning
569 case actionsdb.WorkflowJobStatusCompleted:
570 return from == actionsdb.WorkflowJobStatusQueued || from == actionsdb.WorkflowJobStatusRunning || from == actionsdb.WorkflowJobStatusCompleted
571 case actionsdb.WorkflowJobStatusCancelled:
572 return from == actionsdb.WorkflowJobStatusQueued || from == actionsdb.WorkflowJobStatusRunning || from == actionsdb.WorkflowJobStatusCancelled
573 default:
574 return false
575 }
576 }
577
// normalizedStepStatusUpdate is the validated form of a runnerStatusRequest
// for a step, as produced by normalizeStepStatusUpdate and consumed by
// applyStepStatus.
type normalizedStepStatusUpdate struct {
	Status actionsdb.WorkflowStepStatus
	// Conclusion is valid only for terminal statuses.
	Conclusion  actionsdb.NullCheckConclusion
	StartedAt   pgtype.Timestamptz
	CompletedAt pgtype.Timestamptz
}
584
585 func normalizeStepStatusUpdate(step actionsdb.WorkflowStep, body runnerStatusRequest) (normalizedStepStatusUpdate, bool, error) {
586 now := time.Now().UTC()
587 status := actionsdb.WorkflowStepStatus(strings.TrimSpace(body.Status))
588 if status == "" {
589 return normalizedStepStatusUpdate{}, false, errors.New("status is required")
590 }
591 if !validWorkflowStepTransition(step.Status, status) {
592 return normalizedStepStatusUpdate{}, false, fmt.Errorf("invalid status transition %s -> %s", step.Status, status)
593 }
594 startedAt := step.StartedAt
595 if body.StartedAt != "" {
596 t, err := parseTimeOptional(body.StartedAt)
597 if err != nil {
598 return normalizedStepStatusUpdate{}, false, fmt.Errorf("started_at: %w", err)
599 }
600 startedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()}
601 }
602 if !startedAt.Valid && (status == actionsdb.WorkflowStepStatusRunning ||
603 status == actionsdb.WorkflowStepStatusCompleted ||
604 status == actionsdb.WorkflowStepStatusCancelled) {
605 startedAt = pgtype.Timestamptz{Time: now, Valid: true}
606 }
607 completedAt := step.CompletedAt
608 terminal := status == actionsdb.WorkflowStepStatusCompleted ||
609 status == actionsdb.WorkflowStepStatusCancelled ||
610 status == actionsdb.WorkflowStepStatusSkipped
611 if body.CompletedAt != "" {
612 t, err := parseTimeOptional(body.CompletedAt)
613 if err != nil {
614 return normalizedStepStatusUpdate{}, false, fmt.Errorf("completed_at: %w", err)
615 }
616 completedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()}
617 }
618 if terminal && !completedAt.Valid {
619 completedAt = pgtype.Timestamptz{Time: now, Valid: true}
620 }
621 conclusion := actionsdb.NullCheckConclusion{}
622 if terminal {
623 c := strings.TrimSpace(body.Conclusion)
624 if c == "" && status == actionsdb.WorkflowStepStatusCancelled {
625 c = "cancelled"
626 }
627 if c == "" && status == actionsdb.WorkflowStepStatusSkipped {
628 c = "skipped"
629 }
630 if !validRunnerConclusion(c) {
631 return normalizedStepStatusUpdate{}, false, errors.New("invalid or missing conclusion")
632 }
633 conclusion = actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusion(c), Valid: true}
634 } else if strings.TrimSpace(body.Conclusion) != "" {
635 return normalizedStepStatusUpdate{}, false, errors.New("conclusion is only valid for terminal statuses")
636 }
637 return normalizedStepStatusUpdate{
638 Status: status,
639 Conclusion: conclusion,
640 StartedAt: startedAt,
641 CompletedAt: completedAt,
642 }, terminal, nil
643 }
644
645 func validWorkflowStepTransition(from, to actionsdb.WorkflowStepStatus) bool {
646 switch to {
647 case actionsdb.WorkflowStepStatusRunning:
648 return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning
649 case actionsdb.WorkflowStepStatusCompleted:
650 return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning || from == actionsdb.WorkflowStepStatusCompleted
651 case actionsdb.WorkflowStepStatusCancelled:
652 return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning || from == actionsdb.WorkflowStepStatusCancelled
653 case actionsdb.WorkflowStepStatusSkipped:
654 return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning || from == actionsdb.WorkflowStepStatusSkipped
655 default:
656 return false
657 }
658 }
659
660 func recordStepTimeout(before, after actionsdb.WorkflowStep) {
661 if !after.Conclusion.Valid || after.Conclusion.CheckConclusion != actionsdb.CheckConclusionTimedOut {
662 return
663 }
664 if before.Conclusion.Valid && before.Conclusion.CheckConclusion == actionsdb.CheckConclusionTimedOut {
665 return
666 }
667 metrics.ActionsStepTimeoutsTotal.Inc()
668 }
669
670 func (h *Handlers) applyStepStatus(
671 ctx context.Context,
672 step actionsdb.WorkflowStep,
673 update normalizedStepStatusUpdate,
674 terminal bool,
675 ) (actionsdb.WorkflowStep, error) {
676 q := actionsdb.New()
677 tx, err := h.d.Pool.Begin(ctx)
678 if err != nil {
679 return actionsdb.WorkflowStep{}, err
680 }
681 committed := false
682 defer func() {
683 if !committed {
684 _ = tx.Rollback(ctx)
685 }
686 }()
687 updated, err := q.UpdateWorkflowStepStatus(ctx, tx, actionsdb.UpdateWorkflowStepStatusParams{
688 ID: step.ID,
689 Status: update.Status,
690 Conclusion: update.Conclusion,
691 StartedAt: update.StartedAt,
692 CompletedAt: update.CompletedAt,
693 })
694 if err != nil {
695 return actionsdb.WorkflowStep{}, err
696 }
697 shouldNotify := false
698 if terminal && h.d.ObjectStore != nil {
699 if _, err := worker.Enqueue(ctx, tx, finalize.KindWorkflowFinalizeStep, finalize.Payload{StepID: step.ID}, worker.EnqueueOptions{}); err != nil {
700 return actionsdb.WorkflowStep{}, err
701 }
702 shouldNotify = true
703 }
704 if terminal {
705 if err := logstream.NotifyDone(ctx, tx, step.ID); err != nil {
706 return actionsdb.WorkflowStep{}, err
707 }
708 }
709 if err := tx.Commit(ctx); err != nil {
710 return actionsdb.WorkflowStep{}, err
711 }
712 committed = true
713 if shouldNotify {
714 if err := worker.Notify(ctx, h.d.Pool); err != nil && h.d.Logger != nil {
715 h.d.Logger.WarnContext(ctx, "runner step finalizer notify failed", "step_id", step.ID, "error", err)
716 }
717 }
718 return updated, nil
719 }
720
// applyJobStatus persists a normalized job status update and the resulting
// run state in one transaction.
//
// It returns the updated job row, whether the update left the run complete
// (every job terminal, per deriveWorkflowRunConclusion), the derived run
// conclusion (meaningful only when the run is complete), and any error. On
// error the transaction is rolled back and zero values are returned.
func (h *Handlers) applyJobStatus(
	ctx context.Context,
	job actionsdb.WorkflowJob,
	update normalizedJobStatusUpdate,
) (actionsdb.WorkflowJob, bool, actionsdb.CheckConclusion, error) {
	q := actionsdb.New()
	tx, err := h.d.Pool.Begin(ctx)
	if err != nil {
		return actionsdb.WorkflowJob{}, false, "", err
	}
	// Roll back on every exit path that did not reach a successful commit.
	committed := false
	defer func() {
		if !committed {
			_ = tx.Rollback(ctx)
		}
	}()
	updated, err := q.UpdateWorkflowJobStatus(ctx, tx, actionsdb.UpdateWorkflowJobStatusParams{
		ID:          job.ID,
		Status:      update.Status,
		Conclusion:  update.Conclusion,
		StartedAt:   update.StartedAt,
		CompletedAt: update.CompletedAt,
	})
	if err != nil {
		return actionsdb.WorkflowJob{}, false, "", err
	}
	notifyWorker := false
	if updated.Status == actionsdb.WorkflowJobStatusCancelled {
		// Cancelling a job also cancels its still-open steps; each cancelled
		// step gets a log-stream "done" signal and, when an object store is
		// configured, a finalize job.
		steps, err := q.CancelOpenWorkflowStepsForJob(ctx, tx, updated.ID)
		if err != nil {
			return actionsdb.WorkflowJob{}, false, "", err
		}
		for _, step := range steps {
			if err := logstream.NotifyDone(ctx, tx, step.ID); err != nil {
				return actionsdb.WorkflowJob{}, false, "", err
			}
			if h.d.ObjectStore != nil {
				if _, err := worker.Enqueue(ctx, tx, finalize.KindWorkflowFinalizeStep, finalize.Payload{StepID: step.ID}, worker.EnqueueOptions{}); err != nil {
					return actionsdb.WorkflowJob{}, false, "", err
				}
				notifyWorker = true
			}
		}
	}
	// Re-derive the run's state from all of its jobs, including this update.
	jobs, err := q.ListJobsForRun(ctx, tx, updated.RunID)
	if err != nil {
		return actionsdb.WorkflowJob{}, false, "", err
	}
	runConclusion, complete := deriveWorkflowRunConclusion(jobs)
	runAfter, err := q.GetWorkflowRunByID(ctx, tx, updated.RunID)
	if err != nil {
		return actionsdb.WorkflowJob{}, false, "", err
	}
	// Snapshot the run before it is modified so a real change can be detected.
	runBefore := runAfter
	runStarted := false
	runTerminalChanged := false
	if complete {
		runAfter, err = q.CompleteWorkflowRun(ctx, tx, actionsdb.CompleteWorkflowRunParams{
			ID:         updated.RunID,
			Conclusion: runConclusion,
		})
		if err != nil {
			return actionsdb.WorkflowJob{}, false, "", err
		}
		runTerminalChanged = workflowRunLifecycleChanged(runBefore, runAfter)
	} else {
		// pgx.ErrNoRows from StartWorkflowRun is tolerated: no row was
		// returned, so the previously loaded run is kept (presumably the run
		// had already been started — confirm against the query).
		startedRun, err := q.StartWorkflowRun(ctx, tx, updated.RunID)
		if err == nil {
			runAfter = startedRun
			runStarted = true
		} else if !errors.Is(err, pgx.ErrNoRows) {
			return actionsdb.WorkflowJob{}, false, "", err
		}
	}
	// Events are emitted only for actual lifecycle changes, in this order:
	// job event, run "running" event, run terminal event.
	if jobLifecycleChanged(job, updated) {
		if err := actionsevents.EmitJobTx(ctx, tx, runAfter, updated, workflowJobEventAction(updated.Status)); err != nil {
			return actionsdb.WorkflowJob{}, false, "", err
		}
	}
	if runStarted {
		if err := actionsevents.EmitRunTx(ctx, tx, runAfter, actionsevents.ActionRunning); err != nil {
			return actionsdb.WorkflowJob{}, false, "", err
		}
	}
	if complete && runTerminalChanged {
		if err := actionsevents.EmitRunTx(ctx, tx, runAfter, workflowRunEventAction(runAfter.Status)); err != nil {
			return actionsdb.WorkflowJob{}, false, "", err
		}
	}
	if err := tx.Commit(ctx); err != nil {
		return actionsdb.WorkflowJob{}, false, "", err
	}
	committed = true
	// The worker is notified only after commit; a failure here is logged and
	// does not fail the status update.
	if notifyWorker {
		if err := worker.Notify(ctx, h.d.Pool); err != nil && h.d.Logger != nil {
			h.d.Logger.WarnContext(ctx, "runner cancelled-step finalizer notify failed", "job_id", updated.ID, "error", err)
		}
	}
	return updated, complete, runConclusion, nil
}
821
822 func claimRowWorkflowJob(row actionsdb.ClaimQueuedWorkflowJobRow) actionsdb.WorkflowJob {
823 return actionsdb.WorkflowJob{
824 ID: row.ID,
825 RunID: row.RunID,
826 JobIndex: row.JobIndex,
827 JobKey: row.JobKey,
828 JobName: row.JobName,
829 RunsOn: row.RunsOn,
830 RunnerID: row.RunnerID,
831 NeedsJobs: row.NeedsJobs,
832 IfExpr: row.IfExpr,
833 TimeoutMinutes: row.TimeoutMinutes,
834 Permissions: row.Permissions,
835 JobEnv: row.JobEnv,
836 Status: row.Status,
837 Conclusion: row.Conclusion,
838 CancelRequested: row.CancelRequested,
839 StartedAt: row.StartedAt,
840 CompletedAt: row.CompletedAt,
841 Version: row.Version,
842 CreatedAt: row.CreatedAt,
843 UpdatedAt: row.UpdatedAt,
844 }
845 }
846
847 func jobLifecycleChanged(before, after actionsdb.WorkflowJob) bool {
848 if before.Status != after.Status {
849 return true
850 }
851 if before.Conclusion.Valid != after.Conclusion.Valid {
852 return true
853 }
854 return before.Conclusion.Valid && before.Conclusion.CheckConclusion != after.Conclusion.CheckConclusion
855 }
856
857 func workflowRunLifecycleChanged(before, after actionsdb.WorkflowRun) bool {
858 if before.Status != after.Status {
859 return true
860 }
861 if before.Conclusion.Valid != after.Conclusion.Valid {
862 return true
863 }
864 return before.Conclusion.Valid && before.Conclusion.CheckConclusion != after.Conclusion.CheckConclusion
865 }
866
867 func workflowJobEventAction(status actionsdb.WorkflowJobStatus) string {
868 switch status {
869 case actionsdb.WorkflowJobStatusCancelled:
870 return actionsevents.ActionCancelled
871 case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusSkipped:
872 return actionsevents.ActionCompleted
873 case actionsdb.WorkflowJobStatusRunning:
874 return actionsevents.ActionRunning
875 default:
876 return actionsevents.ActionQueued
877 }
878 }
879
880 func workflowRunEventAction(status actionsdb.WorkflowRunStatus) string {
881 if status == actionsdb.WorkflowRunStatusCancelled {
882 return actionsevents.ActionCancelled
883 }
884 return actionsevents.ActionCompleted
885 }
886
887 func deriveWorkflowRunConclusion(jobs []actionsdb.ListJobsForRunRow) (actionsdb.CheckConclusion, bool) {
888 if len(jobs) == 0 {
889 return actionsdb.CheckConclusionFailure, true
890 }
891 worst := actionsdb.CheckConclusionSuccess
892 for _, job := range jobs {
893 switch job.Status {
894 case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusCancelled, actionsdb.WorkflowJobStatusSkipped:
895 default:
896 return "", false
897 }
898 if job.Status == actionsdb.WorkflowJobStatusCancelled {
899 worst = actionsdb.CheckConclusionCancelled
900 continue
901 }
902 if !job.Conclusion.Valid {
903 return actionsdb.CheckConclusionFailure, true
904 }
905 c := job.Conclusion.CheckConclusion
906 if c == actionsdb.CheckConclusionFailure ||
907 c == actionsdb.CheckConclusionTimedOut ||
908 c == actionsdb.CheckConclusionActionRequired {
909 return c, true
910 }
911 if c == actionsdb.CheckConclusionCancelled {
912 worst = actionsdb.CheckConclusionCancelled
913 }
914 }
915 return worst, true
916 }
917
918 func (h *Handlers) updateCheckRunForJob(ctx context.Context, job actionsdb.WorkflowJob) error {
919 return actionslifecycle.SyncCheckRunForJob(ctx, actionslifecycle.Deps{Pool: h.d.Pool, Logger: h.d.Logger}, job)
920 }
921
// runnerArtifactUploadRequest is the JSON body a runner posts to request an
// upload slot for a job artifact.
type runnerArtifactUploadRequest struct {
	// Name is the artifact name; the handler trims and validates it.
	Name string `json:"name"`
	// SizeBytes is the size announced by the runner; negative values are
	// rejected by the handler.
	SizeBytes int64 `json:"size_bytes"`
}
926
// artifactNameRE matches names made up solely of ASCII letters, digits, dots,
// underscores and hyphens (at least one character).
var artifactNameRE = regexp.MustCompile(`^[A-Za-z0-9._-]+$`)
928
929 func (h *Handlers) runnerJobArtifactUpload(w http.ResponseWriter, r *http.Request) {
930 if h.d.ObjectStore == nil {
931 writeAPIError(w, http.StatusServiceUnavailable, "object storage is not configured")
932 return
933 }
934 auth, ok := h.authenticateRunnerJob(w, r)
935 if !ok {
936 return
937 }
938 var body runnerArtifactUploadRequest
939 if err := decodeJSONBody(r.Body, &body); err != nil {
940 writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
941 return
942 }
943 body.Name = strings.TrimSpace(body.Name)
944 if !validArtifactName(body.Name) {
945 writeAPIError(w, http.StatusBadRequest, "invalid artifact name")
946 return
947 }
948 if body.SizeBytes < 0 {
949 writeAPIError(w, http.StatusBadRequest, "size_bytes must be non-negative")
950 return
951 }
952 objectKey := fmt.Sprintf("actions/runs/%d/artifacts/%s", auth.Claims.RunID, body.Name)
953 artifact, err := actionsdb.New().InsertArtifact(r.Context(), h.d.Pool, actionsdb.InsertArtifactParams{
954 RunID: auth.Claims.RunID,
955 Name: body.Name,
956 ObjectKey: objectKey,
957 ByteCount: body.SizeBytes,
958 ExpiresAt: pgtype.Timestamptz{
959 Time: time.Now().UTC().Add(90 * 24 * time.Hour),
960 Valid: true,
961 },
962 })
963 if err != nil {
964 writeAPIError(w, http.StatusInternalServerError, "artifact create failed")
965 return
966 }
967 uploadURL, err := h.d.ObjectStore.SignedURL(r.Context(), objectKey, 15*time.Minute, http.MethodPut)
968 if err != nil {
969 writeAPIError(w, http.StatusInternalServerError, "artifact upload url failed")
970 return
971 }
972 h.writeNextTokenResponse(w, r, http.StatusCreated, auth, map[string]any{
973 "artifact_id": artifact.ID,
974 "upload_url": uploadURL,
975 })
976 }
977
978 func validArtifactName(name string) bool {
979 return len(name) >= 1 &&
980 len(name) <= 100 &&
981 artifactNameRE.MatchString(name) &&
982 !strings.HasPrefix(name, "..") &&
983 !strings.Contains(name, "/")
984 }
985
986 func (h *Handlers) runnerJobCancelCheck(w http.ResponseWriter, r *http.Request) {
987 auth, ok := h.authenticateRunnerJob(w, r)
988 if !ok {
989 return
990 }
991 h.writeNextTokenResponse(w, r, http.StatusOK, auth, map[string]any{
992 "cancelled": auth.Job.CancelRequested,
993 })
994 }
995
996 type secretResolutionDB interface {
997 actionsdb.DBTX
998 reposdb.DBTX
999 }
1000
1001 func (h *Handlers) resolveVisibleSecrets(ctx context.Context, repoID int64) (map[string]string, error) {
1002 return h.resolveVisibleSecretsFromDB(ctx, h.d.Pool, repoID)
1003 }
1004
1005 func (h *Handlers) resolveVisibleSecretsFromDB(ctx context.Context, db secretResolutionDB, repoID int64) (map[string]string, error) {
1006 if h.d.SecretBox == nil {
1007 return nil, nil
1008 }
1009 repo, err := reposdb.New().GetRepoByID(ctx, db, repoID)
1010 if err != nil {
1011 return nil, err
1012 }
1013 out := map[string]string{}
1014 if repo.OwnerOrgID.Valid {
1015 if err := h.mergeOrgSecrets(ctx, db, repo.OwnerOrgID.Int64, out); err != nil {
1016 return nil, err
1017 }
1018 }
1019 if err := h.mergeRepoSecrets(ctx, db, repo.ID, out); err != nil {
1020 return nil, err
1021 }
1022 if len(out) == 0 {
1023 return nil, nil
1024 }
1025 return out, nil
1026 }
1027
1028 func (h *Handlers) mergeRepoSecrets(ctx context.Context, db actionsdb.DBTX, repoID int64, out map[string]string) error {
1029 q := actionsdb.New()
1030 items, err := q.ListRepoSecrets(ctx, db, pgtype.Int8{Int64: repoID, Valid: true})
1031 if err != nil {
1032 return err
1033 }
1034 for _, item := range items {
1035 row, err := q.GetRepoSecret(ctx, db, actionsdb.GetRepoSecretParams{
1036 RepoID: pgtype.Int8{Int64: repoID, Valid: true},
1037 Name: item.Name,
1038 })
1039 if err != nil {
1040 return err
1041 }
1042 plaintext, err := h.d.SecretBox.Open(row.Ciphertext, row.Nonce)
1043 if err != nil {
1044 return err
1045 }
1046 out[item.Name] = string(plaintext)
1047 }
1048 return nil
1049 }
1050
1051 func (h *Handlers) mergeOrgSecrets(ctx context.Context, db actionsdb.DBTX, orgID int64, out map[string]string) error {
1052 q := actionsdb.New()
1053 items, err := q.ListOrgSecrets(ctx, db, pgtype.Int8{Int64: orgID, Valid: true})
1054 if err != nil {
1055 return err
1056 }
1057 for _, item := range items {
1058 row, err := q.GetOrgSecret(ctx, db, actionsdb.GetOrgSecretParams{
1059 OrgID: pgtype.Int8{Int64: orgID, Valid: true},
1060 Name: item.Name,
1061 })
1062 if err != nil {
1063 return err
1064 }
1065 plaintext, err := h.d.SecretBox.Open(row.Ciphertext, row.Nonce)
1066 if err != nil {
1067 return err
1068 }
1069 out[item.Name] = string(plaintext)
1070 }
1071 return nil
1072 }
1073
1074 func (h *Handlers) logMaskValues(ctx context.Context, repoID int64) ([]string, error) {
1075 resolved, err := h.resolveVisibleSecrets(ctx, repoID)
1076 if err != nil {
1077 return nil, err
1078 }
1079 return secretMaskValues(resolved), nil
1080 }
1081
1082 func (h *Handlers) storeJobSecretMaskSnapshot(ctx context.Context, db actionsdb.DBTX, jobID int64, values []string) error {
1083 if h.d.SecretBox == nil {
1084 return nil
1085 }
1086 if values == nil {
1087 values = []string{}
1088 }
1089 payload, err := json.Marshal(values)
1090 if err != nil {
1091 return err
1092 }
1093 ciphertext, nonce, err := h.d.SecretBox.Seal(payload)
1094 if err != nil {
1095 return err
1096 }
1097 return actionsdb.New().UpsertWorkflowJobSecretMask(ctx, db, actionsdb.UpsertWorkflowJobSecretMaskParams{
1098 JobID: jobID,
1099 Ciphertext: ciphertext,
1100 Nonce: nonce,
1101 })
1102 }
1103
1104 func (h *Handlers) jobSecretMaskValues(ctx context.Context, jobID, repoID int64) ([]string, error) {
1105 if h.d.SecretBox == nil {
1106 return nil, nil
1107 }
1108 row, err := actionsdb.New().GetWorkflowJobSecretMask(ctx, h.d.Pool, jobID)
1109 if err != nil {
1110 if errors.Is(err, pgx.ErrNoRows) {
1111 return h.logMaskValues(ctx, repoID)
1112 }
1113 return nil, err
1114 }
1115 plaintext, err := h.d.SecretBox.Open(row.Ciphertext, row.Nonce)
1116 if err != nil {
1117 return nil, err
1118 }
1119 var values []string
1120 if err := json.Unmarshal(plaintext, &values); err != nil {
1121 return nil, err
1122 }
1123 sort.Strings(values)
1124 return values, nil
1125 }
1126
// secretMaskValues returns the distinct, non-empty secret values of resolved
// in ascending order, for use as a log mask list.
//
// Empty values are dropped: masking the empty string is meaningless, and a
// list holding only "" would still make callers treat the job as having
// secrets to scrub. Duplicate values (several secrets sharing one value) are
// collapsed. The result is nil when nothing remains.
func secretMaskValues(resolved map[string]string) []string {
	if len(resolved) == 0 {
		return nil
	}
	values := make([]string, 0, len(resolved))
	for _, value := range resolved {
		if value == "" {
			continue
		}
		values = append(values, value)
	}
	if len(values) == 0 {
		return nil
	}
	sort.Strings(values)

	// Collapse runs of equal values in place; the write index never overtakes
	// the read index, so unread elements are not clobbered.
	distinct := values[:1]
	for _, value := range values[1:] {
		if value != distinct[len(distinct)-1] {
			distinct = append(distinct, value)
		}
	}
	return distinct
}
1138
// cloneStringMap returns an independent copy of in. A nil or empty input
// yields nil rather than an empty map.
func cloneStringMap(in map[string]string) map[string]string {
	size := len(in)
	if size == 0 {
		return nil
	}
	dup := make(map[string]string, size)
	for key := range in {
		dup[key] = in[key]
	}
	return dup
}
1149
1150 func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq int32, chunk []byte, values []string) error {
1151 q := actionsdb.New()
1152 if len(values) == 0 {
1153 row, err := q.AppendStepLogChunk(ctx, h.d.Pool, actionsdb.AppendStepLogChunkParams{
1154 StepID: stepID,
1155 Seq: seq,
1156 Chunk: chunk,
1157 })
1158 if errors.Is(err, pgx.ErrNoRows) {
1159 return nil
1160 }
1161 if err != nil {
1162 return err
1163 }
1164 return logstream.NotifyChunk(ctx, h.d.Pool, stepID, row.Seq)
1165 }
1166
1167 tx, err := h.d.Pool.Begin(ctx)
1168 if err != nil {
1169 return err
1170 }
1171 committed := false
1172 defer func() {
1173 if !committed {
1174 _ = tx.Rollback(ctx)
1175 }
1176 }()
1177
1178 if _, err := q.GetStepLogChunkByStepSeq(ctx, tx, actionsdb.GetStepLogChunkByStepSeqParams{
1179 StepID: stepID,
1180 Seq: seq,
1181 }); err == nil {
1182 if err := tx.Commit(ctx); err != nil {
1183 return err
1184 }
1185 committed = true
1186 return nil
1187 } else if !errors.Is(err, pgx.ErrNoRows) {
1188 return err
1189 }
1190
1191 var replacements uint64
1192 prev, err := q.GetStepLogChunkBefore(ctx, tx, actionsdb.GetStepLogChunkBeforeParams{
1193 StepID: stepID,
1194 Seq: seq,
1195 })
1196 switch {
1197 case err == nil:
1198 if carry := scrubCarryLen(prev.Chunk, values); carry > 0 {
1199 prefix := append([]byte(nil), prev.Chunk[:len(prev.Chunk)-carry]...)
1200 combined := append(append([]byte(nil), prev.Chunk[len(prev.Chunk)-carry:]...), chunk...)
1201 chunk, replacements = scrubChunk(combined, values)
1202 if err := q.UpdateStepLogChunk(ctx, tx, actionsdb.UpdateStepLogChunkParams{
1203 ID: prev.ID,
1204 Chunk: prefix,
1205 }); err != nil {
1206 return err
1207 }
1208 } else {
1209 chunk, replacements = scrubChunk(chunk, values)
1210 }
1211 case errors.Is(err, pgx.ErrNoRows):
1212 chunk, replacements = scrubChunk(chunk, values)
1213 default:
1214 return err
1215 }
1216
1217 row, err := q.AppendStepLogChunk(ctx, tx, actionsdb.AppendStepLogChunkParams{
1218 StepID: stepID,
1219 Seq: seq,
1220 Chunk: chunk,
1221 })
1222 switch {
1223 case err == nil:
1224 if err := logstream.NotifyChunk(ctx, tx, stepID, row.Seq); err != nil {
1225 return err
1226 }
1227 case errors.Is(err, pgx.ErrNoRows):
1228 default:
1229 return err
1230 }
1231 if err := tx.Commit(ctx); err != nil {
1232 return err
1233 }
1234 committed = true
1235 if replacements > 0 {
1236 metrics.ActionsLogScrubReplacementsTotal.WithLabelValues("server").Add(float64(replacements))
1237 }
1238 return nil
1239 }
1240
1241 func scrubChunk(chunk []byte, values []string) ([]byte, uint64) {
1242 if len(values) == 0 {
1243 return chunk, 0
1244 }
1245 s := scrub.New(values)
1246 out := s.Scrub(chunk)
1247 return append(out, s.Flush()...), s.Replacements()
1248 }
1249
// scrubCarryLen returns the length of the longest suffix of chunk that is a
// proper prefix (at least one byte shorter than the whole value) of any entry
// in values. It returns 0 when chunk or values is empty or no such suffix
// exists. Empty values are ignored.
func scrubCarryLen(chunk []byte, values []string) int {
	if len(chunk) == 0 || len(values) == 0 {
		return 0
	}
	tail := string(chunk)
	longest := 0
	for _, secret := range values {
		if secret == "" {
			continue
		}
		// Only proper prefixes qualify, and none can exceed the chunk itself.
		limit := len(secret) - 1
		if limit > len(tail) {
			limit = len(tail)
		}
		// Lengths at or below the current best cannot improve the result.
		size := limit
		for size > longest && !strings.HasSuffix(tail, secret[:size]) {
			size--
		}
		if size > longest {
			longest = size
		}
	}
	return longest
}
1273
1274 func (h *Handlers) writeNextTokenResponse(
1275 w http.ResponseWriter,
1276 r *http.Request,
1277 status int,
1278 auth runnerJobAuth,
1279 body map[string]any,
1280 ) {
1281 token, claims, err := h.d.RunnerJWT.Mint(runnerjwt.MintParams{
1282 RunnerID: auth.RunnerID,
1283 JobID: auth.Claims.JobID,
1284 RunID: auth.Claims.RunID,
1285 RepoID: auth.Claims.RepoID,
1286 Purpose: runnerjwt.PurposeAPI,
1287 })
1288 if err != nil {
1289 h.d.Logger.ErrorContext(r.Context(), "runner next-token mint failed", "job_id", auth.Claims.JobID, "error", err)
1290 writeAPIError(w, http.StatusInternalServerError, "runner token mint failed")
1291 return
1292 }
1293 metrics.ActionsRunnerJWTTotal.WithLabelValues("issued").Inc()
1294 body["next_token"] = token
1295 body["next_token_expires_at"] = time.Unix(claims.Exp, 0).UTC().Format(time.RFC3339)
1296 writeJSON(w, status, body)
1297 }
1298
1299 type runnerClaimResponse struct {
1300 Token string `json:"token"`
1301 ExpiresAt string `json:"expires_at"`
1302 Job runnerJobPayload `json:"job"`
1303 }
1304
1305 type runnerJobPayload struct {
1306 ID int64 `json:"id"`
1307 RunID int64 `json:"run_id"`
1308 RepoID int64 `json:"repo_id"`
1309 RunIndex int64 `json:"run_index"`
1310 WorkflowFile string `json:"workflow_file"`
1311 WorkflowName string `json:"workflow_name"`
1312 CheckoutURL string `json:"checkout_url"`
1313 CheckoutToken string `json:"checkout_token"`
1314 HeadSHA string `json:"head_sha"`
1315 HeadRef string `json:"head_ref"`
1316 Event string `json:"event"`
1317 EventPayload json.RawMessage `json:"event_payload"`
1318 JobKey string `json:"job_key"`
1319 JobName string `json:"job_name"`
1320 RunsOn string `json:"runs_on"`
1321 Needs []string `json:"needs"`
1322 If string `json:"if"`
1323 TimeoutMinutes int32 `json:"timeout_minutes"`
1324 Permissions json.RawMessage `json:"permissions"`
1325 Secrets map[string]string `json:"secrets"`
1326 MaskValues []string `json:"mask_values"`
1327 Env json.RawMessage `json:"env"`
1328 Steps []runnerStep `json:"steps"`
1329 }
1330
// runnerStep is the JSON form of a single workflow step as delivered to a
// runner inside runnerJobPayload.
type runnerStep struct {
	// ID is the database ID of the step; Index its position within the job.
	ID    int64 `json:"id"`
	Index int32 `json:"index"`
	// StepID and Name are the step's identifier and display name.
	StepID string `json:"step_id"`
	Name   string `json:"name"`
	// If is the step's condition expression.
	If string `json:"if"`
	// Run is the command of a run step; Uses the alias of a uses step.
	Run  string `json:"run"`
	Uses string `json:"uses"`
	// WorkingDirectory is the directory configured for the step.
	WorkingDirectory string `json:"working_directory"`
	// Env and With are passed through as raw JSON.
	Env  json.RawMessage `json:"env"`
	With json.RawMessage `json:"with"`
	// ContinueOnError mirrors the step's continue-on-error setting.
	ContinueOnError bool `json:"continue_on_error"`
}
1344
1345 func (h *Handlers) presentRunnerClaim(
1346 job actionsdb.ClaimQueuedWorkflowJobRow,
1347 steps []actionsdb.ListRunnerStepsForJobRow,
1348 resolvedSecrets map[string]string,
1349 token string,
1350 checkoutToken string,
1351 expiresAt time.Time,
1352 ) runnerClaimResponse {
1353 outSteps := make([]runnerStep, 0, len(steps))
1354 for _, step := range steps {
1355 outSteps = append(outSteps, runnerStep{
1356 ID: step.ID,
1357 Index: step.StepIndex,
1358 StepID: step.StepID,
1359 Name: step.StepName,
1360 If: step.IfExpr,
1361 Run: step.RunCommand,
1362 Uses: step.UsesAlias,
1363 WorkingDirectory: step.WorkingDirectory,
1364 Env: rawJSONOrObject(step.StepEnv),
1365 With: rawJSONOrObject(step.StepWith),
1366 ContinueOnError: step.ContinueOnError,
1367 })
1368 }
1369 return runnerClaimResponse{
1370 Token: token,
1371 ExpiresAt: expiresAt.UTC().Format(time.RFC3339),
1372 Job: runnerJobPayload{
1373 ID: job.ID,
1374 RunID: job.RunID,
1375 RepoID: job.RepoID,
1376 RunIndex: job.RunIndex,
1377 WorkflowFile: job.WorkflowFile,
1378 WorkflowName: job.WorkflowName,
1379 CheckoutURL: h.checkoutURL(job.RepoOwner, job.RepoName),
1380 CheckoutToken: checkoutToken,
1381 HeadSHA: job.HeadSha,
1382 HeadRef: job.HeadRef,
1383 Event: string(job.Event),
1384 EventPayload: rawJSONOrObject(job.EventPayload),
1385 JobKey: job.JobKey,
1386 JobName: job.JobName,
1387 RunsOn: job.RunsOn,
1388 Needs: job.NeedsJobs,
1389 If: job.IfExpr,
1390 TimeoutMinutes: job.TimeoutMinutes,
1391 Permissions: rawJSONOrObject(job.Permissions),
1392 Secrets: cloneStringMap(resolvedSecrets),
1393 MaskValues: secretMaskValues(resolvedSecrets),
1394 Env: rawJSONOrObject(job.JobEnv),
1395 Steps: outSteps,
1396 },
1397 }
1398 }
1399
1400 func (h *Handlers) checkoutURL(owner, repoName string) string {
1401 base := strings.TrimRight(strings.TrimSpace(h.d.BaseURL), "/")
1402 if base == "" {
1403 return ""
1404 }
1405 return base + "/" + url.PathEscape(owner) + "/" + url.PathEscape(repoName) + ".git"
1406 }
1407
// rawJSONOrObject returns b unchanged when it holds valid JSON (of any kind,
// not only objects) and the empty object "{}" when b is empty or invalid.
func rawJSONOrObject(b []byte) json.RawMessage {
	if len(b) > 0 && json.Valid(b) {
		return json.RawMessage(b)
	}
	return json.RawMessage(`{}`)
}
1414
// decodeJSONBody decodes exactly one JSON value from r into v.
//
// Unknown object fields are rejected. Anything other than whitespace after
// the first value (a second document, stray tokens, garbage) is rejected as
// well, so a request body cannot smuggle extra content past validation. A
// non-nil error is returned for empty input, malformed JSON, unknown fields
// and trailing data.
func decodeJSONBody(r io.Reader, v any) error {
	dec := json.NewDecoder(r)
	dec.DisallowUnknownFields()
	if err := dec.Decode(v); err != nil {
		return err
	}
	// Decode stops after the first value; the stream must end right there.
	switch _, err := dec.Token(); {
	case errors.Is(err, io.EOF):
		return nil
	case err == nil:
		return errors.New("unexpected data after JSON value")
	default:
		return fmt.Errorf("unexpected data after JSON value: %w", err)
	}
}
1420
// decodeBase64 decodes s as standard base64, accepting both the padded and
// the unpadded form. Padded decoding is tried first; when it fails, the
// result (or error) of unpadded decoding is returned.
func decodeBase64(s string) ([]byte, error) {
	decoded, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		return base64.RawStdEncoding.DecodeString(s)
	}
	return decoded, nil
}
1427
1428 func validRunnerConclusion(c string) bool {
1429 switch actionsdb.CheckConclusion(c) {
1430 case actionsdb.CheckConclusionSuccess,
1431 actionsdb.CheckConclusionFailure,
1432 actionsdb.CheckConclusionNeutral,
1433 actionsdb.CheckConclusionCancelled,
1434 actionsdb.CheckConclusionSkipped,
1435 actionsdb.CheckConclusionTimedOut,
1436 actionsdb.CheckConclusionActionRequired:
1437 return true
1438 default:
1439 return false
1440 }
1441 }
1442
1443 func nullableConclusion(c actionsdb.NullCheckConclusion) any {
1444 if !c.Valid {
1445 return nil
1446 }
1447 return string(c.CheckConclusion)
1448 }
1449