Go · 18980 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 // Package pulls owns the pull-request orchestrator. PRs reuse the S21
4 // `issues` row for title/body/state/timeline; this package owns the
5 // PR-specific surface — opening, synchronizing, mergeability detection,
6 // merge execution.
7 //
8 // Entry points are:
9 //
10 // Create — opens a PR (creates the issue row + the pull_requests row)
11 // Synchronize — refreshes commit + file lists + emits a synchronized event
12 // Mergeability — recomputes mergeable / mergeable_state via merge-tree
13 // Merge — performs the requested merge strategy in a temp worktree
14 // Edit — title/body
15 // SetState — close / reopen
16 // SetReady — draft → ready
17 package pulls
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "log/slog"
24 "strings"
25
26 "github.com/jackc/pgx/v5"
27 "github.com/jackc/pgx/v5/pgtype"
28 "github.com/jackc/pgx/v5/pgxpool"
29
30 actionsevent "github.com/tenseleyFlow/shithub/internal/actions/event"
31 "github.com/tenseleyFlow/shithub/internal/actions/trigger"
32 "github.com/tenseleyFlow/shithub/internal/auth/audit"
33 "github.com/tenseleyFlow/shithub/internal/checks"
34 "github.com/tenseleyFlow/shithub/internal/issues"
35 issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc"
36 mdrender "github.com/tenseleyFlow/shithub/internal/markdown"
37 "github.com/tenseleyFlow/shithub/internal/pulls/review"
38 pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
39 repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
40 "github.com/tenseleyFlow/shithub/internal/repos/protection"
41 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
42 usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
43 "github.com/tenseleyFlow/shithub/internal/worker"
44 )
45
46 // Deps wires this package against the rest of the runtime.
47 type Deps struct {
48 Pool *pgxpool.Pool
49 Logger *slog.Logger
50 // Audit is optional; when non-nil, Merge and SetState write audit
51 // rows. Mirrors the issues orchestrator's contract so PR-side
52 // state changes are equally traceable (S00-S25 audit, M).
53 Audit *audit.Recorder
54 }
55
56 // Errors surfaced to handlers.
57 var (
58 ErrSameBranch = errors.New("pulls: base and head must differ")
59 ErrBaseNotFound = errors.New("pulls: base ref not found")
60 ErrHeadNotFound = errors.New("pulls: head ref not found")
61 ErrNoCommitsToMerge = errors.New("pulls: head has no commits ahead of base")
62 ErrAlreadyMerged = errors.New("pulls: already merged")
63 ErrAlreadyClosed = errors.New("pulls: already closed")
64 ErrMergeBlocked = errors.New("pulls: merge blocked (mergeable_state != clean)")
65 ErrMergeMethodOff = errors.New("pulls: requested merge method is disabled on this repo")
66 ErrConcurrentMerge = errors.New("pulls: PR is being merged by another request")
67 ErrPRNotFound = errors.New("pulls: PR not found")
68 )
69
70 // CreateParams describes a new-PR request.
71 type CreateParams struct {
72 RepoID int64
73 AuthorUserID int64
74 Title string
75 Body string
76 BaseRef string
77 HeadRef string
78 Draft bool
79 GitDir string // resolved from RepoFS by the caller
80 }
81
82 // CreateResult bundles the issue row + the PR row (post-snapshot).
83 type CreateResult struct {
84 Issue issuesdb.Issue
85 PullRequest pullsdb.PullRequest
86 }
87
88 // Create opens a PR. Validates that base/head are distinct and resolve
89 // in the on-disk repo, snapshots their OIDs, then creates the issues
90 // row + pull_requests row in one tx. Mergeability is `unknown` until
91 // the worker job ticks.
92 func Create(ctx context.Context, deps Deps, p CreateParams) (CreateResult, error) {
93 base := strings.TrimSpace(p.BaseRef)
94 head := strings.TrimSpace(p.HeadRef)
95 if base == "" || head == "" || base == head {
96 return CreateResult{}, ErrSameBranch
97 }
98 baseOID, err := repogit.ResolveRefOID(ctx, p.GitDir, base)
99 if err != nil {
100 if errors.Is(err, repogit.ErrRefNotFound) {
101 return CreateResult{}, ErrBaseNotFound
102 }
103 return CreateResult{}, fmt.Errorf("resolve base: %w", err)
104 }
105 headOID, err := repogit.ResolveRefOID(ctx, p.GitDir, head)
106 if err != nil {
107 if errors.Is(err, repogit.ErrRefNotFound) {
108 return CreateResult{}, ErrHeadNotFound
109 }
110 return CreateResult{}, fmt.Errorf("resolve head: %w", err)
111 }
112 if baseOID == headOID {
113 return CreateResult{}, ErrNoCommitsToMerge
114 }
115
116 // Open the issues row first via the issues orchestrator so we get
117 // the per-repo number allocation, body markdown render, and
118 // reference indexing for free.
119 issueRow, err := issues.Create(ctx, issues.Deps{Pool: deps.Pool, Logger: deps.Logger}, issues.CreateParams{
120 RepoID: p.RepoID,
121 AuthorUserID: p.AuthorUserID,
122 Title: p.Title,
123 Body: p.Body,
124 Kind: "pr",
125 })
126 if err != nil {
127 return CreateResult{}, err
128 }
129
130 prRow, err := pullsdb.New().CreatePullRequest(ctx, deps.Pool, pullsdb.CreatePullRequestParams{
131 IssueID: issueRow.ID,
132 BaseRef: base,
133 HeadRef: head,
134 HeadRepoID: p.RepoID,
135 BaseOid: baseOID,
136 HeadOid: headOID,
137 Draft: p.Draft,
138 })
139 if err != nil {
140 return CreateResult{}, fmt.Errorf("create pull_request: %w", err)
141 }
142
143 // Best-effort initial synchronize so the PR view has commits + files
144 // even before the worker queue runs. Failures here don't fail the
145 // open — the worker will retry on the next tick.
146 if err := refreshCommitsAndFiles(ctx, deps, p.GitDir, prRow.IssueID, baseOID, headOID); err != nil {
147 if deps.Logger != nil {
148 deps.Logger.WarnContext(ctx, "pulls: initial sync", "error", err, "pr_id", prRow.IssueID)
149 }
150 }
151
152 // Actions trigger (S41b): on PR open, fan out a workflow:trigger
153 // with action="opened". Best-effort — failures log and let the
154 // PR creation succeed. The collaborator gate lives in the PR
155 // trigger helper (pr_jobs.go); this site just enqueues with
156 // enough payload for the handler to evaluate.
157 if err := enqueueOpenedActionsTrigger(ctx, deps, p, prRow, issueRow.Number, baseOID, headOID); err != nil {
158 if deps.Logger != nil {
159 deps.Logger.WarnContext(ctx, "pulls: enqueue workflow:trigger",
160 "error", err, "pr_id", prRow.IssueID)
161 }
162 }
163
164 return CreateResult{Issue: issueRow, PullRequest: prRow}, nil
165 }
166
167 // enqueueOpenedActionsTrigger is the PR-create-side counterpart to
168 // jobs.enqueuePRActionsTrigger (which handles synchronize). Lives
169 // here in the pulls orchestrator so the open path stays
170 // self-contained — the alternative (a domain_event watcher in the
171 // jobs package) would need to round-trip through the queue just to
172 // observe the open.
173 //
174 // Collaborator gate: actor must be the repo's owning user. Same v1
175 // posture as the synchronize path.
176 func enqueueOpenedActionsTrigger(ctx context.Context, deps Deps, p CreateParams, prRow pullsdb.PullRequest, prNumber int64, baseOID, headOID string) error {
177 repo, err := reposdb.New().GetRepoByID(ctx, deps.Pool, p.RepoID)
178 if err != nil {
179 return fmt.Errorf("load repo: %w", err)
180 }
181 if !repo.OwnerUserID.Valid || repo.OwnerUserID.Int64 != p.AuthorUserID {
182 // Conservative collaborator gate — non-owner authors don't
183 // trigger. External-PR + org-member triggers parked for v2.
184 return nil
185 }
186 changed, err := repogit.ChangedPaths(ctx, p.GitDir, baseOID, headOID)
187 if err != nil {
188 changed = nil // best-effort; path-filtered workflows skip
189 }
190 authorLogin := ""
191 if u, err := usersdb.New().GetUserByID(ctx, deps.Pool, p.AuthorUserID); err == nil {
192 authorLogin = u.Username
193 }
194 payload := actionsevent.PullRequest(
195 "opened", prNumber, p.Title,
196 actionsevent.PRRef{Ref: prRow.HeadRef, SHA: prRow.HeadOid},
197 actionsevent.PRRef{Ref: prRow.BaseRef, SHA: prRow.BaseOid},
198 authorLogin,
199 )
200 job := trigger.JobPayload{
201 RepoID: p.RepoID,
202 HeadSHA: prRow.HeadOid,
203 HeadRef: "refs/heads/" + prRow.HeadRef,
204 EventKind: trigger.EventPullRequest,
205 EventPayload: payload,
206 ActorUserID: p.AuthorUserID,
207 TriggerEventID: fmt.Sprintf("pr_opened:%d:%s", prRow.IssueID, prRow.HeadOid),
208 Action: "opened",
209 BaseRef: prRow.BaseRef,
210 HeadRefShort: prRow.HeadRef,
211 ChangedPaths: changed,
212 }
213 if _, err := worker.Enqueue(ctx, deps.Pool, trigger.KindWorkflowTrigger, job, worker.EnqueueOptions{}); err != nil {
214 return fmt.Errorf("enqueue: %w", err)
215 }
216 return nil
217 }
218
219 // refreshCommitsAndFiles is shared by Create + Synchronize. Truncates +
220 // re-fills `pull_request_commits` and `pull_request_files`.
221 func refreshCommitsAndFiles(ctx context.Context, deps Deps, gitDir string, prID int64, baseOID, headOID string) error {
222 commits, err := repogit.CommitsBetweenDetail(ctx, gitDir, baseOID, headOID, 250)
223 if err != nil {
224 return fmt.Errorf("commits: %w", err)
225 }
226 files, err := repogit.FilesChangedBetween(ctx, gitDir, baseOID, headOID)
227 if err != nil {
228 return fmt.Errorf("files: %w", err)
229 }
230 tx, err := deps.Pool.Begin(ctx)
231 if err != nil {
232 return err
233 }
234 committed := false
235 defer func() {
236 if !committed {
237 _ = tx.Rollback(ctx)
238 }
239 }()
240 q := pullsdb.New()
241 if err := q.ClearPullRequestCommits(ctx, tx, prID); err != nil {
242 return err
243 }
244 if err := q.ClearPullRequestFiles(ctx, tx, prID); err != nil {
245 return err
246 }
247 for i, c := range commits {
248 var ats, cts pgtype.Timestamptz
249 if !c.AuthorWhen.IsZero() {
250 ats = pgtype.Timestamptz{Time: c.AuthorWhen, Valid: true}
251 }
252 if !c.CommitterWhen.IsZero() {
253 cts = pgtype.Timestamptz{Time: c.CommitterWhen, Valid: true}
254 }
255 if err := q.InsertPullRequestCommit(ctx, tx, pullsdb.InsertPullRequestCommitParams{
256 PrID: prID,
257 Sha: c.OID,
258 Position: int32(i),
259 AuthorName: c.AuthorName,
260 AuthorEmail: c.AuthorEmail,
261 CommitterName: c.CommitterName,
262 CommitterEmail: c.CommitterEmail,
263 Subject: c.Subject,
264 Body: c.Body,
265 AuthoredAt: ats,
266 CommittedAt: cts,
267 }); err != nil {
268 return err
269 }
270 }
271 for _, f := range files {
272 oldPath := pgtype.Text{}
273 if f.OldPath != "" {
274 oldPath = pgtype.Text{String: f.OldPath, Valid: true}
275 }
276 if err := q.InsertPullRequestFile(ctx, tx, pullsdb.InsertPullRequestFileParams{
277 PrID: prID,
278 Path: f.Path,
279 Status: pullsdb.PrFileStatus(f.Status),
280 OldPath: oldPath,
281 Additions: int32(f.Additions),
282 Deletions: int32(f.Deletions),
283 Changes: int32(f.Additions + f.Deletions),
284 }); err != nil {
285 return err
286 }
287 }
288 if err := q.SetPullRequestSnapshot(ctx, tx, pullsdb.SetPullRequestSnapshotParams{
289 IssueID: prID, BaseOid: baseOID, HeadOid: headOID,
290 }); err != nil {
291 return err
292 }
293 if err := tx.Commit(ctx); err != nil {
294 return err
295 }
296 committed = true
297 return nil
298 }
299
300 // Synchronize re-snapshots the PR's base/head OIDs, refreshes the
301 // commits + files lists, and emits a `synchronized` event into the
302 // issue timeline. Called from the pr:synchronize worker job after
303 // any push to the head ref.
304 func Synchronize(ctx context.Context, deps Deps, gitDir string, prID int64) error {
305 q := pullsdb.New()
306 pr, err := q.GetPullRequestByIssueID(ctx, deps.Pool, prID)
307 if err != nil {
308 if errors.Is(err, pgx.ErrNoRows) {
309 return ErrPRNotFound
310 }
311 return err
312 }
313 baseOID, err := repogit.ResolveRefOID(ctx, gitDir, pr.BaseRef)
314 if err != nil {
315 return fmt.Errorf("resolve base: %w", err)
316 }
317 headOID, err := repogit.ResolveRefOID(ctx, gitDir, pr.HeadRef)
318 if err != nil {
319 return fmt.Errorf("resolve head: %w", err)
320 }
321 if err := refreshCommitsAndFiles(ctx, deps, gitDir, prID, baseOID, headOID); err != nil {
322 return err
323 }
324 // Re-anchor review comments against the new snapshot. Comments
325 // whose original line still exists keep their thread; the rest
326 // outdate (current_position=NULL) and surface in the "Show
327 // outdated" toggle of the Files tab.
328 if err := review.RemapAllForPR(ctx, review.Deps{Pool: deps.Pool, Logger: deps.Logger}, gitDir, prID, baseOID, headOID); err != nil {
329 // Best-effort: a position-map miss shouldn't block the sync
330 // pipeline. Log + continue.
331 if deps.Logger != nil {
332 deps.Logger.WarnContext(ctx, "pulls: position remap", "error", err, "pr_id", prID)
333 }
334 }
335 // Reset mergeability to unknown so the next mergeability tick
336 // recomputes against the fresh snapshot.
337 if err := q.SetPullRequestMergeability(ctx, deps.Pool, pullsdb.SetPullRequestMergeabilityParams{
338 IssueID: prID,
339 Mergeable: pgtype.Bool{},
340 MergeableState: pullsdb.PrMergeableStateUnknown,
341 }); err != nil {
342 return fmt.Errorf("reset mergeability: %w", err)
343 }
344 // Emit the synchronized timeline event.
345 iq := issuesdb.New()
346 if _, err := iq.InsertIssueEvent(ctx, deps.Pool, issuesdb.InsertIssueEventParams{
347 IssueID: prID,
348 Kind: "synchronized",
349 Meta: []byte(fmt.Sprintf(`{"head_oid":%q}`, headOID)),
350 }); err != nil {
351 return fmt.Errorf("emit event: %w", err)
352 }
353 return nil
354 }
355
356 // Mergeability runs the merge-tree probe and persists the result.
357 // Order of state checks (highest priority first):
358 //
359 // dirty — git merge-tree reports conflicts
360 // behind — head has no commits ahead of base
361 // blocked — required reviews missing OR an undismissed
362 // request_changes review exists (S23 gate)
363 // clean — merge-tree clean and review gate satisfied
364 //
365 // `blocked` is set by the S23 review evaluator; when no protection
366 // rule applies and no request_changes review exists, the gate is a
367 // no-op and we fall through to clean.
368 func Mergeability(ctx context.Context, deps Deps, gitDir string, prID int64) error {
369 q := pullsdb.New()
370 pr, err := q.GetPullRequestByIssueID(ctx, deps.Pool, prID)
371 if err != nil {
372 return err
373 }
374 if pr.BaseOid == "" || pr.HeadOid == "" {
375 return nil // synchronize hasn't run yet; nothing to probe
376 }
377 // Behind: head has no commits ahead of base.
378 commits, err := repogit.CommitsBetweenDetail(ctx, gitDir, pr.BaseOid, pr.HeadOid, 1)
379 if err != nil && !errors.Is(err, repogit.ErrRefNotFound) {
380 return err
381 }
382 if len(commits) == 0 {
383 return q.SetPullRequestMergeability(ctx, deps.Pool, pullsdb.SetPullRequestMergeabilityParams{
384 IssueID: prID,
385 Mergeable: pgtype.Bool{Bool: false, Valid: true},
386 MergeableState: pullsdb.PrMergeableStateBehind,
387 })
388 }
389 res, err := repogit.ProbeMerge(ctx, gitDir, pr.BaseOid, pr.HeadOid)
390 if err != nil {
391 return fmt.Errorf("probe: %w", err)
392 }
393 if res.HasConflict {
394 return q.SetPullRequestMergeability(ctx, deps.Pool, pullsdb.SetPullRequestMergeabilityParams{
395 IssueID: prID,
396 Mergeable: pgtype.Bool{Bool: false, Valid: true},
397 MergeableState: pullsdb.PrMergeableStateDirty,
398 })
399 }
400 // Composed gate: review (S23) + required-checks (S24). Either one
401 // failing produces `blocked`. The two evaluators are independent;
402 // each loads its own slice of the protection rule.
403 issue, err := issuesdb.New().GetIssueByID(ctx, deps.Pool, prID)
404 if err != nil {
405 return fmt.Errorf("load issue: %w", err)
406 }
407 reviewGate, err := review.Evaluate(ctx, deps.Pool, review.GateInputs{
408 RepoID: issue.RepoID,
409 BaseRef: pr.BaseRef,
410 PRIssueID: prID,
411 }, int64FromPg(issue.AuthorUserID))
412 if err != nil {
413 return fmt.Errorf("review gate: %w", err)
414 }
415 requiredCheckNames, err := loadRequiredCheckNames(ctx, deps.Pool, issue.RepoID, pr.BaseRef)
416 if err != nil {
417 return fmt.Errorf("required-check rule lookup: %w", err)
418 }
419 checksGate, err := checks.EvaluateRequiredChecks(ctx, deps.Pool, checks.GateInputs{
420 RepoID: issue.RepoID,
421 HeadSHA: pr.HeadOid,
422 RequiredNames: requiredCheckNames,
423 })
424 if err != nil {
425 return fmt.Errorf("checks gate: %w", err)
426 }
427 state := pullsdb.PrMergeableStateClean
428 mergeable := true
429 if !reviewGate.Satisfied || !checksGate.Satisfied {
430 state = pullsdb.PrMergeableStateBlocked
431 mergeable = false
432 }
433 return q.SetPullRequestMergeability(ctx, deps.Pool, pullsdb.SetPullRequestMergeabilityParams{
434 IssueID: prID,
435 Mergeable: pgtype.Bool{Bool: mergeable, Valid: true},
436 MergeableState: state,
437 })
438 }
439
440 // loadRequiredCheckNames returns the `status_checks_required` list
441 // from the longest-pattern-matching protection rule for `baseRef`.
442 // Empty slice means no rule, no required checks.
443 func loadRequiredCheckNames(ctx context.Context, pool *pgxpool.Pool, repoID int64, baseRef string) ([]string, error) {
444 rules, err := reposdb.New().ListBranchProtectionRules(ctx, pool, repoID)
445 if err != nil {
446 return nil, err
447 }
448 rule, ok := protection.MatchLongestRule(rules, baseRef)
449 if !ok {
450 return []string{}, nil
451 }
452 return rule.StatusChecksRequired, nil
453 }
454
455 // int64FromPg unwraps a pgtype.Int8; returns 0 when invalid.
456 func int64FromPg(p pgtype.Int8) int64 {
457 if !p.Valid {
458 return 0
459 }
460 return p.Int64
461 }
462
463 // EditPR updates the PR's title + body. Body markdown is re-rendered
464 // via the same pipeline issues.Create uses so HTML is consistent.
465 func EditPR(ctx context.Context, deps Deps, prID int64, title, body string) error {
466 title = strings.TrimSpace(title)
467 if title == "" {
468 return issues.ErrEmptyTitle
469 }
470 if len(title) > 256 {
471 return issues.ErrTitleTooLong
472 }
473 if len(body) > 65535 {
474 return issues.ErrBodyTooLong
475 }
476 html := renderBodyHTML(ctx, deps, body)
477 q := issuesdb.New()
478 return q.UpdateIssueTitleBody(ctx, deps.Pool, issuesdb.UpdateIssueTitleBodyParams{
479 ID: prID,
480 Title: title,
481 Body: body,
482 BodyHtmlCached: pgtype.Text{String: html, Valid: html != ""},
483 })
484 }
485
486 // SetReady flips draft → false and emits a `ready_for_review` event.
487 func SetReady(ctx context.Context, deps Deps, actorUserID, prID int64) error {
488 q := pullsdb.New()
489 tx, err := deps.Pool.Begin(ctx)
490 if err != nil {
491 return err
492 }
493 committed := false
494 defer func() {
495 if !committed {
496 _ = tx.Rollback(ctx)
497 }
498 }()
499 if err := q.SetPullRequestDraft(ctx, tx, pullsdb.SetPullRequestDraftParams{IssueID: prID, Draft: false}); err != nil {
500 return err
501 }
502 iq := issuesdb.New()
503 if _, err := iq.InsertIssueEvent(ctx, tx, issuesdb.InsertIssueEventParams{
504 IssueID: prID,
505 ActorUserID: pgtype.Int8{Int64: actorUserID, Valid: actorUserID != 0},
506 Kind: "ready_for_review",
507 Meta: []byte("{}"),
508 }); err != nil {
509 return err
510 }
511 if err := tx.Commit(ctx); err != nil {
512 return err
513 }
514 committed = true
515 return nil
516 }
517
518 // AllowedMethod returns true when the repo allows the named merge
519 // strategy. Falls open for unknown methods so callers get a clear
520 // error from the orchestrator.
521 func AllowedMethod(repo reposdb.Repo, method string) bool {
522 switch method {
523 case "merge":
524 return repo.AllowMergeCommit
525 case "squash":
526 return repo.AllowSquashMerge
527 case "rebase":
528 return repo.AllowRebaseMerge
529 }
530 return false
531 }
532
533 // renderBodyHTML wraps markdown.RenderHTML with a logger-aware error
534 // path. PR body length is bounded upstream at 65535 chars by the
535 // orchestrator; markdown caps at 1 MiB. ErrInputTooLarge here means
536 // a precondition regressed — log loudly. (S00-S25 audit, M.)
537 func renderBodyHTML(ctx context.Context, deps Deps, body string) string {
538 html, err := mdrender.RenderHTML([]byte(body))
539 if err != nil && deps.Logger != nil {
540 deps.Logger.WarnContext(ctx, "pulls: markdown render failed",
541 "error", err, "body_bytes", len(body))
542 }
543 return html
544 }
545