1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 // Package finalize owns server-side Actions finalization work that should not
4 // run on the hot runner API path.
5 package finalize
6
7 import (
8 "bytes"
9 "context"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "log/slog"
14
15 "github.com/jackc/pgx/v5"
16 "github.com/jackc/pgx/v5/pgtype"
17 "github.com/jackc/pgx/v5/pgxpool"
18
19 actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
20 "github.com/tenseleyFlow/shithub/internal/infra/storage"
21 "github.com/tenseleyFlow/shithub/internal/worker"
22 )
23
// KindWorkflowFinalizeStep names the worker job that compacts per-step log
// chunks into object storage and prunes the SQL chunks.
const KindWorkflowFinalizeStep worker.Kind = "workflow:finalize_step"

const (
	// defaultMaxLogBytes caps a finalized step log at 100 MiB when
	// Deps.MaxLogBytes is unset or non-positive.
	defaultMaxLogBytes = 100 * 1024 * 1024
	// chunkPageSize is how many log chunks are fetched per ListStepLogChunks page.
	chunkPageSize = 1000
	// logContentType is the Content-Type stored with uploaded log objects.
	logContentType = "text/plain; charset=utf-8"
)
33
// Payload is the workflow:finalize_step worker payload.
type Payload struct {
	// StepID identifies the workflow step whose log chunks should be
	// compacted; must be positive or the job is treated as poison.
	StepID int64 `json:"step_id"`
}
38
// Deps are the runtime dependencies for Handler.
type Deps struct {
	// Pool is the database pool used for reading and pruning log chunks.
	// Required: a nil Pool makes the handler return a poison error.
	Pool *pgxpool.Pool
	// ObjectStore receives the compacted log object. Required: nil makes
	// the handler return a poison error.
	ObjectStore storage.ObjectStore
	// Logger, when non-nil, records a structured entry after finalization.
	Logger *slog.Logger
	// MaxLogBytes bounds the compacted log size; values <= 0 fall back to
	// defaultMaxLogBytes.
	MaxLogBytes int64
}
46
47 // Handler returns the worker handler for workflow:finalize_step.
48 func Handler(deps Deps) worker.Handler {
49 maxLogBytes := deps.MaxLogBytes
50 if maxLogBytes <= 0 {
51 maxLogBytes = defaultMaxLogBytes
52 }
53 return func(ctx context.Context, raw json.RawMessage) error {
54 var p Payload
55 if err := json.Unmarshal(raw, &p); err != nil {
56 return worker.PoisonError(fmt.Errorf("finalize step: bad payload: %w", err))
57 }
58 if p.StepID <= 0 {
59 return worker.PoisonError(errors.New("finalize step: missing step_id"))
60 }
61 if deps.Pool == nil {
62 return worker.PoisonError(errors.New("finalize step: database pool is not configured"))
63 }
64 if deps.ObjectStore == nil {
65 return worker.PoisonError(errors.New("finalize step: object storage is not configured"))
66 }
67 if err := finalizeStep(ctx, deps, p.StepID, maxLogBytes); err != nil {
68 return err
69 }
70 return nil
71 }
72 }
73
// finalizeStep compacts all SQL log chunks for stepID into a single object
// in object storage, records the object key and byte count on the step row,
// and deletes the chunks — the last two inside one transaction.
//
// Steps or jobs that no longer exist yield a poison error (retrying cannot
// help); transient database and storage failures are returned as ordinary
// errors so the worker can retry.
func finalizeStep(ctx context.Context, deps Deps, stepID, maxLogBytes int64) error {
	q := actionsdb.New()
	step, err := q.GetWorkflowStepByID(ctx, deps.Pool, stepID)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return worker.PoisonError(fmt.Errorf("finalize step: step %d not found", stepID))
		}
		return fmt.Errorf("finalize step: load step: %w", err)
	}
	// The job row supplies RunID and job ID for the object key and log line.
	job, err := q.GetWorkflowJobByID(ctx, deps.Pool, step.JobID)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return worker.PoisonError(fmt.Errorf("finalize step: job %d not found", step.JobID))
		}
		return fmt.Errorf("finalize step: load job: %w", err)
	}

	log, err := collectChunks(ctx, q, deps.Pool, stepID, maxLogBytes)
	if err != nil {
		return err
	}
	// No chunks and an already-recorded object key means a previous run
	// finished this step; the job is idempotent, so there is nothing to do.
	if len(log) == 0 && step.LogObjectKey.Valid {
		return nil
	}

	// Upload first, outside the transaction, so a failed upload leaves the
	// chunks in SQL untouched for a retry. An empty log stores no object and
	// records a NULL key.
	objectKey := pgtype.Text{}
	if len(log) > 0 {
		key := StepLogObjectKey(job.RunID, job.ID, stepID)
		if _, err := deps.ObjectStore.Put(ctx, key, bytes.NewReader(log), storage.PutOpts{
			ContentType:   logContentType,
			ContentLength: int64(len(log)),
		}); err != nil {
			return fmt.Errorf("finalize step: upload log object: %w", err)
		}
		objectKey = pgtype.Text{String: key, Valid: true}
	}

	// Record the object key and prune the chunks atomically; the deferred
	// rollback is a no-op once committed is set.
	//
	// NOTE(review): collectChunks read outside this transaction — chunks
	// appended between collection and this delete would be dropped without
	// being uploaded. Presumably steps are finished before finalization is
	// enqueued; verify against the producer.
	tx, err := deps.Pool.Begin(ctx)
	if err != nil {
		return fmt.Errorf("finalize step: begin tx: %w", err)
	}
	committed := false
	defer func() {
		if !committed {
			_ = tx.Rollback(ctx)
		}
	}()
	if _, err := q.UpdateWorkflowStepLogObject(ctx, tx, actionsdb.UpdateWorkflowStepLogObjectParams{
		ID:           stepID,
		LogObjectKey: objectKey,
		LogByteCount: int64(len(log)),
	}); err != nil {
		return fmt.Errorf("finalize step: update step log object: %w", err)
	}
	if err := q.DeleteStepLogChunks(ctx, tx, stepID); err != nil {
		return fmt.Errorf("finalize step: delete chunks: %w", err)
	}
	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("finalize step: commit: %w", err)
	}
	committed = true
	if deps.Logger != nil {
		deps.Logger.InfoContext(ctx, "finalized workflow step log",
			"step_id", stepID, "job_id", job.ID, "run_id", job.RunID, "bytes", len(log))
	}
	return nil
}
141
142 func collectChunks(ctx context.Context, q *actionsdb.Queries, db actionsdb.DBTX, stepID, maxLogBytes int64) ([]byte, error) {
143 var (
144 out bytes.Buffer
145 lastSeq int32 = -1
146 )
147 for {
148 chunks, err := q.ListStepLogChunks(ctx, db, actionsdb.ListStepLogChunksParams{
149 StepID: stepID,
150 Seq: lastSeq,
151 Limit: chunkPageSize,
152 })
153 if err != nil {
154 return nil, fmt.Errorf("finalize step: list chunks: %w", err)
155 }
156 if len(chunks) == 0 {
157 return out.Bytes(), nil
158 }
159 for _, chunk := range chunks {
160 if int64(out.Len()+len(chunk.Chunk)) > maxLogBytes {
161 return nil, worker.PoisonError(fmt.Errorf("finalize step: log exceeds %d bytes", maxLogBytes))
162 }
163 if _, err := out.Write(chunk.Chunk); err != nil {
164 return nil, fmt.Errorf("finalize step: append chunk: %w", err)
165 }
166 lastSeq = chunk.Seq
167 }
168 }
169 }
170
// StepLogObjectKey returns the deterministic object key for a finalized step
// log. It is intentionally independent of step names so renames do not move
// already-uploaded logs.
func StepLogObjectKey(runID, jobID, stepID int64) string {
	// Layout: actions/runs/<run>/jobs/<job>/steps/<step>.log
	const layout = "actions/runs/%d/jobs/%d/steps/%d.log"
	return fmt.Sprintf(layout, runID, jobID, stepID)
}
177