tenseleyflow/shithub / 28e7461

Browse files

billing: IsBillingEventStaleForPrincipal + TouchBillingLastEventAtForPrincipal

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
28e746195ef35e3093edcb73a6c44dbc7c25a4b5
Parents
24bdba9
Tree
b83950e

4 changed files

StatusFile+-
M internal/billing/billing.go 81 0
M internal/billing/queries/billing.sql 31 0
M internal/billing/sqlc/billing.sql.go 131 26
M internal/billing/sqlc/querier.go 15 0
internal/billing/billing.gomodified
@@ -317,6 +317,87 @@ func SetWebhookEventSubjectForPrincipal(ctx context.Context, deps Deps, provider
317317
 	})
318318
 }
319319
 
320
+// IsBillingEventStaleForPrincipal reports whether an incoming Stripe
321
+// event's timestamp is older than the last event we've already applied
322
+// for this principal. PRO08 D4: the handler refuses stale events so
323
+// reverse-ordered retries can't regress state (e.g., a stale
324
+// subscription.updated[active] arriving after a fresh
325
+// subscription.updated[canceled] re-activating the principal).
326
+//
327
+// Returns false when there's no prior event on file (the first event
328
+// is never stale) or when the row simply doesn't exist (defaults to
329
+// allow; the caller's own ErrNoRows path handles missing-state).
330
+func IsBillingEventStaleForPrincipal(ctx context.Context, deps Deps, p Principal, eventAt time.Time) (bool, error) {
331
+	if err := validateDeps(deps); err != nil {
332
+		return false, err
333
+	}
334
+	if err := p.Validate(); err != nil {
335
+		return false, err
336
+	}
337
+	if eventAt.IsZero() {
338
+		return false, nil
339
+	}
340
+	q := billingdb.New()
341
+	switch p.Kind {
342
+	case SubjectKindOrg:
343
+		stale, err := q.IsOrgBillingEventStale(ctx, deps.Pool, billingdb.IsOrgBillingEventStaleParams{
344
+			OrgID:   p.ID,
345
+			EventAt: pgTime(eventAt),
346
+		})
347
+		if err != nil {
348
+			if errors.Is(err, pgx.ErrNoRows) {
349
+				return false, nil
350
+			}
351
+			return false, err
352
+		}
353
+		return stale, nil
354
+	case SubjectKindUser:
355
+		stale, err := q.IsUserBillingEventStale(ctx, deps.Pool, billingdb.IsUserBillingEventStaleParams{
356
+			UserID:  p.ID,
357
+			EventAt: pgTime(eventAt),
358
+		})
359
+		if err != nil {
360
+			if errors.Is(err, pgx.ErrNoRows) {
361
+				return false, nil
362
+			}
363
+			return false, err
364
+		}
365
+		return stale, nil
366
+	}
367
+	return false, nil
368
+}
369
+
370
+// TouchBillingLastEventAtForPrincipal bumps last_event_at on the
371
+// principal's billing-state row. PRO08 D4: called after a successful
372
+// apply so subsequent staleness checks have a baseline. The query
373
+// uses GREATEST(prev, incoming) so an out-of-order-but-recent retry
374
+// doesn't regress the timestamp.
375
+func TouchBillingLastEventAtForPrincipal(ctx context.Context, deps Deps, p Principal, eventAt time.Time) error {
376
+	if err := validateDeps(deps); err != nil {
377
+		return err
378
+	}
379
+	if err := p.Validate(); err != nil {
380
+		return err
381
+	}
382
+	if eventAt.IsZero() {
383
+		return nil
384
+	}
385
+	q := billingdb.New()
386
+	switch p.Kind {
387
+	case SubjectKindOrg:
388
+		return q.TouchOrgBillingLastEventAt(ctx, deps.Pool, billingdb.TouchOrgBillingLastEventAtParams{
389
+			OrgID:   p.ID,
390
+			EventAt: pgTime(eventAt),
391
+		})
392
+	case SubjectKindUser:
393
+		return q.TouchUserBillingLastEventAt(ctx, deps.Pool, billingdb.TouchUserBillingLastEventAtParams{
394
+			UserID:  p.ID,
395
+			EventAt: pgTime(eventAt),
396
+		})
397
+	}
398
+	return nil
399
+}
400
+
320401
 // ListFailedWebhookEvents is the operator query for "events we
