Go · 13168 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package trigger
4
5 import (
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "log/slog"
11
12 "github.com/jackc/pgx/v5"
13 "github.com/jackc/pgx/v5/pgtype"
14 "github.com/jackc/pgx/v5/pgxpool"
15
16 "github.com/tenseleyFlow/shithub/internal/actions/checksync"
17 "github.com/tenseleyFlow/shithub/internal/actions/concurrency"
18 actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
19 "github.com/tenseleyFlow/shithub/internal/actions/workflow"
20 "github.com/tenseleyFlow/shithub/internal/checks"
21 "github.com/tenseleyFlow/shithub/internal/infra/metrics"
22 )
23
// Deps wires the trigger pipeline against runtime infra.
type Deps struct {
	// Pool is the Postgres connection pool; Enqueue opens its run
	// transaction on it and performs the post-commit check_run
	// creation and existing-run lookup against it directly.
	Pool *pgxpool.Pool
	// Logger receives warnings for the best-effort post-commit phase
	// (check_run creation failures are logged, not fatal).
	Logger *slog.Logger
}
29
// EnqueueParams is the input to Enqueue. The caller (trigger handler)
// has already discovered + parsed the workflow and built the typed
// event payload via internal/actions/event.
type EnqueueParams struct {
	// RepoID identifies the repo the run belongs to. Required
	// (zero is rejected by validateParams).
	RepoID int64

	// WorkflowFile is the repo-relative path of the parsed file
	// (e.g. ".shithub/workflows/ci.yml"). Used as part of the
	// idempotency key so two workflows in the same repo on the same
	// SHA each get their own run.
	WorkflowFile string

	// HeadSHA is the commit the run targets — for push: the after-sha;
	// for PR: the head-sha; for dispatch: the resolved-at-dispatch-time
	// SHA; for schedule: the default-branch tip at sweep time.
	HeadSHA string

	// HeadRef is the symbolic ref (e.g. "refs/heads/main",
	// "refs/heads/feature/foo"). Optional — empty when not
	// applicable (schedule).
	HeadRef string

	// EventKind is one of the four supported triggers; persisted to
	// workflow_runs.event for filtering on the runs list.
	EventKind EventKind

	// EventPayload is the canonical shithub.event payload built by the
	// matching constructor in internal/actions/event. Stored as
	// jsonb on workflow_runs.event_payload; the runner consumes via
	// expr.evalEventPath.
	EventPayload map[string]any

	// ActorUserID is the user who triggered the run (pusher,
	// PR-opener, dispatcher). Zero for schedule (system trigger);
	// zero is persisted as SQL NULL (see pgInt8).
	ActorUserID int64

	// ParentRunID is set for re-runs to chain back to the original.
	// Zero for fresh triggers; zero is persisted as SQL NULL (see
	// pgInt8).
	ParentRunID int64

	// TriggerEventID is the stable identifier of the triggering
	// event. See migration 0051's header comment for the construction
	// convention. Required (empty string is a programmer error —
	// it would bypass idempotency).
	TriggerEventID string

	// Workflow is the parsed workflow. The matching jobs/steps are
	// persisted; concurrency.group is resolved against the trigger
	// context and enforced before runners can claim younger jobs.
	// Required, and must contain at least one job.
	Workflow *workflow.Workflow
}
81
// Result reports the outcome of an Enqueue call.
type Result struct {
	// RunID is the workflow_runs.id of the persisted (or already-
	// existing) row. Always non-zero when err is nil.
	RunID int64
	// RunIndex is the per-repo monotonic index (used for stable URLs).
	RunIndex int64
	// AlreadyExists is true when a prior call with the same
	// trigger_event_id had already created the run; in this case the
	// child jobs/steps/check_runs were NOT re-inserted. Caller can
	// log + move on.
	AlreadyExists bool
	// CheckRunIDs is the list of check_runs.id rows created (or
	// looked up via ExternalID) for the workflow's jobs, in
	// Workflow.Jobs declaration order. Jobs whose check_run create
	// failed are skipped (logged as warnings), so the slice may be
	// shorter than Workflow.Jobs. Empty on the AlreadyExists path.
	CheckRunIDs []int64
}
99
// Enqueue persists a matched workflow as a queued run with all its
// jobs + steps in one transaction, then creates one check_run per
// job (idempotent via ExternalID) outside the tx so the run+jobs+
// steps remain atomic but check_run drift can be reconciled.
//
// On conflict (the trigger_event_id has been used for this
// workflow_file before), returns Result{AlreadyExists: true} after
// looking up the existing run — the handler treats that as success.
//
// On any other error, the inner tx is rolled back; no rows persist.
func Enqueue(ctx context.Context, deps Deps, p EnqueueParams) (Result, error) {
	if err := validateParams(&p); err != nil {
		return Result{}, err
	}
	// Resolve the concurrency group first — it needs no DB access, so
	// a bad group expression fails before a transaction is opened.
	concurrencyResolution, err := concurrency.Resolve(concurrency.ResolveInput{
		Workflow:     p.Workflow,
		EventPayload: p.EventPayload,
		HeadSHA:      p.HeadSHA,
		HeadRef:      p.HeadRef,
	})
	if err != nil {
		return Result{}, fmt.Errorf("trigger: concurrency: %w", err)
	}

	q := actionsdb.New()

	tx, err := deps.Pool.Begin(ctx)
	if err != nil {
		return Result{}, fmt.Errorf("trigger: begin tx: %w", err)
	}
	// committed guards the deferred rollback: every early-error return
	// below rolls the tx back; both commit paths flip the flag so the
	// deferred Rollback becomes a no-op.
	committed := false
	defer func() {
		if !committed {
			_ = tx.Rollback(ctx)
		}
	}()

	// run_index is per-repo; lookup MAX+1. Concurrent inserts on the
	// same repo race here, but the (repo_id, run_index) UNIQUE catches
	// it as a unique-violation and the caller (worker handler) retries.
	// Realistic concurrency on a single repo's triggers is low for v1.
	runIndex, err := q.NextRunIndexForRepo(ctx, tx, p.RepoID)
	if err != nil {
		return Result{}, fmt.Errorf("trigger: next run index: %w", err)
	}

	payloadBytes, err := json.Marshal(p.EventPayload)
	if err != nil {
		return Result{}, fmt.Errorf("trigger: marshal event payload: %w", err)
	}

	run, err := q.EnqueueWorkflowRun(ctx, tx, actionsdb.EnqueueWorkflowRunParams{
		RepoID:           p.RepoID,
		RunIndex:         runIndex,
		WorkflowFile:     p.WorkflowFile,
		WorkflowName:     p.Workflow.Name,
		HeadSha:          p.HeadSHA,
		HeadRef:          p.HeadRef,
		Event:            actionsdb.WorkflowRunEvent(p.EventKind),
		EventPayload:     payloadBytes,
		ActorUserID:      pgInt8(p.ActorUserID), // zero → NULL
		ParentRunID:      pgInt8(p.ParentRunID), // zero → NULL
		ConcurrencyGroup: concurrencyResolution.Group,
		NeedApproval:     false,
		TriggerEventID:   p.TriggerEventID,
	})
	if err != nil {
		// pgx.ErrNoRows = ON CONFLICT DO NOTHING fired. Lookup the
		// existing row so the handler has a stable RunID to log.
		if errors.Is(err, pgx.ErrNoRows) {
			// Nothing was written in this tx; commit it empty to
			// release it cleanly before querying the pool directly.
			if err := tx.Commit(ctx); err != nil {
				return Result{}, fmt.Errorf("trigger: commit empty tx: %w", err)
			}
			committed = true
			existing, lookupErr := lookupExistingRun(ctx, deps.Pool, p)
			if lookupErr != nil {
				return Result{}, fmt.Errorf("trigger: existing-run lookup: %w", lookupErr)
			}
			metrics.ActionsRunsEnqueuedTotal.WithLabelValues(string(p.EventKind), "already_exists").Inc()
			return Result{
				RunID:         existing.ID,
				RunIndex:      existing.RunIndex,
				AlreadyExists: true,
			}, nil
		}
		return Result{}, fmt.Errorf("trigger: insert run: %w", err)
	}

	// Persist child jobs + their steps. Order in Workflow.Jobs is YAML
	// document order, which we preserve via job_index.
	jobIDs := make([]int64, len(p.Workflow.Jobs))
	for i, j := range p.Workflow.Jobs {
		needs := j.Needs
		if needs == nil {
			// Postgres text[] doesn't accept Go nil — use empty slice.
			needs = []string{}
		}
		permsJSON, err := marshalPermissions(j.Permissions)
		if err != nil {
			return Result{}, fmt.Errorf("trigger: marshal permissions for job %s: %w", j.Key, err)
		}
		envJSON, err := marshalEnv(j.Env)
		if err != nil {
			return Result{}, fmt.Errorf("trigger: marshal env for job %s: %w", j.Key, err)
		}
		job, err := q.InsertWorkflowJob(ctx, tx, actionsdb.InsertWorkflowJobParams{
			RunID:          run.ID,
			JobIndex:       int32(i),
			JobKey:         j.Key,
			JobName:        j.Name,
			RunsOn:         j.RunsOn,
			NeedsJobs:      needs,
			IfExpr:         j.If,
			TimeoutMinutes: int32(j.TimeoutMinutes),
			Permissions:    permsJSON,
			JobEnv:         envJSON,
		})
		if err != nil {
			return Result{}, fmt.Errorf("trigger: insert job %s: %w", j.Key, err)
		}
		jobIDs[i] = job.ID

		// Steps are children of the job row; step_index preserves
		// their YAML order the same way job_index does for jobs.
		for si, s := range j.Steps {
			stepEnvJSON, err := marshalEnv(s.Env)
			if err != nil {
				return Result{}, fmt.Errorf("trigger: marshal step env: %w", err)
			}
			stepWithJSON, err := marshalEnv(s.With)
			if err != nil {
				return Result{}, fmt.Errorf("trigger: marshal step with: %w", err)
			}
			if _, err := q.InsertWorkflowStep(ctx, tx, actionsdb.InsertWorkflowStepParams{
				JobID:            job.ID,
				StepIndex:        int32(si),
				StepID:           s.ID,
				StepName:         s.Name,
				IfExpr:           s.If,
				RunCommand:       s.Run,
				UsesAlias:        s.Uses,
				WorkingDirectory: s.WorkingDirectory,
				StepEnv:          stepEnvJSON,
				ContinueOnError:  s.ContinueOnError,
				StepWith:         stepWithJSON,
			}); err != nil {
				return Result{}, fmt.Errorf("trigger: insert step %d for job %s: %w", si, j.Key, err)
			}
		}
	}

	// Enforce the concurrency group inside the same tx so any
	// cancellation of older runs commits atomically with this run.
	concurrencyResult, err := concurrency.Enforce(ctx, q, tx, concurrency.EnforceParams{
		Run:              run,
		CancelInProgress: concurrencyResolution.CancelInProgress,
	})
	if err != nil {
		return Result{}, fmt.Errorf("trigger: enforce concurrency: %w", err)
	}

	if err := tx.Commit(ctx); err != nil {
		return Result{}, fmt.Errorf("trigger: commit run tx: %w", err)
	}
	committed = true

	// Post-commit: record metrics and sync check state for jobs the
	// concurrency policy cancelled, now that those rows are durable.
	if len(concurrencyResult.CancelledJobs) > 0 {
		metrics.ActionsJobsCancelledTotal.WithLabelValues(concurrency.CancelReason).Add(float64(len(concurrencyResult.CancelledJobs)))
		checksync.ChangedJobs(ctx, checksync.Deps{Pool: deps.Pool, Logger: deps.Logger}, concurrencyResult.CancelledJobs)
	} else if !concurrencyResolution.CancelInProgress && len(concurrencyResult.BlockingRuns) > 0 {
		metrics.ActionsConcurrencyQueuedTotal.Inc()
	}

	// check_run rows: separate concern, post-commit so the run+jobs+
	// steps are durable before we touch a different subsystem. ExternalID
	// idempotency means a retry of just this phase converges cleanly.
	checkRunIDs := make([]int64, 0, len(p.Workflow.Jobs))
	for i, j := range p.Workflow.Jobs {
		extID := fmt.Sprintf("workflow_run:%d:job:%s", run.ID, j.Key)
		name := j.Name
		if name == "" {
			name = j.Key // unnamed jobs fall back to their YAML key
		}
		cr, err := checks.Create(ctx, checks.Deps{Pool: deps.Pool, Logger: deps.Logger}, checks.CreateParams{
			RepoID:     p.RepoID,
			HeadSHA:    p.HeadSHA,
			AppSlug:    "shithub-actions",
			Name:       name,
			Status:     "queued",
			ExternalID: extID,
		})
		if err != nil {
			// Don't roll back the run on check_run failure — log and
			// continue. The run is queued and visible; checks can
			// reconcile via a future retry of just this loop.
			deps.Logger.WarnContext(ctx, "trigger: check_run create failed",
				"run_id", run.ID, "job_key", j.Key, "error", err)
			continue
		}
		checkRunIDs = append(checkRunIDs, cr.ID)
		_ = jobIDs[i] // job_id linkage to check_run lands in S41c when the runner consumes both
	}

	metrics.ActionsRunsEnqueuedTotal.WithLabelValues(string(p.EventKind), "fresh").Inc()
	return Result{
		RunID:       run.ID,
		RunIndex:    run.RunIndex,
		CheckRunIDs: checkRunIDs,
	}, nil
}
306
307 func validateParams(p *EnqueueParams) error {
308 if p.RepoID == 0 {
309 return errors.New("trigger: RepoID required")
310 }
311 if p.WorkflowFile == "" {
312 return errors.New("trigger: WorkflowFile required")
313 }
314 if p.HeadSHA == "" {
315 return errors.New("trigger: HeadSHA required")
316 }
317 if p.TriggerEventID == "" {
318 return errors.New("trigger: TriggerEventID required (empty would bypass idempotency)")
319 }
320 if p.Workflow == nil {
321 return errors.New("trigger: Workflow required")
322 }
323 if len(p.Workflow.Jobs) == 0 {
324 return errors.New("trigger: workflow has no jobs")
325 }
326 switch p.EventKind {
327 case EventPush, EventPullRequest, EventSchedule, EventWorkflowDispatch:
328 default:
329 return fmt.Errorf("trigger: unsupported event kind %q", p.EventKind)
330 }
331 return nil
332 }
333
334 // lookupExistingRun finds a workflow_run by the trigger_event_id key
335 // after EnqueueWorkflowRun reported a conflict. Done outside the
336 // inner tx (which we committed empty) so the caller has a stable
337 // RunID to surface.
338 func lookupExistingRun(ctx context.Context, pool *pgxpool.Pool, p EnqueueParams) (actionsdb.WorkflowRun, error) {
339 q := actionsdb.New()
340 rows, err := q.LookupWorkflowRunByTriggerEvent(ctx, pool, actionsdb.LookupWorkflowRunByTriggerEventParams{
341 RepoID: p.RepoID,
342 WorkflowFile: p.WorkflowFile,
343 TriggerEventID: p.TriggerEventID,
344 })
345 if err != nil {
346 return actionsdb.WorkflowRun{}, err
347 }
348 return rows, nil
349 }
350
351 func pgInt8(v int64) pgtype.Int8 {
352 return pgtype.Int8{Int64: v, Valid: v != 0}
353 }
354
355 // marshalEnv encodes a workflow.Value-keyed map to jsonb-friendly
356 // {string: string} for the step_env / job_env / step_with columns.
357 // We don't persist the parser-side struct directly — the runner only
358 // needs the resolved string value.
359 func marshalEnv(m map[string]workflow.Value) ([]byte, error) {
360 if len(m) == 0 {
361 return []byte("{}"), nil
362 }
363 out := make(map[string]string, len(m))
364 for k, v := range m {
365 out[k] = v.Raw
366 }
367 return json.Marshal(out)
368 }
369
370 // marshalPermissions flattens a workflow.Permissions into the jsonb
371 // shape S41c will consume. v1 just stores Mode + Per — no resolution.
372 func marshalPermissions(p workflow.Permissions) ([]byte, error) {
373 if p.Mode == "" && len(p.Per) == 0 {
374 return []byte("{}"), nil
375 }
376 out := map[string]any{}
377 if p.Mode != "" {
378 out["mode"] = p.Mode
379 }
380 if len(p.Per) > 0 {
381 per := make(map[string]string, len(p.Per))
382 for k, v := range p.Per {
383 per[k] = string(v)
384 }
385 out["per"] = per
386 }
387 return json.Marshal(out)
388 }
389