tenseleyflow/shithub / 14ae067

Browse files

S29: emit domain events from issues/comments/assign/review-request

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
14ae06756029653d16ac6e4ae0271443bdf92e7b
Parents
9d2ffa4
Tree
a417117

6 changed files

StatusFile+-
M internal/issues/issues.go 57 4
M internal/issues/milestones.go 7 0
A internal/issues/notif_emit.go 131 0
A internal/notif/emit.go 97 0
A internal/notif/mentions.go 50 0
M internal/pulls/review/request.go 33 1
internal/issues/issues.gomodified
@@ -124,7 +124,7 @@ func Create(ctx context.Context, deps Deps, p CreateParams) (issuesdb.Issue, err
124
 	}
124
 	}
125
 
125
 
126
 	// Render markdown for the cached body html.
126
 	// Render markdown for the cached body html.
127
-	html := renderBodyHTML(ctx, deps, p.Body)
127
+	html, mentions := renderBody(ctx, deps, p.Body)
128
 	row.BodyHtmlCached = pgtype.Text{String: html, Valid: html != ""}
128
 	row.BodyHtmlCached = pgtype.Text{String: html, Valid: html != ""}
129
 
129
 
130
 	if err := q.UpdateIssueTitleBody(ctx, tx, issuesdb.UpdateIssueTitleBodyParams{
130
 	if err := q.UpdateIssueTitleBody(ctx, tx, issuesdb.UpdateIssueTitleBodyParams{
@@ -138,6 +138,21 @@ func Create(ctx context.Context, deps Deps, p CreateParams) (issuesdb.Issue, err
138
 		return issuesdb.Issue{}, fmt.Errorf("refs: %w", err)
138
 		return issuesdb.Issue{}, fmt.Errorf("refs: %w", err)
139
 	}
139
 	}
140
 
140
 
141
+	// Emit the domain event in the same tx as the issue row so a
142
+	// rollback drops both. Mention resolution happens *after* commit
143
+	// to avoid holding the row lock through user-id lookups; the
144
+	// fan-out worker reads payload.mentions to drive @-ping
145
+	// recipients.
146
+	mentionIDs := mentionUserIDs(ctx, deps.Pool, mentions)
147
+	repoVis, _ := repoVisibilityPublic(ctx, tx, p.RepoID)
148
+	eventKind := "issue_created"
149
+	if kind == "pr" {
150
+		eventKind = "pr_opened"
151
+	}
152
+	if err := emitIssueEventTx(ctx, tx, eventKind, row, p.AuthorUserID, repoVis, mentionIDs); err != nil {
153
+		return issuesdb.Issue{}, fmt.Errorf("emit event: %w", err)
154
+	}
155
+
141
 	if err := tx.Commit(ctx); err != nil {
156
 	if err := tx.Commit(ctx); err != nil {
142
 		return issuesdb.Issue{}, fmt.Errorf("commit: %w", err)
157
 		return issuesdb.Issue{}, fmt.Errorf("commit: %w", err)
143
 	}
158
 	}
@@ -185,7 +200,7 @@ func AddComment(ctx context.Context, deps Deps, p CommentCreateParams) (issuesdb
185
 		return issuesdb.IssueComment{}, ErrIssueLocked
200
 		return issuesdb.IssueComment{}, ErrIssueLocked
186
 	}
201
 	}
187
 
202
 
188
-	html := renderBodyHTML(ctx, deps, body)
203
+	html, mentions := renderBody(ctx, deps, body)
189
 
204
 
190
 	tx, err := deps.Pool.Begin(ctx)
205
 	tx, err := deps.Pool.Begin(ctx)
191
 	if err != nil {
206
 	if err != nil {
@@ -212,6 +227,16 @@ func AddComment(ctx context.Context, deps Deps, p CommentCreateParams) (issuesdb
212
 		return issuesdb.IssueComment{}, err
227
 		return issuesdb.IssueComment{}, err
213
 	}
228
 	}
214
 
229
 
230
+	mentionIDs := mentionUserIDs(ctx, deps.Pool, mentions)
231
+	repoVis, _ := repoVisibilityPublic(ctx, tx, issue.RepoID)
232
+	commentKind := "issue_comment_created"
233
+	if issue.Kind == issuesdb.IssueKindPr {
234
+		commentKind = "pr_comment_created"
235
+	}
236
+	if err := emitCommentEventTx(ctx, tx, commentKind, issue, c.ID, p.AuthorUserID, repoVis, mentionIDs); err != nil {
237
+		return issuesdb.IssueComment{}, fmt.Errorf("emit event: %w", err)
238
+	}
239
+
215
 	if err := tx.Commit(ctx); err != nil {
240
 	if err := tx.Commit(ctx); err != nil {
216
 		return issuesdb.IssueComment{}, err
241
 		return issuesdb.IssueComment{}, err
217
 	}
242
 	}
@@ -270,6 +295,18 @@ func SetState(ctx context.Context, deps Deps, actorUserID, issueID int64, newSta
270
 	}); err != nil {
295
 	}); err != nil {
271
 		return err
296
 		return err
272
 	}
297
 	}
