Go · 6574 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package jobs
4
5 import (
6 "bytes"
7 "context"
8 "encoding/json"
9 "errors"
10 "fmt"
11 "log/slog"
12 "strings"
13
14 "github.com/jackc/pgx/v5"
15 "github.com/jackc/pgx/v5/pgtype"
16 "github.com/jackc/pgx/v5/pgxpool"
17
18 "github.com/tenseleyFlow/shithub/internal/infra/storage"
19 repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
20 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
21 "github.com/tenseleyFlow/shithub/internal/worker"
22 )
23
// IndexCodeDeps bundles the dependencies handed to RepoIndexCode when
// the job is wired up.
type IndexCodeDeps struct {
	// Pool is the Postgres pool used for the repo lookups and for the
	// transaction that rewrites the code-search tables.
	Pool *pgxpool.Pool
	// RepoFS maps an owner slug and repo name to the repository's path
	// on disk.
	RepoFS *storage.RepoFS
	// Logger is the structured logger supplied to the job.
	Logger *slog.Logger
}
30
// IndexCodePayload is the JSON payload enqueued by the push-process job
// (or the reconciler) when the default branch advances.
type IndexCodePayload struct {
	// RepoID identifies the repository to index. The handler treats 0
	// as a missing repo_id and rejects the payload.
	RepoID int64 `json:"repo_id"`
}
36
// Size gates for code-search content indexing, per the spec:
//   - A file larger than maxFileBytes gets no content row; its path is
//     indexed regardless.
//   - Whatever content is indexed is cut off at maxIndexBytes per file
//     so the trigram column doesn't bloat for huge text files.
const (
	maxFileBytes  = 256 << 10 // 256 KiB
	maxIndexBytes = 64 << 10  // 64 KiB
)
46
47 // RepoIndexCode walks the repo's default branch tree, indexes paths
48 // for every file and content for files that fit the size + textness
49 // gates, then atomically swaps the new index in via delete-then-
50 // insert in a single tx. Readers never see a partial index.
51 //
52 // Tracks `repos.last_indexed_oid` so the reconciler can detect drift.
53 func RepoIndexCode(deps IndexCodeDeps) worker.Handler {
54 return func(ctx context.Context, raw json.RawMessage) error {
55 var p IndexCodePayload
56 if err := json.Unmarshal(raw, &p); err != nil {
57 return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
58 }
59 if p.RepoID == 0 {
60 return worker.PoisonError(errors.New("missing repo_id"))
61 }
62
63 rq := reposdb.New()
64 repo, err := rq.GetRepoByID(ctx, deps.Pool, p.RepoID)
65 if err != nil {
66 if errors.Is(err, pgx.ErrNoRows) {
67 return worker.PoisonError(fmt.Errorf("repo %d not found", p.RepoID))
68 }
69 return err
70 }
71 if repo.DeletedAt.Valid {
72 // Repo went away between enqueue and now. Nothing to do.
73 return nil
74 }
75 owner, err := rq.GetRepoOwnerUsernameByID(ctx, deps.Pool, repo.ID)
76 if err != nil {
77 return err
78 }
79 ownerSlug, err := ownerSlugString(owner.OwnerUsername)
80 if err != nil {
81 return err
82 }
83 gitDir, err := deps.RepoFS.RepoPath(ownerSlug, repo.Name)
84 if err != nil {
85 return err
86 }
87
88 ref := repo.DefaultBranch
89 // Resolve the current OID for the default branch. If empty
90 // (no commits yet) clear any prior index and bail.
91 oid, err := repogit.ResolveRefOID(ctx, gitDir, ref)
92 if err != nil {
93 if errors.Is(err, repogit.ErrRefNotFound) {
94 if err := clearRepoIndex(ctx, deps.Pool, repo.ID); err != nil {
95 return err
96 }
97 _ = rq.SetLastIndexedOID(ctx, deps.Pool, reposdb.SetLastIndexedOIDParams{
98 ID: repo.ID, LastIndexedOid: pgtype.Text{Valid: false},
99 })
100 return nil
101 }
102 return fmt.Errorf("resolve %s: %w", ref, err)
103 }
104
105 // Walk the tree.
106 paths, err := repogit.ListAllPaths(ctx, gitDir, ref)
107 if err != nil {
108 return fmt.Errorf("ls-tree: %w", err)
109 }
110
111 // Read each blob; classify size + text. Skipped paths still
112 // land in the path index (only content is gated by size).
113 type indexed struct {
114 path string
115 content []byte // empty if skipped from content index
116 }
117 entries := make([]indexed, 0, len(paths))
118 for _, path := range paths {
119 if shouldSkipPath(path) {
120 continue
121 }
122 ent := indexed{path: path}
123 blob, err := repogit.ReadBlobBytes(ctx, gitDir, ref, path, maxFileBytes+1)
124 if err == nil && len(blob) <= maxFileBytes && isText(blob) {
125 if len(blob) > maxIndexBytes {
126 blob = blob[:maxIndexBytes]
127 }
128 ent.content = blob
129 }
130 entries = append(entries, ent)
131 }
132
133 // Atomic swap: delete + insert in one tx.
134 tx, err := deps.Pool.Begin(ctx)
135 if err != nil {
136 return err
137 }
138 committed := false
139 defer func() {
140 if !committed {
141 _ = tx.Rollback(ctx)
142 }
143 }()
144 if _, err := tx.Exec(ctx,
145 `DELETE FROM code_search_paths WHERE repo_id = $1`, repo.ID); err != nil {
146 return err
147 }
148 if _, err := tx.Exec(ctx,
149 `DELETE FROM code_search_content WHERE repo_id = $1`, repo.ID); err != nil {
150 return err
151 }
152 for _, e := range entries {
153 if _, err := tx.Exec(ctx, `
154 INSERT INTO code_search_paths (repo_id, ref_name, path, tsv)
155 VALUES ($1, $2, $3, to_tsvector('shithub_search', $3))
156 ON CONFLICT DO NOTHING
157 `, repo.ID, ref, e.path); err != nil {
158 return fmt.Errorf("insert path %s: %w", e.path, err)
159 }
160 if e.content != nil {
161 content := string(e.content)
162 if _, err := tx.Exec(ctx, `
163 INSERT INTO code_search_content
164 (repo_id, ref_name, path, content_tsv, content_trgm)
165 VALUES ($1, $2, $3, to_tsvector('shithub_search', $4), $4)
166 ON CONFLICT DO NOTHING
167 `, repo.ID, ref, e.path, content); err != nil {
168 return fmt.Errorf("insert content %s: %w", e.path, err)
169 }
170 }
171 }
172 if err := rq.SetLastIndexedOID(ctx, tx, reposdb.SetLastIndexedOIDParams{
173 ID: repo.ID, LastIndexedOid: pgtype.Text{String: oid, Valid: true},
174 }); err != nil {
175 return err
176 }
177 if err := tx.Commit(ctx); err != nil {
178 return err
179 }
180 committed = true
181 return nil
182 }
183 }
184
185 // clearRepoIndex drops every code-search row for a repo. Used when
186 // the default branch is gone (deleted, or never created) so we
187 // don't leave stale rows behind.
188 func clearRepoIndex(ctx context.Context, pool *pgxpool.Pool, repoID int64) error {
189 if _, err := pool.Exec(ctx,
190 `DELETE FROM code_search_paths WHERE repo_id = $1`, repoID); err != nil {
191 return err
192 }
193 if _, err := pool.Exec(ctx,
194 `DELETE FROM code_search_content WHERE repo_id = $1`, repoID); err != nil {
195 return err
196 }
197 return nil
198 }
199
// shouldSkipPath reports whether a repo-relative path belongs to the
// spec's "skipped by default" set: anything whose path starts with
// ".git", and anything that sits under a vendor/, node_modules/ or
// dist/ directory at any depth. The `path:` operator (post-MVP) lets
// users opt into them.
func shouldSkipPath(p string) bool {
	if strings.HasPrefix(p, ".git") {
		return true
	}
	// Peel off one directory segment at a time. The final segment (the
	// file name itself) is never followed by "/", so it is not tested.
	remaining := p
	for {
		dir, tail, isDir := strings.Cut(remaining, "/")
		if !isDir {
			return false
		}
		switch dir {
		case "vendor", "node_modules", "dist":
			return true
		}
		remaining = tail
	}
}
214
// isText applies the NUL-byte heuristic S17 uses for the blob view:
// a NUL anywhere in the first 8 KiB marks the data as binary, and
// anything else counts as text. Bytes past that window are not
// inspected. Cheap, and good enough for gating code-search content.
func isText(b []byte) bool {
	const sniffLen = 8 << 10 // only this many leading bytes are examined

	head := b
	if len(head) > sniffLen {
		head = head[:sniffLen]
	}
	return !bytes.Contains(head, []byte{0})
}
225