Go · 7732 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package main
4
5 import (
6 "crypto/sha256"
7 "encoding/base64"
8 "errors"
9 "fmt"
10 "log/slog"
11 "os"
12 "os/signal"
13 "path/filepath"
14 "strconv"
15 "syscall"
16 "time"
17
18 "github.com/spf13/cobra"
19
20 "github.com/tenseleyFlow/shithub/internal/auth/audit"
21 "github.com/tenseleyFlow/shithub/internal/auth/email"
22 "github.com/tenseleyFlow/shithub/internal/auth/secretbox"
23 "github.com/tenseleyFlow/shithub/internal/infra/config"
24 "github.com/tenseleyFlow/shithub/internal/infra/db"
25 "github.com/tenseleyFlow/shithub/internal/infra/storage"
26 "github.com/tenseleyFlow/shithub/internal/webhook"
27 "github.com/tenseleyFlow/shithub/internal/worker"
28 "github.com/tenseleyFlow/shithub/internal/worker/jobs"
29 )
30
31 // auditRecorder returns the shared audit recorder. Kept as a function
32 // rather than a package-level value so future tests / non-default
33 // recorders can substitute via dependency injection.
34 func auditRecorder() *audit.Recorder { return audit.NewRecorder() }
35
36 // workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
37 // graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
38 // in-flight jobs are given a deadline to finish, then the binary exits.
39 var workerCmd = &cobra.Command{
40 Use: "worker",
41 Short: "Run background workers (push processing, size recalc, purge)",
42 RunE: func(cmd *cobra.Command, _ []string) error {
43 workersFlag, _ := cmd.Flags().GetInt("workers")
44
45 cfg, err := config.Load(nil)
46 if err != nil {
47 return fmt.Errorf("config: %w", err)
48 }
49 if cfg.DB.URL == "" {
50 return errors.New("worker: SHITHUB_DATABASE_URL unset")
51 }
52 root, err := filepath.Abs(cfg.Storage.ReposRoot)
53 if err != nil {
54 return fmt.Errorf("repos_root: %w", err)
55 }
56 rfs, err := storage.NewRepoFS(root)
57 if err != nil {
58 return fmt.Errorf("repo fs: %w", err)
59 }
60
61 ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
62 defer stop()
63
64 // Worker count: flag overrides env override default.
65 count := workersFlag
66 if count <= 0 {
67 if v, _ := strconv.Atoi(os.Getenv("SHITHUB_WORKERS")); v > 0 {
68 count = v
69 }
70 }
71
72 pool, err := db.Open(ctx, db.Config{
73 URL: cfg.DB.URL, MaxConns: int32(count) + 2, MinConns: 1,
74 })
75 if err != nil {
76 return fmt.Errorf("db: %w", err)
77 }
78 defer pool.Close()
79
80 logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
81
82 p := worker.NewPool(pool, worker.PoolConfig{
83 Workers: count,
84 IdlePoll: 5 * time.Second,
85 JobTimeout: 5 * time.Minute,
86 Logger: logger,
87 })
88 p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{
89 Pool: pool, RepoFS: rfs, Logger: logger,
90 }))
91 p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{
92 Pool: pool, RepoFS: rfs, Logger: logger,
93 }))
94 p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
95 Pool: pool, Logger: logger,
96 }))
97 p.Register(worker.KindLifecycleSweep, jobs.LifecycleSweep(jobs.LifecycleSweepDeps{
98 Pool: pool, RepoFS: rfs, Audit: auditRecorder(), Logger: logger,
99 }))
100 prDeps := jobs.PRJobsDeps{Pool: pool, RepoFS: rfs, Logger: logger}
101 p.Register(worker.KindPRSynchronize, jobs.PRSynchronize(prDeps))
102 p.Register(worker.KindPRMergeability, jobs.PRMergeability(prDeps))
103
104 shithubdPath, _ := shithubdBinaryPath()
105 p.Register(worker.KindRepoForkClone, jobs.RepoForkClone(jobs.ForkCloneDeps{
106 Pool: pool, RepoFS: rfs, Logger: logger, ShithubdPath: shithubdPath,
107 }))
108
109 p.Register(worker.KindRepoIndexCode, jobs.RepoIndexCode(jobs.IndexCodeDeps{
110 Pool: pool, RepoFS: rfs, Logger: logger,
111 }))
112 p.Register(worker.KindRepoIndexReconcile, jobs.RepoIndexReconcile(jobs.IndexReconcileDeps{
113 Pool: pool, Logger: logger,
114 }))
115
116 notifSender, _ := pickNotifEmailSender(cfg)
117 p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{
118 Pool: pool,
119 Logger: logger,
120 EmailSender: notifSender,
121 EmailFrom: cfg.Auth.EmailFrom,
122 SiteName: cfg.Auth.SiteName,
123 BaseURL: cfg.Auth.BaseURL,
124 UnsubscribeKey: notifUnsubscribeKey(cfg, logger),
125 }))
126
127 // Webhook delivery (S33). The fan-out drains domain_events
128 // past its own cursor; deliver runs per-row HTTP POSTs;
129 // purge-old prunes terminal rows past the retention window.
130 // We reuse the TOTP key as the at-rest secretbox key — both
131 // are encrypted-blob columns in the same trust domain.
132 hookBox, hookBoxErr := secretbox.FromBase64(cfg.Auth.TOTPKeyB64)
133 if hookBoxErr != nil {
134 logger.Warn("webhook: secretbox unavailable; webhook delivery disabled",
135 "hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
136 "error", hookBoxErr)
137 } else {
138 p.Register(webhook.KindWebhookFanout, jobs.WebhookFanout(jobs.WebhookFanoutDeps{
139 Pool: pool, Logger: logger,
140 }))
141 p.Register(webhook.KindWebhookDeliver, jobs.WebhookDeliver(jobs.WebhookDeliverDeps{
142 Pool: pool,
143 Logger: logger,
144 SecretBox: hookBox,
145 SSRF: webhook.DefaultSSRFConfig(),
146 }))
147 p.Register(webhook.KindWebhookPurgeOld, jobs.WebhookPurgeOld(jobs.WebhookPurgeOldDeps{
148 Pool: pool, Logger: logger, Retention: 30 * 24 * time.Hour,
149 }))
150 }
151
152 return p.Run(ctx)
153 },
154 }
155
156 func init() {
157 workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)")
158 rootCmd.AddCommand(workerCmd)
159 }
160
161 // pickNotifEmailSender mirrors pickAdminEmailSender / pickEmailSender
162 // in the web binary. Kept local to the worker so failure to construct
163 // the sender doesn't kill the process — fan-out without email is a
164 // supported degraded mode (inbox rows still land).
165 func pickNotifEmailSender(cfg config.Config) (email.Sender, error) {
166 switch cfg.Auth.EmailBackend {
167 case "stdout":
168 return email.NewStdoutSender(os.Stdout), nil
169 case "smtp":
170 return &email.SMTPSender{
171 Addr: cfg.Auth.SMTP.Addr,
172 From: cfg.Auth.EmailFrom,
173 Username: cfg.Auth.SMTP.Username,
174 Password: cfg.Auth.SMTP.Password,
175 }, nil
176 case "postmark":
177 return &email.PostmarkSender{
178 ServerToken: cfg.Auth.Postmark.ServerToken,
179 From: cfg.Auth.EmailFrom,
180 }, nil
181 default:
182 return nil, errors.New("worker: unknown email_backend")
183 }
184 }
185
186 // notifUnsubscribeKey resolves the HMAC key used to sign one-click
187 // unsubscribe URLs. Operators set Notif.UnsubscribeKeyB64 in prod.
188 // In dev (key empty) we derive a deterministic 32-byte key from the
189 // session secret so unsubscribe links survive process restarts
190 // without operator action — and we log a loud warning so the
191 // derivation can't sneak into prod by accident.
192 func notifUnsubscribeKey(cfg config.Config, logger *slog.Logger) []byte {
193 if cfg.Notif.UnsubscribeKeyB64 != "" {
194 k, err := base64.StdEncoding.DecodeString(cfg.Notif.UnsubscribeKeyB64)
195 if err == nil && len(k) >= 16 {
196 return k
197 }
198 if logger != nil {
199 logger.Warn("notif: unsubscribe_key_b64 invalid; falling back to derived key",
200 "hint", "set Notif.UnsubscribeKeyB64 to base64-encoded 32+ random bytes")
201 }
202 }
203 if cfg.Session.KeyB64 != "" {
204 seed, err := base64.StdEncoding.DecodeString(cfg.Session.KeyB64)
205 if err == nil && len(seed) > 0 {
206 sum := sha256.Sum256(append([]byte("notif-unsub:"), seed...))
207 if logger != nil {
208 logger.Warn("notif: deriving unsubscribe key from session secret (dev fallback)",
209 "hint", "set Notif.UnsubscribeKeyB64 in prod")
210 }
211 return sum[:]
212 }
213 }
214 // Last-resort static dev key — links work but anyone with source
215 // access can mint them. Logged at WARN so operators notice in
216 // prod logs.
217 if logger != nil {
218 logger.Warn("notif: no key material — using static dev key",
219 "hint", "set Notif.UnsubscribeKeyB64 (or session.key_b64)")
220 }
221 return []byte("shithub-dev-unsub-static-key-32B")
222 }
223