tenseleyflow/shithub / 204a28d

Browse files

S28: code-index + reconciler jobs; push:process kicks index_code

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
204a28d01a65b90b79c73c323074f5402b001c7a
Parents
6251924
Tree
2c52382

5 changed files

StatusFile+-
M cmd/shithubd/worker.go 7 0
M internal/worker/jobs/push_process.go 11 0
A internal/worker/jobs/repo_index_code.go 225 0
A internal/worker/jobs/repo_index_reconcile.go 63 0
M internal/worker/types.go 9 0
cmd/shithubd/worker.gomodified
@@ -101,6 +101,13 @@ var workerCmd = &cobra.Command{
101101
 			Pool: pool, RepoFS: rfs, Logger: logger, ShithubdPath: shithubdPath,
102102
 		}))
103103
 
104
+		p.Register(worker.KindRepoIndexCode, jobs.RepoIndexCode(jobs.IndexCodeDeps{
105
+			Pool: pool, RepoFS: rfs, Logger: logger,
106
+		}))
107
+		p.Register(worker.KindRepoIndexReconcile, jobs.RepoIndexReconcile(jobs.IndexReconcileDeps{
108
+			Pool: pool, Logger: logger,
109
+		}))
110
+
104111
 		return p.Run(ctx)
105112
 	},
106113
 }
internal/worker/jobs/push_process.gomodified
@@ -97,6 +97,17 @@ func PushProcess(deps PushProcessDeps) worker.Handler {
9797
 						DefaultBranchOid: pgtype.Text{String: newOID, Valid: true},
9898
 					})
9999
 				}
100
+				// S28: kick a code-search reindex of the new
101
+				// default-branch tip. The reindex job is idempotent
102
+				// + atomic-swap, so a concurrent push that lands
103
+				// before this finishes simply gets re-indexed on
104
+				// the NEXT push. The reconciler heals any drift.
105
+				if _, err := worker.Enqueue(ctx, deps.Pool, worker.KindRepoIndexCode,
106
+					map[string]any{"repo_id": repo.ID},
107
+					worker.EnqueueOptions{}); err != nil {
108
+					deps.Logger.WarnContext(ctx, "push:process: enqueue index_code",
109
+						"push_event_id", event.ID, "error", err)
110
+				}
100111
 			}
101112
 		}
102113
 
