tenseleyflow/shithub / 8cc4d79

Browse files

actions: add workflow retention cleanup worker

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
8cc4d79049a5a93decae9e7db288fe1bf9a5cb7d
Parents
5381fe8
Tree
59b59ed

14 changed files

StatusFile+-
M cmd/shithubd/worker.go 4 0
A internal/actions/cleanup/sweep.go 217 0
A internal/actions/cleanup/sweep_test.go 260 0
M internal/actions/queries/runner_jwt_used.sql 2 2
M internal/actions/queries/workflow_artifacts.sql 11 3
M internal/actions/queries/workflow_runs.sql 7 0
M internal/actions/queries/workflow_step_log_chunks.sql 8 0
M internal/actions/sqlc/querier.go 5 2
M internal/actions/sqlc/runner_jwt_used.sql.go 8 5
M internal/actions/sqlc/workflow_artifacts.sql.go 46 24
M internal/actions/sqlc/workflow_runs.sql.go 16 0
M internal/actions/sqlc/workflow_step_log_chunks.sql.go 17 0
M internal/infra/metrics/metrics.go 8 0
A internal/migrationsfs/migrations/0060_actions_retention_indexes.sql 23 0
cmd/shithubd/worker.gomodified
@@ -17,6 +17,7 @@ import (
17
 
17
 
18
 	"github.com/spf13/cobra"
18
 	"github.com/spf13/cobra"
19
 
19
 
20
+	"github.com/tenseleyFlow/shithub/internal/actions/cleanup"
20
 	"github.com/tenseleyFlow/shithub/internal/actions/finalize"
21
 	"github.com/tenseleyFlow/shithub/internal/actions/finalize"
21
 	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
22
 	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
22
 	"github.com/tenseleyFlow/shithub/internal/auth/audit"
23
 	"github.com/tenseleyFlow/shithub/internal/auth/audit"
@@ -184,6 +185,9 @@ var workerCmd = &cobra.Command{
184
 		} else {
185
 		} else {
185
 			logger.Info("actions: object storage not configured; workflow step log finalization disabled")
186
 			logger.Info("actions: object storage not configured; workflow step log finalization disabled")
186
 		}
187
 		}
188
+		p.Register(cleanup.KindWorkflowCleanup, cleanup.Handler(cleanup.Deps{
189
+			Pool: pool, ObjectStore: objectStore, Logger: logger,
190
+		}))
187
 
191
 
188
 		return p.Run(ctx)
192
 		return p.Run(ctx)
189
 	},
193
 	},
internal/actions/cleanup/sweep.goadded
@@ -0,0 +1,217 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package cleanup owns background retention for Actions data that should not
4
+// live forever: hot log chunks, expired artifact metadata/blob objects,
5
+// terminal workflow runs, and consumed runner JWT audit rows.
6
+package cleanup
7
+
8
+import (
9
+	"context"
10
+	"encoding/json"
11
+	"errors"
12
+	"fmt"
13
+	"log/slog"
14
+	"strings"
15
+	"time"
16
+
17
+	"github.com/jackc/pgx/v5/pgtype"
18
+	"github.com/jackc/pgx/v5/pgxpool"
19
+
20
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
21
+	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
22
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
23
+	"github.com/tenseleyFlow/shithub/internal/worker"
24
+)
25
+
26
+// KindWorkflowCleanup names the worker job that applies Actions retention.
27
+const KindWorkflowCleanup worker.Kind = "workflow:cleanup"
28
+
29
// Retention windows, in days, used when the matching Payload field is zero.
const (
	defaultStepLogChunkDays = 7
	defaultRunDays          = 365
	defaultJWTUsedDays      = 30
)