298
+	// Lifecycle domain event so the notif fan-out + S33 webhook
299
+	// pipeline pick up state changes the same way they pick up
300
+	// comments.
301
+	issue, _ := q.GetIssueByID(ctx, tx, issueID)
302
+	repoVis, _ := repoVisibilityPublic(ctx, tx, issue.RepoID)
303
+	stateKind := "issue_" + kind // issue_closed | issue_reopened
304
+	if issue.Kind == issuesdb.IssueKindPr {
305
+		stateKind = "pr_" + kind
306
+	}
307
+	if err := emitIssueEventTx(ctx, tx, stateKind, issue, actorUserID, repoVis, nil); err != nil {
308
+		return fmt.Errorf("emit event: %w", err)
309
+	}
273
 	if err := tx.Commit(ctx); err != nil {
310
 	if err := tx.Commit(ctx); err != nil {
274
 		return err
311
 		return err
275
 	}
312
 	}
@@ -340,10 +377,26 @@ func SetLock(ctx context.Context, deps Deps, actorUserID, issueID int64, locked
340
 // somewhere upstream regressed. The audit (S00-S25, M) flagged the
377
 // somewhere upstream regressed. The audit (S00-S25, M) flagged the
341
 // `_`-discard pattern as the kind of slop where a real bug could hide.
378
 // `_`-discard pattern as the kind of slop where a real bug could hide.
342
 func renderBodyHTML(ctx context.Context, deps Deps, body string) string {
379
 func renderBodyHTML(ctx context.Context, deps Deps, body string) string {
343
-	html, err := mdrender.RenderHTML([]byte(body))
380
+	html, _ := renderBody(ctx, deps, body)
381
+	return html
382
+}
383
+
384
+// renderBody is the mention-aware variant. Returns the cleaned HTML
385
+// plus the resolved mentions list — callers that emit notification
386
+// events use the mentions to fan out @-pings. The `_` shimming under
387
+// renderBodyHTML keeps existing call sites that don't care about
388
+// mentions untouched.
389
+func renderBody(ctx context.Context, deps Deps, body string) (string, []mdrender.Mention) {
390
+	if body == "" {
391
+		return "", nil
392
+	}
393
+	html, _, mentions, err := mdrender.Render(ctx, []byte(body), mdrender.Options{
394
+		SoftBreakAsBR: true,
395
+	})
344
 	if err != nil && deps.Logger != nil {
396
 	if err != nil && deps.Logger != nil {
345
 		deps.Logger.WarnContext(ctx, "issues: markdown render failed",
397
 		deps.Logger.WarnContext(ctx, "issues: markdown render failed",
346
 			"error", err, "body_bytes", len(body))
398
 			"error", err, "body_bytes", len(body))
399
+		return "", nil
347
 	}
400
 	}
348
-	return html
401
+	return string(html), mentions
349
 }
402
 }
internal/issues/milestones.gomodified
@@ -155,6 +155,13 @@ func AssignUser(ctx context.Context, deps Deps, actorUserID, issueID, userID int
155
 	}); err != nil {
155
 	}); err != nil {
156
 		return err
156
 		return err
157
 	}
