tenseleyflow/shithub / d361661

Browse files

S27: KindRepoForkClone job + worker registration

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
d361661318f9a86d69cee682e8443725daa98753
Parents
3b9dc8e
Tree
5db1d8d

3 changed files

StatusFile+-
M cmd/shithubd/worker.go 5 0
A internal/worker/jobs/repo_fork_clone.go 172 0
M internal/worker/types.go 7 0
cmd/shithubd/worker.gomodified
@@ -96,6 +96,11 @@ var workerCmd = &cobra.Command{
9696
 		p.Register(worker.KindPRSynchronize, jobs.PRSynchronize(prDeps))
9797
 		p.Register(worker.KindPRMergeability, jobs.PRMergeability(prDeps))
9898
 
99
+		shithubdPath, _ := shithubdBinaryPath()
100
+		p.Register(worker.KindRepoForkClone, jobs.RepoForkClone(jobs.ForkCloneDeps{
101
+			Pool: pool, RepoFS: rfs, Logger: logger, ShithubdPath: shithubdPath,
102
+		}))
103
+
99104
 		return p.Run(ctx)
100105
 	},
101106
 }
internal/worker/jobs/repo_fork_clone.goadded
@@ -0,0 +1,172 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package jobs
4
+
5
+import (
6
+	"context"
7
+	"encoding/json"
8
+	"errors"
9
+	"fmt"
10
+	"log/slog"
11
+
12
+	"github.com/jackc/pgx/v5"
13
+	"github.com/jackc/pgx/v5/pgxpool"
14
+
15
+	"github.com/tenseleyFlow/shithub/internal/git/hooks"
16
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
17
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
18
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
19
+	"github.com/tenseleyFlow/shithub/internal/worker"
20
+)
21
+
22
+// ForkCloneDeps wires the job. Same shape as the rest of the fork-
23
+// adjacent jobs so the worker registration is uniform.
24
+//
25
+// ShithubdPath is the absolute path to the shithubd binary, used by
26
+// the hook installer to point the fork's pre-/post-receive hooks at
27
+// the right binary. Empty in tests that don't exercise the hook
28
+// stack — the clone still runs but the fork won't fire push:process
29
+// on subsequent user pushes.
30
+type ForkCloneDeps struct {
31
+	Pool         *pgxpool.Pool
32
+	RepoFS       *storage.RepoFS
33
+	Logger       *slog.Logger
34
+	ShithubdPath string
35
+}
36
+
37
+// ForkClonePayload — the bare minimum to find the source on disk and
38
+// the fork shell to populate. Both ids are looked up at job time so
39
+// soft-deletion between enqueue and run is detected (we set
40
+// init_failed and stop).
41
+type ForkClonePayload struct {
42
+	SourceRepoID int64 `json:"source_repo_id"`
43
+	ForkRepoID   int64 `json:"fork_repo_id"`
44
+}
45
+
46
+// RepoForkClone runs `git clone --bare --shared` for a fork shell,
47
+// then sets `extensions.preciousObjects = true` on the source so a
48
+// future `git gc` on the source can't prune objects the fork
49
+// reaches via alternates.
50
+//
51
+// On any permanent error (source missing, target name conflict on
52
+// disk, clone failure) the job flips the fork's init_status to
53
+// init_failed and returns a poison error so the worker doesn't
54
+// retry. Transient failures (DB blip mid-update) leave init_pending
55
+// and bubble up so the worker queue retries with backoff.
56
+func RepoForkClone(deps ForkCloneDeps) worker.Handler {
57
+	return func(ctx context.Context, raw json.RawMessage) error {
58
+		var p ForkClonePayload
59
+		if err := json.Unmarshal(raw, &p); err != nil {
60
+			return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
61
+		}
62
+		if p.SourceRepoID == 0 || p.ForkRepoID == 0 {
63
+			return worker.PoisonError(errors.New("missing source_repo_id or fork_repo_id"))
64
+		}
65
+
66
+		rq := reposdb.New()
67
+		uq := usersdb.New()
68
+
69
+		fork, err := rq.GetRepoByID(ctx, deps.Pool, p.ForkRepoID)
70
+		if err != nil {
71
+			if errors.Is(err, pgx.ErrNoRows) {
72
+				return worker.PoisonError(fmt.Errorf("fork %d not found", p.ForkRepoID))
73
+			}
74
+			return err
75
+		}
76
+		// Idempotency: a retry after a successful run sees init_status
77
+		// = initialized and does nothing. The on-disk clone is what
78
+		// we'd want to re-do; we trust that an initialized row means
79
+		// the disk side already succeeded.
80
+		if fork.InitStatus == reposdb.RepoInitStatusInitialized {
81
+			return nil
82
+		}
83
+		source, err := rq.GetRepoByID(ctx, deps.Pool, p.SourceRepoID)
84
+		if err != nil {
85
+			return failFork(ctx, deps, p.ForkRepoID, fmt.Errorf("source %d not found", p.SourceRepoID))
86
+		}
87
+		if source.DeletedAt.Valid {
88
+			return failFork(ctx, deps, p.ForkRepoID, errors.New("source repo soft-deleted between enqueue and clone"))
89
+		}
90
+
91
+		forkOwner, err := getOwnerUsername(ctx, deps.Pool, uq, fork)
92
+		if err != nil {
93
+			return failFork(ctx, deps, p.ForkRepoID, err)
94
+		}
95
+		sourceOwner, err := getOwnerUsername(ctx, deps.Pool, uq, source)
96
+		if err != nil {
97
+			return failFork(ctx, deps, p.ForkRepoID, err)
98
+		}
99
+		forkPath, err := deps.RepoFS.RepoPath(forkOwner, fork.Name)
100
+		if err != nil {
101
+			return failFork(ctx, deps, p.ForkRepoID, err)
102
+		}
103
+		sourcePath, err := deps.RepoFS.RepoPath(sourceOwner, source.Name)
104
+		if err != nil {
105
+			return failFork(ctx, deps, p.ForkRepoID, err)
106
+		}
107
+
108
+		if err := deps.RepoFS.CloneBareShared(ctx, sourcePath, forkPath); err != nil {
109
+			// If the dst already exists with content we treat as
110
+			// poison (something is fundamentally wrong with our
111
+			// state — repo row exists but disk has unrelated content).
112
+			return failFork(ctx, deps, p.ForkRepoID, fmt.Errorf("clone: %w", err))
113
+		}
114
+
115
+		// Install push-pipeline hooks on the fork so subsequent
116
+		// user pushes fire push:process. Same install as the
117
+		// synchronous repo create path (internal/repos/create.go).
118
+		// Skipped in tests that don't pass ShithubdPath.
119
+		if deps.ShithubdPath != "" {
120
+			if err := hooks.Install(forkPath, deps.ShithubdPath); err != nil {
121
+				return failFork(ctx, deps, p.ForkRepoID, fmt.Errorf("install hooks: %w", err))
122
+			}
123
+		}
124
+
125
+		// Source GC safety: pin source's objects so future `git gc`
126
+		// can't prune what the fork reaches via alternates.
127
+		// Idempotent — running the config twice is a no-op.
128
+		if err := deps.RepoFS.SetPreciousObjects(ctx, sourcePath); err != nil {
129
+			// Not fatal for the fork — the alternates link still
130
+			// works today. Log loudly so an operator can investigate.
131
+			if deps.Logger != nil {
132
+				deps.Logger.WarnContext(ctx, "fork: set preciousObjects on source",
133
+					"source_repo_id", source.ID, "error", err)
134
+			}
135
+		}
136
+
137
+		if err := rq.SetRepoInitStatus(ctx, deps.Pool, reposdb.SetRepoInitStatusParams{
138
+			ID: fork.ID, InitStatus: reposdb.RepoInitStatusInitialized,
139
+		}); err != nil {
140
+			return err
141
+		}
142
+		return nil
143
+	}
144
+}
145
+
146
+// failFork flips the fork's init_status to init_failed and returns
147
+// a worker.PoisonError so the failure isn't retried. The repo row
148
+// stays in the DB so the user can see the failure and choose to
149
+// hard-delete; we don't auto-cleanup because that risks racing a
150
+// concurrent retry.
151
+func failFork(ctx context.Context, deps ForkCloneDeps, forkRepoID int64, cause error) error {
152
+	if err := reposdb.New().SetRepoInitStatus(ctx, deps.Pool, reposdb.SetRepoInitStatusParams{
153
+		ID: forkRepoID, InitStatus: reposdb.RepoInitStatusInitFailed,
154
+	}); err != nil && deps.Logger != nil {
155
+		deps.Logger.WarnContext(ctx, "fork: mark init_failed", "fork_repo_id", forkRepoID, "error", err)
156
+	}
157
+	return worker.PoisonError(cause)
158
+}
159
+
160
+// getOwnerUsername — same shape as the resolver in internal/repos/
161
+// fork/helpers.go but kept local to avoid pulling the orchestrator
162
+// package into the worker import graph.
163
+func getOwnerUsername(ctx context.Context, pool *pgxpool.Pool, uq *usersdb.Queries, repo reposdb.Repo) (string, error) {
164
+	if !repo.OwnerUserID.Valid {
165
+		return "", fmt.Errorf("repo %d has no user owner (org-owned arrives in S31)", repo.ID)
166
+	}
167
+	u, err := uq.GetUserByID(ctx, pool, repo.OwnerUserID.Int64)
168
+	if err != nil {
169
+		return "", fmt.Errorf("load owner: %w", err)
170
+	}
171
+	return u.Username, nil
172
+}
internal/worker/types.gomodified
@@ -48,6 +48,13 @@ const (
4848
 	KindPRMergeability Kind = "pr:mergeability"
4949
 )
5050
 
51
+// S27 fork kinds. fork_clone runs `git clone --bare --shared` for a
52
+// freshly created fork shell; the fork's init_status flips from
53
+// init_pending → initialized (or init_failed) on success/failure.
54
+const (
55
+	KindRepoForkClone Kind = "repo:fork_clone"
56
+)
57
+
5158
 // NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes
5259
 // to so it wakes up immediately when a job is enqueued, instead of
5360
 // polling. Callers wrapping enqueue in a tx must NOTIFY inside the