// Artifact pruning limits and the only object-key namespace the sweep is
// allowed to delete from.
const (
	defaultArtifactBatch = 1000
	maxArtifactBatch     = 10000
	actionsRunsPrefix    = "actions/runs/"
)
37
+
38
// Payload carries the optional tuning knobs of a workflow:cleanup job. A
// field left at zero selects the production default documented in
// docs/internal/actions-schema.md.
type Payload struct {
	// StepLogChunkDays is the retention window, in days, for step log chunks.
	StepLogChunkDays int `json:"step_log_chunk_days,omitempty"`
	// RunDays is the retention window, in days, for workflow runs.
	RunDays int `json:"run_days,omitempty"`
	// JWTUsedDays is the retention window, in days, for runner JWT use rows.
	JWTUsedDays int `json:"jwt_used_days,omitempty"`
	// ArtifactBatch is the number of expired artifacts handled per batch.
	ArtifactBatch int `json:"artifact_batch,omitempty"`
}
46
+
47
+// Deps are the runtime dependencies for Handler.
48
+type Deps struct {
49
+	Pool        *pgxpool.Pool
50
+	ObjectStore storage.ObjectStore
51
+	Logger      *slog.Logger
52
+	Now         func() time.Time
53
+}
54
+
55
// Result reports how much a single cleanup sweep removed.
type Result struct {
	// ChunksDeleted counts step log chunk rows removed.
	ChunksDeleted int64
	// ArtifactRowsDeleted counts artifact metadata rows removed.
	ArtifactRowsDeleted int64
	// ArtifactObjectsDeleted counts artifact objects removed from the object store.
	ArtifactObjectsDeleted int64
	// RunsDeleted counts workflow run rows removed.
	RunsDeleted int64
	// JWTUsedDeleted counts runner JWT use rows removed.
	JWTUsedDeleted int64
}
63
+
64
+// Handler returns the worker handler for workflow:cleanup.
65
+func Handler(deps Deps) worker.Handler {
66
+	return func(ctx context.Context, raw json.RawMessage) error {
67
+		var p Payload
68
+		if len(raw) > 0 {
69
+			if err := json.Unmarshal(raw, &p); err != nil {
70
+				return worker.PoisonError(fmt.Errorf("workflow cleanup: bad payload: %w", err))
71
+			}
72
+		}
73
+		res, err := Sweep(ctx, deps, p)
74
+		if err != nil {
75
+			return err
76
+		}
77
+		if deps.Logger != nil {
78
+			deps.Logger.InfoContext(ctx, "workflow cleanup complete",
79
+				"chunks_deleted", res.ChunksDeleted,
80
+				"artifact_rows_deleted", res.ArtifactRowsDeleted,
81
+				"artifact_objects_deleted", res.ArtifactObjectsDeleted,
82
+				"runs_deleted", res.RunsDeleted,
83
+				"jwt_used_deleted", res.JWTUsedDeleted)
84
+		}
85
+		return nil
86
+	}
87
+}
88
+
89
+// Sweep applies Actions retention once.
90
+func Sweep(ctx context.Context, deps Deps, p Payload) (Result, error) {
91
+	if deps.Pool == nil {
92
+		return Result{}, worker.PoisonError(errors.New("workflow cleanup: database pool is not configured"))
93
+	}
94
+	if err := normalizePayload(&p); err != nil {
95
+		return Result{}, worker.PoisonError(err)
96
+	}
97
+	now := time.Now().UTC()
98
+	if deps.Now != nil {
99
+		now = deps.Now().UTC()
100
+	}
101
+	q := actionsdb.New()
102
+	res := Result{}
103
+
104
+	stepCutoff := pgtype.Timestamptz{Time: now.Add(-time.Duration(p.StepLogChunkDays) * 24 * time.Hour), Valid: true}
105
+	n, err := q.DeleteStaleStepLogChunksForCleanup(ctx, deps.Pool, stepCutoff)
106
+	if err != nil {
107
+		return Result{}, fmt.Errorf("workflow cleanup: delete stale log chunks: %w", err)
108
+	}
109
+	res.ChunksDeleted = n
110
+	recordPruned("chunks", n)
111
+
112
+	artifactCutoff := pgtype.Timestamptz{Time: now, Valid: true}
113
+	artifactRes, err := pruneExpiredArtifacts(ctx, q, deps, artifactCutoff, int32(p.ArtifactBatch))
114
+	if err != nil {
115
+		return Result{}, err
116
+	}
117
+	res.ArtifactRowsDeleted = artifactRes.ArtifactRowsDeleted
118
+	res.ArtifactObjectsDeleted = artifactRes.ArtifactObjectsDeleted
119
+	recordPruned("blobs", artifactRes.ArtifactObjectsDeleted)
120
+
121
+	runCutoff := pgtype.Timestamptz{Time: now.Add(-time.Duration(p.RunDays) * 24 * time.Hour), Valid: true}
122
+	n, err = q.DeleteOldWorkflowRunsForCleanup(ctx, deps.Pool, runCutoff)
123
+	if err != nil {
124
+		return Result{}, fmt.Errorf("workflow cleanup: delete old workflow runs: %w", err)
125
+	}
126
+	res.RunsDeleted = n
127
+	recordPruned("runs", n)
128
+
129
+	jwtCutoff := pgtype.Timestamptz{Time: now.Add(-time.Duration(p.JWTUsedDays) * 24 * time.Hour), Valid: true}
130
+	n, err = q.DeleteOldRunnerJWTUsesForCleanup(ctx, deps.Pool, jwtCutoff)
131
+	if err != nil {
132
+		return Result{}, fmt.Errorf("workflow cleanup: delete old runner JWT uses: %w", err)
133
+	}
134
+	res.JWTUsedDeleted = n
135
+	recordPruned("jwt_used", n)
136
+
137
+	return res, nil
138
+}
139
+
140
+func normalizePayload(p *Payload) error {
141
+	if p.StepLogChunkDays < 0 || p.RunDays < 0 || p.JWTUsedDays < 0 || p.ArtifactBatch < 0 {
142
+		return errors.New("workflow cleanup: retention values must be non-negative")
143
+	}
144
+	if p.StepLogChunkDays == 0 {
145
+		p.StepLogChunkDays = defaultStepLogChunkDays
146
+	}
147
+	if p.RunDays == 0 {
148
+		p.RunDays = defaultRunDays
149
+	}
150
+	if p.JWTUsedDays == 0 {
151
+		p.JWTUsedDays = defaultJWTUsedDays
152
+	}
153
+	if p.ArtifactBatch == 0 {
154
+		p.ArtifactBatch = defaultArtifactBatch
155
+	}
156
+	if p.ArtifactBatch > maxArtifactBatch {
157
+		return fmt.Errorf("workflow cleanup: artifact_batch must be <= %d", maxArtifactBatch)
158
+	}
159
+	return nil
160
+}
161
+
162
+func pruneExpiredArtifacts(
163
+	ctx context.Context,
164
+	q *actionsdb.Queries,
165
+	deps Deps,
166
+	cutoff pgtype.Timestamptz,
167
+	batch int32,
168
+) (Result, error) {
169
+	if deps.ObjectStore == nil {
170
+		if deps.Logger != nil {
171
+			deps.Logger.WarnContext(ctx, "workflow cleanup: object storage not configured; skipping artifact pruning")
172
+		}
173
+		return Result{}, nil
174
+	}
175
+
176
+	var res Result
177
+	for {
178
+		rows, err := q.ListExpiredWorkflowArtifactsForCleanup(ctx, deps.Pool, actionsdb.ListExpiredWorkflowArtifactsForCleanupParams{
179
+			ExpiresAt: cutoff,
180
+			Limit:     batch,
181
+		})
182
+		if err != nil {
183
+			return Result{}, fmt.Errorf("workflow cleanup: list expired artifacts: %w", err)
184
+		}
185
+		if len(rows) == 0 {
186
+			return res, nil
187
+		}
188
+
189
+		ids := make([]int64, 0, len(rows))
190
+		for _, row := range rows {
191
+			if !strings.HasPrefix(row.ObjectKey, actionsRunsPrefix) {
192
+				return Result{}, fmt.Errorf("workflow cleanup: refusing to delete non-actions object key %q", row.ObjectKey)
193
+			}
194
+			if err := deps.ObjectStore.Delete(ctx, row.ObjectKey); err != nil {
195
+				return Result{}, fmt.Errorf("workflow cleanup: delete artifact object %q: %w", row.ObjectKey, err)
196
+			}
197
+			res.ArtifactObjectsDeleted++
198
+			ids = append(ids, row.ID)
199
+		}
200
+
201
+		deleted, err := q.DeleteWorkflowArtifactsByIDs(ctx, deps.Pool, ids)
202
+		if err != nil {
203
+			return Result{}, fmt.Errorf("workflow cleanup: delete artifact rows: %w", err)
204
+		}
205
+		res.ArtifactRowsDeleted += deleted
206
+		if len(rows) < int(batch) {
207
+			return res, nil
208
+		}
209
+	}
210
+}
211
+
212
+func recordPruned(kind string, n int64) {
213
+	if n <= 0 {
214
+		return
215
+	}
216
+	metrics.ActionsRunsPrunedTotal.WithLabelValues(kind).Add(float64(n))
217
+}
internal/actions/cleanup/sweep_test.goadded
@@ -0,0 +1,260 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package cleanup
4
+
5
+import (
6
+	"context"
7
+	"errors"
8
+	"strconv"
9
+	"strings"
10
+	"testing"
11
+	"time"
12
+
13
+	"github.com/jackc/pgx/v5"
14
+	"github.com/jackc/pgx/v5/pgtype"
15
+	"github.com/jackc/pgx/v5/pgxpool"
16
+
17
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
18
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
19
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
20
+	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
21
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
22
+)
23
+
24
// cleanupFixtureHash is a placeholder argon2id-formatted password hash used
// for the fixture user; it is assembled from parameters, salt and digest.
const (
	cleanupFixtureHashParams = "$argon2id$v=19$m=16384,t=1,p=1$"
	cleanupFixtureHashSalt   = "AAAAAAAAAAAAAAAA$"
	cleanupFixtureHashDigest = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"

	cleanupFixtureHash = cleanupFixtureHashParams + cleanupFixtureHashSalt + cleanupFixtureHashDigest
)
27
+
28
+func TestSweepPrunesActionsRetentionSurfaces(t *testing.T) {
29
+	ctx := context.Background()
30
+	pool := dbtest.NewTestDB(t)
31
+	store := storage.NewMemoryStore()
32
+	now := time.Date(2026, 5, 12, 3, 30, 0, 0, time.UTC)
33
+
34
+	repoID, userID := insertCleanupRepo(t, pool)
35
+	q := actionsdb.New()
36
+
37
+	chunkRun, oldChunkStep := insertCleanupRunJobStep(t, pool, repoID, userID, 1)
38
+	setRunTerminal(t, pool, chunkRun.ID, now.Add(-90*24*time.Hour), true)
39
+	setStepTerminal(t, pool, oldChunkStep.ID, now.Add(-8*24*time.Hour))
40
+	appendChunk(t, pool, oldChunkStep.ID, 0, "old chunk")
41
+
42
+	_, recentChunkStep := insertCleanupRunJobStep(t, pool, repoID, userID, 2)
43
+	setStepTerminal(t, pool, recentChunkStep.ID, now.Add(-2*24*time.Hour))
44
+	appendChunk(t, pool, recentChunkStep.ID, 0, "recent chunk")
45
+
46
+	artifactRun, _ := insertCleanupRunJobStep(t, pool, repoID, userID, 3)
47
+	artifactKey := "actions/runs/" + strconv.FormatInt(artifactRun.ID, 10) + "/artifacts/pkg.tgz"
48
+	if _, err := store.Put(ctx, artifactKey, strings.NewReader("artifact"), storage.PutOpts{}); err != nil {
49
+		t.Fatalf("store.Put artifact: %v", err)
50
+	}
51
+	artifact, err := q.InsertArtifact(ctx, pool, actionsdb.InsertArtifactParams{
52
+		RunID:     artifactRun.ID,
53
+		Name:      "pkg.tgz",
54
+		ObjectKey: artifactKey,
55
+		ByteCount: 8,
56
+		ExpiresAt: pgtype.Timestamptz{
57
+			Time:  now.Add(-24 * time.Hour),
58
+			Valid: true,
59
+		},
60
+	})
61
+	if err != nil {
62
+		t.Fatalf("InsertArtifact: %v", err)
63
+	}
64
+
65
+	oldRun, _ := insertCleanupRunJobStep(t, pool, repoID, userID, 4)
66
+	setRunTerminal(t, pool, oldRun.ID, now.Add(-366*24*time.Hour), false)
67
+	pinnedRun, _ := insertCleanupRunJobStep(t, pool, repoID, userID, 5)
68
+	setRunTerminal(t, pool, pinnedRun.ID, now.Add(-366*24*time.Hour), true)
69
+
70
+	jwtRun, jwtJobStep := insertCleanupRunJobStep(t, pool, repoID, userID, 6)
71
+	runner, err := q.InsertRunner(ctx, pool, actionsdb.InsertRunnerParams{
72
+		Name:               "runner-retention",
73
+		Labels:             []string{"ubuntu-latest"},
74
+		Capacity:           1,
75
+		RegisteredByUserID: pgtype.Int8{Int64: userID, Valid: true},
76
+	})
77
+	if err != nil {
78
+		t.Fatalf("InsertRunner: %v", err)
79
+	}
80
+	if _, err := q.MarkRunnerJWTUsed(ctx, pool, actionsdb.MarkRunnerJWTUsedParams{
81
+		Jti:       "old-jti-000000000",
82
+		RunnerID:  runner.ID,
83
+		JobID:     jwtJobStep.JobID,
84
+		RunID:     jwtRun.ID,
85
+		RepoID:    repoID,
86
+		ExpiresAt: pgtype.Timestamptz{Time: now.Add(-31 * 24 * time.Hour), Valid: true},
87
+	}); err != nil {
88
+		t.Fatalf("MarkRunnerJWTUsed old: %v", err)
89
+	}
90
+	if _, err := q.MarkRunnerJWTUsed(ctx, pool, actionsdb.MarkRunnerJWTUsedParams{
91
+		Jti:       "recent-jti-000000",
92
+		RunnerID:  runner.ID,
93
+		JobID:     jwtJobStep.JobID,
94
+		RunID:     jwtRun.ID,
95
+		RepoID:    repoID,
96
+		ExpiresAt: pgtype.Timestamptz{Time: now.Add(-24 * time.Hour), Valid: true},
97
+	}); err != nil {
98
+		t.Fatalf("MarkRunnerJWTUsed recent: %v", err)
99
+	}
100
+
101
+	res, err := Sweep(ctx, Deps{Pool: pool, ObjectStore: store, Now: func() time.Time { return now }}, Payload{})
102
+	if err != nil {
103
+		t.Fatalf("Sweep: %v", err)
104
+	}
105
+	if res.ChunksDeleted != 1 || res.ArtifactRowsDeleted != 1 || res.ArtifactObjectsDeleted != 1 ||
106
+		res.RunsDeleted != 1 || res.JWTUsedDeleted != 1 {
107
+		t.Fatalf("unexpected cleanup result: %+v", res)
108
+	}
109
+
110
+	oldChunks, err := q.ListAllStepLogChunksForStep(ctx, pool, oldChunkStep.ID)
111
+	if err != nil {
112
+		t.Fatalf("ListAllStepLogChunksForStep old: %v", err)
113
+	}
114
+	if len(oldChunks) != 0 {
115
+		t.Fatalf("old chunks survived: %+v", oldChunks)
116
+	}
117
+	recentChunks, err := q.ListAllStepLogChunksForStep(ctx, pool, recentChunkStep.ID)
118
+	if err != nil {
119
+		t.Fatalf("ListAllStepLogChunksForStep recent: %v", err)
120
+	}
121
+	if len(recentChunks) != 1 {
122
+		t.Fatalf("recent chunks pruned: %+v", recentChunks)
123
+	}
124
+	if _, _, err := store.Get(ctx, artifactKey); !errors.Is(err, storage.ErrNotFound) {
125
+		t.Fatalf("artifact object: got %v, want not found", err)
126
+	}
127
+	if _, err := q.GetArtifactByID(ctx, pool, artifact.ID); !errors.Is(err, pgx.ErrNoRows) {
128
+		t.Fatalf("artifact row: got %v, want no rows", err)
129
+	}
130
+	if _, err := q.GetWorkflowRunByID(ctx, pool, oldRun.ID); !errors.Is(err, pgx.ErrNoRows) {
131
+		t.Fatalf("old run: got %v, want no rows", err)
132
+	}
133
+	if _, err := q.GetWorkflowRunByID(ctx, pool, pinnedRun.ID); err != nil {
134
+		t.Fatalf("pinned run pruned: %v", err)
135
+	}
136
+	assertJWTUsedExists(t, pool, "old-jti-000000000", false)
137
+	assertJWTUsedExists(t, pool, "recent-jti-000000", true)
138
+}
139
+
140
+func insertCleanupRepo(t *testing.T, pool *pgxpool.Pool) (int64, int64) {
141
+	t.Helper()
142
+	ctx := context.Background()
143
+	user, err := usersdb.New().CreateUser(ctx, pool, usersdb.CreateUserParams{
144
+		Username:     "retention-alice",
145
+		DisplayName:  "Retention Alice",
146
+		PasswordHash: cleanupFixtureHash,
147
+	})
148
+	if err != nil {
149
+		t.Fatalf("CreateUser: %v", err)
150
+	}
151
+	repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
152
+		OwnerUserID:   pgtype.Int8{Int64: user.ID, Valid: true},
153
+		Name:          "retention-demo",
154
+		DefaultBranch: "trunk",
155
+		Visibility:    reposdb.RepoVisibilityPublic,
156
+	})
157
+	if err != nil {
158
+		t.Fatalf("CreateRepo: %v", err)
159
+	}
160
+	return repo.ID, user.ID
161
+}
162
+
163
+func insertCleanupRunJobStep(t *testing.T, pool *pgxpool.Pool, repoID, userID, runIndex int64) (actionsdb.WorkflowRun, actionsdb.WorkflowStep) {
164
+	t.Helper()
165
+	ctx := context.Background()
166
+	q := actionsdb.New()
167
+	run, err := q.InsertWorkflowRun(ctx, pool, actionsdb.InsertWorkflowRunParams{
168
+		RepoID:       repoID,
169
+		RunIndex:     runIndex,
170
+		WorkflowFile: ".shithub/workflows/ci.yml",
171
+		WorkflowName: "ci",
172
+		HeadSha:      strings.Repeat("a", 40),
173
+		HeadRef:      "refs/heads/trunk",
174
+		Event:        actionsdb.WorkflowRunEventPush,
175
+		EventPayload: []byte(`{}`),
176
+		ActorUserID:  pgtype.Int8{Int64: userID, Valid: true},
177
+	})
178
+	if err != nil {
179
+		t.Fatalf("InsertWorkflowRun: %v", err)
180
+	}
181
+	job, err := q.InsertWorkflowJob(ctx, pool, actionsdb.InsertWorkflowJobParams{
182
+		RunID:          run.ID,
183
+		JobIndex:       0,
184
+		JobKey:         "build",
185
+		JobName:        "build",
186
+		RunsOn:         "ubuntu-latest",
187
+		NeedsJobs:      []string{},
188
+		TimeoutMinutes: 360,
189
+		Permissions:    []byte(`{}`),
190
+		JobEnv:         []byte(`{}`),
191
+	})
192
+	if err != nil {
193
+		t.Fatalf("InsertWorkflowJob: %v", err)
194
+	}
195
+	step, err := q.InsertWorkflowStep(ctx, pool, actionsdb.InsertWorkflowStepParams{
196
+		JobID:            job.ID,
197
+		StepIndex:        0,
198
+		StepName:         "test",
199
+		RunCommand:       "go test ./...",
200
+		StepEnv:          []byte(`{}`),
201
+		WorkingDirectory: "",
202
+		StepWith:         []byte(`{}`),
203
+	})
204
+	if err != nil {
205
+		t.Fatalf("InsertWorkflowStep: %v", err)
206
+	}
207
+	return run, step
208
+}
209
+
210
+func setRunTerminal(t *testing.T, pool *pgxpool.Pool, runID int64, completedAt time.Time, pinned bool) {
211
+	t.Helper()
212
+	_, err := pool.Exec(context.Background(), `
213
+		UPDATE workflow_runs
214
+		SET status = 'completed',
215
+		    conclusion = 'success',
216
+		    pinned = $2,
217
+		    started_at = $3,
218
+		    completed_at = $3,
219
+		    updated_at = now()
220
+		WHERE id = $1
221
+	`, runID, pinned, completedAt)
222
+	if err != nil {
223
+		t.Fatalf("setRunTerminal: %v", err)
224
+	}
225
+}
226
+
227
+func setStepTerminal(t *testing.T, pool *pgxpool.Pool, stepID int64, completedAt time.Time) {
228
+	t.Helper()
229
+	if _, err := actionsdb.New().UpdateWorkflowStepStatus(context.Background(), pool, actionsdb.UpdateWorkflowStepStatusParams{
230
+		ID:          stepID,
231
+		Status:      actionsdb.WorkflowStepStatusCompleted,
232
+		Conclusion:  actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusionSuccess, Valid: true},
233
+		StartedAt:   pgtype.Timestamptz{Time: completedAt.Add(-time.Minute), Valid: true},
234
+		CompletedAt: pgtype.Timestamptz{Time: completedAt, Valid: true},
235
+	}); err != nil {
236
+		t.Fatalf("UpdateWorkflowStepStatus: %v", err)
237
+	}
238
+}
239
+
240
+func appendChunk(t *testing.T, pool *pgxpool.Pool, stepID int64, seq int32, body string) {
241
+	t.Helper()
242
+	if _, err := actionsdb.New().AppendStepLogChunk(context.Background(), pool, actionsdb.AppendStepLogChunkParams{
243
+		StepID: stepID,
244
+		Seq:    seq,
245
+		Chunk:  []byte(body),
246
+	}); err != nil {
247
+		t.Fatalf("AppendStepLogChunk: %v", err)
248
+	}
249
+}
250
+
251
+func assertJWTUsedExists(t *testing.T, pool *pgxpool.Pool, jti string, want bool) {
252
+	t.Helper()
253
+	var exists bool
254
+	if err := pool.QueryRow(context.Background(), `SELECT EXISTS(SELECT 1 FROM runner_jwt_used WHERE jti = $1)`, jti).Scan(&exists); err != nil {
255
+		t.Fatalf("query runner_jwt_used %s: %v", jti, err)
256
+	}
257
+	if exists != want {
258
+		t.Fatalf("runner_jwt_used %s exists=%t, want %t", jti, exists, want)
259
+	}
260
+}
internal/actions/queries/runner_jwt_used.sqlmodified
@@ -6,5 +6,5 @@ VALUES ($1, $2, $3, $4, $5, $6)
6
 ON CONFLICT (jti) DO NOTHING
