tenseleyflow/shithub / cabf7a3

Browse files

S33: fanout — drain domain_events into per-subscriber deliveries

Authored by espadonne
SHA
cabf7a3101bda4be833ffbc959b35590751ee33c
Parents
ca72eb1
Tree
da16ee5

1 changed file

StatusFile+-
A internal/webhook/fanout.go 225 0
internal/webhook/fanout.goadded
@@ -0,0 +1,225 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package webhook
4
+
5
+import (
6
+	"context"
7
+	"crypto/sha256"
8
+	"encoding/hex"
9
+	"encoding/json"
10
+	"errors"
11
+	"fmt"
12
+	"log/slog"
13
+	"strconv"
14
+	"time"
15
+
16
+	"github.com/jackc/pgx/v5"
17
+	"github.com/jackc/pgx/v5/pgtype"
18
+	"github.com/jackc/pgx/v5/pgxpool"
19
+
20
+	"github.com/tenseleyFlow/shithub/internal/worker"
21
+	webhookdb "github.com/tenseleyFlow/shithub/internal/webhook/sqlc"
22
+)
23
+
24
const (
	// FanoutConsumer names this consumer's row in
	// domain_events_processed. It is deliberately different from the
	// name notif uses so the two cursors never overwrite each other.
	FanoutConsumer = "webhook_fanout"

	// FanoutBatch is the upper bound on the number of events listed
	// (and therefore dispatched) by one FanoutOnce pass.
	FanoutBatch = 200
)
30
+
31
+// FanoutDeps wires the fan-out against the runtime.
32
+type FanoutDeps struct {
33
+	Pool   *pgxpool.Pool
34
+	Logger *slog.Logger
35
+}
36
+
37
+// FanoutOnce drains domain_events past the persisted cursor, finds
38
+// matching webhooks, and inserts a webhook_deliveries row + enqueues a
39
+// `webhook:deliver` job per match. Returns the number of events
40
+// processed; the caller decides to re-enqueue when full.
41
+//
42
+// The consumer cursor is advanced after each event regardless of
43
+// whether the event matched any subscribers — an event without
44
+// subscribers is "processed" the same as one with five.
45
+func FanoutOnce(ctx context.Context, deps FanoutDeps) (int, error) {
46
+	if deps.Pool == nil {
47
+		return 0, errors.New("webhook fanout: nil Pool")
48
+	}
49
+	q := webhookdb.New()
50
+	cur, err := q.GetWebhookCursor(ctx, deps.Pool, FanoutConsumer)
51
+	last := int64(0)
52
+	if err == nil {
53
+		last = cur.LastEventID
54
+	} else if !errors.Is(err, pgx.ErrNoRows) {
55
+		return 0, fmt.Errorf("webhook fanout: load cursor: %w", err)
56
+	}
57
+	rows, err := q.ListUnprocessedDomainEvents(ctx, deps.Pool, webhookdb.ListUnprocessedDomainEventsParams{
58
+		ID:    last,
59
+		Limit: FanoutBatch,
60
+	})
61
+	if err != nil {
62
+		return 0, fmt.Errorf("webhook fanout: list events: %w", err)
63
+	}
64
+	processed := 0
65
+	for _, ev := range rows {
66
+		if err := dispatchEvent(ctx, deps, q, ev); err != nil {
67
+			if deps.Logger != nil {
68
+				deps.Logger.WarnContext(ctx, "webhook fanout: dispatch failed",
69
+					"event_id", ev.ID, "kind", ev.Kind, "error", err)
70
+			}
71
+			break
72
+		}
73
+		last = ev.ID
74
+		processed++
75
+	}
76
+	if processed > 0 {
77
+		if err := q.SetWebhookCursor(ctx, deps.Pool, webhookdb.SetWebhookCursorParams{
78
+			Consumer: FanoutConsumer, LastEventID: last,
79
+		}); err != nil {
80
+			return processed, fmt.Errorf("webhook fanout: persist cursor: %w", err)
81
+		}
82
+	}
83
+	return processed, nil
84
+}
85
+
86
+// dispatchEvent finds matching webhooks for one domain event, creates
87
+// delivery rows, and enqueues per-row deliver jobs.
88
+func dispatchEvent(ctx context.Context, deps FanoutDeps, q *webhookdb.Queries, ev webhookdb.DomainEvent) error {
89
+	// Resolve the owner pool. Repo events may match repo-level
90
+	// webhooks AND the org-level webhooks (when the repo is org-owned).
91
+	// User-owned repos have no second bucket.
92
+	subs := []webhookdb.Webhook{}
93
+	if ev.RepoID.Valid {
94
+		repoSubs, err := q.ListActiveWebhooksForOwner(ctx, deps.Pool, webhookdb.ListActiveWebhooksForOwnerParams{
95
+			OwnerKind: webhookdb.WebhookOwnerKindRepo,
96
+			OwnerID:   ev.RepoID.Int64,
97
+		})
98
+		if err != nil {
99
+			return fmt.Errorf("list repo webhooks: %w", err)
100
+		}
101
+		subs = append(subs, repoSubs...)
102
+		// Org-level: look up the repo's owner_org_id; only org-owned
103
+		// repos pick up org-level webhooks.
104
+		owner, err := q.GetRepoOwnerKindForFanout(ctx, deps.Pool, ev.RepoID.Int64)
105
+		if err == nil && owner.OwnerOrgID.Valid {
106
+			orgSubs, err := q.ListActiveWebhooksForOwner(ctx, deps.Pool, webhookdb.ListActiveWebhooksForOwnerParams{
107
+				OwnerKind: webhookdb.WebhookOwnerKindOrg,
108
+				OwnerID:   owner.OwnerOrgID.Int64,
109
+			})
110
+			if err != nil {
111
+				return fmt.Errorf("list org webhooks: %w", err)
112
+			}
113
+			subs = append(subs, orgSubs...)
114
+		}
115
+	}
116
+	if len(subs) == 0 {
117
+		return nil
118
+	}
119
+	for _, w := range subs {
120
+		if !subscribesToKind(w.Events, ev.Kind) {
121
+			continue
122
+		}
123
+		body, headersJSON, err := buildPayload(ev, w)
124
+		if err != nil {
125
+			if deps.Logger != nil {
126
+				deps.Logger.WarnContext(ctx, "webhook fanout: build payload",
127
+					"event_id", ev.ID, "webhook_id", w.ID, "error", err)
128
+			}
129
+			continue
130
+		}
131
+		idem := idempotencyKey(w.ID, ev.ID, body)
132
+		row, err := q.CreateDelivery(ctx, deps.Pool, webhookdb.CreateDeliveryParams{
133
+			WebhookID:       w.ID,
134
+			EventKind:       ev.Kind,
135
+			EventID:         pgtype.Int8{Int64: ev.ID, Valid: true},
136
+			Payload:         body,
137
+			RequestHeaders:  headersJSON,
138
+			RequestBody:     body,
139
+			Attempt:         1,
140
+			MaxAttempts:     8,
141
+			NextRetryAt:     pgtype.Timestamptz{Time: time.Now(), Valid: true},
142
+			Status:          webhookdb.WebhookDeliveryStatusPending,
143
+			IdempotencyKey:  idem,
144
+			RedeliverOf:     pgtype.Int8{Valid: false},
145
+		})
146
+		if err != nil {
147
+			return fmt.Errorf("create delivery: %w", err)
148
+		}
149
+		if _, err := worker.Enqueue(ctx, deps.Pool, KindWebhookDeliver,
150
+			deliverPayload{DeliveryID: row.ID}, worker.EnqueueOptions{}); err != nil && deps.Logger != nil {
151
+			deps.Logger.WarnContext(ctx, "webhook fanout: enqueue deliver",
152
+				"delivery_id", row.ID, "error", err)
153
+		}
154
+	}
155
+	return nil
156
+}
157
+
158
// subscribesToKind reports whether a webhook with the given `events`
// filter should receive an event of the given kind. An empty filter
// accepts every kind; otherwise the filter must list the kind itself
// or the "*" wildcard. Matching is exact and case-sensitive.
func subscribesToKind(events []string, kind string) bool {
	for i := range events {
		switch events[i] {
		case kind, "*":
			return true
		}
	}
	// Nothing matched: only an empty filter (= "all") still subscribes.
	return len(events) == 0
}
171
+
172
+// buildPayload assembles the JSON body for delivery. We pass the
173
+// domain_events row through verbatim under `event` plus a tiny
174
+// envelope; subscribers care about both the kind and the payload.
175
+func buildPayload(ev webhookdb.DomainEvent, w webhookdb.Webhook) (body []byte, headers []byte, err error) {
176
+	envelope := map[string]any{
177
+		"event_id":   ev.ID,
178
+		"event_kind": ev.Kind,
179
+		"created_at": ev.CreatedAt.Time.UTC().Format(time.RFC3339Nano),
180
+		"webhook_id": w.ID,
181
+		"payload":    json.RawMessage(ev.Payload),
182
+	}
183
+	if ev.RepoID.Valid {
184
+		envelope["repo_id"] = ev.RepoID.Int64
185
+	}
186
+	if ev.ActorUserID.Valid {
187
+		envelope["actor_user_id"] = ev.ActorUserID.Int64
188
+	}
189
+	body, err = json.Marshal(envelope)
190
+	if err != nil {
191
+		return nil, nil, err
192
+	}
193
+	hdrs := map[string]any{
194
+		"User-Agent":      "shithub-Hookshot",
195
+		"Content-Type":    contentTypeHeader(w.ContentType),
196
+		"X-Shithub-Event": ev.Kind,
197
+		"X-Shithub-Hook-Installation-Target-Type": string(w.OwnerKind),
198
+		"X-Shithub-Hook-Installation-Target-Id":   strconv.FormatInt(w.OwnerID, 10),
199
+	}
200
+	headers, err = json.Marshal(hdrs)
201
+	if err != nil {
202
+		return nil, nil, err
203
+	}
204
+	return body, headers, nil
205
+}
206
+
207
+// contentTypeHeader maps the enum to its on-the-wire MIME.
208
+func contentTypeHeader(ct webhookdb.WebhookContentType) string {
209
+	switch ct {
210
+	case webhookdb.WebhookContentTypeForm:
211
+		return "application/x-www-form-urlencoded"
212
+	default:
213
+		return "application/json"
214
+	}
215
+}
216
+
217
// idempotencyKey returns the lowercase hex SHA-256 of the delivery body
// followed by the decimal webhook ID and the decimal event ID. The same
// inputs always give the same key, so it stays stable across retries
// and subscribers can use it to dedupe.
//
// The three parts are concatenated with no separator between them.
func idempotencyKey(webhookID, eventID int64, body []byte) string {
	// 40 spare bytes cover two base-10 int64 values (20 bytes each at most).
	material := make([]byte, 0, len(body)+40)
	material = append(material, body...)
	material = strconv.AppendInt(material, webhookID, 10)
	material = strconv.AppendInt(material, eventID, 10)
	digest := sha256.Sum256(material)
	return hex.EncodeToString(digest[:])
}