Go · 11391 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package notif_test
4
5 import (
6 "context"
7 "io"
8 "log/slog"
9 "strconv"
10 "sync"
11 "testing"
12
13 "github.com/jackc/pgx/v5/pgtype"
14 "github.com/jackc/pgx/v5/pgxpool"
15
16 "github.com/tenseleyFlow/shithub/internal/auth/email"
17 issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc"
18 "github.com/tenseleyFlow/shithub/internal/notif"
19 notifdb "github.com/tenseleyFlow/shithub/internal/notif/sqlc"
20 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
21 socialdb "github.com/tenseleyFlow/shithub/internal/social/sqlc"
22 "github.com/tenseleyFlow/shithub/internal/testing/dbtest"
23 usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
24 )
25
// fixtureHash is the PasswordHash given to every user this file
// creates. It is a "$"-delimited argon2id string (v=19, m=16384, t=1,
// p=1) whose last two segments are runs of "A".
// NOTE(review): those segments look like placeholder salt/digest
// filler, so presumably no test verifies a password against it — confirm.
const fixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
	"AAAAAAAAAAAAAAAA$" +
	"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
29
// captureSender records every Send call so tests can count them.
// setupFanout installs one as the EmailSender in notif.Deps.
type captureSender struct {
	// mu guards messages; both Send and count take it.
	mu sync.Mutex
	// messages holds every email.Message passed to Send, in call order.
	messages []email.Message
}
35
36 func (c *captureSender) Send(_ context.Context, m email.Message) error {
37 c.mu.Lock()
38 defer c.mu.Unlock()
39 c.messages = append(c.messages, m)
40 return nil
41 }
42
43 func (c *captureSender) count() int {
44 c.mu.Lock()
45 defer c.mu.Unlock()
46 return len(c.messages)
47 }
48
// fixture bundles everything setupFanout creates so each test can reach
// the database, the fan-out dependencies and the captured emails.
type fixture struct {
	// pool is the per-test database returned by dbtest.NewTestDB.
	pool *pgxpool.Pool
	// deps is passed to notif.FanoutOnce; its EmailSender is sender.
	deps notif.Deps
	// sender captures outgoing email so tests can count messages.
	sender *captureSender
	// repoID is the ID of the "demo" repo created by setupFanout.
	repoID int64
	author int64 // creator + actor in most tests
}
56
57 func setupFanout(t *testing.T) fixture {
58 t.Helper()
59 pool := dbtest.NewTestDB(t)
60 ctx := context.Background()
61
62 uq := usersdb.New()
63 author, err := uq.CreateUser(ctx, pool, usersdb.CreateUserParams{
64 Username: "alice", DisplayName: "Alice", PasswordHash: fixtureHash,
65 })
66 if err != nil {
67 t.Fatalf("create author: %v", err)
68 }
69 repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
70 OwnerUserID: pgtype.Int8{Int64: author.ID, Valid: true},
71 Name: "demo",
72 DefaultBranch: "trunk",
73 Visibility: reposdb.RepoVisibilityPublic,
74 })
75 if err != nil {
76 t.Fatalf("create repo: %v", err)
77 }
78 sender := &captureSender{}
79 deps := notif.Deps{
80 Pool: pool,
81 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
82 EmailSender: sender,
83 EmailFrom: "noreply@example.com",
84 SiteName: "shithub",
85 BaseURL: "https://shithub.example",
86 UnsubscribeKey: []byte("test-key-32-bytes-aaaaaaaaaaaaaa"),
87 }
88 return fixture{pool: pool, deps: deps, sender: sender, repoID: repo.ID, author: author.ID}
89 }
90
91 func mustUser(t *testing.T, pool *pgxpool.Pool, name string) int64 {
92 t.Helper()
93 u, err := usersdb.New().CreateUser(context.Background(), pool, usersdb.CreateUserParams{
94 Username: name, DisplayName: name, PasswordHash: fixtureHash,
95 })
96 if err != nil {
97 t.Fatalf("create user %s: %v", name, err)
98 }
99 // Give every user a verified primary email so the email-side
100 // gate doesn't trivially short-circuit the storm-dampener tests.
101 em, err := usersdb.New().CreateUserEmail(context.Background(), pool, usersdb.CreateUserEmailParams{
102 UserID: u.ID,
103 Email: name + "@example.com",
104 })
105 if err != nil {
106 t.Fatalf("create email %s: %v", name, err)
107 }
108 if err := usersdb.New().MarkUserEmailVerified(context.Background(), pool, em.ID); err != nil {
109 t.Fatalf("verify email %s: %v", name, err)
110 }
111 if err := usersdb.New().LinkUserPrimaryEmail(context.Background(), pool, usersdb.LinkUserPrimaryEmailParams{
112 ID: u.ID, PrimaryEmailID: pgtype.Int8{Int64: em.ID, Valid: true},
113 }); err != nil {
114 t.Fatalf("set primary email %s: %v", name, err)
115 }
116 return u.ID
117 }
118
119 func mustIssue(t *testing.T, pool *pgxpool.Pool, repoID, authorID int64) issuesdb.Issue {
120 t.Helper()
121 ctx := context.Background()
122 q := issuesdb.New()
123 if err := q.EnsureRepoIssueCounter(ctx, pool, repoID); err != nil {
124 t.Fatalf("ensure counter: %v", err)
125 }
126 num, err := q.AllocateIssueNumber(ctx, pool, repoID)
127 if err != nil {
128 t.Fatalf("allocate: %v", err)
129 }
130 row, err := q.CreateIssue(ctx, pool, issuesdb.CreateIssueParams{
131 RepoID: repoID,
132 Number: num,
133 Kind: issuesdb.IssueKindIssue,
134 Title: "test issue " + strconv.FormatInt(num, 10),
135 Body: "body",
136 AuthorUserID: pgtype.Int8{Int64: authorID, Valid: true},
137 })
138 if err != nil {
139 t.Fatalf("create issue: %v", err)
140 }
141 return row
142 }
143
144 func emitComment(t *testing.T, fx fixture, issue issuesdb.Issue, actorID int64, mentions []int64) {
145 t.Helper()
146 if err := notif.Emit(context.Background(), fx.pool, notif.Event{
147 ActorUserID: actorID,
148 Kind: "issue_comment_created",
149 RepoID: fx.repoID,
150 SourceKind: "issue",
151 SourceID: issue.ID,
152 Public: true,
153 Mentions: mentions,
154 }); err != nil {
155 t.Fatalf("emit: %v", err)
156 }
157 }
158
159 // ─── tests ─────────────────────────────────────────────────────────
160
161 // Five comments on one issue produce ONE inbox row for a watching
162 // recipient, with last_event_at advanced.
163 func TestFanout_ThreadCoalescing(t *testing.T) {
164 fx := setupFanout(t)
165 ctx := context.Background()
166
167 bob := mustUser(t, fx.pool, "bob")
168 // Bob watches the repo at level=all so he gets every comment.
169 if err := socialdb.New().UpsertWatch(ctx, fx.pool, socialdb.UpsertWatchParams{
170 UserID: bob, RepoID: fx.repoID, Level: socialdb.WatchLevelAll,
171 }); err != nil {
172 t.Fatalf("watch: %v", err)
173 }
174 issue := mustIssue(t, fx.pool, fx.repoID, fx.author)
175
176 for i := 0; i < 5; i++ {
177 emitComment(t, fx, issue, fx.author, nil)
178 }
179 if _, err := notif.FanoutOnce(ctx, fx.deps); err != nil {
180 t.Fatalf("fanout: %v", err)
181 }
182
183 // Single inbox row coalesced for bob.
184 count, err := notifdb.New().CountNotificationsForRecipient(ctx, fx.pool,
185 notifdb.CountNotificationsForRecipientParams{RecipientUserID: bob})
186 if err != nil {
187 t.Fatalf("count: %v", err)
188 }
189 if count != 1 {
190 t.Fatalf("want 1 coalesced row, got %d", count)
191 }
192 }
193
194 // Same scenario as coalesce: 5 events should produce ONE email
195 // thanks to the per-thread storm dampener (cap=1 per 10 minutes).
196 func TestFanout_StormDampener_SingleEmailPerThread(t *testing.T) {
197 fx := setupFanout(t)
198 ctx := context.Background()
199
200 bob := mustUser(t, fx.pool, "bob")
201 if err := socialdb.New().UpsertWatch(ctx, fx.pool, socialdb.UpsertWatchParams{
202 UserID: bob, RepoID: fx.repoID, Level: socialdb.WatchLevelAll,
203 }); err != nil {
204 t.Fatalf("watch: %v", err)
205 }
206 issue := mustIssue(t, fx.pool, fx.repoID, fx.author)
207 for i := 0; i < 5; i++ {
208 emitComment(t, fx, issue, fx.author, nil)
209 if _, err := notif.FanoutOnce(ctx, fx.deps); err != nil {
210 t.Fatalf("fanout: %v", err)
211 }
212 }
213 if got := fx.sender.count(); got != 1 {
214 t.Fatalf("want 1 email after dampener, got %d", got)
215 }
216 }
217
218 // The actor never notifies themselves — even if they are a
219 // watcher / author / etc.
220 func TestFanout_SelfSuppression(t *testing.T) {
221 fx := setupFanout(t)
222 ctx := context.Background()
223
224 // Make the author a level=all watcher of their own repo so they
225 // would otherwise match the watcher slot.
226 if err := socialdb.New().UpsertWatch(ctx, fx.pool, socialdb.UpsertWatchParams{
227 UserID: fx.author, RepoID: fx.repoID, Level: socialdb.WatchLevelAll,
228 }); err != nil {
229 t.Fatalf("watch: %v", err)
230 }
231 issue := mustIssue(t, fx.pool, fx.repoID, fx.author)
232 emitComment(t, fx, issue, fx.author, nil)
233 if _, err := notif.FanoutOnce(ctx, fx.deps); err != nil {
234 t.Fatalf("fanout: %v", err)
235 }
236 count, _ := notifdb.New().CountNotificationsForRecipient(ctx, fx.pool,
237 notifdb.CountNotificationsForRecipientParams{RecipientUserID: fx.author})
238 if count != 0 {
239 t.Fatalf("self-notification leaked: %d rows", count)
240 }
241 }
242
243 // A user with watches.level=ignore on the repo gets nothing for
244 // regular events — but @mentions OVERRIDE ignore (matches GitHub
245 // semantics; the spec's day-1 lean).
246 func TestFanout_MentionOverridesIgnore(t *testing.T) {
247 fx := setupFanout(t)
248 ctx := context.Background()
249
250 bob := mustUser(t, fx.pool, "bob")
251 if err := socialdb.New().UpsertWatch(ctx, fx.pool, socialdb.UpsertWatchParams{
252 UserID: bob, RepoID: fx.repoID, Level: socialdb.WatchLevelIgnore,
253 }); err != nil {
254 t.Fatalf("watch ignore: %v", err)
255 }
256
257 issue := mustIssue(t, fx.pool, fx.repoID, fx.author)
258 // Plain comment: bob should get nothing (ignore level + no
259 // other relation).
260 emitComment(t, fx, issue, fx.author, nil)
261 if _, err := notif.FanoutOnce(ctx, fx.deps); err != nil {
262 t.Fatalf("fanout: %v", err)
263 }
264 count, _ := notifdb.New().CountNotificationsForRecipient(ctx, fx.pool,
265 notifdb.CountNotificationsForRecipientParams{RecipientUserID: bob})
266 if count != 0 {
267 t.Fatalf("ignored user got plain comment: %d rows", count)
268 }
269 // Mention should bypass ignore.
270 emitComment(t, fx, issue, fx.author, []int64{bob})
271 if _, err := notif.FanoutOnce(ctx, fx.deps); err != nil {
272 t.Fatalf("fanout: %v", err)
273 }
274 count, _ = notifdb.New().CountNotificationsForRecipient(ctx, fx.pool,
275 notifdb.CountNotificationsForRecipientParams{RecipientUserID: bob})
276 if count != 1 {
277 t.Fatalf("mention should override ignore; got %d rows", count)
278 }
279 }
280
281 // Visibility re-check: bob is not a collab on a private repo. An
282 // event emitted with Public=true (e.g. emitter mis-stamped or repo
283 // flipped to private mid-flight) must not produce a notification
284 // for bob.
285 func TestFanout_VisibilityRecheck_PrivateRepo(t *testing.T) {
286 fx := setupFanout(t)
287 ctx := context.Background()
288 // Flip the repo to private.
289 if _, err := fx.pool.Exec(ctx,
290 `UPDATE repos SET visibility='private' WHERE id=$1`, fx.repoID); err != nil {
291 t.Fatalf("flip private: %v", err)
292 }
293 bob := mustUser(t, fx.pool, "bob")
294 if err := socialdb.New().UpsertWatch(ctx, fx.pool, socialdb.UpsertWatchParams{
295 UserID: bob, RepoID: fx.repoID, Level: socialdb.WatchLevelAll,
296 }); err != nil {
297 t.Fatalf("watch: %v", err)
298 }
299 issue := mustIssue(t, fx.pool, fx.repoID, fx.author)
300 emitComment(t, fx, issue, fx.author, nil)
301 if _, err := notif.FanoutOnce(ctx, fx.deps); err != nil {
302 t.Fatalf("fanout: %v", err)
303 }
304 count, _ := notifdb.New().CountNotificationsForRecipient(ctx, fx.pool,
305 notifdb.CountNotificationsForRecipientParams{RecipientUserID: bob})
306 if count != 0 {
307 t.Fatalf("non-collab on private got notification: %d rows", count)
308 }
309 }
310
311 // Verify that the unsubscribe HMAC roundtrips: a recipient + thread
312 // gets a verifying signature; tampering with any field invalidates it.
313 func TestUnsubscribe_HMACRoundtrip(t *testing.T) {
314 key := []byte("test-key-32-bytes-aaaaaaaaaaaaaa")
315 const (
316 uid = int64(123)
317 tk = "issue"
318 tid = int64(45)
319 bad = "issues"
320 other = int64(46)
321 )
322 // Build by parsing the URL the same path the email-side uses.
323 // The exposed verifier takes the parsed pieces.
324 good := makeSig(key, uid, tk, tid)
325 if !notif.VerifyUnsubscribe(key, uid, tk, tid, good) {
326 t.Fatal("good sig should verify")
327 }
328 if notif.VerifyUnsubscribe(key, uid, bad, tid, good) {
329 t.Fatal("changed kind should NOT verify")
330 }
331 if notif.VerifyUnsubscribe(key, uid, tk, other, good) {
332 t.Fatal("changed thread id should NOT verify")
333 }
334 }
335
336 // makeSig duplicates the package's URL-builder math so the roundtrip
337 // test doesn't have to import internals or pull a half-built URL apart.
338 func makeSig(key []byte, uid int64, tk string, tid int64) string {
339 // Mirror unsubscribeURL's payload format.
340 rec := strconv.FormatInt(uid, 10)
341 tids := strconv.FormatInt(tid, 10)
342 payload := rec + ":" + tk + ":" + tids
343 return notif.HMACSig(key, payload)
344 }
345