157
 	}
158
+	// S29: domain event so the fan-out worker can route an
159
+	// `assignment` notification to the assignee.
160
+	issue, _ := q.GetIssueByID(ctx, tx, issueID)
161
+	repoVis, _ := repoVisibilityPublic(ctx, tx, issue.RepoID)
162
+	if err := emitAssignmentEventTx(ctx, tx, issue, actorUserID, userID, repoVis); err != nil {
163
+		return err
164
+	}
158
 	if err := tx.Commit(ctx); err != nil {
165
 	if err := tx.Commit(ctx); err != nil {
159
 		return err
166
 		return err
160
 	}
167
 	}
internal/issues/notif_emit.goadded
@@ -0,0 +1,131 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package issues
4
+
5
+import (
6
+	"context"
7
+	"errors"
8
+	"strconv"
9
+
10
+	"github.com/jackc/pgx/v5"
11
+	"github.com/jackc/pgx/v5/pgxpool"
12
+
13
+	issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc"
14
+	"github.com/tenseleyFlow/shithub/internal/markdown"
15
+	"github.com/tenseleyFlow/shithub/internal/notif"
16
+)
17
+
18
+// emitIssueEventTx emits a domain event for an issue/PR mutation
19
+// inside the orchestrator's tx. Centralized so every call site
20
+// records the same shape (kind, source, repo, mentions).
21
+func emitIssueEventTx(
22
+	ctx context.Context,
23
+	tx pgx.Tx,
24
+	kind string,
25
+	issue issuesdb.Issue,
26
+	actorUserID int64,
27
+	repoIsPublic bool,
28
+	mentions []int64,
29
+) error {
30
+	return notif.EmitTx(ctx, tx, notif.Event{
31
+		ActorUserID: actorUserID,
32
+		Kind:        kind,
33
+		RepoID:      issue.RepoID,
34
+		SourceKind:  "issue",
35
+		SourceID:    issue.ID,
36
+		Public:      repoIsPublic,
37
+		Mentions:    mentions,
38
+		Extra: map[string]any{
39
+			"issue_number": issue.Number,
40
+			"issue_title":  issue.Title,
41
+		},
42
+	})
43
+}
44
+
45
+// emitCommentEventTx emits a comment-create event. Carries the
46
+// underlying issue number/title so the inbox can render a useful
47
+// summary without an extra join.
48
+func emitCommentEventTx(
49
+	ctx context.Context,
50
+	tx pgx.Tx,
51
+	kind string,
52
+	issue issuesdb.Issue,
53
+	commentID int64,
54
+	actorUserID int64,
55
+	repoIsPublic bool,
56
+	mentions []int64,
57
+) error {
58
+	return notif.EmitTx(ctx, tx, notif.Event{
59
+		ActorUserID: actorUserID,
60
+		Kind:        kind,
61
+		RepoID:      issue.RepoID,
62
+		SourceKind:  "issue",
63
+		SourceID:    issue.ID,
64
+		Public:      repoIsPublic,
65
+		Mentions:    mentions,
66
+		Extra: map[string]any{
67
+			"issue_number": issue.Number,
68
+			"issue_title":  issue.Title,
69
+			"comment_id":   strconv.FormatInt(commentID, 10),
70
+		},
71
+	})
72
+}
73
+
74
+// emitAssignmentEventTx fires when a user is added as an assignee.
75
+// One event per assignee — the fan-out worker treats each as a
76
+// separate "this user has been assigned" notification with reason
77
+// `assignment`.
78
+func emitAssignmentEventTx(
79
+	ctx context.Context,
80
+	tx pgx.Tx,
81
+	issue issuesdb.Issue,
82
+	actorUserID, assigneeUserID int64,
83
+	repoIsPublic bool,
84
+) error {
85
+	return notif.EmitTx(ctx, tx, notif.Event{
86
+		ActorUserID: actorUserID,
87
+		Kind:        "issue_assigned",
88
+		RepoID:      issue.RepoID,
89
+		SourceKind:  "issue",
90
+		SourceID:    issue.ID,
91
+		Public:      repoIsPublic,
92
+		// `assignee_user_id` lets the fan-out worker route the
93
+		// notification to the assignee even if they're not yet a
94
+		// thread subscriber.
95
+		Extra: map[string]any{
96
+			"issue_number":     issue.Number,
97
+			"issue_title":      issue.Title,
98
+			"assignee_user_id": assigneeUserID,
99
+		},
100
+	})
101
+}
102
+
103
+// repoVisibilityPublic resolves the repo's visibility (public/private)
104
+// using the pool. Best-effort: errors collapse to false (private),
105
+// which is the safe default for the public-feed flag on the event row.
106
+func repoVisibilityPublic(ctx context.Context, db pgxRow, repoID int64) (bool, error) {
107
+	var vis string
108
+	err := db.QueryRow(ctx,
109
+		`SELECT visibility::text FROM repos WHERE id = $1`,
110
+		repoID,
111
+	).Scan(&vis)
112
+	if err != nil {
113
+		if errors.Is(err, pgx.ErrNoRows) {
114
+			return false, nil
115
+		}
116
+		return false, err
117
+	}
118
+	return vis == "public", nil
119
+}
120
+
121
+// pgxRow is the minimum surface emit helpers need from a tx or pool.
122
+type pgxRow interface {
123
+	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
124
+}
125
+
126
+// mentionUserIDs is the call-site ergonomic wrapper around
127
+// notif.ResolveMentionUserIDs. Lives here so issues.go's call site
128
+// stays a one-liner.
129
+func mentionUserIDs(ctx context.Context, pool *pgxpool.Pool, ms []markdown.Mention) []int64 {
130
+	return notif.ResolveMentionUserIDs(ctx, pool, ms)
131
+}
internal/notif/emit.goadded
@@ -0,0 +1,97 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package notif
4
+
5
+import (
6
+	"context"
7
+	"encoding/json"
8
+	"fmt"
9
+
10
+	"github.com/jackc/pgx/v5"
11
+	"github.com/jackc/pgx/v5/pgtype"
12
+	"github.com/jackc/pgx/v5/pgxpool"
13
+
14
+	socialdb "github.com/tenseleyFlow/shithub/internal/social/sqlc"
15
+	"github.com/tenseleyFlow/shithub/internal/worker"
16
+)
17
+
18
+// Event is the call-site shape for an emitter. The notif package
19
+// owns the schema-side details (which columns are NULL, payload
20
+// shape, JSON marshal); callers fill in the semantic fields.
21
+//
22
+// Mentions, when present, are surfaced to the fan-out worker via the
23
+// `mentions` JSON key in payload — kept here so callers don't have
24
+// to remember the wire convention.
25
+type Event struct {
26
+	ActorUserID int64  // 0 → system event (NULL in DB).
27
+	Kind        string // e.g. "issue_comment_created", "pr_review_requested".
28
+	RepoID      int64  // 0 → user-scoped event (NULL).
29
+	SourceKind  string // "issue", "pull", "repo", "user", …
30
+	SourceID    int64
31
+	Public      bool     // matches repo visibility (caller decides).
32
+	Mentions    []int64  // resolved user-ids to fan out @-mentions to.
33
+	Extra       map[string]any
34
+}
35
+
36
+// Emit inserts one row into domain_events and wakes the worker pool.
37
+// Use within a tx via EmitTx when the emit must atomically commit
38
+// with the source change (the typical case — issue.Create + the
39
+// matching event row land together or not at all).
40
+//
41
+// The emit side is intentionally thin: the routing matrix and
42
+// recipient computation live entirely in the fan-out worker so
43
+// callers don't grow a per-kind switch.
44
+func Emit(ctx context.Context, pool *pgxpool.Pool, ev Event) error {
45
+	if err := emitInto(ctx, pool, ev); err != nil {
46
+		return err
47
+	}
48
+	// Wake the worker pool. Best-effort — the cron-driven schedule
49
+	// catches up if NOTIFY is dropped (LISTEN reconnects on its own).
50
+	_ = worker.Notify(ctx, pool)
51
+	return nil
52
+}
53
+
54
+// EmitTx is the in-tx variant. Use it when the source mutation and
55
+// the event row must commit together (almost always). The NOTIFY is
56
+// deferred to commit by Postgres semantics; this matches the rest of
57
+// the worker's enqueue patterns.
58
+func EmitTx(ctx context.Context, tx pgx.Tx, ev Event) error {
59
+	return emitInto(ctx, tx, ev)
60
+}
61
+
62
+func emitInto(ctx context.Context, db socialdb.DBTX, ev Event) error {
63
+	payload, err := buildPayload(ev)
64
+	if err != nil {
65
+		return fmt.Errorf("notif emit: payload: %w", err)
66
+	}
67
+	if _, err := socialdb.New().InsertDomainEvent(ctx, db, socialdb.InsertDomainEventParams{
68
+		ActorUserID: pgInt8(ev.ActorUserID),
69
+		Kind:        ev.Kind,
70
+		RepoID:      pgInt8(ev.RepoID),
71
+		SourceKind:  ev.SourceKind,
72
+		SourceID:    ev.SourceID,
73
+		Public:      ev.Public,
74
+		Payload:     payload,
75
+	}); err != nil {
76
+		return fmt.Errorf("notif emit: insert: %w", err)
77
+	}
78
+	return nil
79
+}
80
+
81
+func buildPayload(ev Event) ([]byte, error) {
82
+	out := map[string]any{}
83
+	for k, v := range ev.Extra {
84
+		out[k] = v
85
+	}
86
+	if len(ev.Mentions) > 0 {
87
+		out["mentions"] = ev.Mentions
88
+	}
89
+	if len(out) == 0 {
90
+		return []byte("{}"), nil
91
+	}
92
+	return json.Marshal(out)
93
+}
94
+
95
+func pgInt8(v int64) pgtype.Int8 {
96
+	return pgtype.Int8{Int64: v, Valid: v != 0}
97
+}
internal/notif/mentions.goadded
@@ -0,0 +1,50 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package notif
4
+
5
+import (
6
+	"context"
7
+
8
+	"github.com/jackc/pgx/v5/pgxpool"
9
+
10
+	"github.com/tenseleyFlow/shithub/internal/markdown"
11
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
12
+)
13
+
14
+// ResolveMentionUserIDs maps a slice of (possibly duplicate)
15
+// `markdown.Mention` records to deduplicated user IDs. Unknown,
16
+// suspended, and soft-deleted accounts are silently dropped — the
17
+// fan-out worker should never produce notifications for accounts
18
+// that wouldn't have read access. Result is stable-ordered by
19
+// first-occurrence in the input.
20
+//
21
+// The lookup is one-by-one rather than batched: typical mention
22
+// counts are in the single digits and the per-username GetUserByUsername
23
+// is index-backed. Batching becomes worth it only above ~50 distinct
24
+// mentions per body.
25
+func ResolveMentionUserIDs(ctx context.Context, pool *pgxpool.Pool, ms []markdown.Mention) []int64 {
26
+	if len(ms) == 0 {
27
+		return nil
28
+	}
29
+	q := usersdb.New()
30
+	seen := map[int64]struct{}{}
31
+	out := make([]int64, 0, len(ms))
32
+	for _, m := range ms {
33
+		u, err := q.GetUserByUsername(ctx, pool, m.Username)
34
+		if err != nil {
35
+			continue
36
+		}
37
+		if u.SuspendedAt.Valid || u.DeletedAt.Valid {
38
+			continue
39
+		}
40
+		if _, dup := seen[u.ID]; dup {
41
+			continue
42
+		}
43
+		seen[u.ID] = struct{}{}
44
+		out = append(out, u.ID)
45
+	}
46
+	if len(out) == 0 {
47
+		return nil
48
+	}
49
+	return out
50
+}
internal/pulls/review/request.gomodified
@@ -7,6 +7,8 @@ import (
7
 
7
 
8
 	"github.com/jackc/pgx/v5/pgtype"
8
 	"github.com/jackc/pgx/v5/pgtype"
9
 
9
 
10
+	issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc"
11
+	"github.com/tenseleyFlow/shithub/internal/notif"
10
 	pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
12
 	pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
11
 )
