Go · 9918 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package main
4
5 import (
6 "crypto/sha256"
7 "encoding/base64"
8 "errors"
9 "fmt"
10 "log/slog"
11 "os"
12 "os/signal"
13 "path/filepath"
14 "strconv"
15 "syscall"
16 "time"
17
18 "github.com/spf13/cobra"
19
20 "github.com/tenseleyFlow/shithub/internal/actions/finalize"
21 "github.com/tenseleyFlow/shithub/internal/actions/trigger"
22 "github.com/tenseleyFlow/shithub/internal/auth/audit"
23 "github.com/tenseleyFlow/shithub/internal/auth/email"
24 "github.com/tenseleyFlow/shithub/internal/auth/secretbox"
25 "github.com/tenseleyFlow/shithub/internal/auth/throttle"
26 "github.com/tenseleyFlow/shithub/internal/infra/config"
27 "github.com/tenseleyFlow/shithub/internal/infra/db"
28 "github.com/tenseleyFlow/shithub/internal/infra/storage"
29 "github.com/tenseleyFlow/shithub/internal/webhook"
30 "github.com/tenseleyFlow/shithub/internal/worker"
31 "github.com/tenseleyFlow/shithub/internal/worker/jobs"
32 )
33
34 // auditRecorder returns the shared audit recorder. Kept as a function
35 // rather than a package-level value so future tests / non-default
36 // recorders can substitute via dependency injection.
37 func auditRecorder() *audit.Recorder { return audit.NewRecorder() }
38
39 // workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
40 // graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
41 // in-flight jobs are given a deadline to finish, then the binary exits.
42 var workerCmd = &cobra.Command{
43 Use: "worker",
44 Short: "Run background workers (push processing, size recalc, purge)",
45 RunE: func(cmd *cobra.Command, _ []string) error {
46 workersFlag, _ := cmd.Flags().GetInt("workers")
47
48 cfg, err := config.Load(nil)
49 if err != nil {
50 return fmt.Errorf("config: %w", err)
51 }
52 if cfg.DB.URL == "" {
53 return errors.New("worker: SHITHUB_DATABASE_URL unset")
54 }
55 root, err := filepath.Abs(cfg.Storage.ReposRoot)
56 if err != nil {
57 return fmt.Errorf("repos_root: %w", err)
58 }
59 rfs, err := storage.NewRepoFS(root)
60 if err != nil {
61 return fmt.Errorf("repo fs: %w", err)
62 }
63
64 ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
65 defer stop()
66
67 // Worker count: flag overrides env override default.
68 count := workersFlag
69 if count <= 0 {
70 if v, _ := strconv.Atoi(os.Getenv("SHITHUB_WORKERS")); v > 0 {
71 count = v
72 }
73 }
74
75 pool, err := db.Open(ctx, db.Config{
76 URL: cfg.DB.URL, MaxConns: int32(count) + 2, MinConns: 1,
77 })
78 if err != nil {
79 return fmt.Errorf("db: %w", err)
80 }
81 defer pool.Close()
82
83 logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
84 objectStore, err := buildWorkerObjectStore(cfg.Storage.S3, logger)
85 if err != nil {
86 return fmt.Errorf("object storage: %w", err)
87 }
88 box, boxErr := secretbox.FromBase64(cfg.Auth.TOTPKeyB64)
89 if boxErr != nil {
90 logger.Warn("secretbox unavailable for encrypted worker payloads",
91 "hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
92 "error", boxErr)
93 }
94
95 p := worker.NewPool(pool, worker.PoolConfig{
96 Workers: count,
97 IdlePoll: 5 * time.Second,
98 JobTimeout: 5 * time.Minute,
99 Logger: logger,
100 })
101 p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{
102 Pool: pool, RepoFS: rfs, Logger: logger,
103 }))
104 p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{
105 Pool: pool, RepoFS: rfs, Logger: logger,
106 }))
107 p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
108 Pool: pool, Logger: logger,
109 }))
110 p.Register(worker.KindLifecycleSweep, jobs.LifecycleSweep(jobs.LifecycleSweepDeps{
111 Pool: pool, RepoFS: rfs, Audit: auditRecorder(), Logger: logger,
112 }))
113 prDeps := jobs.PRJobsDeps{Pool: pool, RepoFS: rfs, Logger: logger}
114 p.Register(worker.KindPRSynchronize, jobs.PRSynchronize(prDeps))
115 p.Register(worker.KindPRMergeability, jobs.PRMergeability(prDeps))
116
117 shithubdPath, _ := shithubdBinaryPath()
118 p.Register(worker.KindRepoForkClone, jobs.RepoForkClone(jobs.ForkCloneDeps{
119 Pool: pool, RepoFS: rfs, Logger: logger, ShithubdPath: shithubdPath,
120 }))
121
122 p.Register(worker.KindRepoIndexCode, jobs.RepoIndexCode(jobs.IndexCodeDeps{
123 Pool: pool, RepoFS: rfs, Logger: logger,
124 }))
125 p.Register(worker.KindRepoIndexReconcile, jobs.RepoIndexReconcile(jobs.IndexReconcileDeps{
126 Pool: pool, Logger: logger,
127 }))
128 importDeps := jobs.OrgGitHubImportDeps{
129 Pool: pool, RepoFS: rfs, Box: box, Audit: auditRecorder(),
130 Limiter: throttle.NewLimiter(), Logger: logger, ShithubdPath: shithubdPath,
131 }
132 p.Register(worker.KindOrgGitHubImportDiscover, jobs.OrgGitHubImportDiscover(importDeps))
133 p.Register(worker.KindOrgGitHubImportRepo, jobs.OrgGitHubImportRepo(importDeps))
134
135 notifSender, _ := pickNotifEmailSender(cfg)
136 p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{
137 Pool: pool,
138 Logger: logger,
139 EmailSender: notifSender,
140 EmailFrom: cfg.Auth.EmailFrom,
141 SiteName: cfg.Auth.SiteName,
142 BaseURL: cfg.Auth.BaseURL,
143 UnsubscribeKey: notifUnsubscribeKey(cfg, logger),
144 }))
145 p.Register(worker.KindTrendingCompute, jobs.TrendingCompute(jobs.TrendingComputeDeps{
146 Pool: pool, Logger: logger,
147 }))
148
149 // Webhook delivery (S33). The fan-out drains domain_events
150 // past its own cursor; deliver runs per-row HTTP POSTs;
151 // purge-old prunes terminal rows past the retention window.
152 // We reuse the TOTP key as the at-rest secretbox key — both
153 // are encrypted-blob columns in the same trust domain.
154 if boxErr != nil {
155 logger.Warn("webhook: secretbox unavailable; webhook delivery disabled",
156 "hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
157 "error", boxErr)
158 } else {
159 p.Register(webhook.KindWebhookFanout, jobs.WebhookFanout(jobs.WebhookFanoutDeps{
160 Pool: pool, Logger: logger,
161 }))
162 p.Register(webhook.KindWebhookDeliver, jobs.WebhookDeliver(jobs.WebhookDeliverDeps{
163 Pool: pool,
164 Logger: logger,
165 SecretBox: box,
166 SSRF: webhook.DefaultSSRFConfig(),
167 }))
168 p.Register(webhook.KindWebhookPurgeOld, jobs.WebhookPurgeOld(jobs.WebhookPurgeOldDeps{
169 Pool: pool, Logger: logger, Retention: 30 * 24 * time.Hour,
170 }))
171 }
172
173 // Actions trigger pipeline (S41b). Discovers .shithub/workflows/
174 // at the head sha, parses each, matches against the triggering
175 // event, and enqueues workflow_runs for the matches. Runs sit
176 // in 'queued' status — no runner picks them up until S41c+.
177 p.Register(trigger.KindWorkflowTrigger, trigger.Handler(trigger.JobDeps{
178 Deps: trigger.Deps{Pool: pool, Logger: logger}, RepoFS: rfs,
179 }))
180 if objectStore != nil {
181 p.Register(finalize.KindWorkflowFinalizeStep, finalize.Handler(finalize.Deps{
182 Pool: pool, ObjectStore: objectStore, Logger: logger,
183 }))
184 } else {
185 logger.Info("actions: object storage not configured; workflow step log finalization disabled")
186 }
187
188 return p.Run(ctx)
189 },
190 }
191
192 func init() {
193 workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)")
194 rootCmd.AddCommand(workerCmd)
195 }
196
197 func buildWorkerObjectStore(s config.S3StorageConfig, logger *slog.Logger) (storage.ObjectStore, error) {
198 if s.Bucket == "" {
199 return nil, nil
200 }
201 if logger != nil {
202 logger.Info("storage: configuring object store for worker", "bucket", s.Bucket)
203 }
204 return storage.NewS3Store(storage.S3Config{
205 Endpoint: s.Endpoint,
206 Region: s.Region,
207 AccessKeyID: s.AccessKeyID,
208 SecretAccessKey: s.SecretAccessKey,
209 Bucket: s.Bucket,
210 UseSSL: s.UseSSL,
211 ForcePathStyle: s.ForcePathStyle,
212 })
213 }
214
215 // pickNotifEmailSender mirrors pickAdminEmailSender / pickEmailSender
216 // in the web binary. Kept local to the worker so failure to construct
217 // the sender doesn't kill the process — fan-out without email is a
218 // supported degraded mode (inbox rows still land).
219 func pickNotifEmailSender(cfg config.Config) (email.Sender, error) {
220 switch cfg.Auth.EmailBackend {
221 case "stdout":
222 return email.NewStdoutSender(os.Stdout), nil
223 case "smtp":
224 return &email.SMTPSender{
225 Addr: cfg.Auth.SMTP.Addr,
226 From: cfg.Auth.EmailFrom,
227 Username: cfg.Auth.SMTP.Username,
228 Password: cfg.Auth.SMTP.Password,
229 }, nil
230 case "postmark":
231 return &email.PostmarkSender{
232 ServerToken: cfg.Auth.Postmark.ServerToken,
233 From: cfg.Auth.EmailFrom,
234 }, nil
235 default:
236 return nil, errors.New("worker: unknown email_backend")
237 }
238 }
239
240 // notifUnsubscribeKey resolves the HMAC key used to sign one-click
241 // unsubscribe URLs. Operators set Notif.UnsubscribeKeyB64 in prod.
242 // In dev (key empty) we derive a deterministic 32-byte key from the
243 // session secret so unsubscribe links survive process restarts
244 // without operator action — and we log a loud warning so the
245 // derivation can't sneak into prod by accident.
246 func notifUnsubscribeKey(cfg config.Config, logger *slog.Logger) []byte {
247 if cfg.Notif.UnsubscribeKeyB64 != "" {
248 k, err := base64.StdEncoding.DecodeString(cfg.Notif.UnsubscribeKeyB64)
249 if err == nil && len(k) >= 16 {
250 return k
251 }
252 if logger != nil {
253 logger.Warn("notif: unsubscribe_key_b64 invalid; falling back to derived key",
254 "hint", "set Notif.UnsubscribeKeyB64 to base64-encoded 32+ random bytes")
255 }
256 }
257 if cfg.Session.KeyB64 != "" {
258 seed, err := base64.StdEncoding.DecodeString(cfg.Session.KeyB64)
259 if err == nil && len(seed) > 0 {
260 sum := sha256.Sum256(append([]byte("notif-unsub:"), seed...))
261 if logger != nil {
262 logger.Warn("notif: deriving unsubscribe key from session secret (dev fallback)",
263 "hint", "set Notif.UnsubscribeKeyB64 in prod")
264 }
265 return sum[:]
266 }
267 }
268 // Last-resort static dev key — links work but anyone with source
269 // access can mint them. Logged at WARN so operators notice in
270 // prod logs.
271 if logger != nil {
272 logger.Warn("notif: no key material — using static dev key",
273 "hint", "set Notif.UnsubscribeKeyB64 (or session.key_b64)")
274 }
275 return []byte("shithub-dev-unsub-static-key-32B")
276 }
277