321402
 // received but failed to process." Returns rows whose process_error
322403
 // is non-empty OR that have any processing_attempts but no
internal/billing/queries/billing.sqlmodified
@@ -478,6 +478,37 @@ UPDATE billing_webhook_events
478478
  WHERE provider = 'stripe'
479479
    AND provider_event_id = sqlc.arg(provider_event_id)::text;
480480
 
481
+-- name: IsOrgBillingEventStale :one
482
+-- PRO08 D4: returns true when an incoming Stripe event's timestamp
483
+-- is older than the last event we've already applied for this org.
484
+-- Stripe doesn't guarantee delivery order across retries; without
485
+-- this guard a stale `subscription.updated[active]` could re-activate
486
+-- a canceled subscription. Returns false when no prior event has
487
+-- been recorded (last_event_at IS NULL) — the first event is never
488
+-- stale.
489
+SELECT COALESCE(last_event_at > sqlc.arg(event_at)::timestamptz, false)::boolean AS stale
490
+  FROM org_billing_states
491
+ WHERE org_id = sqlc.arg(org_id)::bigint;
492
+
493
+-- name: IsUserBillingEventStale :one
494
+SELECT COALESCE(last_event_at > sqlc.arg(event_at)::timestamptz, false)::boolean AS stale
495
+  FROM user_billing_states
496
+ WHERE user_id = sqlc.arg(user_id)::bigint;
497
+
498
+-- name: TouchOrgBillingLastEventAt :exec
499
+-- PRO08 D4: bump last_event_at on successful apply. Conditional so
500
+-- a fresh apply driven by an out-of-order-but-recent retry doesn't
501
+-- regress the timestamp (GREATEST). NULL last_event_at acquires the
502
+-- incoming value.
503
+UPDATE org_billing_states
504
+   SET last_event_at = GREATEST(COALESCE(last_event_at, sqlc.arg(event_at)::timestamptz), sqlc.arg(event_at)::timestamptz)
505
+ WHERE org_id = sqlc.arg(org_id)::bigint;
506
+
507
+-- name: TouchUserBillingLastEventAt :exec
508
+UPDATE user_billing_states
509
+   SET last_event_at = GREATEST(COALESCE(last_event_at, sqlc.arg(event_at)::timestamptz), sqlc.arg(event_at)::timestamptz)
510
+ WHERE user_id = sqlc.arg(user_id)::bigint;
511
+
481512
 -- name: TryAcquireWebhookEventLock :one
482513
 -- PRO08 A3: transaction-scoped advisory lock keyed on the hash of
483514
 -- the provider_event_id. Two concurrent webhook deliveries for the
