Go · 18867 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 // Package pulls owns the pull-request orchestrator. PRs reuse the S21
4 // `issues` row for title/body/state/timeline; this package owns the
5 // PR-specific surface — opening, synchronizing, mergeability detection,
6 // merge execution.
7 //
8 // Entry points are:
9 //
10 // Create — opens a PR (creates the issue row + the pull_requests row)
11 // Synchronize — refreshes commit + file lists + emits a synchronized event
12 // Mergeability — recomputes mergeable / mergeable_state via merge-tree
13 // Merge — performs the requested merge strategy in a temp worktree
14 // Edit — title/body
15 // SetState — close / reopen
16 // SetReady — draft → ready
17 package pulls
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "log/slog"
24 "strings"
25
26 "github.com/jackc/pgx/v5"
27 "github.com/jackc/pgx/v5/pgtype"
28 "github.com/jackc/pgx/v5/pgxpool"
29
30 actionsevent "github.com/tenseleyFlow/shithub/internal/actions/event"
31 "github.com/tenseleyFlow/shithub/internal/actions/trigger"
32 "github.com/tenseleyFlow/shithub/internal/auth/audit"
33 "github.com/tenseleyFlow/shithub/internal/checks"
34 "github.com/tenseleyFlow/shithub/internal/issues"
35 issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc"
36 mdrender "github.com/tenseleyFlow/shithub/internal/markdown"
37 "github.com/tenseleyFlow/shithub/internal/pulls/review"
38 pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
39 repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
40 "github.com/tenseleyFlow/shithub/internal/repos/protection"
41 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
42 usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
43 "github.com/tenseleyFlow/shithub/internal/worker"
44 )
45
46 // Deps wires this package against the rest of the runtime.
47 type Deps struct {
48 Pool *pgxpool.Pool
49 Logger *slog.Logger
50 // Audit is optional; when non-nil, Merge and SetState write audit
51 // rows. Mirrors the issues orchestrator's contract so PR-side
52 // state changes are equally traceable (S00-S25 audit, M).
53 Audit *audit.Recorder
54 }
55
56 // Errors surfaced to handlers.
57 var (
58 ErrSameBranch = errors.New("pulls: base and head must differ")
59 ErrBaseNotFound = errors.New("pulls: base ref not found")
60 ErrHeadNotFound = errors.New("pulls: head ref not found")
61 ErrNoCommitsToMerge = errors.New("pulls: head has no commits ahead of base")
62 ErrAlreadyMerged = errors.New("pulls: already merged")
63 ErrAlreadyClosed = errors.New("pulls: already closed")
64 ErrMergeBlocked = errors.New("pulls: merge blocked (mergeable_state != clean)")
65 ErrMergeMethodOff = errors.New("pulls: requested merge method is disabled on this repo")
66 ErrConcurrentMerge = errors.New("pulls: PR is being merged by another request")
67 ErrPRNotFound = errors.New("pulls: PR not found")
68 )
69
70 // CreateParams describes a new-PR request.
71 type CreateParams struct {
72 RepoID int64
73 AuthorUserID int64
74 Title string
75 Body string
76 BaseRef string
77 HeadRef string
78 Draft bool
79 GitDir string // resolved from RepoFS by the caller
80 }
81
82 // CreateResult bundles the issue row + the PR row (post-snapshot).
83 type CreateResult struct {
84 Issue issuesdb.Issue
85 PullRequest pullsdb.PullRequest
86 }
87
88 // Create opens a PR. Validates that base/head are distinct and resolve
89 // in the on-disk repo, snapshots their OIDs, then creates the issues
90 // row + pull_requests row in one tx. Mergeability is `unknown` until
91 // the worker job ticks.
92 func Create(ctx context.Context, deps Deps, p CreateParams) (CreateResult, error) {
93 base := strings.TrimSpace(p.BaseRef)
94 head := strings.TrimSpace(p.HeadRef)
95 if base == "" || head == "" || base == head {
96 return CreateResult{}, ErrSameBranch
97 }
98 baseOID, err := repogit.ResolveRefOID(ctx, p.GitDir, base)
99 if err != nil {
100 if errors.Is(err, repogit.ErrRefNotFound) {
101 return CreateResult{}, ErrBaseNotFound
102 }
103 return CreateResult{}, fmt.Errorf("resolve base: %w", err)
104 }
105 headOID, err := repogit.ResolveRefOID(ctx, p.GitDir, head)
106 if err != nil {
107 if errors.Is(err, repogit.ErrRefNotFound) {
108 return CreateResult{}, ErrHeadNotFound
109 }
110 return CreateResult{}, fmt.Errorf("resolve head: %w", err)
111 }
112 if baseOID == headOID {
113 return CreateResult{}, ErrNoCommitsToMerge
114 }
115
116 // Open the issues row first via the issues orchestrator so we get
117 // the per-repo number allocation, body markdown render, and
118 // reference indexing for free.
119 issueRow, err := issues.Create(ctx, issues.Deps{Pool: deps.Pool, Logger: deps.Logger}, issues.CreateParams{
120 RepoID: p.RepoID,
121 AuthorUserID: p.AuthorUserID,
122 Title: p.Title,
123 Body: p.Body,
124 Kind: "pr",
125 })
126 if err != nil {
127 return CreateResult{}, err
128 }
129
130 prRow, err := pullsdb.New().CreatePullRequest(ctx, deps.Pool, pullsdb.CreatePullRequestParams{
131 IssueID: issueRow.ID,
132 BaseRef: base,
133 HeadRef: head,
134 HeadRepoID: p.RepoID,
135 BaseOid: baseOID,
136 HeadOid: headOID,
137 Draft: p.Draft,
138 })
139 if err != nil {
140 return CreateResult{}, fmt.Errorf("create pull_request: %w", err)
141 }
142
143 // Best-effort initial synchronize so the PR view has commits + files
144 // even before the worker queue runs. Failures here don't fail the
145 // open — the worker will retry on the next tick.
146 if err := refreshCommitsAndFiles(ctx, deps, p.GitDir, prRow.IssueID, baseOID, headOID); err != nil {
147 if deps.Logger != nil {
148 deps.Logger.WarnContext(ctx, "pulls: initial sync", "error", err, "pr_id", prRow.IssueID)
149 }
150 }
151
152 // Actions trigger (S41b): on PR open, fan out a workflow:trigger
153 // with action="opened". Best-effort — failures log and let the
154 // PR creation succeed. The collaborator gate lives in the PR
155 // trigger helper (pr_jobs.go); this site just enqueues with
156 // enough payload for the handler to evaluate.
157 if err := enqueueOpenedActionsTrigger(ctx, deps, p, prRow, issueRow.Number, baseOID, headOID); err != nil {
158 if deps.Logger != nil {
159 deps.Logger.WarnContext(ctx, "pulls: enqueue workflow:trigger",
160 "error", err, "pr_id", prRow.IssueID)
161 }
162 }
163
164 return CreateResult{Issue: issueRow, PullRequest: prRow}, nil
165 }
166
167 // enqueueOpenedActionsTrigger is the PR-create-side counterpart to
168 // jobs.enqueuePRActionsTrigger (which handles synchronize). Lives
169 // here in the pulls orchestrator so the open path stays
170 // self-contained — the alternative (a domain_event watcher in the
171 // jobs package) would need to round-trip through the queue just to
172 // observe the open.
173 //
174 // Trust gate: the trigger handler evaluates the author through
175 // internal/auth/policy and either queues the run as trusted or marks it
176 // approval-required. This helper only supplies the canonical PR event.
177 func enqueueOpenedActionsTrigger(ctx context.Context, deps Deps, p CreateParams, prRow pullsdb.PullRequest, prNumber int64, baseOID, headOID string) error {
178 repo, err := reposdb.New().GetRepoByID(ctx, deps.Pool, p.RepoID)
179 if err != nil {
180 return fmt.Errorf("load repo: %w", err)
181 }
182 changed, err := repogit.ChangedPaths(ctx, p.GitDir, baseOID, headOID)
183 if err != nil {
184 changed = nil // best-effort; path-filtered workflows skip
185 }
186 authorLogin := ""
187 if u, err := usersdb.New().GetUserByID(ctx, deps.Pool, p.AuthorUserID); err == nil {
188 authorLogin = u.Username
189 }
190 payload := actionsevent.PullRequest(
191 "opened", prNumber, p.Title,
192 actionsevent.PRRef{Ref: prRow.HeadRef, SHA: prRow.HeadOid},
193 actionsevent.PRRef{Ref: prRow.BaseRef, SHA: prRow.BaseOid},
194 authorLogin,
195 )
196 job := trigger.JobPayload{
197 RepoID: repo.ID,
198 HeadSHA: prRow.HeadOid,
199 HeadRef: "refs/heads/" + prRow.HeadRef,
200 EventKind: trigger.EventPullRequest,
201 EventPayload: payload,
202 ActorUserID: p.AuthorUserID,
203 TriggerEventID: fmt.Sprintf("pr_opened:%d:%s", prRow.IssueID, prRow.HeadOid),
204 Action: "opened",
205 BaseRef: prRow.BaseRef,
206 HeadRefShort: prRow.HeadRef,
207 ChangedPaths: changed,
208 }
209 if _, err := worker.Enqueue(ctx, deps.Pool, trigger.KindWorkflowTrigger, job, worker.EnqueueOptions{}); err != nil {
210 return fmt.Errorf("enqueue: %w", err)
211 }
212 return nil
213 }
214
215 // refreshCommitsAndFiles is shared by Create + Synchronize. Truncates +
216 // re-fills `pull_request_commits` and `pull_request_files`.
217 func refreshCommitsAndFiles(ctx context.Context, deps Deps, gitDir string, prID int64, baseOID, headOID string) error {
218 commits, err := repogit.CommitsBetweenDetail(ctx, gitDir, baseOID, headOID, 250)
219 if err != nil {
220 return fmt.Errorf("commits: %w", err)
221 }
222 files, err := repogit.FilesChangedBetween(ctx, gitDir, baseOID, headOID)
223 if err != nil {
224 return fmt.Errorf("files: %w", err)
225 }
226 tx, err := deps.Pool.Begin(ctx)
227 if err != nil {
228 return err
229 }
230 committed := false
231 defer func() {
232 if !committed {
233 _ = tx.Rollback(ctx)
234 }
235 }()
236 q := pullsdb.New()
237 if err := q.ClearPullRequestCommits(ctx, tx, prID); err != nil {
238 return err
239 }
240 if err := q.ClearPullRequestFiles(ctx, tx, prID); err != nil {
241 return err
242 }
243 for i, c := range commits {
244 var ats, cts pgtype.Timestamptz
245 if !c.AuthorWhen.IsZero() {
246 ats = pgtype.Timestamptz{Time: c.AuthorWhen, Valid: true}
247 }
248 if !c.CommitterWhen.IsZero() {
249 cts = pgtype.Timestamptz{Time: c.CommitterWhen, Valid: true}
250 }
251 if err := q.InsertPullRequestCommit(ctx, tx, pullsdb.InsertPullRequestCommitParams{
252 PrID: prID,
253 Sha: c.OID,
254 Position: int32(i),
255 AuthorName: c.AuthorName,
256 AuthorEmail: c.AuthorEmail,
257 CommitterName: c.CommitterName,
258 CommitterEmail: c.CommitterEmail,
259 Subject: c.Subject,
260 Body: c.Body,
261 AuthoredAt: ats,
262 CommittedAt: cts,
263 }); err != nil {
264 return err
265 }
266 }
267 for _, f := range files {
268 oldPath := pgtype.Text{}
269 if f.OldPath != "" {
270 oldPath = pgtype.Text{String: f.OldPath, Valid: true}
271 }
272 if err := q.InsertPullRequestFile(ctx, tx, pullsdb.InsertPullRequestFileParams{
273 PrID: prID,
274 Path: f.Path,
275 Status: pullsdb.PrFileStatus(f.Status),
276 OldPath: oldPath,
277 Additions: int32(f.Additions),
278 Deletions: int32(f.Deletions),
279 Changes: int32(f.Additions + f.Deletions),
280 }); err != nil {
281 return err
282 }
283 }
284 if err := q.SetPullRequestSnapshot(ctx, tx, pullsdb.SetPullRequestSnapshotParams{
285 IssueID: prID, BaseOid: baseOID, HeadOid: headOID,
286 }); err != nil {
287 return err
288 }
289 if err := tx.Commit(ctx); err != nil {
290 return err
291 }
292 committed = true
293 return nil
294 }
295
296 // Synchronize re-snapshots the PR's base/head OIDs, refreshes the
297 // commits + files lists, and emits a `synchronized` event into the
298 // issue timeline. Called from the pr:synchronize worker job after
299 // any push to the head ref.
300 func Synchronize(ctx context.Context, deps Deps, gitDir string, prID int64) error {
301 q := pullsdb.New()
302 pr, err := q.GetPullRequestByIssueID(ctx, deps.Pool, prID)
303 if err != nil {
304 if errors.Is(err, pgx.ErrNoRows) {
305 return ErrPRNotFound
306 }
307 return err
308 }
309 baseOID, err := repogit.ResolveRefOID(ctx, gitDir, pr.BaseRef)
310 if err != nil {
311 return fmt.Errorf("resolve base: %w", err)
312 }
313 headOID, err := repogit.ResolveRefOID(ctx, gitDir, pr.HeadRef)
314 if err != nil {
315 return fmt.Errorf("resolve head: %w", err)
316 }
317 if err := refreshCommitsAndFiles(ctx, deps, gitDir, prID, baseOID, headOID); err != nil {
318 return err
319 }
320 // Re-anchor review comments against the new snapshot. Comments
321 // whose original line still exists keep their thread; the rest
322 // outdate (current_position=NULL) and surface in the "Show
323 // outdated" toggle of the Files tab.
324 if err := review.RemapAllForPR(ctx, review.Deps{Pool: deps.Pool, Logger: deps.Logger}, gitDir, prID, baseOID, headOID); err != nil {
325 // Best-effort: a position-map miss shouldn't block the sync
326 // pipeline. Log + continue.
327 if deps.Logger != nil {
328 deps.Logger.WarnContext(ctx, "pulls: position remap", "error", err, "pr_id", prID)
329 }
330 }
331 // Reset mergeability to unknown so the next mergeability tick
332 // recomputes against the fresh snapshot.
333 if err := q.SetPullRequestMergeability(ctx, deps.Pool, pullsdb.SetPullRequestMergeabilityParams{
334 IssueID: prID,
335 Mergeable: pgtype.Bool{},
336 MergeableState: pullsdb.PrMergeableStateUnknown,
337 }); err != nil {
338 return fmt.Errorf("reset mergeability: %w", err)
339 }
340 // Emit the synchronized timeline event.
341 iq := issuesdb.New()
342 if _, err := iq.InsertIssueEvent(ctx, deps.Pool, issuesdb.InsertIssueEventParams{
343 IssueID: prID,
344 Kind: "synchronized",
345 Meta: []byte(fmt.Sprintf(`{"head_oid":%q}`, headOID)),
346 }); err != nil {
347 return fmt.Errorf("emit event: %w", err)
348 }
349 return nil
350 }
351
352 // Mergeability runs the merge-tree probe and persists the result.
353 // Order of state checks (highest priority first):
354 //
355 // dirty — git merge-tree reports conflicts
356 // behind — head has no commits ahead of base
357 // blocked — required reviews missing OR an undismissed
358 // request_changes review exists (S23 gate)
359 // clean — merge-tree clean and review gate satisfied
360 //
361 // `blocked` is set by the S23 review evaluator; when no protection
362 // rule applies and no request_changes review exists, the gate is a
363 // no-op and we fall through to clean.
364 func Mergeability(ctx context.Context, deps Deps, gitDir string, prID int64) error {
365 q := pullsdb.New()
366 pr, err := q.GetPullRequestByIssueID(ctx, deps.Pool, prID)
367 if err != nil {
368 return err
369 }
370 if pr.BaseOid == "" || pr.HeadOid == "" {
371 return nil // synchronize hasn't run yet; nothing to probe
372 }
373 // Behind: head has no commits ahead of base.
374 commits, err := repogit.CommitsBetweenDetail(ctx, gitDir, pr.BaseOid, pr.HeadOid, 1)
375 if err != nil && !errors.Is(err, repogit.ErrRefNotFound) {
376 return err
377 }
378 if len(commits) == 0 {
379 return q.SetPullRequestMergeability(ctx, deps.Pool, pullsdb.SetPullRequestMergeabilityParams{
380 IssueID: prID,
381 Mergeable: pgtype.Bool{Bool: false, Valid: true},
382 MergeableState: pullsdb.PrMergeableStateBehind,
383 })
384 }
385 res, err := repogit.ProbeMerge(ctx, gitDir, pr.BaseOid, pr.HeadOid)
386 if err != nil {
387 return fmt.Errorf("probe: %w", err)
388 }
389 if res.HasConflict {
390 return q.SetPullRequestMergeability(ctx, deps.Pool, pullsdb.SetPullRequestMergeabilityParams{
391 IssueID: prID,
392 Mergeable: pgtype.Bool{Bool: false, Valid: true},
393 MergeableState: pullsdb.PrMergeableStateDirty,
394 })
395 }
396 // Composed gate: review (S23) + required-checks (S24). Either one
397 // failing produces `blocked`. The two evaluators are independent;
398 // each loads its own slice of the protection rule.
399 issue, err := issuesdb.New().GetIssueByID(ctx, deps.Pool, prID)
400 if err != nil {
401 return fmt.Errorf("load issue: %w", err)
402 }
403 reviewGate, err := review.Evaluate(ctx, deps.Pool, review.GateInputs{
404 RepoID: issue.RepoID,
405 BaseRef: pr.BaseRef,
406 PRIssueID: prID,
407 }, int64FromPg(issue.AuthorUserID))
408 if err != nil {
409 return fmt.Errorf("review gate: %w", err)
410 }
411 requiredCheckNames, err := loadRequiredCheckNames(ctx, deps.Pool, issue.RepoID, pr.BaseRef)
412 if err != nil {
413 return fmt.Errorf("required-check rule lookup: %w", err)
414 }
415 checksGate, err := checks.EvaluateRequiredChecks(ctx, deps.Pool, checks.GateInputs{
416 RepoID: issue.RepoID,
417 HeadSHA: pr.HeadOid,
418 RequiredNames: requiredCheckNames,
419 })
420 if err != nil {
421 return fmt.Errorf("checks gate: %w", err)
422 }
423 state := pullsdb.PrMergeableStateClean
424 mergeable := true
425 if !reviewGate.Satisfied || !checksGate.Satisfied {
426 state = pullsdb.PrMergeableStateBlocked
427 mergeable = false
428 }
429 return q.SetPullRequestMergeability(ctx, deps.Pool, pullsdb.SetPullRequestMergeabilityParams{
430 IssueID: prID,
431 Mergeable: pgtype.Bool{Bool: mergeable, Valid: true},
432 MergeableState: state,
433 })
434 }
435
436 // loadRequiredCheckNames returns the `status_checks_required` list
437 // from the longest-pattern-matching protection rule for `baseRef`.
438 // Empty slice means no rule, no required checks.
439 func loadRequiredCheckNames(ctx context.Context, pool *pgxpool.Pool, repoID int64, baseRef string) ([]string, error) {
440 rules, err := reposdb.New().ListBranchProtectionRules(ctx, pool, repoID)
441 if err != nil {
442 return nil, err
443 }
444 rule, ok := protection.MatchLongestRule(rules, baseRef)
445 if !ok {
446 return []string{}, nil
447 }
448 return rule.StatusChecksRequired, nil
449 }
450
451 // int64FromPg unwraps a pgtype.Int8; returns 0 when invalid.
452 func int64FromPg(p pgtype.Int8) int64 {
453 if !p.Valid {
454 return 0
455 }
456 return p.Int64
457 }
458
459 // EditPR updates the PR's title + body. Body markdown is re-rendered
460 // via the same pipeline issues.Create uses so HTML is consistent.
461 func EditPR(ctx context.Context, deps Deps, prID int64, title, body string) error {
462 title = strings.TrimSpace(title)
463 if title == "" {
464 return issues.ErrEmptyTitle
465 }
466 if len(title) > 256 {
467 return issues.ErrTitleTooLong
468 }
469 if len(body) > 65535 {
470 return issues.ErrBodyTooLong
471 }
472 html := renderBodyHTML(ctx, deps, body)
473 q := issuesdb.New()
474 return q.UpdateIssueTitleBody(ctx, deps.Pool, issuesdb.UpdateIssueTitleBodyParams{
475 ID: prID,
476 Title: title,
477 Body: body,
478 BodyHtmlCached: pgtype.Text{String: html, Valid: html != ""},
479 })
480 }
481
482 // SetReady flips draft → false and emits a `ready_for_review` event.
483 func SetReady(ctx context.Context, deps Deps, actorUserID, prID int64) error {
484 q := pullsdb.New()
485 tx, err := deps.Pool.Begin(ctx)
486 if err != nil {
487 return err
488 }
489 committed := false
490 defer func() {
491 if !committed {
492 _ = tx.Rollback(ctx)
493 }
494 }()
495 if err := q.SetPullRequestDraft(ctx, tx, pullsdb.SetPullRequestDraftParams{IssueID: prID, Draft: false}); err != nil {
496 return err
497 }
498 iq := issuesdb.New()
499 if _, err := iq.InsertIssueEvent(ctx, tx, issuesdb.InsertIssueEventParams{
500 IssueID: prID,
501 ActorUserID: pgtype.Int8{Int64: actorUserID, Valid: actorUserID != 0},
502 Kind: "ready_for_review",
503 Meta: []byte("{}"),
504 }); err != nil {
505 return err
506 }
507 if err := tx.Commit(ctx); err != nil {
508 return err
509 }
510 committed = true
511 return nil
512 }
513
514 // AllowedMethod returns true when the repo allows the named merge
515 // strategy. Falls open for unknown methods so callers get a clear
516 // error from the orchestrator.
517 func AllowedMethod(repo reposdb.Repo, method string) bool {
518 switch method {
519 case "merge":
520 return repo.AllowMergeCommit
521 case "squash":
522 return repo.AllowSquashMerge
523 case "rebase":
524 return repo.AllowRebaseMerge
525 }
526 return false
527 }
528
529 // renderBodyHTML wraps markdown.RenderHTML with a logger-aware error
530 // path. PR body length is bounded upstream at 65535 chars by the
531 // orchestrator; markdown caps at 1 MiB. ErrInputTooLarge here means
532 // a precondition regressed — log loudly. (S00-S25 audit, M.)
533 func renderBodyHTML(ctx context.Context, deps Deps, body string) string {
534 html, err := mdrender.RenderHTML([]byte(body))
535 if err != nil && deps.Logger != nil {
536 deps.Logger.WarnContext(ctx, "pulls: markdown render failed",
537 "error", err, "body_bytes", len(body))
538 }
539 return html
540 }
541