tenseleyflow/shithub / b4f92ba

Browse files

S22: pr:synchronize/mergeability/merge worker jobs + push:process fan-out

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
b4f92ba4de920981587080d0f9a663fa58bad1e9
Parents
a943129
Tree
7cbe95b

4 changed files

StatusFile+-
M cmd/shithubd/worker.go 4 0
A internal/worker/jobs/pr_jobs.go 147 0
M internal/worker/jobs/push_process.go 23 0
M internal/worker/types.go 9 0
cmd/shithubd/worker.gomodified
@@ -92,6 +92,10 @@ var workerCmd = &cobra.Command{
9292
 		p.Register(worker.KindLifecycleSweep, jobs.LifecycleSweep(jobs.LifecycleSweepDeps{
9393
 			Pool: pool, RepoFS: rfs, Audit: auditRecorder(), Logger: logger,
9494
 		}))
95
+		prDeps := jobs.PRJobsDeps{Pool: pool, RepoFS: rfs, Logger: logger}
96
+		p.Register(worker.KindPRSynchronize, jobs.PRSynchronize(prDeps))
97
+		p.Register(worker.KindPRMergeability, jobs.PRMergeability(prDeps))
98
+		p.Register(worker.KindPRMerge, jobs.PRMerge(prDeps))
9599
 
96100
 		return p.Run(ctx)
97101
 	},
