tenseleyflow/shithub / 9d2ffa4

Browse files

S29: register notify:fanout worker job

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
9d2ffa44e75b233b55b8e41772c566b07d12e366
Parents
424bde8
Tree
722483f

4 changed files

StatusFile+-
M cmd/shithubd/worker.go 77 0
M internal/infra/config/config.go 12 0
A internal/worker/jobs/notify_fanout.go 69 0
M internal/worker/types.go 9 0
cmd/shithubd/worker.gomodified
@@ -3,6 +3,8 @@
33
 package main
44
 
55
 import (
6
+	"crypto/sha256"
7
+	"encoding/base64"
68
 	"errors"
79
 	"fmt"
810
 	"log/slog"
@@ -16,6 +18,7 @@ import (
1618
 	"github.com/spf13/cobra"
1719
 
1820
 	"github.com/tenseleyFlow/shithub/internal/auth/audit"
21
+	"github.com/tenseleyFlow/shithub/internal/auth/email"
1922
 	"github.com/tenseleyFlow/shithub/internal/infra/config"
2023
 	"github.com/tenseleyFlow/shithub/internal/infra/db"
2124
 	"github.com/tenseleyFlow/shithub/internal/infra/storage"
@@ -108,6 +111,17 @@ var workerCmd = &cobra.Command{
108111
 			Pool: pool, Logger: logger,
109112
 		}))
110113
 
114
+		notifSender, _ := pickNotifEmailSender(cfg)
115
+		p.Register(worker.KindNotifyFanout, jobs.NotifyFanout(jobs.NotifyFanoutDeps{
116
+			Pool:           pool,
117
+			Logger:         logger,
118
+			EmailSender:    notifSender,
119
+			EmailFrom:      cfg.Auth.EmailFrom,
120
+			SiteName:       cfg.Auth.SiteName,
121
+			BaseURL:        cfg.Auth.BaseURL,
122
+			UnsubscribeKey: notifUnsubscribeKey(cfg, logger),
123
+		}))
124
+
111125
 		return p.Run(ctx)
112126
 	},
113127
 }
@@ -116,3 +130,66 @@ func init() {
116130
 	workerCmd.Flags().Int("workers", 0, "Number of worker goroutines (default 4)")
117131
 	rootCmd.AddCommand(workerCmd)
118132
 }
133
+
134
+// pickNotifEmailSender mirrors pickAdminEmailSender / pickEmailSender
135
+// in the web binary. Kept local to the worker so failure to construct
136
+// the sender doesn't kill the process — fan-out without email is a
137
+// supported degraded mode (inbox rows still land).
138
+func pickNotifEmailSender(cfg config.Config) (email.Sender, error) {
139
+	switch cfg.Auth.EmailBackend {
140
+	case "stdout":
141
+		return email.NewStdoutSender(os.Stdout), nil
142
+	case "smtp":
143
+		return &email.SMTPSender{
144
+			Addr:     cfg.Auth.SMTP.Addr,
145
+			From:     cfg.Auth.EmailFrom,
146
+			Username: cfg.Auth.SMTP.Username,
147
+			Password: cfg.Auth.SMTP.Password,
148
+		}, nil
149
+	case "postmark":
150
+		return &email.PostmarkSender{
151
+			ServerToken: cfg.Auth.Postmark.ServerToken,
152
+			From:        cfg.Auth.EmailFrom,
153
+		}, nil
154
+	default:
155
+		return nil, errors.New("worker: unknown email_backend")
156
+	}
157
+}
158
+
159
+// notifUnsubscribeKey resolves the HMAC key used to sign one-click
160
+// unsubscribe URLs. Operators set Notif.UnsubscribeKeyB64 in prod.
161
+// In dev (key empty) we derive a deterministic 32-byte key from the
162
+// session secret so unsubscribe links survive process restarts
163
+// without operator action — and we log a loud warning so the
164
+// derivation can't sneak into prod by accident.
165
+func notifUnsubscribeKey(cfg config.Config, logger *slog.Logger) []byte {
166
+	if cfg.Notif.UnsubscribeKeyB64 != "" {
167
+		k, err := base64.StdEncoding.DecodeString(cfg.Notif.UnsubscribeKeyB64)
168
+		if err == nil && len(k) >= 16 {
169
+			return k
170
+		}
171
+		if logger != nil {
172
+			logger.Warn("notif: unsubscribe_key_b64 invalid; falling back to derived key",
173
+				"hint", "set Notif.UnsubscribeKeyB64 to base64-encoded 32+ random bytes")
174
+		}
175
+	}
176
+	if cfg.Session.KeyB64 != "" {
177
+		seed, err := base64.StdEncoding.DecodeString(cfg.Session.KeyB64)
178
+		if err == nil && len(seed) > 0 {
179
+			sum := sha256.Sum256(append([]byte("notif-unsub:"), seed...))
180
+			if logger != nil {
181
+				logger.Warn("notif: deriving unsubscribe key from session secret (dev fallback)",
182
+					"hint", "set Notif.UnsubscribeKeyB64 in prod")
183
+			}
184
+			return sum[:]
185
+		}
186
+	}
187
+	// Last-resort static dev key — links work but anyone with source
188
+	// access can mint them. Logged at WARN so operators notice in
189
+	// prod logs.
190
+	if logger != nil {
191
+		logger.Warn("notif: no key material — using static dev key",
192
+			"hint", "set Notif.UnsubscribeKeyB64 (or session.key_b64)")
193
+	}
194
+	return []byte("shithub-dev-unsub-static-key-32B")
195
+}
internal/infra/config/config.gomodified
@@ -37,6 +37,18 @@ type Config struct {
3737
 	Session        SessionConfig        `toml:"session"`
3838
 	Storage        StorageConfig        `toml:"storage"`
3939
 	Auth           AuthConfig           `toml:"auth"`
40
+	Notif          NotifConfig          `toml:"notif"`
41
+}
42
+
43
+// NotifConfig configures the S29 notification surface. UnsubscribeKeyB64
44
+// is the base64-encoded HMAC-SHA256 key that signs one-click
45
+// unsubscribe URLs (RFC 8058). When empty (dev default), the
46
+// fan-out worker derives a deterministic key from the session key so
47
+// links are stable across restarts without operator action — this
48
+// derivation is NOT suitable for prod and the wiring layer logs a
49
+// warning when it fires.
50
+type NotifConfig struct {
51
+	UnsubscribeKeyB64 string `toml:"unsubscribe_key_b64"`
4052
 }
