| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package jobs |
| 4 | |
| 5 | import ( |
| 6 | "context" |
| 7 | "encoding/json" |
| 8 | "errors" |
| 9 | "fmt" |
| 10 | "log/slog" |
| 11 | |
| 12 | "github.com/jackc/pgx/v5" |
| 13 | "github.com/jackc/pgx/v5/pgxpool" |
| 14 | |
| 15 | actionsevent "github.com/tenseleyFlow/shithub/internal/actions/event" |
| 16 | "github.com/tenseleyFlow/shithub/internal/actions/trigger" |
| 17 | "github.com/tenseleyFlow/shithub/internal/infra/storage" |
| 18 | issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc" |
| 19 | orgsdb "github.com/tenseleyFlow/shithub/internal/orgs/sqlc" |
| 20 | "github.com/tenseleyFlow/shithub/internal/pulls" |
| 21 | pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc" |
| 22 | gitops "github.com/tenseleyFlow/shithub/internal/repos/git" |
| 23 | reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc" |
| 24 | usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc" |
| 25 | "github.com/tenseleyFlow/shithub/internal/worker" |
| 26 | ) |
| 27 | |
| 28 | // PRJobsDeps are shared by the three PR job handlers. |
| 29 | type PRJobsDeps struct { |
| 30 | Pool *pgxpool.Pool |
| 31 | RepoFS *storage.RepoFS |
| 32 | Logger *slog.Logger |
| 33 | } |
| 34 | |
// PRSynchronizePayload is the JSON body of a pr:synchronize job. It is
// enqueued from push:process for any open PR whose
// head_repo_id+head_ref match the pushed ref.
type PRSynchronizePayload struct {
	// PRID identifies the pull request to refresh. The handler
	// rejects a zero value as a poison job.
	PRID int64 `json:"pr_id"`
}
| 40 | |
| 41 | // PRSynchronize refreshes commits + files for a PR and queues a |
| 42 | // mergeability recompute. |
| 43 | func PRSynchronize(deps PRJobsDeps) worker.Handler { |
| 44 | return func(ctx context.Context, raw json.RawMessage) error { |
| 45 | var p PRSynchronizePayload |
| 46 | if err := json.Unmarshal(raw, &p); err != nil { |
| 47 | return worker.PoisonError(fmt.Errorf("bad payload: %w", err)) |
| 48 | } |
| 49 | if p.PRID == 0 { |
| 50 | return worker.PoisonError(errors.New("missing pr_id")) |
| 51 | } |
| 52 | gitDir, err := resolveGitDirForPR(ctx, deps.Pool, deps.RepoFS, p.PRID) |
| 53 | if err != nil { |
| 54 | return err |
| 55 | } |
| 56 | if err := pulls.Synchronize(ctx, pulls.Deps{Pool: deps.Pool, Logger: deps.Logger}, gitDir, p.PRID); err != nil { |
| 57 | return err |
| 58 | } |
| 59 | // Chain a mergeability tick. |
| 60 | if _, err := worker.Enqueue(ctx, deps.Pool, worker.KindPRMergeability, |
| 61 | map[string]any{"pr_id": p.PRID}, worker.EnqueueOptions{}); err != nil { |
| 62 | deps.Logger.WarnContext(ctx, "pr:synchronize: enqueue mergeability", "pr_id", p.PRID, "error", err) |
| 63 | } |
| 64 | |
| 65 | // Actions trigger (S41b). On PR head movement, fan out a |
| 66 | // workflow:trigger with action="synchronize". Best-effort — |
| 67 | // failures here log and let the rest of the synchronize chain |
| 68 | // complete. |
| 69 | if err := enqueuePRActionsTrigger(ctx, deps, p.PRID, "synchronize"); err != nil { |
| 70 | deps.Logger.WarnContext(ctx, "pr:synchronize: enqueue workflow:trigger", |
| 71 | "pr_id", p.PRID, "error", err) |
| 72 | } |
| 73 | |
| 74 | _ = worker.Notify(ctx, deps.Pool) |
| 75 | return nil |
| 76 | } |
| 77 | } |
| 78 | |
// PRMergeabilityPayload is the JSON body of a PR mergeability job. It
// is enqueued from synchronize and from PR open, and asks the handler
// to recompute the merge-tree probe.
type PRMergeabilityPayload struct {
	// PRID identifies the pull request to probe. The handler rejects
	// a zero value as a poison job.
	PRID int64 `json:"pr_id"`
}
| 84 | |
| 85 | // PRMergeability runs `git merge-tree --write-tree` and persists the |
| 86 | // result. |
| 87 | func PRMergeability(deps PRJobsDeps) worker.Handler { |
| 88 | return func(ctx context.Context, raw json.RawMessage) error { |
| 89 | var p PRMergeabilityPayload |
| 90 | if err := json.Unmarshal(raw, &p); err != nil { |
| 91 | return worker.PoisonError(fmt.Errorf("bad payload: %w", err)) |
| 92 | } |
| 93 | if p.PRID == 0 { |
| 94 | return worker.PoisonError(errors.New("missing pr_id")) |
| 95 | } |
| 96 | gitDir, err := resolveGitDirForPR(ctx, deps.Pool, deps.RepoFS, p.PRID) |
| 97 | if err != nil { |
| 98 | return err |
| 99 | } |
| 100 | return pulls.Mergeability(ctx, pulls.Deps{Pool: deps.Pool, Logger: deps.Logger}, gitDir, p.PRID) |
| 101 | } |
| 102 | } |
| 103 | |
| 104 | // resolveGitDirForPR turns a PR id into the bare-repo path on disk |
| 105 | // using the repo + owner-user lookups. Cached per-call only; the |
| 106 | // caller's job-level context bounds the work. |
| 107 | func resolveGitDirForPR(ctx context.Context, pool *pgxpool.Pool, rfs *storage.RepoFS, prID int64) (string, error) { |
| 108 | pr, err := pullsdb.New().GetPullRequestByIssueID(ctx, pool, prID) |
| 109 | if err != nil { |
| 110 | if errors.Is(err, pgx.ErrNoRows) { |
| 111 | return "", worker.PoisonError(fmt.Errorf("pr %d not found", prID)) |
| 112 | } |
| 113 | return "", err |
| 114 | } |
| 115 | ownerRow, err := reposdb.New().GetRepoOwnerUsernameByID(ctx, pool, pr.HeadRepoID) |
| 116 | if err != nil { |
| 117 | if errors.Is(err, pgx.ErrNoRows) { |
| 118 | return "", worker.PoisonError(fmt.Errorf("pr %d: repo %d not found", prID, pr.HeadRepoID)) |
| 119 | } |
| 120 | return "", fmt.Errorf("load repo owner: %w", err) |
| 121 | } |
| 122 | ownerSlug, err := ownerSlugString(ownerRow.OwnerUsername) |
| 123 | if err != nil { |
| 124 | return "", worker.PoisonError(fmt.Errorf("pr %d: repo owner slug: %w", prID, err)) |
| 125 | } |
| 126 | return rfs.RepoPath(ownerSlug, ownerRow.RepoName) |
| 127 | } |
| 128 | |
| 129 | // enqueuePRActionsTrigger fans out a workflow:trigger job for a PR |
| 130 | // state transition (action ∈ {"opened", "synchronize"} for v1). |
| 131 | // |
| 132 | // Trust gate: the trigger handler evaluates the PR author through |
| 133 | // internal/auth/policy. Trusted same-repo collaborators run immediately; |
| 134 | // untrusted PRs are queued in an approval-required state when policy allows. |
| 135 | func enqueuePRActionsTrigger(ctx context.Context, deps PRJobsDeps, prID int64, action string) error { |
| 136 | pr, err := pullsdb.New().GetPullRequestByIssueID(ctx, deps.Pool, prID) |
| 137 | if err != nil { |
| 138 | return fmt.Errorf("load pr: %w", err) |
| 139 | } |
| 140 | issue, err := issuesdb.New().GetIssueByID(ctx, deps.Pool, pr.IssueID) |
| 141 | if err != nil { |
| 142 | return fmt.Errorf("load issue: %w", err) |
| 143 | } |
| 144 | repo, err := reposdb.New().GetRepoByID(ctx, deps.Pool, pr.HeadRepoID) |
| 145 | if err != nil { |
| 146 | return fmt.Errorf("load repo: %w", err) |
| 147 | } |
| 148 | |
| 149 | if !issue.AuthorUserID.Valid { |
| 150 | deps.Logger.InfoContext(ctx, "pr: skipping workflow:trigger (missing author)", |
| 151 | "pr_id", prID, "action", action) |
| 152 | return nil |
| 153 | } |
| 154 | |
| 155 | ownerLogin, err := resolvePRActionsOwnerLogin(ctx, deps.Pool, repo) |
| 156 | if err != nil { |
| 157 | return fmt.Errorf("load owner: %w", err) |
| 158 | } |
| 159 | gitDir, err := deps.RepoFS.RepoPath(ownerLogin, repo.Name) |
| 160 | if err != nil { |
| 161 | return fmt.Errorf("repo path: %w", err) |
| 162 | } |
| 163 | |
| 164 | // Changed paths: head_oid against base_oid for paths: filter |
| 165 | // evaluation. Best-effort — if the diff fails (e.g., tip pruned) |
| 166 | // we proceed without paths, and paths-filtered workflows won't |
| 167 | // trigger. |
| 168 | changed, err := gitops.ChangedPaths(ctx, gitDir, pr.BaseOid, pr.HeadOid) |
| 169 | if err != nil { |
| 170 | deps.Logger.WarnContext(ctx, "pr: changed-paths failed", |
| 171 | "pr_id", prID, "error", err) |
| 172 | changed = nil |
| 173 | } |
| 174 | |
| 175 | authorLogin := "" |
| 176 | if u, err := usersdb.New().GetUserByID(ctx, deps.Pool, issue.AuthorUserID.Int64); err == nil { |
| 177 | authorLogin = u.Username |
| 178 | } |
| 179 | payload := actionsevent.PullRequest( |
| 180 | action, issue.Number, issue.Title, |
| 181 | actionsevent.PRRef{Ref: pr.HeadRef, SHA: pr.HeadOid}, |
| 182 | actionsevent.PRRef{Ref: pr.BaseRef, SHA: pr.BaseOid}, |
| 183 | authorLogin, |
| 184 | ) |
| 185 | |
| 186 | job := trigger.JobPayload{ |
| 187 | RepoID: repo.ID, |
| 188 | HeadSHA: pr.HeadOid, |
| 189 | HeadRef: "refs/heads/" + pr.HeadRef, |
| 190 | EventKind: trigger.EventPullRequest, |
| 191 | EventPayload: payload, |
| 192 | ActorUserID: issue.AuthorUserID.Int64, |
| 193 | TriggerEventID: fmt.Sprintf("pr_%s:%d:%s", action, prID, pr.HeadOid), |
| 194 | Action: action, |
| 195 | BaseRef: pr.BaseRef, |
| 196 | HeadRefShort: pr.HeadRef, |
| 197 | ChangedPaths: changed, |
| 198 | } |
| 199 | if _, err := worker.Enqueue(ctx, deps.Pool, trigger.KindWorkflowTrigger, job, worker.EnqueueOptions{}); err != nil { |
| 200 | return fmt.Errorf("enqueue: %w", err) |
| 201 | } |
| 202 | return nil |
| 203 | } |
| 204 | |
| 205 | func resolvePRActionsOwnerLogin(ctx context.Context, pool *pgxpool.Pool, repo reposdb.Repo) (string, error) { |
| 206 | if repo.OwnerUserID.Valid { |
| 207 | u, err := usersdb.New().GetUserByID(ctx, pool, repo.OwnerUserID.Int64) |
| 208 | if err != nil { |
| 209 | return "", err |
| 210 | } |
| 211 | return u.Username, nil |
| 212 | } |
| 213 | if repo.OwnerOrgID.Valid { |
| 214 | o, err := orgsdb.New().GetOrgByID(ctx, pool, repo.OwnerOrgID.Int64) |
| 215 | if err != nil { |
| 216 | return "", err |
| 217 | } |
| 218 | return o.Slug, nil |
| 219 | } |
| 220 | return "", errors.New("repo has neither owner_user_id nor owner_org_id") |
| 221 | } |
| 222 |