// SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package trigger_test
4
5 import (
6 "context"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "strings"
12 "testing"
13
14 "github.com/jackc/pgx/v5"
15 "github.com/jackc/pgx/v5/pgtype"
16 "github.com/jackc/pgx/v5/pgxpool"
17
18 actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
19 "github.com/tenseleyFlow/shithub/internal/actions/trigger"
20 "github.com/tenseleyFlow/shithub/internal/actions/workflow"
21 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
22 "github.com/tenseleyFlow/shithub/internal/testing/dbtest"
23 usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
24 )
25
// enqFixtureHash is the password hash stored for the fixture user that
// setupEnq creates. It is an argon2id-formatted string whose salt and
// digest segments are filler ('A' characters), so it is a placeholder
// rather than the hash of any real password.
const enqFixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
	"AAAAAAAAAAAAAAAA$" +
	"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
29
// enqFx is the fixture returned by setupEnq and shared by the enqueue
// tests in this file.
type enqFx struct {
	// pool is the database pool obtained from dbtest.NewTestDB.
	pool *pgxpool.Pool
	// deps is the dependency bundle passed to trigger.Enqueue.
	deps trigger.Deps
	// repoID is the ID of the repo seeded by setupEnq.
	repoID int64
	// userID is the ID of the user seeded by setupEnq (the repo owner).
	userID int64
}
36
37 func setupEnq(t *testing.T) enqFx {
38 t.Helper()
39 pool := dbtest.NewTestDB(t)
40 ctx := context.Background()
41 user, err := usersdb.New().CreateUser(ctx, pool, usersdb.CreateUserParams{
42 Username: "alice", DisplayName: "Alice", PasswordHash: enqFixtureHash,
43 })
44 if err != nil {
45 t.Fatalf("CreateUser: %v", err)
46 }
47 repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
48 OwnerUserID: pgtype.Int8{Int64: user.ID, Valid: true},
49 Name: "demo",
50 DefaultBranch: "trunk",
51 Visibility: reposdb.RepoVisibilityPublic,
52 })
53 if err != nil {
54 t.Fatalf("CreateRepo: %v", err)
55 }
56 return enqFx{
57 pool: pool,
58 deps: trigger.Deps{Pool: pool, Logger: slog.New(slog.NewTextHandler(io.Discard, nil))},
59 repoID: repo.ID,
60 userID: user.ID,
61 }
62 }
63
64 // fixtureWorkflow returns a small valid Workflow with one job + two
65 // steps. Used by every enqueue test.
66 func fixtureWorkflow(t *testing.T) *workflow.Workflow {
67 t.Helper()
68 return workflowFromYAML(t, `name: ci
69 on: push
70 jobs:
71 build:
72 runs-on: ubuntu-latest
73 steps:
74 - uses: actions/checkout@v4
75 - run: echo hello
76 `)
77 }
78
79 func concurrencyWorkflow(t *testing.T, group string, cancelInProgress bool) *workflow.Workflow {
80 t.Helper()
81 return workflowFromYAML(t, fmt.Sprintf(`name: ci
82 on: push
83 concurrency:
84 group: "%s"
85 cancel-in-progress: %t
86 jobs:
87 build:
88 runs-on: ubuntu-latest
89 steps:
90 - uses: actions/checkout@v4
91 - run: echo hello
92 `, group, cancelInProgress))
93 }
94
95 func workflowFromYAML(t *testing.T, src string) *workflow.Workflow {
96 t.Helper()
97 w, diags, err := workflow.Parse([]byte(src))
98 if err != nil {
99 t.Fatalf("parse fixture: %v", err)
100 }
101 for _, d := range diags {
102 if d.Severity == workflow.Error {
103 t.Fatalf("unexpected diagnostic: %v", d)
104 }
105 }
106 return w
107 }
108
109 func TestEnqueue_HappyPath(t *testing.T) {
110 f := setupEnq(t)
111 ctx := context.Background()
112 res, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
113 RepoID: f.repoID,
114 WorkflowFile: ".shithub/workflows/ci.yml",
115 HeadSHA: strings.Repeat("a", 40),
116 HeadRef: "refs/heads/trunk",
117 EventKind: trigger.EventPush,
118 EventPayload: map[string]any{"ref": "refs/heads/trunk"},
119 ActorUserID: f.userID,
120 TriggerEventID: "push:42",
121 Workflow: fixtureWorkflow(t),
122 })
123 if err != nil {
124 t.Fatalf("Enqueue: %v", err)
125 }
126 if res.RunID == 0 || res.RunIndex == 0 || res.AlreadyExists {
127 t.Errorf("expected fresh run, got %+v", res)
128 }
129 // One job, so one check_run.
130 if len(res.CheckRunIDs) != 1 {
131 t.Errorf("expected 1 check_run, got %d", len(res.CheckRunIDs))
132 }
133
134 // Verify rows landed.
135 q := actionsdb.New()
136 run, err := q.GetWorkflowRunByID(ctx, f.pool, res.RunID)
137 if err != nil {
138 t.Fatalf("GetWorkflowRunByID: %v", err)
139 }
140 if run.TriggerEventID != "push:42" {
141 t.Errorf("trigger_event_id: got %q want push:42", run.TriggerEventID)
142 }
143 if run.Status != actionsdb.WorkflowRunStatusQueued {
144 t.Errorf("status: got %s want queued", run.Status)
145 }
146 }
147
148 func TestEnqueue_ResolvesConcurrencyGroupExpression(t *testing.T) {
149 f := setupEnq(t)
150 ctx := context.Background()
151 res, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
152 RepoID: f.repoID,
153 WorkflowFile: ".shithub/workflows/ci.yml",
154 HeadSHA: strings.Repeat("a", 40),
155 HeadRef: "refs/heads/feature",
156 EventKind: trigger.EventPush,
157 EventPayload: map[string]any{"ref": "refs/heads/feature"},
158 ActorUserID: f.userID,
159 TriggerEventID: "push:concurrency-expr",
160 Workflow: concurrencyWorkflow(t, "branch-${{ shithub.ref }}", false),
161 })
162 if err != nil {
163 t.Fatalf("Enqueue: %v", err)
164 }
165 run, err := actionsdb.New().GetWorkflowRunByID(ctx, f.pool, res.RunID)
166 if err != nil {
167 t.Fatalf("GetWorkflowRunByID: %v", err)
168 }
169 if run.ConcurrencyGroup != "branch-refs/heads/feature" {
170 t.Fatalf("concurrency_group: got %q", run.ConcurrencyGroup)
171 }
172 }
173
174 func TestEnqueue_CancelInProgressCancelsOlderQueuedRun(t *testing.T) {
175 f := setupEnq(t)
176 ctx := context.Background()
177 q := actionsdb.New()
178 first, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
179 RepoID: f.repoID,
180 WorkflowFile: ".shithub/workflows/ci.yml",
181 HeadSHA: strings.Repeat("a", 40),
182 HeadRef: "refs/heads/trunk",
183 EventKind: trigger.EventPush,
184 EventPayload: map[string]any{"ref": "refs/heads/trunk"},
185 ActorUserID: f.userID,
186 TriggerEventID: "push:concurrency-cancel-1",
187 Workflow: concurrencyWorkflow(t, "${{ shithub.ref }}", false),
188 })
189 if err != nil {
190 t.Fatalf("first Enqueue: %v", err)
191 }
192 second, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
193 RepoID: f.repoID,
194 WorkflowFile: ".shithub/workflows/ci.yml",
195 HeadSHA: strings.Repeat("b", 40),
196 HeadRef: "refs/heads/trunk",
197 EventKind: trigger.EventPush,
198 EventPayload: map[string]any{"ref": "refs/heads/trunk"},
199 ActorUserID: f.userID,
200 TriggerEventID: "push:concurrency-cancel-2",
201 Workflow: concurrencyWorkflow(t, "${{ shithub.ref }}", true),
202 })
203 if err != nil {
204 t.Fatalf("second Enqueue: %v", err)
205 }
206 oldRun, err := q.GetWorkflowRunByID(ctx, f.pool, first.RunID)
207 if err != nil {
208 t.Fatalf("GetWorkflowRunByID old: %v", err)
209 }
210 if oldRun.Status != actionsdb.WorkflowRunStatusCompleted ||
211 !oldRun.Conclusion.Valid ||
212 oldRun.Conclusion.CheckConclusion != actionsdb.CheckConclusionCancelled {
213 t.Fatalf("old run not cancelled: %+v", oldRun)
214 }
215 oldJobs, err := q.ListJobsForRun(ctx, f.pool, first.RunID)
216 if err != nil {
217 t.Fatalf("ListJobsForRun old: %v", err)
218 }
219 if len(oldJobs) != 1 || oldJobs[0].Status != actionsdb.WorkflowJobStatusCancelled {
220 t.Fatalf("old jobs not cancelled: %+v", oldJobs)
221 }
222 oldSteps, err := q.ListStepsForJob(ctx, f.pool, oldJobs[0].ID)
223 if err != nil {
224 t.Fatalf("ListStepsForJob old: %v", err)
225 }
226 for _, step := range oldSteps {
227 if step.Status != actionsdb.WorkflowStepStatusCancelled {
228 t.Fatalf("step %d status: got %s want cancelled", step.ID, step.Status)
229 }
230 }
231 newRun, err := q.GetWorkflowRunByID(ctx, f.pool, second.RunID)
232 if err != nil {
233 t.Fatalf("GetWorkflowRunByID new: %v", err)
234 }
235 if newRun.Status != actionsdb.WorkflowRunStatusQueued {
236 t.Fatalf("new run status: got %s want queued", newRun.Status)
237 }
238 }
239
240 func TestClaimQueuedWorkflowJob_BlocksYoungerConcurrencyRun(t *testing.T) {
241 f := setupEnq(t)
242 ctx := context.Background()
243 q := actionsdb.New()
244 first, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
245 RepoID: f.repoID,
246 WorkflowFile: ".shithub/workflows/ci.yml",
247 HeadSHA: strings.Repeat("c", 40),
248 HeadRef: "refs/heads/trunk",
249 EventKind: trigger.EventPush,
250 EventPayload: map[string]any{"ref": "refs/heads/trunk"},
251 ActorUserID: f.userID,
252 TriggerEventID: "push:concurrency-block-1",
253 Workflow: concurrencyWorkflow(t, "${{ shithub.ref }}", false),
254 })
255 if err != nil {
256 t.Fatalf("first Enqueue: %v", err)
257 }
258 second, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
259 RepoID: f.repoID,
260 WorkflowFile: ".shithub/workflows/ci.yml",
261 HeadSHA: strings.Repeat("d", 40),
262 HeadRef: "refs/heads/trunk",
263 EventKind: trigger.EventPush,
264 EventPayload: map[string]any{"ref": "refs/heads/trunk"},
265 ActorUserID: f.userID,
266 TriggerEventID: "push:concurrency-block-2",
267 Workflow: concurrencyWorkflow(t, "${{ shithub.ref }}", false),
268 })
269 if err != nil {
270 t.Fatalf("second Enqueue: %v", err)
271 }
272 runner, err := q.InsertRunner(ctx, f.pool, actionsdb.InsertRunnerParams{
273 Name: "runner-block",
274 Labels: []string{"ubuntu-latest"},
275 Capacity: 2,
276 })
277 if err != nil {
278 t.Fatalf("InsertRunner: %v", err)
279 }
280 claimed, err := q.ClaimQueuedWorkflowJob(ctx, f.pool, actionsdb.ClaimQueuedWorkflowJobParams{
281 Labels: []string{"ubuntu-latest"},
282 RunnerID: runner.ID,
283 })
284 if err != nil {
285 t.Fatalf("first ClaimQueuedWorkflowJob: %v", err)
286 }
287 if claimed.RunID != first.RunID {
288 t.Fatalf("claimed run: got %d want first run %d", claimed.RunID, first.RunID)
289 }
290 _, err = q.ClaimQueuedWorkflowJob(ctx, f.pool, actionsdb.ClaimQueuedWorkflowJobParams{
291 Labels: []string{"ubuntu-latest"},
292 RunnerID: runner.ID,
293 })
294 if !errors.Is(err, pgx.ErrNoRows) {
295 t.Fatalf("second claim error: got %v want pgx.ErrNoRows", err)
296 }
297 changed, err := q.RequestWorkflowRunCancel(ctx, f.pool, first.RunID)
298 if err != nil {
299 t.Fatalf("RequestWorkflowRunCancel: %v", err)
300 }
301 if len(changed) != 1 || !changed[0].CancelRequested {
302 t.Fatalf("cancel request did not release blocker: %+v", changed)
303 }
304 released, err := q.ClaimQueuedWorkflowJob(ctx, f.pool, actionsdb.ClaimQueuedWorkflowJobParams{
305 Labels: []string{"ubuntu-latest"},
306 RunnerID: runner.ID,
307 })
308 if err != nil {
309 t.Fatalf("claim after cancel request: %v", err)
310 }
311 if released.RunID != second.RunID {
312 t.Fatalf("released claim run: got %d want second run %d", released.RunID, second.RunID)
313 }
314 }
315
316 func TestEnqueue_IdempotentSecondCall(t *testing.T) {
317 f := setupEnq(t)
318 ctx := context.Background()
319 params := trigger.EnqueueParams{
320 RepoID: f.repoID,
321 WorkflowFile: ".shithub/workflows/ci.yml",
322 HeadSHA: strings.Repeat("b", 40),
323 HeadRef: "refs/heads/trunk",
324 EventKind: trigger.EventPush,
325 EventPayload: map[string]any{"ref": "refs/heads/trunk"},
326 ActorUserID: f.userID,
327 TriggerEventID: "push:99",
328 Workflow: fixtureWorkflow(t),
329 }
330 first, err := trigger.Enqueue(ctx, f.deps, params)
331 if err != nil {
332 t.Fatalf("first Enqueue: %v", err)
333 }
334 if first.AlreadyExists {
335 t.Fatal("first call should not be AlreadyExists")
336 }
337 second, err := trigger.Enqueue(ctx, f.deps, params)
338 if err != nil {
339 t.Fatalf("second Enqueue: %v", err)
340 }
341 if !second.AlreadyExists {
342 t.Errorf("second call should be AlreadyExists")
343 }
344 if second.RunID != first.RunID {
345 t.Errorf("second call must return the SAME run id; got %d vs %d", second.RunID, first.RunID)
346 }
347 // Verify only one row exists.
348 var count int
349 if err := f.pool.QueryRow(ctx,
350 `SELECT count(*) FROM workflow_runs WHERE repo_id=$1 AND trigger_event_id=$2`,
351 f.repoID, "push:99").Scan(&count); err != nil {
352 t.Fatalf("count query: %v", err)
353 }
354 if count != 1 {
355 t.Errorf("expected exactly 1 workflow_run, got %d", count)
356 }
357 }
358
359 func TestEnqueue_DifferentTriggerEventIDsDoNotCollide(t *testing.T) {
360 // Re-runs explicitly produce a different trigger_event_id. Verify
361 // they're allowed alongside the original.
362 f := setupEnq(t)
363 ctx := context.Background()
364 base := trigger.EnqueueParams{
365 RepoID: f.repoID,
366 WorkflowFile: ".shithub/workflows/ci.yml",
367 HeadSHA: strings.Repeat("c", 40),
368 HeadRef: "refs/heads/trunk",
369 EventKind: trigger.EventPush,
370 EventPayload: map[string]any{"ref": "refs/heads/trunk"},
371 ActorUserID: f.userID,
372 Workflow: fixtureWorkflow(t),
373 }
374 first := base
375 first.TriggerEventID = "push:1"
376 res1, err := trigger.Enqueue(ctx, f.deps, first)
377 if err != nil {
378 t.Fatalf("first Enqueue: %v", err)
379 }
380 rerun := base
381 rerun.TriggerEventID = "rerun:" + strings.Repeat("c", 40) + ":xyz"
382 rerun.ParentRunID = res1.RunID
383 res2, err := trigger.Enqueue(ctx, f.deps, rerun)
384 if err != nil {
385 t.Fatalf("rerun Enqueue: %v", err)
386 }
387 if res2.AlreadyExists {
388 t.Error("rerun should produce a new run, not AlreadyExists")
389 }
390 if res2.RunID == res1.RunID {
391 t.Error("rerun must have a different RunID")
392 }
393 }
394
395 func TestEnqueue_EmptyTriggerEventIDIsRejected(t *testing.T) {
396 f := setupEnq(t)
397 ctx := context.Background()
398 _, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
399 RepoID: f.repoID,
400 WorkflowFile: ".shithub/workflows/ci.yml",
401 HeadSHA: strings.Repeat("d", 40),
402 EventKind: trigger.EventPush,
403 EventPayload: map[string]any{},
404 Workflow: fixtureWorkflow(t),
405 TriggerEventID: "",
406 })
407 if err == nil {
408 t.Fatal("empty TriggerEventID should error — would silently bypass idempotency")
409 }
410 }
411
412 func TestEnqueue_RunIndexIsPerRepoMonotonic(t *testing.T) {
413 f := setupEnq(t)
414 ctx := context.Background()
415 mk := func(triggerID string) trigger.EnqueueParams {
416 return trigger.EnqueueParams{
417 RepoID: f.repoID,
418 WorkflowFile: ".shithub/workflows/ci.yml",
419 HeadSHA: strings.Repeat("e", 40),
420 EventKind: trigger.EventPush,
421 EventPayload: map[string]any{},
422 ActorUserID: f.userID,
423 TriggerEventID: triggerID,
424 Workflow: fixtureWorkflow(t),
425 }
426 }
427 r1, err := trigger.Enqueue(ctx, f.deps, mk("push:101"))
428 if err != nil {
429 t.Fatalf("Enqueue 1: %v", err)
430 }
431 r2, err := trigger.Enqueue(ctx, f.deps, mk("push:102"))
432 if err != nil {
433 t.Fatalf("Enqueue 2: %v", err)
434 }
435 if r2.RunIndex != r1.RunIndex+1 {
436 t.Errorf("run_index not monotonic; got r1=%d r2=%d", r1.RunIndex, r2.RunIndex)
437 }
438 }
439
440 // TestEnqueue_ChildRowsExist confirms that the per-tx run+jobs+steps
441 // insertion lands all three layers atomically (i.e., we don't end up
442 // with an orphan run with no jobs).
443 func TestEnqueue_ChildRowsExist(t *testing.T) {
444 f := setupEnq(t)
445 ctx := context.Background()
446 res, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
447 RepoID: f.repoID,
448 WorkflowFile: ".shithub/workflows/ci.yml",
449 HeadSHA: strings.Repeat("f", 40),
450 EventKind: trigger.EventPush,
451 EventPayload: map[string]any{},
452 ActorUserID: f.userID,
453 TriggerEventID: "push:200",
454 Workflow: fixtureWorkflow(t),
455 })
456 if err != nil {
457 t.Fatalf("Enqueue: %v", err)
458 }
459 q := actionsdb.New()
460 jobs, err := q.ListJobsForRun(ctx, f.pool, res.RunID)
461 if err != nil {
462 t.Fatalf("ListJobsForRun: %v", err)
463 }
464 if len(jobs) != 1 {
465 t.Fatalf("expected 1 job, got %d", len(jobs))
466 }
467 steps, err := q.ListStepsForJob(ctx, f.pool, jobs[0].ID)
468 if err != nil {
469 t.Fatalf("ListStepsForJob: %v", err)
470 }
471 if len(steps) != 2 {
472 t.Errorf("expected 2 steps (uses + run), got %d", len(steps))
473 }
474 }
475
476 // TestEnqueue_ConflictDetectsExistingRun exercises the lookup path
477 // the conflict branch takes: confirm pgx.ErrNoRows from the INSERT
478 // is correctly translated into AlreadyExists rather than bubbling
479 // out as an error.
480 func TestEnqueue_ConflictDetectsExistingRun(t *testing.T) {
481 f := setupEnq(t)
482 ctx := context.Background()
483 params := trigger.EnqueueParams{
484 RepoID: f.repoID,
485 WorkflowFile: ".shithub/workflows/ci.yml",
486 HeadSHA: strings.Repeat("9", 40),
487 EventKind: trigger.EventPush,
488 EventPayload: map[string]any{},
489 ActorUserID: f.userID,
490 TriggerEventID: "push:conflict",
491 Workflow: fixtureWorkflow(t),
492 }
493 if _, err := trigger.Enqueue(ctx, f.deps, params); err != nil {
494 t.Fatalf("first: %v", err)
495 }
496 res, err := trigger.Enqueue(ctx, f.deps, params)
497 if err != nil {
498 // Ensure the conflict path doesn't surface as ErrNoRows by
499 // accident.
500 if errors.Is(err, pgx.ErrNoRows) {
501 t.Fatal("conflict path should NOT return pgx.ErrNoRows; expected AlreadyExists")
502 }
503 t.Fatalf("conflict: %v", err)
504 }
505 if !res.AlreadyExists {
506 t.Error("expected AlreadyExists on second call")
507 }
508 }
509