Go · 7634 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package jobs
4
5 import (
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "log/slog"
11
12 "github.com/jackc/pgx/v5"
13 "github.com/jackc/pgx/v5/pgxpool"
14
15 actionsevent "github.com/tenseleyFlow/shithub/internal/actions/event"
16 "github.com/tenseleyFlow/shithub/internal/actions/trigger"
17 "github.com/tenseleyFlow/shithub/internal/infra/storage"
18 issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc"
19 orgsdb "github.com/tenseleyFlow/shithub/internal/orgs/sqlc"
20 "github.com/tenseleyFlow/shithub/internal/pulls"
21 pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
22 gitops "github.com/tenseleyFlow/shithub/internal/repos/git"
23 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
24 usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
25 "github.com/tenseleyFlow/shithub/internal/worker"
26 )
27
// PRJobsDeps are shared by the three PR job handlers. PRSynchronize and
// PRMergeability in this file both take it as their only argument.
type PRJobsDeps struct {
	// Pool is the Postgres pool used for every sqlc query and for
	// worker.Enqueue / worker.Notify calls made by the handlers.
	Pool *pgxpool.Pool
	// RepoFS maps an (owner, repo name) pair to the repository path on
	// disk via RepoPath.
	RepoFS *storage.RepoFS
	// Logger receives the handlers' info and warning records. The
	// handlers call it without a nil check, so it must be non-nil.
	Logger *slog.Logger
}
34
// PRSynchronizePayload — enqueued from push:process for any open PR
// whose head_repo_id+head_ref match the pushed ref. It is the JSON
// body decoded by the PRSynchronize handler.
type PRSynchronizePayload struct {
	// PRID identifies the pull request (JSON key "pr_id"). A zero value
	// is rejected by PRSynchronize as a poison error.
	PRID int64 `json:"pr_id"`
}
40
41 // PRSynchronize refreshes commits + files for a PR and queues a
42 // mergeability recompute.
43 func PRSynchronize(deps PRJobsDeps) worker.Handler {
44 return func(ctx context.Context, raw json.RawMessage) error {
45 var p PRSynchronizePayload
46 if err := json.Unmarshal(raw, &p); err != nil {
47 return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
48 }
49 if p.PRID == 0 {
50 return worker.PoisonError(errors.New("missing pr_id"))
51 }
52 gitDir, err := resolveGitDirForPR(ctx, deps.Pool, deps.RepoFS, p.PRID)
53 if err != nil {
54 return err
55 }
56 if err := pulls.Synchronize(ctx, pulls.Deps{Pool: deps.Pool, Logger: deps.Logger}, gitDir, p.PRID); err != nil {
57 return err
58 }
59 // Chain a mergeability tick.
60 if _, err := worker.Enqueue(ctx, deps.Pool, worker.KindPRMergeability,
61 map[string]any{"pr_id": p.PRID}, worker.EnqueueOptions{}); err != nil {
62 deps.Logger.WarnContext(ctx, "pr:synchronize: enqueue mergeability", "pr_id", p.PRID, "error", err)
63 }
64
65 // Actions trigger (S41b). On PR head movement, fan out a
66 // workflow:trigger with action="synchronize". Best-effort —
67 // failures here log and let the rest of the synchronize chain
68 // complete.
69 if err := enqueuePRActionsTrigger(ctx, deps, p.PRID, "synchronize"); err != nil {
70 deps.Logger.WarnContext(ctx, "pr:synchronize: enqueue workflow:trigger",
71 "pr_id", p.PRID, "error", err)
72 }
73
74 _ = worker.Notify(ctx, deps.Pool)
75 return nil
76 }
77 }
78
// PRMergeabilityPayload — enqueued from synchronize and from PR
// open. Recomputes the merge-tree probe. It is the JSON body decoded
// by the PRMergeability handler.
type PRMergeabilityPayload struct {
	// PRID identifies the pull request (JSON key "pr_id"). A zero value
	// is rejected by PRMergeability as a poison error.
	PRID int64 `json:"pr_id"`
}
84
85 // PRMergeability runs `git merge-tree --write-tree` and persists the
86 // result.
87 func PRMergeability(deps PRJobsDeps) worker.Handler {
88 return func(ctx context.Context, raw json.RawMessage) error {
89 var p PRMergeabilityPayload
90 if err := json.Unmarshal(raw, &p); err != nil {
91 return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
92 }
93 if p.PRID == 0 {
94 return worker.PoisonError(errors.New("missing pr_id"))
95 }
96 gitDir, err := resolveGitDirForPR(ctx, deps.Pool, deps.RepoFS, p.PRID)
97 if err != nil {
98 return err
99 }
100 return pulls.Mergeability(ctx, pulls.Deps{Pool: deps.Pool, Logger: deps.Logger}, gitDir, p.PRID)
101 }
102 }
103
104 // resolveGitDirForPR turns a PR id into the bare-repo path on disk
105 // using the repo + owner-user lookups. Cached per-call only; the
106 // caller's job-level context bounds the work.
107 func resolveGitDirForPR(ctx context.Context, pool *pgxpool.Pool, rfs *storage.RepoFS, prID int64) (string, error) {
108 pr, err := pullsdb.New().GetPullRequestByIssueID(ctx, pool, prID)
109 if err != nil {
110 if errors.Is(err, pgx.ErrNoRows) {
111 return "", worker.PoisonError(fmt.Errorf("pr %d not found", prID))
112 }
113 return "", err
114 }
115 ownerRow, err := reposdb.New().GetRepoOwnerUsernameByID(ctx, pool, pr.HeadRepoID)
116 if err != nil {
117 if errors.Is(err, pgx.ErrNoRows) {
118 return "", worker.PoisonError(fmt.Errorf("pr %d: repo %d not found", prID, pr.HeadRepoID))
119 }
120 return "", fmt.Errorf("load repo owner: %w", err)
121 }
122 ownerSlug, err := ownerSlugString(ownerRow.OwnerUsername)
123 if err != nil {
124 return "", worker.PoisonError(fmt.Errorf("pr %d: repo owner slug: %w", prID, err))
125 }
126 return rfs.RepoPath(ownerSlug, ownerRow.RepoName)
127 }
128
129 // enqueuePRActionsTrigger fans out a workflow:trigger job for a PR
130 // state transition (action ∈ {"opened", "synchronize"} for v1).
131 //
132 // Trust gate: the trigger handler evaluates the PR author through
133 // internal/auth/policy. Trusted same-repo collaborators run immediately;
134 // untrusted PRs are queued in an approval-required state when policy allows.
135 func enqueuePRActionsTrigger(ctx context.Context, deps PRJobsDeps, prID int64, action string) error {
136 pr, err := pullsdb.New().GetPullRequestByIssueID(ctx, deps.Pool, prID)
137 if err != nil {
138 return fmt.Errorf("load pr: %w", err)
139 }
140 issue, err := issuesdb.New().GetIssueByID(ctx, deps.Pool, pr.IssueID)
141 if err != nil {
142 return fmt.Errorf("load issue: %w", err)
143 }
144 repo, err := reposdb.New().GetRepoByID(ctx, deps.Pool, pr.HeadRepoID)
145 if err != nil {
146 return fmt.Errorf("load repo: %w", err)
147 }
148
149 if !issue.AuthorUserID.Valid {
150 deps.Logger.InfoContext(ctx, "pr: skipping workflow:trigger (missing author)",
151 "pr_id", prID, "action", action)
152 return nil
153 }
154
155 ownerLogin, err := resolvePRActionsOwnerLogin(ctx, deps.Pool, repo)
156 if err != nil {
157 return fmt.Errorf("load owner: %w", err)
158 }
159 gitDir, err := deps.RepoFS.RepoPath(ownerLogin, repo.Name)
160 if err != nil {
161 return fmt.Errorf("repo path: %w", err)
162 }
163
164 // Changed paths: head_oid against base_oid for paths: filter
165 // evaluation. Best-effort — if the diff fails (e.g., tip pruned)
166 // we proceed without paths, and paths-filtered workflows won't
167 // trigger.
168 changed, err := gitops.ChangedPaths(ctx, gitDir, pr.BaseOid, pr.HeadOid)
169 if err != nil {
170 deps.Logger.WarnContext(ctx, "pr: changed-paths failed",
171 "pr_id", prID, "error", err)
172 changed = nil
173 }
174
175 authorLogin := ""
176 if u, err := usersdb.New().GetUserByID(ctx, deps.Pool, issue.AuthorUserID.Int64); err == nil {
177 authorLogin = u.Username
178 }
179 payload := actionsevent.PullRequest(
180 action, issue.Number, issue.Title,
181 actionsevent.PRRef{Ref: pr.HeadRef, SHA: pr.HeadOid},
182 actionsevent.PRRef{Ref: pr.BaseRef, SHA: pr.BaseOid},
183 authorLogin,
184 )
185
186 job := trigger.JobPayload{
187 RepoID: repo.ID,
188 HeadSHA: pr.HeadOid,
189 HeadRef: "refs/heads/" + pr.HeadRef,
190 EventKind: trigger.EventPullRequest,
191 EventPayload: payload,
192 ActorUserID: issue.AuthorUserID.Int64,
193 TriggerEventID: fmt.Sprintf("pr_%s:%d:%s", action, prID, pr.HeadOid),
194 Action: action,
195 BaseRef: pr.BaseRef,
196 HeadRefShort: pr.HeadRef,
197 ChangedPaths: changed,
198 }
199 if _, err := worker.Enqueue(ctx, deps.Pool, trigger.KindWorkflowTrigger, job, worker.EnqueueOptions{}); err != nil {
200 return fmt.Errorf("enqueue: %w", err)
201 }
202 return nil
203 }
204
205 func resolvePRActionsOwnerLogin(ctx context.Context, pool *pgxpool.Pool, repo reposdb.Repo) (string, error) {
206 if repo.OwnerUserID.Valid {
207 u, err := usersdb.New().GetUserByID(ctx, pool, repo.OwnerUserID.Int64)
208 if err != nil {
209 return "", err
210 }
211 return u.Username, nil
212 }
213 if repo.OwnerOrgID.Valid {
214 o, err := orgsdb.New().GetOrgByID(ctx, pool, repo.OwnerOrgID.Int64)
215 if err != nil {
216 return "", err
217 }
218 return o.Slug, nil
219 }
220 return "", errors.New("repo has neither owner_user_id nor owner_org_id")
221 }
222