| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package trigger |
| 4 | |
| 5 | import ( |
| 6 | "context" |
| 7 | "encoding/json" |
| 8 | "errors" |
| 9 | "fmt" |
| 10 | "log/slog" |
| 11 | |
| 12 | "github.com/jackc/pgx/v5" |
| 13 | "github.com/jackc/pgx/v5/pgtype" |
| 14 | "github.com/jackc/pgx/v5/pgxpool" |
| 15 | |
| 16 | "github.com/tenseleyFlow/shithub/internal/actions/checksync" |
| 17 | "github.com/tenseleyFlow/shithub/internal/actions/concurrency" |
| 18 | actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc" |
| 19 | "github.com/tenseleyFlow/shithub/internal/actions/workflow" |
| 20 | "github.com/tenseleyFlow/shithub/internal/checks" |
| 21 | "github.com/tenseleyFlow/shithub/internal/infra/metrics" |
| 22 | ) |
| 23 | |
// Deps wires the trigger pipeline against runtime infra.
type Deps struct {
	// Pool is the Postgres connection pool; Enqueue opens its own
	// transaction on it and also runs post-commit work against it.
	Pool   *pgxpool.Pool
	// Logger receives warn-level records for best-effort failures
	// (e.g. a check_run create that will reconcile later).
	Logger *slog.Logger
}
| 29 | |
// EnqueueParams is the input to Enqueue. The caller (trigger handler)
// has already discovered + parsed the workflow and built the typed
// event payload via internal/actions/event.
type EnqueueParams struct {
	// RepoID identifies the repo the run belongs to. Required
	// (zero is rejected by validateParams).
	RepoID int64

	// WorkflowFile is the repo-relative path of the parsed file
	// (e.g. ".shithub/workflows/ci.yml"). Used as part of the
	// idempotency key so two workflows in the same repo on the same
	// SHA each get their own run. Required.
	WorkflowFile string

	// HeadSHA is the commit the run targets — for push: the after-sha;
	// for PR: the head-sha; for dispatch: the resolved-at-dispatch-time
	// SHA; for schedule: the default-branch tip at sweep time. Required.
	HeadSHA string

	// HeadRef is the symbolic ref (e.g. "refs/heads/main",
	// "refs/heads/feature/foo"). Optional — empty when not
	// applicable (schedule).
	HeadRef string

	// EventKind is one of the four supported triggers; persisted to
	// workflow_runs.event for filtering on the runs list. Any other
	// value is rejected by validateParams.
	EventKind EventKind

	// EventPayload is the canonical shithub.event payload built by the
	// matching constructor in internal/actions/event. Stored as
	// jsonb on workflow_runs.event_payload; the runner consumes via
	// expr.evalEventPath.
	EventPayload map[string]any

	// ActorUserID is the user who triggered the run (pusher,
	// PR-opener, dispatcher). Zero for schedule (system trigger);
	// zero is persisted as SQL NULL (see pgInt8).
	ActorUserID int64

	// ParentRunID is set for re-runs to chain back to the original.
	// Zero for fresh triggers (persisted as SQL NULL, see pgInt8).
	ParentRunID int64

	// TriggerEventID is the stable identifier of the triggering
	// event. See migration 0051's header comment for the construction
	// convention. Required (empty string is a programmer error —
	// it would bypass idempotency).
	TriggerEventID string

	// Workflow is the parsed workflow; required, with at least one
	// job. The matching jobs/steps are persisted; concurrency.group
	// is resolved against the trigger context and enforced before
	// runners can claim younger jobs.
	Workflow *workflow.Workflow
}
| 81 | |
// Result reports the outcome of an Enqueue call.
type Result struct {
	// RunID is the workflow_runs.id of the persisted (or already-
	// existing) row. Always non-zero when err is nil.
	RunID int64
	// RunIndex is the per-repo monotonic index (used for stable URLs).
	RunIndex int64
	// AlreadyExists is true when a prior call with the same
	// trigger_event_id had already created the run; in this case the
	// child jobs/steps/check_runs were NOT re-inserted. Caller can
	// log + move on.
	AlreadyExists bool
	// CheckRunIDs is the list of check_runs.id rows created (or
	// looked up via ExternalID) for the workflow's jobs, in
	// Workflow.Jobs declaration order. Jobs whose check_run create
	// failed are skipped (logged + reconciled later), so the slice
	// may be shorter than Workflow.Jobs. Empty when AlreadyExists.
	CheckRunIDs []int64
}
| 99 | |
// Enqueue persists a matched workflow as a queued run with all its
// jobs + steps in one transaction, then creates one check_run per
// job (idempotent via ExternalID) outside the tx so the run+jobs+
// steps remain atomic but check_run drift can be reconciled.
//
// On conflict (the trigger_event_id has been used for this
// workflow_file before), returns Result{AlreadyExists: true} after
// looking up the existing run — the handler treats that as success.
//
// On any other error, the inner tx is rolled back; no rows persist.
func Enqueue(ctx context.Context, deps Deps, p EnqueueParams) (Result, error) {
	if err := validateParams(&p); err != nil {
		return Result{}, err
	}
	// Resolve the concurrency group before opening the tx — it needs
	// no DB access, so a resolution failure costs no transaction.
	concurrencyResolution, err := concurrency.Resolve(concurrency.ResolveInput{
		Workflow:     p.Workflow,
		EventPayload: p.EventPayload,
		HeadSHA:      p.HeadSHA,
		HeadRef:      p.HeadRef,
	})
	if err != nil {
		return Result{}, fmt.Errorf("trigger: concurrency: %w", err)
	}

	q := actionsdb.New()

	tx, err := deps.Pool.Begin(ctx)
	if err != nil {
		return Result{}, fmt.Errorf("trigger: begin tx: %w", err)
	}
	// committed gates the deferred rollback so every early-return
	// error path below rolls back without repeating the call.
	committed := false
	defer func() {
		if !committed {
			_ = tx.Rollback(ctx)
		}
	}()

	// run_index is per-repo; lookup MAX+1. Concurrent inserts on the
	// same repo race here, but the (repo_id, run_index) UNIQUE catches
	// it as a unique-violation and the caller (worker handler) retries.
	// Realistic concurrency on a single repo's triggers is low for v1.
	runIndex, err := q.NextRunIndexForRepo(ctx, tx, p.RepoID)
	if err != nil {
		return Result{}, fmt.Errorf("trigger: next run index: %w", err)
	}

	// Marshal once; the bytes land in workflow_runs.event_payload (jsonb).
	payloadBytes, err := json.Marshal(p.EventPayload)
	if err != nil {
		return Result{}, fmt.Errorf("trigger: marshal event payload: %w", err)
	}

	run, err := q.EnqueueWorkflowRun(ctx, tx, actionsdb.EnqueueWorkflowRunParams{
		RepoID:           p.RepoID,
		RunIndex:         runIndex,
		WorkflowFile:     p.WorkflowFile,
		WorkflowName:     p.Workflow.Name,
		HeadSha:          p.HeadSHA,
		HeadRef:          p.HeadRef,
		Event:            actionsdb.WorkflowRunEvent(p.EventKind),
		EventPayload:     payloadBytes,
		ActorUserID:      pgInt8(p.ActorUserID),
		ParentRunID:      pgInt8(p.ParentRunID),
		ConcurrencyGroup: concurrencyResolution.Group,
		NeedApproval:     false,
		TriggerEventID:   p.TriggerEventID,
	})
	if err != nil {
		// pgx.ErrNoRows = ON CONFLICT DO NOTHING fired. Lookup the
		// existing row so the handler has a stable RunID to log.
		if errors.Is(err, pgx.ErrNoRows) {
			// Nothing was inserted; commit the empty tx to release it
			// cleanly before reading the existing row off the pool.
			if err := tx.Commit(ctx); err != nil {
				return Result{}, fmt.Errorf("trigger: commit empty tx: %w", err)
			}
			committed = true
			existing, lookupErr := lookupExistingRun(ctx, deps.Pool, p)
			if lookupErr != nil {
				return Result{}, fmt.Errorf("trigger: existing-run lookup: %w", lookupErr)
			}
			metrics.ActionsRunsEnqueuedTotal.WithLabelValues(string(p.EventKind), "already_exists").Inc()
			return Result{
				RunID:         existing.ID,
				RunIndex:      existing.RunIndex,
				AlreadyExists: true,
			}, nil
		}
		return Result{}, fmt.Errorf("trigger: insert run: %w", err)
	}

	// Persist child jobs + their steps. Order in Workflow.Jobs is YAML
	// document order, which we preserve via job_index.
	jobIDs := make([]int64, len(p.Workflow.Jobs))
	for i, j := range p.Workflow.Jobs {
		needs := j.Needs
		if needs == nil {
			// Postgres text[] doesn't accept Go nil — use empty slice.
			needs = []string{}
		}
		permsJSON, err := marshalPermissions(j.Permissions)
		if err != nil {
			return Result{}, fmt.Errorf("trigger: marshal permissions for job %s: %w", j.Key, err)
		}
		envJSON, err := marshalEnv(j.Env)
		if err != nil {
			return Result{}, fmt.Errorf("trigger: marshal env for job %s: %w", j.Key, err)
		}
		job, err := q.InsertWorkflowJob(ctx, tx, actionsdb.InsertWorkflowJobParams{
			RunID:          run.ID,
			JobIndex:       int32(i),
			JobKey:         j.Key,
			JobName:        j.Name,
			RunsOn:         j.RunsOn,
			NeedsJobs:      needs,
			IfExpr:         j.If,
			TimeoutMinutes: int32(j.TimeoutMinutes),
			Permissions:    permsJSON,
			JobEnv:         envJSON,
		})
		if err != nil {
			return Result{}, fmt.Errorf("trigger: insert job %s: %w", j.Key, err)
		}
		jobIDs[i] = job.ID

		// Steps are stored flat under the job, ordered by step_index.
		for si, s := range j.Steps {
			stepEnvJSON, err := marshalEnv(s.Env)
			if err != nil {
				return Result{}, fmt.Errorf("trigger: marshal step env: %w", err)
			}
			stepWithJSON, err := marshalEnv(s.With)
			if err != nil {
				return Result{}, fmt.Errorf("trigger: marshal step with: %w", err)
			}
			if _, err := q.InsertWorkflowStep(ctx, tx, actionsdb.InsertWorkflowStepParams{
				JobID:            job.ID,
				StepIndex:        int32(si),
				StepID:           s.ID,
				StepName:         s.Name,
				IfExpr:           s.If,
				RunCommand:       s.Run,
				UsesAlias:        s.Uses,
				WorkingDirectory: s.WorkingDirectory,
				StepEnv:          stepEnvJSON,
				ContinueOnError:  s.ContinueOnError,
				StepWith:         stepWithJSON,
			}); err != nil {
				return Result{}, fmt.Errorf("trigger: insert step %d for job %s: %w", si, j.Key, err)
			}
		}
	}

	// Enforce concurrency inside the same tx so cancellations (or
	// queue-blocking) land atomically with the new run.
	concurrencyResult, err := concurrency.Enforce(ctx, q, tx, concurrency.EnforceParams{
		Run:              run,
		CancelInProgress: concurrencyResolution.CancelInProgress,
	})
	if err != nil {
		return Result{}, fmt.Errorf("trigger: enforce concurrency: %w", err)
	}

	if err := tx.Commit(ctx); err != nil {
		return Result{}, fmt.Errorf("trigger: commit run tx: %w", err)
	}
	committed = true

	// Post-commit metrics + check sync for whatever concurrency
	// enforcement did; these are observations, not part of the tx.
	if len(concurrencyResult.CancelledJobs) > 0 {
		metrics.ActionsJobsCancelledTotal.WithLabelValues(concurrency.CancelReason).Add(float64(len(concurrencyResult.CancelledJobs)))
		checksync.ChangedJobs(ctx, checksync.Deps{Pool: deps.Pool, Logger: deps.Logger}, concurrencyResult.CancelledJobs)
	} else if !concurrencyResolution.CancelInProgress && len(concurrencyResult.BlockingRuns) > 0 {
		metrics.ActionsConcurrencyQueuedTotal.Inc()
	}

	// check_run rows: separate concern, post-commit so the run+jobs+
	// steps are durable before we touch a different subsystem. ExternalID
	// idempotency means a retry of just this phase converges cleanly.
	checkRunIDs := make([]int64, 0, len(p.Workflow.Jobs))
	for i, j := range p.Workflow.Jobs {
		extID := fmt.Sprintf("workflow_run:%d:job:%s", run.ID, j.Key)
		name := j.Name
		if name == "" {
			name = j.Key
		}
		cr, err := checks.Create(ctx, checks.Deps{Pool: deps.Pool, Logger: deps.Logger}, checks.CreateParams{
			RepoID:     p.RepoID,
			HeadSHA:    p.HeadSHA,
			AppSlug:    "shithub-actions",
			Name:       name,
			Status:     "queued",
			ExternalID: extID,
		})
		if err != nil {
			// Don't roll back the run on check_run failure — log and
			// continue. The run is queued and visible; checks can
			// reconcile via a future retry of just this loop. Note
			// this means checkRunIDs may end up shorter than Jobs.
			deps.Logger.WarnContext(ctx, "trigger: check_run create failed",
				"run_id", run.ID, "job_key", j.Key, "error", err)
			continue
		}
		checkRunIDs = append(checkRunIDs, cr.ID)
		_ = jobIDs[i] // job_id linkage to check_run lands in S41c when the runner consumes both
	}

	metrics.ActionsRunsEnqueuedTotal.WithLabelValues(string(p.EventKind), "fresh").Inc()
	return Result{
		RunID:       run.ID,
		RunIndex:    run.RunIndex,
		CheckRunIDs: checkRunIDs,
	}, nil
}
| 306 | |
| 307 | func validateParams(p *EnqueueParams) error { |
| 308 | if p.RepoID == 0 { |
| 309 | return errors.New("trigger: RepoID required") |
| 310 | } |
| 311 | if p.WorkflowFile == "" { |
| 312 | return errors.New("trigger: WorkflowFile required") |
| 313 | } |
| 314 | if p.HeadSHA == "" { |
| 315 | return errors.New("trigger: HeadSHA required") |
| 316 | } |
| 317 | if p.TriggerEventID == "" { |
| 318 | return errors.New("trigger: TriggerEventID required (empty would bypass idempotency)") |
| 319 | } |
| 320 | if p.Workflow == nil { |
| 321 | return errors.New("trigger: Workflow required") |
| 322 | } |
| 323 | if len(p.Workflow.Jobs) == 0 { |
| 324 | return errors.New("trigger: workflow has no jobs") |
| 325 | } |
| 326 | switch p.EventKind { |
| 327 | case EventPush, EventPullRequest, EventSchedule, EventWorkflowDispatch: |
| 328 | default: |
| 329 | return fmt.Errorf("trigger: unsupported event kind %q", p.EventKind) |
| 330 | } |
| 331 | return nil |
| 332 | } |
| 333 | |
| 334 | // lookupExistingRun finds a workflow_run by the trigger_event_id key |
| 335 | // after EnqueueWorkflowRun reported a conflict. Done outside the |
| 336 | // inner tx (which we committed empty) so the caller has a stable |
| 337 | // RunID to surface. |
| 338 | func lookupExistingRun(ctx context.Context, pool *pgxpool.Pool, p EnqueueParams) (actionsdb.WorkflowRun, error) { |
| 339 | q := actionsdb.New() |
| 340 | rows, err := q.LookupWorkflowRunByTriggerEvent(ctx, pool, actionsdb.LookupWorkflowRunByTriggerEventParams{ |
| 341 | RepoID: p.RepoID, |
| 342 | WorkflowFile: p.WorkflowFile, |
| 343 | TriggerEventID: p.TriggerEventID, |
| 344 | }) |
| 345 | if err != nil { |
| 346 | return actionsdb.WorkflowRun{}, err |
| 347 | } |
| 348 | return rows, nil |
| 349 | } |
| 350 | |
| 351 | func pgInt8(v int64) pgtype.Int8 { |
| 352 | return pgtype.Int8{Int64: v, Valid: v != 0} |
| 353 | } |
| 354 | |
| 355 | // marshalEnv encodes a workflow.Value-keyed map to jsonb-friendly |
| 356 | // {string: string} for the step_env / job_env / step_with columns. |
| 357 | // We don't persist the parser-side struct directly — the runner only |
| 358 | // needs the resolved string value. |
| 359 | func marshalEnv(m map[string]workflow.Value) ([]byte, error) { |
| 360 | if len(m) == 0 { |
| 361 | return []byte("{}"), nil |
| 362 | } |
| 363 | out := make(map[string]string, len(m)) |
| 364 | for k, v := range m { |
| 365 | out[k] = v.Raw |
| 366 | } |
| 367 | return json.Marshal(out) |
| 368 | } |
| 369 | |
| 370 | // marshalPermissions flattens a workflow.Permissions into the jsonb |
| 371 | // shape S41c will consume. v1 just stores Mode + Per — no resolution. |
| 372 | func marshalPermissions(p workflow.Permissions) ([]byte, error) { |
| 373 | if p.Mode == "" && len(p.Per) == 0 { |
| 374 | return []byte("{}"), nil |
| 375 | } |
| 376 | out := map[string]any{} |
| 377 | if p.Mode != "" { |
| 378 | out["mode"] = p.Mode |
| 379 | } |
| 380 | if len(p.Per) > 0 { |
| 381 | per := make(map[string]string, len(p.Per)) |
| 382 | for k, v := range p.Per { |
| 383 | per[k] = string(v) |
| 384 | } |
| 385 | out["per"] = per |
| 386 | } |
| 387 | return json.Marshal(out) |
| 388 | } |
| 389 |