Go · 2275 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package jobs
4
5 import (
6 "context"
7 "encoding/json"
8 "log/slog"
9
10 "github.com/jackc/pgx/v5/pgxpool"
11
12 "github.com/tenseleyFlow/shithub/internal/auth/email"
13 "github.com/tenseleyFlow/shithub/internal/notif"
14 "github.com/tenseleyFlow/shithub/internal/worker"
15 )
16
17 // NotifyFanoutDeps wires the fan-out handler against the runtime.
18 // EmailSender is optional — when nil, inbox rows still get written
19 // but no email goes out (matches the dev-mode "look at the inbox in
20 // the web UI" loop).
21 type NotifyFanoutDeps struct {
22 Pool *pgxpool.Pool
23 Logger *slog.Logger
24 EmailSender email.Sender
25 EmailFrom string
26 SiteName string
27 BaseURL string
28 UnsubscribeKey []byte
29 }
30
31 // NotifyFanout returns the worker handler for `notify:fanout`. The
32 // handler ignores the payload (cursor lives in
33 // domain_events_processed) and drains up to FanoutBatch events per
34 // invocation. The pool re-runs the job through normal retry cadence
35 // when a drain partially fails, and the cron-driven scheduler re-
36 // enqueues on a tick.
37 func NotifyFanout(deps NotifyFanoutDeps) worker.Handler {
38 return func(ctx context.Context, _ json.RawMessage) error {
39 processed, err := notif.FanoutOnce(ctx, notif.Deps{
40 Pool: deps.Pool,
41 Logger: deps.Logger,
42 EmailSender: deps.EmailSender,
43 EmailFrom: deps.EmailFrom,
44 SiteName: deps.SiteName,
45 BaseURL: deps.BaseURL,
46 UnsubscribeKey: deps.UnsubscribeKey,
47 })
48 if err != nil {
49 return err
50 }
51 if deps.Logger != nil && processed > 0 {
52 deps.Logger.InfoContext(ctx, "notify:fanout drained events",
53 "count", processed)
54 }
55 // If we drained a full batch, more events probably exist.
56 // Re-enqueue ourselves so we keep going on the next tick
57 // without waiting for the next cron beat. Best-effort: an
58 // enqueue failure is logged but not surfaced as a job error
59 // (the next cron tick will pick up).
60 if processed >= notif.FanoutBatch {
61 if _, err := worker.Enqueue(ctx, deps.Pool, worker.KindNotifyFanout,
62 map[string]any{}, worker.EnqueueOptions{}); err != nil && deps.Logger != nil {
63 deps.Logger.WarnContext(ctx, "notify:fanout self-enqueue failed",
64 "error", err)
65 }
66 }
67 return nil
68 }
69 }
70