// SPDX-License-Identifier: AGPL-3.0-or-later

package main

import (
	"crypto/sha256"
	"encoding/base64"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"path/filepath"
	"strconv"
	"syscall"
	"time"

	"github.com/spf13/cobra"

	"github.com/tenseleyFlow/shithub/internal/actions/finalize"
	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
	"github.com/tenseleyFlow/shithub/internal/auth/audit"
	"github.com/tenseleyFlow/shithub/internal/auth/email"
	"github.com/tenseleyFlow/shithub/internal/auth/secretbox"
	"github.com/tenseleyFlow/shithub/internal/infra/config"
	"github.com/tenseleyFlow/shithub/internal/infra/db"
	"github.com/tenseleyFlow/shithub/internal/infra/storage"
	"github.com/tenseleyFlow/shithub/internal/webhook"
	"github.com/tenseleyFlow/shithub/internal/worker"
	"github.com/tenseleyFlow/shithub/internal/worker/jobs"
)

// auditRecorder returns the shared audit recorder. Kept as a function
// rather than a package-level value so future tests / non-default
// recorders can be swapped in via dependency injection.
func auditRecorder() *audit.Recorder { return audit.NewRecorder() }

// workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
// graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
// in-flight jobs are given a deadline to finish, then the binary exits.
var workerCmd = &cobra.Command{
	Use:   "worker",
	Short: "Run background workers (push processing, size recalc, purge)",
	RunE: func(cmd *cobra.Command, _ []string) error {
		workersFlag, _ := cmd.Flags().GetInt("workers")

		cfg, err := config.Load(nil)
		if err != nil {
			return fmt.Errorf("config: %w", err)
		}
		if cfg.DB.URL == "" {
			return errors.New("worker: SHITHUB_DATABASE_URL unset")
		}
		root, err := filepath.Abs(cfg.Storage.ReposRoot)
		if err != nil {
			return fmt.Errorf("repos_root: %w", err)
		}
		rfs, err := storage.NewRepoFS(root)
		if err != nil {
			return fmt.Errorf("repo fs: %w", err)
		}

		ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
		defer stop()

		// Worker count: the flag overrides the env var, which overrides the pool default.
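		// Example invocations (binary name assumed; substitute the installed command):
		//   SHITHUB_WORKERS=8 shithub worker
		//   shithub worker --workers=8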
		count := workersFlag
		if count <= 0 {
			if v, _ := strconv.Atoi(os.Getenv("SHITHUB_WORKERS")); v > 0 {
				count = v
			}
		}

		pool, err := db.Open(ctx, db.Config{
			URL: cfg.DB.URL, MaxConns: int32(count) + 2, MinConns: 1,
		})
		if err != nil {
			return fmt.Errorf("db: %w", err)
		}
		defer pool.Close()

		logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
		objectStore, err := buildWorkerObjectStore(cfg.Storage.S3, logger)
		if err != nil {
			return fmt.Errorf("object storage: %w", err)
		}

		p := worker.NewPool(pool, worker.PoolConfig{
			Workers:    count,
			IdlePoll:   5 * time.Second,
			JobTimeout: 5 * time.Minute,
			Logger:     logger,
		})
		p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{
			Pool: pool, RepoFS: rfs, Logger: logger,
		}))
		p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{
			Pool: pool, RepoFS: rfs, Logger: logger,
		}))
		p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
			Pool: pool, Logger: logger,
		}))
		p.Register(worker.KindLifecycleSweep, jobs.LifecycleSweep(jobs.LifecycleSweepDeps{
			Pool: pool, RepoFS: rfs, Audit: auditRecorder(), Logger: logger,
		}))
		prDeps := jobs.PRJobsDeps{Pool: pool, RepoFS: rfs, Logger: logger}
		p.Register(worker.KindPRSynchronize, jobs.PRSynchronize(prDeps))
		p.Register(worker.KindPRMergeability, jobs.PRMergeability(prDeps))

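		// Fork-clone jobs take the path to the shithubd binary; the lookup
		// error is discarded, so ShithubdPath may be empty if resolution
		// fails (assumed to be tolerated by the job handler).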
		shithubdPath, _ := shithubdBinaryPath()
		p.Register(worker.KindRepoForkClone, jobs.RepoForkClone(jobs.ForkCloneDeps{
			Pool: pool, RepoFS: rfs, Logger: logger, ShithubdPath: shithubdPath,
		}))

		p.Register(worker.KindRepoIndexCode, jobs.RepoIndexCode(jobs.IndexCodeDeps{
			Pool: pool, RepoFS: rfs, Logger: logger,
		}))
		p.Register(worker.KindRepoIndexReconcile, jobs.RepoIndexReconcile(jobs.IndexReconcileDeps{
			Pool: pool, Logger: logger,
		}))

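		// Notification fan-out. Email delivery is optional: if the sender
		// can't be built, fan-out still runs and inbox rows still land
		// (see pickNotifEmailSender below).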
		notifSender, _ := pickNotifEmailSender(cfg)
		p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{
			Pool:           pool,
			Logger:         logger,
			EmailSender:    notifSender,
			EmailFrom:      cfg.Auth.EmailFrom,
			SiteName:       cfg.Auth.SiteName,
			BaseURL:        cfg.Auth.BaseURL,
			UnsubscribeKey: notifUnsubscribeKey(cfg, logger),
		}))
		p.Register(worker.KindTrendingCompute, jobs.TrendingCompute(jobs.TrendingComputeDeps{
			Pool: pool, Logger: logger,
		}))

		// Webhook delivery (S33). The fan-out drains domain_events
		// past its own cursor; deliver runs per-row HTTP POSTs;
		// purge-old prunes terminal rows past the retention window.
		// We reuse the TOTP key as the at-rest secretbox key — both
		// protect encrypted-blob columns in the same trust domain.
		hookBox, hookBoxErr := secretbox.FromBase64(cfg.Auth.TOTPKeyB64)
		if hookBoxErr != nil {
			logger.Warn("webhook: secretbox unavailable; webhook delivery disabled",
				"hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
				"error", hookBoxErr)
		} else {
			p.Register(webhook.KindWebhookFanout, jobs.WebhookFanout(jobs.WebhookFanoutDeps{
				Pool: pool, Logger: logger,
			}))
			p.Register(webhook.KindWebhookDeliver, jobs.WebhookDeliver(jobs.WebhookDeliverDeps{
				Pool:      pool,
				Logger:    logger,
				SecretBox: hookBox,
				SSRF:      webhook.DefaultSSRFConfig(),
			}))
			p.Register(webhook.KindWebhookPurgeOld, jobs.WebhookPurgeOld(jobs.WebhookPurgeOldDeps{
				Pool: pool, Logger: logger, Retention: 30 * 24 * time.Hour,
			}))
		}

		// Actions trigger pipeline (S41b). Discovers .shithub/workflows/
		// at the head sha, parses each, matches against the triggering
		// event, and enqueues workflow_runs for the matches. Runs sit
		// in 'queued' status — no runner picks them up until S41c+.
		p.Register(trigger.KindWorkflowTrigger, trigger.Handler(trigger.JobDeps{
			Deps: trigger.Deps{Pool: pool, Logger: logger}, RepoFS: rfs,
		}))
		if objectStore != nil {
			p.Register(finalize.KindWorkflowFinalizeStep, finalize.Handler(finalize.Deps{
				Pool: pool, ObjectStore: objectStore, Logger: logger,
			}))
		} else {
			logger.Info("actions: object storage not configured; workflow step log finalization disabled")
		}

		return p.Run(ctx)
	},
}

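// init registers the worker subcommand and its --workers flag. The flag
// defaults to 0, which defers to SHITHUB_WORKERS; if neither is set, the
// pool is assumed to apply its own default (4, per the help text below).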
func init() {
	workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)")
	rootCmd.AddCommand(workerCmd)
}

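// buildWorkerObjectStore constructs the worker's S3-backed object store.
// An empty bucket means object storage is not configured; the (nil, nil)
// return is treated by the caller as "disabled" rather than an error.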
func buildWorkerObjectStore(s config.S3StorageConfig, logger *slog.Logger) (storage.ObjectStore, error) {
	if s.Bucket == "" {
		return nil, nil
	}
	if logger != nil {
		logger.Info("storage: configuring object store for worker", "bucket", s.Bucket)
	}
	return storage.NewS3Store(storage.S3Config{
		Endpoint:        s.Endpoint,
		Region:          s.Region,
		AccessKeyID:     s.AccessKeyID,
		SecretAccessKey: s.SecretAccessKey,
		Bucket:          s.Bucket,
		UseSSL:          s.UseSSL,
		ForcePathStyle:  s.ForcePathStyle,
	})
}

// pickNotifEmailSender mirrors pickAdminEmailSender / pickEmailSender
// in the web binary. Kept local to the worker so failure to construct
// the sender doesn't kill the process — fan-out without email is a
// supported degraded mode (inbox rows still land).
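// The caller above discards the returned error, so an unrecognized
// email_backend simply yields a nil sender.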
func pickNotifEmailSender(cfg config.Config) (email.Sender, error) {
	switch cfg.Auth.EmailBackend {
	case "stdout":
		return email.NewStdoutSender(os.Stdout), nil
	case "smtp":
		return &email.SMTPSender{
			Addr:     cfg.Auth.SMTP.Addr,
			From:     cfg.Auth.EmailFrom,
			Username: cfg.Auth.SMTP.Username,
			Password: cfg.Auth.SMTP.Password,
		}, nil
	case "postmark":
		return &email.PostmarkSender{
			ServerToken: cfg.Auth.Postmark.ServerToken,
			From:        cfg.Auth.EmailFrom,
		}, nil
	default:
		return nil, errors.New("worker: unknown email_backend")
	}
}

// notifUnsubscribeKey resolves the HMAC key used to sign one-click
// unsubscribe URLs. Operators set Notif.UnsubscribeKeyB64 in prod.
// In dev (key empty) we derive a deterministic 32-byte key from the
// session secret so unsubscribe links survive process restarts
// without operator action — and we log a loud warning so the
// derivation can't sneak into prod by accident.
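// To generate a production key (one option among many):
//   openssl rand -base64 32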
func notifUnsubscribeKey(cfg config.Config, logger *slog.Logger) []byte {
	if cfg.Notif.UnsubscribeKeyB64 != "" {
		k, err := base64.StdEncoding.DecodeString(cfg.Notif.UnsubscribeKeyB64)
		if err == nil && len(k) >= 16 {
			return k
		}
		if logger != nil {
			logger.Warn("notif: unsubscribe_key_b64 invalid; falling back to derived key",
				"hint", "set Notif.UnsubscribeKeyB64 to base64-encoded 32+ random bytes")
		}
	}
	if cfg.Session.KeyB64 != "" {
		seed, err := base64.StdEncoding.DecodeString(cfg.Session.KeyB64)
		if err == nil && len(seed) > 0 {
			sum := sha256.Sum256(append([]byte("notif-unsub:"), seed...))
			if logger != nil {
				logger.Warn("notif: deriving unsubscribe key from session secret (dev fallback)",
					"hint", "set Notif.UnsubscribeKeyB64 in prod")
			}
			return sum[:]
		}
	}
	// Last-resort static dev key — links work but anyone with source
	// access can mint them. Logged at WARN so operators notice in
	// prod logs.
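	// (The literal below is exactly 32 bytes, matching the derived-key size.)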
	if logger != nil {
		logger.Warn("notif: no key material — using static dev key",
			"hint", "set Notif.UnsubscribeKeyB64 (or session.key_b64)")
	}
	return []byte("shithub-dev-unsub-static-key-32B")
}
265