internal/worker/jobs/repo_index_code.goadded
@@ -0,0 +1,225 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package jobs
4
+
5
+import (
6
+	"bytes"
7
+	"context"
8
+	"encoding/json"
9
+	"errors"
10
+	"fmt"
11
+	"log/slog"
12
+	"strings"
13
+
14
+	"github.com/jackc/pgx/v5"
15
+	"github.com/jackc/pgx/v5/pgtype"
16
+	"github.com/jackc/pgx/v5/pgxpool"
17
+
18
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
19
+	repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
20
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
21
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
22
+	"github.com/tenseleyFlow/shithub/internal/worker"
23
+)
24
+
25
// IndexCodeDeps wires the job.
type IndexCodeDeps struct {
	Pool   *pgxpool.Pool   // repo/user queries + code_search_* writes, all through this pool
	RepoFS *storage.RepoFS // maps (owner username, repo name) -> on-disk git dir via RepoPath
	Logger *slog.Logger
}
31
+
32
// IndexCodePayload is enqueued by the push-process job (or the
// reconciler) when the default branch advances.
type IndexCodePayload struct {
	// RepoID is repos.id. Required: a zero value is rejected as a
	// poison payload by the handler.
	RepoID int64 `json:"repo_id"`
}
37
+
38
// Code-search size limits, per the spec:
//   - Files > maxFileBytes are skipped from content indexing (path
//     stays indexed regardless).
//   - Per-file indexed content is truncated to maxIndexBytes so the
//     trigram column doesn't bloat for huge text files.
const (
	maxFileBytes  = 256 * 1024 // 256 KiB — content-index eligibility cutoff
	maxIndexBytes = 64 * 1024  // 64 KiB — per-file cap actually stored/indexed
)
47
+
48
+// RepoIndexCode walks the repo's default branch tree, indexes paths
49
+// for every file and content for files that fit the size + textness
50
+// gates, then atomically swaps the new index in via delete-then-
51
+// insert in a single tx. Readers never see a partial index.
52
+//
53
+// Tracks `repos.last_indexed_oid` so the reconciler can detect drift.
54
+func RepoIndexCode(deps IndexCodeDeps) worker.Handler {
55
+	return func(ctx context.Context, raw json.RawMessage) error {
56
+		var p IndexCodePayload
57
+		if err := json.Unmarshal(raw, &p); err != nil {
58
+			return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
59
+		}
60
+		if p.RepoID == 0 {
61
+			return worker.PoisonError(errors.New("missing repo_id"))
62
+		}
63
+
64
+		rq := reposdb.New()
65
+		uq := usersdb.New()
66
+		repo, err := rq.GetRepoByID(ctx, deps.Pool, p.RepoID)
67
+		if err != nil {
68
+			if errors.Is(err, pgx.ErrNoRows) {
69
+				return worker.PoisonError(fmt.Errorf("repo %d not found", p.RepoID))
70
+			}
71
+			return err
72
+		}
73
+		if repo.DeletedAt.Valid {
74
+			// Repo went away between enqueue and now. Nothing to do.
75
+			return nil
76
+		}
77
+		if !repo.OwnerUserID.Valid {
78
+			return worker.PoisonError(fmt.Errorf("repo %d has no user owner (org-owned arrives in S31)", repo.ID))
79
+		}
80
+		owner, err := uq.GetUserByID(ctx, deps.Pool, repo.OwnerUserID.Int64)
81
+		if err != nil {
82
+			return err
83
+		}
84
+		gitDir, err := deps.RepoFS.RepoPath(owner.Username, repo.Name)
85
+		if err != nil {
86
+			return err
87
+		}
88
+
89
+		ref := repo.DefaultBranch
90
+		// Resolve the current OID for the default branch. If empty
91
+		// (no commits yet) clear any prior index and bail.
92
+		oid, err := repogit.ResolveRefOID(ctx, gitDir, ref)
93
+		if err != nil {
94
+			if errors.Is(err, repogit.ErrRefNotFound) {
95
+				if err := clearRepoIndex(ctx, deps.Pool, repo.ID); err != nil {
96
+					return err
97
+				}
98
+				_ = rq.SetLastIndexedOID(ctx, deps.Pool, reposdb.SetLastIndexedOIDParams{
99
+					ID: repo.ID, LastIndexedOid: pgtype.Text{Valid: false},
100
+				})
101
+				return nil
102
+			}
103
+			return fmt.Errorf("resolve %s: %w", ref, err)
104
+		}
105
+
106
+		// Walk the tree.
107
+		paths, err := repogit.ListAllPaths(ctx, gitDir, ref)
108
+		if err != nil {
109
+			return fmt.Errorf("ls-tree: %w", err)
110
+		}
111
+
112
+		// Read each blob; classify size + text. Skipped paths still
113
+		// land in the path index (only content is gated by size).
114
+		type indexed struct {
115
+			path    string
116
+			content []byte // empty if skipped from content index
117
+		}
118
+		entries := make([]indexed, 0, len(paths))
119
+		for _, path := range paths {
120
+			if shouldSkipPath(path) {
121
+				continue
122
+			}
123
+			ent := indexed{path: path}
124
+			blob, err := repogit.ReadBlobBytes(ctx, gitDir, ref, path, maxFileBytes+1)
125
+			if err == nil && len(blob) <= maxFileBytes && isText(blob) {
126
+				if len(blob) > maxIndexBytes {
127
+					blob = blob[:maxIndexBytes]
128
+				}
129
+				ent.content = blob
130
+			}
131
+			entries = append(entries, ent)
132
+		}
133
+
134
+		// Atomic swap: delete + insert in one tx.
135
+		tx, err := deps.Pool.Begin(ctx)
136
+		if err != nil {
137
+			return err
138
+		}
139
+		committed := false
140
+		defer func() {
141
+			if !committed {
142
+				_ = tx.Rollback(ctx)
143
+			}
144
+		}()
145
+		if _, err := tx.Exec(ctx,
146
+			`DELETE FROM code_search_paths WHERE repo_id = $1`, repo.ID); err != nil {
147
+			return err
148
+		}
149
+		if _, err := tx.Exec(ctx,
150
+			`DELETE FROM code_search_content WHERE repo_id = $1`, repo.ID); err != nil {
151
+			return err
152
+		}
153
+		for _, e := range entries {
154
+			if _, err := tx.Exec(ctx, `
155
+				INSERT INTO code_search_paths (repo_id, ref_name, path, tsv)
156
+				VALUES ($1, $2, $3, to_tsvector('shithub_search', $3))
157
+				ON CONFLICT DO NOTHING
158
+			`, repo.ID, ref, e.path); err != nil {
159
+				return fmt.Errorf("insert path %s: %w", e.path, err)
160
+			}
161
+			if e.content != nil {
162
+				content := string(e.content)
163
+				if _, err := tx.Exec(ctx, `
164
+					INSERT INTO code_search_content
165
+					    (repo_id, ref_name, path, content_tsv, content_trgm)
166
+					VALUES ($1, $2, $3, to_tsvector('shithub_search', $4), $4)
167
+					ON CONFLICT DO NOTHING
168
+				`, repo.ID, ref, e.path, content); err != nil {
169
+					return fmt.Errorf("insert content %s: %w", e.path, err)
170
+				}
171
+			}
172
+		}
173
+		if err := rq.SetLastIndexedOID(ctx, tx, reposdb.SetLastIndexedOIDParams{
174
+			ID: repo.ID, LastIndexedOid: pgtype.Text{String: oid, Valid: true},
175
+		}); err != nil {
176
+			return err
177
+		}
178
+		if err := tx.Commit(ctx); err != nil {
179
+			return err
180
+		}
181
+		committed = true
182
+		return nil
183
+	}
184
+}
185
+
186
+// clearRepoIndex drops every code-search row for a repo. Used when
187
+// the default branch is gone (deleted, or never created) so we
188
+// don't leave stale rows behind.
189
+func clearRepoIndex(ctx context.Context, pool *pgxpool.Pool, repoID int64) error {
190
+	if _, err := pool.Exec(ctx,
191
+		`DELETE FROM code_search_paths WHERE repo_id = $1`, repoID); err != nil {
192
+		return err
193
+	}
194
+	if _, err := pool.Exec(ctx,
195
+		`DELETE FROM code_search_content WHERE repo_id = $1`, repoID); err != nil {
196
+		return err
197
+	}
198
+	return nil
199
+}
200
+
201
// shouldSkipPath filters out paths the spec calls out as
// "skipped by default": vendor/, node_modules/, dist/, .git*. The
// `path:` operator (post-MVP) lets users opt into them.
//
// .git* is matched at the root AND inside any directory (e.g.
// "sub/.gitignore"), mirroring how the directory prefixes below
// already handle nesting.
func shouldSkipPath(p string) bool {
	if strings.HasPrefix(p, ".git") || strings.Contains(p, "/.git") {
		return true
	}
	for _, prefix := range []string{"vendor/", "node_modules/", "dist/"} {
		if strings.HasPrefix(p, prefix) || strings.Contains(p, "/"+prefix) {
			return true
		}
	}
	return false
}
215
+
216
// isText reports whether b looks like text. Same NUL-byte heuristic
// the S17 blob view uses: any NUL within the first 8 KiB means
// binary, otherwise text. Cheap, good enough for code-search
// content gating.
func isText(b []byte) bool {
	head := b
	if len(head) > 8192 {
		head = head[:8192]
	}
	return !bytes.Contains(head, []byte{0})
}
internal/worker/jobs/repo_index_reconcile.goadded
@@ -0,0 +1,63 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package jobs
4
+
5
+import (
6
+	"context"
7
+	"encoding/json"
8
+	"log/slog"
9
+
10
+	"github.com/jackc/pgx/v5/pgxpool"
11
+
12
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
13
+	"github.com/tenseleyFlow/shithub/internal/worker"
14
+)
15
+
16
// IndexReconcileDeps wires the reconciler. Same shape as the rest
// of the index-adjacent jobs.
type IndexReconcileDeps struct {
	Pool   *pgxpool.Pool // drift query + job enqueue
	Logger *slog.Logger  // optional: the handler nil-checks before every log call
}
22
+
23
// reconcileBatch is the per-tick cap on how many drifted repos the
// reconciler enqueues. Bounded so a single reconciler tick can't
// spike the queue depth on a fresh deploy where every repo's
// `last_indexed_oid` is NULL; the remainder drains on later ticks.
const reconcileBatch = 100
28
+
29
+// RepoIndexReconcile compares `repos.default_branch_oid` against
30
+// `repos.last_indexed_oid` and enqueues a `repo:index_code` job for
31
+// each drifted row. Self-throttling: only `reconcileBatch` repos
32
+// per tick.
33
+//
34
+// Designed to be invoked from a cron schedule (e.g. every 5 minutes)
35
+// once the cron framework lands. For now we expose it as a worker
36
+// job so an operator can invoke it manually via the admin enqueue
37
+// path, and the regular worker pool runs it on enqueue.
38
+func RepoIndexReconcile(deps IndexReconcileDeps) worker.Handler {
39
+	return func(ctx context.Context, _ json.RawMessage) error {
40
+		repos, err := reposdb.New().ListReposNeedingReindex(ctx, deps.Pool, reconcileBatch)
41
+		if err != nil {
42
+			return err
43
+		}
44
+		for _, r := range repos {
45
+			if _, err := worker.Enqueue(ctx, deps.Pool, worker.KindRepoIndexCode,
46
+				map[string]any{"repo_id": r.ID},
47
+				worker.EnqueueOptions{},
48
+			); err != nil {
49
+				if deps.Logger != nil {
50
+					deps.Logger.WarnContext(ctx, "reconcile: enqueue failed",
51
+						"repo_id", r.ID, "error", err)
52
+				}
53
+				// Don't fail the whole reconciler — keep enqueuing
54
+				// the rest. A single transient enqueue failure
55
+				// shouldn't poison the batch.
56
+			}
57
+		}
58
+		if deps.Logger != nil && len(repos) > 0 {
59
+			deps.Logger.InfoContext(ctx, "reconcile: enqueued reindex jobs", "count", len(repos))
60
+		}
61
+		return nil
62
+	}
63
+}
internal/worker/types.gomodified
@@ -55,6 +55,15 @@ const (
5555
 	KindRepoForkClone Kind = "repo:fork_clone"
5656
 )
5757
 
58
// S28 code-search kinds. index_code re-indexes a repo's default
// branch (paths + content). Enqueued by push:process when the
// default branch advances; also by index_reconcile when drift
// between default_branch_oid and last_indexed_oid is detected.
const (
	KindRepoIndexCode      Kind = "repo:index_code"      // payload: {"repo_id": int64}
	KindRepoIndexReconcile Kind = "repo:index_reconcile" // no payload; batch-enqueues index_code
)
66
+
5867
 // NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes
5968
 // to so it wakes up immediately when a job is enqueued, instead of
6069
 // polling. Callers wrapping enqueue in a tx must NOTIFY inside the