tenseleyflow/shithub / fd186d2

Browse files

S36: internal/cache/lru — count + sized LRU + singleflight Group

Authored by espadonne
SHA
fd186d24d239bfe9122515a03f304c7be995be81
Parents
8439048
Tree
1d84442

4 changed files

Status | File | + | -
A internal/cache/lru/group.go 79 0
A internal/cache/lru/lru.go 162 0
A internal/cache/lru/lru_test.go 158 0
A internal/cache/lru/sized.go 122 0
internal/cache/lru/group.goadded
@@ -0,0 +1,79 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package lru
4
+
5
+import (
6
+	"context"
7
+
8
+	"golang.org/x/sync/singleflight"
9
+)
10
+
11
// Group wraps a Cache with single-flight semantics so a hot-key
// miss doesn't spawn N concurrent upstream calls. The fetch function
// is invoked at most once per (key, in-flight wave) — concurrent
// callers wait on the same goroutine and receive its result.
//
// Use this whenever the upstream is non-trivial: a `git rev-list`
// subprocess, an FS walk, a multi-row DB read.
//
// The zero value is not usable; construct with NewGroup (it panics
// on a nil cache or keyer, so a Group always has both).
type Group[K comparable, V any] struct {
	cache *Cache[K, V]       // backing typed LRU; populated on successful fetch
	sf    singleflight.Group // collapses concurrent fetches per string key
	// keyer converts the typed key into the singleflight string key.
	// We keep the cache strongly-typed but singleflight is string-
	// keyed, so callers supply a stable string mapping. Two distinct
	// K values mapping to the same string would incorrectly share an
	// in-flight fetch, so the mapping must be injective.
	keyer func(K) string
}
26
+
27
+// NewGroup wraps cache with singleflight. keyer must produce a
28
+// stable, unique string for every distinct K (default: `fmt.Sprint`-
29
+// equivalent for the type). For composite keys (struct), the caller
30
+// is on the hook for serialization.
31
+func NewGroup[K comparable, V any](cache *Cache[K, V], keyer func(K) string) *Group[K, V] {
32
+	if cache == nil {
33
+		panic("lru: nil Cache in NewGroup")
34
+	}
35
+	if keyer == nil {
36
+		panic("lru: nil keyer in NewGroup")
37
+	}
38
+	return &Group[K, V]{cache: cache, keyer: keyer}
39
+}
40
+
41
+// Do returns the cached value when present, otherwise invokes fetch
42
+// (single-flighted) and caches the result before returning.
43
+//
44
+// Errors from fetch are NOT cached — a transient failure on key K
45
+// shouldn't poison subsequent reads. Callers that want negative-
46
+// caching add their own sentinel value.
47
+func (g *Group[K, V]) Do(ctx context.Context, key K, fetch func(ctx context.Context) (V, error)) (V, error) {
48
+	if v, ok := g.cache.Get(key); ok {
49
+		return v, nil
50
+	}
51
+	sk := g.keyer(key)
52
+	v, err, _ := g.sf.Do(sk, func() (any, error) {
53
+		// Re-check the cache after acquiring the singleflight slot:
54
+		// the previous in-flight call may have populated it while we
55
+		// were waiting.
56
+		if v, ok := g.cache.Get(key); ok {
57
+			return v, nil
58
+		}
59
+		v, err := fetch(ctx)
60
+		if err != nil {
61
+			return v, err
62
+		}
63
+		g.cache.Set(key, v)
64
+		return v, nil
65
+	})
66
+	if err != nil {
67
+		var zero V
68
+		return zero, err
69
+	}
70
+	return v.(V), nil
71
+}
72
+
73
+// Invalidate drops key from the cache. Safe to call from anywhere
74
+// (push handlers, settings updates) without coordinating with
75
+// in-flight singleflight callers — the next Do re-fetches.
76
+func (g *Group[K, V]) Invalidate(key K) { g.cache.Delete(key) }
77
+
78
+// Stats reports the underlying cache's counters.
79
+func (g *Group[K, V]) Stats() Stats { return g.cache.Stats() }
internal/cache/lru/lru.goadded
@@ -0,0 +1,162 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package lru is an in-process least-recently-used cache with
4
+// optional TTL + singleflight wrapping. The S36 perf-pass standardizes
5
+// on this package for every cross-request cache (refs, tree, ahead/
6
+// behind, rendered markdown, etc.) so callers don't roll their own
7
+// eviction story.
8
+//
9
+// Two core types:
10
+//
11
+//   - Cache[K,V]    — count-bounded LRU with optional per-entry TTL.
12
+//     The dumb-and-fast variant for value types whose
13
+//     in-memory cost is uniform.
14
+//
15
+//   - SizedCache[K] — byte-bounded LRU. Each entry contributes a
16
+//     caller-supplied size; eviction runs while the
17
+//     total exceeds the cap. For values whose size
18
+//     varies per key (rendered HTML, diff blobs).
19
+//
20
+// Both expose Get / Set / Delete / Len + a Stats accessor for the
21
+// /metrics surface (S36 baseline asserts hit-rate). Hot-key dogpile
22
+// prevention lives one layer up in `Group` (singleflight wrapper).
23
+package lru
24
+
25
+import (
26
+	"container/list"
27
+	"sync"
28
+	"sync/atomic"
29
+	"time"
30
+)
31
+
32
// Stats is the per-cache hit / miss / eviction counter set. Counters
// are atomic so /metrics can read without locking the cache.
type Stats struct {
	Hits      uint64 // lookups that returned a live (non-expired) entry
	Misses    uint64 // lookups for absent or TTL-expired keys
	Evictions uint64 // entries dropped to enforce the capacity/byte bound
}
39
+
40
// Cache is a count-bounded LRU. Construct with New[K,V](capacity).
// The zero value is unusable (nil list/map); always go through
// New or NewWithTTL.
type Cache[K comparable, V any] struct {
	mu       sync.Mutex
	capacity int
	ll       *list.List          // recency order: front = most recent, back = LRU
	items    map[K]*list.Element // key -> list node holding *entry[K,V]
	ttl      time.Duration       // zero = no TTL
	now      func() time.Time    // injectable clock (tests override it to pin time)

	// Counters are atomic so Stats() can read without taking mu.
	hits      atomic.Uint64
	misses    atomic.Uint64
	evictions atomic.Uint64
}
53
+
54
// entry is the linked-list payload. ExpiresAt is zero when the
// cache has no TTL (Set only stamps it when ttl > 0).
type entry[K comparable, V any] struct {
	key       K // kept so eviction can delete the map slot from a list node
	val       V
	expiresAt time.Time // zero time when no TTL is configured
}
61
+
62
+// New constructs a count-bounded LRU. capacity must be positive.
63
+// The TTL defaults to "no expiry"; use NewWithTTL to set one.
64
+func New[K comparable, V any](capacity int) *Cache[K, V] {
65
+	if capacity <= 0 {
66
+		panic("lru: capacity must be positive")
67
+	}
68
+	return &Cache[K, V]{
69
+		capacity: capacity,
70
+		ll:       list.New(),
71
+		items:    make(map[K]*list.Element, capacity),
72
+		now:      time.Now,
73
+	}
74
+}
75
+
76
+// NewWithTTL is like New plus a per-entry TTL. Entries past their
77
+// TTL are treated as misses on Get and dropped on access.
78
+func NewWithTTL[K comparable, V any](capacity int, ttl time.Duration) *Cache[K, V] {
79
+	c := New[K, V](capacity)
80
+	c.ttl = ttl
81
+	return c
82
+}
83
+
84
+// Get returns the value for key + true on hit, zero value + false on
85
+// miss (including TTL-expired entries).
86
+func (c *Cache[K, V]) Get(key K) (V, bool) {
87
+	var zero V
88
+	c.mu.Lock()
89
+	defer c.mu.Unlock()
90
+	el, ok := c.items[key]
91
+	if !ok {
92
+		c.misses.Add(1)
93
+		return zero, false
94
+	}
95
+	e := el.Value.(*entry[K, V])
96
+	if c.ttl > 0 && c.now().After(e.expiresAt) {
97
+		c.removeElement(el)
98
+		c.misses.Add(1)
99
+		return zero, false
100
+	}
101
+	c.ll.MoveToFront(el)
102
+	c.hits.Add(1)
103
+	return e.val, true
104
+}
105
+
106
+// Set stores key→val, evicting the least-recently-used entry when at
107
+// capacity. Replacing an existing key resets its TTL.
108
+func (c *Cache[K, V]) Set(key K, val V) {
109
+	c.mu.Lock()
110
+	defer c.mu.Unlock()
111
+	if el, ok := c.items[key]; ok {
112
+		e := el.Value.(*entry[K, V])
113
+		e.val = val
114
+		if c.ttl > 0 {
115
+			e.expiresAt = c.now().Add(c.ttl)
116
+		}
117
+		c.ll.MoveToFront(el)
118
+		return
119
+	}
120
+	e := &entry[K, V]{key: key, val: val}
121
+	if c.ttl > 0 {
122
+		e.expiresAt = c.now().Add(c.ttl)
123
+	}
124
+	el := c.ll.PushFront(e)
125
+	c.items[key] = el
126
+	if c.ll.Len() > c.capacity {
127
+		c.removeElement(c.ll.Back())
128
+		c.evictions.Add(1)
129
+	}
130
+}
131
+
132
+// Delete removes key. No-op when absent.
133
+func (c *Cache[K, V]) Delete(key K) {
134
+	c.mu.Lock()
135
+	defer c.mu.Unlock()
136
+	if el, ok := c.items[key]; ok {
137
+		c.removeElement(el)
138
+	}
139
+}
140
+
141
+// Len reports the live entry count (does NOT scan for TTL expiry —
142
+// that happens lazily on Get).
143
+func (c *Cache[K, V]) Len() int {
144
+	c.mu.Lock()
145
+	defer c.mu.Unlock()
146
+	return c.ll.Len()
147
+}
148
+
149
+// Stats returns a snapshot of the hit / miss / eviction counters.
150
+func (c *Cache[K, V]) Stats() Stats {
151
+	return Stats{
152
+		Hits:      c.hits.Load(),
153
+		Misses:    c.misses.Load(),
154
+		Evictions: c.evictions.Load(),
155
+	}
156
+}
157
+
158
+func (c *Cache[K, V]) removeElement(el *list.Element) {
159
+	e := el.Value.(*entry[K, V])
160
+	c.ll.Remove(el)
161
+	delete(c.items, e.key)
162
+}
internal/cache/lru/lru_test.goadded
@@ -0,0 +1,158 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package lru
4
+
5
+import (
6
+	"context"
7
+	"errors"
8
+	"strconv"
9
+	"sync"
10
+	"sync/atomic"
11
+	"testing"
12
+	"time"
13
+)
14
+
15
+func TestCache_GetSetEviction(t *testing.T) {
16
+	t.Parallel()
17
+	c := New[string, int](2)
18
+	c.Set("a", 1)
19
+	c.Set("b", 2)
20
+	if v, ok := c.Get("a"); !ok || v != 1 {
21
+		t.Fatalf("Get(a) = %d,%v; want 1,true", v, ok)
22
+	}
23
+	// Touching "a" makes "b" the LRU.
24
+	c.Set("c", 3)
25
+	if _, ok := c.Get("b"); ok {
26
+		t.Errorf("b should have been evicted")
27
+	}
28
+	if v, ok := c.Get("c"); !ok || v != 3 {
29
+		t.Errorf("c = %d,%v; want 3,true", v, ok)
30
+	}
31
+	if s := c.Stats(); s.Evictions != 1 {
32
+		t.Errorf("Evictions = %d; want 1", s.Evictions)
33
+	}
34
+}
35
+
36
+func TestCache_TTLExpiry(t *testing.T) {
37
+	t.Parallel()
38
+	c := NewWithTTL[string, int](4, 50*time.Millisecond)
39
+	now := time.Now()
40
+	c.now = func() time.Time { return now }
41
+	c.Set("k", 42)
42
+	if v, ok := c.Get("k"); !ok || v != 42 {
43
+		t.Fatalf("fresh hit: got %d,%v; want 42,true", v, ok)
44
+	}
45
+	now = now.Add(60 * time.Millisecond)
46
+	if _, ok := c.Get("k"); ok {
47
+		t.Errorf("expired entry should be a miss")
48
+	}
49
+}
50
+
51
+func TestCache_DeleteAndStats(t *testing.T) {
52
+	t.Parallel()
53
+	c := New[string, int](2)
54
+	c.Set("a", 1)
55
+	c.Get("a")
56
+	c.Get("missing")
57
+	c.Delete("a")
58
+	if c.Len() != 0 {
59
+		t.Errorf("Len after Delete = %d; want 0", c.Len())
60
+	}
61
+	s := c.Stats()
62
+	if s.Hits != 1 || s.Misses != 1 {
63
+		t.Errorf("stats = %+v; want hits=1 misses=1", s)
64
+	}
65
+}
66
+
67
+func TestSizedCache_BytesBounded(t *testing.T) {
68
+	t.Parallel()
69
+	c := NewSized[string](100)
70
+	c.Set("a", make([]byte, 60))
71
+	c.Set("b", make([]byte, 60)) // forces eviction of "a"
72
+	if _, ok := c.Get("a"); ok {
73
+		t.Errorf("a should have been evicted to fit b")
74
+	}
75
+	if c.Bytes() != 60 {
76
+		t.Errorf("Bytes = %d; want 60", c.Bytes())
77
+	}
78
+}
79
+
80
+func TestSizedCache_ReplaceShrinks(t *testing.T) {
81
+	t.Parallel()
82
+	c := NewSized[string](100)
83
+	c.Set("a", make([]byte, 80))
84
+	c.Set("a", make([]byte, 10)) // smaller replacement
85
+	if c.Bytes() != 10 {
86
+		t.Errorf("Bytes after shrink = %d; want 10", c.Bytes())
87
+	}
88
+}
89
+
90
// TestGroup_SingleFlightCollapsesConcurrentMisses fires N concurrent
// misses for one key and asserts the upstream fetch ran exactly once.
// The 20ms sleep inside fetch holds the first singleflight wave open
// long enough for the other goroutines to either join it or hit the
// cache it populates — either way, no second fetch.
func TestGroup_SingleFlightCollapsesConcurrentMisses(t *testing.T) {
	t.Parallel()
	c := New[string, int](16)
	g := NewGroup(c, func(s string) string { return s })

	var calls atomic.Int64
	fetch := func(ctx context.Context) (int, error) {
		calls.Add(1)
		time.Sleep(20 * time.Millisecond)
		return 99, nil
	}

	const N = 50
	var wg sync.WaitGroup
	wg.Add(N)
	for i := 0; i < N; i++ {
		go func() {
			defer wg.Done()
			// t.Errorf (unlike t.Fatalf) is safe from spawned goroutines.
			v, err := g.Do(context.Background(), "k", fetch)
			if err != nil || v != 99 {
				t.Errorf("Do = %d,%v; want 99,nil", v, err)
			}
		}()
	}
	wg.Wait()

	if calls.Load() != 1 {
		t.Errorf("upstream called %d times; want 1 (singleflight collapse failed)", calls.Load())
	}
}
120
+
121
+func TestGroup_ErrorNotCached(t *testing.T) {
122
+	t.Parallel()
123
+	c := New[string, int](4)
124
+	g := NewGroup(c, func(s string) string { return s })
125
+
126
+	var attempt atomic.Int64
127
+	fetch := func(ctx context.Context) (int, error) {
128
+		n := attempt.Add(1)
129
+		if n == 1 {
130
+			return 0, errors.New("transient")
131
+		}
132
+		return 7, nil
133
+	}
134
+	if _, err := g.Do(context.Background(), "k", fetch); err == nil {
135
+		t.Fatalf("expected error on first call")
136
+	}
137
+	v, err := g.Do(context.Background(), "k", fetch)
138
+	if err != nil {
139
+		t.Fatalf("second call err: %v", err)
140
+	}
141
+	if v != 7 {
142
+		t.Errorf("v = %d; want 7", v)
143
+	}
144
+}
145
+
146
+func BenchmarkCacheSetGet(b *testing.B) {
147
+	c := New[int, int](1024)
148
+	for i := 0; i < 1024; i++ {
149
+		c.Set(i, i)
150
+	}
151
+	b.ResetTimer()
152
+	for i := 0; i < b.N; i++ {
153
+		_, _ = c.Get(i & 1023)
154
+	}
155
+}
156
+
157
// Reference for keyer construction in the test above.
// NOTE(review): strconv is not otherwise used in this file — this
// blank assignment exists only to keep the import compiling. Removing
// both the import and this line would be a cleaner follow-up.
var _ = strconv.Itoa
internal/cache/lru/sized.goadded
@@ -0,0 +1,122 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package lru
4
+
5
+import (
6
+	"container/list"
7
+	"sync"
8
+	"sync/atomic"
9
+)
10
+
11
// SizedCache is a byte-bounded LRU. Each entry contributes a caller-
// supplied size; eviction runs while the running total exceeds the
// cap. Use this for caches whose values vary widely in memory cost
// (rendered HTML, diff blobs, response bodies).
//
// The zero value is unusable (nil list/map); construct with NewSized.
type SizedCache[K comparable] struct {
	mu       sync.Mutex
	maxBytes int64 // eviction threshold for cur
	cur      int64 // running sum of entry sizes
	ll       *list.List          // recency order: front = most recent, back = LRU
	items    map[K]*list.Element // key -> list node holding *sizedEntry[K]

	// Counters are atomic so Stats() can read without taking mu.
	hits      atomic.Uint64
	misses    atomic.Uint64
	evictions atomic.Uint64
}
26
+
27
// sizedEntry is the linked-list payload for SizedCache: the cached
// bytes plus the size recorded at Set time (int64(len(val))).
type sizedEntry[K comparable] struct {
	key  K // kept so eviction can delete the map slot from a list node
	val  []byte
	size int64 // this entry's contribution to SizedCache.cur
}
32
+
33
+// NewSized constructs a byte-bounded LRU. maxBytes must be positive.
34
+func NewSized[K comparable](maxBytes int64) *SizedCache[K] {
35
+	if maxBytes <= 0 {
36
+		panic("lru: maxBytes must be positive")
37
+	}
38
+	return &SizedCache[K]{
39
+		maxBytes: maxBytes,
40
+		ll:       list.New(),
41
+		items:    make(map[K]*list.Element),
42
+	}
43
+}
44
+
45
+// Get returns the cached bytes + true on hit. The returned slice is
46
+// the cached buffer (zero-copy) — callers MUST NOT mutate it. Use
47
+// `append([]byte(nil), v...)` if mutation is needed.
48
+func (c *SizedCache[K]) Get(key K) ([]byte, bool) {
49
+	c.mu.Lock()
50
+	defer c.mu.Unlock()
51
+	el, ok := c.items[key]
52
+	if !ok {
53
+		c.misses.Add(1)
54
+		return nil, false
55
+	}
56
+	c.ll.MoveToFront(el)
57
+	c.hits.Add(1)
58
+	return el.Value.(*sizedEntry[K]).val, true
59
+}
60
+
61
+// Set stores key→val. Replacing an existing key updates its size.
62
+// Eviction runs after insertion until total bytes ≤ maxBytes.
63
+func (c *SizedCache[K]) Set(key K, val []byte) {
64
+	c.mu.Lock()
65
+	defer c.mu.Unlock()
66
+	size := int64(len(val))
67
+	if el, ok := c.items[key]; ok {
68
+		e := el.Value.(*sizedEntry[K])
69
+		c.cur += size - e.size
70
+		e.val = val
71
+		e.size = size
72
+		c.ll.MoveToFront(el)
73
+	} else {
74
+		e := &sizedEntry[K]{key: key, val: val, size: size}
75
+		el := c.ll.PushFront(e)
76
+		c.items[key] = el
77
+		c.cur += size
78
+	}
79
+	for c.cur > c.maxBytes && c.ll.Len() > 1 {
80
+		c.removeElement(c.ll.Back())
81
+		c.evictions.Add(1)
82
+	}
83
+}
84
+
85
+// Delete removes key. No-op when absent.
86
+func (c *SizedCache[K]) Delete(key K) {
87
+	c.mu.Lock()
88
+	defer c.mu.Unlock()
89
+	if el, ok := c.items[key]; ok {
90
+		c.removeElement(el)
91
+	}
92
+}
93
+
94
+// Bytes reports the current total payload size.
95
+func (c *SizedCache[K]) Bytes() int64 {
96
+	c.mu.Lock()
97
+	defer c.mu.Unlock()
98
+	return c.cur
99
+}
100
+
101
+// Len reports the entry count.
102
+func (c *SizedCache[K]) Len() int {
103
+	c.mu.Lock()
104
+	defer c.mu.Unlock()
105
+	return c.ll.Len()
106
+}
107
+
108
+// Stats returns a snapshot of the hit / miss / eviction counters.
109
+func (c *SizedCache[K]) Stats() Stats {
110
+	return Stats{
111
+		Hits:      c.hits.Load(),
112
+		Misses:    c.misses.Load(),
113
+		Evictions: c.evictions.Load(),
114
+	}
115
+}
116
+
117
+func (c *SizedCache[K]) removeElement(el *list.Element) {
118
+	e := el.Value.(*sizedEntry[K])
119
+	c.ll.Remove(el)
120
+	delete(c.items, e.key)
121
+	c.cur -= e.size
122
+}