Go · 10766 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package main
4
5 import (
6 "crypto/sha256"
7 "encoding/base64"
8 "errors"
9 "fmt"
10 "log/slog"
11 "os"
12 "os/signal"
13 "path/filepath"
14 "strconv"
15 "syscall"
16 "time"
17
18 "github.com/spf13/cobra"
19
20 "github.com/tenseleyFlow/shithub/internal/actions/cleanup"
21 "github.com/tenseleyFlow/shithub/internal/actions/finalize"
22 "github.com/tenseleyFlow/shithub/internal/actions/trigger"
23 "github.com/tenseleyFlow/shithub/internal/auth/audit"
24 "github.com/tenseleyFlow/shithub/internal/auth/email"
25 "github.com/tenseleyFlow/shithub/internal/auth/secretbox"
26 "github.com/tenseleyFlow/shithub/internal/auth/throttle"
27 "github.com/tenseleyFlow/shithub/internal/billing/stripebilling"
28 "github.com/tenseleyFlow/shithub/internal/infra/config"
29 "github.com/tenseleyFlow/shithub/internal/infra/db"
30 "github.com/tenseleyFlow/shithub/internal/infra/storage"
31 "github.com/tenseleyFlow/shithub/internal/webhook"
32 "github.com/tenseleyFlow/shithub/internal/worker"
33 "github.com/tenseleyFlow/shithub/internal/worker/jobs"
34 )
35
36 // auditRecorder returns the shared audit recorder. Kept as a function
37 // rather than a package-level value so future tests / non-default
38 // recorders can substitute via dependency injection.
39 func auditRecorder() *audit.Recorder { return audit.NewRecorder() }
40
41 // workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
42 // graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
43 // in-flight jobs are given a deadline to finish, then the binary exits.
44 var workerCmd = &cobra.Command{
45 Use: "worker",
46 Short: "Run background workers (push processing, size recalc, purge)",
47 RunE: func(cmd *cobra.Command, _ []string) error {
48 workersFlag, _ := cmd.Flags().GetInt("workers")
49
50 cfg, err := config.Load(nil)
51 if err != nil {
52 return fmt.Errorf("config: %w", err)
53 }
54 if cfg.DB.URL == "" {
55 return errors.New("worker: SHITHUB_DATABASE_URL unset")
56 }
57 root, err := filepath.Abs(cfg.Storage.ReposRoot)
58 if err != nil {
59 return fmt.Errorf("repos_root: %w", err)
60 }
61 rfs, err := storage.NewRepoFS(root)
62 if err != nil {
63 return fmt.Errorf("repo fs: %w", err)
64 }
65
66 ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
67 defer stop()
68
69 // Worker count: flag overrides env override default.
70 count := workersFlag
71 if count <= 0 {
72 if v, _ := strconv.Atoi(os.Getenv("SHITHUB_WORKERS")); v > 0 {
73 count = v
74 }
75 }
76
77 pool, err := db.Open(ctx, db.Config{
78 URL: cfg.DB.URL, MaxConns: int32(count) + 2, MinConns: 1,
79 })
80 if err != nil {
81 return fmt.Errorf("db: %w", err)
82 }
83 defer pool.Close()
84
85 logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
86 objectStore, err := buildWorkerObjectStore(cfg.Storage.S3, logger)
87 if err != nil {
88 return fmt.Errorf("object storage: %w", err)
89 }
90 box, boxErr := secretbox.FromBase64(cfg.Auth.TOTPKeyB64)
91 if boxErr != nil {
92 logger.Warn("secretbox unavailable for encrypted worker payloads",
93 "hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
94 "error", boxErr)
95 }
96 var stripeRemote stripebilling.Remote
97 if cfg.Billing.Enabled {
98 remote, err := stripebilling.New(stripebilling.Config{
99 SecretKey: cfg.Billing.Stripe.SecretKey,
100 WebhookSecret: cfg.Billing.Stripe.WebhookSecret,
101 TeamPriceID: cfg.Billing.Stripe.TeamPriceID,
102 AutomaticTax: cfg.Billing.Stripe.AutomaticTax,
103 })
104 if err != nil {
105 return fmt.Errorf("billing: %w", err)
106 }
107 stripeRemote = remote
108 }
109
110 p := worker.NewPool(pool, worker.PoolConfig{
111 Workers: count,
112 IdlePoll: 5 * time.Second,
113 JobTimeout: 5 * time.Minute,
114 Logger: logger,
115 })
116 p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{
117 Pool: pool, RepoFS: rfs, Logger: logger,
118 }))
119 p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{
120 Pool: pool, RepoFS: rfs, Logger: logger,
121 }))
122 p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
123 Pool: pool, Logger: logger,
124 }))
125 p.Register(worker.KindLifecycleSweep, jobs.LifecycleSweep(jobs.LifecycleSweepDeps{
126 Pool: pool, RepoFS: rfs, Audit: auditRecorder(), Logger: logger,
127 }))
128 prDeps := jobs.PRJobsDeps{Pool: pool, RepoFS: rfs, Logger: logger}
129 p.Register(worker.KindPRSynchronize, jobs.PRSynchronize(prDeps))
130 p.Register(worker.KindPRMergeability, jobs.PRMergeability(prDeps))
131
132 shithubdPath, _ := shithubdBinaryPath()
133 p.Register(worker.KindRepoForkClone, jobs.RepoForkClone(jobs.ForkCloneDeps{
134 Pool: pool, RepoFS: rfs, Logger: logger, ShithubdPath: shithubdPath,
135 }))
136
137 p.Register(worker.KindRepoIndexCode, jobs.RepoIndexCode(jobs.IndexCodeDeps{
138 Pool: pool, RepoFS: rfs, Logger: logger,
139 }))
140 p.Register(worker.KindRepoIndexReconcile, jobs.RepoIndexReconcile(jobs.IndexReconcileDeps{
141 Pool: pool, Logger: logger,
142 }))
143 importDeps := jobs.OrgGitHubImportDeps{
144 Pool: pool, RepoFS: rfs, Box: box, Audit: auditRecorder(),
145 Limiter: throttle.NewLimiter(), Logger: logger, ShithubdPath: shithubdPath,
146 }
147 p.Register(worker.KindOrgGitHubImportDiscover, jobs.OrgGitHubImportDiscover(importDeps))
148 p.Register(worker.KindOrgGitHubImportRepo, jobs.OrgGitHubImportRepo(importDeps))
149 p.Register(worker.KindOrgBillingSeatSync, jobs.OrgBillingSeatSync(jobs.OrgBillingSeatSyncDeps{
150 Pool: pool, Logger: logger, Stripe: stripeRemote,
151 }))
152
153 notifSender, _ := pickNotifEmailSender(cfg)
154 p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{
155 Pool: pool,
156 Logger: logger,
157 EmailSender: notifSender,
158 EmailFrom: cfg.Auth.EmailFrom,
159 SiteName: cfg.Auth.SiteName,
160 BaseURL: cfg.Auth.BaseURL,
161 UnsubscribeKey: notifUnsubscribeKey(cfg, logger),
162 }))
163 p.Register(worker.KindTrendingCompute, jobs.TrendingCompute(jobs.TrendingComputeDeps{
164 Pool: pool, Logger: logger,
165 }))
166
167 // Webhook delivery (S33). The fan-out drains domain_events
168 // past its own cursor; deliver runs per-row HTTP POSTs;
169 // purge-old prunes terminal rows past the retention window.
170 // We reuse the TOTP key as the at-rest secretbox key — both
171 // are encrypted-blob columns in the same trust domain.
172 if boxErr != nil {
173 logger.Warn("webhook: secretbox unavailable; webhook delivery disabled",
174 "hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
175 "error", boxErr)
176 } else {
177 p.Register(webhook.KindWebhookFanout, jobs.WebhookFanout(jobs.WebhookFanoutDeps{
178 Pool: pool, Logger: logger,
179 }))
180 p.Register(webhook.KindWebhookDeliver, jobs.WebhookDeliver(jobs.WebhookDeliverDeps{
181 Pool: pool,
182 Logger: logger,
183 SecretBox: box,
184 SSRF: webhook.DefaultSSRFConfig(),
185 }))
186 p.Register(webhook.KindWebhookPurgeOld, jobs.WebhookPurgeOld(jobs.WebhookPurgeOldDeps{
187 Pool: pool, Logger: logger, Retention: 30 * 24 * time.Hour,
188 }))
189 }
190
191 // Actions trigger pipeline (S41b). Discovers .shithub/workflows/
192 // at the head sha, parses each, matches against the triggering
193 // event, and enqueues workflow_runs for the matches. Runs sit
194 // in 'queued' status — no runner picks them up until S41c+.
195 p.Register(trigger.KindWorkflowTrigger, trigger.Handler(trigger.JobDeps{
196 Deps: trigger.Deps{Pool: pool, Logger: logger}, RepoFS: rfs,
197 }))
198 if objectStore != nil {
199 p.Register(finalize.KindWorkflowFinalizeStep, finalize.Handler(finalize.Deps{
200 Pool: pool, ObjectStore: objectStore, Logger: logger,
201 }))
202 } else {
203 logger.Info("actions: object storage not configured; workflow step log finalization disabled")
204 }
205 p.Register(cleanup.KindWorkflowCleanup, cleanup.Handler(cleanup.Deps{
206 Pool: pool, ObjectStore: objectStore, Logger: logger,
207 }))
208
209 return p.Run(ctx)
210 },
211 }
212
213 func init() {
214 workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)")
215 rootCmd.AddCommand(workerCmd)
216 }
217
218 func buildWorkerObjectStore(s config.S3StorageConfig, logger *slog.Logger) (storage.ObjectStore, error) {
219 if s.Bucket == "" {
220 return nil, nil
221 }
222 if logger != nil {
223 logger.Info("storage: configuring object store for worker", "bucket", s.Bucket)
224 }
225 return storage.NewS3Store(storage.S3Config{
226 Endpoint: s.Endpoint,
227 Region: s.Region,
228 AccessKeyID: s.AccessKeyID,
229 SecretAccessKey: s.SecretAccessKey,
230 Bucket: s.Bucket,
231 UseSSL: s.UseSSL,
232 ForcePathStyle: s.ForcePathStyle,
233 })
234 }
235
236 // pickNotifEmailSender mirrors pickAdminEmailSender / pickEmailSender
237 // in the web binary. Kept local to the worker so failure to construct
238 // the sender doesn't kill the process — fan-out without email is a
239 // supported degraded mode (inbox rows still land).
240 func pickNotifEmailSender(cfg config.Config) (email.Sender, error) {
241 switch cfg.Auth.EmailBackend {
242 case "stdout":
243 return email.NewStdoutSender(os.Stdout), nil
244 case "smtp":
245 return &email.SMTPSender{
246 Addr: cfg.Auth.SMTP.Addr,
247 From: cfg.Auth.EmailFrom,
248 Username: cfg.Auth.SMTP.Username,
249 Password: cfg.Auth.SMTP.Password,
250 }, nil
251 case "postmark":
252 return &email.PostmarkSender{
253 ServerToken: cfg.Auth.Postmark.ServerToken,
254 From: cfg.Auth.EmailFrom,
255 }, nil
256 default:
257 return nil, errors.New("worker: unknown email_backend")
258 }
259 }
260
261 // notifUnsubscribeKey resolves the HMAC key used to sign one-click
262 // unsubscribe URLs. Operators set Notif.UnsubscribeKeyB64 in prod.
263 // In dev (key empty) we derive a deterministic 32-byte key from the
264 // session secret so unsubscribe links survive process restarts
265 // without operator action — and we log a loud warning so the
266 // derivation can't sneak into prod by accident.
267 func notifUnsubscribeKey(cfg config.Config, logger *slog.Logger) []byte {
268 if cfg.Notif.UnsubscribeKeyB64 != "" {
269 k, err := base64.StdEncoding.DecodeString(cfg.Notif.UnsubscribeKeyB64)
270 if err == nil && len(k) >= 16 {
271 return k
272 }
273 if logger != nil {
274 logger.Warn("notif: unsubscribe_key_b64 invalid; falling back to derived key",
275 "hint", "set Notif.UnsubscribeKeyB64 to base64-encoded 32+ random bytes")
276 }
277 }
278 if cfg.Session.KeyB64 != "" {
279 seed, err := base64.StdEncoding.DecodeString(cfg.Session.KeyB64)
280 if err == nil && len(seed) > 0 {
281 sum := sha256.Sum256(append([]byte("notif-unsub:"), seed...))
282 if logger != nil {
283 logger.Warn("notif: deriving unsubscribe key from session secret (dev fallback)",
284 "hint", "set Notif.UnsubscribeKeyB64 in prod")
285 }
286 return sum[:]
287 }
288 }
289 // Last-resort static dev key — links work but anyone with source
290 // access can mint them. Logged at WARN so operators notice in
291 // prod logs.
292 if logger != nil {
293 logger.Warn("notif: no key material — using static dev key",
294 "hint", "set Notif.UnsubscribeKeyB64 (or session.key_b64)")
295 }
296 return []byte("shithub-dev-unsub-static-key-32B")
297 }
298