6
 ON CONFLICT (jti) DO NOTHING
7
 RETURNING jti, runner_id, job_id, run_id, repo_id, expires_at, used_at;
7
 RETURNING jti, runner_id, job_id, run_id, repo_id, expires_at, used_at;
8
 
8
 
9
--- name: DeleteExpiredRunnerJWTUses :exec
9
+-- name: DeleteOldRunnerJWTUsesForCleanup :execrows
10
-DELETE FROM runner_jwt_used WHERE expires_at < now();
10
+DELETE FROM runner_jwt_used WHERE expires_at < $1;
internal/actions/queries/workflow_artifacts.sqlmodified
@@ -16,6 +16,14 @@ SELECT id, run_id, name, object_key, byte_count, expires_at, created_at
16
 FROM workflow_artifacts
16
 FROM workflow_artifacts
17
 WHERE id = $1;
17
 WHERE id = $1;
18
 
18
 
19
--- name: DeleteExpiredArtifacts :many
19
+-- name: ListExpiredWorkflowArtifactsForCleanup :many
20
-DELETE FROM workflow_artifacts WHERE expires_at < now()
20
+SELECT id, object_key
21
-RETURNING id, object_key;
21
+FROM workflow_artifacts
22
+WHERE expires_at < $1
23
+  AND object_key LIKE 'actions/runs/%'
24
+ORDER BY expires_at ASC, id ASC
25
+LIMIT $2;
26
+
27
+-- name: DeleteWorkflowArtifactsByIDs :execrows
28
+DELETE FROM workflow_artifacts
29
+WHERE id = ANY($1::bigint[]);
internal/actions/queries/workflow_runs.sqlmodified
@@ -150,3 +150,10 @@ SELECT workflow_file,
150
 FROM ranked
