Go · 2057 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package jobs
4
5 import (
6 "context"
7 "encoding/json"
8 "log/slog"
9
10 "github.com/jackc/pgx/v5/pgxpool"
11
12 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
13 "github.com/tenseleyFlow/shithub/internal/worker"
14 )
15
16 // IndexReconcileDeps wires the reconciler. Same shape as the rest
17 // of the index-adjacent jobs.
18 type IndexReconcileDeps struct {
19 Pool *pgxpool.Pool
20 Logger *slog.Logger
21 }
22
// reconcileBatch caps how many drifted repos a single reconciler tick
// will enqueue. The bound keeps one tick from spiking the queue depth
// on a fresh deploy, where `last_indexed_oid` is NULL for every repo
// and so every repo counts as drifted.
const reconcileBatch = 100
28
29 // RepoIndexReconcile compares `repos.default_branch_oid` against
30 // `repos.last_indexed_oid` and enqueues a `repo:index_code` job for
31 // each drifted row. Self-throttling: only `reconcileBatch` repos
32 // per tick.
33 //
34 // Designed to be invoked from a cron schedule (e.g. every 5 minutes)
35 // once the cron framework lands. For now we expose it as a worker
36 // job so an operator can invoke it manually via the admin enqueue
37 // path, and the regular worker pool runs it on enqueue.
38 func RepoIndexReconcile(deps IndexReconcileDeps) worker.Handler {
39 return func(ctx context.Context, _ json.RawMessage) error {
40 repos, err := reposdb.New().ListReposNeedingReindex(ctx, deps.Pool, reconcileBatch)
41 if err != nil {
42 return err
43 }
44 for _, r := range repos {
45 if _, err := worker.Enqueue(
46 ctx, deps.Pool, worker.KindRepoIndexCode,
47 map[string]any{"repo_id": r.ID},
48 worker.EnqueueOptions{},
49 ); err != nil {
50 if deps.Logger != nil {
51 deps.Logger.WarnContext(ctx, "reconcile: enqueue failed",
52 "repo_id", r.ID, "error", err)
53 }
54 // Don't fail the whole reconciler — keep enqueuing
55 // the rest. A single transient enqueue failure
56 // shouldn't poison the batch.
57 }
58 }
59 if deps.Logger != nil && len(repos) > 0 {
60 deps.Logger.InfoContext(ctx, "reconcile: enqueued reindex jobs", "count", len(repos))
61 }
62 return nil
63 }
64 }
65