tenseleyflow/shithub / 26a859a

Browse files

actions/trigger: Enqueue test suite — 7 cases covering happy path, idempotency, re-runs (S41b)

- TestEnqueue_HappyPath: fresh run lands with jobs+steps+check_run.
- TestEnqueue_IdempotentSecondCall: same trigger_event_id replayed
returns AlreadyExists, same RunID, single workflow_runs row.
- TestEnqueue_DifferentTriggerEventIDsDoNotCollide: rerun with a
different trigger_event_id + parent_run_id produces a new run.
- TestEnqueue_EmptyTriggerEventIDIsRejected: validation catches the
silent-bypass-idempotency footgun.
- TestEnqueue_RunIndexIsPerRepoMonotonic: r2.RunIndex == r1+1.
- TestEnqueue_ChildRowsExist: the per-tx run+jobs+steps insertion
lands all three layers atomically (no orphan runs).
- TestEnqueue_ConflictDetectsExistingRun: pgx.ErrNoRows from the
ON CONFLICT path is correctly translated to AlreadyExists rather
than bubbling out as an error.
Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
26a859a49992d506e51f7dbb9b81a0a3f787fb44
Parents
4e2aeeb
Tree
8aa8167

1 changed file

StatusFile+-
A internal/actions/trigger/enqueue_test.go 319 0
internal/actions/trigger/enqueue_test.goadded
@@ -0,0 +1,319 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package trigger_test
4
+
5
+import (
6
+	"context"
7
+	"errors"
8
+	"io"
9
+	"log/slog"
10
+	"strings"
11
+	"testing"
12
+
13
+	"github.com/jackc/pgx/v5"
14
+	"github.com/jackc/pgx/v5/pgtype"
15
+	"github.com/jackc/pgx/v5/pgxpool"
16
+
17
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
18
+	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
19
+	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
20
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
21
+	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
22
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
23
+)
24
+
25
+const enqFixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
26
+	"AAAAAAAAAAAAAAAA$" +
27
+	"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
28
+
29
+type enqFx struct {
30
+	pool   *pgxpool.Pool
31
+	deps   trigger.Deps
32
+	repoID int64
33
+	userID int64
34
+}
35
+
36
+func setupEnq(t *testing.T) enqFx {
37
+	t.Helper()
38
+	pool := dbtest.NewTestDB(t)
39
+	ctx := context.Background()
40
+	user, err := usersdb.New().CreateUser(ctx, pool, usersdb.CreateUserParams{
41
+		Username: "alice", DisplayName: "Alice", PasswordHash: enqFixtureHash,
42
+	})
43
+	if err != nil {
44
+		t.Fatalf("CreateUser: %v", err)
45
+	}
46
+	repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
47
+		OwnerUserID:   pgtype.Int8{Int64: user.ID, Valid: true},
48
+		Name:          "demo",
49
+		DefaultBranch: "trunk",
50
+		Visibility:    reposdb.RepoVisibilityPublic,
51
+	})
52
+	if err != nil {
53
+		t.Fatalf("CreateRepo: %v", err)
54
+	}
55
+	return enqFx{
56
+		pool:   pool,
57
+		deps:   trigger.Deps{Pool: pool, Logger: slog.New(slog.NewTextHandler(io.Discard, nil))},
58
+		repoID: repo.ID,
59
+		userID: user.ID,
60
+	}
61
+}
62
+
63
+// fixtureWorkflow returns a small valid Workflow with one job + two
64
+// steps. Used by every enqueue test.
65
+func fixtureWorkflow(t *testing.T) *workflow.Workflow {
66
+	t.Helper()
67
+	src := []byte(`name: ci
68
+on: push
69
+jobs:
70
+  build:
71
+    runs-on: ubuntu-latest
72
+    steps:
73
+      - uses: actions/checkout@v4
74
+      - run: echo hello
75
+`)
76
+	w, diags, err := workflow.Parse(src)
77
+	if err != nil {
78
+		t.Fatalf("parse fixture: %v", err)
79
+	}
80
+	for _, d := range diags {
81
+		if d.Severity == workflow.Error {
82
+			t.Fatalf("unexpected diagnostic: %v", d)
83
+		}
84
+	}
85
+	return w
86
+}
87
+
88
+func TestEnqueue_HappyPath(t *testing.T) {
89
+	f := setupEnq(t)
90
+	ctx := context.Background()
91
+	res, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
92
+		RepoID:         f.repoID,
93
+		WorkflowFile:   ".shithub/workflows/ci.yml",
94
+		HeadSHA:        strings.Repeat("a", 40),
95
+		HeadRef:        "refs/heads/trunk",
96
+		EventKind:      trigger.EventPush,
97
+		EventPayload:   map[string]any{"ref": "refs/heads/trunk"},
98
+		ActorUserID:    f.userID,
99
+		TriggerEventID: "push:42",
100
+		Workflow:       fixtureWorkflow(t),
101
+	})
102
+	if err != nil {
103
+		t.Fatalf("Enqueue: %v", err)
104
+	}
105
+	if res.RunID == 0 || res.RunIndex == 0 || res.AlreadyExists {
106
+		t.Errorf("expected fresh run, got %+v", res)
107
+	}
108
+	// One job, so one check_run.
109
+	if len(res.CheckRunIDs) != 1 {
110
+		t.Errorf("expected 1 check_run, got %d", len(res.CheckRunIDs))
111
+	}
112
+
113
+	// Verify rows landed.
114
+	q := actionsdb.New()
115
+	run, err := q.GetWorkflowRunByID(ctx, f.pool, res.RunID)
116
+	if err != nil {
117
+		t.Fatalf("GetWorkflowRunByID: %v", err)
118
+	}
119
+	if run.TriggerEventID != "push:42" {
120
+		t.Errorf("trigger_event_id: got %q want push:42", run.TriggerEventID)
121
+	}
122
+	if run.Status != actionsdb.WorkflowRunStatusQueued {
123
+		t.Errorf("status: got %s want queued", run.Status)
124
+	}
125
+}
126
+
127
+func TestEnqueue_IdempotentSecondCall(t *testing.T) {
128
+	f := setupEnq(t)
129
+	ctx := context.Background()
130
+	params := trigger.EnqueueParams{
131
+		RepoID:         f.repoID,
132
+		WorkflowFile:   ".shithub/workflows/ci.yml",
133
+		HeadSHA:        strings.Repeat("b", 40),
134
+		HeadRef:        "refs/heads/trunk",
135
+		EventKind:      trigger.EventPush,
136
+		EventPayload:   map[string]any{"ref": "refs/heads/trunk"},
137
+		ActorUserID:    f.userID,
138
+		TriggerEventID: "push:99",
139
+		Workflow:       fixtureWorkflow(t),
140
+	}
141
+	first, err := trigger.Enqueue(ctx, f.deps, params)
142
+	if err != nil {
143
+		t.Fatalf("first Enqueue: %v", err)
144
+	}
145
+	if first.AlreadyExists {
146
+		t.Fatal("first call should not be AlreadyExists")
147
+	}
148
+	second, err := trigger.Enqueue(ctx, f.deps, params)
149
+	if err != nil {
150
+		t.Fatalf("second Enqueue: %v", err)
151
+	}
152
+	if !second.AlreadyExists {
153
+		t.Errorf("second call should be AlreadyExists")
154
+	}
155
+	if second.RunID != first.RunID {
156
+		t.Errorf("second call must return the SAME run id; got %d vs %d", second.RunID, first.RunID)
157
+	}
158
+	// Verify only one row exists.
159
+	var count int
160
+	if err := f.pool.QueryRow(ctx,
161
+		`SELECT count(*) FROM workflow_runs WHERE repo_id=$1 AND trigger_event_id=$2`,
162
+		f.repoID, "push:99").Scan(&count); err != nil {
163
+		t.Fatalf("count query: %v", err)
164
+	}
165
+	if count != 1 {
166
+		t.Errorf("expected exactly 1 workflow_run, got %d", count)
167
+	}
168
+}
169
+
170
+func TestEnqueue_DifferentTriggerEventIDsDoNotCollide(t *testing.T) {
171
+	// Re-runs explicitly produce a different trigger_event_id. Verify
172
+	// they're allowed alongside the original.
173
+	f := setupEnq(t)
174
+	ctx := context.Background()
175
+	base := trigger.EnqueueParams{
176
+		RepoID:       f.repoID,
177
+		WorkflowFile: ".shithub/workflows/ci.yml",
178
+		HeadSHA:      strings.Repeat("c", 40),
179
+		HeadRef:      "refs/heads/trunk",
180
+		EventKind:    trigger.EventPush,
181
+		EventPayload: map[string]any{"ref": "refs/heads/trunk"},
182
+		ActorUserID:  f.userID,
183
+		Workflow:     fixtureWorkflow(t),
184
+	}
185
+	first := base
186
+	first.TriggerEventID = "push:1"
187
+	res1, err := trigger.Enqueue(ctx, f.deps, first)
188
+	if err != nil {
189
+		t.Fatalf("first Enqueue: %v", err)
190
+	}
191
+	rerun := base
192
+	rerun.TriggerEventID = "rerun:" + strings.Repeat("c", 40) + ":xyz"
193
+	rerun.ParentRunID = res1.RunID
194
+	res2, err := trigger.Enqueue(ctx, f.deps, rerun)
195
+	if err != nil {
196
+		t.Fatalf("rerun Enqueue: %v", err)
197
+	}
198
+	if res2.AlreadyExists {
199
+		t.Error("rerun should produce a new run, not AlreadyExists")
200
+	}
201
+	if res2.RunID == res1.RunID {
202
+		t.Error("rerun must have a different RunID")
203
+	}
204
+}
205
+
206
+func TestEnqueue_EmptyTriggerEventIDIsRejected(t *testing.T) {
207
+	f := setupEnq(t)
208
+	ctx := context.Background()
209
+	_, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
210
+		RepoID:         f.repoID,
211
+		WorkflowFile:   ".shithub/workflows/ci.yml",
212
+		HeadSHA:        strings.Repeat("d", 40),
213
+		EventKind:      trigger.EventPush,
214
+		EventPayload:   map[string]any{},
215
+		Workflow:       fixtureWorkflow(t),
216
+		TriggerEventID: "",
217
+	})
218
+	if err == nil {
219
+		t.Fatal("empty TriggerEventID should error — would silently bypass idempotency")
220
+	}
221
+}
222
+
223
+func TestEnqueue_RunIndexIsPerRepoMonotonic(t *testing.T) {
224
+	f := setupEnq(t)
225
+	ctx := context.Background()
226
+	mk := func(triggerID string) trigger.EnqueueParams {
227
+		return trigger.EnqueueParams{
228
+			RepoID:         f.repoID,
229
+			WorkflowFile:   ".shithub/workflows/ci.yml",
230
+			HeadSHA:        strings.Repeat("e", 40),
231
+			EventKind:      trigger.EventPush,
232
+			EventPayload:   map[string]any{},
233
+			ActorUserID:    f.userID,
234
+			TriggerEventID: triggerID,
235
+			Workflow:       fixtureWorkflow(t),
236
+		}
237
+	}
238
+	r1, err := trigger.Enqueue(ctx, f.deps, mk("push:101"))
239
+	if err != nil {
240
+		t.Fatalf("Enqueue 1: %v", err)
241
+	}
242
+	r2, err := trigger.Enqueue(ctx, f.deps, mk("push:102"))
243
+	if err != nil {
244
+		t.Fatalf("Enqueue 2: %v", err)
245
+	}
246
+	if r2.RunIndex != r1.RunIndex+1 {
247
+		t.Errorf("run_index not monotonic; got r1=%d r2=%d", r1.RunIndex, r2.RunIndex)
248
+	}
249
+}
250
+
251
+// TestEnqueue_ChildRowsExist confirms that the per-tx run+jobs+steps
252
+// insertion lands all three layers atomically (i.e., we don't end up
253
+// with an orphan run with no jobs).
254
+func TestEnqueue_ChildRowsExist(t *testing.T) {
255
+	f := setupEnq(t)
256
+	ctx := context.Background()
257
+	res, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
258
+		RepoID:         f.repoID,
259
+		WorkflowFile:   ".shithub/workflows/ci.yml",
260
+		HeadSHA:        strings.Repeat("f", 40),
261
+		EventKind:      trigger.EventPush,
262
+		EventPayload:   map[string]any{},
263
+		ActorUserID:    f.userID,
264
+		TriggerEventID: "push:200",
265
+		Workflow:       fixtureWorkflow(t),
266
+	})
267
+	if err != nil {
268
+		t.Fatalf("Enqueue: %v", err)
269
+	}
270
+	q := actionsdb.New()
271
+	jobs, err := q.ListJobsForRun(ctx, f.pool, res.RunID)
272
+	if err != nil {
273
+		t.Fatalf("ListJobsForRun: %v", err)
274
+	}
275
+	if len(jobs) != 1 {
276
+		t.Fatalf("expected 1 job, got %d", len(jobs))
277
+	}
278
+	steps, err := q.ListStepsForJob(ctx, f.pool, jobs[0].ID)
279
+	if err != nil {
280
+		t.Fatalf("ListStepsForJob: %v", err)
281
+	}
282
+	if len(steps) != 2 {
283
+		t.Errorf("expected 2 steps (uses + run), got %d", len(steps))
284
+	}
285
+}
286
+
287
+// TestEnqueue_ConflictDetectsExistingRun exercises the lookup path
288
+// the conflict branch takes: confirm pgx.ErrNoRows from the INSERT
289
+// is correctly translated into AlreadyExists rather than bubbling
290
+// out as an error.
291
+func TestEnqueue_ConflictDetectsExistingRun(t *testing.T) {
292
+	f := setupEnq(t)
293
+	ctx := context.Background()
294
+	params := trigger.EnqueueParams{
295
+		RepoID:         f.repoID,
296
+		WorkflowFile:   ".shithub/workflows/ci.yml",
297
+		HeadSHA:        strings.Repeat("9", 40),
298
+		EventKind:      trigger.EventPush,
299
+		EventPayload:   map[string]any{},
300
+		ActorUserID:    f.userID,
301
+		TriggerEventID: "push:conflict",
302
+		Workflow:       fixtureWorkflow(t),
303
+	}
304
+	if _, err := trigger.Enqueue(ctx, f.deps, params); err != nil {
305
+		t.Fatalf("first: %v", err)
306
+	}
307
+	res, err := trigger.Enqueue(ctx, f.deps, params)
308
+	if err != nil {
309
+		// Ensure the conflict path doesn't surface as ErrNoRows by
310
+		// accident.
311
+		if errors.Is(err, pgx.ErrNoRows) {
312
+			t.Fatal("conflict path should NOT return pgx.ErrNoRows; expected AlreadyExists")
313
+		}
314
+		t.Fatalf("conflict: %v", err)
315
+	}
316
+	if !res.AlreadyExists {
317
+		t.Error("expected AlreadyExists on second call")
318
+	}
319
+}