internal/worker/jobs/pr_jobs.goadded
@@ -0,0 +1,147 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package jobs
4
+
5
+import (
6
+	"context"
7
+	"encoding/json"
8
+	"errors"
9
+	"fmt"
10
+	"log/slog"
11
+
12
+	"github.com/jackc/pgx/v5"
13
+	"github.com/jackc/pgx/v5/pgxpool"
14
+
15
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
16
+	"github.com/tenseleyFlow/shithub/internal/pulls"
17
+	pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
18
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
19
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
20
+	"github.com/tenseleyFlow/shithub/internal/worker"
21
+)
22
+
23
+// PRJobsDeps are shared by the three PR job handlers.
24
+type PRJobsDeps struct {
25
+	Pool   *pgxpool.Pool
26
+	RepoFS *storage.RepoFS
27
+	Logger *slog.Logger
28
+}
29
+
30
+// PRSynchronizePayload — enqueued from push:process for any open PR
31
+// whose head_repo_id+head_ref match the pushed ref.
32
+type PRSynchronizePayload struct {
33
+	PRID int64 `json:"pr_id"`
34
+}
35
+
36
+// PRSynchronize refreshes commits + files for a PR and queues a
37
+// mergeability recompute.
38
+func PRSynchronize(deps PRJobsDeps) worker.Handler {
39
+	return func(ctx context.Context, raw json.RawMessage) error {
40
+		var p PRSynchronizePayload
41
+		if err := json.Unmarshal(raw, &p); err != nil {
42
+			return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
43
+		}
44
+		if p.PRID == 0 {
45
+			return worker.PoisonError(errors.New("missing pr_id"))
46
+		}
47
+		gitDir, err := resolveGitDirForPR(ctx, deps.Pool, deps.RepoFS, p.PRID)
48
+		if err != nil {
49
+			return err
50
+		}
51
+		if err := pulls.Synchronize(ctx, pulls.Deps{Pool: deps.Pool, Logger: deps.Logger}, gitDir, p.PRID); err != nil {
52
+			return err
53
+		}
54
+		// Chain a mergeability tick.
55
+		if _, err := worker.Enqueue(ctx, deps.Pool, worker.KindPRMergeability,
56
+			map[string]any{"pr_id": p.PRID}, worker.EnqueueOptions{}); err != nil {
57
+			deps.Logger.WarnContext(ctx, "pr:synchronize: enqueue mergeability", "pr_id", p.PRID, "error", err)
58
+		}
59
+		_ = worker.Notify(ctx, deps.Pool)
60
+		return nil
61
+	}
62
+}
63
+
64
+// PRMergeabilityPayload — enqueued from synchronize and from PR
65
+// open. Recomputes the merge-tree probe.
66
+type PRMergeabilityPayload struct {
67
+	PRID int64 `json:"pr_id"`
68
+}
69
+
70
+// PRMergeability runs `git merge-tree --write-tree` and persists the
71
+// result.
72
+func PRMergeability(deps PRJobsDeps) worker.Handler {
73
+	return func(ctx context.Context, raw json.RawMessage) error {
74
+		var p PRMergeabilityPayload
75
+		if err := json.Unmarshal(raw, &p); err != nil {
76
+			return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
77
+		}
78
+		if p.PRID == 0 {
79
+			return worker.PoisonError(errors.New("missing pr_id"))
80
+		}
81
+		gitDir, err := resolveGitDirForPR(ctx, deps.Pool, deps.RepoFS, p.PRID)
82
+		if err != nil {
83
+			return err
84
+		}
85
+		return pulls.Mergeability(ctx, pulls.Deps{Pool: deps.Pool, Logger: deps.Logger}, gitDir, p.PRID)
86
+	}
87
+}
88
+
89
+// PRMergePayload — enqueued from the POST .../merge handler. Contains
90
+// the actor + chosen method + optional subject/body overrides.
91
+type PRMergePayload struct {
92
+	PRID        int64  `json:"pr_id"`
93
+	ActorUserID int64  `json:"actor_user_id"`
94
+	Method      string `json:"method"`
95
+	Subject     string `json:"subject,omitempty"`
96
+	Body        string `json:"body,omitempty"`
97
+}
98
+
99
+// PRMerge performs the requested merge strategy and updates DB state.
100
+func PRMerge(deps PRJobsDeps) worker.Handler {
101
+	return func(ctx context.Context, raw json.RawMessage) error {
102
+		var p PRMergePayload
103
+		if err := json.Unmarshal(raw, &p); err != nil {
104
+			return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
105
+		}
106
+		if p.PRID == 0 || p.Method == "" {
107
+			return worker.PoisonError(errors.New("missing pr_id or method"))
108
+		}
109
+		gitDir, err := resolveGitDirForPR(ctx, deps.Pool, deps.RepoFS, p.PRID)
110
+		if err != nil {
111
+			return err
112
+		}
113
+		return pulls.Merge(ctx, pulls.Deps{Pool: deps.Pool, Logger: deps.Logger}, pulls.MergeParams{
114
+			PRID:        p.PRID,
115
+			ActorUserID: p.ActorUserID,
116
+			GitDir:      gitDir,
117
+			Method:      p.Method,
118
+			Subject:     p.Subject,
119
+			Body:        p.Body,
120
+		})
121
+	}
122
+}
123
+
124
+// resolveGitDirForPR turns a PR id into the bare-repo path on disk
125
+// using the repo + owner-user lookups. Cached per-call only; the
126
+// caller's job-level context bounds the work.
127
+func resolveGitDirForPR(ctx context.Context, pool *pgxpool.Pool, rfs *storage.RepoFS, prID int64) (string, error) {
128
+	pr, err := pullsdb.New().GetPullRequestByIssueID(ctx, pool, prID)
129
+	if err != nil {
130
+		if errors.Is(err, pgx.ErrNoRows) {
131
+			return "", worker.PoisonError(fmt.Errorf("pr %d not found", prID))
132
+		}
133
+		return "", err
134
+	}
135
+	repo, err := reposdb.New().GetRepoByID(ctx, pool, pr.HeadRepoID)
136
+	if err != nil {
137
+		return "", err
138
+	}
139
+	if !repo.OwnerUserID.Valid {
140
+		return "", worker.PoisonError(fmt.Errorf("pr %d: repo has no owner user (org owners not yet supported here)", prID))
141
+	}
142
+	owner, err := usersdb.New().GetUserByID(ctx, pool, repo.OwnerUserID.Int64)
143
+	if err != nil {
144
+		return "", err
145
+	}
146
+	return rfs.RepoPath(owner.Username, repo.Name)
147
+}
internal/worker/jobs/push_process.gomodified
@@ -17,6 +17,7 @@ import (
1717
 	"github.com/jackc/pgx/v5/pgxpool"
1818
 
1919
 	"github.com/tenseleyFlow/shithub/internal/infra/storage"
20
+	pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
2021
 	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
2122
 	"github.com/tenseleyFlow/shithub/internal/worker"
2223
 	workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
@@ -123,6 +124,28 @@ func PushProcess(deps PushProcessDeps) worker.Handler {
123124
 			return fmt.Errorf("insert webhook pending: %w", err)
124125
 		}
125126
 
127
+		// 4b: PR auto-synchronize. For any open PR whose head ref
128
+		// matches the pushed ref, fan out a pr:synchronize job.
129
+		// Best-effort — sync failures don't block the push pipeline.
130
+		if strings.HasPrefix(event.Ref, refPrefix) {
131
+			pq := pullsdb.New()
132
+			prIDs, err := pq.ListOpenPRsForHeadRef(ctx, deps.Pool, pullsdb.ListOpenPRsForHeadRefParams{
133
+				HeadRepoID: event.RepoID,
134
+				HeadRef:    event.Ref[len(refPrefix):],
135
+			})
136
+			if err != nil {
137
+				deps.Logger.WarnContext(ctx, "push:process: list PRs for sync",
138
+					"push_event_id", event.ID, "error", err)
139
+			}
140
+			for _, prID := range prIDs {
141
+				if _, err := worker.Enqueue(ctx, deps.Pool, worker.KindPRSynchronize,
142
+					map[string]any{"pr_id": prID}, worker.EnqueueOptions{}); err != nil {
143
+					deps.Logger.WarnContext(ctx, "push:process: enqueue pr:synchronize",
144
+						"pr_id", prID, "push_event_id", event.ID, "error", err)
145
+				}
146
+			}
147
+		}
148
+
126149
 		// 5: mark processed last so a partial failure earlier triggers a
127150
 		// retry that retries the whole pipeline. Idempotency is via the
128151
 		// processed_at guard at the top.
internal/worker/types.gomodified
@@ -36,6 +36,15 @@ const (
3636
 	KindLifecycleSweep Kind = "lifecycle:sweep"
3737
 )
3838
 
39
+// S22 pull-request kinds. Synchronize refreshes commits + files after
40
+// a head-side push; mergeability runs the merge-tree probe; merge
41
+// performs the requested strategy in a temp worktree.
42
+const (
43
+	KindPRSynchronize Kind = "pr:synchronize"
44
+	KindPRMergeability Kind = "pr:mergeability"
45
+	KindPRMerge        Kind = "pr:merge"
46
+)
47
+
3948
 // NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes
4049
 // to so it wakes up immediately when a job is enqueued, instead of
4150
 // polling. Callers wrapping enqueue in a tx must NOTIFY inside the