| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package main |
| 4 | |
| 5 | import ( |
| 6 | "crypto/sha256" |
| 7 | "encoding/base64" |
| 8 | "errors" |
| 9 | "fmt" |
| 10 | "log/slog" |
| 11 | "os" |
| 12 | "os/signal" |
| 13 | "path/filepath" |
| 14 | "strconv" |
| 15 | "syscall" |
| 16 | "time" |
| 17 | |
| 18 | "github.com/spf13/cobra" |
| 19 | |
| 20 | "github.com/tenseleyFlow/shithub/internal/actions/cleanup" |
| 21 | "github.com/tenseleyFlow/shithub/internal/actions/finalize" |
| 22 | "github.com/tenseleyFlow/shithub/internal/actions/trigger" |
| 23 | "github.com/tenseleyFlow/shithub/internal/auth/audit" |
| 24 | "github.com/tenseleyFlow/shithub/internal/auth/email" |
| 25 | "github.com/tenseleyFlow/shithub/internal/auth/secretbox" |
| 26 | "github.com/tenseleyFlow/shithub/internal/auth/throttle" |
| 27 | "github.com/tenseleyFlow/shithub/internal/billing/stripebilling" |
| 28 | "github.com/tenseleyFlow/shithub/internal/infra/config" |
| 29 | "github.com/tenseleyFlow/shithub/internal/infra/db" |
| 30 | "github.com/tenseleyFlow/shithub/internal/infra/storage" |
| 31 | "github.com/tenseleyFlow/shithub/internal/webhook" |
| 32 | "github.com/tenseleyFlow/shithub/internal/worker" |
| 33 | "github.com/tenseleyFlow/shithub/internal/worker/jobs" |
| 34 | ) |
| 35 | |
| 36 | // auditRecorder returns the shared audit recorder. Kept as a function |
| 37 | // rather than a package-level value so future tests / non-default |
| 38 | // recorders can substitute via dependency injection. |
| 39 | func auditRecorder() *audit.Recorder { return audit.NewRecorder() } |
| 40 | |
| 41 | // workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger |
| 42 | // graceful shutdown: the LISTEN goroutine drops, claim attempts stop, |
| 43 | // in-flight jobs are given a deadline to finish, then the binary exits. |
| 44 | var workerCmd = &cobra.Command{ |
| 45 | Use: "worker", |
| 46 | Short: "Run background workers (push processing, size recalc, purge)", |
| 47 | RunE: func(cmd *cobra.Command, _ []string) error { |
| 48 | workersFlag, _ := cmd.Flags().GetInt("workers") |
| 49 | |
| 50 | cfg, err := config.Load(nil) |
| 51 | if err != nil { |
| 52 | return fmt.Errorf("config: %w", err) |
| 53 | } |
| 54 | if cfg.DB.URL == "" { |
| 55 | return errors.New("worker: SHITHUB_DATABASE_URL unset") |
| 56 | } |
| 57 | root, err := filepath.Abs(cfg.Storage.ReposRoot) |
| 58 | if err != nil { |
| 59 | return fmt.Errorf("repos_root: %w", err) |
| 60 | } |
| 61 | rfs, err := storage.NewRepoFS(root) |
| 62 | if err != nil { |
| 63 | return fmt.Errorf("repo fs: %w", err) |
| 64 | } |
| 65 | |
| 66 | ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM) |
| 67 | defer stop() |
| 68 | |
| 69 | // Worker count: flag overrides env override default. |
| 70 | count := workersFlag |
| 71 | if count <= 0 { |
| 72 | if v, _ := strconv.Atoi(os.Getenv("SHITHUB_WORKERS")); v > 0 { |
| 73 | count = v |
| 74 | } |
| 75 | } |
| 76 | |
| 77 | pool, err := db.Open(ctx, db.Config{ |
| 78 | URL: cfg.DB.URL, MaxConns: int32(count) + 2, MinConns: 1, |
| 79 | }) |
| 80 | if err != nil { |
| 81 | return fmt.Errorf("db: %w", err) |
| 82 | } |
| 83 | defer pool.Close() |
| 84 | |
| 85 | logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) |
| 86 | objectStore, err := buildWorkerObjectStore(cfg.Storage.S3, logger) |
| 87 | if err != nil { |
| 88 | return fmt.Errorf("object storage: %w", err) |
| 89 | } |
| 90 | box, boxErr := secretbox.FromBase64(cfg.Auth.TOTPKeyB64) |
| 91 | if boxErr != nil { |
| 92 | logger.Warn("secretbox unavailable for encrypted worker payloads", |
| 93 | "hint", "set Auth.TOTPKeyB64 to a base64 32-byte key", |
| 94 | "error", boxErr) |
| 95 | } |
| 96 | var stripeRemote stripebilling.Remote |
| 97 | if cfg.Billing.Enabled { |
| 98 | remote, err := stripebilling.New(stripebilling.Config{ |
| 99 | SecretKey: cfg.Billing.Stripe.SecretKey, |
| 100 | WebhookSecret: cfg.Billing.Stripe.WebhookSecret, |
| 101 | TeamPriceID: cfg.Billing.Stripe.TeamPriceID, |
| 102 | AutomaticTax: cfg.Billing.Stripe.AutomaticTax, |
| 103 | }) |
| 104 | if err != nil { |
| 105 | return fmt.Errorf("billing: %w", err) |
| 106 | } |
| 107 | stripeRemote = remote |
| 108 | } |
| 109 | |
| 110 | p := worker.NewPool(pool, worker.PoolConfig{ |
| 111 | Workers: count, |
| 112 | IdlePoll: 5 * time.Second, |
| 113 | JobTimeout: 5 * time.Minute, |
| 114 | Logger: logger, |
| 115 | }) |
| 116 | p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{ |
| 117 | Pool: pool, RepoFS: rfs, Logger: logger, |
| 118 | })) |
| 119 | p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{ |
| 120 | Pool: pool, RepoFS: rfs, Logger: logger, |
| 121 | })) |
| 122 | p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{ |
| 123 | Pool: pool, Logger: logger, |
| 124 | })) |
| 125 | p.Register(worker.KindLifecycleSweep, jobs.LifecycleSweep(jobs.LifecycleSweepDeps{ |
| 126 | Pool: pool, RepoFS: rfs, Audit: auditRecorder(), Logger: logger, |
| 127 | })) |
| 128 | prDeps := jobs.PRJobsDeps{Pool: pool, RepoFS: rfs, Logger: logger} |
| 129 | p.Register(worker.KindPRSynchronize, jobs.PRSynchronize(prDeps)) |
| 130 | p.Register(worker.KindPRMergeability, jobs.PRMergeability(prDeps)) |
| 131 | |
| 132 | shithubdPath, _ := shithubdBinaryPath() |
| 133 | p.Register(worker.KindRepoForkClone, jobs.RepoForkClone(jobs.ForkCloneDeps{ |
| 134 | Pool: pool, RepoFS: rfs, Logger: logger, ShithubdPath: shithubdPath, |
| 135 | })) |
| 136 | |
| 137 | p.Register(worker.KindRepoIndexCode, jobs.RepoIndexCode(jobs.IndexCodeDeps{ |
| 138 | Pool: pool, RepoFS: rfs, Logger: logger, |
| 139 | })) |
| 140 | p.Register(worker.KindRepoIndexReconcile, jobs.RepoIndexReconcile(jobs.IndexReconcileDeps{ |
| 141 | Pool: pool, Logger: logger, |
| 142 | })) |
| 143 | importDeps := jobs.OrgGitHubImportDeps{ |
| 144 | Pool: pool, RepoFS: rfs, Box: box, Audit: auditRecorder(), |
| 145 | Limiter: throttle.NewLimiter(), Logger: logger, ShithubdPath: shithubdPath, |
| 146 | } |
| 147 | p.Register(worker.KindOrgGitHubImportDiscover, jobs.OrgGitHubImportDiscover(importDeps)) |
| 148 | p.Register(worker.KindOrgGitHubImportRepo, jobs.OrgGitHubImportRepo(importDeps)) |
| 149 | p.Register(worker.KindOrgBillingSeatSync, jobs.OrgBillingSeatSync(jobs.OrgBillingSeatSyncDeps{ |
| 150 | Pool: pool, Logger: logger, Stripe: stripeRemote, |
| 151 | })) |
| 152 | |
| 153 | p.Register(worker.KindGPGBackfill, jobs.GPGBackfill(jobs.GPGBackfillDeps{ |
| 154 | Pool: pool, RepoFS: rfs, Logger: logger, |
| 155 | })) |
| 156 | |
| 157 | notifSender, _ := pickNotifEmailSender(cfg) |
| 158 | p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{ |
| 159 | Pool: pool, |
| 160 | Logger: logger, |
| 161 | EmailSender: notifSender, |
| 162 | EmailFrom: cfg.Auth.EmailFrom, |
| 163 | SiteName: cfg.Auth.SiteName, |
| 164 | BaseURL: cfg.Auth.BaseURL, |
| 165 | UnsubscribeKey: notifUnsubscribeKey(cfg, logger), |
| 166 | })) |
| 167 | p.Register(worker.KindTrendingCompute, jobs.TrendingCompute(jobs.TrendingComputeDeps{ |
| 168 | Pool: pool, Logger: logger, |
| 169 | })) |
| 170 | |
| 171 | // Webhook delivery (S33). The fan-out drains domain_events |
| 172 | // past its own cursor; deliver runs per-row HTTP POSTs; |
| 173 | // purge-old prunes terminal rows past the retention window. |
| 174 | // We reuse the TOTP key as the at-rest secretbox key — both |
| 175 | // are encrypted-blob columns in the same trust domain. |
| 176 | if boxErr != nil { |
| 177 | logger.Warn("webhook: secretbox unavailable; webhook delivery disabled", |
| 178 | "hint", "set Auth.TOTPKeyB64 to a base64 32-byte key", |
| 179 | "error", boxErr) |
| 180 | } else { |
| 181 | p.Register(webhook.KindWebhookFanout, jobs.WebhookFanout(jobs.WebhookFanoutDeps{ |
| 182 | Pool: pool, Logger: logger, |
| 183 | })) |
| 184 | p.Register(webhook.KindWebhookDeliver, jobs.WebhookDeliver(jobs.WebhookDeliverDeps{ |
| 185 | Pool: pool, |
| 186 | Logger: logger, |
| 187 | SecretBox: box, |
| 188 | SSRF: webhook.DefaultSSRFConfig(), |
| 189 | })) |
| 190 | p.Register(webhook.KindWebhookPurgeOld, jobs.WebhookPurgeOld(jobs.WebhookPurgeOldDeps{ |
| 191 | Pool: pool, Logger: logger, Retention: 30 * 24 * time.Hour, |
| 192 | })) |
| 193 | } |
| 194 | |
| 195 | // Actions trigger pipeline (S41b). Discovers .shithub/workflows/ |
| 196 | // at the head sha, parses each, matches against the triggering |
| 197 | // event, and enqueues workflow_runs for the matches. Runs sit |
| 198 | // in 'queued' status — no runner picks them up until S41c+. |
| 199 | p.Register(trigger.KindWorkflowTrigger, trigger.Handler(trigger.JobDeps{ |
| 200 | Deps: trigger.Deps{Pool: pool, Logger: logger}, RepoFS: rfs, |
| 201 | })) |
| 202 | if objectStore != nil { |
| 203 | p.Register(finalize.KindWorkflowFinalizeStep, finalize.Handler(finalize.Deps{ |
| 204 | Pool: pool, ObjectStore: objectStore, Logger: logger, |
| 205 | })) |
| 206 | } else { |
| 207 | logger.Info("actions: object storage not configured; workflow step log finalization disabled") |
| 208 | } |
| 209 | p.Register(cleanup.KindWorkflowCleanup, cleanup.Handler(cleanup.Deps{ |
| 210 | Pool: pool, ObjectStore: objectStore, Logger: logger, |
| 211 | })) |
| 212 | |
| 213 | return p.Run(ctx) |
| 214 | }, |
| 215 | } |
| 216 | |
| 217 | func init() { |
| 218 | workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)") |
| 219 | rootCmd.AddCommand(workerCmd) |
| 220 | } |
| 221 | |
| 222 | func buildWorkerObjectStore(s config.S3StorageConfig, logger *slog.Logger) (storage.ObjectStore, error) { |
| 223 | if s.Bucket == "" { |
| 224 | return nil, nil |
| 225 | } |
| 226 | if logger != nil { |
| 227 | logger.Info("storage: configuring object store for worker", "bucket", s.Bucket) |
| 228 | } |
| 229 | return storage.NewS3Store(storage.S3Config{ |
| 230 | Endpoint: s.Endpoint, |
| 231 | Region: s.Region, |
| 232 | AccessKeyID: s.AccessKeyID, |
| 233 | SecretAccessKey: s.SecretAccessKey, |
| 234 | Bucket: s.Bucket, |
| 235 | UseSSL: s.UseSSL, |
| 236 | ForcePathStyle: s.ForcePathStyle, |
| 237 | }) |
| 238 | } |
| 239 | |
| 240 | // pickNotifEmailSender mirrors pickAdminEmailSender / pickEmailSender |
| 241 | // in the web binary. Kept local to the worker so failure to construct |
| 242 | // the sender doesn't kill the process — fan-out without email is a |
| 243 | // supported degraded mode (inbox rows still land). |
| 244 | func pickNotifEmailSender(cfg config.Config) (email.Sender, error) { |
| 245 | switch cfg.Auth.EmailBackend { |
| 246 | case "stdout": |
| 247 | return email.NewStdoutSender(os.Stdout), nil |
| 248 | case "smtp": |
| 249 | return &email.SMTPSender{ |
| 250 | Addr: cfg.Auth.SMTP.Addr, |
| 251 | From: cfg.Auth.EmailFrom, |
| 252 | Username: cfg.Auth.SMTP.Username, |
| 253 | Password: cfg.Auth.SMTP.Password, |
| 254 | }, nil |
| 255 | case "postmark": |
| 256 | return &email.PostmarkSender{ |
| 257 | ServerToken: cfg.Auth.Postmark.ServerToken, |
| 258 | From: cfg.Auth.EmailFrom, |
| 259 | }, nil |
| 260 | case "resend": |
| 261 | return &email.ResendSender{ |
| 262 | APIKey: cfg.Auth.Resend.APIKey, |
| 263 | From: cfg.Auth.EmailFrom, |
| 264 | }, nil |
| 265 | default: |
| 266 | return nil, errors.New("worker: unknown email_backend") |
| 267 | } |
| 268 | } |
| 269 | |
| 270 | // notifUnsubscribeKey resolves the HMAC key used to sign one-click |
| 271 | // unsubscribe URLs. Operators set Notif.UnsubscribeKeyB64 in prod. |
| 272 | // In dev (key empty) we derive a deterministic 32-byte key from the |
| 273 | // session secret so unsubscribe links survive process restarts |
| 274 | // without operator action — and we log a loud warning so the |
| 275 | // derivation can't sneak into prod by accident. |
| 276 | func notifUnsubscribeKey(cfg config.Config, logger *slog.Logger) []byte { |
| 277 | if cfg.Notif.UnsubscribeKeyB64 != "" { |
| 278 | k, err := base64.StdEncoding.DecodeString(cfg.Notif.UnsubscribeKeyB64) |
| 279 | if err == nil && len(k) >= 16 { |
| 280 | return k |
| 281 | } |
| 282 | if logger != nil { |
| 283 | logger.Warn("notif: unsubscribe_key_b64 invalid; falling back to derived key", |
| 284 | "hint", "set Notif.UnsubscribeKeyB64 to base64-encoded 32+ random bytes") |
| 285 | } |
| 286 | } |
| 287 | if cfg.Session.KeyB64 != "" { |
| 288 | seed, err := base64.StdEncoding.DecodeString(cfg.Session.KeyB64) |
| 289 | if err == nil && len(seed) > 0 { |
| 290 | sum := sha256.Sum256(append([]byte("notif-unsub:"), seed...)) |
| 291 | if logger != nil { |
| 292 | logger.Warn("notif: deriving unsubscribe key from session secret (dev fallback)", |
| 293 | "hint", "set Notif.UnsubscribeKeyB64 in prod") |
| 294 | } |
| 295 | return sum[:] |
| 296 | } |
| 297 | } |
| 298 | // Last-resort static dev key — links work but anyone with source |
| 299 | // access can mint them. Logged at WARN so operators notice in |
| 300 | // prod logs. |
| 301 | if logger != nil { |
| 302 | logger.Warn("notif: no key material — using static dev key", |
| 303 | "hint", "set Notif.UnsubscribeKeyB64 (or session.key_b64)") |
| 304 | } |
| 305 | return []byte("shithub-dev-unsub-static-key-32B") |
| 306 | } |
| 307 |