150
 FROM ranked
151
 WHERE rn = 1
151
 WHERE rn = 1
152
 ORDER BY lower(COALESCE(NULLIF(workflow_name, ''), workflow_file)), workflow_file;
152
 ORDER BY lower(COALESCE(NULLIF(workflow_name, ''), workflow_file)), workflow_file;
153
+
154
+-- name: DeleteOldWorkflowRunsForCleanup :execrows
155
+DELETE FROM workflow_runs
156
+WHERE pinned = false
157
+  AND status IN ('completed', 'cancelled')
158
+  AND completed_at IS NOT NULL
159
+  AND completed_at < $1;
internal/actions/queries/workflow_step_log_chunks.sqlmodified
@@ -38,3 +38,11 @@ WHERE id = $1;
38
 
38
 
39
 -- name: DeleteStepLogChunks :exec
39
 -- name: DeleteStepLogChunks :exec
40
 DELETE FROM workflow_step_log_chunks WHERE step_id = $1;
40
 DELETE FROM workflow_step_log_chunks WHERE step_id = $1;
41
+
42
+-- name: DeleteStaleStepLogChunksForCleanup :execrows
43
+DELETE FROM workflow_step_log_chunks c
44
+USING workflow_steps s
45
+WHERE c.step_id = s.id
46
+  AND s.status IN ('completed', 'cancelled', 'skipped')
47
+  AND s.completed_at IS NOT NULL
48
+  AND s.completed_at < $1;
internal/actions/sqlc/querier.gomodified
@@ -18,13 +18,15 @@ type Querier interface {
18
 	CompleteWorkflowRun(ctx context.Context, db DBTX, arg CompleteWorkflowRunParams) (WorkflowRun, error)
18
 	CompleteWorkflowRun(ctx context.Context, db DBTX, arg CompleteWorkflowRunParams) (WorkflowRun, error)
19
 	CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error)
