tenseleyflow/shithub / 18b9a8a

Browse files

web/orgs/billing_webhook: session-scoped advisory lock serializes concurrent replays

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
18b9a8ac413f7e2e717e9e75cbdfe6bd5477cd19
Parents
5690fdf
Tree
18aa48d

1 changed file

StatusFile+-
M internal/web/handlers/orgs/billing_webhook.go 37 0
internal/web/handlers/orgs/billing_webhook.gomodified
@@ -43,6 +43,43 @@ func (h *Handlers) billingWebhook(w http.ResponseWriter, r *http.Request) {
4343
 		http.Error(w, "invalid stripe signature", http.StatusBadRequest)
4444
 		return
4545
 	}
46
+	// PRO08 A3: serialize concurrent deliveries of the same event_id
47
+	// with a session-scoped advisory lock on a dedicated pool conn.
48
+	// Without this, the dedup short-circuit at "!created && processed_at"
49
+	// races: two replays both observe processed_at=NULL, both apply,
50
+	// double-mutating state. The lock makes the apply path mutually
51
+	// exclusive per event_id. A racing replay returns 200 immediately —
52
+	// Stripe stops retrying THIS delivery; if this worker fails the
53
+	// apply, Stripe will resend later (different delivery, fresh race).
54
+	conn, err := h.d.Pool.Acquire(r.Context())
55
+	if err != nil {
56
+		h.d.Logger.ErrorContext(r.Context(), "org billing: acquire conn for webhook lock", "event_id", event.ID, "error", err)
57
+		http.Error(w, "could not acquire webhook lock", http.StatusInternalServerError)
58
+		return
59
+	}
60
+	defer conn.Release()
61
+	var acquired bool
62
+	if err := conn.QueryRow(r.Context(), "SELECT pg_try_advisory_lock(hashtext($1)::bigint)", event.ID).Scan(&acquired); err != nil {
63
+		h.d.Logger.ErrorContext(r.Context(), "org billing: try advisory lock", "event_id", event.ID, "error", err)
64
+		http.Error(w, "could not acquire webhook lock", http.StatusInternalServerError)
65
+		return
66
+	}
67
+	if !acquired {
68
+		// Another worker holds the lock — return 200 so Stripe stops
69
+		// retrying THIS delivery; the in-flight worker finishes the apply.
70
+		h.d.Logger.InfoContext(r.Context(), "org billing: webhook in flight elsewhere",
71
+			"event_id", event.ID, "event_type", event.Type)
72
+		w.WriteHeader(http.StatusOK)
73
+		_, _ = w.Write([]byte("ok (in flight)"))
74
+		return
75
+	}
76
+	defer func() {
77
+		// Use Background so unlock runs even if request ctx cancelled.
78
+		// Best-effort — txn cleanup eventually releases either way.
79
+		if _, err := conn.Exec(context.Background(), "SELECT pg_advisory_unlock(hashtext($1)::bigint)", event.ID); err != nil {
80
+			h.d.Logger.WarnContext(r.Context(), "org billing: advisory unlock", "event_id", event.ID, "error", err)
81
+		}
82
+	}()
4683
 	receipt, created, err := orgbilling.RecordWebhookEvent(r.Context(), orgbilling.Deps{Pool: h.d.Pool}, orgbilling.WebhookEvent{
4784
 		ProviderEventID: event.ID,
4885
 		EventType:       string(event.Type),