tenseleyflow/shithub / 7296411

Browse files

billing: record (subject_kind, subject_id) on webhook receipts + ListFailedWebhookEvents

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
7296411db439150f93c8b78e10adcfa084a5f90b
Parents
925e659
Tree
3c977b8

4 changed files

StatusFile+-
M internal/billing/billing.go 39 0
M internal/billing/queries/billing.sql 30 0
M internal/billing/sqlc/billing.sql.go 89 0
M internal/billing/sqlc/querier.go 12 0
internal/billing/billing.gomodified
@@ -293,6 +293,45 @@ func GetWebhookEventReceipt(ctx context.Context, deps Deps, providerEventID stri
293
 	return billingdb.New().GetWebhookEventReceipt(ctx, deps.Pool, providerEventID)
293
 	return billingdb.New().GetWebhookEventReceipt(ctx, deps.Pool, providerEventID)
294
 }
294
 }
295
 
295
 
296
+// SetWebhookEventSubjectForPrincipal records the resolved subject on
297
+// the receipt row. Called after a successful subject-resolution step
298
+// in the webhook apply path (before guard + state mutation) so the
299
+// audit trail survives even if the apply later fails. Migration 0075's
300
+// CHECK constraint enforces both-or-neither — the helper rejects a
301
+// zero principal.
302
+func SetWebhookEventSubjectForPrincipal(ctx context.Context, deps Deps, providerEventID string, p Principal) error {
303
+	if err := validateDeps(deps); err != nil {
304
+		return err
305
+	}
306
+	providerEventID = strings.TrimSpace(providerEventID)
307
+	if providerEventID == "" {
308
+		return ErrWebhookEventID
309
+	}
310
+	if err := p.Validate(); err != nil {
311
+		return err
312
+	}
313
+	return billingdb.New().SetWebhookEventSubject(ctx, deps.Pool, billingdb.SetWebhookEventSubjectParams{
314
+		SubjectKind:     billingdb.BillingSubjectKind(p.Kind),
315
+		SubjectID:       p.ID,
316
+		ProviderEventID: providerEventID,
317
+	})
318
+}
319
+
320
+// ListFailedWebhookEvents is the operator query for "events we
321
+// received but failed to process." Returns rows whose process_error
322
+// is non-empty OR that have any processing_attempts but no
323
+// processed_at (in-flight failures). Returned in descending received_at
324
+// order; limit caps the result set.
325
+func ListFailedWebhookEvents(ctx context.Context, deps Deps, limit int32) ([]billingdb.ListFailedWebhookEventsRow, error) {
326
+	if err := validateDeps(deps); err != nil {
327
+		return nil, err
328
+	}
329
+	if limit <= 0 {
330
+		limit = 50
331
+	}
332
+	return billingdb.New().ListFailedWebhookEvents(ctx, deps.Pool, limit)
333
+}
334
+
296
 func UpsertInvoice(ctx context.Context, deps Deps, snap InvoiceSnapshot) (billingdb.BillingInvoice, error) {
335
 func UpsertInvoice(ctx context.Context, deps Deps, snap InvoiceSnapshot) (billingdb.BillingInvoice, error) {
297
 	if err := validateDeps(deps); err != nil {
336
 	if err := validateDeps(deps); err != nil {
298
 		return billingdb.BillingInvoice{}, err
337
 		return billingdb.BillingInvoice{}, err
internal/billing/queries/billing.sqlmodified
@@ -445,6 +445,36 @@ UPDATE billing_webhook_events
445
    AND provider_event_id = $1
445
    AND provider_event_id = $1
446
 RETURNING *;
446
 RETURNING *;
447
 
447
 
448
+-- name: SetWebhookEventSubject :exec
449
+-- Records the resolved subject on the receipt row after a successful
450
+-- subject-resolution step. Called from the apply path before guard +
451
+-- state mutation so the receipt carries the audit trail even if the
452
+-- subsequent apply fails. Migration 0075's CHECK constraint enforces
453
+-- both-or-neither; callers must pass a non-zero subject.
454
+UPDATE billing_webhook_events
455
+   SET subject_kind = sqlc.arg(subject_kind)::billing_subject_kind,
456
+       subject_id   = sqlc.arg(subject_id)::bigint
457
+ WHERE provider = 'stripe'
458
+   AND provider_event_id = sqlc.arg(provider_event_id)::text;
459
+
460
+-- name: ListFailedWebhookEvents :many
461
+-- Operator query for "events we received but failed to process."
462
+-- A row is "failed" when it has a non-empty process_error OR when
463
+-- it has never been processed (processed_at NULL) and has at least
464
+-- one processing attempt. Rows that are merely new and untouched
465
+-- (attempts=0, processed_at NULL, no error) are excluded.
466
+SELECT id, provider, provider_event_id, event_type, api_version,
467
+       received_at, processed_at, processing_attempts, process_error,
468
+       subject_kind, subject_id
469
+  FROM billing_webhook_events
470
+ WHERE provider = 'stripe'
471
+   AND (
472
+        process_error <> ''
473
+        OR (processed_at IS NULL AND processing_attempts > 0)
474
+       )
475
+ ORDER BY received_at DESC
476
+ LIMIT $1;
477
+
448
 -- ─── user_billing_states (PRO03) ──────────────────────────────────
478
 -- ─── user_billing_states (PRO03) ──────────────────────────────────
449
 
479
 
450
 -- name: GetUserBillingState :one
480
 -- name: GetUserBillingState :one
internal/billing/sqlc/billing.sql.gomodified
@@ -857,6 +857,71 @@ func (q *Queries) GetWebhookEventReceipt(ctx context.Context, db DBTX, providerE
857
 	return i, err
857
 	return i, err
858
 }
858
 }
859
 
859
 
860
+const listFailedWebhookEvents = `-- name: ListFailedWebhookEvents :many
861
+SELECT id, provider, provider_event_id, event_type, api_version,
862
+       received_at, processed_at, processing_attempts, process_error,
863
+       subject_kind, subject_id
864
+  FROM billing_webhook_events
865
+ WHERE provider = 'stripe'
866
+   AND (
867
+        process_error <> ''
868
+        OR (processed_at IS NULL AND processing_attempts > 0)
869
+       )
870
+ ORDER BY received_at DESC
871
+ LIMIT $1
872
+`
873
+
874
+type ListFailedWebhookEventsRow struct {
875
+	ID                 int64
876
+	Provider           BillingProvider
877
+	ProviderEventID    string
878
+	EventType          string
879
+	ApiVersion         string
880
+	ReceivedAt         pgtype.Timestamptz
881
+	ProcessedAt        pgtype.Timestamptz
882
+	ProcessingAttempts int32
883
+	ProcessError       string
884
+	SubjectKind        NullBillingSubjectKind
885
+	SubjectID          pgtype.Int8
886
+}
887
+
888
+// Operator query for "events we received but failed to process."
889
+// A row is "failed" when it has a non-empty process_error OR when
890
+// it has never been processed (processed_at NULL) and has at least
891
+// one processing attempt. Rows that are merely new and untouched
892
+// (attempts=0, processed_at NULL, no error) are excluded.
893
+func (q *Queries) ListFailedWebhookEvents(ctx context.Context, db DBTX, limit int32) ([]ListFailedWebhookEventsRow, error) {
894
+	rows, err := db.Query(ctx, listFailedWebhookEvents, limit)
895
+	if err != nil {
896
+		return nil, err
897
+	}
898
+	defer rows.Close()
899
+	items := []ListFailedWebhookEventsRow{}
900
+	for rows.Next() {
901
+		var i ListFailedWebhookEventsRow
902
+		if err := rows.Scan(
903
+			&i.ID,
904
+			&i.Provider,
905
+			&i.ProviderEventID,
906
+			&i.EventType,
907
+			&i.ApiVersion,
908
+			&i.ReceivedAt,
909
+			&i.ProcessedAt,
910
+			&i.ProcessingAttempts,
911
+			&i.ProcessError,
912
+			&i.SubjectKind,
913
+			&i.SubjectID,
914
+		); err != nil {
915
+			return nil, err
916
+		}
917
+		items = append(items, i)
918
+	}
919
+	if err := rows.Err(); err != nil {
920
+		return nil, err
921
+	}
922
+	return items, nil
923
+}
924
+
860
 const listInvoicesForOrg = `-- name: ListInvoicesForOrg :many
925
 const listInvoicesForOrg = `-- name: ListInvoicesForOrg :many
861
 SELECT id, org_id, provider, stripe_invoice_id, stripe_customer_id, stripe_subscription_id, status, number, currency, amount_due_cents, amount_paid_cents, amount_remaining_cents, hosted_invoice_url, invoice_pdf_url, period_start, period_end, due_at, paid_at, voided_at, created_at, updated_at, subject_kind, subject_id FROM billing_invoices
926
 SELECT id, org_id, provider, stripe_invoice_id, stripe_customer_id, stripe_subscription_id, status, number, currency, amount_due_cents, amount_paid_cents, amount_remaining_cents, hosted_invoice_url, invoice_pdf_url, period_start, period_end, due_at, paid_at, voided_at, created_at, updated_at, subject_kind, subject_id FROM billing_invoices
862
 WHERE subject_kind = 'org' AND subject_id = $1
927
 WHERE subject_kind = 'org' AND subject_id = $1
@@ -1602,6 +1667,30 @@ func (q *Queries) SetUserStripeCustomer(ctx context.Context, db DBTX, arg SetUse
1602
 	return i, err
1667
 	return i, err
1603
 }
1668
 }
1604
 
1669
 
1670
+const setWebhookEventSubject = `-- name: SetWebhookEventSubject :exec
1671
+UPDATE billing_webhook_events
1672
+   SET subject_kind = $1::billing_subject_kind,
1673
+       subject_id   = $2::bigint
1674
+ WHERE provider = 'stripe'
1675
+   AND provider_event_id = $3::text
1676
+`
1677
+
1678
+type SetWebhookEventSubjectParams struct {
1679
+	SubjectKind     BillingSubjectKind
1680
+	SubjectID       int64
1681
+	ProviderEventID string
1682
+}
1683
+
1684
+// Records the resolved subject on the receipt row after a successful
1685
+// subject-resolution step. Called from the apply path before guard +
1686
+// state mutation so the receipt carries the audit trail even if the
1687
+// subsequent apply fails. Migration 0075's CHECK constraint enforces
1688
+// both-or-neither; callers must pass a non-zero subject.
1689
+func (q *Queries) SetWebhookEventSubject(ctx context.Context, db DBTX, arg SetWebhookEventSubjectParams) error {
1690
+	_, err := db.Exec(ctx, setWebhookEventSubject, arg.SubjectKind, arg.SubjectID, arg.ProviderEventID)
1691
+	return err
1692
+}
1693
+
1605
 const upsertInvoice = `-- name: UpsertInvoice :one
1694
 const upsertInvoice = `-- name: UpsertInvoice :one
1606
 
1695
 
1607
 INSERT INTO billing_invoices (
1696
 INSERT INTO billing_invoices (
internal/billing/sqlc/querier.gomodified
@@ -34,6 +34,12 @@ type Querier interface {
34
 	GetUserBillingStateByStripeCustomer(ctx context.Context, db DBTX, stripeCustomerID pgtype.Text) (UserBillingState, error)
34
 	GetUserBillingStateByStripeCustomer(ctx context.Context, db DBTX, stripeCustomerID pgtype.Text) (UserBillingState, error)
35
 	GetUserBillingStateByStripeSubscription(ctx context.Context, db DBTX, stripeSubscriptionID pgtype.Text) (UserBillingState, error)
35
 	GetUserBillingStateByStripeSubscription(ctx context.Context, db DBTX, stripeSubscriptionID pgtype.Text) (UserBillingState, error)
36
 	GetWebhookEventReceipt(ctx context.Context, db DBTX, providerEventID string) (BillingWebhookEvent, error)
36
 	GetWebhookEventReceipt(ctx context.Context, db DBTX, providerEventID string) (BillingWebhookEvent, error)
37
+	// Operator query for "events we received but failed to process."
38
+	// A row is "failed" when it has a non-empty process_error OR when
39
+	// it has never been processed (processed_at NULL) and has at least
40
+	// one processing attempt. Rows that are merely new and untouched
41
+	// (attempts=0, processed_at NULL, no error) are excluded.
42
+	ListFailedWebhookEvents(ctx context.Context, db DBTX, limit int32) ([]ListFailedWebhookEventsRow, error)
37
 	// PRO03: filters on the polymorphic subject columns so the index
43
 	// PRO03: filters on the polymorphic subject columns so the index
38
 	// billing_invoices_subject_created_idx services this query. The
44
 	// billing_invoices_subject_created_idx services this query. The
39
 	// legacy `org_id` column is kept populated by UpsertInvoice for the
45
 	// legacy `org_id` column is kept populated by UpsertInvoice for the
@@ -55,6 +61,12 @@ type Querier interface {
55
 	MarkWebhookEventProcessed(ctx context.Context, db DBTX, providerEventID string) (BillingWebhookEvent, error)
61
 	MarkWebhookEventProcessed(ctx context.Context, db DBTX, providerEventID string) (BillingWebhookEvent, error)
56
 	SetStripeCustomer(ctx context.Context, db DBTX, arg SetStripeCustomerParams) (OrgBillingState, error)
62
 	SetStripeCustomer(ctx context.Context, db DBTX, arg SetStripeCustomerParams) (OrgBillingState, error)
57
 	SetUserStripeCustomer(ctx context.Context, db DBTX, arg SetUserStripeCustomerParams) (UserBillingState, error)
63
 	SetUserStripeCustomer(ctx context.Context, db DBTX, arg SetUserStripeCustomerParams) (UserBillingState, error)
64
+	// Records the resolved subject on the receipt row after a successful
65
+	// subject-resolution step. Called from the apply path before guard +
66
+	// state mutation so the receipt carries the audit trail even if the
67
+	// subsequent apply fails. Migration 0075's CHECK constraint enforces
68
+	// both-or-neither; callers must pass a non-zero subject.
69
+	SetWebhookEventSubject(ctx context.Context, db DBTX, arg SetWebhookEventSubjectParams) error
58
 	// ─── billing_invoices ──────────────────────────────────────────────
70
 	// ─── billing_invoices ──────────────────────────────────────────────
59
 	// PRO03: writes both legacy `org_id` and polymorphic
71
 	// PRO03: writes both legacy `org_id` and polymorphic
60
 	// `(subject_kind, subject_id)`. Callers continue to bind org_id only;
72
 	// `(subject_kind, subject_id)`. Callers continue to bind org_id only;