19
 	CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error)
20
 	CountWorkflowRunsForRepo(ctx context.Context, db DBTX, arg CountWorkflowRunsForRepoParams) (int64, error)
20
 	CountWorkflowRunsForRepo(ctx context.Context, db DBTX, arg CountWorkflowRunsForRepoParams) (int64, error)
21
-	DeleteExpiredArtifacts(ctx context.Context, db DBTX) ([]DeleteExpiredArtifactsRow, error)
21
+	DeleteOldRunnerJWTUsesForCleanup(ctx context.Context, db DBTX, expiresAt pgtype.Timestamptz) (int64, error)
22
-	DeleteExpiredRunnerJWTUses(ctx context.Context, db DBTX) error
22
+	DeleteOldWorkflowRunsForCleanup(ctx context.Context, db DBTX, completedAt pgtype.Timestamptz) (int64, error)
23
 	DeleteOrgSecret(ctx context.Context, db DBTX, arg DeleteOrgSecretParams) error
23
 	DeleteOrgSecret(ctx context.Context, db DBTX, arg DeleteOrgSecretParams) error
24
 	DeleteOrgVariable(ctx context.Context, db DBTX, arg DeleteOrgVariableParams) error
24
 	DeleteOrgVariable(ctx context.Context, db DBTX, arg DeleteOrgVariableParams) error