13
 )
12
 
14
 
@@ -42,10 +44,40 @@ func Request(ctx context.Context, deps Deps, p RequestParams) (pullsdb.PrReviewR
42
 			return pullsdb.PrReviewRequest{}, ErrReviewerAlreadyPending
44
 			return pullsdb.PrReviewRequest{}, ErrReviewerAlreadyPending
43
 		}
45
 		}
44
 	}
46
 	}
45
-	return q.CreatePRReviewRequest(ctx, deps.Pool, pullsdb.CreatePRReviewRequestParams{
47
+	row, err := q.CreatePRReviewRequest(ctx, deps.Pool, pullsdb.CreatePRReviewRequestParams{
46
 		PrIssueID:         p.PRIssueID,
48
 		PrIssueID:         p.PRIssueID,
47
 		RequestedUserID:   pgtype.Int8{Int64: p.RequestedUserID, Valid: p.RequestedUserID != 0},
49
 		RequestedUserID:   pgtype.Int8{Int64: p.RequestedUserID, Valid: p.RequestedUserID != 0},
48
 		RequestedTeamID:   pgtype.Int8{Valid: false},
50
 		RequestedTeamID:   pgtype.Int8{Valid: false},
49
 		RequestedByUserID: pgtype.Int8{Int64: p.RequestedByUserID, Valid: p.RequestedByUserID != 0},
51
 		RequestedByUserID: pgtype.Int8{Int64: p.RequestedByUserID, Valid: p.RequestedByUserID != 0},
50
 	})
52
 	})
