@@ -140,8 +140,14 @@ func (h *Handlers) applyStripeCheckoutCompleted(ctx context.Context, event strip |
| 140 | return err | 140 | return err |
| 141 | } | 141 | } |
| 142 | h.recordWebhookSubject(ctx, event.ID, principal) | 142 | h.recordWebhookSubject(ctx, event.ID, principal) |
| 143 | - _, err = orgbilling.SetStripeCustomerForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, principal, customerID) | 143 | + if stale, err := h.checkStaleEvent(ctx, event, principal); err != nil || stale { |
| 144 | - return err | 144 | + return err |
| | 145 | + } |
| | 146 | + if _, err := orgbilling.SetStripeCustomerForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, principal, customerID); err != nil { |
| | 147 | + return err |
| | 148 | + } |
| | 149 | + h.touchLastEventAt(ctx, event, principal) |
| | 150 | + return nil |
| 145 | } | 151 | } |
| 146 | | 152 | |
| 147 | // resolvePrincipalFromCheckout walks the resolution chain for a | 153 | // resolvePrincipalFromCheckout walks the resolution chain for a |
@@ -186,6 +192,9 @@ func (h *Handlers) applyStripeSubscriptionEvent(ctx context.Context, event strip |
| 186 | return err | 192 | return err |
| 187 | } | 193 | } |
| 188 | h.recordWebhookSubject(ctx, event.ID, principal) | 194 | h.recordWebhookSubject(ctx, event.ID, principal) |
| | 195 | + if stale, err := h.checkStaleEvent(ctx, event, principal); err != nil || stale { |
| | 196 | + return err |
| | 197 | + } |
| 189 | // PRO08 D3: if the principal already has a different Stripe | 198 | // PRO08 D3: if the principal already has a different Stripe |
| 190 | // subscription on file, refuse to overwrite it. A second sub | 199 | // subscription on file, refuse to overwrite it. A second sub |
| 191 | // for the same customer (e.g., an operator created one manually | 200 | // for the same customer (e.g., an operator created one manually |
@@ -218,12 +227,15 @@ func (h *Handlers) applyStripeSubscriptionEvent(ctx context.Context, event strip |
| 218 | return err | 227 | return err |
| 219 | } | 228 | } |
| 220 | if status == orgbilling.SubscriptionStatusCanceled || string(event.Type) == "customer.subscription.deleted" { | 229 | if status == orgbilling.SubscriptionStatusCanceled || string(event.Type) == "customer.subscription.deleted" { |
| 221 | - _, err := orgbilling.MarkCanceledForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, principal, event.ID) | 230 | + if _, err := orgbilling.MarkCanceledForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, principal, event.ID); err != nil { |
| 222 | - return err | 231 | + return err |
| | 232 | + } |
| | 233 | + h.touchLastEventAt(ctx, event, principal) |
| | 234 | + return nil |
| 223 | } | 235 | } |
| 224 | itemID := stripeSubscriptionItemID(sub.Items) | 236 | itemID := stripeSubscriptionItemID(sub.Items) |
| 225 | periodStart, periodEnd := stripeSubscriptionPeriod(sub.Items) | 237 | periodStart, periodEnd := stripeSubscriptionPeriod(sub.Items) |
| 226 | - _, err = orgbilling.ApplySubscriptionSnapshotForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, orgbilling.PrincipalSubscriptionSnapshot{ | 238 | + if _, err := orgbilling.ApplySubscriptionSnapshotForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, orgbilling.PrincipalSubscriptionSnapshot{ |
| 227 | Principal: principal, | 239 | Principal: principal, |
| 228 | Status: status, | 240 | Status: status, |
| 229 | StripeSubscriptionID: strings.TrimSpace(sub.ID), | 241 | StripeSubscriptionID: strings.TrimSpace(sub.ID), |
@@ -234,8 +246,11 @@ func (h *Handlers) applyStripeSubscriptionEvent(ctx context.Context, event strip |
| 234 | TrialEnd: unixTime(sub.TrialEnd), | 246 | TrialEnd: unixTime(sub.TrialEnd), |
| 235 | CanceledAt: unixTime(sub.CanceledAt), | 247 | CanceledAt: unixTime(sub.CanceledAt), |
| 236 | LastWebhookEventID: event.ID, | 248 | LastWebhookEventID: event.ID, |
| 237 | - }) | 249 | + }); err != nil { |
| 238 | - return err | 250 | + return err |
| | 251 | + } |
| | 252 | + h.touchLastEventAt(ctx, event, principal) |
| | 253 | + return nil |
| 239 | } | 254 | } |
| 240 | | 255 | |
| 241 | // resolvePrincipalFromSubscription walks the same chain as the | 256 | // resolvePrincipalFromSubscription walks the same chain as the |
@@ -375,6 +390,9 @@ func (h *Handlers) applyStripeInvoiceEvent(ctx context.Context, event stripeapi. |
| 375 | return err | 390 | return err |
| 376 | } | 391 | } |
| 377 | h.recordWebhookSubject(ctx, event.ID, principalState.Principal) | 392 | h.recordWebhookSubject(ctx, event.ID, principalState.Principal) |
| | 393 | + if stale, err := h.checkStaleEvent(ctx, event, principalState.Principal); err != nil || stale { |
| | 394 | + return err |
| | 395 | + } |
| 378 | status, err := stripeInvoiceStatus(inv.Status) | 396 | status, err := stripeInvoiceStatus(inv.Status) |
| 379 | if err != nil { | 397 | if err != nil { |
| 380 | return err | 398 | return err |
@@ -402,14 +420,17 @@ func (h *Handlers) applyStripeInvoiceEvent(ctx context.Context, event stripeapi. |
| 402 | switch string(event.Type) { | 420 | switch string(event.Type) { |
| 403 | case "invoice.payment_failed": | 421 | case "invoice.payment_failed": |
| 404 | graceUntil := time.Now().UTC().Add(h.d.BillingGracePeriod) | 422 | graceUntil := time.Now().UTC().Add(h.d.BillingGracePeriod) |
| 405 | - _, err := orgbilling.MarkPastDueForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, principalState.Principal, graceUntil, event.ID) | 423 | + if _, err := orgbilling.MarkPastDueForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, principalState.Principal, graceUntil, event.ID); err != nil { |
| 406 | - return err | 424 | + return err |
| | 425 | + } |
| 407 | case "invoice.payment_succeeded": | 426 | case "invoice.payment_succeeded": |
| 408 | if principalState.SubscriptionStatus != orgbilling.SubscriptionStatusCanceled { | 427 | if principalState.SubscriptionStatus != orgbilling.SubscriptionStatusCanceled { |
| 409 | - _, err := orgbilling.MarkPaymentSucceededForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, principalState.Principal, event.ID) | 428 | + if _, err := orgbilling.MarkPaymentSucceededForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, principalState.Principal, event.ID); err != nil { |
| 410 | - return err | 429 | + return err |
| | 430 | + } |
| 411 | } | 431 | } |
| 412 | } | 432 | } |
| | 433 | + h.touchLastEventAt(ctx, event, principalState.Principal) |
| 413 | return nil | 434 | return nil |
| 414 | } | 435 | } |
| 415 | | 436 | |
@@ -564,6 +585,59 @@ func unixTime(ts int64) time.Time { |
| 564 | return time.Unix(ts, 0).UTC() | 585 | return time.Unix(ts, 0).UTC() |
| 565 | } | 586 | } |
| 566 | | 587 | |
| | 588 | +// checkStaleEvent compares the incoming Stripe event's `created` |
| | 589 | +// timestamp to the principal's persisted last_event_at. Returns |
| | 590 | +// stale=true when the event is older than the last applied event, |
| | 591 | +// in which case the caller should return nil (the parent webhook |
| | 592 | +// handler logs MarkProcessed and Stripe stops retrying). Returns |
| | 593 | +// err only when the staleness query itself errored. |
| | 594 | +// |
| | 595 | +// PRO08 D4. Stripe doesn't guarantee delivery order across retries; |
| | 596 | +// without this guard a stale subscription.updated[active] arriving |
| | 597 | +// after a fresh subscription.updated[canceled] would re-activate the |
| | 598 | +// principal. |
| | 599 | +func (h *Handlers) checkStaleEvent(ctx context.Context, event stripeapi.Event, p orgbilling.Principal) (bool, error) { |
| | 600 | + if err := p.Validate(); err != nil { |
| | 601 | + return false, nil |
| | 602 | + } |
| | 603 | + eventAt := unixTime(event.Created) |
| | 604 | + if eventAt.IsZero() { |
| | 605 | + // No timestamp on event — can't make a staleness judgment. |
| | 606 | + return false, nil |
| | 607 | + } |
| | 608 | + stale, err := orgbilling.IsBillingEventStaleForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, p, eventAt) |
| | 609 | + if err != nil { |
| | 610 | + h.d.Logger.WarnContext(ctx, "org billing: stale-event check failed", |
| | 611 | + "event_id", event.ID, "principal", p.String(), "error", err) |
| | 612 | + return false, nil |
| | 613 | + } |
| | 614 | + if stale { |
| | 615 | + h.d.Logger.InfoContext(ctx, "org billing: dropping stale Stripe event", |
| | 616 | + "event_id", event.ID, |
| | 617 | + "event_type", event.Type, |
| | 618 | + "event_created", eventAt, |
| | 619 | + "principal", p.String()) |
| | 620 | + } |
| | 621 | + return stale, nil |
| | 622 | +} |
| | 623 | + |
| | 624 | +// touchLastEventAt updates the principal's last_event_at after a |
| | 625 | +// successful apply. Logs and continues on error — this is auxiliary |
| | 626 | +// to the load-bearing state mutation. |
| | 627 | +func (h *Handlers) touchLastEventAt(ctx context.Context, event stripeapi.Event, p orgbilling.Principal) { |
| | 628 | + if err := p.Validate(); err != nil { |
| | 629 | + return |
| | 630 | + } |
| | 631 | + eventAt := unixTime(event.Created) |
| | 632 | + if eventAt.IsZero() { |
| | 633 | + return |
| | 634 | + } |
| | 635 | + if err := orgbilling.TouchBillingLastEventAtForPrincipal(ctx, orgbilling.Deps{Pool: h.d.Pool}, p, eventAt); err != nil { |
| | 636 | + h.d.Logger.WarnContext(ctx, "org billing: touch last_event_at failed", |
| | 637 | + "event_id", event.ID, "principal", p.String(), "error", err) |
| | 638 | + } |
| | 639 | +} |
| | 640 | + |
| 567 | // recordWebhookSubject persists the resolved principal on the receipt | 641 | // recordWebhookSubject persists the resolved principal on the receipt |
| 568 | // row so failed events keep their audit trail. Logs and continues on | 642 | // row so failed events keep their audit trail. Logs and continues on |
| 569 | // error — the subject is auxiliary; the state-mutation path is the | 643 | // error — the subject is auxiliary; the state-mutation path is the |