Go · 9373 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package lifecycle
4
5 import (
6 "context"
7 "errors"
8 "fmt"
9 "os"
10
11 "github.com/jackc/pgx/v5"
12 "github.com/jackc/pgx/v5/pgconn"
13 "github.com/jackc/pgx/v5/pgtype"
14
15 "github.com/tenseleyFlow/shithub/internal/auth/audit"
16 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
17 usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
18 )
19
// TransferRequestParams carries the inputs RequestTransfer needs to
// record a new transfer offer.
type TransferRequestParams struct {
	// ActorUserID is stored as the request's creator and used as the
	// actor on the audit record. RepoID is the repository being
	// offered. FromUserID is the sending user; it must differ from
	// ToPrincipalID.
	ActorUserID, RepoID, FromUserID int64

	// ToPrincipalKind names the kind of recipient. RequestTransfer
	// currently accepts only "user" — "org" arrives in S31.
	ToPrincipalKind string

	// ToPrincipalID is the recipient's id within ToPrincipalKind.
	ToPrincipalID int64
}
28
29 // RequestTransfer creates a pending transfer with a 7-day TTL. Returns
30 // the new request id. Confirmation typing ("type owner/repo to confirm")
31 // belongs in the handler — this function trusts its inputs.
32 func RequestTransfer(ctx context.Context, deps Deps, p TransferRequestParams) (int64, error) {
33 if p.ToPrincipalKind != "user" {
34 return 0, fmt.Errorf("transfer: principal kind %q not supported in S16 (org transfers in S31)", p.ToPrincipalKind)
35 }
36 if p.ToPrincipalID == p.FromUserID {
37 return 0, ErrTransferToSelf
38 }
39
40 rq := reposdb.New()
41 row, err := rq.InsertTransferRequest(ctx, deps.Pool, reposdb.InsertTransferRequestParams{
42 RepoID: p.RepoID,
43 FromUserID: p.FromUserID,
44 ToPrincipalKind: reposdb.TransferPrincipalKind(p.ToPrincipalKind),
45 ToPrincipalID: p.ToPrincipalID,
46 CreatedBy: p.ActorUserID,
47 ExpiresAt: pgtype.Timestamptz{Time: deps.now().Add(transferTTL), Valid: true},
48 })
49 if err != nil {
50 return 0, fmt.Errorf("insert transfer: %w", err)
51 }
52 if deps.Audit != nil {
53 _ = deps.Audit.Record(ctx, deps.Pool, p.ActorUserID,
54 audit.ActionRepoTransferRequested, audit.TargetRepo, p.RepoID,
55 map[string]any{
56 "to_principal_kind": p.ToPrincipalKind,
57 "to_principal_id": p.ToPrincipalID,
58 "transfer_id": row.ID,
59 })
60 }
61 return row.ID, nil
62 }
63
64 // AcceptTransfer flips the transfer to accepted, updates the repo's
65 // owner, writes a redirect row from the old owner+name, and renames
66 // the bare repo on disk if the recipient also has a different name
67 // in mind (rare; defaults to keeping the same name).
68 //
69 // The recipient is the actor in this op. We re-check status under the
70 // row lock so concurrent accept/cancel/decline races are settled
71 // deterministically by the DB.
72 func AcceptTransfer(ctx context.Context, deps Deps, actorUserID, transferID int64) error {
73 rq := reposdb.New()
74 uq := usersdb.New()
75
76 tx, err := deps.Pool.Begin(ctx)
77 if err != nil {
78 return fmt.Errorf("begin: %w", err)
79 }
80 committed := false
81 defer func() {
82 if !committed {
83 _ = tx.Rollback(ctx)
84 }
85 }()
86
87 // Lock the transfer row to serialize racing accept/decline/cancel.
88 row, err := lockTransfer(ctx, tx, transferID)
89 if err != nil {
90 return err
91 }
92 if row.Status != reposdb.TransferStatusPending {
93 return ErrTransferTerminal
94 }
95 if !row.ExpiresAt.Valid || deps.now().After(row.ExpiresAt.Time) {
96 return ErrTransferExpired
97 }
98 if row.ToPrincipalKind != reposdb.TransferPrincipalKindUser || row.ToPrincipalID != actorUserID {
99 // The actor isn't the recipient — a 403 at the handler.
100 return ErrTransferTerminal
101 }
102
103 repo, err := rq.GetRepoByID(ctx, tx, row.RepoID)
104 if err != nil {
105 return fmt.Errorf("load repo: %w", err)
106 }
107 oldOwnerUserID := int64(0)
108 if repo.OwnerUserID.Valid {
109 oldOwnerUserID = repo.OwnerUserID.Int64
110 }
111
112 // Insert redirect from the old owner/name.
113 if err := rq.InsertRepoRedirect(ctx, tx, reposdb.InsertRepoRedirectParams{
114 OldOwnerUserID: pgtype.Int8{Int64: oldOwnerUserID, Valid: oldOwnerUserID != 0},
115 OldOwnerOrgID: pgtype.Int8{Valid: false},
116 OldName: repo.Name,
117 RepoID: repo.ID,
118 }); err != nil {
119 return fmt.Errorf("insert redirect: %w", err)
120 }
121
122 // Update owner. We keep the same name; if it collides on the
123 // recipient, the unique index trips and the tx rolls back.
124 if err := rq.TransferRepoOwner(ctx, tx, reposdb.TransferRepoOwnerParams{
125 ID: repo.ID,
126 Name: repo.Name,
127 OwnerUserID: pgtype.Int8{Int64: actorUserID, Valid: true},
128 OwnerOrgID: pgtype.Int8{Valid: false},
129 }); err != nil {
130 var pgErr *pgconn.PgError
131 if errAs(err, &pgErr) && pgErr.Code == "23505" {
132 return ErrNameTaken
133 }
134 return fmt.Errorf("transfer owner: %w", err)
135 }
136
137 // Mark the transfer accepted.
138 if err := rq.AcceptTransferRequest(ctx, tx, transferID); err != nil {
139 return fmt.Errorf("accept request: %w", err)
140 }
141
142 if err := tx.Commit(ctx); err != nil {
143 return fmt.Errorf("commit: %w", err)
144 }
145 committed = true
146
147 // FS move from the old owner's shard to the recipient's. Best-
148 // effort: if the on-disk repo doesn't exist (never pushed), skip.
149 oldOwner, err := uq.GetUserByID(ctx, deps.Pool, oldOwnerUserID)
150 if err != nil {
151 // Old owner row gone (recently deleted) — nothing to move.
152 return nil
153 }
154 newOwner, err := uq.GetUserByID(ctx, deps.Pool, actorUserID)
155 if err != nil {
156 // Should never happen — recipient is the actor — but don't roll
157 // back FS-wise on this rare error; the DB is the source of
158 // truth and an operator can reconcile the disk path later.
159 if deps.Logger != nil {
160 deps.Logger.WarnContext(ctx, "transfer accept: new owner lookup", "error", err)
161 }
162 return nil
163 }
164 oldPath, e1 := deps.RepoFS.RepoPath(oldOwner.Username, repo.Name)
165 newPath, e2 := deps.RepoFS.RepoPath(newOwner.Username, repo.Name)
166 if e1 != nil || e2 != nil {
167 if deps.Logger != nil {
168 deps.Logger.WarnContext(ctx, "transfer accept: path resolve", "e1", e1, "e2", e2)
169 }
170 return nil
171 }
172 if _, err := os.Stat(oldPath); err == nil {
173 if err := deps.RepoFS.Move(oldPath, newPath); err != nil {
174 if deps.Logger != nil {
175 deps.Logger.WarnContext(ctx, "transfer accept: fs move", "error", err)
176 }
177 return nil
178 }
179 }
180
181 if deps.Audit != nil {
182 _ = deps.Audit.Record(ctx, deps.Pool, actorUserID,
183 audit.ActionRepoTransferAccepted, audit.TargetRepo, repo.ID,
184 map[string]any{"transfer_id": transferID, "from_user_id": oldOwnerUserID})
185 }
186 return nil
187 }
188
189 // DeclineTransfer flips the row to declined. Recipient is the actor.
190 func DeclineTransfer(ctx context.Context, deps Deps, actorUserID, transferID int64) error {
191 rq := reposdb.New()
192 row, err := rq.GetTransferRequest(ctx, deps.Pool, transferID)
193 if err != nil {
194 return fmt.Errorf("load transfer: %w", err)
195 }
196 if row.Status != reposdb.TransferStatusPending {
197 return ErrTransferTerminal
198 }
199 if row.ToPrincipalKind != reposdb.TransferPrincipalKindUser || row.ToPrincipalID != actorUserID {
200 return ErrTransferTerminal
201 }
202 if err := rq.DeclineTransferRequest(ctx, deps.Pool, transferID); err != nil {
203 return fmt.Errorf("decline: %w", err)
204 }
205 if deps.Audit != nil {
206 _ = deps.Audit.Record(ctx, deps.Pool, actorUserID,
207 audit.ActionRepoTransferDeclined, audit.TargetRepo, row.RepoID,
208 map[string]any{"transfer_id": transferID})
209 }
210 return nil
211 }
212
213 // CancelTransfer is the sender pulling the offer. Repo-admin actors
214 // (owner) can cancel.
215 func CancelTransfer(ctx context.Context, deps Deps, actorUserID, transferID int64) error {
216 rq := reposdb.New()
217 row, err := rq.GetTransferRequest(ctx, deps.Pool, transferID)
218 if err != nil {
219 return fmt.Errorf("load transfer: %w", err)
220 }
221 if row.Status != reposdb.TransferStatusPending {
222 return ErrTransferTerminal
223 }
224 if err := rq.CancelTransferRequest(ctx, deps.Pool, transferID); err != nil {
225 return fmt.Errorf("cancel: %w", err)
226 }
227 if deps.Audit != nil {
228 _ = deps.Audit.Record(ctx, deps.Pool, actorUserID,
229 audit.ActionRepoTransferCanceled, audit.TargetRepo, row.RepoID,
230 map[string]any{"transfer_id": transferID})
231 }
232 return nil
233 }
234
235 // ExpirePending sweeps every pending transfer past its expires_at.
236 // Returns the count of rows flipped — the worker uses this for
237 // observability. Audit logs are emitted in bulk via a single row that
238 // records the count; per-transfer audit lines would be noisy and the
239 // individual transfer rows themselves carry the timestamp.
240 func ExpirePending(ctx context.Context, deps Deps) (int64, error) {
241 rq := reposdb.New()
242 n, err := rq.ExpirePendingTransfers(ctx, deps.Pool)
243 if err != nil {
244 return 0, fmt.Errorf("expire: %w", err)
245 }
246 return n, nil
247 }
248
249 // lockTransfer reads the row inside a tx with FOR UPDATE so concurrent
250 // accept/decline/cancel can't double-fire. Returns the row directly —
251 // the caller already has the transferID in scope.
252 func lockTransfer(ctx context.Context, tx pgx.Tx, transferID int64) (reposdb.RepoTransferRequest, error) {
253 row, err := tx.Query(ctx,
254 `SELECT id, repo_id, from_user_id, to_principal_kind, to_principal_id,
255 created_by, created_at, expires_at, status,
256 accepted_at, declined_at, canceled_at
257 FROM repo_transfer_requests
258 WHERE id = $1
259 FOR UPDATE`, transferID)
260 if err != nil {
261 return reposdb.RepoTransferRequest{}, fmt.Errorf("lock transfer: %w", err)
262 }
263 defer row.Close()
264 if !row.Next() {
265 return reposdb.RepoTransferRequest{}, errors.New("transfer not found")
266 }
267 var r reposdb.RepoTransferRequest
268 if err := row.Scan(&r.ID, &r.RepoID, &r.FromUserID, &r.ToPrincipalKind, &r.ToPrincipalID,
269 &r.CreatedBy, &r.CreatedAt, &r.ExpiresAt, &r.Status,
270 &r.AcceptedAt, &r.DeclinedAt, &r.CanceledAt); err != nil {
271 return reposdb.RepoTransferRequest{}, fmt.Errorf("scan transfer: %w", err)
272 }
273 return r, nil
274 }
275