Go · 12263 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package runner
4
5 import (
6 "context"
7 "errors"
8 "strconv"
9 "testing"
10 "time"
11
12 "github.com/tenseleyFlow/shithub/internal/runner/api"
13 "github.com/tenseleyFlow/shithub/internal/runner/engine"
14 )
15
16 type fakeAPI struct {
17 claim *api.Claim
18 heartbeats []api.HeartbeatRequest
19 statuses []api.StatusRequest
20 stepStatuses []api.StatusRequest
21 logs []api.LogRequest
22 cancelChecks int
23 cancelled bool
24 tokens []string
25 next int
26 }
27
28 func (f *fakeAPI) Heartbeat(_ context.Context, req api.HeartbeatRequest) (*api.Claim, error) {
29 f.heartbeats = append(f.heartbeats, req)
30 return f.claim, nil
31 }
32
33 func (f *fakeAPI) UpdateStatus(_ context.Context, _ int64, token string, req api.StatusRequest) (api.StatusResponse, error) {
34 f.tokens = append(f.tokens, token)
35 f.statuses = append(f.statuses, req)
36 if req.Status == "running" {
37 return api.StatusResponse{NextToken: f.nextToken()}, nil
38 }
39 return api.StatusResponse{}, nil
40 }
41
42 func (f *fakeAPI) UpdateStepStatus(_ context.Context, _, _ int64, token string, req api.StatusRequest) (api.StepStatusResponse, error) {
43 f.tokens = append(f.tokens, token)
44 f.stepStatuses = append(f.stepStatuses, req)
45 return api.StepStatusResponse{NextToken: f.nextToken()}, nil
46 }
47
48 func (f *fakeAPI) AppendLog(_ context.Context, _ int64, token string, req api.LogRequest) (api.LogResponse, error) {
49 f.tokens = append(f.tokens, token)
50 f.logs = append(f.logs, req)
51 return api.LogResponse{NextToken: f.nextToken()}, nil
52 }
53
54 func (f *fakeAPI) CancelCheck(_ context.Context, _ int64, token string) (api.CancelCheckResponse, error) {
55 f.tokens = append(f.tokens, token)
56 f.cancelChecks++
57 return api.CancelCheckResponse{Cancelled: f.cancelled, NextToken: f.nextToken()}, nil
58 }
59
60 func (f *fakeAPI) nextToken() string {
61 f.next++
62 return "next-token-" + strconv.Itoa(f.next)
63 }
64
65 type fakeEngine struct {
66 job engine.Job
67 out engine.Outcome
68 logs []engine.LogChunk
69 err error
70 cancelled bool
71 }
72
73 func (f *fakeEngine) Execute(_ context.Context, job engine.Job) (engine.Outcome, error) {
74 f.job = job
75 return f.out, f.err
76 }
77
78 func (f *fakeEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogChunk, error) {
79 ch := make(chan engine.LogChunk, len(f.logs))
80 for _, chunk := range f.logs {
81 ch <- chunk
82 }
83 close(ch)
84 return ch, nil
85 }
86
87 func (f *fakeEngine) Cancel(_ context.Context, _ int64) error {
88 f.cancelled = true
89 return nil
90 }
91
92 type fakeEventEngine struct {
93 fakeEngine
94 events []engine.Event
95 ch chan engine.Event
96 }
97
98 func (f *fakeEventEngine) StreamEvents(_ context.Context, _ int64) (<-chan engine.Event, error) {
99 f.ch = make(chan engine.Event, len(f.events))
100 return f.ch, nil
101 }
102
103 func (f *fakeEventEngine) Execute(ctx context.Context, job engine.Job) (engine.Outcome, error) {
104 out, err := f.fakeEngine.Execute(ctx, job)
105 for _, event := range f.events {
106 f.ch <- event
107 }
108 close(f.ch)
109 return out, err
110 }
111
112 type cancelBlockingEngine struct {
113 job engine.Job
114 logs chan engine.LogChunk
115 started chan struct{}
116 cancelled bool
117 }
118
119 func newCancelBlockingEngine() *cancelBlockingEngine {
120 return &cancelBlockingEngine{
121 logs: make(chan engine.LogChunk),
122 started: make(chan struct{}),
123 }
124 }
125
126 func (f *cancelBlockingEngine) Execute(ctx context.Context, job engine.Job) (engine.Outcome, error) {
127 f.job = job
128 close(f.started)
129 <-ctx.Done()
130 close(f.logs)
131 now := time.Date(2026, 5, 10, 21, 0, 1, 0, time.UTC)
132 return engine.Outcome{
133 Conclusion: engine.ConclusionCancelled,
134 StartedAt: now.Add(-time.Second),
135 CompletedAt: now,
136 }, ctx.Err()
137 }
138
139 func (f *cancelBlockingEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogChunk, error) {
140 return f.logs, nil
141 }
142
143 func (f *cancelBlockingEngine) Cancel(_ context.Context, _ int64) error {
144 f.cancelled = true
145 return nil
146 }
147
// fakeWorkspaces is a workspace-manager stub. Prepare hands back the
// configured directory and error for any job, and Remove records that
// cleanup ran.
type fakeWorkspaces struct {
	dir     string // directory returned by Prepare
	removed bool   // set once Remove has been called
	err     error  // error returned by Prepare
}

// Prepare ignores its IDs and returns the configured directory and error.
func (f *fakeWorkspaces) Prepare(_, _ int64) (string, error) {
	workspaceDir, prepErr := f.dir, f.err
	return workspaceDir, prepErr
}

// Remove marks the workspace as removed and never fails.
func (f *fakeWorkspaces) Remove(_, _ int64) error {
	f.removed = true
	return nil
}
162
163 func TestRunOnce_NoClaim(t *testing.T) {
164 t.Parallel()
165 fapi := &fakeAPI{}
166 r := New(Options{
167 API: fapi,
168 Engine: &fakeEngine{},
169 Workspaces: &fakeWorkspaces{},
170 Labels: []string{"self-hosted", "linux"},
171 Capacity: 2,
172 HostName: "runner-host",
173 Version: "dev-test",
174 })
175 claimed, err := r.RunOnce(t.Context())
176 if err != nil {
177 t.Fatalf("RunOnce: %v", err)
178 }
179 if claimed {
180 t.Fatal("claimed = true")
181 }
182 if len(fapi.heartbeats) != 1 {
183 t.Fatalf("heartbeats: %#v", fapi.heartbeats)
184 }
185 got := fapi.heartbeats[0]
186 if got.Capacity != 2 || got.HostName != "runner-host" || got.Version != "dev-test" ||
187 len(got.Labels) != 2 || got.Labels[0] != "self-hosted" || got.Labels[1] != "linux" {
188 t.Fatalf("heartbeat: %#v", got)
189 }
190 }
191
192 func TestRunOnce_ExecutesAndCompletesSuccess(t *testing.T) {
193 t.Parallel()
194 now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
195 claim := &api.Claim{
196 Token: "job-token",
197 Job: api.Job{
198 ID: 10,
199 RunID: 20,
200 Env: map[string]string{"A": "B"},
201 Steps: []api.Step{{ID: 30, Run: "echo hi"}},
202 },
203 }
204 fapi := &fakeAPI{claim: claim}
205 fengine := &fakeEngine{out: engine.Outcome{Conclusion: engine.ConclusionSuccess, StartedAt: now, CompletedAt: now.Add(time.Second)}}
206 workspaces := &fakeWorkspaces{dir: "/tmp/workspace"}
207 r := New(Options{
208 API: fapi,
209 Engine: fengine,
210 Workspaces: workspaces,
211 DefaultImage: "runner-image",
212 Clock: func() time.Time { return now },
213 })
214 claimed, err := r.RunOnce(t.Context())
215 if err != nil {
216 t.Fatalf("RunOnce: %v", err)
217 }
218 if !claimed {
219 t.Fatal("claimed = false")
220 }
221 if fengine.job.WorkspaceDir != "/tmp/workspace" || fengine.job.Image != "runner-image" {
222 t.Fatalf("engine job: %#v", fengine.job)
223 }
224 if len(fapi.statuses) != 2 {
225 t.Fatalf("statuses: %#v", fapi.statuses)
226 }
227 if fapi.statuses[0].Status != "running" || fapi.statuses[1].Conclusion != engine.ConclusionSuccess {
228 t.Fatalf("statuses: %#v", fapi.statuses)
229 }
230 if len(fapi.tokens) != 2 || fapi.tokens[0] != "job-token" || fapi.tokens[1] != "next-token-1" {
231 t.Fatalf("tokens: %#v", fapi.tokens)
232 }
233 if !workspaces.removed {
234 t.Fatal("workspace was not removed")
235 }
236 }
237
238 func TestRunOnce_StreamsLogsAndMarksSteps(t *testing.T) {
239 t.Parallel()
240 now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
241 claim := &api.Claim{
242 Token: "job-token",
243 Job: api.Job{
244 ID: 10,
245 RunID: 20,
246 Steps: []api.Step{{ID: 30, Run: "echo hi"}},
247 },
248 }
249 fapi := &fakeAPI{claim: claim}
250 fengine := &fakeEngine{
251 out: engine.Outcome{
252 Conclusion: engine.ConclusionSuccess,
253 StartedAt: now,
254 CompletedAt: now.Add(time.Second),
255 StepOutcomes: []engine.StepOutcome{{
256 StepID: 30,
257 Status: "completed",
258 Conclusion: engine.ConclusionSuccess,
259 StartedAt: now,
260 CompletedAt: now.Add(500 * time.Millisecond),
261 }},
262 },
263 logs: []engine.LogChunk{{
264 JobID: 10,
265 StepID: 30,
266 Seq: 0,
267 Chunk: []byte("hello\n"),
268 }},
269 }
270 r := New(Options{
271 API: fapi,
272 Engine: fengine,
273 Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"},
274 Clock: func() time.Time { return now },
275 })
276 claimed, err := r.RunOnce(t.Context())
277 if err != nil {
278 t.Fatalf("RunOnce: %v", err)
279 }
280 if !claimed {
281 t.Fatal("claimed = false")
282 }
283 if len(fapi.logs) != 1 || fapi.logs[0].StepID != 30 || string(fapi.logs[0].Chunk) != "hello\n" {
284 t.Fatalf("logs: %#v", fapi.logs)
285 }
286 if len(fapi.stepStatuses) != 1 || fapi.stepStatuses[0].Status != "completed" ||
287 fapi.stepStatuses[0].Conclusion != engine.ConclusionSuccess {
288 t.Fatalf("step statuses: %#v", fapi.stepStatuses)
289 }
290 wantTokens := []string{"job-token", "next-token-1", "next-token-2", "next-token-3"}
291 if len(fapi.tokens) != len(wantTokens) {
292 t.Fatalf("tokens: got %#v, want %#v", fapi.tokens, wantTokens)
293 }
294 for i, want := range wantTokens {
295 if fapi.tokens[i] != want {
296 t.Fatalf("tokens[%d]: got %q, want %q; all=%#v", i, fapi.tokens[i], want, fapi.tokens)
297 }
298 }
299 }
300
301 func TestRunOnce_PrefersOrderedEventStream(t *testing.T) {
302 t.Parallel()
303 now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
304 logEvent := engine.LogChunk{JobID: 10, StepID: 30, Seq: 0, Chunk: []byte("hello\n")}
305 stepEvent := engine.StepOutcome{
306 StepID: 30,
307 Status: "completed",
308 Conclusion: engine.ConclusionSuccess,
309 StartedAt: now,
310 CompletedAt: now.Add(500 * time.Millisecond),
311 }
312 fapi := &fakeAPI{claim: &api.Claim{
313 Token: "job-token",
314 Job: api.Job{ID: 10, RunID: 20, Steps: []api.Step{{ID: 30, Run: "echo hi"}}},
315 }}
316 fengine := &fakeEventEngine{
317 fakeEngine: fakeEngine{out: engine.Outcome{
318 Conclusion: engine.ConclusionSuccess,
319 StartedAt: now,
320 CompletedAt: now.Add(time.Second),
321 StepOutcomes: []engine.StepOutcome{stepEvent},
322 }},
323 events: []engine.Event{{Log: &logEvent}, {Step: &stepEvent}},
324 }
325 r := New(Options{
326 API: fapi,
327 Engine: fengine,
328 Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"},
329 Clock: func() time.Time { return now },
330 })
331 if _, err := r.RunOnce(t.Context()); err != nil {
332 t.Fatalf("RunOnce: %v", err)
333 }
334 if len(fapi.logs) != 1 || len(fapi.stepStatuses) != 1 {
335 t.Fatalf("logs=%#v stepStatuses=%#v", fapi.logs, fapi.stepStatuses)
336 }
337 wantTokens := []string{"job-token", "next-token-1", "next-token-2", "next-token-3"}
338 if len(fapi.tokens) != len(wantTokens) {
339 t.Fatalf("tokens: got %#v, want %#v", fapi.tokens, wantTokens)
340 }
341 for i, want := range wantTokens {
342 if fapi.tokens[i] != want {
343 t.Fatalf("tokens[%d]: got %q, want %q; all=%#v", i, fapi.tokens[i], want, fapi.tokens)
344 }
345 }
346 }
347
348 func TestRunOnce_EngineFailureStillCompletesJob(t *testing.T) {
349 t.Parallel()
350 now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
351 fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}}
352 fengine := &fakeEngine{
353 out: engine.Outcome{Conclusion: engine.ConclusionFailure, StartedAt: now, CompletedAt: now.Add(time.Second)},
354 err: errors.New("exit 1"),
355 }
356 r := New(Options{
357 API: fapi,
358 Engine: fengine,
359 Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"},
360 Clock: func() time.Time { return now },
361 })
362 if _, err := r.RunOnce(t.Context()); err != nil {
363 t.Fatalf("RunOnce: %v", err)
364 }
365 if fapi.statuses[1].Conclusion != engine.ConclusionFailure {
366 t.Fatalf("completion: %#v", fapi.statuses[1])
367 }
368 }
369
370 func TestRunOnce_CancelCheckStopsEngineAndMarksJobCancelled(t *testing.T) {
371 t.Parallel()
372 now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
373 fapi := &fakeAPI{
374 claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}},
375 cancelled: true,
376 }
377 fengine := newCancelBlockingEngine()
378 r := New(Options{
379 API: fapi,
380 Engine: fengine,
381 Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"},
382 Clock: func() time.Time { return now },
383 CancelPollInterval: time.Nanosecond,
384 Sleep: func(ctx context.Context, _ time.Duration) error {
385 select {
386 case <-ctx.Done():
387 return ctx.Err()
388 default:
389 return nil
390 }
391 },
392 })
393 claimed, err := r.RunOnce(t.Context())
394 if err != nil {
395 t.Fatalf("RunOnce: %v", err)
396 }
397 if !claimed {
398 t.Fatal("claimed = false")
399 }
400 if !fengine.cancelled {
401 t.Fatal("engine Cancel was not called")
402 }
403 if fapi.cancelChecks == 0 {
404 t.Fatal("cancel-check was not called")
405 }
406 if len(fapi.statuses) != 2 ||
407 fapi.statuses[0].Status != "running" ||
408 fapi.statuses[1].Status != "cancelled" ||
409 fapi.statuses[1].Conclusion != engine.ConclusionCancelled {
410 t.Fatalf("statuses: %#v", fapi.statuses)
411 }
412 }
413
414 func TestRunOnce_PrepareFailureMarksJobFailed(t *testing.T) {
415 t.Parallel()
416 fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}}
417 r := New(Options{
418 API: fapi,
419 Engine: &fakeEngine{},
420 Workspaces: &fakeWorkspaces{err: errors.New("disk full")},
421 Clock: func() time.Time { return time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) },
422 })
423 claimed, err := r.RunOnce(t.Context())
424 if err == nil {
425 t.Fatal("RunOnce returned nil error")
426 }
427 if !claimed {
428 t.Fatal("claimed = false")
429 }
430 if len(fapi.statuses) != 1 || fapi.statuses[0].Conclusion != engine.ConclusionFailure {
431 t.Fatalf("statuses: %#v", fapi.statuses)
432 }
433 }
434