25
 	DeleteRepoSecret(ctx context.Context, db DBTX, arg DeleteRepoSecretParams) error
25
 	DeleteRepoSecret(ctx context.Context, db DBTX, arg DeleteRepoSecretParams) error
26
 	DeleteRepoVariable(ctx context.Context, db DBTX, arg DeleteRepoVariableParams) error
26
 	DeleteRepoVariable(ctx context.Context, db DBTX, arg DeleteRepoVariableParams) error
27
+	DeleteStaleStepLogChunksForCleanup(ctx context.Context, db DBTX, completedAt pgtype.Timestamptz) (int64, error)
27
 	DeleteStepLogChunks(ctx context.Context, db DBTX, stepID int64) error
28
 	DeleteStepLogChunks(ctx context.Context, db DBTX, stepID int64) error
29
+	DeleteWorkflowArtifactsByIDs(ctx context.Context, db DBTX, dollar_1 []int64) (int64, error)
28
 	// Idempotent insert: if a row with the same (repo_id, workflow_file,
30
 	// Idempotent insert: if a row with the same (repo_id, workflow_file,
29
 	// trigger_event_id) already exists, returns no rows (pgx.ErrNoRows in
31
 	// trigger_event_id) already exists, returns no rows (pgx.ErrNoRows in
30
 	// Go). The handler treats that as a successful no-op so worker
32
 	// Go). The handler treats that as a successful no-op so worker
@@ -65,6 +67,7 @@ type Querier interface {
65
 	InsertWorkflowStep(ctx context.Context, db DBTX, arg InsertWorkflowStepParams) (WorkflowStep, error)
67
 	InsertWorkflowStep(ctx context.Context, db DBTX, arg InsertWorkflowStepParams) (WorkflowStep, error)
66
 	ListAllStepLogChunksForStep(ctx context.Context, db DBTX, stepID int64) ([]WorkflowStepLogChunk, error)
68
 	ListAllStepLogChunksForStep(ctx context.Context, db DBTX, stepID int64) ([]WorkflowStepLogChunk, error)
67
 	ListArtifactsForRun(ctx context.Context, db DBTX, runID int64) ([]ListArtifactsForRunRow, error)
69
 	ListArtifactsForRun(ctx context.Context, db DBTX, runID int64) ([]ListArtifactsForRunRow, error)
70
+	ListExpiredWorkflowArtifactsForCleanup(ctx context.Context, db DBTX, arg ListExpiredWorkflowArtifactsForCleanupParams) ([]ListExpiredWorkflowArtifactsForCleanupRow, error)
68
 	ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]ListJobsForRunRow, error)
71
 	ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]ListJobsForRunRow, error)
69
 	ListOrgSecrets(ctx context.Context, db DBTX, orgID pgtype.Int8) ([]ListOrgSecretsRow, error)
