Go · 8202 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package main
4
5 import (
6 "crypto/sha256"
7 "encoding/base64"
8 "errors"
9 "fmt"
10 "log/slog"
11 "os"
12 "os/signal"
13 "path/filepath"
14 "strconv"
15 "syscall"
16 "time"
17
18 "github.com/spf13/cobra"
19
20 "github.com/tenseleyFlow/shithub/internal/actions/trigger"
21 "github.com/tenseleyFlow/shithub/internal/auth/audit"
22 "github.com/tenseleyFlow/shithub/internal/auth/email"
23 "github.com/tenseleyFlow/shithub/internal/auth/secretbox"
24 "github.com/tenseleyFlow/shithub/internal/infra/config"
25 "github.com/tenseleyFlow/shithub/internal/infra/db"
26 "github.com/tenseleyFlow/shithub/internal/infra/storage"
27 "github.com/tenseleyFlow/shithub/internal/webhook"
28 "github.com/tenseleyFlow/shithub/internal/worker"
29 "github.com/tenseleyFlow/shithub/internal/worker/jobs"
30 )
31
32 // auditRecorder returns the shared audit recorder. Kept as a function
33 // rather than a package-level value so future tests / non-default
34 // recorders can substitute via dependency injection.
35 func auditRecorder() *audit.Recorder { return audit.NewRecorder() }
36
37 // workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
38 // graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
39 // in-flight jobs are given a deadline to finish, then the binary exits.
40 var workerCmd = &cobra.Command{
41 Use: "worker",
42 Short: "Run background workers (push processing, size recalc, purge)",
43 RunE: func(cmd *cobra.Command, _ []string) error {
44 workersFlag, _ := cmd.Flags().GetInt("workers")
45
46 cfg, err := config.Load(nil)
47 if err != nil {
48 return fmt.Errorf("config: %w", err)
49 }
50 if cfg.DB.URL == "" {
51 return errors.New("worker: SHITHUB_DATABASE_URL unset")
52 }
53 root, err := filepath.Abs(cfg.Storage.ReposRoot)
54 if err != nil {
55 return fmt.Errorf("repos_root: %w", err)
56 }
57 rfs, err := storage.NewRepoFS(root)
58 if err != nil {
59 return fmt.Errorf("repo fs: %w", err)
60 }
61
62 ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
63 defer stop()
64
65 // Worker count: flag overrides env override default.
66 count := workersFlag
67 if count <= 0 {
68 if v, _ := strconv.Atoi(os.Getenv("SHITHUB_WORKERS")); v > 0 {
69 count = v
70 }
71 }
72
73 pool, err := db.Open(ctx, db.Config{
74 URL: cfg.DB.URL, MaxConns: int32(count) + 2, MinConns: 1,
75 })
76 if err != nil {
77 return fmt.Errorf("db: %w", err)
78 }
79 defer pool.Close()
80
81 logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
82
83 p := worker.NewPool(pool, worker.PoolConfig{
84 Workers: count,
85 IdlePoll: 5 * time.Second,
86 JobTimeout: 5 * time.Minute,
87 Logger: logger,
88 })
89 p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{
90 Pool: pool, RepoFS: rfs, Logger: logger,
91 }))
92 p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{
93 Pool: pool, RepoFS: rfs, Logger: logger,
94 }))
95 p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
96 Pool: pool, Logger: logger,
97 }))
98 p.Register(worker.KindLifecycleSweep, jobs.LifecycleSweep(jobs.LifecycleSweepDeps{
99 Pool: pool, RepoFS: rfs, Audit: auditRecorder(), Logger: logger,
100 }))
101 prDeps := jobs.PRJobsDeps{Pool: pool, RepoFS: rfs, Logger: logger}
102 p.Register(worker.KindPRSynchronize, jobs.PRSynchronize(prDeps))
103 p.Register(worker.KindPRMergeability, jobs.PRMergeability(prDeps))
104
105 shithubdPath, _ := shithubdBinaryPath()
106 p.Register(worker.KindRepoForkClone, jobs.RepoForkClone(jobs.ForkCloneDeps{
107 Pool: pool, RepoFS: rfs, Logger: logger, ShithubdPath: shithubdPath,
108 }))
109
110 p.Register(worker.KindRepoIndexCode, jobs.RepoIndexCode(jobs.IndexCodeDeps{
111 Pool: pool, RepoFS: rfs, Logger: logger,
112 }))
113 p.Register(worker.KindRepoIndexReconcile, jobs.RepoIndexReconcile(jobs.IndexReconcileDeps{
114 Pool: pool, Logger: logger,
115 }))
116
117 notifSender, _ := pickNotifEmailSender(cfg)
118 p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{
119 Pool: pool,
120 Logger: logger,
121 EmailSender: notifSender,
122 EmailFrom: cfg.Auth.EmailFrom,
123 SiteName: cfg.Auth.SiteName,
124 BaseURL: cfg.Auth.BaseURL,
125 UnsubscribeKey: notifUnsubscribeKey(cfg, logger),
126 }))
127
128 // Webhook delivery (S33). The fan-out drains domain_events
129 // past its own cursor; deliver runs per-row HTTP POSTs;
130 // purge-old prunes terminal rows past the retention window.
131 // We reuse the TOTP key as the at-rest secretbox key — both
132 // are encrypted-blob columns in the same trust domain.
133 hookBox, hookBoxErr := secretbox.FromBase64(cfg.Auth.TOTPKeyB64)
134 if hookBoxErr != nil {
135 logger.Warn("webhook: secretbox unavailable; webhook delivery disabled",
136 "hint", "set Auth.TOTPKeyB64 to a base64 32-byte key",
137 "error", hookBoxErr)
138 } else {
139 p.Register(webhook.KindWebhookFanout, jobs.WebhookFanout(jobs.WebhookFanoutDeps{
140 Pool: pool, Logger: logger,
141 }))
142 p.Register(webhook.KindWebhookDeliver, jobs.WebhookDeliver(jobs.WebhookDeliverDeps{
143 Pool: pool,
144 Logger: logger,
145 SecretBox: hookBox,
146 SSRF: webhook.DefaultSSRFConfig(),
147 }))
148 p.Register(webhook.KindWebhookPurgeOld, jobs.WebhookPurgeOld(jobs.WebhookPurgeOldDeps{
149 Pool: pool, Logger: logger, Retention: 30 * 24 * time.Hour,
150 }))
151 }
152
153 // Actions trigger pipeline (S41b). Discovers .shithub/workflows/
154 // at the head sha, parses each, matches against the triggering
155 // event, and enqueues workflow_runs for the matches. Runs sit
156 // in 'queued' status — no runner picks them up until S41c+.
157 p.Register(trigger.KindWorkflowTrigger, trigger.Handler(trigger.JobDeps{
158 Deps: trigger.Deps{Pool: pool, Logger: logger}, RepoFS: rfs,
159 }))
160
161 return p.Run(ctx)
162 },
163 }
164
165 func init() {
166 workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)")
167 rootCmd.AddCommand(workerCmd)
168 }
169
170 // pickNotifEmailSender mirrors pickAdminEmailSender / pickEmailSender
171 // in the web binary. Kept local to the worker so failure to construct
172 // the sender doesn't kill the process — fan-out without email is a
173 // supported degraded mode (inbox rows still land).
174 func pickNotifEmailSender(cfg config.Config) (email.Sender, error) {
175 switch cfg.Auth.EmailBackend {
176 case "stdout":
177 return email.NewStdoutSender(os.Stdout), nil
178 case "smtp":
179 return &email.SMTPSender{
180 Addr: cfg.Auth.SMTP.Addr,
181 From: cfg.Auth.EmailFrom,
182 Username: cfg.Auth.SMTP.Username,
183 Password: cfg.Auth.SMTP.Password,
184 }, nil
185 case "postmark":
186 return &email.PostmarkSender{
187 ServerToken: cfg.Auth.Postmark.ServerToken,
188 From: cfg.Auth.EmailFrom,
189 }, nil
190 default:
191 return nil, errors.New("worker: unknown email_backend")
192 }
193 }
194
195 // notifUnsubscribeKey resolves the HMAC key used to sign one-click
196 // unsubscribe URLs. Operators set Notif.UnsubscribeKeyB64 in prod.
197 // In dev (key empty) we derive a deterministic 32-byte key from the
198 // session secret so unsubscribe links survive process restarts
199 // without operator action — and we log a loud warning so the
200 // derivation can't sneak into prod by accident.
201 func notifUnsubscribeKey(cfg config.Config, logger *slog.Logger) []byte {
202 if cfg.Notif.UnsubscribeKeyB64 != "" {
203 k, err := base64.StdEncoding.DecodeString(cfg.Notif.UnsubscribeKeyB64)
204 if err == nil && len(k) >= 16 {
205 return k
206 }
207 if logger != nil {
208 logger.Warn("notif: unsubscribe_key_b64 invalid; falling back to derived key",
209 "hint", "set Notif.UnsubscribeKeyB64 to base64-encoded 32+ random bytes")
210 }
211 }
212 if cfg.Session.KeyB64 != "" {
213 seed, err := base64.StdEncoding.DecodeString(cfg.Session.KeyB64)
214 if err == nil && len(seed) > 0 {
215 sum := sha256.Sum256(append([]byte("notif-unsub:"), seed...))
216 if logger != nil {
217 logger.Warn("notif: deriving unsubscribe key from session secret (dev fallback)",
218 "hint", "set Notif.UnsubscribeKeyB64 in prod")
219 }
220 return sum[:]
221 }
222 }
223 // Last-resort static dev key — links work but anyone with source
224 // access can mint them. Logged at WARN so operators notice in
225 // prod logs.
226 if logger != nil {
227 logger.Warn("notif: no key material — using static dev key",
228 "hint", "set Notif.UnsubscribeKeyB64 (or session.key_b64)")
229 }
230 return []byte("shithub-dev-unsub-static-key-32B")
231 }
232