Go · 11014 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package main
4
5 import (
6 "crypto/sha256"
7 "encoding/base64"
8 "errors"
9 "fmt"
10 "log/slog"
11 "os"
12 "os/signal"
13 "path/filepath"
14 "strconv"
15 "syscall"
16 "time"
17
18 "github.com/spf13/cobra"
19
20 "github.com/tenseleyFlow/shithub/internal/actions/cleanup"
21 "github.com/tenseleyFlow/shithub/internal/actions/finalize"
22 "github.com/tenseleyFlow/shithub/internal/actions/trigger"
23 "github.com/tenseleyFlow/shithub/internal/auth/audit"
24 "github.com/tenseleyFlow/shithub/internal/auth/email"
25 "github.com/tenseleyFlow/shithub/internal/auth/secretbox"
26 "github.com/tenseleyFlow/shithub/internal/auth/throttle"
27 "github.com/tenseleyFlow/shithub/internal/billing/stripebilling"
28 "github.com/tenseleyFlow/shithub/internal/infra/config"
29 "github.com/tenseleyFlow/shithub/internal/infra/db"
30 "github.com/tenseleyFlow/shithub/internal/infra/storage"
31 "github.com/tenseleyFlow/shithub/internal/webhook"
32 "github.com/tenseleyFlow/shithub/internal/worker"
33 "github.com/tenseleyFlow/shithub/internal/worker/jobs"
34 )
35
36 // auditRecorder returns the shared audit recorder. Kept as a function
37 // rather than a package-level value so future tests / non-default
38 // recorders can substitute via dependency injection.
39 func auditRecorder() *audit.Recorder { return audit.NewRecorder() }
40
// workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
// graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
// in-flight jobs are given a deadline to finish, then the binary exits.
var workerCmd = &cobra.Command{
	Use:   "worker",
	Short: "Run background workers (push processing, size recalc, purge)",
	RunE: func(cmd *cobra.Command, _ []string) error {
		// --workers flag; 0 means "not set" (see count resolution below).
		workersFlag, _ := cmd.Flags().GetInt("workers")

		cfg, err := config.Load(nil)
		if err != nil {
			return fmt.Errorf("config: %w", err)
		}
		// The database is the one hard requirement; most other
		// integrations below degrade gracefully when unconfigured.
		if cfg.DB.URL == "" {
			return errors.New("worker: SHITHUB_DATABASE_URL unset")
		}
		// Absolute repos root so job handlers resolve repositories
		// independently of the process working directory.
		root, err := filepath.Abs(cfg.Storage.ReposRoot)
		if err != nil {
			return fmt.Errorf("repos_root: %w", err)
		}
		rfs, err := storage.NewRepoFS(root)
		if err != nil {
			return fmt.Errorf("repo fs: %w", err)
		}

		// ctx is cancelled on SIGINT/SIGTERM; p.Run at the bottom
		// observes the cancellation and begins graceful shutdown.
		ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
		defer stop()

		// Worker count: flag overrides env override default.
		// NOTE(review): if neither the flag nor SHITHUB_WORKERS is set,
		// count is still 0 here — presumably worker.NewPool substitutes
		// the default of 4 advertised in the flag help. MaxConns below
		// is sized from the pre-default count, so confirm the pool
		// cannot run more workers than count+2 connections can serve.
		count := workersFlag
		if count <= 0 {
			if v, _ := strconv.Atoi(os.Getenv("SHITHUB_WORKERS")); v > 0 {
				count = v
			}
		}

		// One connection pool shared by the worker pool and all handlers.
		pool, err := db.Open(ctx, db.Config{
			URL: cfg.DB.URL, MaxConns: int32(count) + 2, MinConns: 1,
		})
		if err != nil {
			return fmt.Errorf("db: %w", err)
		}
		defer pool.Close()

		logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
		// Object storage is optional: a nil store disables step-log
		// finalization further down instead of failing startup.
		objectStore, err := buildWorkerObjectStore(cfg.Storage.S3, logger)
		if err != nil {
			return fmt.Errorf("object storage: %w", err)
		}
		// Secretbox for encrypted worker payloads. Failure is non-fatal
		// here; boxErr is re-checked before registering the webhook
		// handlers, which require it.
		box, boxErr := secretbox.FromBase64(cfg.Auth.TOTPKeyB64)
		if boxErr != nil {
			logger.Warn("secretbox unavailable for encrypted worker payloads",
				"hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
				"error", boxErr)
		}
		// stripeRemote stays the zero value when billing is disabled;
		// the seat-sync job below receives it either way.
		var stripeRemote stripebilling.Remote
		if cfg.Billing.Enabled {
			remote, err := stripebilling.New(stripebilling.Config{
				SecretKey:     cfg.Billing.Stripe.SecretKey,
				WebhookSecret: cfg.Billing.Stripe.WebhookSecret,
				TeamPriceID:   cfg.Billing.Stripe.TeamPriceID,
				AutomaticTax:  cfg.Billing.Stripe.AutomaticTax,
			})
			if err != nil {
				return fmt.Errorf("billing: %w", err)
			}
			stripeRemote = remote
		}

		p := worker.NewPool(pool, worker.PoolConfig{
			Workers:    count,
			IdlePoll:   5 * time.Second,
			JobTimeout: 5 * time.Minute,
			Logger:     logger,
		})
		// Core repo jobs: push processing, size recalculation, purge of
		// finished job rows, and lifecycle sweeps.
		p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{
			Pool: pool, RepoFS: rfs, Logger: logger,
		}))
		p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{
			Pool: pool, RepoFS: rfs, Logger: logger,
		}))
		p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
			Pool: pool, Logger: logger,
		}))
		p.Register(worker.KindLifecycleSweep, jobs.LifecycleSweep(jobs.LifecycleSweepDeps{
			Pool: pool, RepoFS: rfs, Audit: auditRecorder(), Logger: logger,
		}))
		// Pull-request jobs share one dependency bundle.
		prDeps := jobs.PRJobsDeps{Pool: pool, RepoFS: rfs, Logger: logger}
		p.Register(worker.KindPRSynchronize, jobs.PRSynchronize(prDeps))
		p.Register(worker.KindPRMergeability, jobs.PRMergeability(prDeps))

		// NOTE(review): the lookup error is discarded, so fork-clone and
		// import jobs receive an empty ShithubdPath when the binary
		// cannot be located — confirm the handlers tolerate that.
		shithubdPath, _ := shithubdBinaryPath()
		p.Register(worker.KindRepoForkClone, jobs.RepoForkClone(jobs.ForkCloneDeps{
			Pool: pool, RepoFS: rfs, Logger: logger, ShithubdPath: shithubdPath,
		}))

		// Code indexing plus a reconcile pass.
		p.Register(worker.KindRepoIndexCode, jobs.RepoIndexCode(jobs.IndexCodeDeps{
			Pool: pool, RepoFS: rfs, Logger: logger,
		}))
		p.Register(worker.KindRepoIndexReconcile, jobs.RepoIndexReconcile(jobs.IndexReconcileDeps{
			Pool: pool, Logger: logger,
		}))
		// GitHub import: discovery fans out per-repo import jobs. Box
		// may be nil when boxErr fired above — presumably the handlers
		// cope with missing encryption; verify.
		importDeps := jobs.OrgGitHubImportDeps{
			Pool: pool, RepoFS: rfs, Box: box, Audit: auditRecorder(),
			Limiter: throttle.NewLimiter(), Logger: logger, ShithubdPath: shithubdPath,
		}
		p.Register(worker.KindOrgGitHubImportDiscover, jobs.OrgGitHubImportDiscover(importDeps))
		p.Register(worker.KindOrgGitHubImportRepo, jobs.OrgGitHubImportRepo(importDeps))
		// Stripe may be the nil zero value when billing is disabled.
		p.Register(worker.KindOrgBillingSeatSync, jobs.OrgBillingSeatSync(jobs.OrgBillingSeatSyncDeps{
			Pool: pool, Logger: logger, Stripe: stripeRemote,
		}))

		p.Register(worker.KindGPGBackfill, jobs.GPGBackfill(jobs.GPGBackfillDeps{
			Pool: pool, RepoFS: rfs, Logger: logger,
		}))

		// Notification fan-out. A nil sender (construction error dropped
		// here) is a supported degraded mode — inbox rows still land,
		// email just isn't sent; see pickNotifEmailSender.
		notifSender, _ := pickNotifEmailSender(cfg)
		p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{
			Pool:           pool,
			Logger:         logger,
			EmailSender:    notifSender,
			EmailFrom:      cfg.Auth.EmailFrom,
			SiteName:       cfg.Auth.SiteName,
			BaseURL:        cfg.Auth.BaseURL,
			UnsubscribeKey: notifUnsubscribeKey(cfg, logger),
		}))
		p.Register(worker.KindTrendingCompute, jobs.TrendingCompute(jobs.TrendingComputeDeps{
			Pool: pool, Logger: logger,
		}))

		// Webhook delivery (S33). The fan-out drains domain_events
		// past its own cursor; deliver runs per-row HTTP POSTs;
		// purge-old prunes terminal rows past the retention window.
		// We reuse the TOTP key as the at-rest secretbox key — both
		// are encrypted-blob columns in the same trust domain.
		if boxErr != nil {
			logger.Warn("webhook: secretbox unavailable; webhook delivery disabled",
				"hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
				"error", boxErr)
		} else {
			p.Register(webhook.KindWebhookFanout, jobs.WebhookFanout(jobs.WebhookFanoutDeps{
				Pool: pool, Logger: logger,
			}))
			p.Register(webhook.KindWebhookDeliver, jobs.WebhookDeliver(jobs.WebhookDeliverDeps{
				Pool:      pool,
				Logger:    logger,
				SecretBox: box,
				SSRF:      webhook.DefaultSSRFConfig(),
			}))
			// 30-day retention window for terminal delivery rows.
			p.Register(webhook.KindWebhookPurgeOld, jobs.WebhookPurgeOld(jobs.WebhookPurgeOldDeps{
				Pool: pool, Logger: logger, Retention: 30 * 24 * time.Hour,
			}))
		}

		// Actions trigger pipeline (S41b). Discovers .shithub/workflows/
		// at the head sha, parses each, matches against the triggering
		// event, and enqueues workflow_runs for the matches. Runs sit
		// in 'queued' status — no runner picks them up until S41c+.
		p.Register(trigger.KindWorkflowTrigger, trigger.Handler(trigger.JobDeps{
			Deps: trigger.Deps{Pool: pool, Logger: logger}, RepoFS: rfs,
		}))
		// Step-log finalization needs object storage; skipped (with a
		// log line) when no bucket is configured.
		if objectStore != nil {
			p.Register(finalize.KindWorkflowFinalizeStep, finalize.Handler(finalize.Deps{
				Pool: pool, ObjectStore: objectStore, Logger: logger,
			}))
		} else {
			logger.Info("actions: object storage not configured; workflow step log finalization disabled")
		}
		// Cleanup is registered unconditionally; ObjectStore may be nil
		// here — presumably the handler checks for that, confirm.
		p.Register(cleanup.KindWorkflowCleanup, cleanup.Handler(cleanup.Deps{
			Pool: pool, ObjectStore: objectStore, Logger: logger,
		}))

		// Blocks until ctx is cancelled (signal) or the pool fails.
		return p.Run(ctx)
	},
}
216
217 func init() {
218 workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)")
219 rootCmd.AddCommand(workerCmd)
220 }
221
222 func buildWorkerObjectStore(s config.S3StorageConfig, logger *slog.Logger) (storage.ObjectStore, error) {
223 if s.Bucket == "" {
224 return nil, nil
225 }
226 if logger != nil {
227 logger.Info("storage: configuring object store for worker", "bucket", s.Bucket)
228 }
229 return storage.NewS3Store(storage.S3Config{
230 Endpoint: s.Endpoint,
231 Region: s.Region,
232 AccessKeyID: s.AccessKeyID,
233 SecretAccessKey: s.SecretAccessKey,
234 Bucket: s.Bucket,
235 UseSSL: s.UseSSL,
236 ForcePathStyle: s.ForcePathStyle,
237 })
238 }
239
240 // pickNotifEmailSender mirrors pickAdminEmailSender / pickEmailSender
241 // in the web binary. Kept local to the worker so failure to construct
242 // the sender doesn't kill the process — fan-out without email is a
243 // supported degraded mode (inbox rows still land).
244 func pickNotifEmailSender(cfg config.Config) (email.Sender, error) {
245 switch cfg.Auth.EmailBackend {
246 case "stdout":
247 return email.NewStdoutSender(os.Stdout), nil
248 case "smtp":
249 return &email.SMTPSender{
250 Addr: cfg.Auth.SMTP.Addr,
251 From: cfg.Auth.EmailFrom,
252 Username: cfg.Auth.SMTP.Username,
253 Password: cfg.Auth.SMTP.Password,
254 }, nil
255 case "postmark":
256 return &email.PostmarkSender{
257 ServerToken: cfg.Auth.Postmark.ServerToken,
258 From: cfg.Auth.EmailFrom,
259 }, nil
260 case "resend":
261 return &email.ResendSender{
262 APIKey: cfg.Auth.Resend.APIKey,
263 From: cfg.Auth.EmailFrom,
264 }, nil
265 default:
266 return nil, errors.New("worker: unknown email_backend")
267 }
268 }
269
270 // notifUnsubscribeKey resolves the HMAC key used to sign one-click
271 // unsubscribe URLs. Operators set Notif.UnsubscribeKeyB64 in prod.
272 // In dev (key empty) we derive a deterministic 32-byte key from the
273 // session secret so unsubscribe links survive process restarts
274 // without operator action — and we log a loud warning so the
275 // derivation can't sneak into prod by accident.
276 func notifUnsubscribeKey(cfg config.Config, logger *slog.Logger) []byte {
277 if cfg.Notif.UnsubscribeKeyB64 != "" {
278 k, err := base64.StdEncoding.DecodeString(cfg.Notif.UnsubscribeKeyB64)
279 if err == nil && len(k) >= 16 {
280 return k
281 }
282 if logger != nil {
283 logger.Warn("notif: unsubscribe_key_b64 invalid; falling back to derived key",
284 "hint", "set Notif.UnsubscribeKeyB64 to base64-encoded 32+ random bytes")
285 }
286 }
287 if cfg.Session.KeyB64 != "" {
288 seed, err := base64.StdEncoding.DecodeString(cfg.Session.KeyB64)
289 if err == nil && len(seed) > 0 {
290 sum := sha256.Sum256(append([]byte("notif-unsub:"), seed...))
291 if logger != nil {
292 logger.Warn("notif: deriving unsubscribe key from session secret (dev fallback)",
293 "hint", "set Notif.UnsubscribeKeyB64 in prod")
294 }
295 return sum[:]
296 }
297 }
298 // Last-resort static dev key — links work but anyone with source
299 // access can mint them. Logged at WARN so operators notice in
300 // prod logs.
301 if logger != nil {
302 logger.Warn("notif: no key material — using static dev key",
303 "hint", "set Notif.UnsubscribeKeyB64 (or session.key_b64)")
304 }
305 return []byte("shithub-dev-unsub-static-key-32B")
306 }
307