Go · 2562 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package main
4
5 import (
6 "errors"
7 "fmt"
8 "log/slog"
9 "os"
10 "os/signal"
11 "path/filepath"
12 "strconv"
13 "syscall"
14 "time"
15
16 "github.com/spf13/cobra"
17
18 "github.com/tenseleyFlow/shithub/internal/infra/config"
19 "github.com/tenseleyFlow/shithub/internal/infra/db"
20 "github.com/tenseleyFlow/shithub/internal/infra/storage"
21 "github.com/tenseleyFlow/shithub/internal/worker"
22 "github.com/tenseleyFlow/shithub/internal/worker/jobs"
23 )
24
25 // workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
26 // graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
27 // in-flight jobs are given a deadline to finish, then the binary exits.
28 var workerCmd = &cobra.Command{
29 Use: "worker",
30 Short: "Run background workers (push processing, size recalc, purge)",
31 RunE: func(cmd *cobra.Command, _ []string) error {
32 workersFlag, _ := cmd.Flags().GetInt("workers")
33
34 cfg, err := config.Load(nil)
35 if err != nil {
36 return fmt.Errorf("config: %w", err)
37 }
38 if cfg.DB.URL == "" {
39 return errors.New("worker: SHITHUB_DATABASE_URL unset")
40 }
41 root, err := filepath.Abs(cfg.Storage.ReposRoot)
42 if err != nil {
43 return fmt.Errorf("repos_root: %w", err)
44 }
45 rfs, err := storage.NewRepoFS(root)
46 if err != nil {
47 return fmt.Errorf("repo fs: %w", err)
48 }
49
50 ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
51 defer stop()
52
53 // Worker count: flag overrides env override default.
54 count := workersFlag
55 if count <= 0 {
56 if v, _ := strconv.Atoi(os.Getenv("SHITHUB_WORKERS")); v > 0 {
57 count = v
58 }
59 }
60
61 pool, err := db.Open(ctx, db.Config{
62 URL: cfg.DB.URL, MaxConns: int32(count) + 2, MinConns: 1,
63 })
64 if err != nil {
65 return fmt.Errorf("db: %w", err)
66 }
67 defer pool.Close()
68
69 logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
70
71 p := worker.NewPool(pool, worker.PoolConfig{
72 Workers: count,
73 IdlePoll: 5 * time.Second,
74 JobTimeout: 5 * time.Minute,
75 Logger: logger,
76 })
77 p.Register(worker.KindPushProcess, jobs.PushProcess(jobs.PushProcessDeps{
78 Pool: pool, RepoFS: rfs, Logger: logger,
79 }))
80 p.Register(worker.KindRepoSizeRecalc, jobs.RepoSizeRecalc(jobs.RepoSizeRecalcDeps{
81 Pool: pool, RepoFS: rfs, Logger: logger,
82 }))
83 p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
84 Pool: pool, Logger: logger,
85 }))
86
87 return p.Run(ctx)
88 },
89 }
90
91 func init() {
92 workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)")
93 rootCmd.AddCommand(workerCmd)
94 }
95