Go · 12163 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package trigger
4
5 import (
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "log/slog"
11
12 "github.com/jackc/pgx/v5"
13 "github.com/jackc/pgx/v5/pgtype"
14 "github.com/jackc/pgx/v5/pgxpool"
15
16 actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
17 "github.com/tenseleyFlow/shithub/internal/actions/workflow"
18 "github.com/tenseleyFlow/shithub/internal/checks"
19 "github.com/tenseleyFlow/shithub/internal/infra/metrics"
20 )
21
22 // Deps wires the trigger pipeline against runtime infra.
23 type Deps struct {
24 Pool *pgxpool.Pool
25 Logger *slog.Logger
26 }
27
28 // EnqueueParams is the input to Enqueue. The caller (trigger handler)
29 // has already discovered + parsed the workflow and built the typed
30 // event payload via internal/actions/event.
31 type EnqueueParams struct {
32 // RepoID identifies the repo the run belongs to.
33 RepoID int64
34
35 // WorkflowFile is the repo-relative path of the parsed file
36 // (e.g. ".shithub/workflows/ci.yml"). Used as part of the
37 // idempotency key so two workflows in the same repo on the same
38 // SHA each get their own run.
39 WorkflowFile string
40
41 // HeadSHA is the commit the run targets — for push: the after-sha;
42 // for PR: the head-sha; for dispatch: the resolved-at-dispatch-time
43 // SHA; for schedule: the default-branch tip at sweep time.
44 HeadSHA string
45
46 // HeadRef is the symbolic ref (e.g. "refs/heads/main",
47 // "refs/heads/feature/foo"). Optional — empty when not
48 // applicable (schedule).
49 HeadRef string
50
51 // EventKind is one of the four supported triggers; persisted to
52 // workflow_runs.event for filtering on the runs list.
53 EventKind EventKind
54
55 // EventPayload is the canonical shithub.event payload built by the
56 // matching constructor in internal/actions/event. Stored as
57 // jsonb on workflow_runs.event_payload; the runner consumes via
58 // expr.evalEventPath.
59 EventPayload map[string]any
60
61 // ActorUserID is the user who triggered the run (pusher,
62 // PR-opener, dispatcher). Zero for schedule (system trigger).
63 ActorUserID int64
64
65 // ParentRunID is set for re-runs to chain back to the original.
66 // Zero for fresh triggers.
67 ParentRunID int64
68
69 // TriggerEventID is the stable identifier of the triggering
70 // event. See migration 0051's header comment for the construction
71 // convention. Required (empty string is a programmer error).
72 TriggerEventID string
73
74 // Workflow is the parsed workflow. The matching jobs/steps are
75 // persisted; concurrency.group is captured at trigger time as a
76 // string (expression resolution against the event context lands
77 // in S41g when the slot manager actually consumes it).
78 Workflow *workflow.Workflow
79 }
80
// Result reports the outcome of an Enqueue call.
type Result struct {
	// RunID is the workflow_runs.id of the persisted (or already-
	// existing) row. Always non-zero when err is nil.
	RunID int64
	// RunIndex is the per-repo monotonic index (used for stable URLs).
	RunIndex int64
	// AlreadyExists is true when a prior call with the same
	// trigger_event_id had already created the run. In that case the
	// child jobs/steps/check_runs were NOT re-inserted and CheckRunIDs
	// is left nil. Caller can log + move on.
	AlreadyExists bool
	// CheckRunIDs holds the check_runs.id of every check run that was
	// successfully created (or looked up via ExternalID) for the
	// workflow's jobs, in Workflow.Jobs declaration order.
	//
	// A job whose check_run creation failed is skipped (logged, not
	// returned as an error) rather than represented by a placeholder.
	// The slice may therefore be shorter than Workflow.Jobs, and
	// CheckRunIDs[i] does not necessarily belong to Workflow.Jobs[i].
	CheckRunIDs []int64
}
98
99 // Enqueue persists a matched workflow as a queued run with all its
100 // jobs + steps in one transaction, then creates one check_run per
101 // job (idempotent via ExternalID) outside the tx so the run+jobs+
102 // steps remain atomic but check_run drift can be reconciled.
103 //
104 // On conflict (the trigger_event_id has been used for this
105 // workflow_file before), returns Result{AlreadyExists: true} after
106 // looking up the existing run — the handler treats that as success.
107 //
108 // On any other error, the inner tx is rolled back; no rows persist.
109 func Enqueue(ctx context.Context, deps Deps, p EnqueueParams) (Result, error) {
110 if err := validateParams(&p); err != nil {
111 return Result{}, err
112 }
113
114 q := actionsdb.New()
115
116 tx, err := deps.Pool.Begin(ctx)
117 if err != nil {
118 return Result{}, fmt.Errorf("trigger: begin tx: %w", err)
119 }
120 committed := false
121 defer func() {
122 if !committed {
123 _ = tx.Rollback(ctx)
124 }
125 }()
126
127 // run_index is per-repo; lookup MAX+1. Concurrent inserts on the
128 // same repo race here, but the (repo_id, run_index) UNIQUE catches
129 // it as a unique-violation and the caller (worker handler) retries.
130 // Realistic concurrency on a single repo's triggers is low for v1.
131 runIndex, err := q.NextRunIndexForRepo(ctx, tx, p.RepoID)
132 if err != nil {
133 return Result{}, fmt.Errorf("trigger: next run index: %w", err)
134 }
135
136 payloadBytes, err := json.Marshal(p.EventPayload)
137 if err != nil {
138 return Result{}, fmt.Errorf("trigger: marshal event payload: %w", err)
139 }
140
141 concurrencyGroup := p.Workflow.Concurrency.Group.Raw
142
143 run, err := q.EnqueueWorkflowRun(ctx, tx, actionsdb.EnqueueWorkflowRunParams{
144 RepoID: p.RepoID,
145 RunIndex: runIndex,
146 WorkflowFile: p.WorkflowFile,
147 WorkflowName: p.Workflow.Name,
148 HeadSha: p.HeadSHA,
149 HeadRef: p.HeadRef,
150 Event: actionsdb.WorkflowRunEvent(p.EventKind),
151 EventPayload: payloadBytes,
152 ActorUserID: pgInt8(p.ActorUserID),
153 ParentRunID: pgInt8(p.ParentRunID),
154 ConcurrencyGroup: concurrencyGroup,
155 NeedApproval: false,
156 TriggerEventID: p.TriggerEventID,
157 })
158 if err != nil {
159 // pgx.ErrNoRows = ON CONFLICT DO NOTHING fired. Lookup the
160 // existing row so the handler has a stable RunID to log.
161 if errors.Is(err, pgx.ErrNoRows) {
162 if err := tx.Commit(ctx); err != nil {
163 return Result{}, fmt.Errorf("trigger: commit empty tx: %w", err)
164 }
165 committed = true
166 existing, lookupErr := lookupExistingRun(ctx, deps.Pool, p)
167 if lookupErr != nil {
168 return Result{}, fmt.Errorf("trigger: existing-run lookup: %w", lookupErr)
169 }
170 metrics.ActionsRunsEnqueuedTotal.WithLabelValues(string(p.EventKind), "already_exists").Inc()
171 return Result{
172 RunID: existing.ID,
173 RunIndex: existing.RunIndex,
174 AlreadyExists: true,
175 }, nil
176 }
177 return Result{}, fmt.Errorf("trigger: insert run: %w", err)
178 }
179
180 // Persist child jobs + their steps. Order in Workflow.Jobs is YAML
181 // document order, which we preserve via job_index.
182 jobIDs := make([]int64, len(p.Workflow.Jobs))
183 for i, j := range p.Workflow.Jobs {
184 needs := j.Needs
185 if needs == nil {
186 // Postgres text[] doesn't accept Go nil — use empty slice.
187 needs = []string{}
188 }
189 permsJSON, err := marshalPermissions(j.Permissions)
190 if err != nil {
191 return Result{}, fmt.Errorf("trigger: marshal permissions for job %s: %w", j.Key, err)
192 }
193 envJSON, err := marshalEnv(j.Env)
194 if err != nil {
195 return Result{}, fmt.Errorf("trigger: marshal env for job %s: %w", j.Key, err)
196 }
197 job, err := q.InsertWorkflowJob(ctx, tx, actionsdb.InsertWorkflowJobParams{
198 RunID: run.ID,
199 JobIndex: int32(i),
200 JobKey: j.Key,
201 JobName: j.Name,
202 RunsOn: j.RunsOn,
203 NeedsJobs: needs,
204 IfExpr: j.If,
205 TimeoutMinutes: int32(j.TimeoutMinutes),
206 Permissions: permsJSON,
207 JobEnv: envJSON,
208 })
209 if err != nil {
210 return Result{}, fmt.Errorf("trigger: insert job %s: %w", j.Key, err)
211 }
212 jobIDs[i] = job.ID
213
214 for si, s := range j.Steps {
215 stepEnvJSON, err := marshalEnv(s.Env)
216 if err != nil {
217 return Result{}, fmt.Errorf("trigger: marshal step env: %w", err)
218 }
219 stepWithJSON, err := marshalEnv(s.With)
220 if err != nil {
221 return Result{}, fmt.Errorf("trigger: marshal step with: %w", err)
222 }
223 if _, err := q.InsertWorkflowStep(ctx, tx, actionsdb.InsertWorkflowStepParams{
224 JobID: job.ID,
225 StepIndex: int32(si),
226 StepID: s.ID,
227 StepName: s.Name,
228 IfExpr: s.If,
229 RunCommand: s.Run,
230 UsesAlias: s.Uses,
231 WorkingDirectory: s.WorkingDirectory,
232 StepEnv: stepEnvJSON,
233 ContinueOnError: s.ContinueOnError,
234 StepWith: stepWithJSON,
235 }); err != nil {
236 return Result{}, fmt.Errorf("trigger: insert step %d for job %s: %w", si, j.Key, err)
237 }
238 }
239 }
240
241 if err := tx.Commit(ctx); err != nil {
242 return Result{}, fmt.Errorf("trigger: commit run tx: %w", err)
243 }
244 committed = true
245
246 // check_run rows: separate concern, post-commit so the run+jobs+
247 // steps are durable before we touch a different subsystem. ExternalID
248 // idempotency means a retry of just this phase converges cleanly.
249 checkRunIDs := make([]int64, 0, len(p.Workflow.Jobs))
250 for i, j := range p.Workflow.Jobs {
251 extID := fmt.Sprintf("workflow_run:%d:job:%s", run.ID, j.Key)
252 name := j.Name
253 if name == "" {
254 name = j.Key
255 }
256 cr, err := checks.Create(ctx, checks.Deps{Pool: deps.Pool, Logger: deps.Logger}, checks.CreateParams{
257 RepoID: p.RepoID,
258 HeadSHA: p.HeadSHA,
259 AppSlug: "shithub-actions",
260 Name: name,
261 Status: "queued",
262 ExternalID: extID,
263 })
264 if err != nil {
265 // Don't roll back the run on check_run failure — log and
266 // continue. The run is queued and visible; checks can
267 // reconcile via a future retry of just this loop.
268 deps.Logger.WarnContext(ctx, "trigger: check_run create failed",
269 "run_id", run.ID, "job_key", j.Key, "error", err)
270 continue
271 }
272 checkRunIDs = append(checkRunIDs, cr.ID)
273 _ = jobIDs[i] // job_id linkage to check_run lands in S41c when the runner consumes both
274 }
275
276 metrics.ActionsRunsEnqueuedTotal.WithLabelValues(string(p.EventKind), "fresh").Inc()
277 return Result{
278 RunID: run.ID,
279 RunIndex: run.RunIndex,
280 CheckRunIDs: checkRunIDs,
281 }, nil
282 }
283
284 func validateParams(p *EnqueueParams) error {
285 if p.RepoID == 0 {
286 return errors.New("trigger: RepoID required")
287 }
288 if p.WorkflowFile == "" {
289 return errors.New("trigger: WorkflowFile required")
290 }
291 if p.HeadSHA == "" {
292 return errors.New("trigger: HeadSHA required")
293 }
294 if p.TriggerEventID == "" {
295 return errors.New("trigger: TriggerEventID required (empty would bypass idempotency)")
296 }
297 if p.Workflow == nil {
298 return errors.New("trigger: Workflow required")
299 }
300 if len(p.Workflow.Jobs) == 0 {
301 return errors.New("trigger: workflow has no jobs")
302 }
303 switch p.EventKind {
304 case EventPush, EventPullRequest, EventSchedule, EventWorkflowDispatch:
305 default:
306 return fmt.Errorf("trigger: unsupported event kind %q", p.EventKind)
307 }
308 return nil
309 }
310
311 // lookupExistingRun finds a workflow_run by the trigger_event_id key
312 // after EnqueueWorkflowRun reported a conflict. Done outside the
313 // inner tx (which we committed empty) so the caller has a stable
314 // RunID to surface.
315 func lookupExistingRun(ctx context.Context, pool *pgxpool.Pool, p EnqueueParams) (actionsdb.WorkflowRun, error) {
316 q := actionsdb.New()
317 rows, err := q.LookupWorkflowRunByTriggerEvent(ctx, pool, actionsdb.LookupWorkflowRunByTriggerEventParams{
318 RepoID: p.RepoID,
319 WorkflowFile: p.WorkflowFile,
320 TriggerEventID: p.TriggerEventID,
321 })
322 if err != nil {
323 return actionsdb.WorkflowRun{}, err
324 }
325 return rows, nil
326 }
327
328 func pgInt8(v int64) pgtype.Int8 {
329 return pgtype.Int8{Int64: v, Valid: v != 0}
330 }
331
332 // marshalEnv encodes a workflow.Value-keyed map to jsonb-friendly
333 // {string: string} for the step_env / job_env / step_with columns.
334 // We don't persist the parser-side struct directly — the runner only
335 // needs the resolved string value.
336 func marshalEnv(m map[string]workflow.Value) ([]byte, error) {
337 if len(m) == 0 {
338 return []byte("{}"), nil
339 }
340 out := make(map[string]string, len(m))
341 for k, v := range m {
342 out[k] = v.Raw
343 }
344 return json.Marshal(out)
345 }
346
347 // marshalPermissions flattens a workflow.Permissions into the jsonb
348 // shape S41c will consume. v1 just stores Mode + Per — no resolution.
349 func marshalPermissions(p workflow.Permissions) ([]byte, error) {
350 if p.Mode == "" && len(p.Per) == 0 {
351 return []byte("{}"), nil
352 }
353 out := map[string]any{}
354 if p.Mode != "" {
355 out["mode"] = p.Mode
356 }
357 if len(p.Per) > 0 {
358 per := make(map[string]string, len(p.Per))
359 for k, v := range p.Per {
360 per[k] = string(v)
361 }
362 out["per"] = per
363 }
364 return json.Marshal(out)
365 }
366