Go · 10734 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package jobs
4
5 import (
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "log/slog"
11 "strings"
12
13 "github.com/jackc/pgx/v5"
14 "github.com/jackc/pgx/v5/pgtype"
15 "github.com/jackc/pgx/v5/pgxpool"
16
17 "github.com/tenseleyFlow/shithub/internal/auth/audit"
18 "github.com/tenseleyFlow/shithub/internal/auth/secretbox"
19 "github.com/tenseleyFlow/shithub/internal/auth/throttle"
20 "github.com/tenseleyFlow/shithub/internal/infra/storage"
21 "github.com/tenseleyFlow/shithub/internal/orgs"
22 orgsdb "github.com/tenseleyFlow/shithub/internal/orgs/sqlc"
23 "github.com/tenseleyFlow/shithub/internal/repos"
24 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
25 "github.com/tenseleyFlow/shithub/internal/worker"
26 )
27
28 type OrgGitHubImportDeps struct {
29 Pool *pgxpool.Pool
30 RepoFS *storage.RepoFS
31 Box *secretbox.Box
32 Audit *audit.Recorder
33 Limiter *throttle.Limiter
34 Logger *slog.Logger
35 ShithubdPath string
36 GitHubClient orgs.GitHubClient
37 }
38
39 type OrgGitHubImportDiscoverPayload struct {
40 ImportID int64 `json:"import_id"`
41 }
42
43 type OrgGitHubImportRepoPayload struct {
44 ImportRepoID int64 `json:"import_repo_id"`
45 }
46
47 func OrgGitHubImportDiscover(deps OrgGitHubImportDeps) worker.Handler {
48 return func(ctx context.Context, raw json.RawMessage) error {
49 var p OrgGitHubImportDiscoverPayload
50 if err := json.Unmarshal(raw, &p); err != nil {
51 return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
52 }
53 if p.ImportID == 0 {
54 return worker.PoisonError(errors.New("missing import_id"))
55 }
56
57 q := orgsdb.New()
58 imp, err := q.GetOrgGithubImport(ctx, deps.Pool, p.ImportID)
59 if err != nil {
60 if errors.Is(err, pgx.ErrNoRows) {
61 return worker.PoisonError(fmt.Errorf("import %d not found", p.ImportID))
62 }
63 return err
64 }
65 if orgs.IsTerminalImportStatus(imp.Status) {
66 return nil
67 }
68 token, err := orgs.DecryptGitHubImportToken(imp, deps.Box)
69 if err != nil {
70 _ = markImportFailed(ctx, deps, imp.ID, err)
71 return worker.PoisonError(err)
72 }
73 if err := q.MarkOrgGithubImportDiscovering(ctx, deps.Pool, imp.ID); err != nil {
74 return err
75 }
76 ghRepos, err := deps.GitHubClient.ListOrgRepos(ctx, imp.SourceOrg, token)
77 if err != nil {
78 _ = markImportFailed(ctx, deps, imp.ID, err)
79 return nil
80 }
81
82 tx, err := deps.Pool.Begin(ctx)
83 if err != nil {
84 return err
85 }
86 committed := false
87 defer func() {
88 if !committed {
89 _ = tx.Rollback(ctx)
90 }
91 }()
92
93 for _, gh := range ghRepos {
94 targetName := repos.NormalizeName(gh.Name)
95 visibility := orgsdb.RepoVisibilityPublic
96 if gh.Private {
97 visibility = orgsdb.RepoVisibilityPrivate
98 }
99 row, err := q.InsertOrgGithubImportRepo(ctx, tx, orgsdb.InsertOrgGithubImportRepoParams{
100 ImportID: imp.ID,
101 GithubID: pgtype.Int8{Int64: gh.ID, Valid: gh.ID != 0},
102 SourceFullName: fallbackFullName(imp.SourceOrg, gh),
103 SourceName: strings.TrimSpace(gh.Name),
104 TargetName: targetName,
105 CloneUrl: strings.TrimSpace(gh.CloneURL),
106 Description: truncateRunes(gh.Description, repos.MaxDescriptionLen),
107 DefaultBranch: strings.TrimSpace(gh.DefaultBranch),
108 TargetVisibility: visibility,
109 IsPrivate: gh.Private,
110 IsFork: gh.Fork,
111 })
112 if err != nil {
113 return err
114 }
115 if _, err := worker.Enqueue(ctx, tx, worker.KindOrgGitHubImportRepo, OrgGitHubImportRepoPayload{
116 ImportRepoID: row.ID,
117 }, worker.EnqueueOptions{}); err != nil {
118 return err
119 }
120 }
121 if len(ghRepos) == 0 {
122 if err := q.MarkOrgGithubImportCompleted(ctx, tx, imp.ID); err != nil {
123 return err
124 }
125 } else if err := q.MarkOrgGithubImportImporting(ctx, tx, orgsdb.MarkOrgGithubImportImportingParams{
126 ID: imp.ID,
127 TotalCount: int32(len(ghRepos)),
128 }); err != nil {
129 return err
130 }
131 if err := worker.Notify(ctx, tx); err != nil && deps.Logger != nil {
132 deps.Logger.WarnContext(ctx, "github import: notify children", "error", err, "import_id", imp.ID)
133 }
134 if err := tx.Commit(ctx); err != nil {
135 return err
136 }
137 committed = true
138 return nil
139 }
140 }
141
142 func OrgGitHubImportRepo(deps OrgGitHubImportDeps) worker.Handler {
143 return func(ctx context.Context, raw json.RawMessage) error {
144 var p OrgGitHubImportRepoPayload
145 if err := json.Unmarshal(raw, &p); err != nil {
146 return worker.PoisonError(fmt.Errorf("bad payload: %w", err))
147 }
148 if p.ImportRepoID == 0 {
149 return worker.PoisonError(errors.New("missing import_repo_id"))
150 }
151
152 q := orgsdb.New()
153 rq := reposdb.New()
154 item, err := q.GetOrgGithubImportRepo(ctx, deps.Pool, p.ImportRepoID)
155 if err != nil {
156 if errors.Is(err, pgx.ErrNoRows) {
157 return worker.PoisonError(fmt.Errorf("import repo %d not found", p.ImportRepoID))
158 }
159 return err
160 }
161 if orgs.IsTerminalImportRepoStatus(item.Status) {
162 return nil
163 }
164 imp, err := q.GetOrgGithubImport(ctx, deps.Pool, item.ImportID)
165 if err != nil {
166 return err
167 }
168 if orgs.IsTerminalImportStatus(imp.Status) {
169 return nil
170 }
171 org, err := q.GetOrgByID(ctx, deps.Pool, imp.OrgID)
172 if err != nil {
173 return err
174 }
175 if org.DeletedAt.Valid {
176 if err := markImportRepoFailed(ctx, q, deps.Pool, item.ID, 0, "Organization was deleted during import."); err != nil {
177 return err
178 }
179 return completeImportIfDone(ctx, deps, imp.ID)
180 }
181 if err := q.MarkOrgGithubImportRepoImporting(ctx, deps.Pool, item.ID); err != nil {
182 return err
183 }
184
185 if err := repos.ValidateName(item.TargetName); err != nil {
186 if err := markImportRepoFailed(ctx, q, deps.Pool, item.ID, 0, friendlyRepoImportError(err)); err != nil {
187 return err
188 }
189 return completeImportIfDone(ctx, deps, imp.ID)
190 }
191 exists, err := rq.ExistsRepoForOwnerOrg(ctx, deps.Pool, reposdb.ExistsRepoForOwnerOrgParams{
192 OwnerOrgID: pgtype.Int8{Int64: org.ID, Valid: true},
193 Name: item.TargetName,
194 })
195 if err != nil {
196 return err
197 }
198 if exists {
199 if err := q.MarkOrgGithubImportRepoSkipped(ctx, deps.Pool, orgsdb.MarkOrgGithubImportRepoSkippedParams{
200 ID: item.ID,
201 LastError: pgtype.Text{String: "Repository already exists in this organization.", Valid: true},
202 }); err != nil {
203 return err
204 }
205 return completeImportIfDone(ctx, deps, imp.ID)
206 }
207
208 token, err := orgs.DecryptGitHubImportToken(imp, deps.Box)
209 if err != nil {
210 if err := markImportRepoFailed(ctx, q, deps.Pool, item.ID, 0, err.Error()); err != nil {
211 return err
212 }
213 return completeImportIfDone(ctx, deps, imp.ID)
214 }
215 if item.IsPrivate && token == "" {
216 if err := markImportRepoFailed(ctx, q, deps.Pool, item.ID, 0, "GitHub token unavailable for private repository."); err != nil {
217 return err
218 }
219 return completeImportIfDone(ctx, deps, imp.ID)
220 }
221
222 result, err := repos.Create(ctx, repos.Deps{
223 Pool: deps.Pool,
224 RepoFS: deps.RepoFS,
225 Audit: deps.Audit,
226 Limiter: deps.Limiter,
227 Logger: deps.Logger,
228 ShithubdPath: deps.ShithubdPath,
229 }, repos.Params{
230 OwnerOrgID: org.ID,
231 OwnerSlug: string(org.Slug),
232 ActorUserID: int64Value(imp.RequestedByUserID),
233 BypassCreateRateLimit: true,
234 Name: item.TargetName,
235 Description: item.Description,
236 Visibility: string(item.TargetVisibility),
237 })
238 if err != nil {
239 if errors.Is(err, repos.ErrTaken) {
240 if err := q.MarkOrgGithubImportRepoSkipped(ctx, deps.Pool, orgsdb.MarkOrgGithubImportRepoSkippedParams{
241 ID: item.ID,
242 LastError: pgtype.Text{String: "Repository already exists in this organization.", Valid: true},
243 }); err != nil {
244 return err
245 }
246 return completeImportIfDone(ctx, deps, imp.ID)
247 }
248 if err := markImportRepoFailed(ctx, q, deps.Pool, item.ID, 0, friendlyRepoImportError(err)); err != nil {
249 return err
250 }
251 return completeImportIfDone(ctx, deps, imp.ID)
252 }
253
254 remoteURL, err := repos.SaveSourceRemote(ctx, sourceRemoteDeps(deps, token), result.Repo.ID, item.CloneUrl)
255 if err != nil {
256 if err := markImportRepoFailed(ctx, q, deps.Pool, item.ID, result.Repo.ID, friendlyRepoImportError(err)); err != nil {
257 return err
258 }
259 return completeImportIfDone(ctx, deps, imp.ID)
260 }
261 if err := repos.FetchSourceRemote(ctx, sourceRemoteDeps(deps, token), result.Repo, string(org.Slug), remoteURL); err != nil {
262 if err := markImportRepoFailed(ctx, q, deps.Pool, item.ID, result.Repo.ID, friendlyRepoImportError(err)); err != nil {
263 return err
264 }
265 return completeImportIfDone(ctx, deps, imp.ID)
266 }
267 if err := q.MarkOrgGithubImportRepoImported(ctx, deps.Pool, orgsdb.MarkOrgGithubImportRepoImportedParams{
268 ID: item.ID,
269 RepoID: pgtype.Int8{Int64: result.Repo.ID, Valid: true},
270 }); err != nil {
271 return err
272 }
273 return completeImportIfDone(ctx, deps, imp.ID)
274 }
275 }
276
277 func sourceRemoteDeps(deps OrgGitHubImportDeps, token string) repos.SourceRemoteDeps {
278 return repos.SourceRemoteDeps{
279 Pool: deps.Pool,
280 RepoFS: deps.RepoFS,
281 Logger: deps.Logger,
282 FetchToken: token,
283 }
284 }
285
286 func markImportFailed(ctx context.Context, deps OrgGitHubImportDeps, importID int64, err error) error {
287 msg := friendlyRepoImportError(err)
288 return orgsdb.New().MarkOrgGithubImportFailed(ctx, deps.Pool, orgsdb.MarkOrgGithubImportFailedParams{
289 ID: importID,
290 LastError: pgtype.Text{String: msg, Valid: true},
291 })
292 }
293
294 func markImportRepoFailed(ctx context.Context, q *orgsdb.Queries, db orgsdb.DBTX, itemID, repoID int64, msg string) error {
295 if strings.TrimSpace(msg) == "" {
296 msg = "Import failed."
297 }
298 return q.MarkOrgGithubImportRepoFailed(ctx, db, orgsdb.MarkOrgGithubImportRepoFailedParams{
299 ID: itemID,
300 LastError: pgtype.Text{String: truncateRunes(msg, 500), Valid: true},
301 RepoID: pgtype.Int8{Int64: repoID, Valid: repoID != 0},
302 })
303 }
304
305 func completeImportIfDone(ctx context.Context, deps OrgGitHubImportDeps, importID int64) error {
306 _, err := orgsdb.New().MarkOrgGithubImportCompletedIfDone(ctx, deps.Pool, importID)
307 if errors.Is(err, pgx.ErrNoRows) {
308 return nil
309 }
310 return err
311 }
312
313 func fallbackFullName(sourceOrg string, repo orgs.GitHubRepo) string {
314 if strings.TrimSpace(repo.FullName) != "" {
315 return strings.TrimSpace(repo.FullName)
316 }
317 return sourceOrg + "/" + strings.TrimSpace(repo.Name)
318 }
319
320 func truncateRunes(s string, max int) string {
321 if max <= 0 {
322 return ""
323 }
324 runes := []rune(strings.TrimSpace(s))
325 if len(runes) <= max {
326 return string(runes)
327 }
328 return string(runes[:max])
329 }
330
331 func friendlyRepoImportError(err error) string {
332 if err == nil {
333 return ""
334 }
335 msg := strings.TrimSpace(err.Error())
336 if msg == "" {
337 return "Import failed."
338 }
339 return truncateRunes(msg, 500)
340 }
341
342 func int64Value(v pgtype.Int8) int64 {
343 if !v.Valid {
344 return 0
345 }
346 return v.Int64
347 }
348