runner: enforce workflow job timeouts
Authored by mfwolffe <wolffemf@dukes.jmu.edu>

- SHA: f23d1f1f413d17fe5af9fa3264d10ea6bd1d829f
- Parents: 69bbad7
- Tree: cfa483f
| Status | File | + | - |
|---|---|---|---|
| M | internal/infra/metrics/metrics.go | 7 | 0 |
| M | internal/runner/engine/docker.go | 44 | 8 |
| M | internal/runner/engine/docker_test.go | 86 | 0 |
| M | internal/runner/engine/types.go | 5 | 0 |
| M | internal/web/handlers/api/runners.go | 11 | 0 |
| M | internal/web/handlers/api/runners_test.go | 82 | 0 |
internal/infra/metrics/metrics.go (modified)

@@ -170,6 +170,12 @@ var (
 		},
 		[]string{"kind"},
 	)
+	ActionsStepTimeoutsTotal = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "shithub_actions_step_timeouts_total",
+			Help: "Total Actions steps reported as timed out by runners.",
+		},
+	)
 )

 func init() {
@@ -193,6 +199,7 @@ func init() {
 		ActionsJobsCancelledTotal,
 		ActionsLogScrubReplacementsTotal,
 		ActionsRunsPrunedTotal,
+		ActionsStepTimeoutsTotal,
 	)
 }

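Note on the metric above: ActionsStepTimeoutsTotal is a plain, unlabelled counter. The sketch below is a minimal, self-contained illustration of the same create/register/increment pattern, plus the read-back trick the new test helper in runners_test.go uses (writing the counter into a client_model protobuf). It is not part of this change; the metric name `example_step_timeouts_total` is a placeholder.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// A plain counter, analogous to ActionsStepTimeoutsTotal in the diff.
var stepTimeouts = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "example_step_timeouts_total",
	Help: "Total steps reported as timed out.",
})

func main() {
	// Registration normally happens once, e.g. in an init() block.
	prometheus.MustRegister(stepTimeouts)

	stepTimeouts.Inc()

	// Reading the current value back in a test: a counter can write its
	// state into a client_model Metric protobuf.
	var m dto.Metric
	if err := stepTimeouts.Write(&m); err != nil {
		panic(err)
	}
	fmt.Println(m.Counter.GetValue()) // 1
}
```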
internal/runner/engine/docker.go (modified)

@@ -71,6 +71,7 @@ type DockerConfig struct {
 	LogChunkBytes    int
 	LogFlushInterval time.Duration
 	StepLogLimit     int64
+	TimeoutMinute    time.Duration
 	Stdout           io.Writer
 	Stderr           io.Writer
 	Runner           CommandRunner
@@ -100,6 +101,9 @@ func NewDocker(cfg DockerConfig) *Docker {
 	if cfg.StepLogLimit <= 0 {
 		cfg.StepLogLimit = 10 * 1024 * 1024
 	}
+	if cfg.TimeoutMinute <= 0 {
+		cfg.TimeoutMinute = time.Minute
+	}
 	if cfg.Stdout == nil {
 		cfg.Stdout = io.Discard
 	}
@@ -136,7 +140,7 @@ func (d *Docker) Execute(ctx context.Context, job Job) (Outcome, error) {
 	defer d.closeEventStream(job.ID)
 	if job.TimeoutMinutes > 0 {
 		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(ctx, time.Duration(job.TimeoutMinutes)*time.Minute)
+		ctx, cancel = context.WithTimeoutCause(ctx, time.Duration(job.TimeoutMinutes)*d.cfg.TimeoutMinute, ErrJobTimedOut)
 		defer cancel()
 	}
 	if err := os.MkdirAll(job.WorkspaceDir, 0o700); err != nil {
@@ -156,12 +160,12 @@ func (d *Docker) Execute(ctx context.Context, job Job) (Outcome, error) {
 				CompletedAt: stepCompleted,
 			}
 			outcome.StepOutcomes = append(outcome.StepOutcomes, stepOutcome)
-			if emitErr := d.emitStepOutcome(ctx, job.ID, stepOutcome); emitErr != nil {
+			if emitErr := d.emitStepOutcomeAfterRun(ctx, job.ID, stepOutcome); emitErr != nil {
 				outcome.Conclusion = conclusionForError(emitErr)
 				outcome.CompletedAt = time.Now().UTC()
 				return outcome, emitErr
 			}
-			if step.ContinueOnError {
+			if step.ContinueOnError && !errors.Is(err, ErrJobTimedOut) {
 				continue
 			}
 			outcome.Conclusion = conclusionForError(err)
@@ -176,7 +180,7 @@ func (d *Docker) Execute(ctx context.Context, job Job) (Outcome, error) {
 			CompletedAt: time.Now().UTC(),
 		}
 		outcome.StepOutcomes = append(outcome.StepOutcomes, stepOutcome)
-		if err := d.emitStepOutcome(ctx, job.ID, stepOutcome); err != nil {
+		if err := d.emitStepOutcomeAfterRun(ctx, job.ID, stepOutcome); err != nil {
 			outcome.Conclusion = conclusionForError(err)
 			outcome.CompletedAt = time.Now().UTC()
 			return outcome, err
@@ -204,6 +208,15 @@ func (d *Docker) executeStep(ctx context.Context, job Job, step Step) error {
 	out := io.MultiWriter(d.cfg.Stdout, writer)
 	errOut := io.MultiWriter(d.cfg.Stderr, writer)
 	if err := d.cfg.Runner.Run(ctx, d.cfg.Binary, invocation.args, invocation.env, out, errOut); err != nil {
+		if isJobTimeout(ctx, err) {
+			killCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
+			killErr := d.killContainer(killCtx, invocation.containerName)
+			cancel()
+			if killErr != nil {
+				err = errors.Join(err, killErr)
+			}
+			err = fmt.Errorf("%w: %w", ErrJobTimedOut, err)
+		}
 		d.logStep(ctx, "runner step completed", job, step, invocation, conclusionForError(err))
 		if closeErr := writer.Close(); closeErr != nil {
 			return fmt.Errorf("runner engine: step %q failed: %w", stepLabel(step), errors.Join(err, closeErr))
@@ -376,7 +389,11 @@ func (d *Docker) Cancel(ctx context.Context, jobID int64) error {
 	}
 	killCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()
-	if err := d.cfg.Runner.Run(killCtx, d.cfg.Binary, []string{"kill", name}, nil, d.cfg.Stdout, d.cfg.Stderr); err != nil {
+	return d.killContainer(killCtx, name)
+}
+
+func (d *Docker) killContainer(ctx context.Context, name string) error {
+	if err := d.cfg.Runner.Run(ctx, d.cfg.Binary, []string{"kill", name}, nil, d.cfg.Stdout, d.cfg.Stderr); err != nil {
 		return fmt.Errorf("runner engine: kill container %s: %w", name, err)
 	}
 	return nil
@@ -477,6 +494,15 @@ func (d *Docker) emitStepOutcome(ctx context.Context, jobID int64, step StepOutcome) error {
 	}
 }

+func (d *Docker) emitStepOutcomeAfterRun(ctx context.Context, jobID int64, step StepOutcome) error {
+	if ctx.Err() == nil {
+		return d.emitStepOutcome(ctx, jobID, step)
+	}
+	emitCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
+	defer cancel()
+	return d.emitStepOutcome(emitCtx, jobID, step)
+}
+
 func (d *Docker) newStepLogWriter(ctx context.Context, jobID, stepID int64, jobMasks []string) *stepLogWriter {
 	w := &stepLogWriter{
 		ctx: ctx,
@@ -634,15 +660,25 @@ func (w *stepLogWriter) emitChunkLocked(chunk []byte) error {
 }

 func conclusionForError(err error) string {
+	if errors.Is(err, ErrJobTimedOut) {
+		return ConclusionTimedOut
+	}
 	if errors.Is(err, context.Canceled) {
 		return ConclusionCancelled
 	}
-	if errors.Is(err, context.DeadlineExceeded) {
-		return ConclusionTimedOut
-	}
 	return ConclusionFailure
 }

+func isJobTimeout(ctx context.Context, err error) bool {
+	if errors.Is(err, ErrJobTimedOut) {
+		return true
+	}
+	if !errors.Is(err, context.DeadlineExceeded) {
+		return false
+	}
+	return errors.Is(context.Cause(ctx), ErrJobTimedOut)
+}
+
 func containerWorkdir(wd string) (string, error) {
 	wd = strings.TrimSpace(wd)
 	if wd == "" {
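For readers not yet on the Go 1.20/1.21 context APIs this change leans on: `context.WithTimeoutCause` tags the deadline with a sentinel error, `context.Cause` recovers that sentinel so a job timeout can be told apart from any other `DeadlineExceeded`, and `context.WithoutCancel` yields a context that survives the expired parent so cleanup (the container kill, the final step-outcome emit) can still run under its own short deadline. Below is a minimal standalone sketch of that pattern, not the project's code; `errTimedOut` and `runStep` are placeholders.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTimedOut is a placeholder sentinel, analogous to ErrJobTimedOut in the diff.
var errTimedOut = errors.New("job timed out")

// runStep stands in for a container run that blocks until the context ends.
func runStep(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err() // context.DeadlineExceeded when the timeout fires
}

func main() {
	// Tag the job deadline with the sentinel cause.
	ctx, cancel := context.WithTimeoutCause(context.Background(), 10*time.Millisecond, errTimedOut)
	defer cancel()

	err := runStep(ctx)

	// A bare DeadlineExceeded only says *some* deadline fired; the cause
	// tells us whether it was the job timeout rather than another deadline.
	if errors.Is(err, context.DeadlineExceeded) && errors.Is(context.Cause(ctx), errTimedOut) {
		// Double %w wrapping (Go 1.20+) keeps both errors matchable.
		err = fmt.Errorf("%w: %w", errTimedOut, err)
	}
	fmt.Println(errors.Is(err, errTimedOut)) // true

	// Cleanup after the deadline needs a context that is not already dead:
	// WithoutCancel detaches from the expired parent, and a fresh short
	// timeout bounds the cleanup work (e.g. docker kill, final status emit).
	cleanupCtx, stop := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
	defer stop()
	_ = cleanupCtx
}
```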
internal/runner/engine/docker_test.go (modified)

@@ -74,6 +74,35 @@ func (r *cancellableRunner) Run(ctx context.Context, _ string, args []string, _
 	}
 }

+type timeoutRunner struct {
+	started   chan struct{}
+	killed    chan struct{}
+	killArgs  []string
+	startOnce sync.Once
+	killOnce  sync.Once
+	mu        sync.Mutex
+}
+
+func newTimeoutRunner() *timeoutRunner {
+	return &timeoutRunner{
+		started: make(chan struct{}),
+		killed:  make(chan struct{}),
+	}
+}
+
+func (r *timeoutRunner) Run(ctx context.Context, _ string, args []string, _ []string, _, _ io.Writer) error {
+	if len(args) > 0 && args[0] == "kill" {
+		r.mu.Lock()
+		r.killArgs = append([]string{}, args...)
+		r.mu.Unlock()
+		r.killOnce.Do(func() { close(r.killed) })
+		return nil
+	}
+	r.startOnce.Do(func() { close(r.started) })
+	<-ctx.Done()
+	return ctx.Err()
+}
+
 func TestDockerExecute_BuildsResourceCappedRunCommand(t *testing.T) {
 	t.Parallel()
 	rec := &recordingRunner{}
@@ -436,6 +465,63 @@ func TestDockerCancelKillsActiveContainer(t *testing.T) {
 	}
 }

+func TestDockerExecute_TimeoutKillsActiveContainerAndReportsTimedOut(t *testing.T) {
+	t.Parallel()
+	rec := newTimeoutRunner()
+	d := NewDocker(DockerConfig{
+		DefaultImage:     "runner-image",
+		Network:          "bridge",
+		Memory:           "2g",
+		CPUs:             "2",
+		Runner:           rec,
+		TimeoutMinute:    time.Millisecond,
+		LogChunkBytes:    4,
+		StepLogLimit:     1024,
+		LogFlushInterval: time.Hour,
+	})
+	events, err := d.StreamEvents(t.Context(), 99)
+	if err != nil {
+		t.Fatalf("StreamEvents: %v", err)
+	}
+	out, err := d.Execute(t.Context(), Job{
+		ID:             99,
+		TimeoutMinutes: 1,
+		WorkspaceDir:   t.TempDir(),
+		Steps:          []Step{{ID: 123, Run: "sleep 600", ContinueOnError: true}},
+	})
+	if !errors.Is(err, ErrJobTimedOut) {
+		t.Fatalf("Execute error: got %v, want ErrJobTimedOut", err)
+	}
+	if out.Conclusion != ConclusionTimedOut {
+		t.Fatalf("Conclusion: %q", out.Conclusion)
+	}
+	if len(out.StepOutcomes) != 1 ||
+		out.StepOutcomes[0].StepID != 123 ||
+		out.StepOutcomes[0].Status != "completed" ||
+		out.StepOutcomes[0].Conclusion != ConclusionTimedOut {
+		t.Fatalf("StepOutcomes: %#v", out.StepOutcomes)
+	}
+	select {
+	case <-rec.killed:
+	case <-time.After(time.Second):
+		t.Fatal("timeout did not kill active container")
+	}
+	rec.mu.Lock()
+	killArgs := append([]string{}, rec.killArgs...)
+	rec.mu.Unlock()
+	want := []string{"kill", "shithub-job-99-step-123"}
+	if !reflect.DeepEqual(killArgs, want) {
+		t.Fatalf("kill args: got %#v want %#v", killArgs, want)
+	}
+	var got []Event
+	for event := range events {
+		got = append(got, event)
+	}
+	if len(got) != 1 || got[0].Step == nil || got[0].Step.Conclusion != ConclusionTimedOut {
+		t.Fatalf("timeout step event: %#v", got)
+	}
+}
+
 func TestDockerExecute_FailureMapsToFailureConclusion(t *testing.T) {
 	t.Parallel()
 	d := NewDocker(DockerConfig{
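A side note on the `TimeoutMinute` field this test exercises: it acts as a scalable time unit. Production leaves it unset and `NewDocker` defaults it to a real minute, while the test sets it to a millisecond so a `TimeoutMinutes: 1` deadline fires almost instantly without a fake clock. A tiny sketch of the idea under those assumptions (names hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

// Config exposes how long one "minute" lasts; zero means a real minute.
type Config struct {
	TimeoutMinute time.Duration
}

// jobDeadline converts a workflow-level timeout-minutes value into a
// concrete duration using the configured unit.
func jobDeadline(cfg Config, timeoutMinutes int) time.Duration {
	unit := cfg.TimeoutMinute
	if unit <= 0 {
		unit = time.Minute // production default
	}
	return time.Duration(timeoutMinutes) * unit
}

func main() {
	fmt.Println(jobDeadline(Config{}, 5))                                // 5m0s
	fmt.Println(jobDeadline(Config{TimeoutMinute: time.Millisecond}, 5)) // 5ms, as in tests
}
```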
internal/runner/engine/types.go (modified)

@@ -6,6 +6,7 @@ package engine
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"time"
 )

@@ -16,6 +17,10 @@
 	ConclusionTimedOut = "timed_out"
 )

+// ErrJobTimedOut marks an execution failure caused by the workflow job's
+// timeout-minutes deadline, not by runner shutdown or user cancellation.
+var ErrJobTimedOut = errors.New("runner engine: job timed out")
+
 type Engine interface {
 	Execute(ctx context.Context, job Job) (Outcome, error)
 	StreamLogs(ctx context.Context, jobID int64) (<-chan LogChunk, error)
internal/web/handlers/api/runners.go (modified)

@@ -459,6 +459,7 @@ func (h *Handlers) runnerStepStatus(w http.ResponseWriter, r *http.Request) {
 		writeAPIError(w, http.StatusInternalServerError, "step status update failed")
 		return
 	}
+	recordStepTimeout(step, updated)
 	h.writeNextTokenResponse(w, r, http.StatusOK, auth, map[string]any{
 		"status":     string(updated.Status),
 		"conclusion": nullableConclusion(updated.Conclusion),
@@ -622,6 +623,16 @@ func validWorkflowStepTransition(from, to actionsdb.WorkflowStepStatus) bool {
 	}
 }

+func recordStepTimeout(before, after actionsdb.WorkflowStep) {
+	if !after.Conclusion.Valid || after.Conclusion.CheckConclusion != actionsdb.CheckConclusionTimedOut {
+		return
+	}
+	if before.Conclusion.Valid && before.Conclusion.CheckConclusion == actionsdb.CheckConclusionTimedOut {
+		return
+	}
+	metrics.ActionsStepTimeoutsTotal.Inc()
+}
+
 func (h *Handlers) applyStepStatus(
 	ctx context.Context,
 	step actionsdb.WorkflowStep,
internal/web/handlers/api/runners_test.go (modified)

@@ -19,6 +19,7 @@ import (
 	"github.com/go-chi/chi/v5"
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jackc/pgx/v5/pgxpool"
+	dto "github.com/prometheus/client_model/go"

 	"github.com/tenseleyFlow/shithub/internal/actions/finalize"
 	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
@@ -29,6 +30,7 @@ import (
 	"github.com/tenseleyFlow/shithub/internal/auth/pat"
 	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
 	"github.com/tenseleyFlow/shithub/internal/auth/secretbox"
+	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
 	"github.com/tenseleyFlow/shithub/internal/infra/storage"
 	repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
 	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
@@ -448,6 +450,74 @@ func TestRunnerStepStatusEnqueuesFinalizeWorker(t *testing.T) {
 	}
 }

+func TestRunnerStepStatusRecordsTimeoutMetricOnce(t *testing.T) {
+	pool := dbtest.NewTestDB(t)
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
+	repoID, userID := setupRunnerAPIRepo(t, pool)
+	enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
+	token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest"}, 1)
+	router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()))
+
+	req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
+		strings.NewReader(`{"labels":["ubuntu-latest"],"capacity":1}`))
+	req.Header.Set("Authorization", "Bearer "+token)
+	rr := httptest.NewRecorder()
+	router.ServeHTTP(rr, req)
+	if rr.Code != http.StatusOK {
+		t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
+	}
+	var claim struct {
+		Token string `json:"token"`
+		Job   struct {
+			ID    int64 `json:"id"`
+			Steps []struct {
+				ID int64 `json:"id"`
+			} `json:"steps"`
+		} `json:"job"`
+	}
+	if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
+		t.Fatalf("decode claim: %v", err)
+	}
+	if len(claim.Job.Steps) == 0 {
+		t.Fatalf("claim steps: %+v", claim.Job.Steps)
+	}
+	stepID := claim.Job.Steps[0].ID
+	before := actionsStepTimeoutsValue(t)
+
+	req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/steps/%d/status", claim.Job.ID, stepID),
+		strings.NewReader(`{"status":"completed","conclusion":"timed_out"}`))
+	req.Header.Set("Authorization", "Bearer "+claim.Token)
+	rr = httptest.NewRecorder()
+	router.ServeHTTP(rr, req)
+	if rr.Code != http.StatusOK {
+		t.Fatalf("step status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
+	}
+	var statusResp struct {
+		NextToken string `json:"next_token"`
+	}
+	if err := json.Unmarshal(rr.Body.Bytes(), &statusResp); err != nil {
+		t.Fatalf("decode status response: %v", err)
+	}
+	if statusResp.NextToken == "" {
+		t.Fatalf("missing next token: %s", rr.Body.String())
+	}
+	if got := actionsStepTimeoutsValue(t); got != before+1 {
+		t.Fatalf("timeout metric after first report: got %v, want %v", got, before+1)
+	}
+
+	req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/steps/%d/status", claim.Job.ID, stepID),
+		strings.NewReader(`{"status":"completed","conclusion":"timed_out"}`))
+	req.Header.Set("Authorization", "Bearer "+statusResp.NextToken)
+	rr = httptest.NewRecorder()
+	router.ServeHTTP(rr, req)
+	if rr.Code != http.StatusOK {
+		t.Fatalf("duplicate step status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
+	}
+	if got := actionsStepTimeoutsValue(t); got != before+1 {
+		t.Fatalf("timeout metric after duplicate report: got %v, want still %v", got, before+1)
+	}
+}
+
 func TestWorkflowJobCancelAPIRequestsCancellation(t *testing.T) {
 	ctx := context.Background()
 	pool := dbtest.NewTestDB(t)
@@ -667,6 +737,18 @@ func newRunnerAPIRouterWithSecretBox(
 	return r
 }

+func actionsStepTimeoutsValue(t *testing.T) float64 {
+	t.Helper()
+	var metric dto.Metric
+	if err := metrics.ActionsStepTimeoutsTotal.Write(&metric); err != nil {
+		t.Fatalf("read timeout metric: %v", err)
+	}
+	if metric.Counter == nil {
+		return 0
+	}
+	return metric.Counter.GetValue()
+}
+
 const runnerAPIOldWorkflow = `name: CI
 on: push
 jobs: