Go · 11707 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package runner
4
5 import (
6 "context"
7 "errors"
8 "strconv"
9 "testing"
10 "time"
11
12 "github.com/tenseleyFlow/shithub/internal/runner/api"
13 "github.com/tenseleyFlow/shithub/internal/runner/engine"
14 )
15
16 type fakeAPI struct {
17 claim *api.Claim
18 statuses []api.StatusRequest
19 stepStatuses []api.StatusRequest
20 logs []api.LogRequest
21 cancelChecks int
22 cancelled bool
23 tokens []string
24 next int
25 }
26
27 func (f *fakeAPI) Heartbeat(_ context.Context, _ api.HeartbeatRequest) (*api.Claim, error) {
28 return f.claim, nil
29 }
30
31 func (f *fakeAPI) UpdateStatus(_ context.Context, _ int64, token string, req api.StatusRequest) (api.StatusResponse, error) {
32 f.tokens = append(f.tokens, token)
33 f.statuses = append(f.statuses, req)
34 if req.Status == "running" {
35 return api.StatusResponse{NextToken: f.nextToken()}, nil
36 }
37 return api.StatusResponse{}, nil
38 }
39
40 func (f *fakeAPI) UpdateStepStatus(_ context.Context, _, _ int64, token string, req api.StatusRequest) (api.StepStatusResponse, error) {
41 f.tokens = append(f.tokens, token)
42 f.stepStatuses = append(f.stepStatuses, req)
43 return api.StepStatusResponse{NextToken: f.nextToken()}, nil
44 }
45
46 func (f *fakeAPI) AppendLog(_ context.Context, _ int64, token string, req api.LogRequest) (api.LogResponse, error) {
47 f.tokens = append(f.tokens, token)
48 f.logs = append(f.logs, req)
49 return api.LogResponse{NextToken: f.nextToken()}, nil
50 }
51
52 func (f *fakeAPI) CancelCheck(_ context.Context, _ int64, token string) (api.CancelCheckResponse, error) {
53 f.tokens = append(f.tokens, token)
54 f.cancelChecks++
55 return api.CancelCheckResponse{Cancelled: f.cancelled, NextToken: f.nextToken()}, nil
56 }
57
58 func (f *fakeAPI) nextToken() string {
59 f.next++
60 return "next-token-" + strconv.Itoa(f.next)
61 }
62
63 type fakeEngine struct {
64 job engine.Job
65 out engine.Outcome
66 logs []engine.LogChunk
67 err error
68 cancelled bool
69 }
70
71 func (f *fakeEngine) Execute(_ context.Context, job engine.Job) (engine.Outcome, error) {
72 f.job = job
73 return f.out, f.err
74 }
75
76 func (f *fakeEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogChunk, error) {
77 ch := make(chan engine.LogChunk, len(f.logs))
78 for _, chunk := range f.logs {
79 ch <- chunk
80 }
81 close(ch)
82 return ch, nil
83 }
84
85 func (f *fakeEngine) Cancel(_ context.Context, _ int64) error {
86 f.cancelled = true
87 return nil
88 }
89
90 type fakeEventEngine struct {
91 fakeEngine
92 events []engine.Event
93 ch chan engine.Event
94 }
95
96 func (f *fakeEventEngine) StreamEvents(_ context.Context, _ int64) (<-chan engine.Event, error) {
97 f.ch = make(chan engine.Event, len(f.events))
98 return f.ch, nil
99 }
100
101 func (f *fakeEventEngine) Execute(ctx context.Context, job engine.Job) (engine.Outcome, error) {
102 out, err := f.fakeEngine.Execute(ctx, job)
103 for _, event := range f.events {
104 f.ch <- event
105 }
106 close(f.ch)
107 return out, err
108 }
109
110 type cancelBlockingEngine struct {
111 job engine.Job
112 logs chan engine.LogChunk
113 started chan struct{}
114 cancelled bool
115 }
116
117 func newCancelBlockingEngine() *cancelBlockingEngine {
118 return &cancelBlockingEngine{
119 logs: make(chan engine.LogChunk),
120 started: make(chan struct{}),
121 }
122 }
123
124 func (f *cancelBlockingEngine) Execute(ctx context.Context, job engine.Job) (engine.Outcome, error) {
125 f.job = job
126 close(f.started)
127 <-ctx.Done()
128 close(f.logs)
129 now := time.Date(2026, 5, 10, 21, 0, 1, 0, time.UTC)
130 return engine.Outcome{
131 Conclusion: engine.ConclusionCancelled,
132 StartedAt: now.Add(-time.Second),
133 CompletedAt: now,
134 }, ctx.Err()
135 }
136
137 func (f *cancelBlockingEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogChunk, error) {
138 return f.logs, nil
139 }
140
141 func (f *cancelBlockingEngine) Cancel(_ context.Context, _ int64) error {
142 f.cancelled = true
143 return nil
144 }
145
// fakeWorkspaces is a workspace manager double: Prepare hands back a canned
// directory and error, and Remove only notes that it was invoked.
type fakeWorkspaces struct {
	dir     string // path returned by Prepare
	err     error  // error returned by Prepare
	removed bool   // set once Remove has been called
}

// Prepare ignores both identifiers and returns the configured dir and err.
func (f *fakeWorkspaces) Prepare(_, _ int64) (string, error) {
	dir, err := f.dir, f.err
	return dir, err
}

// Remove ignores both identifiers, flags the workspace as removed, and never
// fails.
func (f *fakeWorkspaces) Remove(_, _ int64) error {
	var err error
	f.removed = true
	return err
}
160
161 func TestRunOnce_NoClaim(t *testing.T) {
162 t.Parallel()
163 r := New(Options{API: &fakeAPI{}, Engine: &fakeEngine{}, Workspaces: &fakeWorkspaces{}})
164 claimed, err := r.RunOnce(t.Context())
165 if err != nil {
166 t.Fatalf("RunOnce: %v", err)
167 }
168 if claimed {
169 t.Fatal("claimed = true")
170 }
171 }
172
173 func TestRunOnce_ExecutesAndCompletesSuccess(t *testing.T) {
174 t.Parallel()
175 now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
176 claim := &api.Claim{
177 Token: "job-token",
178 Job: api.Job{
179 ID: 10,
180 RunID: 20,
181 Env: map[string]string{"A": "B"},
182 Steps: []api.Step{{ID: 30, Run: "echo hi"}},
183 },
184 }
185 fapi := &fakeAPI{claim: claim}
186 fengine := &fakeEngine{out: engine.Outcome{Conclusion: engine.ConclusionSuccess, StartedAt: now, CompletedAt: now.Add(time.Second)}}
187 workspaces := &fakeWorkspaces{dir: "/tmp/workspace"}
188 r := New(Options{
189 API: fapi,
190 Engine: fengine,
191 Workspaces: workspaces,
192 DefaultImage: "runner-image",
193 Clock: func() time.Time { return now },
194 })
195 claimed, err := r.RunOnce(t.Context())
196 if err != nil {
197 t.Fatalf("RunOnce: %v", err)
198 }
199 if !claimed {
200 t.Fatal("claimed = false")
201 }
202 if fengine.job.WorkspaceDir != "/tmp/workspace" || fengine.job.Image != "runner-image" {
203 t.Fatalf("engine job: %#v", fengine.job)
204 }
205 if len(fapi.statuses) != 2 {
206 t.Fatalf("statuses: %#v", fapi.statuses)
207 }
208 if fapi.statuses[0].Status != "running" || fapi.statuses[1].Conclusion != engine.ConclusionSuccess {
209 t.Fatalf("statuses: %#v", fapi.statuses)
210 }
211 if len(fapi.tokens) != 2 || fapi.tokens[0] != "job-token" || fapi.tokens[1] != "next-token-1" {
212 t.Fatalf("tokens: %#v", fapi.tokens)
213 }
214 if !workspaces.removed {
215 t.Fatal("workspace was not removed")
216 }
217 }
218
219 func TestRunOnce_StreamsLogsAndMarksSteps(t *testing.T) {
220 t.Parallel()
221 now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
222 claim := &api.Claim{
223 Token: "job-token",
224 Job: api.Job{
225 ID: 10,
226 RunID: 20,
227 Steps: []api.Step{{ID: 30, Run: "echo hi"}},
228 },
229 }
230 fapi := &fakeAPI{claim: claim}
231 fengine := &fakeEngine{
232 out: engine.Outcome{
233 Conclusion: engine.ConclusionSuccess,
234 StartedAt: now,
235 CompletedAt: now.Add(time.Second),
236 StepOutcomes: []engine.StepOutcome{{
237 StepID: 30,
238 Status: "completed",
239 Conclusion: engine.ConclusionSuccess,
240 StartedAt: now,
241 CompletedAt: now.Add(500 * time.Millisecond),
242 }},
243 },
244 logs: []engine.LogChunk{{
245 JobID: 10,
246 StepID: 30,
247 Seq: 0,
248 Chunk: []byte("hello\n"),
249 }},
250 }
251 r := New(Options{
252 API: fapi,
253 Engine: fengine,
254 Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"},
255 Clock: func() time.Time { return now },
256 })
257 claimed, err := r.RunOnce(t.Context())
258 if err != nil {
259 t.Fatalf("RunOnce: %v", err)
260 }
261 if !claimed {
262 t.Fatal("claimed = false")
263 }
264 if len(fapi.logs) != 1 || fapi.logs[0].StepID != 30 || string(fapi.logs[0].Chunk) != "hello\n" {
265 t.Fatalf("logs: %#v", fapi.logs)
266 }
267 if len(fapi.stepStatuses) != 1 || fapi.stepStatuses[0].Status != "completed" ||
268 fapi.stepStatuses[0].Conclusion != engine.ConclusionSuccess {
269 t.Fatalf("step statuses: %#v", fapi.stepStatuses)
270 }
271 wantTokens := []string{"job-token", "next-token-1", "next-token-2", "next-token-3"}
272 if len(fapi.tokens) != len(wantTokens) {
273 t.Fatalf("tokens: got %#v, want %#v", fapi.tokens, wantTokens)
274 }
275 for i, want := range wantTokens {
276 if fapi.tokens[i] != want {
277 t.Fatalf("tokens[%d]: got %q, want %q; all=%#v", i, fapi.tokens[i], want, fapi.tokens)
278 }
279 }
280 }
281
282 func TestRunOnce_PrefersOrderedEventStream(t *testing.T) {
283 t.Parallel()
284 now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
285 logEvent := engine.LogChunk{JobID: 10, StepID: 30, Seq: 0, Chunk: []byte("hello\n")}
286 stepEvent := engine.StepOutcome{
287 StepID: 30,
288 Status: "completed",
289 Conclusion: engine.ConclusionSuccess,
290 StartedAt: now,
291 CompletedAt: now.Add(500 * time.Millisecond),
292 }
293 fapi := &fakeAPI{claim: &api.Claim{
294 Token: "job-token",
295 Job: api.Job{ID: 10, RunID: 20, Steps: []api.Step{{ID: 30, Run: "echo hi"}}},
296 }}
297 fengine := &fakeEventEngine{
298 fakeEngine: fakeEngine{out: engine.Outcome{
299 Conclusion: engine.ConclusionSuccess,
300 StartedAt: now,
301 CompletedAt: now.Add(time.Second),
302 StepOutcomes: []engine.StepOutcome{stepEvent},
303 }},
304 events: []engine.Event{{Log: &logEvent}, {Step: &stepEvent}},
305 }
306 r := New(Options{
307 API: fapi,
308 Engine: fengine,
309 Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"},
310 Clock: func() time.Time { return now },
311 })
312 if _, err := r.RunOnce(t.Context()); err != nil {
313 t.Fatalf("RunOnce: %v", err)
314 }
315 if len(fapi.logs) != 1 || len(fapi.stepStatuses) != 1 {
316 t.Fatalf("logs=%#v stepStatuses=%#v", fapi.logs, fapi.stepStatuses)
317 }
318 wantTokens := []string{"job-token", "next-token-1", "next-token-2", "next-token-3"}
319 if len(fapi.tokens) != len(wantTokens) {
320 t.Fatalf("tokens: got %#v, want %#v", fapi.tokens, wantTokens)
321 }
322 for i, want := range wantTokens {
323 if fapi.tokens[i] != want {
324 t.Fatalf("tokens[%d]: got %q, want %q; all=%#v", i, fapi.tokens[i], want, fapi.tokens)
325 }
326 }
327 }
328
329 func TestRunOnce_EngineFailureStillCompletesJob(t *testing.T) {
330 t.Parallel()
331 now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
332 fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}}
333 fengine := &fakeEngine{
334 out: engine.Outcome{Conclusion: engine.ConclusionFailure, StartedAt: now, CompletedAt: now.Add(time.Second)},
335 err: errors.New("exit 1"),
336 }
337 r := New(Options{
338 API: fapi,
339 Engine: fengine,
340 Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"},
341 Clock: func() time.Time { return now },
342 })
343 if _, err := r.RunOnce(t.Context()); err != nil {
344 t.Fatalf("RunOnce: %v", err)
345 }
346 if fapi.statuses[1].Conclusion != engine.ConclusionFailure {
347 t.Fatalf("completion: %#v", fapi.statuses[1])
348 }
349 }
350
351 func TestRunOnce_CancelCheckStopsEngineAndMarksJobCancelled(t *testing.T) {
352 t.Parallel()
353 now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
354 fapi := &fakeAPI{
355 claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}},
356 cancelled: true,
357 }
358 fengine := newCancelBlockingEngine()
359 r := New(Options{
360 API: fapi,
361 Engine: fengine,
362 Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"},
363 Clock: func() time.Time { return now },
364 CancelPollInterval: time.Nanosecond,
365 Sleep: func(ctx context.Context, _ time.Duration) error {
366 select {
367 case <-ctx.Done():
368 return ctx.Err()
369 default:
370 return nil
371 }
372 },
373 })
374 claimed, err := r.RunOnce(t.Context())
375 if err != nil {
376 t.Fatalf("RunOnce: %v", err)
377 }
378 if !claimed {
379 t.Fatal("claimed = false")
380 }
381 if !fengine.cancelled {
382 t.Fatal("engine Cancel was not called")
383 }
384 if fapi.cancelChecks == 0 {
385 t.Fatal("cancel-check was not called")
386 }
387 if len(fapi.statuses) != 2 ||
388 fapi.statuses[0].Status != "running" ||
389 fapi.statuses[1].Status != "cancelled" ||
390 fapi.statuses[1].Conclusion != engine.ConclusionCancelled {
391 t.Fatalf("statuses: %#v", fapi.statuses)
392 }
393 }
394
395 func TestRunOnce_PrepareFailureMarksJobFailed(t *testing.T) {
396 t.Parallel()
397 fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}}
398 r := New(Options{
399 API: fapi,
400 Engine: &fakeEngine{},
401 Workspaces: &fakeWorkspaces{err: errors.New("disk full")},
402 Clock: func() time.Time { return time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) },
403 })
404 claimed, err := r.RunOnce(t.Context())
405 if err == nil {
406 t.Fatal("RunOnce returned nil error")
407 }
408 if !claimed {
409 t.Fatal("claimed = false")
410 }
411 if len(fapi.statuses) != 1 || fapi.statuses[0].Conclusion != engine.ConclusionFailure {
412 t.Fatalf("statuses: %#v", fapi.statuses)
413 }
414 }
415