Go · 10113 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package main
4
5 import (
6 "crypto/sha256"
7 "encoding/base64"
8 "errors"
9 "fmt"
10 "log/slog"
11 "os"
12 "os/signal"
13 "path/filepath"
14 "strconv"
15 "syscall"
16 "time"
17
18 "github.com/spf13/cobra"
19
20 "github.com/tenseleyFlow/shithub/internal/actions/cleanup"
21 "github.com/tenseleyFlow/shithub/internal/actions/finalize"
22 "github.com/tenseleyFlow/shithub/internal/actions/trigger"
23 "github.com/tenseleyFlow/shithub/internal/auth/audit"
24 "github.com/tenseleyFlow/shithub/internal/auth/email"
25 "github.com/tenseleyFlow/shithub/internal/auth/secretbox"
26 "github.com/tenseleyFlow/shithub/internal/auth/throttle"
27 "github.com/tenseleyFlow/shithub/internal/infra/config"
28 "github.com/tenseleyFlow/shithub/internal/infra/db"
29 "github.com/tenseleyFlow/shithub/internal/infra/storage"
30 "github.com/tenseleyFlow/shithub/internal/webhook"
31 "github.com/tenseleyFlow/shithub/internal/worker"
32 "github.com/tenseleyFlow/shithub/internal/worker/jobs"
33 )
34
35 // auditRecorder returns the shared audit recorder. Kept as a function
36 // rather than a package-level value so future tests / non-default
37 // recorders can substitute via dependency injection.
38 func auditRecorder() *audit.Recorder { return audit.NewRecorder() }
39
40 // workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
41 // graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
42 // in-flight jobs are given a deadline to finish, then the binary exits.
43 var workerCmd = &cobra.Command{
44 Use: "worker",
45 Short: "Run background workers (push processing, size recalc, purge)",
46 RunE: func(cmd *cobra.Command, _ []string) error {
47 workersFlag, _ := cmd.Flags().GetInt("workers")
48
49 cfg, err := config.Load(nil)
50 if err != nil {
51 return fmt.Errorf("config: %w", err)
52 }
53 if cfg.DB.URL == "" {
54 return errors.New("worker: SHITHUB_DATABASE_URL unset")
55 }
56 root, err := filepath.Abs(cfg.Storage.ReposRoot)
57 if err != nil {
58 return fmt.Errorf("repos_root: %w", err)
59 }
60 rfs, err := storage.NewRepoFS(root)
61 if err != nil {
62 return fmt.Errorf("repo fs: %w", err)
63 }
64
65 ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
66 defer stop()
67
68 // Worker count: flag overrides env override default.
69 count := workersFlag
70 if count <= 0 {
71 if v, _ := strconv.Atoi(os.Getenv("SHITHUB_WORKERS")); v > 0 {
72 count = v
73 }
74 }
75
76 pool, err := db.Open(ctx, db.Config{
77 URL: cfg.DB.URL, MaxConns: int32(count) + 2, MinConns: 1,
78 })
79 if err != nil {
80 return fmt.Errorf("db: %w", err)
81 }
82 defer pool.Close()
83
84 logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
85 objectStore, err := buildWorkerObjectStore(cfg.Storage.S3, logger)
86 if err != nil {
87 return fmt.Errorf("object storage: %w", err)
88 }
89 box, boxErr := secretbox.FromBase64(cfg.Auth.TOTPKeyB64)
90 if boxErr != nil {
91 logger.Warn("secretbox unavailable for encrypted worker payloads",
92 "hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
93 "error", boxErr)
94 }
95
96 p := worker.NewPool(pool, worker.PoolConfig{
97 Workers: count,
98 IdlePoll: 5 * time.Second,
99 JobTimeout: 5 * time.Minute,
100 Logger: logger,
101 })
102 p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{
103 Pool: pool, RepoFS: rfs, Logger: logger,
104 }))
105 p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{
106 Pool: pool, RepoFS: rfs, Logger: logger,
107 }))
108 p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
109 Pool: pool, Logger: logger,
110 }))
111 p.Register(worker.KindLifecycleSweep, jobs.LifecycleSweep(jobs.LifecycleSweepDeps{
112 Pool: pool, RepoFS: rfs, Audit: auditRecorder(), Logger: logger,
113 }))
114 prDeps := jobs.PRJobsDeps{Pool: pool, RepoFS: rfs, Logger: logger}
115 p.Register(worker.KindPRSynchronize, jobs.PRSynchronize(prDeps))
116 p.Register(worker.KindPRMergeability, jobs.PRMergeability(prDeps))
117
118 shithubdPath, _ := shithubdBinaryPath()
119 p.Register(worker.KindRepoForkClone, jobs.RepoForkClone(jobs.ForkCloneDeps{
120 Pool: pool, RepoFS: rfs, Logger: logger, ShithubdPath: shithubdPath,
121 }))
122
123 p.Register(worker.KindRepoIndexCode, jobs.RepoIndexCode(jobs.IndexCodeDeps{
124 Pool: pool, RepoFS: rfs, Logger: logger,
125 }))
126 p.Register(worker.KindRepoIndexReconcile, jobs.RepoIndexReconcile(jobs.IndexReconcileDeps{
127 Pool: pool, Logger: logger,
128 }))
129 importDeps := jobs.OrgGitHubImportDeps{
130 Pool: pool, RepoFS: rfs, Box: box, Audit: auditRecorder(),
131 Limiter: throttle.NewLimiter(), Logger: logger, ShithubdPath: shithubdPath,
132 }
133 p.Register(worker.KindOrgGitHubImportDiscover, jobs.OrgGitHubImportDiscover(importDeps))
134 p.Register(worker.KindOrgGitHubImportRepo, jobs.OrgGitHubImportRepo(importDeps))
135
136 notifSender, _ := pickNotifEmailSender(cfg)
137 p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{
138 Pool: pool,
139 Logger: logger,
140 EmailSender: notifSender,
141 EmailFrom: cfg.Auth.EmailFrom,
142 SiteName: cfg.Auth.SiteName,
143 BaseURL: cfg.Auth.BaseURL,
144 UnsubscribeKey: notifUnsubscribeKey(cfg, logger),
145 }))
146 p.Register(worker.KindTrendingCompute, jobs.TrendingCompute(jobs.TrendingComputeDeps{
147 Pool: pool, Logger: logger,
148 }))
149
150 // Webhook delivery (S33). The fan-out drains domain_events
151 // past its own cursor; deliver runs per-row HTTP POSTs;
152 // purge-old prunes terminal rows past the retention window.
153 // We reuse the TOTP key as the at-rest secretbox key — both
154 // are encrypted-blob columns in the same trust domain.
155 if boxErr != nil {
156 logger.Warn("webhook: secretbox unavailable; webhook delivery disabled",
157 "hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
158 "error", boxErr)
159 } else {
160 p.Register(webhook.KindWebhookFanout, jobs.WebhookFanout(jobs.WebhookFanoutDeps{
161 Pool: pool, Logger: logger,
162 }))
163 p.Register(webhook.KindWebhookDeliver, jobs.WebhookDeliver(jobs.WebhookDeliverDeps{
164 Pool: pool,
165 Logger: logger,
166 SecretBox: box,
167 SSRF: webhook.DefaultSSRFConfig(),
168 }))
169 p.Register(webhook.KindWebhookPurgeOld, jobs.WebhookPurgeOld(jobs.WebhookPurgeOldDeps{
170 Pool: pool, Logger: logger, Retention: 30 * 24 * time.Hour,
171 }))
172 }
173
174 // Actions trigger pipeline (S41b). Discovers .shithub/workflows/
175 // at the head sha, parses each, matches against the triggering
176 // event, and enqueues workflow_runs for the matches. Runs sit
177 // in 'queued' status — no runner picks them up until S41c+.
178 p.Register(trigger.KindWorkflowTrigger, trigger.Handler(trigger.JobDeps{
179 Deps: trigger.Deps{Pool: pool, Logger: logger}, RepoFS: rfs,
180 }))
181 if objectStore != nil {
182 p.Register(finalize.KindWorkflowFinalizeStep, finalize.Handler(finalize.Deps{
183 Pool: pool, ObjectStore: objectStore, Logger: logger,
184 }))
185 } else {
186 logger.Info("actions: object storage not configured; workflow step log finalization disabled")
187 }
188 p.Register(cleanup.KindWorkflowCleanup, cleanup.Handler(cleanup.Deps{
189 Pool: pool, ObjectStore: objectStore, Logger: logger,
190 }))
191
192 return p.Run(ctx)
193 },
194 }
195
196 func init() {
197 workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)")
198 rootCmd.AddCommand(workerCmd)
199 }
200
201 func buildWorkerObjectStore(s config.S3StorageConfig, logger *slog.Logger) (storage.ObjectStore, error) {
202 if s.Bucket == "" {
203 return nil, nil
204 }
205 if logger != nil {
206 logger.Info("storage: configuring object store for worker", "bucket", s.Bucket)
207 }
208 return storage.NewS3Store(storage.S3Config{
209 Endpoint: s.Endpoint,
210 Region: s.Region,
211 AccessKeyID: s.AccessKeyID,
212 SecretAccessKey: s.SecretAccessKey,
213 Bucket: s.Bucket,
214 UseSSL: s.UseSSL,
215 ForcePathStyle: s.ForcePathStyle,
216 })
217 }
218
219 // pickNotifEmailSender mirrors pickAdminEmailSender / pickEmailSender
220 // in the web binary. Kept local to the worker so failure to construct
221 // the sender doesn't kill the process — fan-out without email is a
222 // supported degraded mode (inbox rows still land).
223 func pickNotifEmailSender(cfg config.Config) (email.Sender, error) {
224 switch cfg.Auth.EmailBackend {
225 case "stdout":
226 return email.NewStdoutSender(os.Stdout), nil
227 case "smtp":
228 return &email.SMTPSender{
229 Addr: cfg.Auth.SMTP.Addr,
230 From: cfg.Auth.EmailFrom,
231 Username: cfg.Auth.SMTP.Username,
232 Password: cfg.Auth.SMTP.Password,
233 }, nil
234 case "postmark":
235 return &email.PostmarkSender{
236 ServerToken: cfg.Auth.Postmark.ServerToken,
237 From: cfg.Auth.EmailFrom,
238 }, nil
239 default:
240 return nil, errors.New("worker: unknown email_backend")
241 }
242 }
243
244 // notifUnsubscribeKey resolves the HMAC key used to sign one-click
245 // unsubscribe URLs. Operators set Notif.UnsubscribeKeyB64 in prod.
246 // In dev (key empty) we derive a deterministic 32-byte key from the
247 // session secret so unsubscribe links survive process restarts
248 // without operator action — and we log a loud warning so the
249 // derivation can't sneak into prod by accident.
250 func notifUnsubscribeKey(cfg config.Config, logger *slog.Logger) []byte {
251 if cfg.Notif.UnsubscribeKeyB64 != "" {
252 k, err := base64.StdEncoding.DecodeString(cfg.Notif.UnsubscribeKeyB64)
253 if err == nil && len(k) >= 16 {
254 return k
255 }
256 if logger != nil {
257 logger.Warn("notif: unsubscribe_key_b64 invalid; falling back to derived key",
258 "hint", "set Notif.UnsubscribeKeyB64 to base64-encoded 32+ random bytes")
259 }
260 }
261 if cfg.Session.KeyB64 != "" {
262 seed, err := base64.StdEncoding.DecodeString(cfg.Session.KeyB64)
263 if err == nil && len(seed) > 0 {
264 sum := sha256.Sum256(append([]byte("notif-unsub:"), seed...))
265 if logger != nil {
266 logger.Warn("notif: deriving unsubscribe key from session secret (dev fallback)",
267 "hint", "set Notif.UnsubscribeKeyB64 in prod")
268 }
269 return sum[:]
270 }
271 }
272 // Last-resort static dev key — links work but anyone with source
273 // access can mint them. Logged at WARN so operators notice in
274 // prod logs.
275 if logger != nil {
276 logger.Warn("notif: no key material — using static dev key",
277 "hint", "set Notif.UnsubscribeKeyB64 (or session.key_b64)")
278 }
279 return []byte("shithub-dev-unsub-static-key-32B")
280 }
281