53
+	if err != nil {
54
+		return row, err
55
+	}
56
+	// S29: emit a domain event so the fan-out worker can route a
57
+	// `review_requested` notification to the requested reviewer.
58
+	// Best-effort — review-request side already succeeded; an emit
59
+	// failure is logged but doesn't fail the request (the fan-out
60
+	// worker isn't strict about ordering for this kind).
61
+	issue, ierr := issuesdb.New().GetIssueByID(ctx, deps.Pool, p.PRIssueID)
62
+	if ierr == nil {
63
+		var public bool
64
+		_ = deps.Pool.QueryRow(ctx,
65
+			`SELECT visibility = 'public' FROM repos WHERE id = $1`,
66
+			issue.RepoID,
67
+		).Scan(&public)
68
+		_ = notif.Emit(ctx, deps.Pool, notif.Event{
69
+			ActorUserID: p.RequestedByUserID,
70
+			Kind:        "review_requested",
71
+			RepoID:      issue.RepoID,
72
+			SourceKind:  "issue",
73
+			SourceID:    issue.ID,
74
+			Public:      public,
75
+			Extra: map[string]any{
76
+				"reviewer_user_id": p.RequestedUserID,
77
+				"issue_number":     issue.Number,
78
+				"issue_title":      issue.Title,
79
+			},
80
+		})
81
+	}
82
+	return row, nil
51
 }
83
 }