tenseleyflow/shithub / 4fc3462

Browse files

runner: poll cancellation and stop active containers

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
4fc3462a651363c8271459699365a4dbffe820cb
Parents
105eb12
Tree
1f7caae

5 changed files

Status | File | Additions (+) | Deletions (-)
M internal/runner/api/client_test.go 26 0
M internal/runner/engine/docker.go 55 3
M internal/runner/engine/docker_test.go 80 4
M internal/runner/runner.go 114 35
M internal/runner/runner_test.go 97 5
internal/runner/api/client_test.go (modified)
@@ -153,3 +153,29 @@ func TestAppendLog_Base64EncodesChunk(t *testing.T) {
153153
 		t.Fatalf("AppendLog: %v", err)
154154
 	}
155155
 }
156
+
157
+func TestCancelCheck_UsesJobTokenAndParsesResponse(t *testing.T) {
158
+	t.Parallel()
159
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
160
+		if r.URL.Path != "/api/v1/jobs/10/cancel-check" {
161
+			t.Fatalf("path: %s", r.URL.Path)
162
+		}
163
+		if got := r.Header.Get("Authorization"); got != "Bearer job-token" {
164
+			t.Fatalf("Authorization: %q", got)
165
+		}
166
+		w.Header().Set("Content-Type", "application/json")
167
+		_, _ = w.Write([]byte(`{"cancelled":true,"next_token":"next","next_token_expires_at":"2026-05-10T21:15:00Z"}`))
168
+	}))
169
+	defer srv.Close()
170
+	client, err := New(Config{BaseURL: srv.URL, RunnerToken: "runner-token", HTTPClient: srv.Client()})
171
+	if err != nil {
172
+		t.Fatalf("New: %v", err)
173
+	}
174
+	resp, err := client.CancelCheck(t.Context(), 10, "job-token")
175
+	if err != nil {
176
+		t.Fatalf("CancelCheck: %v", err)
177
+	}
178
+	if !resp.Cancelled || resp.NextToken != "next" {
179
+		t.Fatalf("response: %#v", resp)
180
+	}
181
+}
internal/runner/engine/docker.go (modified)
@@ -83,6 +83,7 @@ type Docker struct {
8383
 	cfg       DockerConfig
8484
 	streams   map[int64]chan LogChunk
8585
 	eventSubs map[int64]chan Event
86
+	active    map[int64]string
8687
 	mu        sync.Mutex
8788
 }
8889
 
@@ -120,7 +121,12 @@ func NewDocker(cfg DockerConfig) *Docker {
120121
 	if cfg.Logger == nil {
121122
 		cfg.Logger = slog.New(slog.NewTextHandler(io.Discard, nil))
122123
 	}
123
-	return &Docker{cfg: cfg, streams: make(map[int64]chan LogChunk), eventSubs: make(map[int64]chan Event)}
124
+	return &Docker{
125
+		cfg:       cfg,
126
+		streams:   make(map[int64]chan LogChunk),
127
+		eventSubs: make(map[int64]chan Event),
128
+		active:    make(map[int64]string),
129
+	}
124130
 }
125131
 