internal/billing/sqlc/billing.sql.gomodified
@@ -92,7 +92,7 @@ WITH state AS (
9292
                ELSE org_billing_states.grace_until
9393
            END,
9494
            updated_at = now()
95
-    RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
95
+    RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
9696
 ), org_update AS (
9797
     UPDATE orgs
9898
        SET plan = $2::org_plan,
@@ -100,7 +100,7 @@ WITH state AS (
100100
      WHERE id = $1::bigint
101101
     RETURNING id
102102
 )
103
-SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM state
103
+SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM state
104104
 `
105105
 
106106
 type ApplySubscriptionSnapshotParams struct {
@@ -139,6 +139,7 @@ type ApplySubscriptionSnapshotRow struct {
139139
 	LastWebhookEventID       string
140140
 	CreatedAt                pgtype.Timestamptz
141141
 	UpdatedAt                pgtype.Timestamptz
142
+	LastEventAt              pgtype.Timestamptz
142143
 }
143144
 
144145
 func (q *Queries) ApplySubscriptionSnapshot(ctx context.Context, db DBTX, arg ApplySubscriptionSnapshotParams) (ApplySubscriptionSnapshotRow, error) {
@@ -178,6 +179,7 @@ func (q *Queries) ApplySubscriptionSnapshot(ctx context.Context, db DBTX, arg Ap
178179
 		&i.LastWebhookEventID,
179180
 		&i.CreatedAt,
180181
 		&i.UpdatedAt,
182
+		&i.LastEventAt,
181183
 	)
182184
 	return i, err
183185
 }
@@ -260,7 +262,7 @@ WITH state AS (
260262
                ELSE user_billing_states.grace_until
261263
            END,
262264
            updated_at = now()
263
-    RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
265
+    RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
264266
 ), user_update AS (
265267
     UPDATE users
266268
        SET plan = $2::user_plan,
@@ -268,7 +270,7 @@ WITH state AS (
268270
      WHERE id = $1::bigint
269271
     RETURNING id
270272
 )
271
-SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM state
273
+SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM state
272274
 `
273275
 
274276
 type ApplyUserSubscriptionSnapshotParams struct {
@@ -305,6 +307,7 @@ type ApplyUserSubscriptionSnapshotRow struct {
305307
 	LastWebhookEventID       string
306308
 	CreatedAt                pgtype.Timestamptz
307309
 	UpdatedAt                pgtype.Timestamptz
310
+	LastEventAt              pgtype.Timestamptz
308311
 }
309312
 
310313
 // Mirrors ApplySubscriptionSnapshot for orgs minus the seat columns
@@ -345,6 +348,7 @@ func (q *Queries) ApplyUserSubscriptionSnapshot(ctx context.Context, db DBTX, ar
345348
 		&i.LastWebhookEventID,
346349
 		&i.CreatedAt,
347350
 		&i.UpdatedAt,
351
+		&i.LastEventAt,
348352
 	)
349353
 	return i, err
350354
 }
@@ -365,7 +369,7 @@ WITH state AS (
365369
            grace_until = NULL,
366370
            updated_at = now()
367371
      WHERE org_id = $1
368
-    RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
372
+    RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
369373
 ), org_update AS (
370374
     UPDATE orgs
371375
        SET plan = state.plan,
@@ -374,7 +378,7 @@ WITH state AS (
374378
      WHERE orgs.id = state.org_id
375379
     RETURNING orgs.id
376380
 )
377
-SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM state
381
+SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM state
378382
 `
379383
 
380384
 type ClearBillingLockRow struct {
@@ -399,6 +403,7 @@ type ClearBillingLockRow struct {
399403
 	LastWebhookEventID       string
400404
 	CreatedAt                pgtype.Timestamptz
401405
 	UpdatedAt                pgtype.Timestamptz
406
+	LastEventAt              pgtype.Timestamptz
402407
 }
403408
 
404409
 func (q *Queries) ClearBillingLock(ctx context.Context, db DBTX, orgID int64) (ClearBillingLockRow, error) {
@@ -426,6 +431,7 @@ func (q *Queries) ClearBillingLock(ctx context.Context, db DBTX, orgID int64) (C
426431
 		&i.LastWebhookEventID,
427432
 		&i.CreatedAt,
428433
 		&i.UpdatedAt,
434
+		&i.LastEventAt,
429435
 	)
430436
 	return i, err
431437
 }
@@ -446,7 +452,7 @@ WITH state AS (
446452
            grace_until = NULL,
447453
            updated_at = now()
448454
      WHERE user_id = $1
449
-    RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
455
+    RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
450456
 ), user_update AS (
451457
     UPDATE users
452458
        SET plan = state.plan,
@@ -455,7 +461,7 @@ WITH state AS (
455461
      WHERE users.id = state.user_id
456462
     RETURNING users.id
457463
 )
458
-SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM state
464
+SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM state
459465
 `
460466
 
461467
 type ClearUserBillingLockRow struct {
@@ -478,6 +484,7 @@ type ClearUserBillingLockRow struct {
478484
 	LastWebhookEventID       string
479485
 	CreatedAt                pgtype.Timestamptz
480486
 	UpdatedAt                pgtype.Timestamptz
487
+	LastEventAt              pgtype.Timestamptz
481488
 }
482489
 
483490
 func (q *Queries) ClearUserBillingLock(ctx context.Context, db DBTX, userID int64) (ClearUserBillingLockRow, error) {
@@ -503,6 +510,7 @@ func (q *Queries) ClearUserBillingLock(ctx context.Context, db DBTX, userID int6
503510
 		&i.LastWebhookEventID,
504511
 		&i.CreatedAt,
505512
 		&i.UpdatedAt,
513
+		&i.LastEventAt,
506514
 	)
507515
 	return i, err
508516
 }
@@ -667,7 +675,7 @@ func (q *Queries) CreateWebhookEventReceipt(ctx context.Context, db DBTX, arg Cr
667675
 const getOrgBillingState = `-- name: GetOrgBillingState :one
668676
 
669677
 
670
-SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM org_billing_states WHERE org_id = $1
678
+SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM org_billing_states WHERE org_id = $1
671679
 `
672680
 
673681
 // SPDX-License-Identifier: AGPL-3.0-or-later
@@ -697,12 +705,13 @@ func (q *Queries) GetOrgBillingState(ctx context.Context, db DBTX, orgID int64)
697705
 		&i.LastWebhookEventID,
698706
 		&i.CreatedAt,
699707
 		&i.UpdatedAt,
708
+		&i.LastEventAt,
700709
 	)
701710
 	return i, err
702711
 }
703712
 
704713
 const getOrgBillingStateByStripeCustomer = `-- name: GetOrgBillingStateByStripeCustomer :one
705
-SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM org_billing_states
714
+SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM org_billing_states
706715
 WHERE provider = 'stripe'
707716
   AND stripe_customer_id = $1
708717
 `
@@ -732,12 +741,13 @@ func (q *Queries) GetOrgBillingStateByStripeCustomer(ctx context.Context, db DBT
732741
 		&i.LastWebhookEventID,
733742
 		&i.CreatedAt,
734743
 		&i.UpdatedAt,
744
+		&i.LastEventAt,
735745
 	)
736746
 	return i, err
737747
 }
738748
 
739749
 const getOrgBillingStateByStripeSubscription = `-- name: GetOrgBillingStateByStripeSubscription :one
740
-SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM org_billing_states
750
+SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM org_billing_states
741751
 WHERE provider = 'stripe'
742752
   AND stripe_subscription_id = $1
743753
 `
@@ -767,13 +777,14 @@ func (q *Queries) GetOrgBillingStateByStripeSubscription(ctx context.Context, db
767777
 		&i.LastWebhookEventID,
768778
 		&i.CreatedAt,
769779
 		&i.UpdatedAt,
780
+		&i.LastEventAt,
770781
 	)
771782
 	return i, err
772783
 }
