| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package runner |
| 4 | |
| 5 | import ( |
| 6 | "context" |
| 7 | "errors" |
| 8 | "strconv" |
| 9 | "testing" |
| 10 | "time" |
| 11 | |
| 12 | "github.com/tenseleyFlow/shithub/internal/runner/api" |
| 13 | "github.com/tenseleyFlow/shithub/internal/runner/engine" |
| 14 | ) |
| 15 | |
| 16 | type fakeAPI struct { |
| 17 | claim *api.Claim |
| 18 | statuses []api.StatusRequest |
| 19 | stepStatuses []api.StatusRequest |
| 20 | logs []api.LogRequest |
| 21 | cancelChecks int |
| 22 | cancelled bool |
| 23 | tokens []string |
| 24 | next int |
| 25 | } |
| 26 | |
| 27 | func (f *fakeAPI) Heartbeat(_ context.Context, _ api.HeartbeatRequest) (*api.Claim, error) { |
| 28 | return f.claim, nil |
| 29 | } |
| 30 | |
| 31 | func (f *fakeAPI) UpdateStatus(_ context.Context, _ int64, token string, req api.StatusRequest) (api.StatusResponse, error) { |
| 32 | f.tokens = append(f.tokens, token) |
| 33 | f.statuses = append(f.statuses, req) |
| 34 | if req.Status == "running" { |
| 35 | return api.StatusResponse{NextToken: f.nextToken()}, nil |
| 36 | } |
| 37 | return api.StatusResponse{}, nil |
| 38 | } |
| 39 | |
| 40 | func (f *fakeAPI) UpdateStepStatus(_ context.Context, _, _ int64, token string, req api.StatusRequest) (api.StepStatusResponse, error) { |
| 41 | f.tokens = append(f.tokens, token) |
| 42 | f.stepStatuses = append(f.stepStatuses, req) |
| 43 | return api.StepStatusResponse{NextToken: f.nextToken()}, nil |
| 44 | } |
| 45 | |
| 46 | func (f *fakeAPI) AppendLog(_ context.Context, _ int64, token string, req api.LogRequest) (api.LogResponse, error) { |
| 47 | f.tokens = append(f.tokens, token) |
| 48 | f.logs = append(f.logs, req) |
| 49 | return api.LogResponse{NextToken: f.nextToken()}, nil |
| 50 | } |
| 51 | |
| 52 | func (f *fakeAPI) CancelCheck(_ context.Context, _ int64, token string) (api.CancelCheckResponse, error) { |
| 53 | f.tokens = append(f.tokens, token) |
| 54 | f.cancelChecks++ |
| 55 | return api.CancelCheckResponse{Cancelled: f.cancelled, NextToken: f.nextToken()}, nil |
| 56 | } |
| 57 | |
| 58 | func (f *fakeAPI) nextToken() string { |
| 59 | f.next++ |
| 60 | return "next-token-" + strconv.Itoa(f.next) |
| 61 | } |
| 62 | |
| 63 | type fakeEngine struct { |
| 64 | job engine.Job |
| 65 | out engine.Outcome |
| 66 | logs []engine.LogChunk |
| 67 | err error |
| 68 | cancelled bool |
| 69 | } |
| 70 | |
| 71 | func (f *fakeEngine) Execute(_ context.Context, job engine.Job) (engine.Outcome, error) { |
| 72 | f.job = job |
| 73 | return f.out, f.err |
| 74 | } |
| 75 | |
| 76 | func (f *fakeEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogChunk, error) { |
| 77 | ch := make(chan engine.LogChunk, len(f.logs)) |
| 78 | for _, chunk := range f.logs { |
| 79 | ch <- chunk |
| 80 | } |
| 81 | close(ch) |
| 82 | return ch, nil |
| 83 | } |
| 84 | |
| 85 | func (f *fakeEngine) Cancel(_ context.Context, _ int64) error { |
| 86 | f.cancelled = true |
| 87 | return nil |
| 88 | } |
| 89 | |
| 90 | type fakeEventEngine struct { |
| 91 | fakeEngine |
| 92 | events []engine.Event |
| 93 | ch chan engine.Event |
| 94 | } |
| 95 | |
| 96 | func (f *fakeEventEngine) StreamEvents(_ context.Context, _ int64) (<-chan engine.Event, error) { |
| 97 | f.ch = make(chan engine.Event, len(f.events)) |
| 98 | return f.ch, nil |
| 99 | } |
| 100 | |
| 101 | func (f *fakeEventEngine) Execute(ctx context.Context, job engine.Job) (engine.Outcome, error) { |
| 102 | out, err := f.fakeEngine.Execute(ctx, job) |
| 103 | for _, event := range f.events { |
| 104 | f.ch <- event |
| 105 | } |
| 106 | close(f.ch) |
| 107 | return out, err |
| 108 | } |
| 109 | |
| 110 | type cancelBlockingEngine struct { |
| 111 | job engine.Job |
| 112 | logs chan engine.LogChunk |
| 113 | started chan struct{} |
| 114 | cancelled bool |
| 115 | } |
| 116 | |
| 117 | func newCancelBlockingEngine() *cancelBlockingEngine { |
| 118 | return &cancelBlockingEngine{ |
| 119 | logs: make(chan engine.LogChunk), |
| 120 | started: make(chan struct{}), |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | func (f *cancelBlockingEngine) Execute(ctx context.Context, job engine.Job) (engine.Outcome, error) { |
| 125 | f.job = job |
| 126 | close(f.started) |
| 127 | <-ctx.Done() |
| 128 | close(f.logs) |
| 129 | now := time.Date(2026, 5, 10, 21, 0, 1, 0, time.UTC) |
| 130 | return engine.Outcome{ |
| 131 | Conclusion: engine.ConclusionCancelled, |
| 132 | StartedAt: now.Add(-time.Second), |
| 133 | CompletedAt: now, |
| 134 | }, ctx.Err() |
| 135 | } |
| 136 | |
| 137 | func (f *cancelBlockingEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogChunk, error) { |
| 138 | return f.logs, nil |
| 139 | } |
| 140 | |
| 141 | func (f *cancelBlockingEngine) Cancel(_ context.Context, _ int64) error { |
| 142 | f.cancelled = true |
| 143 | return nil |
| 144 | } |
| 145 | |
// fakeWorkspaces stands in for workspace management. Prepare hands back a
// fixed directory and a scripted error; Remove records that cleanup ran.
type fakeWorkspaces struct {
	// Prepare returns dir and err verbatim.
	dir string
	err error

	// removed is set once Remove has been called.
	removed bool
}

// Prepare returns the configured directory and error, ignoring the IDs.
func (w *fakeWorkspaces) Prepare(_, _ int64) (string, error) {
	return w.dir, w.err
}

// Remove records that cleanup was requested and always succeeds.
func (w *fakeWorkspaces) Remove(_, _ int64) error {
	w.removed = true
	return nil
}
| 160 | |
| 161 | func TestRunOnce_NoClaim(t *testing.T) { |
| 162 | t.Parallel() |
| 163 | r := New(Options{API: &fakeAPI{}, Engine: &fakeEngine{}, Workspaces: &fakeWorkspaces{}}) |
| 164 | claimed, err := r.RunOnce(t.Context()) |
| 165 | if err != nil { |
| 166 | t.Fatalf("RunOnce: %v", err) |
| 167 | } |
| 168 | if claimed { |
| 169 | t.Fatal("claimed = true") |
| 170 | } |
| 171 | } |
| 172 | |
| 173 | func TestRunOnce_ExecutesAndCompletesSuccess(t *testing.T) { |
| 174 | t.Parallel() |
| 175 | now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) |
| 176 | claim := &api.Claim{ |
| 177 | Token: "job-token", |
| 178 | Job: api.Job{ |
| 179 | ID: 10, |
| 180 | RunID: 20, |
| 181 | Env: map[string]string{"A": "B"}, |
| 182 | Steps: []api.Step{{ID: 30, Run: "echo hi"}}, |
| 183 | }, |
| 184 | } |
| 185 | fapi := &fakeAPI{claim: claim} |
| 186 | fengine := &fakeEngine{out: engine.Outcome{Conclusion: engine.ConclusionSuccess, StartedAt: now, CompletedAt: now.Add(time.Second)}} |
| 187 | workspaces := &fakeWorkspaces{dir: "/tmp/workspace"} |
| 188 | r := New(Options{ |
| 189 | API: fapi, |
| 190 | Engine: fengine, |
| 191 | Workspaces: workspaces, |
| 192 | DefaultImage: "runner-image", |
| 193 | Clock: func() time.Time { return now }, |
| 194 | }) |
| 195 | claimed, err := r.RunOnce(t.Context()) |
| 196 | if err != nil { |
| 197 | t.Fatalf("RunOnce: %v", err) |
| 198 | } |
| 199 | if !claimed { |
| 200 | t.Fatal("claimed = false") |
| 201 | } |
| 202 | if fengine.job.WorkspaceDir != "/tmp/workspace" || fengine.job.Image != "runner-image" { |
| 203 | t.Fatalf("engine job: %#v", fengine.job) |
| 204 | } |
| 205 | if len(fapi.statuses) != 2 { |
| 206 | t.Fatalf("statuses: %#v", fapi.statuses) |
| 207 | } |
| 208 | if fapi.statuses[0].Status != "running" || fapi.statuses[1].Conclusion != engine.ConclusionSuccess { |
| 209 | t.Fatalf("statuses: %#v", fapi.statuses) |
| 210 | } |
| 211 | if len(fapi.tokens) != 2 || fapi.tokens[0] != "job-token" || fapi.tokens[1] != "next-token-1" { |
| 212 | t.Fatalf("tokens: %#v", fapi.tokens) |
| 213 | } |
| 214 | if !workspaces.removed { |
| 215 | t.Fatal("workspace was not removed") |
| 216 | } |
| 217 | } |
| 218 | |
| 219 | func TestRunOnce_StreamsLogsAndMarksSteps(t *testing.T) { |
| 220 | t.Parallel() |
| 221 | now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) |
| 222 | claim := &api.Claim{ |
| 223 | Token: "job-token", |
| 224 | Job: api.Job{ |
| 225 | ID: 10, |
| 226 | RunID: 20, |
| 227 | Steps: []api.Step{{ID: 30, Run: "echo hi"}}, |
| 228 | }, |
| 229 | } |
| 230 | fapi := &fakeAPI{claim: claim} |
| 231 | fengine := &fakeEngine{ |
| 232 | out: engine.Outcome{ |
| 233 | Conclusion: engine.ConclusionSuccess, |
| 234 | StartedAt: now, |
| 235 | CompletedAt: now.Add(time.Second), |
| 236 | StepOutcomes: []engine.StepOutcome{{ |
| 237 | StepID: 30, |
| 238 | Status: "completed", |
| 239 | Conclusion: engine.ConclusionSuccess, |
| 240 | StartedAt: now, |
| 241 | CompletedAt: now.Add(500 * time.Millisecond), |
| 242 | }}, |
| 243 | }, |
| 244 | logs: []engine.LogChunk{{ |
| 245 | JobID: 10, |
| 246 | StepID: 30, |
| 247 | Seq: 0, |
| 248 | Chunk: []byte("hello\n"), |
| 249 | }}, |
| 250 | } |
| 251 | r := New(Options{ |
| 252 | API: fapi, |
| 253 | Engine: fengine, |
| 254 | Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"}, |
| 255 | Clock: func() time.Time { return now }, |
| 256 | }) |
| 257 | claimed, err := r.RunOnce(t.Context()) |
| 258 | if err != nil { |
| 259 | t.Fatalf("RunOnce: %v", err) |
| 260 | } |
| 261 | if !claimed { |
| 262 | t.Fatal("claimed = false") |
| 263 | } |
| 264 | if len(fapi.logs) != 1 || fapi.logs[0].StepID != 30 || string(fapi.logs[0].Chunk) != "hello\n" { |
| 265 | t.Fatalf("logs: %#v", fapi.logs) |
| 266 | } |
| 267 | if len(fapi.stepStatuses) != 1 || fapi.stepStatuses[0].Status != "completed" || |
| 268 | fapi.stepStatuses[0].Conclusion != engine.ConclusionSuccess { |
| 269 | t.Fatalf("step statuses: %#v", fapi.stepStatuses) |
| 270 | } |
| 271 | wantTokens := []string{"job-token", "next-token-1", "next-token-2", "next-token-3"} |
| 272 | if len(fapi.tokens) != len(wantTokens) { |
| 273 | t.Fatalf("tokens: got %#v, want %#v", fapi.tokens, wantTokens) |
| 274 | } |
| 275 | for i, want := range wantTokens { |
| 276 | if fapi.tokens[i] != want { |
| 277 | t.Fatalf("tokens[%d]: got %q, want %q; all=%#v", i, fapi.tokens[i], want, fapi.tokens) |
| 278 | } |
| 279 | } |
| 280 | } |
| 281 | |
| 282 | func TestRunOnce_PrefersOrderedEventStream(t *testing.T) { |
| 283 | t.Parallel() |
| 284 | now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) |
| 285 | logEvent := engine.LogChunk{JobID: 10, StepID: 30, Seq: 0, Chunk: []byte("hello\n")} |
| 286 | stepEvent := engine.StepOutcome{ |
| 287 | StepID: 30, |
| 288 | Status: "completed", |
| 289 | Conclusion: engine.ConclusionSuccess, |
| 290 | StartedAt: now, |
| 291 | CompletedAt: now.Add(500 * time.Millisecond), |
| 292 | } |
| 293 | fapi := &fakeAPI{claim: &api.Claim{ |
| 294 | Token: "job-token", |
| 295 | Job: api.Job{ID: 10, RunID: 20, Steps: []api.Step{{ID: 30, Run: "echo hi"}}}, |
| 296 | }} |
| 297 | fengine := &fakeEventEngine{ |
| 298 | fakeEngine: fakeEngine{out: engine.Outcome{ |
| 299 | Conclusion: engine.ConclusionSuccess, |
| 300 | StartedAt: now, |
| 301 | CompletedAt: now.Add(time.Second), |
| 302 | StepOutcomes: []engine.StepOutcome{stepEvent}, |
| 303 | }}, |
| 304 | events: []engine.Event{{Log: &logEvent}, {Step: &stepEvent}}, |
| 305 | } |
| 306 | r := New(Options{ |
| 307 | API: fapi, |
| 308 | Engine: fengine, |
| 309 | Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"}, |
| 310 | Clock: func() time.Time { return now }, |
| 311 | }) |
| 312 | if _, err := r.RunOnce(t.Context()); err != nil { |
| 313 | t.Fatalf("RunOnce: %v", err) |
| 314 | } |
| 315 | if len(fapi.logs) != 1 || len(fapi.stepStatuses) != 1 { |
| 316 | t.Fatalf("logs=%#v stepStatuses=%#v", fapi.logs, fapi.stepStatuses) |
| 317 | } |
| 318 | wantTokens := []string{"job-token", "next-token-1", "next-token-2", "next-token-3"} |
| 319 | if len(fapi.tokens) != len(wantTokens) { |
| 320 | t.Fatalf("tokens: got %#v, want %#v", fapi.tokens, wantTokens) |
| 321 | } |
| 322 | for i, want := range wantTokens { |
| 323 | if fapi.tokens[i] != want { |
| 324 | t.Fatalf("tokens[%d]: got %q, want %q; all=%#v", i, fapi.tokens[i], want, fapi.tokens) |
| 325 | } |
| 326 | } |
| 327 | } |
| 328 | |
| 329 | func TestRunOnce_EngineFailureStillCompletesJob(t *testing.T) { |
| 330 | t.Parallel() |
| 331 | now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) |
| 332 | fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}} |
| 333 | fengine := &fakeEngine{ |
| 334 | out: engine.Outcome{Conclusion: engine.ConclusionFailure, StartedAt: now, CompletedAt: now.Add(time.Second)}, |
| 335 | err: errors.New("exit 1"), |
| 336 | } |
| 337 | r := New(Options{ |
| 338 | API: fapi, |
| 339 | Engine: fengine, |
| 340 | Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"}, |
| 341 | Clock: func() time.Time { return now }, |
| 342 | }) |
| 343 | if _, err := r.RunOnce(t.Context()); err != nil { |
| 344 | t.Fatalf("RunOnce: %v", err) |
| 345 | } |
| 346 | if fapi.statuses[1].Conclusion != engine.ConclusionFailure { |
| 347 | t.Fatalf("completion: %#v", fapi.statuses[1]) |
| 348 | } |
| 349 | } |
| 350 | |
| 351 | func TestRunOnce_CancelCheckStopsEngineAndMarksJobCancelled(t *testing.T) { |
| 352 | t.Parallel() |
| 353 | now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) |
| 354 | fapi := &fakeAPI{ |
| 355 | claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}, |
| 356 | cancelled: true, |
| 357 | } |
| 358 | fengine := newCancelBlockingEngine() |
| 359 | r := New(Options{ |
| 360 | API: fapi, |
| 361 | Engine: fengine, |
| 362 | Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"}, |
| 363 | Clock: func() time.Time { return now }, |
| 364 | CancelPollInterval: time.Nanosecond, |
| 365 | Sleep: func(ctx context.Context, _ time.Duration) error { |
| 366 | select { |
| 367 | case <-ctx.Done(): |
| 368 | return ctx.Err() |
| 369 | default: |
| 370 | return nil |
| 371 | } |
| 372 | }, |
| 373 | }) |
| 374 | claimed, err := r.RunOnce(t.Context()) |
| 375 | if err != nil { |
| 376 | t.Fatalf("RunOnce: %v", err) |
| 377 | } |
| 378 | if !claimed { |
| 379 | t.Fatal("claimed = false") |
| 380 | } |
| 381 | if !fengine.cancelled { |
| 382 | t.Fatal("engine Cancel was not called") |
| 383 | } |
| 384 | if fapi.cancelChecks == 0 { |
| 385 | t.Fatal("cancel-check was not called") |
| 386 | } |
| 387 | if len(fapi.statuses) != 2 || |
| 388 | fapi.statuses[0].Status != "running" || |
| 389 | fapi.statuses[1].Status != "cancelled" || |
| 390 | fapi.statuses[1].Conclusion != engine.ConclusionCancelled { |
| 391 | t.Fatalf("statuses: %#v", fapi.statuses) |
| 392 | } |
| 393 | } |
| 394 | |
| 395 | func TestRunOnce_PrepareFailureMarksJobFailed(t *testing.T) { |
| 396 | t.Parallel() |
| 397 | fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}} |
| 398 | r := New(Options{ |
| 399 | API: fapi, |
| 400 | Engine: &fakeEngine{}, |
| 401 | Workspaces: &fakeWorkspaces{err: errors.New("disk full")}, |
| 402 | Clock: func() time.Time { return time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) }, |
| 403 | }) |
| 404 | claimed, err := r.RunOnce(t.Context()) |
| 405 | if err == nil { |
| 406 | t.Fatal("RunOnce returned nil error") |
| 407 | } |
| 408 | if !claimed { |
| 409 | t.Fatal("claimed = false") |
| 410 | } |
| 411 | if len(fapi.statuses) != 1 || fapi.statuses[0].Conclusion != engine.ConclusionFailure { |
| 412 | t.Fatalf("statuses: %#v", fapi.statuses) |
| 413 | } |
| 414 | } |
| 415 |