| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package api |
| 4 | |
| 5 | import ( |
| 6 | "context" |
| 7 | "encoding/base64" |
| 8 | "encoding/json" |
| 9 | "errors" |
| 10 | "fmt" |
| 11 | "io" |
| 12 | "net/http" |
| 13 | "net/url" |
| 14 | "regexp" |
| 15 | "sort" |
| 16 | "strconv" |
| 17 | "strings" |
| 18 | "time" |
| 19 | |
| 20 | "github.com/go-chi/chi/v5" |
| 21 | "github.com/jackc/pgx/v5" |
| 22 | "github.com/jackc/pgx/v5/pgtype" |
| 23 | |
| 24 | actionsevents "github.com/tenseleyFlow/shithub/internal/actions/events" |
| 25 | "github.com/tenseleyFlow/shithub/internal/actions/finalize" |
| 26 | actionslifecycle "github.com/tenseleyFlow/shithub/internal/actions/lifecycle" |
| 27 | "github.com/tenseleyFlow/shithub/internal/actions/logstream" |
| 28 | "github.com/tenseleyFlow/shithub/internal/actions/runnerlabels" |
| 29 | "github.com/tenseleyFlow/shithub/internal/actions/runnertoken" |
| 30 | actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc" |
| 31 | "github.com/tenseleyFlow/shithub/internal/auth/runnerjwt" |
| 32 | "github.com/tenseleyFlow/shithub/internal/infra/metrics" |
| 33 | "github.com/tenseleyFlow/shithub/internal/ratelimit" |
| 34 | reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc" |
| 35 | "github.com/tenseleyFlow/shithub/internal/runner/scrub" |
| 36 | "github.com/tenseleyFlow/shithub/internal/worker" |
| 37 | ) |
| 38 | |
// runnerHeartbeatLimit is the rate-limit policy checked by
// allowRunnerHeartbeat for every runner heartbeat. It is declared with
// Max 60 and a Window of one minute under the "actions:runner_dispatch"
// scope; the limiter key is built per runner ID by the caller.
var runnerHeartbeatLimit = ratelimit.Policy{
	Scope:  "actions:runner_dispatch",
	Max:    60,
	Window: time.Minute,
}
| 44 | |
| 45 | func (h *Handlers) mountRunners(r chi.Router) { |
| 46 | r.Post("/api/v1/runners/heartbeat", h.runnerHeartbeat) |
| 47 | r.Post("/api/v1/jobs/{id}/logs", h.runnerJobLogs) |
| 48 | r.Post("/api/v1/jobs/{id}/steps/{step_id}/status", h.runnerStepStatus) |
| 49 | r.Post("/api/v1/jobs/{id}/status", h.runnerJobStatus) |
| 50 | r.Post("/api/v1/jobs/{id}/artifacts/upload", h.runnerJobArtifactUpload) |
| 51 | r.Post("/api/v1/jobs/{id}/cancel-check", h.runnerJobCancelCheck) |
| 52 | } |
| 53 | |
// runnerHeartbeatRequest is the optional JSON body of a runner heartbeat.
// runnerHeartbeat tolerates an empty body and falls back to the values on the
// authenticated runner row when a field is left at its zero value.
type runnerHeartbeatRequest struct {
	Labels   []string `json:"labels"`   // when non-nil, normalized and used instead of the runner's labels
	Capacity int      `json:"capacity"` // when non-zero, used instead of the runner's capacity; validated to 1..64
}
| 58 | |
| 59 | func (h *Handlers) runnerHeartbeat(w http.ResponseWriter, r *http.Request) { |
| 60 | if h.d.RunnerJWT == nil { |
| 61 | writeAPIError(w, http.StatusServiceUnavailable, "runner API is not configured") |
| 62 | return |
| 63 | } |
| 64 | runner, ok := h.authenticateRunner(w, r) |
| 65 | if !ok { |
| 66 | return |
| 67 | } |
| 68 | if !h.allowRunnerHeartbeat(w, r, runner.ID) { |
| 69 | return |
| 70 | } |
| 71 | |
| 72 | var body runnerHeartbeatRequest |
| 73 | dec := json.NewDecoder(r.Body) |
| 74 | dec.DisallowUnknownFields() |
| 75 | if err := dec.Decode(&body); err != nil && !errors.Is(err, io.EOF) { |
| 76 | writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error()) |
| 77 | return |
| 78 | } |
| 79 | labels := runner.Labels |
| 80 | if body.Labels != nil { |
| 81 | var err error |
| 82 | labels, err = runnerlabels.Normalize(body.Labels) |
| 83 | if err != nil { |
| 84 | writeAPIError(w, http.StatusBadRequest, err.Error()) |
| 85 | return |
| 86 | } |
| 87 | } |
| 88 | capacity := int(runner.Capacity) |
| 89 | if body.Capacity != 0 { |
| 90 | capacity = body.Capacity |
| 91 | } |
| 92 | if capacity < 1 || capacity > 64 { |
| 93 | writeAPIError(w, http.StatusBadRequest, "capacity must be between 1 and 64") |
| 94 | return |
| 95 | } |
| 96 | |
| 97 | job, steps, resolvedSecrets, claimed, err := h.claimRunnerJob(r.Context(), runner.ID, labels, int32(capacity)) |
| 98 | if err != nil { |
| 99 | h.d.Logger.ErrorContext(r.Context(), "runner heartbeat claim failed", "runner_id", runner.ID, "error", err) |
| 100 | writeAPIError(w, http.StatusInternalServerError, "runner heartbeat failed") |
| 101 | return |
| 102 | } |
| 103 | if !claimed { |
| 104 | metrics.ActionsRunnerHeartbeatsTotal.WithLabelValues("no_job").Inc() |
| 105 | w.WriteHeader(http.StatusNoContent) |
| 106 | return |
| 107 | } |
| 108 | |
| 109 | token, claims, err := h.d.RunnerJWT.Mint(runnerjwt.MintParams{ |
| 110 | RunnerID: runner.ID, |
| 111 | JobID: job.ID, |
| 112 | RunID: job.RunID, |
| 113 | RepoID: job.RepoID, |
| 114 | Purpose: runnerjwt.PurposeAPI, |
| 115 | }) |
| 116 | if err != nil { |
| 117 | h.d.Logger.ErrorContext(r.Context(), "runner jwt mint failed", "runner_id", runner.ID, "job_id", job.ID, "error", err) |
| 118 | writeAPIError(w, http.StatusInternalServerError, "runner token mint failed") |
| 119 | return |
| 120 | } |
| 121 | checkoutToken, _, err := h.d.RunnerJWT.Mint(runnerjwt.MintParams{ |
| 122 | RunnerID: runner.ID, |
| 123 | JobID: job.ID, |
| 124 | RunID: job.RunID, |
| 125 | RepoID: job.RepoID, |
| 126 | Purpose: runnerjwt.PurposeCheckout, |
| 127 | }) |
| 128 | if err != nil { |
| 129 | h.d.Logger.ErrorContext(r.Context(), "runner checkout token mint failed", "runner_id", runner.ID, "job_id", job.ID, "error", err) |
| 130 | writeAPIError(w, http.StatusInternalServerError, "runner checkout token mint failed") |
| 131 | return |
| 132 | } |
| 133 | metrics.ActionsRunnerHeartbeatsTotal.WithLabelValues("claimed").Inc() |
| 134 | metrics.ActionsRunnerJWTTotal.WithLabelValues("issued").Add(2) |
| 135 | writeJSON(w, http.StatusOK, h.presentRunnerClaim(job, steps, resolvedSecrets, token, checkoutToken, time.Unix(claims.Exp, 0))) |
| 136 | } |
| 137 | |
| 138 | func (h *Handlers) authenticateRunner(w http.ResponseWriter, r *http.Request) (actionsdb.GetRunnerByTokenHashRow, bool) { |
| 139 | const prefix = "Bearer " |
| 140 | authz := r.Header.Get("Authorization") |
| 141 | if !strings.HasPrefix(authz, prefix) { |
| 142 | writeAPIError(w, http.StatusUnauthorized, "runner token required") |
| 143 | return actionsdb.GetRunnerByTokenHashRow{}, false |
| 144 | } |
| 145 | hash, err := runnertoken.HashOf(strings.TrimSpace(strings.TrimPrefix(authz, prefix))) |
| 146 | if err != nil { |
| 147 | writeAPIError(w, http.StatusUnauthorized, "runner token invalid") |
| 148 | return actionsdb.GetRunnerByTokenHashRow{}, false |
| 149 | } |
| 150 | runner, err := actionsdb.New().GetRunnerByTokenHash(r.Context(), h.d.Pool, hash) |
| 151 | if err != nil { |
| 152 | writeAPIError(w, http.StatusUnauthorized, "runner token invalid") |
| 153 | return actionsdb.GetRunnerByTokenHashRow{}, false |
| 154 | } |
| 155 | return runner, true |
| 156 | } |
| 157 | |
| 158 | func (h *Handlers) allowRunnerHeartbeat(w http.ResponseWriter, r *http.Request, runnerID int64) bool { |
| 159 | if h.d.RateLimiter == nil { |
| 160 | return true |
| 161 | } |
| 162 | decision, err := h.d.RateLimiter.Allow(r.Context(), runnerHeartbeatLimit, fmt.Sprintf("runner:%d", runnerID)) |
| 163 | if err != nil { |
| 164 | h.d.Logger.WarnContext(r.Context(), "runner heartbeat rate-limit failed", "runner_id", runnerID, "error", err) |
| 165 | } |
| 166 | ratelimit.StampHeaders(w, decision) |
| 167 | if !decision.Allowed { |
| 168 | w.Header().Set("Retry-After", fmt.Sprintf("%d", int(decision.RetryAfter/time.Second))) |
| 169 | writeAPIError(w, http.StatusTooManyRequests, "rate limit exceeded") |
| 170 | return false |
| 171 | } |
| 172 | return true |
| 173 | } |
| 174 | |
// claimRunnerJob performs one heartbeat for runnerID inside a single
// transaction and tries to claim a queued job matching labels.
//
// It returns (job, steps, resolvedSecrets, true, nil) when a job was claimed.
// When the runner already has at least capacity running jobs, or the claim
// query returns no rows, it still records the heartbeat, commits, and returns
// claimed=false with a nil error. On any other failure it returns the error
// and the deferred rollback discards everything done in the transaction.
func (h *Handlers) claimRunnerJob(
	ctx context.Context,
	runnerID int64,
	labels []string,
	capacity int32,
) (actionsdb.ClaimQueuedWorkflowJobRow, []actionsdb.ListRunnerStepsForJobRow, map[string]string, bool, error) {
	q := actionsdb.New()
	tx, err := h.d.Pool.Begin(ctx)
	if err != nil {
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
	}
	// Roll back on every return path that did not reach a successful Commit.
	committed := false
	defer func() {
		if !committed {
			_ = tx.Rollback(ctx)
		}
	}()

	// NOTE(review): presumably takes a row lock so concurrent heartbeats from
	// the same runner serialize here — confirm against the LockRunnerByID query.
	if _, err := q.LockRunnerByID(ctx, tx, runnerID); err != nil {
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
	}
	running, err := q.CountRunningJobsForRunner(ctx, tx, runnerID)
	if err != nil {
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
	}
	// At capacity: record the heartbeat as busy and do not attempt a claim.
	if running >= capacity {
		if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{
			ID:       runnerID,
			Labels:   labels,
			Capacity: capacity,
			Status:   actionsdb.WorkflowRunnerStatusBusy,
		}); err != nil {
			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
		}
		if err := tx.Commit(ctx); err != nil {
			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
		}
		committed = true
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, nil
	}

	job, err := q.ClaimQueuedWorkflowJob(ctx, tx, actionsdb.ClaimQueuedWorkflowJobParams{
		RunnerID: runnerID,
		Labels:   labels,
	})
	if err != nil {
		if !errors.Is(err, pgx.ErrNoRows) {
			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
		}
		// No rows from the claim query: record the heartbeat as idle and report "no job".
		if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{
			ID:       runnerID,
			Labels:   labels,
			Capacity: capacity,
			Status:   actionsdb.WorkflowRunnerStatusIdle,
		}); err != nil {
			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
		}
		if err := tx.Commit(ctx); err != nil {
			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
		}
		committed = true
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, nil
	}
	// A run "running" event is emitted only when StartWorkflowRun returned a
	// row; on ErrNoRows the run is loaded as-is and no run event is emitted
	// (presumably the run was already started — confirm against the query).
	run, err := q.StartWorkflowRun(ctx, tx, job.RunID)
	switch {
	case err == nil:
		if err := actionsevents.EmitRunTx(ctx, tx, run, actionsevents.ActionRunning); err != nil {
			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
		}
	case errors.Is(err, pgx.ErrNoRows):
		run, err = q.GetWorkflowRunByID(ctx, tx, job.RunID)
		if err != nil {
			return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
		}
	default:
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
	}
	if err := actionsevents.EmitJobTx(ctx, tx, run, claimRowWorkflowJob(job), actionsevents.ActionRunning); err != nil {
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
	}
	steps, err := q.ListRunnerStepsForJob(ctx, tx, job.ID)
	if err != nil {
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
	}
	resolvedSecrets, err := h.resolveVisibleSecretsFromDB(ctx, tx, job.RepoID)
	if err != nil {
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
	}
	// Store the mask values derived from the resolved secrets for this job
	// in the same transaction as the claim.
	if err := h.storeJobSecretMaskSnapshot(ctx, tx, job.ID, secretMaskValues(resolvedSecrets)); err != nil {
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
	}
	// The job just claimed counts toward capacity when picking the status to record.
	status := actionsdb.WorkflowRunnerStatusIdle
	if running+1 >= capacity {
		status = actionsdb.WorkflowRunnerStatusBusy
	}
	if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{
		ID:       runnerID,
		Labels:   labels,
		Capacity: capacity,
		Status:   status,
	}); err != nil {
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
	}
	if err := tx.Commit(ctx); err != nil {
		return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err
	}
	committed = true
	return job, steps, resolvedSecrets, true, nil
}
| 284 | |
// runnerJobAuth is the result of a successful authenticateRunnerJob call.
type runnerJobAuth struct {
	Claims   runnerjwt.Claims      // claims of the verified job token
	RunnerID int64                 // runner ID extracted from Claims via Claims.RunnerID()
	Job      actionsdb.WorkflowJob // job row loaded for the job ID in the request path
}
| 290 | |
// authenticateRunnerJob authenticates a per-job runner request using the job
// token in the "Authorization: Bearer <jwt>" header and the {id} path
// parameter.
//
// On success it returns the verified claims, the runner ID and the job row,
// and true. On failure it writes the error response itself and returns a zero
// value and false:
//   - 503 when the runner JWT signer is not configured;
//   - 404 when the path ID is not a positive integer, does not match the
//     token's job ID, the job does not exist, or the job's run/runner do not
//     match the token;
//   - 401 when the token is missing, fails verification, carries a purpose
//     other than empty or PurposeAPI, has no usable runner ID, or fails
//     runnerjwt.Consume (including a replay);
//   - 500 when the job lookup fails for another reason.
//
// runnerjwt.Consume is called last, only after every other check has passed.
func (h *Handlers) authenticateRunnerJob(w http.ResponseWriter, r *http.Request) (runnerJobAuth, bool) {
	if h.d.RunnerJWT == nil {
		writeAPIError(w, http.StatusServiceUnavailable, "runner API is not configured")
		return runnerJobAuth{}, false
	}
	pathJobID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
	if err != nil || pathJobID <= 0 {
		writeAPIError(w, http.StatusNotFound, "job not found")
		return runnerJobAuth{}, false
	}
	const prefix = "Bearer "
	authz := r.Header.Get("Authorization")
	if !strings.HasPrefix(authz, prefix) {
		writeAPIError(w, http.StatusUnauthorized, "job token required")
		return runnerJobAuth{}, false
	}
	claims, err := h.d.RunnerJWT.Verify(strings.TrimSpace(strings.TrimPrefix(authz, prefix)))
	if err != nil {
		metrics.ActionsRunnerJWTTotal.WithLabelValues("rejected").Inc()
		writeAPIError(w, http.StatusUnauthorized, "job token invalid")
		return runnerJobAuth{}, false
	}
	// Tokens with an empty purpose are accepted alongside PurposeAPI; any
	// other purpose (e.g. a checkout token) is rejected here.
	if claims.Purpose != "" && claims.Purpose != runnerjwt.PurposeAPI {
		metrics.ActionsRunnerJWTTotal.WithLabelValues("rejected").Inc()
		writeAPIError(w, http.StatusUnauthorized, "job token invalid")
		return runnerJobAuth{}, false
	}
	// A token for a different job is answered with 404 rather than 401, and
	// is not counted in the "rejected" metric.
	if claims.JobID != pathJobID {
		writeAPIError(w, http.StatusNotFound, "job not found")
		return runnerJobAuth{}, false
	}
	runnerID, err := claims.RunnerID()
	if err != nil {
		metrics.ActionsRunnerJWTTotal.WithLabelValues("rejected").Inc()
		writeAPIError(w, http.StatusUnauthorized, "job token invalid")
		return runnerJobAuth{}, false
	}
	job, err := actionsdb.New().GetWorkflowJobByID(r.Context(), h.d.Pool, pathJobID)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			writeAPIError(w, http.StatusNotFound, "job not found")
		} else {
			writeAPIError(w, http.StatusInternalServerError, "job lookup failed")
		}
		return runnerJobAuth{}, false
	}
	// The stored job must belong to the token's run and be assigned to the token's runner.
	if job.RunID != claims.RunID || !job.RunnerID.Valid || job.RunnerID.Int64 != runnerID {
		writeAPIError(w, http.StatusNotFound, "job not found")
		return runnerJobAuth{}, false
	}
	// Consume the token; ErrReplay is reported separately from other failures.
	if err := runnerjwt.Consume(r.Context(), h.d.Pool, claims); err != nil {
		if errors.Is(err, runnerjwt.ErrReplay) {
			metrics.ActionsRunnerJWTTotal.WithLabelValues("replay").Inc()
			writeAPIError(w, http.StatusUnauthorized, "job token replayed")
		} else {
			metrics.ActionsRunnerJWTTotal.WithLabelValues("rejected").Inc()
			h.d.Logger.ErrorContext(r.Context(), "runner jwt consume failed", "job_id", pathJobID, "error", err)
			writeAPIError(w, http.StatusUnauthorized, "job token invalid")
		}
		return runnerJobAuth{}, false
	}
	return runnerJobAuth{Claims: claims, RunnerID: runnerID, Job: job}, true
}
| 354 | |
// runnerLogRequest is the JSON body of a log upload handled by runnerJobLogs.
type runnerLogRequest struct {
	Seq    int32  `json:"seq"`               // chunk sequence number; must be non-negative
	Chunk  string `json:"chunk"`             // base64-encoded log bytes; 1..524288 bytes once decoded
	StepID int64  `json:"step_id,omitempty"` // target step; zero selects the job's first step
}
| 360 | |
| 361 | func (h *Handlers) runnerJobLogs(w http.ResponseWriter, r *http.Request) { |
| 362 | auth, ok := h.authenticateRunnerJob(w, r) |
| 363 | if !ok { |
| 364 | return |
| 365 | } |
| 366 | var body runnerLogRequest |
| 367 | if err := decodeJSONBody(r.Body, &body); err != nil { |
| 368 | writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error()) |
| 369 | return |
| 370 | } |
| 371 | if body.Seq < 0 { |
| 372 | writeAPIError(w, http.StatusBadRequest, "seq must be non-negative") |
| 373 | return |
| 374 | } |
| 375 | chunk, err := decodeBase64(body.Chunk) |
| 376 | if err != nil { |
| 377 | writeAPIError(w, http.StatusBadRequest, "chunk must be base64") |
| 378 | return |
| 379 | } |
| 380 | if len(chunk) == 0 || len(chunk) > 512*1024 { |
| 381 | writeAPIError(w, http.StatusBadRequest, "chunk must be between 1 and 524288 bytes") |
| 382 | return |
| 383 | } |
| 384 | values, err := h.jobSecretMaskValues(r.Context(), auth.Job.ID, auth.Claims.RepoID) |
| 385 | if err != nil { |
| 386 | h.d.Logger.ErrorContext(r.Context(), "runner log mask resolution failed", "repo_id", auth.Claims.RepoID, "job_id", auth.Claims.JobID, "error", err) |
| 387 | writeAPIError(w, http.StatusInternalServerError, "log mask resolution failed") |
| 388 | return |
| 389 | } |
| 390 | stepID, ok := h.resolveLogStep(w, r, auth.Job.ID, body.StepID) |
| 391 | if !ok { |
| 392 | return |
| 393 | } |
| 394 | if err := h.appendScrubbedLogChunk(r.Context(), stepID, body.Seq, chunk, values); err != nil { |
| 395 | writeAPIError(w, http.StatusInternalServerError, "append log failed") |
| 396 | return |
| 397 | } |
| 398 | h.writeNextTokenResponse(w, r, http.StatusAccepted, auth, map[string]any{"accepted": true}) |
| 399 | } |
| 400 | |
| 401 | func (h *Handlers) resolveLogStep(w http.ResponseWriter, r *http.Request, jobID, stepID int64) (int64, bool) { |
| 402 | q := actionsdb.New() |
| 403 | if stepID == 0 { |
| 404 | step, err := q.GetFirstStepForJob(r.Context(), h.d.Pool, jobID) |
| 405 | if err != nil { |
| 406 | writeAPIError(w, http.StatusNotFound, "step not found") |
| 407 | return 0, false |
| 408 | } |
| 409 | return step.ID, true |
| 410 | } |
| 411 | step, err := q.GetWorkflowStepByID(r.Context(), h.d.Pool, stepID) |
| 412 | if err != nil || step.JobID != jobID { |
| 413 | writeAPIError(w, http.StatusNotFound, "step not found") |
| 414 | return 0, false |
| 415 | } |
| 416 | return step.ID, true |
| 417 | } |
| 418 | |
// runnerStatusRequest is the JSON body shared by the job-status and
// step-status endpoints. It is validated by normalizeJobStatusUpdate and
// normalizeStepStatusUpdate respectively.
type runnerStatusRequest struct {
	Status      string `json:"status"`                 // required target status; surrounding whitespace is trimmed
	Conclusion  string `json:"conclusion,omitempty"`   // accepted only together with a terminal status
	StartedAt   string `json:"started_at,omitempty"`   // optional timestamp parsed by parseTimeOptional (format defined there)
	CompletedAt string `json:"completed_at,omitempty"` // optional timestamp parsed by parseTimeOptional (format defined there)
}
| 425 | |
| 426 | func (h *Handlers) runnerJobStatus(w http.ResponseWriter, r *http.Request) { |
| 427 | auth, ok := h.authenticateRunnerJob(w, r) |
| 428 | if !ok { |
| 429 | return |
| 430 | } |
| 431 | var body runnerStatusRequest |
| 432 | if err := decodeJSONBody(r.Body, &body); err != nil { |
| 433 | writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error()) |
| 434 | return |
| 435 | } |
| 436 | update, terminal, err := normalizeJobStatusUpdate(auth.Job, body) |
| 437 | if err != nil { |
| 438 | writeAPIError(w, http.StatusBadRequest, err.Error()) |
| 439 | return |
| 440 | } |
| 441 | updated, runCompleted, runConclusion, err := h.applyJobStatus(r.Context(), auth.Job, update) |
| 442 | if err != nil { |
| 443 | writeAPIError(w, http.StatusInternalServerError, "status update failed") |
| 444 | return |
| 445 | } |
| 446 | if err := h.updateCheckRunForJob(r.Context(), updated); err != nil { |
| 447 | h.d.Logger.WarnContext(r.Context(), "runner check_run update failed", "job_id", updated.ID, "error", err) |
| 448 | } |
| 449 | |
| 450 | bodyMap := map[string]any{ |
| 451 | "status": string(updated.Status), |
| 452 | "conclusion": nullableConclusion(updated.Conclusion), |
| 453 | } |
| 454 | if runCompleted { |
| 455 | bodyMap["run_status"] = "completed" |
| 456 | bodyMap["run_conclusion"] = string(runConclusion) |
| 457 | } |
| 458 | if terminal { |
| 459 | writeJSON(w, http.StatusOK, bodyMap) |
| 460 | return |
| 461 | } |
| 462 | h.writeNextTokenResponse(w, r, http.StatusOK, auth, bodyMap) |
| 463 | } |
| 464 | |
| 465 | func (h *Handlers) runnerStepStatus(w http.ResponseWriter, r *http.Request) { |
| 466 | auth, ok := h.authenticateRunnerJob(w, r) |
| 467 | if !ok { |
| 468 | return |
| 469 | } |
| 470 | stepID, err := strconv.ParseInt(chi.URLParam(r, "step_id"), 10, 64) |
| 471 | if err != nil || stepID <= 0 { |
| 472 | writeAPIError(w, http.StatusNotFound, "step not found") |
| 473 | return |
| 474 | } |
| 475 | q := actionsdb.New() |
| 476 | step, err := q.GetWorkflowStepByID(r.Context(), h.d.Pool, stepID) |
| 477 | if err != nil || step.JobID != auth.Job.ID { |
| 478 | writeAPIError(w, http.StatusNotFound, "step not found") |
| 479 | return |
| 480 | } |
| 481 | var body runnerStatusRequest |
| 482 | if err := decodeJSONBody(r.Body, &body); err != nil { |
| 483 | writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error()) |
| 484 | return |
| 485 | } |
| 486 | update, terminal, err := normalizeStepStatusUpdate(step, body) |
| 487 | if err != nil { |
| 488 | writeAPIError(w, http.StatusBadRequest, err.Error()) |
| 489 | return |
| 490 | } |
| 491 | updated, err := h.applyStepStatus(r.Context(), step, update, terminal) |
| 492 | if err != nil { |
| 493 | writeAPIError(w, http.StatusInternalServerError, "step status update failed") |
| 494 | return |
| 495 | } |
| 496 | recordStepTimeout(step, updated) |
| 497 | h.writeNextTokenResponse(w, r, http.StatusOK, auth, map[string]any{ |
| 498 | "status": string(updated.Status), |
| 499 | "conclusion": nullableConclusion(updated.Conclusion), |
| 500 | }) |
| 501 | } |
| 502 | |
// normalizedJobStatusUpdate is the validated form of a runnerStatusRequest
// for a job, produced by normalizeJobStatusUpdate and written by
// applyJobStatus through UpdateWorkflowJobStatus.
type normalizedJobStatusUpdate struct {
	Status      actionsdb.WorkflowJobStatus   // target job status
	Conclusion  actionsdb.NullCheckConclusion // valid only when the target status is terminal
	StartedAt   pgtype.Timestamptz            // request value, else the job's existing value, else the time of normalization
	CompletedAt pgtype.Timestamptz            // request value, else the job's existing value; defaulted for terminal statuses
}
| 509 | |
| 510 | func normalizeJobStatusUpdate(job actionsdb.WorkflowJob, body runnerStatusRequest) (normalizedJobStatusUpdate, bool, error) { |
| 511 | now := time.Now().UTC() |
| 512 | status := actionsdb.WorkflowJobStatus(strings.TrimSpace(body.Status)) |
| 513 | if status == "" { |
| 514 | return normalizedJobStatusUpdate{}, false, errors.New("status is required") |
| 515 | } |
| 516 | if !validWorkflowJobTransition(job.Status, status) { |
| 517 | return normalizedJobStatusUpdate{}, false, fmt.Errorf("invalid status transition %s -> %s", job.Status, status) |
| 518 | } |
| 519 | startedAt := job.StartedAt |
| 520 | if body.StartedAt != "" { |
| 521 | t, err := parseTimeOptional(body.StartedAt) |
| 522 | if err != nil { |
| 523 | return normalizedJobStatusUpdate{}, false, fmt.Errorf("started_at: %w", err) |
| 524 | } |
| 525 | startedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()} |
| 526 | } |
| 527 | if !startedAt.Valid && (status == actionsdb.WorkflowJobStatusRunning || |
| 528 | status == actionsdb.WorkflowJobStatusCompleted || |
| 529 | status == actionsdb.WorkflowJobStatusCancelled) { |
| 530 | startedAt = pgtype.Timestamptz{Time: now, Valid: true} |
| 531 | } |
| 532 | completedAt := job.CompletedAt |
| 533 | terminal := status == actionsdb.WorkflowJobStatusCompleted || status == actionsdb.WorkflowJobStatusCancelled |
| 534 | if body.CompletedAt != "" { |
| 535 | t, err := parseTimeOptional(body.CompletedAt) |
| 536 | if err != nil { |
| 537 | return normalizedJobStatusUpdate{}, false, fmt.Errorf("completed_at: %w", err) |
| 538 | } |
| 539 | completedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()} |
| 540 | } |
| 541 | if terminal && !completedAt.Valid { |
| 542 | completedAt = pgtype.Timestamptz{Time: now, Valid: true} |
| 543 | } |
| 544 | conclusion := actionsdb.NullCheckConclusion{} |
| 545 | if terminal { |
| 546 | c := strings.TrimSpace(body.Conclusion) |
| 547 | if c == "" && status == actionsdb.WorkflowJobStatusCancelled { |
| 548 | c = "cancelled" |
| 549 | } |
| 550 | if !validRunnerConclusion(c) { |
| 551 | return normalizedJobStatusUpdate{}, false, errors.New("invalid or missing conclusion") |
| 552 | } |
| 553 | conclusion = actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusion(c), Valid: true} |
| 554 | } else if strings.TrimSpace(body.Conclusion) != "" { |
| 555 | return normalizedJobStatusUpdate{}, false, errors.New("conclusion is only valid for terminal statuses") |
| 556 | } |
| 557 | return normalizedJobStatusUpdate{ |
| 558 | Status: status, |
| 559 | Conclusion: conclusion, |
| 560 | StartedAt: startedAt, |
| 561 | CompletedAt: completedAt, |
| 562 | }, terminal, nil |
| 563 | } |
| 564 | |
| 565 | func validWorkflowJobTransition(from, to actionsdb.WorkflowJobStatus) bool { |
| 566 | switch to { |
| 567 | case actionsdb.WorkflowJobStatusRunning: |
| 568 | return from == actionsdb.WorkflowJobStatusQueued || from == actionsdb.WorkflowJobStatusRunning |
| 569 | case actionsdb.WorkflowJobStatusCompleted: |
| 570 | return from == actionsdb.WorkflowJobStatusQueued || from == actionsdb.WorkflowJobStatusRunning || from == actionsdb.WorkflowJobStatusCompleted |
| 571 | case actionsdb.WorkflowJobStatusCancelled: |
| 572 | return from == actionsdb.WorkflowJobStatusQueued || from == actionsdb.WorkflowJobStatusRunning || from == actionsdb.WorkflowJobStatusCancelled |
| 573 | default: |
| 574 | return false |
| 575 | } |
| 576 | } |
| 577 | |
// normalizedStepStatusUpdate is the validated form of a runnerStatusRequest
// for a step, produced by normalizeStepStatusUpdate and written by
// applyStepStatus through UpdateWorkflowStepStatus.
type normalizedStepStatusUpdate struct {
	Status      actionsdb.WorkflowStepStatus  // target step status
	Conclusion  actionsdb.NullCheckConclusion // valid only when the target status is terminal
	StartedAt   pgtype.Timestamptz            // request value, else the step's existing value; defaulted for running/completed/cancelled
	CompletedAt pgtype.Timestamptz            // request value, else the step's existing value; defaulted for terminal statuses
}
| 584 | |
| 585 | func normalizeStepStatusUpdate(step actionsdb.WorkflowStep, body runnerStatusRequest) (normalizedStepStatusUpdate, bool, error) { |
| 586 | now := time.Now().UTC() |
| 587 | status := actionsdb.WorkflowStepStatus(strings.TrimSpace(body.Status)) |
| 588 | if status == "" { |
| 589 | return normalizedStepStatusUpdate{}, false, errors.New("status is required") |
| 590 | } |
| 591 | if !validWorkflowStepTransition(step.Status, status) { |
| 592 | return normalizedStepStatusUpdate{}, false, fmt.Errorf("invalid status transition %s -> %s", step.Status, status) |
| 593 | } |
| 594 | startedAt := step.StartedAt |
| 595 | if body.StartedAt != "" { |
| 596 | t, err := parseTimeOptional(body.StartedAt) |
| 597 | if err != nil { |
| 598 | return normalizedStepStatusUpdate{}, false, fmt.Errorf("started_at: %w", err) |
| 599 | } |
| 600 | startedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()} |
| 601 | } |
| 602 | if !startedAt.Valid && (status == actionsdb.WorkflowStepStatusRunning || |
| 603 | status == actionsdb.WorkflowStepStatusCompleted || |
| 604 | status == actionsdb.WorkflowStepStatusCancelled) { |
| 605 | startedAt = pgtype.Timestamptz{Time: now, Valid: true} |
| 606 | } |
| 607 | completedAt := step.CompletedAt |
| 608 | terminal := status == actionsdb.WorkflowStepStatusCompleted || |
| 609 | status == actionsdb.WorkflowStepStatusCancelled || |
| 610 | status == actionsdb.WorkflowStepStatusSkipped |
| 611 | if body.CompletedAt != "" { |
| 612 | t, err := parseTimeOptional(body.CompletedAt) |
| 613 | if err != nil { |
| 614 | return normalizedStepStatusUpdate{}, false, fmt.Errorf("completed_at: %w", err) |
| 615 | } |
| 616 | completedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()} |
| 617 | } |
| 618 | if terminal && !completedAt.Valid { |
| 619 | completedAt = pgtype.Timestamptz{Time: now, Valid: true} |
| 620 | } |
| 621 | conclusion := actionsdb.NullCheckConclusion{} |
| 622 | if terminal { |
| 623 | c := strings.TrimSpace(body.Conclusion) |
| 624 | if c == "" && status == actionsdb.WorkflowStepStatusCancelled { |
| 625 | c = "cancelled" |
| 626 | } |
| 627 | if c == "" && status == actionsdb.WorkflowStepStatusSkipped { |
| 628 | c = "skipped" |
| 629 | } |
| 630 | if !validRunnerConclusion(c) { |
| 631 | return normalizedStepStatusUpdate{}, false, errors.New("invalid or missing conclusion") |
| 632 | } |
| 633 | conclusion = actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusion(c), Valid: true} |
| 634 | } else if strings.TrimSpace(body.Conclusion) != "" { |
| 635 | return normalizedStepStatusUpdate{}, false, errors.New("conclusion is only valid for terminal statuses") |
| 636 | } |
| 637 | return normalizedStepStatusUpdate{ |
| 638 | Status: status, |
| 639 | Conclusion: conclusion, |
| 640 | StartedAt: startedAt, |
| 641 | CompletedAt: completedAt, |
| 642 | }, terminal, nil |
| 643 | } |
| 644 | |
| 645 | func validWorkflowStepTransition(from, to actionsdb.WorkflowStepStatus) bool { |
| 646 | switch to { |
| 647 | case actionsdb.WorkflowStepStatusRunning: |
| 648 | return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning |
| 649 | case actionsdb.WorkflowStepStatusCompleted: |
| 650 | return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning || from == actionsdb.WorkflowStepStatusCompleted |
| 651 | case actionsdb.WorkflowStepStatusCancelled: |
| 652 | return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning || from == actionsdb.WorkflowStepStatusCancelled |
| 653 | case actionsdb.WorkflowStepStatusSkipped: |
| 654 | return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning || from == actionsdb.WorkflowStepStatusSkipped |
| 655 | default: |
| 656 | return false |
| 657 | } |
| 658 | } |
| 659 | |
| 660 | func recordStepTimeout(before, after actionsdb.WorkflowStep) { |
| 661 | if !after.Conclusion.Valid || after.Conclusion.CheckConclusion != actionsdb.CheckConclusionTimedOut { |
| 662 | return |
| 663 | } |
| 664 | if before.Conclusion.Valid && before.Conclusion.CheckConclusion == actionsdb.CheckConclusionTimedOut { |
| 665 | return |
| 666 | } |
| 667 | metrics.ActionsStepTimeoutsTotal.Inc() |
| 668 | } |
| 669 | |
| 670 | func (h *Handlers) applyStepStatus( |
| 671 | ctx context.Context, |
| 672 | step actionsdb.WorkflowStep, |
| 673 | update normalizedStepStatusUpdate, |
| 674 | terminal bool, |
| 675 | ) (actionsdb.WorkflowStep, error) { |
| 676 | q := actionsdb.New() |
| 677 | tx, err := h.d.Pool.Begin(ctx) |
| 678 | if err != nil { |
| 679 | return actionsdb.WorkflowStep{}, err |
| 680 | } |
| 681 | committed := false |
| 682 | defer func() { |
| 683 | if !committed { |
| 684 | _ = tx.Rollback(ctx) |
| 685 | } |
| 686 | }() |
| 687 | updated, err := q.UpdateWorkflowStepStatus(ctx, tx, actionsdb.UpdateWorkflowStepStatusParams{ |
| 688 | ID: step.ID, |
| 689 | Status: update.Status, |
| 690 | Conclusion: update.Conclusion, |
| 691 | StartedAt: update.StartedAt, |
| 692 | CompletedAt: update.CompletedAt, |
| 693 | }) |
| 694 | if err != nil { |
| 695 | return actionsdb.WorkflowStep{}, err |
| 696 | } |
| 697 | shouldNotify := false |
| 698 | if terminal && h.d.ObjectStore != nil { |
| 699 | if _, err := worker.Enqueue(ctx, tx, finalize.KindWorkflowFinalizeStep, finalize.Payload{StepID: step.ID}, worker.EnqueueOptions{}); err != nil { |
| 700 | return actionsdb.WorkflowStep{}, err |
| 701 | } |
| 702 | shouldNotify = true |
| 703 | } |
| 704 | if terminal { |
| 705 | if err := logstream.NotifyDone(ctx, tx, step.ID); err != nil { |
| 706 | return actionsdb.WorkflowStep{}, err |
| 707 | } |
| 708 | } |
| 709 | if err := tx.Commit(ctx); err != nil { |
| 710 | return actionsdb.WorkflowStep{}, err |
| 711 | } |
| 712 | committed = true |
| 713 | if shouldNotify { |
| 714 | if err := worker.Notify(ctx, h.d.Pool); err != nil && h.d.Logger != nil { |
| 715 | h.d.Logger.WarnContext(ctx, "runner step finalizer notify failed", "step_id", step.ID, "error", err) |
| 716 | } |
| 717 | } |
| 718 | return updated, nil |
| 719 | } |
| 720 | |
| 721 | func (h *Handlers) applyJobStatus( |
| 722 | ctx context.Context, |
| 723 | job actionsdb.WorkflowJob, |
| 724 | update normalizedJobStatusUpdate, |
| 725 | ) (actionsdb.WorkflowJob, bool, actionsdb.CheckConclusion, error) { |
| 726 | q := actionsdb.New() |
| 727 | tx, err := h.d.Pool.Begin(ctx) |
| 728 | if err != nil { |
| 729 | return actionsdb.WorkflowJob{}, false, "", err |
| 730 | } |
| 731 | committed := false |
| 732 | defer func() { |
| 733 | if !committed { |
| 734 | _ = tx.Rollback(ctx) |
| 735 | } |
| 736 | }() |
| 737 | updated, err := q.UpdateWorkflowJobStatus(ctx, tx, actionsdb.UpdateWorkflowJobStatusParams{ |
| 738 | ID: job.ID, |
| 739 | Status: update.Status, |
| 740 | Conclusion: update.Conclusion, |
| 741 | StartedAt: update.StartedAt, |
| 742 | CompletedAt: update.CompletedAt, |
| 743 | }) |
| 744 | if err != nil { |
| 745 | return actionsdb.WorkflowJob{}, false, "", err |
| 746 | } |
| 747 | notifyWorker := false |
| 748 | if updated.Status == actionsdb.WorkflowJobStatusCancelled { |
| 749 | steps, err := q.CancelOpenWorkflowStepsForJob(ctx, tx, updated.ID) |
| 750 | if err != nil { |
| 751 | return actionsdb.WorkflowJob{}, false, "", err |
| 752 | } |
| 753 | for _, step := range steps { |
| 754 | if err := logstream.NotifyDone(ctx, tx, step.ID); err != nil { |
| 755 | return actionsdb.WorkflowJob{}, false, "", err |
| 756 | } |
| 757 | if h.d.ObjectStore != nil { |
| 758 | if _, err := worker.Enqueue(ctx, tx, finalize.KindWorkflowFinalizeStep, finalize.Payload{StepID: step.ID}, worker.EnqueueOptions{}); err != nil { |
| 759 | return actionsdb.WorkflowJob{}, false, "", err |
| 760 | } |
| 761 | notifyWorker = true |
| 762 | } |
| 763 | } |
| 764 | } |
| 765 | jobs, err := q.ListJobsForRun(ctx, tx, updated.RunID) |
| 766 | if err != nil { |
| 767 | return actionsdb.WorkflowJob{}, false, "", err |
| 768 | } |
| 769 | runConclusion, complete := deriveWorkflowRunConclusion(jobs) |
| 770 | runAfter, err := q.GetWorkflowRunByID(ctx, tx, updated.RunID) |
| 771 | if err != nil { |
| 772 | return actionsdb.WorkflowJob{}, false, "", err |
| 773 | } |
| 774 | runBefore := runAfter |
| 775 | runStarted := false |
| 776 | runTerminalChanged := false |
| 777 | if complete { |
| 778 | runAfter, err = q.CompleteWorkflowRun(ctx, tx, actionsdb.CompleteWorkflowRunParams{ |
| 779 | ID: updated.RunID, |
| 780 | Conclusion: runConclusion, |
| 781 | }) |
| 782 | if err != nil { |
| 783 | return actionsdb.WorkflowJob{}, false, "", err |
| 784 | } |
| 785 | runTerminalChanged = workflowRunLifecycleChanged(runBefore, runAfter) |
| 786 | } else { |
| 787 | startedRun, err := q.StartWorkflowRun(ctx, tx, updated.RunID) |
| 788 | if err == nil { |
| 789 | runAfter = startedRun |
| 790 | runStarted = true |
| 791 | } else if !errors.Is(err, pgx.ErrNoRows) { |
| 792 | return actionsdb.WorkflowJob{}, false, "", err |
| 793 | } |
| 794 | } |
| 795 | if jobLifecycleChanged(job, updated) { |
| 796 | if err := actionsevents.EmitJobTx(ctx, tx, runAfter, updated, workflowJobEventAction(updated.Status)); err != nil { |
| 797 | return actionsdb.WorkflowJob{}, false, "", err |
| 798 | } |
| 799 | } |
| 800 | if runStarted { |
| 801 | if err := actionsevents.EmitRunTx(ctx, tx, runAfter, actionsevents.ActionRunning); err != nil { |
| 802 | return actionsdb.WorkflowJob{}, false, "", err |
| 803 | } |
| 804 | } |
| 805 | if complete && runTerminalChanged { |
| 806 | if err := actionsevents.EmitRunTx(ctx, tx, runAfter, workflowRunEventAction(runAfter.Status)); err != nil { |
| 807 | return actionsdb.WorkflowJob{}, false, "", err |
| 808 | } |
| 809 | } |
| 810 | if err := tx.Commit(ctx); err != nil { |
| 811 | return actionsdb.WorkflowJob{}, false, "", err |
| 812 | } |
| 813 | committed = true |
| 814 | if notifyWorker { |
| 815 | if err := worker.Notify(ctx, h.d.Pool); err != nil && h.d.Logger != nil { |
| 816 | h.d.Logger.WarnContext(ctx, "runner cancelled-step finalizer notify failed", "job_id", updated.ID, "error", err) |
| 817 | } |
| 818 | } |
| 819 | return updated, complete, runConclusion, nil |
| 820 | } |
| 821 | |
| 822 | func claimRowWorkflowJob(row actionsdb.ClaimQueuedWorkflowJobRow) actionsdb.WorkflowJob { |
| 823 | return actionsdb.WorkflowJob{ |
| 824 | ID: row.ID, |
| 825 | RunID: row.RunID, |
| 826 | JobIndex: row.JobIndex, |
| 827 | JobKey: row.JobKey, |
| 828 | JobName: row.JobName, |
| 829 | RunsOn: row.RunsOn, |
| 830 | RunnerID: row.RunnerID, |
| 831 | NeedsJobs: row.NeedsJobs, |
| 832 | IfExpr: row.IfExpr, |
| 833 | TimeoutMinutes: row.TimeoutMinutes, |
| 834 | Permissions: row.Permissions, |
| 835 | JobEnv: row.JobEnv, |
| 836 | Status: row.Status, |
| 837 | Conclusion: row.Conclusion, |
| 838 | CancelRequested: row.CancelRequested, |
| 839 | StartedAt: row.StartedAt, |
| 840 | CompletedAt: row.CompletedAt, |
| 841 | Version: row.Version, |
| 842 | CreatedAt: row.CreatedAt, |
| 843 | UpdatedAt: row.UpdatedAt, |
| 844 | } |
| 845 | } |
| 846 | |
| 847 | func jobLifecycleChanged(before, after actionsdb.WorkflowJob) bool { |
| 848 | if before.Status != after.Status { |
| 849 | return true |
| 850 | } |
| 851 | if before.Conclusion.Valid != after.Conclusion.Valid { |
| 852 | return true |
| 853 | } |
| 854 | return before.Conclusion.Valid && before.Conclusion.CheckConclusion != after.Conclusion.CheckConclusion |
| 855 | } |
| 856 | |
| 857 | func workflowRunLifecycleChanged(before, after actionsdb.WorkflowRun) bool { |
| 858 | if before.Status != after.Status { |
| 859 | return true |
| 860 | } |
| 861 | if before.Conclusion.Valid != after.Conclusion.Valid { |
| 862 | return true |
| 863 | } |
| 864 | return before.Conclusion.Valid && before.Conclusion.CheckConclusion != after.Conclusion.CheckConclusion |
| 865 | } |
| 866 | |
| 867 | func workflowJobEventAction(status actionsdb.WorkflowJobStatus) string { |
| 868 | switch status { |
| 869 | case actionsdb.WorkflowJobStatusCancelled: |
| 870 | return actionsevents.ActionCancelled |
| 871 | case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusSkipped: |
| 872 | return actionsevents.ActionCompleted |
| 873 | case actionsdb.WorkflowJobStatusRunning: |
| 874 | return actionsevents.ActionRunning |
| 875 | default: |
| 876 | return actionsevents.ActionQueued |
| 877 | } |
| 878 | } |
| 879 | |
| 880 | func workflowRunEventAction(status actionsdb.WorkflowRunStatus) string { |
| 881 | if status == actionsdb.WorkflowRunStatusCancelled { |
| 882 | return actionsevents.ActionCancelled |
| 883 | } |
| 884 | return actionsevents.ActionCompleted |
| 885 | } |
| 886 | |
| 887 | func deriveWorkflowRunConclusion(jobs []actionsdb.ListJobsForRunRow) (actionsdb.CheckConclusion, bool) { |
| 888 | if len(jobs) == 0 { |
| 889 | return actionsdb.CheckConclusionFailure, true |
| 890 | } |
| 891 | worst := actionsdb.CheckConclusionSuccess |
| 892 | for _, job := range jobs { |
| 893 | switch job.Status { |
| 894 | case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusCancelled, actionsdb.WorkflowJobStatusSkipped: |
| 895 | default: |
| 896 | return "", false |
| 897 | } |
| 898 | if job.Status == actionsdb.WorkflowJobStatusCancelled { |
| 899 | worst = actionsdb.CheckConclusionCancelled |
| 900 | continue |
| 901 | } |
| 902 | if !job.Conclusion.Valid { |
| 903 | return actionsdb.CheckConclusionFailure, true |
| 904 | } |
| 905 | c := job.Conclusion.CheckConclusion |
| 906 | if c == actionsdb.CheckConclusionFailure || |
| 907 | c == actionsdb.CheckConclusionTimedOut || |
| 908 | c == actionsdb.CheckConclusionActionRequired { |
| 909 | return c, true |
| 910 | } |
| 911 | if c == actionsdb.CheckConclusionCancelled { |
| 912 | worst = actionsdb.CheckConclusionCancelled |
| 913 | } |
| 914 | } |
| 915 | return worst, true |
| 916 | } |
| 917 | |
| 918 | func (h *Handlers) updateCheckRunForJob(ctx context.Context, job actionsdb.WorkflowJob) error { |
| 919 | return actionslifecycle.SyncCheckRunForJob(ctx, actionslifecycle.Deps{Pool: h.d.Pool, Logger: h.d.Logger}, job) |
| 920 | } |
| 921 | |
// runnerArtifactUploadRequest is the JSON request body accepted by
// runnerJobArtifactUpload.
type runnerArtifactUploadRequest struct {
	// Name is the artifact name; the handler trims it and checks it with
	// validArtifactName.
	Name string `json:"name"`
	// SizeBytes is the declared artifact size; the handler rejects negative
	// values.
	SizeBytes int64 `json:"size_bytes"`
}
| 926 | |
// artifactNameRE matches a non-empty string made only of ASCII letters,
// digits, '.', '_' and '-'.
var artifactNameRE = regexp.MustCompile(`^[A-Za-z0-9._-]+$`)
| 928 | |
| 929 | func (h *Handlers) runnerJobArtifactUpload(w http.ResponseWriter, r *http.Request) { |
| 930 | if h.d.ObjectStore == nil { |
| 931 | writeAPIError(w, http.StatusServiceUnavailable, "object storage is not configured") |
| 932 | return |
| 933 | } |
| 934 | auth, ok := h.authenticateRunnerJob(w, r) |
| 935 | if !ok { |
| 936 | return |
| 937 | } |
| 938 | var body runnerArtifactUploadRequest |
| 939 | if err := decodeJSONBody(r.Body, &body); err != nil { |
| 940 | writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error()) |
| 941 | return |
| 942 | } |
| 943 | body.Name = strings.TrimSpace(body.Name) |
| 944 | if !validArtifactName(body.Name) { |
| 945 | writeAPIError(w, http.StatusBadRequest, "invalid artifact name") |
| 946 | return |
| 947 | } |
| 948 | if body.SizeBytes < 0 { |
| 949 | writeAPIError(w, http.StatusBadRequest, "size_bytes must be non-negative") |
| 950 | return |
| 951 | } |
| 952 | objectKey := fmt.Sprintf("actions/runs/%d/artifacts/%s", auth.Claims.RunID, body.Name) |
| 953 | artifact, err := actionsdb.New().InsertArtifact(r.Context(), h.d.Pool, actionsdb.InsertArtifactParams{ |
| 954 | RunID: auth.Claims.RunID, |
| 955 | Name: body.Name, |
| 956 | ObjectKey: objectKey, |
| 957 | ByteCount: body.SizeBytes, |
| 958 | ExpiresAt: pgtype.Timestamptz{ |
| 959 | Time: time.Now().UTC().Add(90 * 24 * time.Hour), |
| 960 | Valid: true, |
| 961 | }, |
| 962 | }) |
| 963 | if err != nil { |
| 964 | writeAPIError(w, http.StatusInternalServerError, "artifact create failed") |
| 965 | return |
| 966 | } |
| 967 | uploadURL, err := h.d.ObjectStore.SignedURL(r.Context(), objectKey, 15*time.Minute, http.MethodPut) |
| 968 | if err != nil { |
| 969 | writeAPIError(w, http.StatusInternalServerError, "artifact upload url failed") |
| 970 | return |
| 971 | } |
| 972 | h.writeNextTokenResponse(w, r, http.StatusCreated, auth, map[string]any{ |
| 973 | "artifact_id": artifact.ID, |
| 974 | "upload_url": uploadURL, |
| 975 | }) |
| 976 | } |
| 977 | |
| 978 | func validArtifactName(name string) bool { |
| 979 | return len(name) >= 1 && |
| 980 | len(name) <= 100 && |
| 981 | artifactNameRE.MatchString(name) && |
| 982 | !strings.HasPrefix(name, "..") && |
| 983 | !strings.Contains(name, "/") |
| 984 | } |
| 985 | |
| 986 | func (h *Handlers) runnerJobCancelCheck(w http.ResponseWriter, r *http.Request) { |
| 987 | auth, ok := h.authenticateRunnerJob(w, r) |
| 988 | if !ok { |
| 989 | return |
| 990 | } |
| 991 | h.writeNextTokenResponse(w, r, http.StatusOK, auth, map[string]any{ |
| 992 | "cancelled": auth.Job.CancelRequested, |
| 993 | }) |
| 994 | } |
| 995 | |
| 996 | type secretResolutionDB interface { |
| 997 | actionsdb.DBTX |
| 998 | reposdb.DBTX |
| 999 | } |
| 1000 | |
| 1001 | func (h *Handlers) resolveVisibleSecrets(ctx context.Context, repoID int64) (map[string]string, error) { |
| 1002 | return h.resolveVisibleSecretsFromDB(ctx, h.d.Pool, repoID) |
| 1003 | } |
| 1004 | |
| 1005 | func (h *Handlers) resolveVisibleSecretsFromDB(ctx context.Context, db secretResolutionDB, repoID int64) (map[string]string, error) { |
| 1006 | if h.d.SecretBox == nil { |
| 1007 | return nil, nil |
| 1008 | } |
| 1009 | repo, err := reposdb.New().GetRepoByID(ctx, db, repoID) |
| 1010 | if err != nil { |
| 1011 | return nil, err |
| 1012 | } |
| 1013 | out := map[string]string{} |
| 1014 | if repo.OwnerOrgID.Valid { |
| 1015 | if err := h.mergeOrgSecrets(ctx, db, repo.OwnerOrgID.Int64, out); err != nil { |
| 1016 | return nil, err |
| 1017 | } |
| 1018 | } |
| 1019 | if err := h.mergeRepoSecrets(ctx, db, repo.ID, out); err != nil { |
| 1020 | return nil, err |
| 1021 | } |
| 1022 | if len(out) == 0 { |
| 1023 | return nil, nil |
| 1024 | } |
| 1025 | return out, nil |
| 1026 | } |
| 1027 | |
| 1028 | func (h *Handlers) mergeRepoSecrets(ctx context.Context, db actionsdb.DBTX, repoID int64, out map[string]string) error { |
| 1029 | q := actionsdb.New() |
| 1030 | items, err := q.ListRepoSecrets(ctx, db, pgtype.Int8{Int64: repoID, Valid: true}) |
| 1031 | if err != nil { |
| 1032 | return err |
| 1033 | } |
| 1034 | for _, item := range items { |
| 1035 | row, err := q.GetRepoSecret(ctx, db, actionsdb.GetRepoSecretParams{ |
| 1036 | RepoID: pgtype.Int8{Int64: repoID, Valid: true}, |
| 1037 | Name: item.Name, |
| 1038 | }) |
| 1039 | if err != nil { |
| 1040 | return err |
| 1041 | } |
| 1042 | plaintext, err := h.d.SecretBox.Open(row.Ciphertext, row.Nonce) |
| 1043 | if err != nil { |
| 1044 | return err |
| 1045 | } |
| 1046 | out[item.Name] = string(plaintext) |
| 1047 | } |
| 1048 | return nil |
| 1049 | } |
| 1050 | |
| 1051 | func (h *Handlers) mergeOrgSecrets(ctx context.Context, db actionsdb.DBTX, orgID int64, out map[string]string) error { |
| 1052 | q := actionsdb.New() |
| 1053 | items, err := q.ListOrgSecrets(ctx, db, pgtype.Int8{Int64: orgID, Valid: true}) |
| 1054 | if err != nil { |
| 1055 | return err |
| 1056 | } |
| 1057 | for _, item := range items { |
| 1058 | row, err := q.GetOrgSecret(ctx, db, actionsdb.GetOrgSecretParams{ |
| 1059 | OrgID: pgtype.Int8{Int64: orgID, Valid: true}, |
| 1060 | Name: item.Name, |
| 1061 | }) |
| 1062 | if err != nil { |
| 1063 | return err |
| 1064 | } |
| 1065 | plaintext, err := h.d.SecretBox.Open(row.Ciphertext, row.Nonce) |
| 1066 | if err != nil { |
| 1067 | return err |
| 1068 | } |
| 1069 | out[item.Name] = string(plaintext) |
| 1070 | } |
| 1071 | return nil |
| 1072 | } |
| 1073 | |
| 1074 | func (h *Handlers) logMaskValues(ctx context.Context, repoID int64) ([]string, error) { |
| 1075 | resolved, err := h.resolveVisibleSecrets(ctx, repoID) |
| 1076 | if err != nil { |
| 1077 | return nil, err |
| 1078 | } |
| 1079 | return secretMaskValues(resolved), nil |
| 1080 | } |
| 1081 | |
| 1082 | func (h *Handlers) storeJobSecretMaskSnapshot(ctx context.Context, db actionsdb.DBTX, jobID int64, values []string) error { |
| 1083 | if h.d.SecretBox == nil { |
| 1084 | return nil |
| 1085 | } |
| 1086 | if values == nil { |
| 1087 | values = []string{} |
| 1088 | } |
| 1089 | payload, err := json.Marshal(values) |
| 1090 | if err != nil { |
| 1091 | return err |
| 1092 | } |
| 1093 | ciphertext, nonce, err := h.d.SecretBox.Seal(payload) |
| 1094 | if err != nil { |
| 1095 | return err |
| 1096 | } |
| 1097 | return actionsdb.New().UpsertWorkflowJobSecretMask(ctx, db, actionsdb.UpsertWorkflowJobSecretMaskParams{ |
| 1098 | JobID: jobID, |
| 1099 | Ciphertext: ciphertext, |
| 1100 | Nonce: nonce, |
| 1101 | }) |
| 1102 | } |
| 1103 | |
| 1104 | func (h *Handlers) jobSecretMaskValues(ctx context.Context, jobID, repoID int64) ([]string, error) { |
| 1105 | if h.d.SecretBox == nil { |
| 1106 | return nil, nil |
| 1107 | } |
| 1108 | row, err := actionsdb.New().GetWorkflowJobSecretMask(ctx, h.d.Pool, jobID) |
| 1109 | if err != nil { |
| 1110 | if errors.Is(err, pgx.ErrNoRows) { |
| 1111 | return h.logMaskValues(ctx, repoID) |
| 1112 | } |
| 1113 | return nil, err |
| 1114 | } |
| 1115 | plaintext, err := h.d.SecretBox.Open(row.Ciphertext, row.Nonce) |
| 1116 | if err != nil { |
| 1117 | return nil, err |
| 1118 | } |
| 1119 | var values []string |
| 1120 | if err := json.Unmarshal(plaintext, &values); err != nil { |
| 1121 | return nil, err |
| 1122 | } |
| 1123 | sort.Strings(values) |
| 1124 | return values, nil |
| 1125 | } |
| 1126 | |
// secretMaskValues returns the values of resolved sorted in ascending order,
// one entry per map key (duplicates and empty values are kept). It returns
// nil for a nil or empty map.
func secretMaskValues(resolved map[string]string) []string {
	count := len(resolved)
	if count == 0 {
		return nil
	}
	masks := make([]string, count)
	next := 0
	for _, secret := range resolved {
		masks[next] = secret
		next++
	}
	// Map iteration order is random; sorting makes the output deterministic.
	sort.Strings(masks)
	return masks
}
| 1138 | |
// cloneStringMap returns an independent copy of in. A nil or empty input
// yields nil rather than an empty map.
func cloneStringMap(in map[string]string) map[string]string {
	if len(in) > 0 {
		dup := make(map[string]string, len(in))
		for key := range in {
			dup[key] = in[key]
		}
		return dup
	}
	return nil
}
| 1149 | |
| 1150 | func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq int32, chunk []byte, values []string) error { |
| 1151 | q := actionsdb.New() |
| 1152 | if len(values) == 0 { |
| 1153 | row, err := q.AppendStepLogChunk(ctx, h.d.Pool, actionsdb.AppendStepLogChunkParams{ |
| 1154 | StepID: stepID, |
| 1155 | Seq: seq, |
| 1156 | Chunk: chunk, |
| 1157 | }) |
| 1158 | if errors.Is(err, pgx.ErrNoRows) { |
| 1159 | return nil |
| 1160 | } |
| 1161 | if err != nil { |
| 1162 | return err |
| 1163 | } |
| 1164 | return logstream.NotifyChunk(ctx, h.d.Pool, stepID, row.Seq) |
| 1165 | } |
| 1166 | |
| 1167 | tx, err := h.d.Pool.Begin(ctx) |
| 1168 | if err != nil { |
| 1169 | return err |
| 1170 | } |
| 1171 | committed := false |
| 1172 | defer func() { |
| 1173 | if !committed { |
| 1174 | _ = tx.Rollback(ctx) |
| 1175 | } |
| 1176 | }() |
| 1177 | |
| 1178 | if _, err := q.GetStepLogChunkByStepSeq(ctx, tx, actionsdb.GetStepLogChunkByStepSeqParams{ |
| 1179 | StepID: stepID, |
| 1180 | Seq: seq, |
| 1181 | }); err == nil { |
| 1182 | if err := tx.Commit(ctx); err != nil { |
| 1183 | return err |
| 1184 | } |
| 1185 | committed = true |
| 1186 | return nil |
| 1187 | } else if !errors.Is(err, pgx.ErrNoRows) { |
| 1188 | return err |
| 1189 | } |
| 1190 | |
| 1191 | var replacements uint64 |
| 1192 | prev, err := q.GetStepLogChunkBefore(ctx, tx, actionsdb.GetStepLogChunkBeforeParams{ |
| 1193 | StepID: stepID, |
| 1194 | Seq: seq, |
| 1195 | }) |
| 1196 | switch { |
| 1197 | case err == nil: |
| 1198 | if carry := scrubCarryLen(prev.Chunk, values); carry > 0 { |
| 1199 | prefix := append([]byte(nil), prev.Chunk[:len(prev.Chunk)-carry]...) |
| 1200 | combined := append(append([]byte(nil), prev.Chunk[len(prev.Chunk)-carry:]...), chunk...) |
| 1201 | chunk, replacements = scrubChunk(combined, values) |
| 1202 | if err := q.UpdateStepLogChunk(ctx, tx, actionsdb.UpdateStepLogChunkParams{ |
| 1203 | ID: prev.ID, |
| 1204 | Chunk: prefix, |
| 1205 | }); err != nil { |
| 1206 | return err |
| 1207 | } |
| 1208 | } else { |
| 1209 | chunk, replacements = scrubChunk(chunk, values) |
| 1210 | } |
| 1211 | case errors.Is(err, pgx.ErrNoRows): |
| 1212 | chunk, replacements = scrubChunk(chunk, values) |
| 1213 | default: |
| 1214 | return err |
| 1215 | } |
| 1216 | |
| 1217 | row, err := q.AppendStepLogChunk(ctx, tx, actionsdb.AppendStepLogChunkParams{ |
| 1218 | StepID: stepID, |
| 1219 | Seq: seq, |
| 1220 | Chunk: chunk, |
| 1221 | }) |
| 1222 | switch { |
| 1223 | case err == nil: |
| 1224 | if err := logstream.NotifyChunk(ctx, tx, stepID, row.Seq); err != nil { |
| 1225 | return err |
| 1226 | } |
| 1227 | case errors.Is(err, pgx.ErrNoRows): |
| 1228 | default: |
| 1229 | return err |
| 1230 | } |
| 1231 | if err := tx.Commit(ctx); err != nil { |
| 1232 | return err |
| 1233 | } |
| 1234 | committed = true |
| 1235 | if replacements > 0 { |
| 1236 | metrics.ActionsLogScrubReplacementsTotal.WithLabelValues("server").Add(float64(replacements)) |
| 1237 | } |
| 1238 | return nil |
| 1239 | } |
| 1240 | |
| 1241 | func scrubChunk(chunk []byte, values []string) ([]byte, uint64) { |
| 1242 | if len(values) == 0 { |
| 1243 | return chunk, 0 |
| 1244 | } |
| 1245 | s := scrub.New(values) |
| 1246 | out := s.Scrub(chunk) |
| 1247 | return append(out, s.Flush()...), s.Replacements() |
| 1248 | } |
| 1249 | |
// scrubCarryLen returns the length of the longest suffix of chunk that is a
// proper prefix (at least one byte shorter than the whole) of any non-empty
// entry in values. It returns 0 when chunk or values is empty or when no
// such suffix exists.
func scrubCarryLen(chunk []byte, values []string) int {
	if len(chunk) == 0 || len(values) == 0 {
		return 0
	}
	tail := string(chunk)
	longest := 0
	for _, secret := range values {
		if secret == "" {
			continue
		}
		// Only proper prefixes qualify, and none can be longer than the chunk.
		limit := len(secret) - 1
		if len(tail) < limit {
			limit = len(tail)
		}
		// Shorter candidates than the current best cannot improve the result.
		for size := limit; size > longest; size-- {
			if strings.HasSuffix(tail, secret[:size]) {
				longest = size
				break
			}
		}
	}
	return longest
}
| 1273 | |
| 1274 | func (h *Handlers) writeNextTokenResponse( |
| 1275 | w http.ResponseWriter, |
| 1276 | r *http.Request, |
| 1277 | status int, |
| 1278 | auth runnerJobAuth, |
| 1279 | body map[string]any, |
| 1280 | ) { |
| 1281 | token, claims, err := h.d.RunnerJWT.Mint(runnerjwt.MintParams{ |
| 1282 | RunnerID: auth.RunnerID, |
| 1283 | JobID: auth.Claims.JobID, |
| 1284 | RunID: auth.Claims.RunID, |
| 1285 | RepoID: auth.Claims.RepoID, |
| 1286 | Purpose: runnerjwt.PurposeAPI, |
| 1287 | }) |
| 1288 | if err != nil { |
| 1289 | h.d.Logger.ErrorContext(r.Context(), "runner next-token mint failed", "job_id", auth.Claims.JobID, "error", err) |
| 1290 | writeAPIError(w, http.StatusInternalServerError, "runner token mint failed") |
| 1291 | return |
| 1292 | } |
| 1293 | metrics.ActionsRunnerJWTTotal.WithLabelValues("issued").Inc() |
| 1294 | body["next_token"] = token |
| 1295 | body["next_token_expires_at"] = time.Unix(claims.Exp, 0).UTC().Format(time.RFC3339) |
| 1296 | writeJSON(w, status, body) |
| 1297 | } |
| 1298 | |
| 1299 | type runnerClaimResponse struct { |
| 1300 | Token string `json:"token"` |
| 1301 | ExpiresAt string `json:"expires_at"` |
| 1302 | Job runnerJobPayload `json:"job"` |
| 1303 | } |
| 1304 | |
| 1305 | type runnerJobPayload struct { |
| 1306 | ID int64 `json:"id"` |
| 1307 | RunID int64 `json:"run_id"` |
| 1308 | RepoID int64 `json:"repo_id"` |
| 1309 | RunIndex int64 `json:"run_index"` |
| 1310 | WorkflowFile string `json:"workflow_file"` |
| 1311 | WorkflowName string `json:"workflow_name"` |
| 1312 | CheckoutURL string `json:"checkout_url"` |
| 1313 | CheckoutToken string `json:"checkout_token"` |
| 1314 | HeadSHA string `json:"head_sha"` |
| 1315 | HeadRef string `json:"head_ref"` |
| 1316 | Event string `json:"event"` |
| 1317 | EventPayload json.RawMessage `json:"event_payload"` |
| 1318 | JobKey string `json:"job_key"` |
| 1319 | JobName string `json:"job_name"` |
| 1320 | RunsOn string `json:"runs_on"` |
| 1321 | Needs []string `json:"needs"` |
| 1322 | If string `json:"if"` |
| 1323 | TimeoutMinutes int32 `json:"timeout_minutes"` |
| 1324 | Permissions json.RawMessage `json:"permissions"` |
| 1325 | Secrets map[string]string `json:"secrets"` |
| 1326 | MaskValues []string `json:"mask_values"` |
| 1327 | Env json.RawMessage `json:"env"` |
| 1328 | Steps []runnerStep `json:"steps"` |
| 1329 | } |
| 1330 | |
| 1331 | type runnerStep struct { |
| 1332 | ID int64 `json:"id"` |
| 1333 | Index int32 `json:"index"` |
| 1334 | StepID string `json:"step_id"` |
| 1335 | Name string `json:"name"` |
| 1336 | If string `json:"if"` |
| 1337 | Run string `json:"run"` |
| 1338 | Uses string `json:"uses"` |
| 1339 | WorkingDirectory string `json:"working_directory"` |
| 1340 | Env json.RawMessage `json:"env"` |
| 1341 | With json.RawMessage `json:"with"` |
| 1342 | ContinueOnError bool `json:"continue_on_error"` |
| 1343 | } |
| 1344 | |
| 1345 | func (h *Handlers) presentRunnerClaim( |
| 1346 | job actionsdb.ClaimQueuedWorkflowJobRow, |
| 1347 | steps []actionsdb.ListRunnerStepsForJobRow, |
| 1348 | resolvedSecrets map[string]string, |
| 1349 | token string, |
| 1350 | checkoutToken string, |
| 1351 | expiresAt time.Time, |
| 1352 | ) runnerClaimResponse { |
| 1353 | outSteps := make([]runnerStep, 0, len(steps)) |
| 1354 | for _, step := range steps { |
| 1355 | outSteps = append(outSteps, runnerStep{ |
| 1356 | ID: step.ID, |
| 1357 | Index: step.StepIndex, |
| 1358 | StepID: step.StepID, |
| 1359 | Name: step.StepName, |
| 1360 | If: step.IfExpr, |
| 1361 | Run: step.RunCommand, |
| 1362 | Uses: step.UsesAlias, |
| 1363 | WorkingDirectory: step.WorkingDirectory, |
| 1364 | Env: rawJSONOrObject(step.StepEnv), |
| 1365 | With: rawJSONOrObject(step.StepWith), |
| 1366 | ContinueOnError: step.ContinueOnError, |
| 1367 | }) |
| 1368 | } |
| 1369 | return runnerClaimResponse{ |
| 1370 | Token: token, |
| 1371 | ExpiresAt: expiresAt.UTC().Format(time.RFC3339), |
| 1372 | Job: runnerJobPayload{ |
| 1373 | ID: job.ID, |
| 1374 | RunID: job.RunID, |
| 1375 | RepoID: job.RepoID, |
| 1376 | RunIndex: job.RunIndex, |
| 1377 | WorkflowFile: job.WorkflowFile, |
| 1378 | WorkflowName: job.WorkflowName, |
| 1379 | CheckoutURL: h.checkoutURL(job.RepoOwner, job.RepoName), |
| 1380 | CheckoutToken: checkoutToken, |
| 1381 | HeadSHA: job.HeadSha, |
| 1382 | HeadRef: job.HeadRef, |
| 1383 | Event: string(job.Event), |
| 1384 | EventPayload: rawJSONOrObject(job.EventPayload), |
| 1385 | JobKey: job.JobKey, |
| 1386 | JobName: job.JobName, |
| 1387 | RunsOn: job.RunsOn, |
| 1388 | Needs: job.NeedsJobs, |
| 1389 | If: job.IfExpr, |
| 1390 | TimeoutMinutes: job.TimeoutMinutes, |
| 1391 | Permissions: rawJSONOrObject(job.Permissions), |
| 1392 | Secrets: cloneStringMap(resolvedSecrets), |
| 1393 | MaskValues: secretMaskValues(resolvedSecrets), |
| 1394 | Env: rawJSONOrObject(job.JobEnv), |
| 1395 | Steps: outSteps, |
| 1396 | }, |
| 1397 | } |
| 1398 | } |
| 1399 | |
| 1400 | func (h *Handlers) checkoutURL(owner, repoName string) string { |
| 1401 | base := strings.TrimRight(strings.TrimSpace(h.d.BaseURL), "/") |
| 1402 | if base == "" { |
| 1403 | return "" |
| 1404 | } |
| 1405 | return base + "/" + url.PathEscape(owner) + "/" + url.PathEscape(repoName) + ".git" |
| 1406 | } |
| 1407 | |
// rawJSONOrObject returns b as raw JSON when it is non-empty, valid JSON, and
// the empty object `{}` otherwise. Valid non-object JSON (an array, null, a
// number, ...) is passed through unchanged.
func rawJSONOrObject(b []byte) json.RawMessage {
	if len(b) > 0 && json.Valid(b) {
		return json.RawMessage(b)
	}
	return json.RawMessage(`{}`)
}
| 1414 | |
// decodeJSONBody decodes exactly one JSON value from r into v.
//
// Decoding is strict: object keys that do not correspond to a field of v are
// rejected, and so is any non-whitespace data following the first JSON value
// (for example a second object or stray text). Trailing whitespace is
// accepted. The decoder's own error is returned for malformed or
// unknown-field input; trailing data yields a dedicated error.
func decodeJSONBody(r io.Reader, v any) error {
	dec := json.NewDecoder(r)
	dec.DisallowUnknownFields()
	if err := dec.Decode(v); err != nil {
		return err
	}
	// Decode stops after the first value, so a request body such as
	// `{...}{...}` or `{...} junk` would otherwise be accepted silently. A
	// second Decode must report io.EOF for the body to be a single value.
	var extra json.RawMessage
	if err := dec.Decode(&extra); !errors.Is(err, io.EOF) {
		return errors.New("unexpected data after JSON value")
	}
	return nil
}
| 1420 | |
// decodeBase64 decodes s as standard-alphabet base64, accepting both the
// padded and the unpadded form. Padded decoding is tried first; if it fails
// the result (and error) of unpadded decoding is returned.
func decodeBase64(s string) ([]byte, error) {
	padded, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		return base64.RawStdEncoding.DecodeString(s)
	}
	return padded, nil
}
| 1427 | |
| 1428 | func validRunnerConclusion(c string) bool { |
| 1429 | switch actionsdb.CheckConclusion(c) { |
| 1430 | case actionsdb.CheckConclusionSuccess, |
| 1431 | actionsdb.CheckConclusionFailure, |
| 1432 | actionsdb.CheckConclusionNeutral, |
| 1433 | actionsdb.CheckConclusionCancelled, |
| 1434 | actionsdb.CheckConclusionSkipped, |
| 1435 | actionsdb.CheckConclusionTimedOut, |
| 1436 | actionsdb.CheckConclusionActionRequired: |
| 1437 | return true |
| 1438 | default: |
| 1439 | return false |
| 1440 | } |
| 1441 | } |
| 1442 | |
| 1443 | func nullableConclusion(c actionsdb.NullCheckConclusion) any { |
| 1444 | if !c.Valid { |
| 1445 | return nil |
| 1446 | } |
| 1447 | return string(c.CheckConclusion) |
| 1448 | } |
| 1449 |