| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package runner |
| 4 | |
| 5 | import ( |
| 6 | "context" |
| 7 | "errors" |
| 8 | "strconv" |
| 9 | "testing" |
| 10 | "time" |
| 11 | |
| 12 | "github.com/tenseleyFlow/shithub/internal/runner/api" |
| 13 | "github.com/tenseleyFlow/shithub/internal/runner/engine" |
| 14 | ) |
| 15 | |
// fakeAPI is an in-memory double for the runner's API client. It records
// every request the runner makes so tests can assert on call order and
// payloads, and hands out a deterministic chain of rotated tokens.
type fakeAPI struct {
	claim *api.Claim // returned by Heartbeat; nil simulates "no work available"
	heartbeats []api.HeartbeatRequest // every Heartbeat payload, in call order
	statuses []api.StatusRequest // job-level UpdateStatus payloads
	stepStatuses []api.StatusRequest // per-step UpdateStepStatus payloads
	logs []api.LogRequest // AppendLog payloads
	cancelChecks int // number of CancelCheck calls observed
	cancelled bool // value CancelCheck reports back to the runner
	tokens []string // every bearer token presented, across all endpoints
	next int // counter backing nextToken's deterministic sequence
}
| 27 | |
// Heartbeat records the request and returns the canned claim (nil when the
// fake has no job to hand out). It never returns an error.
func (f *fakeAPI) Heartbeat(_ context.Context, req api.HeartbeatRequest) (*api.Claim, error) {
	f.heartbeats = append(f.heartbeats, req)
	return f.claim, nil
}
| 32 | |
| 33 | func (f *fakeAPI) UpdateStatus(_ context.Context, _ int64, token string, req api.StatusRequest) (api.StatusResponse, error) { |
| 34 | f.tokens = append(f.tokens, token) |
| 35 | f.statuses = append(f.statuses, req) |
| 36 | if req.Status == "running" { |
| 37 | return api.StatusResponse{NextToken: f.nextToken()}, nil |
| 38 | } |
| 39 | return api.StatusResponse{}, nil |
| 40 | } |
| 41 | |
// UpdateStepStatus records the presented token and the per-step status
// request, always rotating to a fresh NextToken.
func (f *fakeAPI) UpdateStepStatus(_ context.Context, _, _ int64, token string, req api.StatusRequest) (api.StepStatusResponse, error) {
	f.tokens = append(f.tokens, token)
	f.stepStatuses = append(f.stepStatuses, req)
	return api.StepStatusResponse{NextToken: f.nextToken()}, nil
}
| 47 | |
// AppendLog records the presented token and the log chunk, always rotating
// to a fresh NextToken.
func (f *fakeAPI) AppendLog(_ context.Context, _ int64, token string, req api.LogRequest) (api.LogResponse, error) {
	f.tokens = append(f.tokens, token)
	f.logs = append(f.logs, req)
	return api.LogResponse{NextToken: f.nextToken()}, nil
}
| 53 | |
// CancelCheck records the presented token, counts the poll, and reports the
// fake's configured cancelled flag, rotating to a fresh NextToken.
func (f *fakeAPI) CancelCheck(_ context.Context, _ int64, token string) (api.CancelCheckResponse, error) {
	f.tokens = append(f.tokens, token)
	f.cancelChecks++
	return api.CancelCheckResponse{Cancelled: f.cancelled, NextToken: f.nextToken()}, nil
}
| 59 | |
| 60 | func (f *fakeAPI) nextToken() string { |
| 61 | f.next++ |
| 62 | return "next-token-" + strconv.Itoa(f.next) |
| 63 | } |
| 64 | |
// fakeEngine is a scripted engine double: Execute returns the configured
// outcome/error, StreamLogs replays the canned chunks, and Cancel just flips
// a flag for later assertions.
type fakeEngine struct {
	job engine.Job // last job passed to Execute, captured for assertions
	out engine.Outcome // outcome Execute returns
	logs []engine.LogChunk // chunks StreamLogs replays, in order
	err error // error Execute returns alongside out
	cancelled bool // set when Cancel is called
}
| 72 | |
// Execute captures the job for later assertions and returns the scripted
// outcome and error.
func (f *fakeEngine) Execute(_ context.Context, job engine.Job) (engine.Outcome, error) {
	f.job = job
	return f.out, f.err
}
| 77 | |
| 78 | func (f *fakeEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogChunk, error) { |
| 79 | ch := make(chan engine.LogChunk, len(f.logs)) |
| 80 | for _, chunk := range f.logs { |
| 81 | ch <- chunk |
| 82 | } |
| 83 | close(ch) |
| 84 | return ch, nil |
| 85 | } |
| 86 | |
// Cancel records that cancellation was requested; it never errors.
func (f *fakeEngine) Cancel(_ context.Context, _ int64) error {
	f.cancelled = true
	return nil
}
| 91 | |
// fakeEventEngine extends fakeEngine with an ordered event stream, letting
// tests exercise the runner's preference for StreamEvents over the separate
// log/step channels.
type fakeEventEngine struct {
	fakeEngine
	events []engine.Event // events Execute pushes onto ch, in order
	ch chan engine.Event // created by StreamEvents, fed and closed by Execute
}
| 97 | |
// StreamEvents creates the buffered event channel that Execute later fills
// and closes. NOTE(review): Execute sends on f.ch, so the runner must call
// StreamEvents before Execute or the send on a nil channel would block
// forever — presumably guaranteed by the runner's call order; verify there.
func (f *fakeEventEngine) StreamEvents(_ context.Context, _ int64) (<-chan engine.Event, error) {
	f.ch = make(chan engine.Event, len(f.events))
	return f.ch, nil
}
| 102 | |
// Execute runs the embedded fake's Execute, then pushes the scripted events
// onto the channel StreamEvents created and closes it so the runner's event
// loop terminates. Requires StreamEvents to have been called first (f.ch
// would be nil otherwise — see the note on StreamEvents).
func (f *fakeEventEngine) Execute(ctx context.Context, job engine.Job) (engine.Outcome, error) {
	out, err := f.fakeEngine.Execute(ctx, job)
	for _, event := range f.events {
		f.ch <- event
	}
	close(f.ch)
	return out, err
}
| 111 | |
// cancelBlockingEngine is an engine double whose Execute blocks until its
// context is cancelled, used to test the runner's cancel-polling path.
type cancelBlockingEngine struct {
	job engine.Job // last job passed to Execute
	logs chan engine.LogChunk // unbuffered; closed by Execute once cancelled
	started chan struct{} // closed when Execute begins, for test synchronization
	cancelled bool // set when Cancel is called
}
| 118 | |
| 119 | func newCancelBlockingEngine() *cancelBlockingEngine { |
| 120 | return &cancelBlockingEngine{ |
| 121 | logs: make(chan engine.LogChunk), |
| 122 | started: make(chan struct{}), |
| 123 | } |
| 124 | } |
| 125 | |
| 126 | func (f *cancelBlockingEngine) Execute(ctx context.Context, job engine.Job) (engine.Outcome, error) { |
| 127 | f.job = job |
| 128 | close(f.started) |
| 129 | <-ctx.Done() |
| 130 | close(f.logs) |
| 131 | now := time.Date(2026, 5, 10, 21, 0, 1, 0, time.UTC) |
| 132 | return engine.Outcome{ |
| 133 | Conclusion: engine.ConclusionCancelled, |
| 134 | StartedAt: now.Add(-time.Second), |
| 135 | CompletedAt: now, |
| 136 | }, ctx.Err() |
| 137 | } |
| 138 | |
// StreamLogs hands out the engine's log channel; it stays open (and empty)
// until Execute closes it on cancellation.
func (f *cancelBlockingEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogChunk, error) {
	return f.logs, nil
}
| 142 | |
// Cancel records that cancellation was requested; it never errors.
func (f *cancelBlockingEngine) Cancel(_ context.Context, _ int64) error {
	f.cancelled = true
	return nil
}
| 147 | |
// fakeWorkspaces is a workspace-manager double: Prepare returns the
// configured dir/err and Remove records that cleanup happened.
type fakeWorkspaces struct {
	dir string // directory Prepare reports
	removed bool // set when Remove is called
	err error // error Prepare returns (simulates e.g. disk-full)
}
| 153 | |
// Prepare returns the scripted workspace directory and error.
func (f *fakeWorkspaces) Prepare(_, _ int64) (string, error) {
	return f.dir, f.err
}
| 157 | |
// Remove records that the workspace was cleaned up; it never errors.
func (f *fakeWorkspaces) Remove(_, _ int64) error {
	f.removed = true
	return nil
}
| 162 | |
| 163 | func TestRunOnce_NoClaim(t *testing.T) { |
| 164 | t.Parallel() |
| 165 | fapi := &fakeAPI{} |
| 166 | r := New(Options{ |
| 167 | API: fapi, |
| 168 | Engine: &fakeEngine{}, |
| 169 | Workspaces: &fakeWorkspaces{}, |
| 170 | Labels: []string{"self-hosted", "linux"}, |
| 171 | Capacity: 2, |
| 172 | HostName: "runner-host", |
| 173 | Version: "dev-test", |
| 174 | }) |
| 175 | claimed, err := r.RunOnce(t.Context()) |
| 176 | if err != nil { |
| 177 | t.Fatalf("RunOnce: %v", err) |
| 178 | } |
| 179 | if claimed { |
| 180 | t.Fatal("claimed = true") |
| 181 | } |
| 182 | if len(fapi.heartbeats) != 1 { |
| 183 | t.Fatalf("heartbeats: %#v", fapi.heartbeats) |
| 184 | } |
| 185 | got := fapi.heartbeats[0] |
| 186 | if got.Capacity != 2 || got.HostName != "runner-host" || got.Version != "dev-test" || |
| 187 | len(got.Labels) != 2 || got.Labels[0] != "self-hosted" || got.Labels[1] != "linux" { |
| 188 | t.Fatalf("heartbeat: %#v", got) |
| 189 | } |
| 190 | } |
| 191 | |
| 192 | func TestRunOnce_ExecutesAndCompletesSuccess(t *testing.T) { |
| 193 | t.Parallel() |
| 194 | now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) |
| 195 | claim := &api.Claim{ |
| 196 | Token: "job-token", |
| 197 | Job: api.Job{ |
| 198 | ID: 10, |
| 199 | RunID: 20, |
| 200 | Env: map[string]string{"A": "B"}, |
| 201 | Steps: []api.Step{{ID: 30, Run: "echo hi"}}, |
| 202 | }, |
| 203 | } |
| 204 | fapi := &fakeAPI{claim: claim} |
| 205 | fengine := &fakeEngine{out: engine.Outcome{Conclusion: engine.ConclusionSuccess, StartedAt: now, CompletedAt: now.Add(time.Second)}} |
| 206 | workspaces := &fakeWorkspaces{dir: "/tmp/workspace"} |
| 207 | r := New(Options{ |
| 208 | API: fapi, |
| 209 | Engine: fengine, |
| 210 | Workspaces: workspaces, |
| 211 | DefaultImage: "runner-image", |
| 212 | Clock: func() time.Time { return now }, |
| 213 | }) |
| 214 | claimed, err := r.RunOnce(t.Context()) |
| 215 | if err != nil { |
| 216 | t.Fatalf("RunOnce: %v", err) |
| 217 | } |
| 218 | if !claimed { |
| 219 | t.Fatal("claimed = false") |
| 220 | } |
| 221 | if fengine.job.WorkspaceDir != "/tmp/workspace" || fengine.job.Image != "runner-image" { |
| 222 | t.Fatalf("engine job: %#v", fengine.job) |
| 223 | } |
| 224 | if len(fapi.statuses) != 2 { |
| 225 | t.Fatalf("statuses: %#v", fapi.statuses) |
| 226 | } |
| 227 | if fapi.statuses[0].Status != "running" || fapi.statuses[1].Conclusion != engine.ConclusionSuccess { |
| 228 | t.Fatalf("statuses: %#v", fapi.statuses) |
| 229 | } |
| 230 | if len(fapi.tokens) != 2 || fapi.tokens[0] != "job-token" || fapi.tokens[1] != "next-token-1" { |
| 231 | t.Fatalf("tokens: %#v", fapi.tokens) |
| 232 | } |
| 233 | if !workspaces.removed { |
| 234 | t.Fatal("workspace was not removed") |
| 235 | } |
| 236 | } |
| 237 | |
| 238 | func TestRunOnce_StreamsLogsAndMarksSteps(t *testing.T) { |
| 239 | t.Parallel() |
| 240 | now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) |
| 241 | claim := &api.Claim{ |
| 242 | Token: "job-token", |
| 243 | Job: api.Job{ |
| 244 | ID: 10, |
| 245 | RunID: 20, |
| 246 | Steps: []api.Step{{ID: 30, Run: "echo hi"}}, |
| 247 | }, |
| 248 | } |
| 249 | fapi := &fakeAPI{claim: claim} |
| 250 | fengine := &fakeEngine{ |
| 251 | out: engine.Outcome{ |
| 252 | Conclusion: engine.ConclusionSuccess, |
| 253 | StartedAt: now, |
| 254 | CompletedAt: now.Add(time.Second), |
| 255 | StepOutcomes: []engine.StepOutcome{{ |
| 256 | StepID: 30, |
| 257 | Status: "completed", |
| 258 | Conclusion: engine.ConclusionSuccess, |
| 259 | StartedAt: now, |
| 260 | CompletedAt: now.Add(500 * time.Millisecond), |
| 261 | }}, |
| 262 | }, |
| 263 | logs: []engine.LogChunk{{ |
| 264 | JobID: 10, |
| 265 | StepID: 30, |
| 266 | Seq: 0, |
| 267 | Chunk: []byte("hello\n"), |
| 268 | }}, |
| 269 | } |
| 270 | r := New(Options{ |
| 271 | API: fapi, |
| 272 | Engine: fengine, |
| 273 | Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"}, |
| 274 | Clock: func() time.Time { return now }, |
| 275 | }) |
| 276 | claimed, err := r.RunOnce(t.Context()) |
| 277 | if err != nil { |
| 278 | t.Fatalf("RunOnce: %v", err) |
| 279 | } |
| 280 | if !claimed { |
| 281 | t.Fatal("claimed = false") |
| 282 | } |
| 283 | if len(fapi.logs) != 1 || fapi.logs[0].StepID != 30 || string(fapi.logs[0].Chunk) != "hello\n" { |
| 284 | t.Fatalf("logs: %#v", fapi.logs) |
| 285 | } |
| 286 | if len(fapi.stepStatuses) != 1 || fapi.stepStatuses[0].Status != "completed" || |
| 287 | fapi.stepStatuses[0].Conclusion != engine.ConclusionSuccess { |
| 288 | t.Fatalf("step statuses: %#v", fapi.stepStatuses) |
| 289 | } |
| 290 | wantTokens := []string{"job-token", "next-token-1", "next-token-2", "next-token-3"} |
| 291 | if len(fapi.tokens) != len(wantTokens) { |
| 292 | t.Fatalf("tokens: got %#v, want %#v", fapi.tokens, wantTokens) |
| 293 | } |
| 294 | for i, want := range wantTokens { |
| 295 | if fapi.tokens[i] != want { |
| 296 | t.Fatalf("tokens[%d]: got %q, want %q; all=%#v", i, fapi.tokens[i], want, fapi.tokens) |
| 297 | } |
| 298 | } |
| 299 | } |
| 300 | |
| 301 | func TestRunOnce_PrefersOrderedEventStream(t *testing.T) { |
| 302 | t.Parallel() |
| 303 | now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) |
| 304 | logEvent := engine.LogChunk{JobID: 10, StepID: 30, Seq: 0, Chunk: []byte("hello\n")} |
| 305 | stepEvent := engine.StepOutcome{ |
| 306 | StepID: 30, |
| 307 | Status: "completed", |
| 308 | Conclusion: engine.ConclusionSuccess, |
| 309 | StartedAt: now, |
| 310 | CompletedAt: now.Add(500 * time.Millisecond), |
| 311 | } |
| 312 | fapi := &fakeAPI{claim: &api.Claim{ |
| 313 | Token: "job-token", |
| 314 | Job: api.Job{ID: 10, RunID: 20, Steps: []api.Step{{ID: 30, Run: "echo hi"}}}, |
| 315 | }} |
| 316 | fengine := &fakeEventEngine{ |
| 317 | fakeEngine: fakeEngine{out: engine.Outcome{ |
| 318 | Conclusion: engine.ConclusionSuccess, |
| 319 | StartedAt: now, |
| 320 | CompletedAt: now.Add(time.Second), |
| 321 | StepOutcomes: []engine.StepOutcome{stepEvent}, |
| 322 | }}, |
| 323 | events: []engine.Event{{Log: &logEvent}, {Step: &stepEvent}}, |
| 324 | } |
| 325 | r := New(Options{ |
| 326 | API: fapi, |
| 327 | Engine: fengine, |
| 328 | Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"}, |
| 329 | Clock: func() time.Time { return now }, |
| 330 | }) |
| 331 | if _, err := r.RunOnce(t.Context()); err != nil { |
| 332 | t.Fatalf("RunOnce: %v", err) |
| 333 | } |
| 334 | if len(fapi.logs) != 1 || len(fapi.stepStatuses) != 1 { |
| 335 | t.Fatalf("logs=%#v stepStatuses=%#v", fapi.logs, fapi.stepStatuses) |
| 336 | } |
| 337 | wantTokens := []string{"job-token", "next-token-1", "next-token-2", "next-token-3"} |
| 338 | if len(fapi.tokens) != len(wantTokens) { |
| 339 | t.Fatalf("tokens: got %#v, want %#v", fapi.tokens, wantTokens) |
| 340 | } |
| 341 | for i, want := range wantTokens { |
| 342 | if fapi.tokens[i] != want { |
| 343 | t.Fatalf("tokens[%d]: got %q, want %q; all=%#v", i, fapi.tokens[i], want, fapi.tokens) |
| 344 | } |
| 345 | } |
| 346 | } |
| 347 | |
| 348 | func TestRunOnce_EngineFailureStillCompletesJob(t *testing.T) { |
| 349 | t.Parallel() |
| 350 | now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) |
| 351 | fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}} |
| 352 | fengine := &fakeEngine{ |
| 353 | out: engine.Outcome{Conclusion: engine.ConclusionFailure, StartedAt: now, CompletedAt: now.Add(time.Second)}, |
| 354 | err: errors.New("exit 1"), |
| 355 | } |
| 356 | r := New(Options{ |
| 357 | API: fapi, |
| 358 | Engine: fengine, |
| 359 | Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"}, |
| 360 | Clock: func() time.Time { return now }, |
| 361 | }) |
| 362 | if _, err := r.RunOnce(t.Context()); err != nil { |
| 363 | t.Fatalf("RunOnce: %v", err) |
| 364 | } |
| 365 | if fapi.statuses[1].Conclusion != engine.ConclusionFailure { |
| 366 | t.Fatalf("completion: %#v", fapi.statuses[1]) |
| 367 | } |
| 368 | } |
| 369 | |
| 370 | func TestRunOnce_CancelCheckStopsEngineAndMarksJobCancelled(t *testing.T) { |
| 371 | t.Parallel() |
| 372 | now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) |
| 373 | fapi := &fakeAPI{ |
| 374 | claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}, |
| 375 | cancelled: true, |
| 376 | } |
| 377 | fengine := newCancelBlockingEngine() |
| 378 | r := New(Options{ |
| 379 | API: fapi, |
| 380 | Engine: fengine, |
| 381 | Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"}, |
| 382 | Clock: func() time.Time { return now }, |
| 383 | CancelPollInterval: time.Nanosecond, |
| 384 | Sleep: func(ctx context.Context, _ time.Duration) error { |
| 385 | select { |
| 386 | case <-ctx.Done(): |
| 387 | return ctx.Err() |
| 388 | default: |
| 389 | return nil |
| 390 | } |
| 391 | }, |
| 392 | }) |
| 393 | claimed, err := r.RunOnce(t.Context()) |
| 394 | if err != nil { |
| 395 | t.Fatalf("RunOnce: %v", err) |
| 396 | } |
| 397 | if !claimed { |
| 398 | t.Fatal("claimed = false") |
| 399 | } |
| 400 | if !fengine.cancelled { |
| 401 | t.Fatal("engine Cancel was not called") |
| 402 | } |
| 403 | if fapi.cancelChecks == 0 { |
| 404 | t.Fatal("cancel-check was not called") |
| 405 | } |
| 406 | if len(fapi.statuses) != 2 || |
| 407 | fapi.statuses[0].Status != "running" || |
| 408 | fapi.statuses[1].Status != "cancelled" || |
| 409 | fapi.statuses[1].Conclusion != engine.ConclusionCancelled { |
| 410 | t.Fatalf("statuses: %#v", fapi.statuses) |
| 411 | } |
| 412 | } |
| 413 | |
| 414 | func TestRunOnce_PrepareFailureMarksJobFailed(t *testing.T) { |
| 415 | t.Parallel() |
| 416 | fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}} |
| 417 | r := New(Options{ |
| 418 | API: fapi, |
| 419 | Engine: &fakeEngine{}, |
| 420 | Workspaces: &fakeWorkspaces{err: errors.New("disk full")}, |
| 421 | Clock: func() time.Time { return time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) }, |
| 422 | }) |
| 423 | claimed, err := r.RunOnce(t.Context()) |
| 424 | if err == nil { |
| 425 | t.Fatal("RunOnce returned nil error") |
| 426 | } |
| 427 | if !claimed { |
| 428 | t.Fatal("claimed = false") |
| 429 | } |
| 430 | if len(fapi.statuses) != 1 || fapi.statuses[0].Conclusion != engine.ConclusionFailure { |
| 431 | t.Fatalf("statuses: %#v", fapi.statuses) |
| 432 | } |
| 433 | } |
| 434 |