tenseleyflow/shithub / a6e775f

Browse files

S14: shithubd hook, hooks reinstall, worker subcommands

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
a6e775f5e231e938dda49e1ccafe46141f6c4cbc
Parents
6943eaf
Tree
10954d9

4 changed files

StatusFile+-
A cmd/shithubd/hook.go 330 0
A cmd/shithubd/hooks_reinstall.go 104 0
M cmd/shithubd/root.go 1 4
A cmd/shithubd/worker.go 94 0
cmd/shithubd/hook.goadded
@@ -0,0 +1,330 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package main
4
+
5
+import (
6
+	"bufio"
7
+	"context"
8
+	"errors"
9
+	"fmt"
10
+	"io"
11
+	"log/slog"
12
+	"os"
13
+	"strconv"
14
+	"strings"
15
+	"time"
16
+
17
+	"github.com/jackc/pgx/v5"
18
+	"github.com/jackc/pgx/v5/pgtype"
19
+	"github.com/jackc/pgx/v5/pgxpool"
20
+	"github.com/spf13/cobra"
21
+
22
+	"github.com/tenseleyFlow/shithub/internal/infra/config"
23
+	"github.com/tenseleyFlow/shithub/internal/infra/db"
24
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
25
+	"github.com/tenseleyFlow/shithub/internal/worker"
26
+	workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
27
+)
28
+
29
+// hookCmd is the umbrella for `shithubd hook <name>`. Each named hook
30
+// is a leaf subcommand; the symlink shim installed by hooks.Install
31
+// invokes one of them. Hidden because no human runs these directly.
32
+var hookCmd = &cobra.Command{
33
+	Use:    "hook",
34
+	Short:  "Git hook entrypoints (post-receive, pre-receive)",
35
+	Hidden: true,
36
+}
37
+
38
+// hookPreReceiveCmd implements the minimum-gates pre-receive hook
39
+// described in S14. Full branch-protection gates land in S20.
40
+//
41
+// Stdin lines: "<old_sha> <new_sha> <ref>".
42
+//
43
+// Exit codes:
44
+//   - 0: accept the push.
45
+//   - 1: reject; git aborts and prints whatever we wrote to stderr.
46
+//
47
+// Latency budget: under 100ms for the common case (no archive/suspension).
48
+// We re-check user/repo state from the DB to avoid trusting potentially
49
+// stale env vars from long-lived SSH sessions.
50
+var hookPreReceiveCmd = &cobra.Command{
51
+	Use:    "pre-receive",
52
+	Short:  "Hook: pre-receive — minimum-gates accept/reject",
53
+	Hidden: true,
54
+	RunE: func(cmd *cobra.Command, _ []string) error {
55
+		ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Second)
56
+		defer cancel()
57
+
58
+		hook, err := loadHookCtx(ctx)
59
+		if err != nil {
60
+			fmt.Fprintln(cmd.ErrOrStderr(), friendlyHookErr(err))
61
+			return err
62
+		}
63
+		defer hook.pool.Close()
64
+
65
+		// Drain stdin so git doesn't EPIPE — we don't actually need the
66
+		// per-ref data for the minimum gates, but a future protection
67
+		// engine (S20) does. Reading and discarding is the safe contract.
68
+		_, _ = io.Copy(io.Discard, cmd.InOrStdin())
69
+
70
+		if err := preReceiveCheck(ctx, hook); err != nil {
71
+			fmt.Fprintln(cmd.ErrOrStderr(), friendlyHookErr(err))
72
+			return err
73
+		}
74
+		return nil
75
+	},
76
+}
77
+
78
+// hookPostReceiveCmd records each pushed ref as a push_events row,
79
+// enqueues a push:process job per ref, and NOTIFYs idle workers.
80
+// Latency budget: under 100ms for typical small pushes; we keep the
81
+// hook to INSERT + NOTIFY + exit. No HTTP calls, no derivation work.
82
+var hookPostReceiveCmd = &cobra.Command{
83
+	Use:    "post-receive",
84
+	Short:  "Hook: post-receive — enqueue async processing",
85
+	Hidden: true,
86
+	RunE: func(cmd *cobra.Command, _ []string) error {
87
+		ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Second)
88
+		defer cancel()
89
+
90
+		hook, err := loadHookCtx(ctx)
91
+		if err != nil {
92
+			// post-receive is non-fatal: the push has already landed. We
93
+			// log to stderr (the user's git client sees it) but exit 0
94
+			// so the push isn't reported as failed.
95
+			fmt.Fprintln(cmd.ErrOrStderr(), "shithub: warning: post-receive enqueue skipped:", err)
96
+			return nil
97
+		}
98
+		defer hook.pool.Close()
99
+
100
+		refs, err := readRefLines(cmd.InOrStdin())
101
+		if err != nil || len(refs) == 0 {
102
+			return nil
103
+		}
104
+
105
+		if err := postReceiveEnqueue(ctx, hook, refs); err != nil {
106
+			fmt.Fprintln(cmd.ErrOrStderr(), "shithub: warning: post-receive enqueue:", err)
107
+		}
108
+		return nil
109
+	},
110
+}
111
+
112
+// hookCtx bundles the deps each hook subcommand needs. Loaded once per
113
+// invocation; closed by the caller via defer.
114
+type hookCtx struct {
115
+	cfg    config.Config
116
+	pool   *pgxpool.Pool
117
+	logger *slog.Logger
118
+
119
+	userID     int64
120
+	username   string
121
+	repoID     int64
122
+	repoFull   string
123
+	protocol   string
124
+	remoteIP   string
125
+	requestID  string
126
+}
127
+
128
+func loadHookCtx(ctx context.Context) (*hookCtx, error) {
129
+	cfg, err := config.Load(nil)
130
+	if err != nil {
131
+		return nil, fmt.Errorf("config: %w", err)
132
+	}
133
+	if cfg.DB.URL == "" {
134
+		return nil, errors.New("DB URL not set")
135
+	}
136
+
137
+	pool, err := db.Open(ctx, db.Config{
138
+		URL: cfg.DB.URL, MaxConns: 2, MinConns: 0,
139
+		ConnectTimeout: 1500 * time.Millisecond,
140
+	})
141
+	if err != nil {
142
+		return nil, fmt.Errorf("db: %w", err)
143
+	}
144
+
145
+	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
146
+
147
+	uid, _ := strconv.ParseInt(os.Getenv("SHITHUB_USER_ID"), 10, 64)
148
+	rid, _ := strconv.ParseInt(os.Getenv("SHITHUB_REPO_ID"), 10, 64)
149
+	return &hookCtx{
150
+		cfg:       cfg,
151
+		pool:      pool,
152
+		logger:    logger,
153
+		userID:    uid,
154
+		username:  os.Getenv("SHITHUB_USERNAME"),
155
+		repoID:    rid,
156
+		repoFull:  os.Getenv("SHITHUB_REPO_FULL_NAME"),
157
+		protocol:  os.Getenv("SHITHUB_PROTOCOL"),
158
+		remoteIP:  os.Getenv("SHITHUB_REMOTE_IP"),
159
+		requestID: os.Getenv("SHITHUB_REQUEST_ID"),
160
+	}, nil
161
+}
162
+
163
+// errHookGate is the typed error pre-receive returns for each rejection
164
+// reason. friendlyHookErr maps these back to user-facing messages.
165
+type errHookGate struct{ kind string }
166
+
167
+func (e errHookGate) Error() string { return "shithub-hook: " + e.kind }
168
+
169
+var (
170
+	errHookSuspended = errHookGate{"user suspended"}
171
+	errHookArchived  = errHookGate{"repo archived"}
172
+	errHookDeleted   = errHookGate{"repo deleted"}
173
+	errHookMissing   = errHookGate{"missing context"}
174
+)
175
+
176
+func friendlyHookErr(err error) string {
177
+	switch {
178
+	case errors.Is(err, errHookSuspended):
179
+		return "shithub: your account is suspended; pushes are disabled."
180
+	case errors.Is(err, errHookArchived):
181
+		return "shithub: this repository is archived; pushes are disabled."
182
+	case errors.Is(err, errHookDeleted):
183
+		return "shithub: this repository has been deleted."
184
+	case errors.Is(err, errHookMissing):
185
+		return "shithub: server error: hook context missing. Contact the operator."
186
+	default:
187
+		return "shithub: server error: " + err.Error()
188
+	}
189
+}
190
+
191
+func preReceiveCheck(ctx context.Context, h *hookCtx) error {
192
+	if h.userID == 0 || h.repoID == 0 {
193
+		return errHookMissing
194
+	}
195
+	uq := usersdb.New()
196
+	user, err := uq.GetUserByID(ctx, h.pool, h.userID)
197
+	if err != nil {
198
+		return fmt.Errorf("user lookup: %w", err)
199
+	}
200
+	if user.SuspendedAt.Valid {
201
+		return errHookSuspended
202
+	}
203
+
204
+	row := h.pool.QueryRow(ctx,
205
+		`SELECT is_archived, deleted_at FROM repos WHERE id = $1`, h.repoID)
206
+	var archived bool
207
+	var deletedAt pgtype.Timestamptz
208
+	if err := row.Scan(&archived, &deletedAt); err != nil {
209
+		return fmt.Errorf("repo lookup: %w", err)
210
+	}
211
+	if deletedAt.Valid {
212
+		return errHookDeleted
213
+	}
214
+	if archived {
215
+		return errHookArchived
216
+	}
217
+	return nil
218
+}
219
+
220
+func postReceiveEnqueue(ctx context.Context, h *hookCtx, refs []refUpdate) error {
221
+	if h.repoID == 0 {
222
+		return errHookMissing
223
+	}
224
+
225
+	tx, err := h.pool.Begin(ctx)
226
+	if err != nil {
227
+		return fmt.Errorf("begin: %w", err)
228
+	}
229
+	committed := false
230
+	defer func() {
231
+		if !committed {
232
+			_ = tx.Rollback(ctx)
233
+		}
234
+	}()
235
+
236
+	wq := workerdb.New()
237
+	protocol := h.protocol
238
+	if protocol == "" {
239
+		protocol = "ssh" // safe fallback when env is missing
240
+	}
241
+	for _, r := range refs {
242
+		event, err := wq.InsertPushEvent(ctx, tx, workerdb.InsertPushEventParams{
243
+			RepoID:        h.repoID,
244
+			BeforeSha:     r.before,
245
+			AfterSha:      r.after,
246
+			Ref:           r.ref,
247
+			Protocol:      protocol,
248
+			PusherUserID:  pgtype.Int8{Int64: h.userID, Valid: h.userID != 0},
249
+			RequestID:     pgtype.Text{String: h.requestID, Valid: h.requestID != ""},
250
+		})
251
+		if err != nil {
252
+			return fmt.Errorf("insert push_event: %w", err)
253
+		}
254
+		if _, err := worker.Enqueue(ctx, tx, worker.KindPushProcess,
255
+			map[string]any{"push_event_id": event.ID},
256
+			worker.EnqueueOptions{}); err != nil {
257
+			return fmt.Errorf("enqueue push:process: %w", err)
258
+		}
259
+	}
260
+	if err := worker.Notify(ctx, tx); err != nil {
261
+		// Notify failure inside tx is non-fatal — workers also poll.
262
+		h.logger.WarnContext(ctx, "post-receive: NOTIFY failed", "error", err)
263
+	}
264
+	if err := tx.Commit(ctx); err != nil {
265
+		return fmt.Errorf("commit: %w", err)
266
+	}
267
+	committed = true
268
+	return nil
269
+}
270
+
271
+// refUpdate is one stdin line as parsed by readRefLines.
272
+type refUpdate struct {
273
+	before, after, ref string
274
+}
275
+
276
+func readRefLines(r io.Reader) ([]refUpdate, error) {
277
+	var out []refUpdate
278
+	sc := bufio.NewScanner(r)
279
+	sc.Buffer(make([]byte, 0, 64<<10), 1<<20)
280
+	for sc.Scan() {
281
+		line := strings.TrimSpace(sc.Text())
282
+		if line == "" {
283
+			continue
284
+		}
285
+		parts := strings.Fields(line)
286
+		if len(parts) != 3 {
287
+			continue
288
+		}
289
+		out = append(out, refUpdate{before: parts[0], after: parts[1], ref: parts[2]})
290
+	}
291
+	return out, sc.Err()
292
+}
293
+
294
+// hooksReinstallCmd reinstalls hook symlinks on every active repo, used
295
+// after a binary path change in production deploys. --repo runs against
296
+// a single owner/name; --all walks every repo via the DB.
297
+var hooksReinstallCmd = &cobra.Command{
298
+	Use:   "reinstall",
299
+	Short: "Reinstall hook symlinks on existing repos",
300
+	RunE: func(cmd *cobra.Command, _ []string) error {
301
+		all, _ := cmd.Flags().GetBool("all")
302
+		repo, _ := cmd.Flags().GetString("repo")
303
+		if !all && repo == "" {
304
+			return errors.New("hooks reinstall: pass --all or --repo owner/name")
305
+		}
306
+		return runHooksReinstall(cmd.Context(), all, repo, cmd.OutOrStdout())
307
+	},
308
+}
309
+
310
+// hooksParentCmd is the umbrella so the operator command reads as
311
+// `shithubd hooks reinstall ...`.
312
+var hooksParentCmd = &cobra.Command{
313
+	Use:   "hooks",
314
+	Short: "Operator commands for git hook installation",
315
+}
316
+
317
+func init() {
318
+	hookCmd.AddCommand(hookPreReceiveCmd)
319
+	hookCmd.AddCommand(hookPostReceiveCmd)
320
+	hooksReinstallCmd.Flags().Bool("all", false, "Reinstall on every active repo")
321
+	hooksReinstallCmd.Flags().String("repo", "", "Reinstall on owner/name only")
322
+	hooksParentCmd.AddCommand(hooksReinstallCmd)
323
+
324
+	rootCmd.AddCommand(hookCmd)
325
+	rootCmd.AddCommand(hooksParentCmd)
326
+}
327
+
328
+// silence unused import warnings during incremental builds — the pgx
329
+// import is used through the hookCtx pool helper above.
330
+var _ = pgx.ErrNoRows
cmd/shithubd/hooks_reinstall.goadded
@@ -0,0 +1,104 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package main
4
+
5
+import (
6
+	"context"
7
+	"errors"
8
+	"fmt"
9
+	"io"
10
+	"os"
11
+	"path/filepath"
12
+	"strings"
13
+	"time"
14
+
15
+	"github.com/tenseleyFlow/shithub/internal/git/hooks"
16
+	"github.com/tenseleyFlow/shithub/internal/infra/config"
17
+	"github.com/tenseleyFlow/shithub/internal/infra/db"
18
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
19
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
20
+)
21
+
22
+// runHooksReinstall is the implementation of `shithubd hooks reinstall`.
23
+// It owns the deployment-time bootstrap of hook shims — call after
24
+// upgrading the binary so every repo's hook scripts point at the new
25
+// path.
26
+func runHooksReinstall(ctx context.Context, all bool, repoArg string, out io.Writer) error {
27
+	cfg, err := config.Load(nil)
28
+	if err != nil {
29
+		return fmt.Errorf("config: %w", err)
30
+	}
31
+	root, err := filepath.Abs(cfg.Storage.ReposRoot)
32
+	if err != nil || root == "" {
33
+		return fmt.Errorf("repos_root unset")
34
+	}
35
+	rfs, err := storage.NewRepoFS(root)
36
+	if err != nil {
37
+		return fmt.Errorf("repo fs: %w", err)
38
+	}
39
+	binPath, err := shithubdBinaryPath()
40
+	if err != nil {
41
+		return fmt.Errorf("binary path: %w", err)
42
+	}
43
+
44
+	if !all {
45
+		owner, name, ok := strings.Cut(repoArg, "/")
46
+		if !ok || owner == "" || name == "" {
47
+			return errors.New("hooks reinstall: --repo wants owner/name")
48
+		}
49
+		gitDir, err := rfs.RepoPath(owner, name)
50
+		if err != nil {
51
+			return fmt.Errorf("repo path: %w", err)
52
+		}
53
+		if err := hooks.Install(gitDir, binPath); err != nil {
54
+			return fmt.Errorf("install: %w", err)
55
+		}
56
+		fmt.Fprintf(out, "ok: %s/%s\n", owner, name)
57
+		return nil
58
+	}
59
+
60
+	// --all: enumerate via DB.
61
+	dbCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
62
+	defer cancel()
63
+	pool, err := db.Open(dbCtx, db.Config{URL: cfg.DB.URL, MaxConns: 4})
64
+	if err != nil {
65
+		return fmt.Errorf("db: %w", err)
66
+	}
67
+	defer pool.Close()
68
+	rq := reposdb.New()
69
+	rows, err := rq.ListAllRepoFullNames(dbCtx, pool)
70
+	if err != nil {
71
+		return fmt.Errorf("list repos: %w", err)
72
+	}
73
+	var ok, fail int
74
+	for _, r := range rows {
75
+		gitDir, err := rfs.RepoPath(r.OwnerUsername, r.Name)
76
+		if err != nil {
77
+			fmt.Fprintf(out, "skip: %s/%s: %v\n", r.OwnerUsername, r.Name, err)
78
+			fail++
79
+			continue
80
+		}
81
+		if err := hooks.Install(gitDir, binPath); err != nil {
82
+			fmt.Fprintf(out, "fail: %s/%s: %v\n", r.OwnerUsername, r.Name, err)
83
+			fail++
84
+			continue
85
+		}
86
+		ok++
87
+	}
88
+	fmt.Fprintf(out, "reinstalled hooks on %d repos (%d failed)\n", ok, fail)
89
+	if fail > 0 {
90
+		return fmt.Errorf("%d failures", fail)
91
+	}
92
+	return nil
93
+}
94
+
95
+// shithubdBinaryPath returns the absolute path of the running binary.
96
+// hooks.Install bakes this into the shim so every push exec's the same
97
+// version that wrote the hook.
98
+func shithubdBinaryPath() (string, error) {
99
+	exe, err := os.Executable()
100
+	if err != nil {
101
+		return "", err
102
+	}
103
+	return filepath.Abs(exe)
104
+}
cmd/shithubd/root.gomodified
@@ -31,12 +31,9 @@ func init() {
3131
 	rootCmd.AddCommand(migrateCmd)
3232
 	rootCmd.AddCommand(seedCmd)
3333
 
34
-	// Stubs for subcommands implemented in later sprints. They surface in
35
-	// `--help` so the operator-facing interface is discoverable from day one.
36
-	rootCmd.AddCommand(stubCmd("worker", "Run background workers", "S14"))
34
+	// worker, hook, hooks (reinstall) registered in their own files.
3735
 	// ssh-authkeys and ssh-shell are registered in cmd/shithubd/ssh.go.
3836
 	rootCmd.AddCommand(storageCmd)
39
-	rootCmd.AddCommand(stubCmd("hook", "Git hook entrypoint", "S14"))
4037
 	rootCmd.AddCommand(adminCmd)
4138
 	rootCmd.AddCommand(configCmd)
4239
 }