72
 	ListOrgSecrets(ctx context.Context, db DBTX, orgID pgtype.Int8) ([]ListOrgSecretsRow, error)
70
 	ListOrgVariables(ctx context.Context, db DBTX, orgID pgtype.Int8) ([]ListOrgVariablesRow, error)
73
 	ListOrgVariables(ctx context.Context, db DBTX, orgID pgtype.Int8) ([]ListOrgVariablesRow, error)
internal/actions/sqlc/runner_jwt_used.sql.gomodified
@@ -11,13 +11,16 @@ import (
11
 	"github.com/jackc/pgx/v5/pgtype"
11
 	"github.com/jackc/pgx/v5/pgtype"
12
 )
12
 )
13
 
13
 
14
-const deleteExpiredRunnerJWTUses = `-- name: DeleteExpiredRunnerJWTUses :exec
14
+const deleteOldRunnerJWTUsesForCleanup = `-- name: DeleteOldRunnerJWTUsesForCleanup :execrows
15
-DELETE FROM runner_jwt_used WHERE expires_at < now()
15
+DELETE FROM runner_jwt_used WHERE expires_at < $1
16
 `
16
 `
17
 
17
 
18
-func (q *Queries) DeleteExpiredRunnerJWTUses(ctx context.Context, db DBTX) error {
18
+func (q *Queries) DeleteOldRunnerJWTUsesForCleanup(ctx context.Context, db DBTX, expiresAt pgtype.Timestamptz) (int64, error) {
19
-	_, err := db.Exec(ctx, deleteExpiredRunnerJWTUses)
19
+	result, err := db.Exec(ctx, deleteOldRunnerJWTUsesForCleanup, expiresAt)
20
-	return err
20
+	if err != nil {
21
+		return 0, err
22
+	}
23
+	return result.RowsAffected(), nil
21
 }
24
 }
22
 
25
 
23
 const markRunnerJWTUsed = `-- name: MarkRunnerJWTUsed :one
26
 const markRunnerJWTUsed = `-- name: MarkRunnerJWTUsed :one
internal/actions/sqlc/workflow_artifacts.sql.gomodified
@@ -11,34 +11,17 @@ import (
11
 	"github.com/jackc/pgx/v5/pgtype"
11
 	"github.com/jackc/pgx/v5/pgtype"
12
 )
12
 )
13
 
13
 
14
-const deleteExpiredArtifacts = `-- name: DeleteExpiredArtifacts :many
14
+const deleteWorkflowArtifactsByIDs = `-- name: DeleteWorkflowArtifactsByIDs :execrows
15
-DELETE FROM workflow_artifacts WHERE expires_at < now()
15
+DELETE FROM workflow_artifacts
16
-RETURNING id, object_key
16
+WHERE id = ANY($1::bigint[])
17
 `
17
 `
18
 
18
 
19
-type DeleteExpiredArtifactsRow struct {
19
+func (q *Queries) DeleteWorkflowArtifactsByIDs(ctx context.Context, db DBTX, dollar_1 []int64) (int64, error) {
20
-	ID        int64
20
+	result, err := db.Exec(ctx, deleteWorkflowArtifactsByIDs, dollar_1)
21
-	ObjectKey string
22
-}
23
-
24
-func (q *Queries) DeleteExpiredArtifacts(ctx context.Context, db DBTX) ([]DeleteExpiredArtifactsRow, error) {
25
-	rows, err := db.Query(ctx, deleteExpiredArtifacts)
26
 	if err != nil {
21
 	if err != nil {
27
-		return nil, err
22
+		return 0, err
28
 	}
23
 	}
29
-	defer rows.Close()
24
+	return result.RowsAffected(), nil
30
-	items := []DeleteExpiredArtifactsRow{}
31
-	for rows.Next() {
32
-		var i DeleteExpiredArtifactsRow
33
-		if err := rows.Scan(&i.ID, &i.ObjectKey); err != nil {
34
-			return nil, err
35
-		}
36
-		items = append(items, i)
37
-	}
38
-	if err := rows.Err(); err != nil {
39
-		return nil, err
40
-	}
41
-	return items, nil
42
 }
25
 }
43
 
26
 
44
 const getArtifactByID = `-- name: GetArtifactByID :one
27
 const getArtifactByID = `-- name: GetArtifactByID :one
@@ -141,3 +124,42 @@ func (q *Queries) ListArtifactsForRun(ctx context.Context, db DBTX, runID int64)
141
 	}
124
 	}
142
 	return items, nil
125
 	return items, nil
143
 }
126
 }
127
+
128
+const listExpiredWorkflowArtifactsForCleanup = `-- name: ListExpiredWorkflowArtifactsForCleanup :many
129
+SELECT id, object_key
130
+FROM workflow_artifacts
131
+WHERE expires_at < $1
132
+  AND object_key LIKE 'actions/runs/%'
133
+ORDER BY expires_at ASC, id ASC
134
+LIMIT $2
135
+`
136
+
137
+type ListExpiredWorkflowArtifactsForCleanupParams struct {
138
+	ExpiresAt pgtype.Timestamptz
139
+	Limit     int32
140
+}
141
+
142
+type ListExpiredWorkflowArtifactsForCleanupRow struct {
143
+	ID        int64
144
+	ObjectKey string
145
+}
146
+
147
+func (q *Queries) ListExpiredWorkflowArtifactsForCleanup(ctx context.Context, db DBTX, arg ListExpiredWorkflowArtifactsForCleanupParams) ([]ListExpiredWorkflowArtifactsForCleanupRow, error) {
148
+	rows, err := db.Query(ctx, listExpiredWorkflowArtifactsForCleanup, arg.ExpiresAt, arg.Limit)
149
+	if err != nil {
150
+		return nil, err
151
+	}
152
+	defer rows.Close()
153
+	items := []ListExpiredWorkflowArtifactsForCleanupRow{}
154
+	for rows.Next() {
155
+		var i ListExpiredWorkflowArtifactsForCleanupRow
156
+		if err := rows.Scan(&i.ID, &i.ObjectKey); err != nil {
157
+			return nil, err
158
+		}
159
+		items = append(items, i)
160
+	}
161
+	if err := rows.Err(); err != nil {
162
+		return nil, err
163
+	}
164
+	return items, nil
165
+}
internal/actions/sqlc/workflow_runs.sql.gomodified
@@ -101,6 +101,22 @@ func (q *Queries) CountWorkflowRunsForRepo(ctx context.Context, db DBTX, arg Cou
101
 	return column_1, err
101
 	return column_1, err
102
 }
