tenseleyflow/shithub / d8c2140

Browse files

orgs tests: concurrent webhook replay applies exactly once via advisory lock

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
d8c2140086c2740951253fb98210fd3a10fe6fcf
Parents
18b9a8a
Tree
21540fb

1 changed file

StatusFile+-
A internal/web/handlers/orgs/billing_webhook_concurrency_test.go 131 0
internal/web/handlers/orgs/billing_webhook_concurrency_test.goadded
@@ -0,0 +1,131 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package orgs_test
4
+
5
+import (
6
+	"context"
7
+	"encoding/json"
8
+	"net/http"
9
+	"net/http/httptest"
10
+	"strconv"
11
+	"strings"
12
+	"sync"
13
+	"sync/atomic"
14
+	"testing"
15
+	"time"
16
+
17
+	stripeapi "github.com/stripe/stripe-go/v85"
18
+
19
+	orgbilling "github.com/tenseleyFlow/shithub/internal/billing"
20
+	billingdb "github.com/tenseleyFlow/shithub/internal/billing/sqlc"
21
+	"github.com/tenseleyFlow/shithub/internal/billing/stripebilling"
22
+	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
23
+)
24
+
25
+// TestBillingWebhookConcurrentReplayAppliesOnce locks PRO08 A3: when
26
+// N concurrent deliveries of the same event_id race, the advisory
27
+// lock makes the apply path mutually exclusive — exactly one apply
28
+// runs, processing_attempts==1, and the contending replays return 200
29
+// without doubling the state mutation.
30
+//
31
+// Without the lock, two callers race past CreateWebhookEventReceipt
32
+// before either marks processed_at, and both call ApplySubscriptionSnapshot.
33
+// The CTE is idempotent on equal payloads, but MarkPastDueForPrincipal
34
+// would rewrite grace_until on each apply — and processing_attempts
35
+// would tick to N instead of 1.
36
+func TestBillingWebhookConcurrentReplayAppliesOnce(t *testing.T) {
37
+	t.Parallel()
38
+	ctx := context.Background()
39
+	pool := dbtest.NewTestDB(t)
40
+	ownerID := insertOrgAvatarUser(t, pool, "owner")
41
+	orgID := insertOrgAvatarOrg(t, pool, ownerID, "acme")
42
+
43
+	// applyCount tracks how many times the resolver actually ran. Each
44
+	// concurrent request goes through resolve → guard → apply. With the
45
+	// lock in place exactly one request gets past the lock check and
46
+	// runs the apply; the rest return 200 "in flight".
47
+	var resolveCount atomic.Int64
48
+	raw, err := json.Marshal(map[string]any{
49
+		"id":       "sub_concurrent",
50
+		"customer": "cus_concurrent",
51
+		"status":   "active",
52
+		"metadata": map[string]string{stripebilling.MetadataOrgID: strconv.FormatInt(orgID, 10)},
53
+		"items": map[string]any{"data": []map[string]any{{
54
+			"id":                   "si_concurrent",
55
+			"current_period_start": time.Now().UTC().Add(-time.Hour).Unix(),
56
+			"current_period_end":   time.Now().UTC().Add(30 * 24 * time.Hour).Unix(),
57
+		}}},
58
+	})
59
+	if err != nil {
60
+		t.Fatalf("marshal: %v", err)
61
+	}
62
+	fake := &fakeStripeRemote{
63
+		verifyWebhookFn: func(_ []byte, _ string) (stripeapi.Event, error) {
64
+			// VerifyWebhook is called on every delivery. We count the
65
+			// resolve-side activity below; here we just hand back the
66
+			// same event each time.
67
+			return stripeapi.Event{
68
+				ID:   "evt_concurrent",
69
+				Type: stripeapi.EventType("customer.subscription.updated"),
70
+				Data: &stripeapi.EventData{Raw: raw},
71
+			}, nil
72
+		},
73
+	}
74
+	mux := newOrgBillingMux(t, pool, ownerID, fake)
75
+
76
+	const workers = 8
77
+	var wg sync.WaitGroup
78
+	wg.Add(workers)
79
+	successes := atomic.Int64{}
80
+	inFlight := atomic.Int64{}
81
+	for i := 0; i < workers; i++ {
82
+		go func() {
83
+			defer wg.Done()
84
+			req := httptest.NewRequest(http.MethodPost, "/stripe/webhook", strings.NewReader(`{"id":"evt_concurrent"}`))
85
+			req.Header.Set("Stripe-Signature", "sig_test")
86
+			resp := httptest.NewRecorder()
87
+			mux.ServeHTTP(resp, req)
88
+			if resp.Code != http.StatusOK {
89
+				return
90
+			}
91
+			body := resp.Body.String()
92
+			if strings.Contains(body, "in flight") {
93
+				inFlight.Add(1)
94
+				return
95
+			}
96
+			successes.Add(1)
97
+		}()
98
+	}
99
+	wg.Wait()
100
+
101
+	// Exactly one worker should have completed the full apply (success
102
+	// body == "ok"). The remaining returned "ok (in flight)".
103
+	if got := successes.Load(); got != 1 {
104
+		t.Errorf("successes=%d, want exactly 1 (lock should serialize)", got)
105
+	}
106
+	if got := inFlight.Load(); got != workers-1 {
107
+		t.Errorf("in-flight responses=%d, want %d (lock should reject %d racers)", got, workers-1, workers-1)
108
+	}
109
+
110
+	state, err := orgbilling.GetOrgBillingState(ctx, orgbilling.Deps{Pool: pool}, orgID)
111
+	if err != nil {
112
+		t.Fatalf("GetOrgBillingState: %v", err)
113
+	}
114
+	if state.Plan != orgbilling.PlanTeam || state.SubscriptionStatus != orgbilling.SubscriptionStatusActive {
115
+		t.Fatalf("expected Team active, got plan=%s status=%s", state.Plan, state.SubscriptionStatus)
116
+	}
117
+	receipt, err := billingdb.New().GetWebhookEventReceipt(ctx, pool, "evt_concurrent")
118
+	if err != nil {
119
+		t.Fatalf("GetWebhookEventReceipt: %v", err)
120
+	}
121
+	if receipt.ProcessingAttempts != 1 {
122
+		t.Fatalf("processing_attempts=%d, want 1 (lock should prevent double-apply)", receipt.ProcessingAttempts)
123
+	}
124
+	if !receipt.ProcessedAt.Valid {
125
+		t.Fatalf("processed_at not set: %+v", receipt)
126
+	}
127
+	// VerifyWebhook gets called on every delivery (parses event before
128
+	// the lock check); resolveCount remains unused — kept above for
129
+	// future diagnostics if we want per-resolve counting.
130
+	_ = resolveCount.Load()
131
+}