Go · 6664 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 // Package cleanup owns background retention for Actions data that should not
4 // live forever: hot log chunks, expired artifact metadata/blob objects,
5 // terminal workflow runs, and consumed runner JWT audit rows.
6 package cleanup
7
8 import (
9 "context"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "log/slog"
14 "strings"
15 "time"
16
17 "github.com/jackc/pgx/v5/pgtype"
18 "github.com/jackc/pgx/v5/pgxpool"
19
20 actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
21 "github.com/tenseleyFlow/shithub/internal/infra/metrics"
22 "github.com/tenseleyFlow/shithub/internal/infra/storage"
23 "github.com/tenseleyFlow/shithub/internal/worker"
24 )
25
// KindWorkflowCleanup names the worker job that applies Actions retention.
const KindWorkflowCleanup worker.Kind = "workflow:cleanup"

// Defaults and limits applied by normalizePayload and pruneExpiredArtifacts.
const (
	// defaultStepLogChunkDays is used when Payload.StepLogChunkDays is zero.
	defaultStepLogChunkDays = 7
	// defaultRunDays is used when Payload.RunDays is zero.
	defaultRunDays = 365
	// defaultJWTUsedDays is used when Payload.JWTUsedDays is zero.
	defaultJWTUsedDays = 30
	// defaultArtifactBatch is used when Payload.ArtifactBatch is zero.
	defaultArtifactBatch = 1000
	// maxArtifactBatch is the largest Payload.ArtifactBatch that
	// normalizePayload accepts.
	maxArtifactBatch = 10000
	// actionsRunsPrefix is the prefix an artifact object key must have before
	// pruneExpiredArtifacts will delete it from object storage.
	actionsRunsPrefix = "actions/runs/"
)
37
// Payload is the workflow:cleanup worker payload. Zero values select the
// production defaults documented in docs/internal/actions-schema.md.
// Negative values are rejected by normalizePayload.
type Payload struct {
	// StepLogChunkDays is the age, in days, used to compute the cutoff passed
	// to the stale step log chunk delete. Zero selects defaultStepLogChunkDays.
	StepLogChunkDays int `json:"step_log_chunk_days,omitempty"`
	// RunDays is the age, in days, used to compute the cutoff passed to the
	// old workflow run delete. Zero selects defaultRunDays.
	RunDays int `json:"run_days,omitempty"`
	// JWTUsedDays is the age, in days, used to compute the cutoff passed to
	// the runner JWT use delete. Zero selects defaultJWTUsedDays.
	JWTUsedDays int `json:"jwt_used_days,omitempty"`
	// ArtifactBatch is the row limit for each expired-artifact listing. Zero
	// selects defaultArtifactBatch; values above maxArtifactBatch are rejected.
	ArtifactBatch int `json:"artifact_batch,omitempty"`
}
46
// Deps are the runtime dependencies for Handler.
type Deps struct {
	// Pool is the database pool every cleanup query runs against. It is
	// required: Sweep returns a poison error when it is nil.
	Pool *pgxpool.Pool
	// ObjectStore is where artifact objects are deleted from. It is optional:
	// when nil, artifact pruning is skipped (with a warning if Logger is set).
	ObjectStore storage.ObjectStore
	// Logger receives the completion summary and warnings. It is optional;
	// nil disables logging.
	Logger *slog.Logger
	// Now overrides the clock used to compute cutoffs. It is optional; when
	// nil, time.Now is used. The result is converted to UTC either way.
	Now func() time.Time
}
54
// Result summarizes one cleanup sweep.
type Result struct {
	// ChunksDeleted is the count reported by the stale step log chunk delete.
	ChunksDeleted int64
	// ArtifactRowsDeleted is the total reported by the artifact row deletes,
	// summed across batches.
	ArtifactRowsDeleted int64
	// ArtifactObjectsDeleted is the number of successful object store deletes.
	ArtifactObjectsDeleted int64
	// RunsDeleted is the count reported by the old workflow run delete.
	RunsDeleted int64
	// JWTUsedDeleted is the count reported by the old runner JWT use delete.
	JWTUsedDeleted int64
}
63
64 // Handler returns the worker handler for workflow:cleanup.
65 func Handler(deps Deps) worker.Handler {
66 return func(ctx context.Context, raw json.RawMessage) error {
67 var p Payload
68 if len(raw) > 0 {
69 if err := json.Unmarshal(raw, &p); err != nil {
70 return worker.PoisonError(fmt.Errorf("workflow cleanup: bad payload: %w", err))
71 }
72 }
73 res, err := Sweep(ctx, deps, p)
74 if err != nil {
75 return err
76 }
77 if deps.Logger != nil {
78 deps.Logger.InfoContext(ctx, "workflow cleanup complete",
79 "chunks_deleted", res.ChunksDeleted,
80 "artifact_rows_deleted", res.ArtifactRowsDeleted,
81 "artifact_objects_deleted", res.ArtifactObjectsDeleted,
82 "runs_deleted", res.RunsDeleted,
83 "jwt_used_deleted", res.JWTUsedDeleted)
84 }
85 return nil
86 }
87 }
88
89 // Sweep applies Actions retention once.
90 func Sweep(ctx context.Context, deps Deps, p Payload) (Result, error) {
91 if deps.Pool == nil {
92 return Result{}, worker.PoisonError(errors.New("workflow cleanup: database pool is not configured"))
93 }
94 if err := normalizePayload(&p); err != nil {
95 return Result{}, worker.PoisonError(err)
96 }
97 now := time.Now().UTC()
98 if deps.Now != nil {
99 now = deps.Now().UTC()
100 }
101 q := actionsdb.New()
102 res := Result{}
103
104 stepCutoff := pgtype.Timestamptz{Time: now.Add(-time.Duration(p.StepLogChunkDays) * 24 * time.Hour), Valid: true}
105 n, err := q.DeleteStaleStepLogChunksForCleanup(ctx, deps.Pool, stepCutoff)
106 if err != nil {
107 return Result{}, fmt.Errorf("workflow cleanup: delete stale log chunks: %w", err)
108 }
109 res.ChunksDeleted = n
110 recordPruned("chunks", n)
111
112 artifactCutoff := pgtype.Timestamptz{Time: now, Valid: true}
113 artifactRes, err := pruneExpiredArtifacts(ctx, q, deps, artifactCutoff, int32(p.ArtifactBatch))
114 if err != nil {
115 return Result{}, err
116 }
117 res.ArtifactRowsDeleted = artifactRes.ArtifactRowsDeleted
118 res.ArtifactObjectsDeleted = artifactRes.ArtifactObjectsDeleted
119 recordPruned("blobs", artifactRes.ArtifactObjectsDeleted)
120
121 runCutoff := pgtype.Timestamptz{Time: now.Add(-time.Duration(p.RunDays) * 24 * time.Hour), Valid: true}
122 n, err = q.DeleteOldWorkflowRunsForCleanup(ctx, deps.Pool, runCutoff)
123 if err != nil {
124 return Result{}, fmt.Errorf("workflow cleanup: delete old workflow runs: %w", err)
125 }
126 res.RunsDeleted = n
127 recordPruned("runs", n)
128
129 jwtCutoff := pgtype.Timestamptz{Time: now.Add(-time.Duration(p.JWTUsedDays) * 24 * time.Hour), Valid: true}
130 n, err = q.DeleteOldRunnerJWTUsesForCleanup(ctx, deps.Pool, jwtCutoff)
131 if err != nil {
132 return Result{}, fmt.Errorf("workflow cleanup: delete old runner JWT uses: %w", err)
133 }
134 res.JWTUsedDeleted = n
135 recordPruned("jwt_used", n)
136
137 return res, nil
138 }
139
140 func normalizePayload(p *Payload) error {
141 if p.StepLogChunkDays < 0 || p.RunDays < 0 || p.JWTUsedDays < 0 || p.ArtifactBatch < 0 {
142 return errors.New("workflow cleanup: retention values must be non-negative")
143 }
144 if p.StepLogChunkDays == 0 {
145 p.StepLogChunkDays = defaultStepLogChunkDays
146 }
147 if p.RunDays == 0 {
148 p.RunDays = defaultRunDays
149 }
150 if p.JWTUsedDays == 0 {
151 p.JWTUsedDays = defaultJWTUsedDays
152 }
153 if p.ArtifactBatch == 0 {
154 p.ArtifactBatch = defaultArtifactBatch
155 }
156 if p.ArtifactBatch > maxArtifactBatch {
157 return fmt.Errorf("workflow cleanup: artifact_batch must be <= %d", maxArtifactBatch)
158 }
159 return nil
160 }
161
162 func pruneExpiredArtifacts(
163 ctx context.Context,
164 q *actionsdb.Queries,
165 deps Deps,
166 cutoff pgtype.Timestamptz,
167 batch int32,
168 ) (Result, error) {
169 if deps.ObjectStore == nil {
170 if deps.Logger != nil {
171 deps.Logger.WarnContext(ctx, "workflow cleanup: object storage not configured; skipping artifact pruning")
172 }
173 return Result{}, nil
174 }
175
176 var res Result
177 for {
178 rows, err := q.ListExpiredWorkflowArtifactsForCleanup(ctx, deps.Pool, actionsdb.ListExpiredWorkflowArtifactsForCleanupParams{
179 ExpiresAt: cutoff,
180 Limit: batch,
181 })
182 if err != nil {
183 return Result{}, fmt.Errorf("workflow cleanup: list expired artifacts: %w", err)
184 }
185 if len(rows) == 0 {
186 return res, nil
187 }
188
189 ids := make([]int64, 0, len(rows))
190 for _, row := range rows {
191 if !strings.HasPrefix(row.ObjectKey, actionsRunsPrefix) {
192 return Result{}, fmt.Errorf("workflow cleanup: refusing to delete non-actions object key %q", row.ObjectKey)
193 }
194 if err := deps.ObjectStore.Delete(ctx, row.ObjectKey); err != nil {
195 return Result{}, fmt.Errorf("workflow cleanup: delete artifact object %q: %w", row.ObjectKey, err)
196 }
197 res.ArtifactObjectsDeleted++
198 ids = append(ids, row.ID)
199 }
200
201 deleted, err := q.DeleteWorkflowArtifactsByIDs(ctx, deps.Pool, ids)
202 if err != nil {
203 return Result{}, fmt.Errorf("workflow cleanup: delete artifact rows: %w", err)
204 }
205 res.ArtifactRowsDeleted += deleted
206 if len(rows) < int(batch) {
207 return res, nil
208 }
209 }
210 }
211
212 func recordPruned(kind string, n int64) {
213 if n <= 0 {
214 return
215 }
216 metrics.ActionsRunsPrunedTotal.WithLabelValues(kind).Add(float64(n))
217 }
218