773784
 
774785
 const getUserBillingState = `-- name: GetUserBillingState :one
775786
 
776
-SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM user_billing_states WHERE user_id = $1
787
+SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM user_billing_states WHERE user_id = $1
777788
 `
778789
 
779790
 // ─── user_billing_states (PRO03) ──────────────────────────────────
@@ -800,12 +811,13 @@ func (q *Queries) GetUserBillingState(ctx context.Context, db DBTX, userID int64
800811
 		&i.LastWebhookEventID,
801812
 		&i.CreatedAt,
802813
 		&i.UpdatedAt,
814
+		&i.LastEventAt,
803815
 	)
804816
 	return i, err
805817
 }
806818
 
807819
 const getUserBillingStateByStripeCustomer = `-- name: GetUserBillingStateByStripeCustomer :one
808
-SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM user_billing_states
820
+SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM user_billing_states
809821
 WHERE provider = 'stripe'
810822
   AND stripe_customer_id = $1
811823
 `
@@ -833,12 +845,13 @@ func (q *Queries) GetUserBillingStateByStripeCustomer(ctx context.Context, db DB
833845
 		&i.LastWebhookEventID,
834846
 		&i.CreatedAt,
835847
 		&i.UpdatedAt,
848
+		&i.LastEventAt,
836849
 	)
837850
 	return i, err
838851
 }
839852
 
840853
 const getUserBillingStateByStripeSubscription = `-- name: GetUserBillingStateByStripeSubscription :one
841
-SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM user_billing_states
854
+SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM user_billing_states
842855
 WHERE provider = 'stripe'
843856
   AND stripe_subscription_id = $1
844857
 `
@@ -866,6 +879,7 @@ func (q *Queries) GetUserBillingStateByStripeSubscription(ctx context.Context, d
866879
 		&i.LastWebhookEventID,
867880
 		&i.CreatedAt,
868881
 		&i.UpdatedAt,
882
+		&i.LastEventAt,
869883
 	)
870884
 	return i, err
871885
 }
@@ -896,6 +910,49 @@ func (q *Queries) GetWebhookEventReceipt(ctx context.Context, db DBTX, providerE
896910
 	return i, err
897911
 }
