tenseleyflow/shithub / 9256e8c

Browse files

S16: lifecycle:sweep worker job — past-grace hard delete + transfer expiry

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
9256e8c504d8f07adb3cfd22224d1cc7845e3287
Parents
3d70843
Tree
f22956d

3 changed files

StatusFile+-
M cmd/shithubd/worker.go 9 0
A internal/worker/jobs/lifecycle_sweep.go 71 0
M internal/worker/types.go 5 0
cmd/shithubd/worker.gomodified
@@ -15,6 +15,7 @@ import (
15
 
15
 
16
 	"github.com/spf13/cobra"
16
 	"github.com/spf13/cobra"
17
 
17
 
18
+	"github.com/tenseleyFlow/shithub/internal/auth/audit"
18
 	"github.com/tenseleyFlow/shithub/internal/infra/config"
19
 	"github.com/tenseleyFlow/shithub/internal/infra/config"
19
 	"github.com/tenseleyFlow/shithub/internal/infra/db"
20
 	"github.com/tenseleyFlow/shithub/internal/infra/db"
20
 	"github.com/tenseleyFlow/shithub/internal/infra/storage"
21
 	"github.com/tenseleyFlow/shithub/internal/infra/storage"
@@ -22,6 +23,11 @@ import (
22
 	"github.com/tenseleyFlow/shithub/internal/worker/jobs"
23
 	"github.com/tenseleyFlow/shithub/internal/worker/jobs"
23
 )
24
 )
24
 
25
 
26
+// auditRecorder returns the shared audit recorder. Kept as a function
27
+// rather than a package-level value so future tests / non-default
28
+// recorders can substitute via dependency injection.
29
+func auditRecorder() *audit.Recorder { return audit.NewRecorder() }
30
+
25
 // workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
31
 // workerCmd boots a long-running worker pool. SIGINT/SIGTERM trigger
26
 // graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
32
 // graceful shutdown: the LISTEN goroutine drops, claim attempts stop,
27
 // in-flight jobs are given a deadline to finish, then the binary exits.
33
 // in-flight jobs are given a deadline to finish, then the binary exits.
@@ -83,6 +89,9 @@ var workerCmd = &cobra.Command{
83
 		p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
89
 		p.Register(worker.KindJobsPurge, jobs.JobsPurge(jobs.JobsPurgeDeps{
84
 			Pool: pool, Logger: logger,
90
 			Pool: pool, Logger: logger,
85
 		}))
91
 		}))
92
+		p.Register(worker.KindLifecycleSweep, jobs.LifecycleSweep(jobs.LifecycleSweepDeps{
93
+			Pool: pool, RepoFS: rfs, Audit: auditRecorder(), Logger: logger,
94
+		}))
86
 
95
 
87
 		return p.Run(ctx)
96
 		return p.Run(ctx)
88
 	},
97
 	},
internal/worker/jobs/lifecycle_sweep.goadded
@@ -0,0 +1,71 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package jobs
4
+
5
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/tenseleyFlow/shithub/internal/auth/audit"
	"github.com/tenseleyFlow/shithub/internal/infra/storage"
	"github.com/tenseleyFlow/shithub/internal/repos/lifecycle"
	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
	"github.com/tenseleyFlow/shithub/internal/worker"
)
18
+
19
+// LifecycleSweepDeps wires the periodic sweep handler.
20
+type LifecycleSweepDeps struct {
21
+	Pool   *pgxpool.Pool
22
+	RepoFS *storage.RepoFS
23
+	Audit  *audit.Recorder
24
+	Logger *slog.Logger
25
+}
26
+
27
+// LifecycleSweep runs two related housekeeping passes in one job:
28
+//
29
+//  1. Hard-delete every repo whose deleted_at is past the grace window.
30
+//     Each repo gets the full lifecycle.HardDelete cascade — FS + DB
31
+//     + audit. We process inline rather than fanning out to one job
32
+//     per repo because hard-deletes are rare and the per-row cost is
33
+//     small.
34
+//  2. Flip pending transfer requests past expires_at to "expired".
35
+//
36
+// Enqueue this kind from a cron timer (S26 owns scheduling); for now
37
+// the operator can `INSERT` a job manually or call it once at boot.
38
+func LifecycleSweep(deps LifecycleSweepDeps) worker.Handler {
39
+	return func(ctx context.Context, _ json.RawMessage) error {
40
+		// 1. Hard-delete past-grace repos.
41
+		rq := reposdb.New()
42
+		ids, err := rq.ListRepoIDsPastSoftDeleteGrace(ctx, deps.Pool)
43
+		if err != nil {
44
+			return err
45
+		}
46
+		ldeps := lifecycle.Deps{
47
+			Pool: deps.Pool, RepoFS: deps.RepoFS,
48
+			Audit: deps.Audit, Logger: deps.Logger,
49
+		}
50
+		for _, id := range ids {
51
+			if err := ctx.Err(); err != nil {
52
+				return err
53
+			}
54
+			if err := lifecycle.HardDelete(ctx, ldeps, 0, id); err != nil {
55
+				deps.Logger.WarnContext(ctx, "lifecycle:sweep: hard delete failed",
56
+					"repo_id", id, "error", err)
57
+				// Keep going — one bad row shouldn't poison the sweep.
58
+			}
59
+		}
60
+
61
+		// 2. Expire pending transfers past their TTL.
62
+		n, err := lifecycle.ExpirePending(ctx, ldeps)
63
+		if err != nil {
64
+			return err
65
+		}
66
+		if n > 0 {
67
+			deps.Logger.InfoContext(ctx, "lifecycle:sweep: expired transfers", "count", n)
68
+		}
69
+		return nil
70
+	}
71
+}
internal/worker/types.gomodified
@@ -31,6 +31,11 @@ const (
31
 	KindJobsPurge      Kind = "jobs:purge_completed"
31
 	KindJobsPurge      Kind = "jobs:purge_completed"
32
 )
32
 )
33
 
33
 
34
+// S16 lifecycle housekeeping kind.
35
+const (
36
+	KindLifecycleSweep Kind = "lifecycle:sweep"
37
+)
38
+
34
 // NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes
39
 // NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes
35
 // to so it wakes up immediately when a job is enqueued, instead of
40
 // to so it wakes up immediately when a job is enqueued, instead of
36
 // polling. Callers wrapping enqueue in a tx must NOTIFY inside the
41
 // polling. Callers wrapping enqueue in a tx must NOTIFY inside the