4153
 
4254
 // WebConfig holds HTTP server settings.
internal/worker/jobs/notify_fanout.goadded
@@ -0,0 +1,69 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package jobs
4
+
5
+import (
6
+	"context"
7
+	"encoding/json"
8
+	"log/slog"
9
+
10
+	"github.com/jackc/pgx/v5/pgxpool"
11
+
12
+	"github.com/tenseleyFlow/shithub/internal/auth/email"
13
+	"github.com/tenseleyFlow/shithub/internal/notif"
14
+	"github.com/tenseleyFlow/shithub/internal/worker"
15
+)
16
+
17
+// NotifyFanoutDeps wires the fan-out handler against the runtime.
18
+// EmailSender is optional — when nil, inbox rows still get written
19
+// but no email goes out (matches the dev-mode "look at the inbox in
20
+// the web UI" loop).
21
+type NotifyFanoutDeps struct {
22
+	Pool           *pgxpool.Pool
23
+	Logger         *slog.Logger
24
+	EmailSender    email.Sender
25
+	EmailFrom      string
26
+	SiteName       string
27
+	BaseURL        string
28
+	UnsubscribeKey []byte
29
+}
30
+
31
+// NotifyFanout returns the worker handler for `notify:fanout`. The
32
+// handler ignores the payload (cursor lives in
33
+// domain_events_processed) and drains up to FanoutBatch events per
34
+// invocation. The pool re-runs the job through normal retry cadence
35
+// when a drain partially fails, and the cron-driven scheduler re-
36
+// enqueues on a tick.
37
+func NotifyFanout(deps NotifyFanoutDeps) worker.Handler {
38
+	return func(ctx context.Context, _ json.RawMessage) error {
39
+		processed, err := notif.FanoutOnce(ctx, notif.Deps{
40
+			Pool:           deps.Pool,
41
+			Logger:         deps.Logger,
42
+			EmailSender:    deps.EmailSender,
43
+			EmailFrom:      deps.EmailFrom,
44
+			SiteName:       deps.SiteName,
45
+			BaseURL:        deps.BaseURL,
46
+			UnsubscribeKey: deps.UnsubscribeKey,
47
+		})
48
+		if err != nil {
49
+			return err
50
+		}
51
+		if deps.Logger != nil && processed > 0 {
52
+			deps.Logger.InfoContext(ctx, "notify:fanout drained events",
53
+				"count", processed)
54
+		}
55
+		// If we drained a full batch, more events probably exist.
56
+		// Re-enqueue ourselves so we keep going on the next tick
57
+		// without waiting for the next cron beat. Best-effort: an
58
+		// enqueue failure is logged but not surfaced as a job error
59
+		// (the next cron tick will pick up).
60
+		if processed >= notif.FanoutBatch {
61
+			if _, err := worker.Enqueue(ctx, deps.Pool, worker.KindNotifyFanout,
62
+				map[string]any{}, worker.EnqueueOptions{}); err != nil && deps.Logger != nil {
63
+				deps.Logger.WarnContext(ctx, "notify:fanout self-enqueue failed",
64
+					"error", err)
65
+			}
66
+		}
67
+		return nil
68
+	}
69
+}
internal/worker/types.gomodified
@@ -64,6 +64,15 @@ const (
6464
 	KindRepoIndexReconcile Kind = "repo:index_reconcile"
6565
 )
6666
 
67
+// S29 notification kinds. notify:fanout drains the domain_events
68
+// table past the persisted cursor and materializes inbox rows +
69
+// (optional) emails. Self-throttling per FanoutBatch; loop-friendly
70
+// (the handler returns the count so the pool can re-enqueue when a
71
+// drain didn't catch up).
72
+const (
73
+	KindNotifyFanout Kind = "notify:fanout"
74
+)
75
+
6776
 // NotifyChannel is the Postgres LISTEN/NOTIFY channel the pool subscribes
6877
 // to so it wakes up immediately when a job is enqueued, instead of
6978
 // polling. Callers wrapping enqueue in a tx must NOTIFY inside the