Go · 7125 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package webhook
4
5 import (
6 "context"
7 "crypto/sha256"
8 "encoding/hex"
9 "encoding/json"
10 "errors"
11 "fmt"
12 "log/slog"
13 "strconv"
14 "time"
15
16 "github.com/jackc/pgx/v5"
17 "github.com/jackc/pgx/v5/pgtype"
18 "github.com/jackc/pgx/v5/pgxpool"
19
20 webhookdb "github.com/tenseleyFlow/shithub/internal/webhook/sqlc"
21 "github.com/tenseleyFlow/shithub/internal/worker"
22 )
23
// Fan-out consumer settings.
const (
	// FanoutConsumer is the name this consumer is recorded under in
	// domain_events_processed. It differs from the notification
	// consumer's name so the two cursors cannot collide.
	FanoutConsumer = "webhook_fanout"

	// FanoutBatch is the upper bound on how many events one tick drains.
	FanoutBatch = 200
)
30
31 // FanoutDeps wires the fan-out against the runtime.
32 type FanoutDeps struct {
33 Pool *pgxpool.Pool
34 Logger *slog.Logger
35 }
36
37 // FanoutOnce drains domain_events past the persisted cursor, finds
38 // matching webhooks, and inserts a webhook_deliveries row + enqueues a
39 // `webhook:deliver` job per match. Returns the number of events
40 // processed; the caller decides to re-enqueue when full.
41 //
42 // The consumer cursor is advanced after each event regardless of
43 // whether the event matched any subscribers — an event without
44 // subscribers is "processed" the same as one with five.
45 func FanoutOnce(ctx context.Context, deps FanoutDeps) (int, error) {
46 if deps.Pool == nil {
47 return 0, errors.New("webhook fanout: nil Pool")
48 }
49 q := webhookdb.New()
50 cur, err := q.GetWebhookCursor(ctx, deps.Pool, FanoutConsumer)
51 last := int64(0)
52 if err == nil {
53 last = cur.LastEventID
54 } else if !errors.Is(err, pgx.ErrNoRows) {
55 return 0, fmt.Errorf("webhook fanout: load cursor: %w", err)
56 }
57 rows, err := q.ListUnprocessedDomainEvents(ctx, deps.Pool, webhookdb.ListUnprocessedDomainEventsParams{
58 ID: last,
59 Limit: FanoutBatch,
60 })
61 if err != nil {
62 return 0, fmt.Errorf("webhook fanout: list events: %w", err)
63 }
64 processed := 0
65 for _, ev := range rows {
66 if err := dispatchEvent(ctx, deps, q, ev); err != nil {
67 if deps.Logger != nil {
68 deps.Logger.WarnContext(ctx, "webhook fanout: dispatch failed",
69 "event_id", ev.ID, "kind", ev.Kind, "error", err)
70 }
71 break
72 }
73 last = ev.ID
74 processed++
75 }
76 if processed > 0 {
77 if err := q.SetWebhookCursor(ctx, deps.Pool, webhookdb.SetWebhookCursorParams{
78 Consumer: FanoutConsumer, LastEventID: last,
79 }); err != nil {
80 return processed, fmt.Errorf("webhook fanout: persist cursor: %w", err)
81 }
82 }
83 return processed, nil
84 }
85
86 // dispatchEvent finds matching webhooks for one domain event, creates
87 // delivery rows, and enqueues per-row deliver jobs.
88 func dispatchEvent(ctx context.Context, deps FanoutDeps, q *webhookdb.Queries, ev webhookdb.DomainEvent) error {
89 // Resolve the owner pool. Repo events may match repo-level
90 // webhooks AND the org-level webhooks (when the repo is org-owned).
91 // User-owned repos have no second bucket.
92 subs := []webhookdb.Webhook{}
93 if ev.RepoID.Valid {
94 repoSubs, err := q.ListActiveWebhooksForOwner(ctx, deps.Pool, webhookdb.ListActiveWebhooksForOwnerParams{
95 OwnerKind: webhookdb.WebhookOwnerKindRepo,
96 OwnerID: ev.RepoID.Int64,
97 })
98 if err != nil {
99 return fmt.Errorf("list repo webhooks: %w", err)
100 }
101 subs = append(subs, repoSubs...)
102 // Org-level: look up the repo's owner_org_id; only org-owned
103 // repos pick up org-level webhooks.
104 owner, err := q.GetRepoOwnerKindForFanout(ctx, deps.Pool, ev.RepoID.Int64)
105 if err == nil && owner.OwnerOrgID.Valid {
106 orgSubs, err := q.ListActiveWebhooksForOwner(ctx, deps.Pool, webhookdb.ListActiveWebhooksForOwnerParams{
107 OwnerKind: webhookdb.WebhookOwnerKindOrg,
108 OwnerID: owner.OwnerOrgID.Int64,
109 })
110 if err != nil {
111 return fmt.Errorf("list org webhooks: %w", err)
112 }
113 subs = append(subs, orgSubs...)
114 }
115 }
116 if len(subs) == 0 {
117 return nil
118 }
119 for _, w := range subs {
120 if !subscribesToKind(w.Events, ev.Kind) {
121 continue
122 }
123 body, headersJSON, err := buildPayload(ev, w)
124 if err != nil {
125 if deps.Logger != nil {
126 deps.Logger.WarnContext(ctx, "webhook fanout: build payload",
127 "event_id", ev.ID, "webhook_id", w.ID, "error", err)
128 }
129 continue
130 }
131 idem := idempotencyKey(w.ID, ev.ID, body)
132 row, err := q.CreateDelivery(ctx, deps.Pool, webhookdb.CreateDeliveryParams{
133 WebhookID: w.ID,
134 EventKind: ev.Kind,
135 EventID: pgtype.Int8{Int64: ev.ID, Valid: true},
136 Payload: body,
137 RequestHeaders: headersJSON,
138 RequestBody: body,
139 Attempt: 1,
140 MaxAttempts: 8,
141 NextRetryAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
142 Status: webhookdb.WebhookDeliveryStatusPending,
143 IdempotencyKey: idem,
144 RedeliverOf: pgtype.Int8{Valid: false},
145 })
146 if err != nil {
147 return fmt.Errorf("create delivery: %w", err)
148 }
149 if _, err := worker.Enqueue(ctx, deps.Pool, KindWebhookDeliver,
150 deliverPayload{DeliveryID: row.ID}, worker.EnqueueOptions{}); err != nil && deps.Logger != nil {
151 deps.Logger.WarnContext(ctx, "webhook fanout: enqueue deliver",
152 "delivery_id", row.ID, "error", err)
153 }
154 }
155 return nil
156 }
157
// subscribesToKind reports whether a webhook with the given `events`
// filter should receive an event of the given kind. An empty filter
// means "all"; otherwise the filter must list the kind itself or the
// wildcard entry "*".
func subscribesToKind(events []string, kind string) bool {
	if len(events) == 0 {
		return true
	}
	for i := range events {
		switch events[i] {
		case kind, "*":
			return true
		}
	}
	return false
}
171
172 // buildPayload assembles the JSON body for delivery. We pass the
173 // domain_events row through verbatim under `event` plus a tiny
174 // envelope; subscribers care about both the kind and the payload.
175 func buildPayload(ev webhookdb.DomainEvent, w webhookdb.Webhook) (body []byte, headers []byte, err error) {
176 envelope := map[string]any{
177 "event_id": ev.ID,
178 "event_kind": ev.Kind,
179 "created_at": ev.CreatedAt.Time.UTC().Format(time.RFC3339Nano),
180 "webhook_id": w.ID,
181 "payload": json.RawMessage(ev.Payload),
182 }
183 if ev.RepoID.Valid {
184 envelope["repo_id"] = ev.RepoID.Int64
185 }
186 if ev.ActorUserID.Valid {
187 envelope["actor_user_id"] = ev.ActorUserID.Int64
188 }
189 body, err = json.Marshal(envelope)
190 if err != nil {
191 return nil, nil, err
192 }
193 hdrs := map[string]any{
194 "User-Agent": "shithub-Hookshot",
195 "Content-Type": contentTypeHeader(w.ContentType),
196 "X-Shithub-Event": ev.Kind,
197 "X-Shithub-Hook-Installation-Target-Type": string(w.OwnerKind),
198 "X-Shithub-Hook-Installation-Target-Id": strconv.FormatInt(w.OwnerID, 10),
199 }
200 headers, err = json.Marshal(hdrs)
201 if err != nil {
202 return nil, nil, err
203 }
204 return body, headers, nil
205 }
206
207 // contentTypeHeader maps the enum to its on-the-wire MIME.
208 func contentTypeHeader(ct webhookdb.WebhookContentType) string {
209 switch ct {
210 case webhookdb.WebhookContentTypeForm:
211 return "application/x-www-form-urlencoded"
212 default:
213 return "application/json"
214 }
215 }
216
// idempotencyKey returns the hex-encoded SHA-256 of the delivery body
// followed by the decimal webhook ID and the decimal event ID,
// concatenated with no separators. Identical inputs always produce the
// same key, so it stays stable across retries and subscribers can
// dedupe on it.
func idempotencyKey(webhookID, eventID int64, body []byte) string {
	// 40 extra bytes cover two int64 values in decimal, sign included.
	msg := make([]byte, 0, len(body)+40)
	msg = append(msg, body...)
	msg = strconv.AppendInt(msg, webhookID, 10)
	msg = strconv.AppendInt(msg, eventID, 10)
	digest := sha256.Sum256(msg)
	return hex.EncodeToString(digest[:])
}
226