898912
 
913
+const isOrgBillingEventStale = `-- name: IsOrgBillingEventStale :one
914
+SELECT COALESCE(last_event_at > $1::timestamptz, false)::boolean AS stale
915
+  FROM org_billing_states
916
+ WHERE org_id = $2::bigint
917
+`
918
+
919
+type IsOrgBillingEventStaleParams struct {
920
+	EventAt pgtype.Timestamptz
921
+	OrgID   int64
922
+}
923
+
924
+// PRO08 D4: returns true when an incoming Stripe event's timestamp
925
+// is older than the last event we've already applied for this org.
926
+// Stripe doesn't guarantee delivery order across retries; without
927
+// this guard a stale `subscription.updated[active]` could re-activate
928
+// a canceled subscription. Returns false when no prior event has
929
+// been recorded (last_event_at IS NULL) — the first event is never
930
+// stale.
931
+func (q *Queries) IsOrgBillingEventStale(ctx context.Context, db DBTX, arg IsOrgBillingEventStaleParams) (bool, error) {
932
+	row := db.QueryRow(ctx, isOrgBillingEventStale, arg.EventAt, arg.OrgID)
933
+	var stale bool
934
+	err := row.Scan(&stale)
935
+	return stale, err
936
+}
937
+
938
+const isUserBillingEventStale = `-- name: IsUserBillingEventStale :one
939
+SELECT COALESCE(last_event_at > $1::timestamptz, false)::boolean AS stale
940
+  FROM user_billing_states
941
+ WHERE user_id = $2::bigint
942
+`
943
+
944
+type IsUserBillingEventStaleParams struct {
945
+	EventAt pgtype.Timestamptz
946
+	UserID  int64
947
+}
948
+
949
+func (q *Queries) IsUserBillingEventStale(ctx context.Context, db DBTX, arg IsUserBillingEventStaleParams) (bool, error) {
950
+	row := db.QueryRow(ctx, isUserBillingEventStale, arg.EventAt, arg.UserID)
951
+	var stale bool
952
+	err := row.Scan(&stale)
953
+	return stale, err
954
+}
955
+
899956
 const listFailedWebhookEvents = `-- name: ListFailedWebhookEvents :many
900957
 SELECT id, provider, provider_event_id, event_type, api_version,
901958
        received_at, processed_at, processing_attempts, process_error,
@@ -1137,7 +1194,7 @@ WITH state AS (
11371194
            last_webhook_event_id = $1::text,
11381195
            updated_at = now()
11391196
      WHERE org_id = $2::bigint
1140
-    RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
1197
+    RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
11411198
 ), org_update AS (
11421199
     UPDATE orgs
11431200
        SET plan = 'free',
@@ -1145,7 +1202,7 @@ WITH state AS (
11451202
      WHERE id = $2::bigint
11461203
     RETURNING id
11471204
 )
1148
-SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM state
1205
+SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM state
11491206
 `
11501207
 
