Go · 8963 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package cleanup
4
5 import (
6 "context"
7 "errors"
8 "strconv"
9 "strings"
10 "testing"
11 "time"
12
13 "github.com/jackc/pgx/v5"
14 "github.com/jackc/pgx/v5/pgtype"
15 "github.com/jackc/pgx/v5/pgxpool"
16
17 actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
18 "github.com/tenseleyFlow/shithub/internal/infra/storage"
19 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
20 "github.com/tenseleyFlow/shithub/internal/testing/dbtest"
21 usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
22 )
23
24 const cleanupFixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
25 "AAAAAAAAAAAAAAAA$" +
26 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
27
28 func TestSweepPrunesActionsRetentionSurfaces(t *testing.T) {
29 ctx := context.Background()
30 pool := dbtest.NewTestDB(t)
31 store := storage.NewMemoryStore()
32 now := time.Date(2026, 5, 12, 3, 30, 0, 0, time.UTC)
33
34 repoID, userID := insertCleanupRepo(t, pool)
35 q := actionsdb.New()
36
37 chunkRun, oldChunkStep := insertCleanupRunJobStep(t, pool, repoID, userID, 1)
38 setRunTerminal(t, pool, chunkRun.ID, now.Add(-90*24*time.Hour), true)
39 setStepTerminal(t, pool, oldChunkStep.ID, now.Add(-8*24*time.Hour))
40 appendChunk(t, pool, oldChunkStep.ID, 0, "old chunk")
41
42 _, recentChunkStep := insertCleanupRunJobStep(t, pool, repoID, userID, 2)
43 setStepTerminal(t, pool, recentChunkStep.ID, now.Add(-2*24*time.Hour))
44 appendChunk(t, pool, recentChunkStep.ID, 0, "recent chunk")
45
46 artifactRun, _ := insertCleanupRunJobStep(t, pool, repoID, userID, 3)
47 artifactKey := "actions/runs/" + strconv.FormatInt(artifactRun.ID, 10) + "/artifacts/pkg.tgz"
48 if _, err := store.Put(ctx, artifactKey, strings.NewReader("artifact"), storage.PutOpts{}); err != nil {
49 t.Fatalf("store.Put artifact: %v", err)
50 }
51 artifact, err := q.InsertArtifact(ctx, pool, actionsdb.InsertArtifactParams{
52 RunID: artifactRun.ID,
53 Name: "pkg.tgz",
54 ObjectKey: artifactKey,
55 ByteCount: 8,
56 ExpiresAt: pgtype.Timestamptz{
57 Time: now.Add(-24 * time.Hour),
58 Valid: true,
59 },
60 })
61 if err != nil {
62 t.Fatalf("InsertArtifact: %v", err)
63 }
64
65 oldRun, _ := insertCleanupRunJobStep(t, pool, repoID, userID, 4)
66 setRunTerminal(t, pool, oldRun.ID, now.Add(-366*24*time.Hour), false)
67 pinnedRun, _ := insertCleanupRunJobStep(t, pool, repoID, userID, 5)
68 setRunTerminal(t, pool, pinnedRun.ID, now.Add(-366*24*time.Hour), true)
69
70 jwtRun, jwtJobStep := insertCleanupRunJobStep(t, pool, repoID, userID, 6)
71 runner, err := q.InsertRunner(ctx, pool, actionsdb.InsertRunnerParams{
72 Name: "runner-retention",
73 Labels: []string{"ubuntu-latest"},
74 Capacity: 1,
75 RegisteredByUserID: pgtype.Int8{Int64: userID, Valid: true},
76 })
77 if err != nil {
78 t.Fatalf("InsertRunner: %v", err)
79 }
80 if _, err := q.MarkRunnerJWTUsed(ctx, pool, actionsdb.MarkRunnerJWTUsedParams{
81 Jti: "old-jti-000000000",
82 RunnerID: runner.ID,
83 JobID: jwtJobStep.JobID,
84 RunID: jwtRun.ID,
85 RepoID: repoID,
86 ExpiresAt: pgtype.Timestamptz{Time: now.Add(-31 * 24 * time.Hour), Valid: true},
87 }); err != nil {
88 t.Fatalf("MarkRunnerJWTUsed old: %v", err)
89 }
90 if _, err := q.MarkRunnerJWTUsed(ctx, pool, actionsdb.MarkRunnerJWTUsedParams{
91 Jti: "recent-jti-000000",
92 RunnerID: runner.ID,
93 JobID: jwtJobStep.JobID,
94 RunID: jwtRun.ID,
95 RepoID: repoID,
96 ExpiresAt: pgtype.Timestamptz{Time: now.Add(-24 * time.Hour), Valid: true},
97 }); err != nil {
98 t.Fatalf("MarkRunnerJWTUsed recent: %v", err)
99 }
100
101 res, err := Sweep(ctx, Deps{Pool: pool, ObjectStore: store, Now: func() time.Time { return now }}, Payload{})
102 if err != nil {
103 t.Fatalf("Sweep: %v", err)
104 }
105 if res.ChunksDeleted != 1 || res.ArtifactRowsDeleted != 1 || res.ArtifactObjectsDeleted != 1 ||
106 res.RunsDeleted != 1 || res.JWTUsedDeleted != 1 {
107 t.Fatalf("unexpected cleanup result: %+v", res)
108 }
109
110 oldChunks, err := q.ListAllStepLogChunksForStep(ctx, pool, oldChunkStep.ID)
111 if err != nil {
112 t.Fatalf("ListAllStepLogChunksForStep old: %v", err)
113 }
114 if len(oldChunks) != 0 {
115 t.Fatalf("old chunks survived: %+v", oldChunks)
116 }
117 recentChunks, err := q.ListAllStepLogChunksForStep(ctx, pool, recentChunkStep.ID)
118 if err != nil {
119 t.Fatalf("ListAllStepLogChunksForStep recent: %v", err)
120 }
121 if len(recentChunks) != 1 {
122 t.Fatalf("recent chunks pruned: %+v", recentChunks)
123 }
124 if _, _, err := store.Get(ctx, artifactKey); !errors.Is(err, storage.ErrNotFound) {
125 t.Fatalf("artifact object: got %v, want not found", err)
126 }
127 if _, err := q.GetArtifactByID(ctx, pool, artifact.ID); !errors.Is(err, pgx.ErrNoRows) {
128 t.Fatalf("artifact row: got %v, want no rows", err)
129 }
130 if _, err := q.GetWorkflowRunByID(ctx, pool, oldRun.ID); !errors.Is(err, pgx.ErrNoRows) {
131 t.Fatalf("old run: got %v, want no rows", err)
132 }
133 if _, err := q.GetWorkflowRunByID(ctx, pool, pinnedRun.ID); err != nil {
134 t.Fatalf("pinned run pruned: %v", err)
135 }
136 assertJWTUsedExists(t, pool, "old-jti-000000000", false)
137 assertJWTUsedExists(t, pool, "recent-jti-000000", true)
138 }
139
140 func insertCleanupRepo(t *testing.T, pool *pgxpool.Pool) (int64, int64) {
141 t.Helper()
142 ctx := context.Background()
143 user, err := usersdb.New().CreateUser(ctx, pool, usersdb.CreateUserParams{
144 Username: "retention-alice",
145 DisplayName: "Retention Alice",
146 PasswordHash: cleanupFixtureHash,
147 })
148 if err != nil {
149 t.Fatalf("CreateUser: %v", err)
150 }
151 repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
152 OwnerUserID: pgtype.Int8{Int64: user.ID, Valid: true},
153 Name: "retention-demo",
154 DefaultBranch: "trunk",
155 Visibility: reposdb.RepoVisibilityPublic,
156 })
157 if err != nil {
158 t.Fatalf("CreateRepo: %v", err)
159 }
160 return repo.ID, user.ID
161 }
162
163 func insertCleanupRunJobStep(t *testing.T, pool *pgxpool.Pool, repoID, userID, runIndex int64) (actionsdb.WorkflowRun, actionsdb.WorkflowStep) {
164 t.Helper()
165 ctx := context.Background()
166 q := actionsdb.New()
167 run, err := q.InsertWorkflowRun(ctx, pool, actionsdb.InsertWorkflowRunParams{
168 RepoID: repoID,
169 RunIndex: runIndex,
170 WorkflowFile: ".shithub/workflows/ci.yml",
171 WorkflowName: "ci",
172 HeadSha: strings.Repeat("a", 40),
173 HeadRef: "refs/heads/trunk",
174 Event: actionsdb.WorkflowRunEventPush,
175 EventPayload: []byte(`{}`),
176 ActorUserID: pgtype.Int8{Int64: userID, Valid: true},
177 })
178 if err != nil {
179 t.Fatalf("InsertWorkflowRun: %v", err)
180 }
181 job, err := q.InsertWorkflowJob(ctx, pool, actionsdb.InsertWorkflowJobParams{
182 RunID: run.ID,
183 JobIndex: 0,
184 JobKey: "build",
185 JobName: "build",
186 RunsOn: "ubuntu-latest",
187 NeedsJobs: []string{},
188 TimeoutMinutes: 360,
189 Permissions: []byte(`{}`),
190 JobEnv: []byte(`{}`),
191 })
192 if err != nil {
193 t.Fatalf("InsertWorkflowJob: %v", err)
194 }
195 step, err := q.InsertWorkflowStep(ctx, pool, actionsdb.InsertWorkflowStepParams{
196 JobID: job.ID,
197 StepIndex: 0,
198 StepName: "test",
199 RunCommand: "go test ./...",
200 StepEnv: []byte(`{}`),
201 WorkingDirectory: "",
202 StepWith: []byte(`{}`),
203 })
204 if err != nil {
205 t.Fatalf("InsertWorkflowStep: %v", err)
206 }
207 return run, step
208 }
209
210 func setRunTerminal(t *testing.T, pool *pgxpool.Pool, runID int64, completedAt time.Time, pinned bool) {
211 t.Helper()
212 _, err := pool.Exec(context.Background(), `
213 UPDATE workflow_runs
214 SET status = 'completed',
215 conclusion = 'success',
216 pinned = $2,
217 started_at = $3,
218 completed_at = $3,
219 updated_at = now()
220 WHERE id = $1
221 `, runID, pinned, completedAt)
222 if err != nil {
223 t.Fatalf("setRunTerminal: %v", err)
224 }
225 }
226
227 func setStepTerminal(t *testing.T, pool *pgxpool.Pool, stepID int64, completedAt time.Time) {
228 t.Helper()
229 if _, err := actionsdb.New().UpdateWorkflowStepStatus(context.Background(), pool, actionsdb.UpdateWorkflowStepStatusParams{
230 ID: stepID,
231 Status: actionsdb.WorkflowStepStatusCompleted,
232 Conclusion: actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusionSuccess, Valid: true},
233 StartedAt: pgtype.Timestamptz{Time: completedAt.Add(-time.Minute), Valid: true},
234 CompletedAt: pgtype.Timestamptz{Time: completedAt, Valid: true},
235 }); err != nil {
236 t.Fatalf("UpdateWorkflowStepStatus: %v", err)
237 }
238 }
239
240 func appendChunk(t *testing.T, pool *pgxpool.Pool, stepID int64, seq int32, body string) {
241 t.Helper()
242 if _, err := actionsdb.New().AppendStepLogChunk(context.Background(), pool, actionsdb.AppendStepLogChunkParams{
243 StepID: stepID,
244 Seq: seq,
245 Chunk: []byte(body),
246 }); err != nil {
247 t.Fatalf("AppendStepLogChunk: %v", err)
248 }
249 }
250
251 func assertJWTUsedExists(t *testing.T, pool *pgxpool.Pool, jti string, want bool) {
252 t.Helper()
253 var exists bool
254 if err := pool.QueryRow(context.Background(), `SELECT EXISTS(SELECT 1 FROM runner_jwt_used WHERE jti = $1)`, jti).Scan(&exists); err != nil {
255 t.Fatalf("query runner_jwt_used %s: %v", jti, err)
256 }
257 if exists != want {
258 t.Fatalf("runner_jwt_used %s exists=%t, want %t", jti, exists, want)
259 }
260 }
261