102
 }
103
 
103
 
104
+const deleteOldWorkflowRunsForCleanup = `-- name: DeleteOldWorkflowRunsForCleanup :execrows
105
+DELETE FROM workflow_runs
106
+WHERE pinned = false
107
+  AND status IN ('completed', 'cancelled')
108
+  AND completed_at IS NOT NULL
109
+  AND completed_at < $1
110
+`
111
+
112
+func (q *Queries) DeleteOldWorkflowRunsForCleanup(ctx context.Context, db DBTX, completedAt pgtype.Timestamptz) (int64, error) {
113
+	result, err := db.Exec(ctx, deleteOldWorkflowRunsForCleanup, completedAt)
114
+	if err != nil {
115
+		return 0, err
116
+	}
117
+	return result.RowsAffected(), nil
118
+}
119
+
104
 const enqueueWorkflowRun = `-- name: EnqueueWorkflowRun :one
120
 const enqueueWorkflowRun = `-- name: EnqueueWorkflowRun :one
105
 INSERT INTO workflow_runs (
121
 INSERT INTO workflow_runs (
106
     repo_id, run_index, workflow_file, workflow_name,
122
     repo_id, run_index, workflow_file, workflow_name,
internal/actions/sqlc/workflow_step_log_chunks.sql.gomodified
@@ -45,6 +45,23 @@ func (q *Queries) AppendStepLogChunk(ctx context.Context, db DBTX, arg AppendSte
45
 	return i, err
45
 	return i, err
46
 }
46
 }
47
 
47
 
48
+const deleteStaleStepLogChunksForCleanup = `-- name: DeleteStaleStepLogChunksForCleanup :execrows
49
+DELETE FROM workflow_step_log_chunks c
50
+USING workflow_steps s
51
+WHERE c.step_id = s.id
52
+  AND s.status IN ('completed', 'cancelled', 'skipped')
53
+  AND s.completed_at IS NOT NULL
54
+  AND s.completed_at < $1
55
+`
56
+
57
+func (q *Queries) DeleteStaleStepLogChunksForCleanup(ctx context.Context, db DBTX, completedAt pgtype.Timestamptz) (int64, error) {
58
+	result, err := db.Exec(ctx, deleteStaleStepLogChunksForCleanup, completedAt)
59
+	if err != nil {
60
+		return 0, err
61
+	}
62
+	return result.RowsAffected(), nil
63
+}
64
+
48
 const deleteStepLogChunks = `-- name: DeleteStepLogChunks :exec
65
 const deleteStepLogChunks = `-- name: DeleteStepLogChunks :exec
49
 DELETE FROM workflow_step_log_chunks WHERE step_id = $1
66
 DELETE FROM workflow_step_log_chunks WHERE step_id = $1
50
 `
67
 `
internal/infra/metrics/metrics.gomodified
@@ -163,6 +163,13 @@ var (
163
 		},
163
 		},
164
 		[]string{"location"},
164
 		[]string{"location"},
165
 	)
165
 	)
166
+	ActionsRunsPrunedTotal = prometheus.NewCounterVec(
167
+		prometheus.CounterOpts{
168
+			Name: "shithub_actions_runs_pruned_total",
169
+			Help: "Total Actions retention deletions by kind (chunks, blobs, runs, jwt_used).",
170
+		},
171
+		[]string{"kind"},
172
+	)
166
 )
173
 )
167
 
174
 
168
 func init() {
175
 func init() {
@@ -185,6 +192,7 @@ func init() {
185
 		ActionsRunnerJWTTotal,
192
 		ActionsRunnerJWTTotal,
186
 		ActionsJobsCancelledTotal,
193
 		ActionsJobsCancelledTotal,
187
 		ActionsLogScrubReplacementsTotal,
194
 		ActionsLogScrubReplacementsTotal,
195
+		ActionsRunsPrunedTotal,
188
 	)
196
 	)
189
 }
197
 }
190
 
198
 
internal/migrationsfs/migrations/0060_actions_retention_indexes.sqladded
@@ -0,0 +1,23 @@
1
+-- SPDX-License-Identifier: AGPL-3.0-or-later
2
+--
3
+-- S41g retention sweep support. The cleanup worker walks terminal
4
+-- Actions rows by completion time, so give it narrow indexes that do
5
+-- not bloat the hot queued/running paths.
6
+
7
+-- +goose Up
8
+
9
+CREATE INDEX workflow_steps_retention_idx
10
+    ON workflow_steps (completed_at)
11
+    WHERE completed_at IS NOT NULL
12
+      AND status IN ('completed', 'cancelled', 'skipped');
13
+
14
+CREATE INDEX workflow_runs_retention_idx
15
+    ON workflow_runs (completed_at)
16
+    WHERE pinned = false
17
+      AND completed_at IS NOT NULL
18
+      AND status IN ('completed', 'cancelled');
19
+
20
+-- +goose Down
21
+
22
+DROP INDEX IF EXISTS workflow_runs_retention_idx;
23
+DROP INDEX IF EXISTS workflow_steps_retention_idx;