11511208
 type MarkCanceledParams struct {
@@ -1175,6 +1232,7 @@ type MarkCanceledRow struct {
11751232
 	LastWebhookEventID       string
11761233
 	CreatedAt                pgtype.Timestamptz
11771234
 	UpdatedAt                pgtype.Timestamptz
1235
+	LastEventAt              pgtype.Timestamptz
11781236
 }
11791237
 
11801238
 func (q *Queries) MarkCanceled(ctx context.Context, db DBTX, arg MarkCanceledParams) (MarkCanceledRow, error) {
@@ -1202,6 +1260,7 @@ func (q *Queries) MarkCanceled(ctx context.Context, db DBTX, arg MarkCanceledPar
12021260
 		&i.LastWebhookEventID,
12031261
 		&i.CreatedAt,
12041262
 		&i.UpdatedAt,
1263
+		&i.LastEventAt,
12051264
 	)
12061265
 	return i, err
12071266
 }
@@ -1216,7 +1275,7 @@ UPDATE org_billing_states
12161275
        last_webhook_event_id = $2::text,
12171276
        updated_at = now()
12181277
  WHERE org_id = $3::bigint
1219
-RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
1278
+RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
12201279
 `
12211280
 
12221281
 type MarkPastDueParams struct {
@@ -1250,6 +1309,7 @@ func (q *Queries) MarkPastDue(ctx context.Context, db DBTX, arg MarkPastDueParam
12501309
 		&i.LastWebhookEventID,
12511310
 		&i.CreatedAt,
12521311
 		&i.UpdatedAt,
1312
+		&i.LastEventAt,
12531313
 	)
12541314
 	return i, err
12551315
 }
@@ -1275,7 +1335,7 @@ WITH state AS (
12751335
            last_webhook_event_id = $1::text,
12761336
            updated_at = now()
12771337
      WHERE org_id = $2::bigint
1278
-    RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
1338
+    RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
12791339
 ), org_update AS (
12801340
     UPDATE orgs
12811341
        SET plan = state.plan,
@@ -1284,7 +1344,7 @@ WITH state AS (
12841344
      WHERE orgs.id = state.org_id
12851345
     RETURNING orgs.id
12861346
 )
1287
-SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM state
1347
+SELECT org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM state
12881348
 `
12891349
 
12901350
 type MarkPaymentSucceededParams struct {
@@ -1314,6 +1374,7 @@ type MarkPaymentSucceededRow struct {
13141374
 	LastWebhookEventID       string
13151375
 	CreatedAt                pgtype.Timestamptz
13161376
 	UpdatedAt                pgtype.Timestamptz
1377
+	LastEventAt              pgtype.Timestamptz
13171378
 }
13181379
 
13191380
 func (q *Queries) MarkPaymentSucceeded(ctx context.Context, db DBTX, arg MarkPaymentSucceededParams) (MarkPaymentSucceededRow, error) {
@@ -1341,6 +1402,7 @@ func (q *Queries) MarkPaymentSucceeded(ctx context.Context, db DBTX, arg MarkPay
13411402
 		&i.LastWebhookEventID,
13421403
 		&i.CreatedAt,
13431404
 		&i.UpdatedAt,
1405
+		&i.LastEventAt,
13441406
 	)
13451407
 	return i, err
13461408
 }
@@ -1358,7 +1420,7 @@ WITH state AS (
13581420
            last_webhook_event_id = $1::text,
13591421
            updated_at = now()
13601422
      WHERE user_id = $2::bigint
1361
-    RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
1423
+    RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
13621424
 ), user_update AS (
13631425
     UPDATE users
13641426
        SET plan = 'free',
@@ -1366,7 +1428,7 @@ WITH state AS (
13661428
      WHERE id = $2::bigint
13671429
     RETURNING id
13681430
 )
1369
-SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM state
1431
+SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM state
13701432
 `
13711433
 
13721434
 type MarkUserCanceledParams struct {
@@ -1394,6 +1456,7 @@ type MarkUserCanceledRow struct {
13941456
 	LastWebhookEventID       string
13951457
 	CreatedAt                pgtype.Timestamptz
13961458
 	UpdatedAt                pgtype.Timestamptz
1459
+	LastEventAt              pgtype.Timestamptz
13971460
 }
13981461
 
13991462
 func (q *Queries) MarkUserCanceled(ctx context.Context, db DBTX, arg MarkUserCanceledParams) (MarkUserCanceledRow, error) {
@@ -1419,6 +1482,7 @@ func (q *Queries) MarkUserCanceled(ctx context.Context, db DBTX, arg MarkUserCan
14191482
 		&i.LastWebhookEventID,
14201483
 		&i.CreatedAt,
14211484
 		&i.UpdatedAt,
1485
+		&i.LastEventAt,
14221486
 	)
14231487
 	return i, err
14241488
 }
@@ -1433,7 +1497,7 @@ UPDATE user_billing_states
14331497
        last_webhook_event_id = $2::text,
14341498
        updated_at = now()
14351499
  WHERE user_id = $3::bigint
1436
-RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
1500
+RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
14371501
 `
14381502
 
14391503
 type MarkUserPastDueParams struct {
@@ -1465,6 +1529,7 @@ func (q *Queries) MarkUserPastDue(ctx context.Context, db DBTX, arg MarkUserPast
14651529
 		&i.LastWebhookEventID,
14661530
 		&i.CreatedAt,
14671531
 		&i.UpdatedAt,
1532
+		&i.LastEventAt,
14681533
 	)
14691534
 	return i, err
14701535
 }
@@ -1490,7 +1555,7 @@ WITH state AS (
14901555
            last_webhook_event_id = $1::text,
14911556
            updated_at = now()
14921557
      WHERE user_id = $2::bigint
1493
-    RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
1558
+    RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
14941559
 ), user_update AS (
14951560
     UPDATE users
14961561
        SET plan = state.plan,
@@ -1499,7 +1564,7 @@ WITH state AS (
14991564
      WHERE users.id = state.user_id
15001565
     RETURNING users.id
15011566
 )
1502
-SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at FROM state
1567
+SELECT user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at FROM state
15031568
 `
15041569
 
15051570
 type MarkUserPaymentSucceededParams struct {
@@ -1527,6 +1592,7 @@ type MarkUserPaymentSucceededRow struct {
15271592
 	LastWebhookEventID       string
15281593
 	CreatedAt                pgtype.Timestamptz
15291594
 	UpdatedAt                pgtype.Timestamptz
1595
+	LastEventAt              pgtype.Timestamptz
15301596
 }
15311597
 
15321598
 func (q *Queries) MarkUserPaymentSucceeded(ctx context.Context, db DBTX, arg MarkUserPaymentSucceededParams) (MarkUserPaymentSucceededRow, error) {
@@ -1552,6 +1618,7 @@ func (q *Queries) MarkUserPaymentSucceeded(ctx context.Context, db DBTX, arg Mar
15521618
 		&i.LastWebhookEventID,
15531619
 		&i.CreatedAt,
15541620
 		&i.UpdatedAt,
1621
+		&i.LastEventAt,
15551622
 	)
15561623
 	return i, err
15571624
 }
@@ -1627,7 +1694,7 @@ ON CONFLICT (org_id) DO UPDATE
16271694
    SET stripe_customer_id = EXCLUDED.stripe_customer_id,
16281695
        provider = 'stripe',
16291696
        updated_at = now()
1630
-RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
1697
+RETURNING org_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, billable_seats, seat_snapshot_at, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
16311698
 `
16321699
 
16331700
 type SetStripeCustomerParams struct {
@@ -1660,6 +1727,7 @@ func (q *Queries) SetStripeCustomer(ctx context.Context, db DBTX, arg SetStripeC
16601727
 		&i.LastWebhookEventID,
16611728
 		&i.CreatedAt,
16621729
 		&i.UpdatedAt,
1730
+		&i.LastEventAt,
16631731
 	)
16641732
 	return i, err
16651733
 }
@@ -1671,7 +1739,7 @@ ON CONFLICT (user_id) DO UPDATE
16711739
    SET stripe_customer_id = EXCLUDED.stripe_customer_id,
16721740
        provider = 'stripe',
16731741
        updated_at = now()
1674
-RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at
1742
+RETURNING user_id, provider, stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, plan, subscription_status, current_period_start, current_period_end, cancel_at_period_end, trial_end, past_due_at, canceled_at, locked_at, lock_reason, grace_until, last_webhook_event_id, created_at, updated_at, last_event_at
16751743
 `
16761744
 
16771745
 type SetUserStripeCustomerParams struct {
@@ -1702,6 +1770,7 @@ func (q *Queries) SetUserStripeCustomer(ctx context.Context, db DBTX, arg SetUse
17021770
 		&i.LastWebhookEventID,
17031771
 		&i.CreatedAt,
17041772
 		&i.UpdatedAt,
1773
+		&i.LastEventAt,
17051774
 	)
17061775
 	return i, err
17071776
 }
@@ -1730,6 +1799,42 @@ func (q *Queries) SetWebhookEventSubject(ctx context.Context, db DBTX, arg SetWe
17301799
 	return err
17311800
 }
17321801
 
1802
+const touchOrgBillingLastEventAt = `-- name: TouchOrgBillingLastEventAt :exec
1803
+UPDATE org_billing_states
1804
+   SET last_event_at = GREATEST(COALESCE(last_event_at, $1::timestamptz), $1::timestamptz)
1805
+ WHERE org_id = $2::bigint
1806
+`
1807
+
1808
+type TouchOrgBillingLastEventAtParams struct {
1809
+	EventAt pgtype.Timestamptz
1810
+	OrgID   int64
1811
+}
1812
+
1813
+// PRO08 D4: bump last_event_at on successful apply. Conditional so
1814
+// a fresh apply driven by an out-of-order-but-recent retry doesn't
1815
+// regress the timestamp (GREATEST). NULL last_event_at acquires the
1816
+// incoming value.
1817
+func (q *Queries) TouchOrgBillingLastEventAt(ctx context.Context, db DBTX, arg TouchOrgBillingLastEventAtParams) error {
1818
+	_, err := db.Exec(ctx, touchOrgBillingLastEventAt, arg.EventAt, arg.OrgID)
1819
+	return err
1820
+}
1821
+
1822
+const touchUserBillingLastEventAt = `-- name: TouchUserBillingLastEventAt :exec
1823
+UPDATE user_billing_states
1824
+   SET last_event_at = GREATEST(COALESCE(last_event_at, $1::timestamptz), $1::timestamptz)
1825
+ WHERE user_id = $2::bigint
1826
+`
1827
+
1828
+type TouchUserBillingLastEventAtParams struct {
1829
+	EventAt pgtype.Timestamptz
1830
+	UserID  int64
1831
+}
1832
+
1833
+func (q *Queries) TouchUserBillingLastEventAt(ctx context.Context, db DBTX, arg TouchUserBillingLastEventAtParams) error {
1834
+	_, err := db.Exec(ctx, touchUserBillingLastEventAt, arg.EventAt, arg.UserID)
1835
+	return err
1836
+}
1837
+
17331838
 const tryAcquireWebhookEventLock = `-- name: TryAcquireWebhookEventLock :one
17341839
 SELECT pg_try_advisory_xact_lock(hashtext($1)::bigint) AS acquired
17351840
 `
internal/billing/sqlc/querier.gomodified
@@ -34,6 +34,15 @@ type Querier interface {
3434
 	GetUserBillingStateByStripeCustomer(ctx context.Context, db DBTX, stripeCustomerID pgtype.Text) (UserBillingState, error)
3535
 	GetUserBillingStateByStripeSubscription(ctx context.Context, db DBTX, stripeSubscriptionID pgtype.Text) (UserBillingState, error)
3636
 	GetWebhookEventReceipt(ctx context.Context, db DBTX, providerEventID string) (BillingWebhookEvent, error)
37
+	// PRO08 D4: returns true when an incoming Stripe event's timestamp
38
+	// is older than the last event we've already applied for this org.
39
+	// Stripe doesn't guarantee delivery order across retries; without
40
+	// this guard a stale `subscription.updated[active]` could re-activate
41
+	// a canceled subscription. Returns false when no prior event has
42
+	// been recorded (last_event_at IS NULL) — the first event is never
43
+	// stale.
44
+	IsOrgBillingEventStale(ctx context.Context, db DBTX, arg IsOrgBillingEventStaleParams) (bool, error)
45
+	IsUserBillingEventStale(ctx context.Context, db DBTX, arg IsUserBillingEventStaleParams) (bool, error)
3746
 	// Operator query for "events we received but failed to process."
3847
 	// A row is "failed" when it has a non-empty process_error OR when
3948
 	// it has never been processed (processed_at NULL) and has at least
@@ -67,6 +76,12 @@ type Querier interface {
6776
 	// subsequent apply fails. Migration 0075's CHECK constraint enforces
6877
 	// both-or-neither; callers must pass a non-zero subject.
6978
 	SetWebhookEventSubject(ctx context.Context, db DBTX, arg SetWebhookEventSubjectParams) error
79
+	// PRO08 D4: bump last_event_at on successful apply. Conditional so
80
+	// a fresh apply driven by an out-of-order-but-recent retry doesn't
81
+	// regress the timestamp (GREATEST). NULL last_event_at acquires the
82
+	// incoming value.
83
+	TouchOrgBillingLastEventAt(ctx context.Context, db DBTX, arg TouchOrgBillingLastEventAtParams) error
84
+	TouchUserBillingLastEventAt(ctx context.Context, db DBTX, arg TouchUserBillingLastEventAtParams) error
7085
 	// PRO08 A3: transaction-scoped advisory lock keyed on the hash of
7186
 	// the provider_event_id. Two concurrent webhook deliveries for the
7287
 	// same event_id race past CreateWebhookEventReceipt before either has