| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package jobs |
| 4 | |
| 5 | import ( |
| 6 | "context" |
| 7 | "encoding/json" |
| 8 | "log/slog" |
| 9 | |
| 10 | "github.com/jackc/pgx/v5/pgxpool" |
| 11 | |
| 12 | reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc" |
| 13 | "github.com/tenseleyFlow/shithub/internal/worker" |
| 14 | ) |
| 15 | |
| 16 | // IndexReconcileDeps wires the reconciler. Same shape as the rest |
| 17 | // of the index-adjacent jobs. |
| 18 | type IndexReconcileDeps struct { |
| 19 | Pool *pgxpool.Pool |
| 20 | Logger *slog.Logger |
| 21 | } |
| 22 | |
| 23 | // reconcileBatch is the per-tick cap on how many drifted repos the |
| 24 | // reconciler enqueues. Bounded so a single reconciler tick can't |
| 25 | // spike the queue depth on a fresh deploy where every repo's |
| 26 | // `last_indexed_oid` is NULL. |
| 27 | const reconcileBatch = 100 |
| 28 | |
| 29 | // RepoIndexReconcile compares `repos.default_branch_oid` against |
| 30 | // `repos.last_indexed_oid` and enqueues a `repo:index_code` job for |
| 31 | // each drifted row. Self-throttling: only `reconcileBatch` repos |
| 32 | // per tick. |
| 33 | // |
| 34 | // Designed to be invoked from a cron schedule (e.g. every 5 minutes) |
| 35 | // once the cron framework lands. For now we expose it as a worker |
| 36 | // job so an operator can invoke it manually via the admin enqueue |
| 37 | // path, and the regular worker pool runs it on enqueue. |
| 38 | func RepoIndexReconcile(deps IndexReconcileDeps) worker.Handler { |
| 39 | return func(ctx context.Context, _ json.RawMessage) error { |
| 40 | repos, err := reposdb.New().ListReposNeedingReindex(ctx, deps.Pool, reconcileBatch) |
| 41 | if err != nil { |
| 42 | return err |
| 43 | } |
| 44 | for _, r := range repos { |
| 45 | if _, err := worker.Enqueue( |
| 46 | ctx, deps.Pool, worker.KindRepoIndexCode, |
| 47 | map[string]any{"repo_id": r.ID}, |
| 48 | worker.EnqueueOptions{}, |
| 49 | ); err != nil { |
| 50 | if deps.Logger != nil { |
| 51 | deps.Logger.WarnContext(ctx, "reconcile: enqueue failed", |
| 52 | "repo_id", r.ID, "error", err) |
| 53 | } |
| 54 | // Don't fail the whole reconciler — keep enqueuing |
| 55 | // the rest. A single transient enqueue failure |
| 56 | // shouldn't poison the batch. |
| 57 | } |
| 58 | } |
| 59 | if deps.Logger != nil && len(repos) > 0 { |
| 60 | deps.Logger.InfoContext(ctx, "reconcile: enqueued reindex jobs", "count", len(repos)) |
| 61 | } |
| 62 | return nil |
| 63 | } |
| 64 | } |
| 65 |