cmd/shithubd/worker.goadded
@@ -0,0 +1,94 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package main
4
+
5
+import (
6
+	"errors"
7
+	"fmt"
8
+	"log/slog"
9
+	"os"
10
+	"os/signal"
11
+	"path/filepath"
12
+	"strconv"
13
+	"syscall"
14
+	"time"
15
+
16
+	"github.com/spf13/cobra"
17
+
18
+	"github.com/tenseleyFlow/shithub/internal/infra/config"
19
+	"github.com/tenseleyFlow/shithub/internal/infra/db"
20
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
21
+	"github.com/tenseleyFlow/shithub/internal/worker"
22
+	"github.com/tenseleyFlow/shithub/internal/worker/jobs"
23
+)
24
+
25
+// workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
26
+// graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
27
+// in-flight jobs are given a deadline to finish, then the binary exits.
28
+var workerCmd = &cobra.Command{
29
+	Use:   "worker",
30
+	Short: "Run background workers (push processing, size recalc, purge)",
31
+	RunE: func(cmd *cobra.Command, _ []string) error {
32
+		workersFlag, _ := cmd.Flags().GetInt("workers")
33
+
34
+		cfg, err := config.Load(nil)
35
+		if err != nil {
36
+			return fmt.Errorf("config: %w", err)
37
+		}
38
+		if cfg.DB.URL == "" {
39
+			return errors.New("worker: SHITHUB_DATABASE_URL unset")
40
+		}
41
+		root, err := filepath.Abs(cfg.Storage.ReposRoot)
42
+		if err != nil {
43
+			return fmt.Errorf("repos_root: %w", err)
44
+		}
45
+		rfs, err := storage.NewRepoFS(root)
46
+		if err != nil {
47
+			return fmt.Errorf("repo fs: %w", err)
48
+		}
49
+
50
+		ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
51
+		defer stop()
52
+
53
+		// Worker count: flag overrides env override default.
54
+		count := workersFlag
55
+		if count <= 0 {
56
+			if v, _ := strconv.Atoi(os.Getenv("SHITHUB_WORKERS")); v > 0 {
57
+				count = v
58
+			}
59
+		}
60
+
61
+		pool, err := db.Open(ctx, db.Config{
62
+			URL: cfg.DB.URL, MaxConns: int32(count) + 2, MinConns: 1,
63
+		})
64
+		if err != nil {
65
+			return fmt.Errorf("db: %w", err)
66
+		}
67
+		defer pool.Close()
68
+
69
+		logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
70
+
71
+		p := worker.NewPool(pool, worker.PoolConfig{
72
+			Workers:    count,
73
+			IdlePoll:   5 * time.Second,
74
+			JobTimeout: 5 * time.Minute,
75
+			Logger:     logger,
76
+		})
77
+		p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{
78
+			Pool: pool, RepoFS: rfs, Logger: logger,
79
+		}))
80
+		p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{
81
+			Pool: pool, RepoFS: rfs, Logger: logger,
82
+		}))
83
+		p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
84
+			Pool: pool, Logger: logger,
85
+		}))
86
+
87
+		return p.Run(ctx)
88
+	},
89
+}
90
+
91
+func init() {
92
+	workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)")
93
+	rootCmd.AddCommand(workerCmd)
94
+}