Capture social feed activity
Authored by
mfwolffe <wolffemf@dukes.jmu.edu>
- SHA
86fb1458626bc7f7d62db0eea8c65a48505db021- Parents
-
3e9cf7e - Tree
31fd8c2
86fb145
86fb1458626bc7f7d62db0eea8c65a48505db0213e9cf7e
31fd8c2| Status | File | + | - |
|---|---|---|---|
| M |
cmd/shithubd/worker.go
|
3 | 0 |
| M |
internal/repos/create.go
|
15 | 0 |
| M |
internal/worker/jobs/push_process.go
|
15 | 0 |
| A |
internal/worker/jobs/trending_compute.go
|
75 | 0 |
| M |
internal/worker/types.go
|
6 | 0 |
cmd/shithubd/worker.gomodified@@ -129,6 +129,9 @@ var workerCmd = &cobra.Command{ | ||
| 129 | 129 | BaseURL: cfg.Auth.BaseURL, |
| 130 | 130 | UnsubscribeKey: notifUnsubscribeKey(cfg, logger), |
| 131 | 131 | })) |
| 132 | + p.Register(worker.KindTrendingCompute, jobs.TrendingCompute(jobs.TrendingComputeDeps{ | |
| 133 | + Pool: pool, Logger: logger, | |
| 134 | + })) | |
| 132 | 135 | |
| 133 | 136 | // Webhook delivery (S33). The fan-out drains domain_events |
| 134 | 137 | // past its own cursor; deliver runs per-row HTTP POSTs; |
internal/repos/create.gomodified@@ -21,6 +21,7 @@ import ( | ||
| 21 | 21 | "github.com/tenseleyFlow/shithub/internal/infra/storage" |
| 22 | 22 | "github.com/tenseleyFlow/shithub/internal/issues" |
| 23 | 23 | issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc" |
| 24 | + "github.com/tenseleyFlow/shithub/internal/notif" | |
| 24 | 25 | repogit "github.com/tenseleyFlow/shithub/internal/repos/git" |
| 25 | 26 | reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc" |
| 26 | 27 | "github.com/tenseleyFlow/shithub/internal/repos/templates" |
@@ -269,6 +270,20 @@ func Create(ctx context.Context, deps Deps, p Params) (Result, error) { | ||
| 269 | 270 | } |
| 270 | 271 | committed = true |
| 271 | 272 | |
| 273 | + if err := notif.Emit(ctx, deps.Pool, notif.Event{ | |
| 274 | + ActorUserID: p.ActorUserID, | |
| 275 | + Kind: "repo_created", | |
| 276 | + RepoID: row.ID, | |
| 277 | + SourceKind: "repo", | |
| 278 | + SourceID: row.ID, | |
| 279 | + Public: p.Visibility == "public", | |
| 280 | + Extra: map[string]any{ | |
| 281 | + "repo_name": p.Name, | |
| 282 | + }, | |
| 283 | + }); err != nil && deps.Logger != nil { | |
| 284 | + deps.Logger.WarnContext(ctx, "repos: emit repo_created", "repo_id", row.ID, "error", err) | |
| 285 | + } | |
| 286 | + | |
| 272 | 287 | if err := deps.Audit.Record(ctx, deps.Pool, p.ActorUserID, |
| 273 | 288 | audit.ActionRepoCreated, audit.TargetRepo, row.ID, map[string]any{ |
| 274 | 289 | "name": p.Name, |
internal/worker/jobs/push_process.gomodified@@ -25,6 +25,7 @@ import ( | ||
| 25 | 25 | pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc" |
| 26 | 26 | gitops "github.com/tenseleyFlow/shithub/internal/repos/git" |
| 27 | 27 | reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc" |
| 28 | + "github.com/tenseleyFlow/shithub/internal/social" | |
| 28 | 29 | usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc" |
| 29 | 30 | "github.com/tenseleyFlow/shithub/internal/worker" |
| 30 | 31 | workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc" |
@@ -209,6 +210,20 @@ func PushProcess(deps PushProcessDeps) worker.Handler { | ||
| 209 | 210 | if err := wq.MarkPushEventProcessed(ctx, deps.Pool, event.ID); err != nil { |
| 210 | 211 | return fmt.Errorf("mark processed: %w", err) |
| 211 | 212 | } |
| 213 | + if actorID := int64ValueOrZero(event.PusherUserID); actorID != 0 { | |
| 214 | + if err := social.Emit(ctx, social.Deps{Pool: deps.Pool, Logger: deps.Logger}, social.EmitParams{ | |
| 215 | + ActorUserID: actorID, | |
| 216 | + Kind: "push", | |
| 217 | + RepoID: event.RepoID, | |
| 218 | + SourceKind: "repo", | |
| 219 | + SourceID: event.RepoID, | |
| 220 | + Public: repo.Visibility == reposdb.RepoVisibilityPublic, | |
| 221 | + Payload: body, | |
| 222 | + }); err != nil && deps.Logger != nil { | |
| 223 | + deps.Logger.WarnContext(ctx, "push:process: emit push event", | |
| 224 | + "push_event_id", event.ID, "error", err) | |
| 225 | + } | |
| 226 | + } | |
| 212 | 227 | |
| 213 | 228 | // Wake any size_recalc workers waiting on LISTEN. |
| 214 | 229 | _ = worker.Notify(ctx, deps.Pool) |
internal/worker/jobs/trending_compute.goadded@@ -0,0 +1,75 @@ | ||
| 1 | +// SPDX-License-Identifier: AGPL-3.0-or-later | |
| 2 | + | |
| 3 | +package jobs | |
| 4 | + | |
| 5 | +import ( | |
| 6 | + "context" | |
| 7 | + "encoding/json" | |
| 8 | + "errors" | |
| 9 | + "fmt" | |
| 10 | + "log/slog" | |
| 11 | + "time" | |
| 12 | + | |
| 13 | + "github.com/jackc/pgx/v5/pgtype" | |
| 14 | + "github.com/jackc/pgx/v5/pgxpool" | |
| 15 | + | |
| 16 | + "github.com/tenseleyFlow/shithub/internal/social" | |
| 17 | + "github.com/tenseleyFlow/shithub/internal/worker" | |
| 18 | +) | |
| 19 | + | |
| 20 | +type TrendingComputeDeps struct { | |
| 21 | + Pool *pgxpool.Pool | |
| 22 | + Logger *slog.Logger | |
| 23 | +} | |
| 24 | + | |
| 25 | +type TrendingComputePayload struct { | |
| 26 | + ScheduleNext *bool `json:"schedule_next,omitempty"` | |
| 27 | + IntervalMinutes int32 `json:"interval_minutes,omitempty"` | |
| 28 | +} | |
| 29 | + | |
| 30 | +func TrendingCompute(deps TrendingComputeDeps) worker.Handler { | |
| 31 | + return func(ctx context.Context, raw json.RawMessage) error { | |
| 32 | + if deps.Pool == nil { | |
| 33 | + return errors.New("trending:compute: missing pool") | |
| 34 | + } | |
| 35 | + payload := TrendingComputePayload{} | |
| 36 | + if len(raw) > 0 && string(raw) != "null" { | |
| 37 | + if err := json.Unmarshal(raw, &payload); err != nil { | |
| 38 | + return worker.PoisonError(fmt.Errorf("bad payload: %w", err)) | |
| 39 | + } | |
| 40 | + } | |
| 41 | + | |
| 42 | + if err := social.CaptureTrendingSnapshots(ctx, social.Deps{Pool: deps.Pool, Logger: deps.Logger}); err != nil { | |
| 43 | + return err | |
| 44 | + } | |
| 45 | + if deps.Logger != nil { | |
| 46 | + deps.Logger.InfoContext(ctx, "trending:compute captured snapshots") | |
| 47 | + } | |
| 48 | + | |
| 49 | + scheduleNext := true | |
| 50 | + if payload.ScheduleNext != nil { | |
| 51 | + scheduleNext = *payload.ScheduleNext | |
| 52 | + } | |
| 53 | + if !scheduleNext { | |
| 54 | + return nil | |
| 55 | + } | |
| 56 | + interval := time.Duration(payload.IntervalMinutes) * time.Minute | |
| 57 | + if interval <= 0 { | |
| 58 | + interval = time.Hour | |
| 59 | + } | |
| 60 | + if _, err := worker.Enqueue(ctx, deps.Pool, worker.KindTrendingCompute, map[string]any{ | |
| 61 | + "schedule_next": true, | |
| 62 | + "interval_minutes": int(interval / time.Minute), | |
| 63 | + }, worker.EnqueueOptions{ | |
| 64 | + RunAt: pgtype.Timestamptz{Time: time.Now().Add(interval), Valid: true}, | |
| 65 | + MaxAttempts: 3, | |
| 66 | + }); err != nil { | |
| 67 | + if deps.Logger != nil { | |
| 68 | + deps.Logger.WarnContext(ctx, "trending:compute self-enqueue failed", "error", err) | |
| 69 | + } | |
| 70 | + return nil | |
| 71 | + } | |
| 72 | + _ = worker.Notify(ctx, deps.Pool) | |
| 73 | + return nil | |
| 74 | + } | |
| 75 | +} | |
internal/worker/types.gomodified@@ -73,6 +73,12 @@ const ( | ||
| 73 | 73 | KindNotifyFanout Kind = "notify:fanout" |
| 74 | 74 | ) |
| 75 | 75 | |
| 76 | +// S42 social feed kinds. trending:compute refreshes the denormalized | |
| 77 | +// day/week/month Explore rankings. | |
| 78 | +const ( | |
| 79 | + KindTrendingCompute Kind = "trending:compute" | |
| 80 | +) | |
| 81 | + | |
| 76 | 82 | // NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes |
| 77 | 83 | // to so it wakes up immediately when a job is enqueued, instead of |
| 78 | 84 | // polling. Callers wrapping enqueue in a tx must NOTIFY inside the |