126132
 func (d *Docker) Execute(ctx context.Context, job Job) (Outcome, error) {
@@ -191,6 +197,8 @@ func (d *Docker) executeStep(ctx context.Context, job Job, step Step) error {
191197
 	if err != nil {
192198
 		return err
193199
 	}
200
+	d.setActiveContainer(job.ID, invocation.containerName)
201
+	defer d.clearActiveContainer(job.ID, invocation.containerName)
194202
 	d.logStep(ctx, "runner step starting", job, step, invocation, "")
195203
 	writer := d.newStepLogWriter(ctx, job.ID, step.ID, job.MaskValues)
196204
 	out := io.MultiWriter(d.cfg.Stdout, writer)
@@ -212,6 +220,7 @@ func (d *Docker) executeStep(ctx context.Context, job Job, step Step) error {
212220
 type dockerInvocation struct {
213221
 	args           []string
214222
 	env            []string
223
+	containerName  string
215224
 	image          string
216225
 	network        string
217226
 	memory         string
@@ -246,9 +255,11 @@ func (d *Docker) dockerInvocation(job Job, step Step) (dockerInvocation, error)
246255
 	if d.cfg.AllowRoot && permissionsRequestRoot(job.Permissions) {
247256
 		user = "0:0"
248257
 	}
258
+	containerName := dockerContainerName(job, step)
249259
 	args := []string{
250260
 		"run",
251261
 		"--rm",
262
+		"--name", containerName,
252263
 		"--network=" + d.cfg.Network,
253264
 		"--memory=" + d.cfg.Memory,
254265
 		"--cpus=" + d.cfg.CPUs,
@@ -286,6 +297,7 @@ func (d *Docker) dockerInvocation(job Job, step Step) (dockerInvocation, error)
286297
 	return dockerInvocation{
287298
 		args:           args,
288299
 		env:            processEnv,
300
+		containerName:  containerName,
289301
 		image:          image,
290302
 		network:        d.cfg.Network,
291303
 		memory:         d.cfg.Memory,
@@ -357,8 +369,40 @@ func (d *Docker) StreamEvents(_ context.Context, jobID int64) (<-chan Event, err
357369
 	return d.ensureEventStream(jobID), nil
358370
 }
359371
 
360
-func (d *Docker) Cancel(_ context.Context, _ int64) error {
361
-	return ErrUnsupported
372
+func (d *Docker) Cancel(ctx context.Context, jobID int64) error {
373
+	name := d.activeContainer(jobID)
374
+	if name == "" {
375
+		return nil
376
+	}
377
+	killCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
378
+	defer cancel()
379
+	if err := d.cfg.Runner.Run(killCtx, d.cfg.Binary, []string{"kill", name}, nil, d.cfg.Stdout, d.cfg.Stderr); err != nil {
380
+		return fmt.Errorf("runner engine: kill container %s: %w", name, err)
381
+	}
382
+	return nil
383
+}
384
+
385
+func (d *Docker) setActiveContainer(jobID int64, name string) {
386
+	if name == "" {
387
+		return
388
+	}
389
+	d.mu.Lock()
390
+	d.active[jobID] = name
391
+	d.mu.Unlock()
392
+}
393
+
394
+func (d *Docker) clearActiveContainer(jobID int64, name string) {
395
+	d.mu.Lock()
396
+	if d.active[jobID] == name {
397
+		delete(d.active, jobID)
398
+	}
399
+	d.mu.Unlock()
400
+}
401
+
402
+func (d *Docker) activeContainer(jobID int64) string {
403
+	d.mu.Lock()
404
+	defer d.mu.Unlock()
405
+	return d.active[jobID]
362406
 }
363407
 
364408
 func (d *Docker) ensureStream(jobID int64) chan LogChunk {
@@ -614,6 +658,14 @@ func containerWorkdir(wd string) (string, error) {
614658
 	return clean, nil
615659
 }
616660
 
661
+func dockerContainerName(job Job, step Step) string {
662
+	stepID := step.ID
663
+	if stepID == 0 {
664
+		stepID = int64(step.Index)
665
+	}
666
+	return fmt.Sprintf("shithub-job-%d-step-%d", job.ID, stepID)
667
+}
668
+
617669
 var envNameRE = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)
618670
 
619671
 func validateEnv(env map[string]string) (map[string]string, error) {
internal/runner/engine/docker_test.go (modified)
@@ -8,6 +8,7 @@ import (
88
 	"io"
99
 	"reflect"
1010
 	"strings"
11
+	"sync"
1112
 	"testing"
1213
 	"time"
1314
 )
@@ -42,6 +43,37 @@ func (secretLoggingRunner) Run(_ context.Context, _ string, _ []string, _ []stri
4243
 	return nil
4344
 }
4445
 
46
+type cancellableRunner struct {
47
+	started  chan struct{}
48
+	killed   chan struct{}
49
+	killArgs []string
50
+	mu       sync.Mutex
51
+}
52
+
53
+func newCancellableRunner() *cancellableRunner {
54
+	return &cancellableRunner{
55
+		started: make(chan struct{}),
56
+		killed:  make(chan struct{}),
57
+	}
58
+}
59
+
60
+func (r *cancellableRunner) Run(ctx context.Context, _ string, args []string, _ []string, _, _ io.Writer) error {
61
+	if len(args) > 0 && args[0] == "kill" {
62
+		r.mu.Lock()
63
+		r.killArgs = append([]string{}, args...)
64
+		r.mu.Unlock()
65
+		close(r.killed)
66
+		return nil
67
+	}
68
+	close(r.started)
69
+	select {
70
+	case <-r.killed:
71
+		return context.Canceled
72
+	case <-ctx.Done():
73
+		return ctx.Err()
74
+	}
75
+}
76
+
4577
 func TestDockerExecute_BuildsResourceCappedRunCommand(t *testing.T) {
4678
 	t.Parallel()
4779
 	rec := &recordingRunner{}
@@ -73,7 +105,8 @@ func TestDockerExecute_BuildsResourceCappedRunCommand(t *testing.T) {
73105
 		t.Fatalf("Conclusion: %q", out.Conclusion)
74106
 	}
75107
 	want := []string{
76
-		"run", "--rm", "--network=none", "--memory=2g", "--cpus=2",
108
+		"run", "--rm", "--name", "shithub-job-1-step-0",
109
+		"--network=none", "--memory=2g", "--cpus=2",
77110
 		"--pids-limit=512", "--read-only",
78111
 		"--tmpfs", "/tmp:rw,exec,nosuid,nodev,size=1g",
79112
 		"--cap-drop=ALL", "--cap-add=DAC_OVERRIDE", "--cap-add=SETGID", "--cap-add=SETUID",
@@ -81,7 +114,7 @@ func TestDockerExecute_BuildsResourceCappedRunCommand(t *testing.T) {
81114
 		"--ulimit", "nofile=4096:4096", "--ulimit", "nproc=512:512",
82115
 		"--user", "65534:65534",
83116
 		"--workdir=/workspace/subdir",
84
-		"--mount", rec.args[23],
117
+		"--mount", rec.args[25],
85118
 		"--env", "A", "--env", "B",
86119
 		"runner-image", "bash", "-c", "echo hi",
87120
 	}
@@ -91,8 +124,8 @@ func TestDockerExecute_BuildsResourceCappedRunCommand(t *testing.T) {
91124
 	if !reflect.DeepEqual(rec.args, want) {
92125
 		t.Fatalf("args:\ngot  %#v\nwant %#v", rec.args, want)
93126
 	}
94
-	if !strings.HasPrefix(rec.args[23], "type=bind,src=") || !strings.HasSuffix(rec.args[23], ",dst=/workspace,rw") {
95
-		t.Fatalf("workspace mount arg: %q", rec.args[23])
127
+	if !strings.HasPrefix(rec.args[25], "type=bind,src=") || !strings.HasSuffix(rec.args[25], ",dst=/workspace,rw") {
128
+		t.Fatalf("workspace mount arg: %q", rec.args[25])
96129
 	}
97130
 	if wantEnv := []string{"A=job", "B=step"}; !reflect.DeepEqual(rec.env, wantEnv) {
98131
 		t.Fatalf("env:\ngot  %#v\nwant %#v", rec.env, wantEnv)
@@ -360,6 +393,49 @@ func TestDockerExecute_StreamsOrderedEvents(t *testing.T) {
360393
 	}
361394
 }
362395
 
396
+func TestDockerCancelKillsActiveContainer(t *testing.T) {
397
+	t.Parallel()
398
+	rec := newCancellableRunner()
399
+	d := NewDocker(DockerConfig{
400
+		DefaultImage: "runner-image",
401
+		Network:      "bridge",
402
+		Memory:       "2g",
403
+		CPUs:         "2",
404
+		Runner:       rec,
405
+	})
406
+	type executeResult struct {
407
+		out Outcome
408
+		err error
409
+	}
410
+	done := make(chan executeResult, 1)
411
+	go func() {
412
+		out, err := d.Execute(t.Context(), Job{
413
+			ID:           99,
414
+			WorkspaceDir: t.TempDir(),
415
+			Steps:        []Step{{ID: 123, Run: "sleep 600"}},
416
+		})
417
+		done <- executeResult{out: out, err: err}
418
+	}()
419
+	<-rec.started
420
+	if err := d.Cancel(t.Context(), 99); err != nil {
421
+		t.Fatalf("Cancel: %v", err)
422
+	}
423
+	res := <-done
424
+	if !errors.Is(res.err, context.Canceled) {
425
+		t.Fatalf("Execute error: %v", res.err)
426
+	}
427
+	if res.out.Conclusion != ConclusionCancelled {
428
+		t.Fatalf("Conclusion: %q", res.out.Conclusion)
429
+	}
430
+	rec.mu.Lock()
431
+	killArgs := append([]string{}, rec.killArgs...)
432
+	rec.mu.Unlock()
433
+	want := []string{"kill", "shithub-job-99-step-123"}
434
+	if !reflect.DeepEqual(killArgs, want) {
435
+		t.Fatalf("kill args: got %#v want %#v", killArgs, want)
436
+	}
437
+}
438
+
363439
 func TestDockerExecute_FailureMapsToFailureConclusion(t *testing.T) {
364440
 	t.Parallel()
365441
 	d := NewDocker(DockerConfig{
internal/runner/runner.go (modified)
@@ -10,6 +10,7 @@ import (
1010
 	"io"
1111
 	"log/slog"
1212
 	"sync"
13
+	"sync/atomic"
1314
 	"time"
1415
 
1516
 	"github.com/tenseleyFlow/shithub/internal/runner/api"
@@ -21,6 +22,7 @@ type API interface {
2122
 	UpdateStatus(ctx context.Context, jobID int64, token string, req api.StatusRequest) (api.StatusResponse, error)
2223
 	UpdateStepStatus(ctx context.Context, jobID, stepID int64, token string, req api.StatusRequest) (api.StepStatusResponse, error)
2324
 	AppendLog(ctx context.Context, jobID int64, token string, req api.LogRequest) (api.LogResponse, error)
25
+	CancelCheck(ctx context.Context, jobID int64, token string) (api.CancelCheckResponse, error)
2426
 }
2527
 
2628
 type Workspaces interface {
@@ -31,29 +33,31 @@ type Workspaces interface {
3133
 type SleepFunc func(ctx context.Context, d time.Duration) error
3234
 
3335
 type Options struct {
34
-	API          API
35
-	Engine       engine.Engine
36
-	Workspaces   Workspaces
37
-	Logger       *slog.Logger
38
-	Labels       []string
39
-	Capacity     int
40
-	PollInterval time.Duration
41
-	DefaultImage string
42
-	Clock        func() time.Time
43
-	Sleep        SleepFunc
36
+	API                API
37
+	Engine             engine.Engine
38
+	Workspaces         Workspaces
39
+	Logger             *slog.Logger
40
+	Labels             []string
41
+	Capacity           int
42
+	PollInterval       time.Duration
43
+	CancelPollInterval time.Duration
44
+	DefaultImage       string
45
+	Clock              func() time.Time
46
+	Sleep              SleepFunc
4447
 }
4548
 
4649
 type Runner struct {
47
-	api          API
48
-	engine       engine.Engine
49
-	workspaces   Workspaces
50
-	logger       *slog.Logger
51
-	labels       []string
52
-	capacity     int
53
-	pollInterval time.Duration
54
-	defaultImage string
55
-	clock        func() time.Time
56
-	sleep        SleepFunc
50
+	api                API
51
+	engine             engine.Engine
52
+	workspaces         Workspaces
53
+	logger             *slog.Logger
54
+	labels             []string
55
+	capacity           int
56
+	pollInterval       time.Duration
57
+	cancelPollInterval time.Duration
58
+	defaultImage       string
59
+	clock              func() time.Time
60
+	sleep              SleepFunc
5761
 }
5862
 
5963
 func New(opts Options) *Runner {
@@ -73,21 +77,26 @@ func New(opts Options) *Runner {
7377
 	if poll <= 0 {
7478
 		poll = 5 * time.Second
7579
 	}
80
+	cancelPoll := opts.CancelPollInterval
81
+	if cancelPoll <= 0 {
82
+		cancelPoll = 2 * time.Second
83
+	}
7684
 	capacity := opts.Capacity
7785
 	if capacity <= 0 {
7886
 		capacity = 1
7987
 	}
8088
 	return &Runner{
81
-		api:          opts.API,
82
-		engine:       opts.Engine,
83
-		workspaces:   opts.Workspaces,
84
-		logger:       logger,
85
-		labels:       append([]string{}, opts.Labels...),
86
-		capacity:     capacity,
87
-		pollInterval: poll,
88
-		defaultImage: opts.DefaultImage,
89
-		clock:        clock,
90
-		sleep:        sleep,
89
+		api:                opts.API,
90
+		engine:             opts.Engine,
91
+		workspaces:         opts.Workspaces,
92
+		logger:             logger,
93
+		labels:             append([]string{}, opts.Labels...),
94
+		capacity:           capacity,
95
+		pollInterval:       poll,
96
+		cancelPollInterval: cancelPoll,
97
+		defaultImage:       opts.DefaultImage,
98
+		clock:              clock,
99
+		sleep:              sleep,
91100
 	}
92101
 }
93102
 
@@ -165,7 +174,19 @@ func (r *Runner) RunOnce(ctx context.Context) (bool, error) {
165174
 		}()
166175
 	}
167176
 
168
-	outcome, execErr := r.engine.Execute(ctx, toEngineJob(claim.Job, workspaceDir, r.defaultImage))
177
+	execCtx, execCancel := context.WithCancel(ctx)
178
+	watchCtx, stopCancelWatch := context.WithCancel(ctx)
179
+	cancelRequested := atomic.Bool{}
180
+	cancelWatchErr := make(chan error, 1)
181
+	go func() {
182
+		cancelWatchErr <- r.watchCancel(watchCtx, session, claim.Job.ID, execCancel, &cancelRequested)
183
+	}()
184
+
185
+	outcome, execErr := r.engine.Execute(execCtx, toEngineJob(claim.Job, workspaceDir, r.defaultImage))
186
+	stopCancelWatch()
187
+	if err := <-cancelWatchErr; err != nil {
188
+		return true, fmt.Errorf("watch job cancellation: %w", err)
189
+	}
169190
 	if err := <-drainErr; err != nil {
170191
 		return true, fmt.Errorf("stream runner events: %w", err)
171192
 	}
@@ -188,6 +209,11 @@ func (r *Runner) RunOnce(ctx context.Context) (bool, error) {
188209
 	if conclusion == "" {
189210
 		conclusion = engine.ConclusionFailure
190211
 	}
212
+	finalStatus := "completed"
213
+	if cancelRequested.Load() {
214
+		finalStatus = "cancelled"
215
+		conclusion = engine.ConclusionCancelled
216
+	}
191217
 	completed := outcome.CompletedAt
192218
 	if completed.IsZero() {
193219
 		completed = r.clock()
@@ -195,28 +221,68 @@ func (r *Runner) RunOnce(ctx context.Context) (bool, error) {
195221
 	if outcome.StartedAt.IsZero() {
196222
 		outcome.StartedAt = started
197223
 	}
198
-	if err := r.complete(ctx, session, conclusion, outcome.StartedAt, completed); err != nil {
224
+	if err := r.finish(ctx, session, finalStatus, conclusion, outcome.StartedAt, completed); err != nil {
199225
 		return true, err
200226
 	}
201
-	if execErr != nil {
227
+	if execErr != nil && !cancelRequested.Load() {
202228
 		r.logger.WarnContext(ctx, "job completed with failing engine outcome", "job_id", claim.Job.ID, "conclusion", conclusion, "error", execErr)
203229
 	}
204230
 	return true, nil
205231
 }
206232
 
207233
 func (r *Runner) complete(ctx context.Context, session *jobSession, conclusion string, started, completed time.Time) error {
234
+	return r.finish(ctx, session, "completed", conclusion, started, completed)
235
+}
236
+
237
+func (r *Runner) finish(ctx context.Context, session *jobSession, status, conclusion string, started, completed time.Time) error {
208238
 	_, err := session.UpdateStatus(ctx, api.StatusRequest{
209
-		Status:      "completed",
239
+		Status:      status,
210240
 		Conclusion:  conclusion,
211241
 		StartedAt:   started,
212242
 		CompletedAt: completed,
213243
 	})
214244
 	if err != nil {
215
-		return fmt.Errorf("mark job completed: %w", err)
245
+		return fmt.Errorf("mark job %s: %w", status, err)
216246
 	}
217247
 	return nil
218248
 }
219249
 
250
+func (r *Runner) watchCancel(
251
+	ctx context.Context,
252
+	session *jobSession,
253
+	jobID int64,
254
+	execCancel context.CancelFunc,
255
+	cancelRequested *atomic.Bool,
256
+) error {
257
+	for {
258
+		if err := r.sleep(ctx, r.cancelPollInterval); err != nil {
259
+			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
260
+				return nil
261
+			}
262
+			return err
263
+		}
264
+		resp, err := session.CancelCheck(ctx)
265
+		if err != nil {
266
+			if ctx.Err() != nil {
267
+				return nil
268
+			}
269
+			r.logger.WarnContext(ctx, "runner cancel check failed", "job_id", jobID, "error", err)
270
+			continue
271
+		}
272
+		if !resp.Cancelled {
273
+			continue
274
+		}
275
+		cancelRequested.Store(true)
276
+		killCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
277
+		if err := r.engine.Cancel(killCtx, jobID); err != nil && !errors.Is(err, engine.ErrUnsupported) {
278
+			r.logger.WarnContext(ctx, "runner engine cancel failed", "job_id", jobID, "error", err)
279
+		}
280
+		cancel()
281
+		execCancel()
282
+		return nil
283
+	}
284
+}
285
+
220286
 type jobSession struct {
221287
 	api   API
222288
 	jobID int64
@@ -274,6 +340,19 @@ func (s *jobSession) AppendLog(ctx context.Context, chunk engine.LogChunk) error
274340
 	return nil
275341
 }
276342
 
343
+func (s *jobSession) CancelCheck(ctx context.Context) (api.CancelCheckResponse, error) {
344
+	s.mu.Lock()
345
+	defer s.mu.Unlock()
346
+	resp, err := s.api.CancelCheck(ctx, s.jobID, s.token)
347
+	if err != nil {
348
+		return resp, err
349
+	}
350
+	if resp.NextToken != "" {
351
+		s.token = resp.NextToken
352
+	}
353
+	return resp, nil
354
+}
355
+
277356
 func drainLogs(ctx context.Context, session *jobSession, logs <-chan engine.LogChunk) error {
278357
 	for {
279358
 		select {
internal/runner/runner_test.go (modified)
@@ -18,6 +18,8 @@ type fakeAPI struct {
1818
 	statuses     []api.StatusRequest
1919
 	stepStatuses []api.StatusRequest
2020
 	logs         []api.LogRequest
21
+	cancelChecks int
22
+	cancelled    bool
2123
 	tokens       []string
2224
 	next         int
2325
 }
@@ -47,16 +49,23 @@ func (f *fakeAPI) AppendLog(_ context.Context, _ int64, token string, req api.Lo
4749
 	return api.LogResponse{NextToken: f.nextToken()}, nil
4850
 }
4951
 
52
+func (f *fakeAPI) CancelCheck(_ context.Context, _ int64, token string) (api.CancelCheckResponse, error) {
53
+	f.tokens = append(f.tokens, token)
54
+	f.cancelChecks++
55
+	return api.CancelCheckResponse{Cancelled: f.cancelled, NextToken: f.nextToken()}, nil
56
+}
57
+
5058
 func (f *fakeAPI) nextToken() string {
5159
 	f.next++
5260
 	return "next-token-" + strconv.Itoa(f.next)
5361
 }
5462
 
5563
 type fakeEngine struct {
56
-	job  engine.Job
57
-	out  engine.Outcome
58
-	logs []engine.LogChunk
59
-	err  error
64
+	job       engine.Job
65
+	out       engine.Outcome
66
+	logs      []engine.LogChunk
67
+	err       error
68
+	cancelled bool
6069
 }
6170
 
6271
 func (f *fakeEngine) Execute(_ context.Context, job engine.Job) (engine.Outcome, error) {
@@ -73,7 +82,10 @@ func (f *fakeEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogCh
7382
 	return ch, nil
7483
 }
7584
 
76
-func (f *fakeEngine) Cancel(_ context.Context, _ int64) error { return nil }
85
+func (f *fakeEngine) Cancel(_ context.Context, _ int64) error {
86
+	f.cancelled = true
87
+	return nil
88
+}
7789
 
7890
 type fakeEventEngine struct {
7991
 	fakeEngine
@@ -95,6 +107,42 @@ func (f *fakeEventEngine) Execute(ctx context.Context, job engine.Job) (engine.O
95107
 	return out, err
96108
 }
97109
 
110
+type cancelBlockingEngine struct {
111
+	job       engine.Job
112
+	logs      chan engine.LogChunk
113
+	started   chan struct{}
114
+	cancelled bool
115
+}
116
+
117
+func newCancelBlockingEngine() *cancelBlockingEngine {
118
+	return &cancelBlockingEngine{
119
+		logs:    make(chan engine.LogChunk),
120
+		started: make(chan struct{}),
121
+	}
122
+}
123
+
124
+func (f *cancelBlockingEngine) Execute(ctx context.Context, job engine.Job) (engine.Outcome, error) {
125
+	f.job = job
126
+	close(f.started)
127
+	<-ctx.Done()
128
+	close(f.logs)
129
+	now := time.Date(2026, 5, 10, 21, 0, 1, 0, time.UTC)
130
+	return engine.Outcome{
131
+		Conclusion:  engine.ConclusionCancelled,
132
+		StartedAt:   now.Add(-time.Second),
133
+		CompletedAt: now,
134
+	}, ctx.Err()
135
+}
136
+
137
+func (f *cancelBlockingEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogChunk, error) {
138
+	return f.logs, nil
139
+}
140
+
141
+func (f *cancelBlockingEngine) Cancel(_ context.Context, _ int64) error {
142
+	f.cancelled = true
143
+	return nil
144
+}
145
+
98146
 type fakeWorkspaces struct {
99147
 	dir     string
100148
 	removed bool
@@ -300,6 +348,50 @@ func TestRunOnce_EngineFailureStillCompletesJob(t *testing.T) {
300348
 	}
301349
 }
302350
 
351
+func TestRunOnce_CancelCheckStopsEngineAndMarksJobCancelled(t *testing.T) {
352
+	t.Parallel()
353
+	now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
354
+	fapi := &fakeAPI{
355
+		claim:     &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}},
356
+		cancelled: true,
357
+	}
358
+	fengine := newCancelBlockingEngine()
359
+	r := New(Options{
360
+		API:                fapi,
361
+		Engine:             fengine,
362
+		Workspaces:         &fakeWorkspaces{dir: "/tmp/workspace"},
363
+		Clock:              func() time.Time { return now },
364
+		CancelPollInterval: time.Nanosecond,
365
+		Sleep: func(ctx context.Context, _ time.Duration) error {
366
+			select {
367
+			case <-ctx.Done():
368
+				return ctx.Err()
369
+			default:
370
+				return nil
371
+			}
372
+		},
373
+	})
374
+	claimed, err := r.RunOnce(t.Context())
375
+	if err != nil {
376
+		t.Fatalf("RunOnce: %v", err)
377
+	}
378
+	if !claimed {
379
+		t.Fatal("claimed = false")
380
+	}
381
+	if !fengine.cancelled {
382
+		t.Fatal("engine Cancel was not called")
383
+	}
384
+	if fapi.cancelChecks == 0 {
385
+		t.Fatal("cancel-check was not called")
386
+	}
387
+	if len(fapi.statuses) != 2 ||
388
+		fapi.statuses[0].Status != "running" ||
389
+		fapi.statuses[1].Status != "cancelled" ||
390
+		fapi.statuses[1].Conclusion != engine.ConclusionCancelled {
391
+		t.Fatalf("statuses: %#v", fapi.statuses)
392
+	}
393
+}
394
+
303395
 func TestRunOnce_PrepareFailureMarksJobFailed(t *testing.T) {
304396
 	t.Parallel()
305397
 	fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}}