tenseleyflow/shithub / 50fc718

Browse files

Add in-memory ObjectStore implementation with full semantics + tests

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
50fc718104a1fdbd475f02a8fa85b3ad0ef96d8b
Parents
873c0e5
Tree
1d4599a

2 changed files

StatusFile+-
A internal/infra/storage/memory.go 182 0
A internal/infra/storage/memory_test.go 187 0
internal/infra/storage/memory.goadded
@@ -0,0 +1,182 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package storage
4
+
5
+import (
6
+	"bytes"
7
+	"context"
8
+	"crypto/md5" //nolint:gosec // ETag, not security-sensitive — matches S3 etag derivation.
9
+	"encoding/hex"
10
+	"fmt"
11
+	"io"
12
+	"sort"
13
+	"strings"
14
+	"sync"
15
+	"time"
16
+)
17
+
18
+// MemoryStore is an in-process ObjectStore implementation. Used by tests.
19
+// Honors the same If-None-Match semantics as the s3 backend.
20
+type MemoryStore struct {
21
+	mu      sync.RWMutex
22
+	objects map[string]memObject
23
+	// signedURLBase is the prefix of generated SignedURLs so tests can
24
+	// assert their shape. Defaults to "mem://".
25
+	signedURLBase string
26
+}
27
+
28
+type memObject struct {
29
+	body         []byte
30
+	etag         string
31
+	contentType  string
32
+	lastModified time.Time
33
+}
34
+
35
+// NewMemoryStore constructs an empty in-memory store.
36
+func NewMemoryStore() *MemoryStore {
37
+	return &MemoryStore{
38
+		objects:       make(map[string]memObject),
39
+		signedURLBase: "mem://",
40
+	}
41
+}
42
+
43
+// Put implements ObjectStore.
44
+func (m *MemoryStore) Put(_ context.Context, key string, body io.Reader, opts PutOpts) (PutResult, error) {
45
+	if key == "" {
46
+		return PutResult{}, fmt.Errorf("storage: put: %w: empty key", ErrInvalidPath)
47
+	}
48
+	buf, err := io.ReadAll(body)
49
+	if err != nil {
50
+		return PutResult{}, fmt.Errorf("storage: put: read body: %w", err)
51
+	}
52
+	m.mu.Lock()
53
+	defer m.mu.Unlock()
54
+	if opts.IfNoneMatch == "*" {
55
+		if _, exists := m.objects[key]; exists {
56
+			return PutResult{}, ErrPreconditionFailed
57
+		}
58
+	}
59
+	sum := md5.Sum(buf) //nolint:gosec // not security-sensitive.
60
+	etag := hex.EncodeToString(sum[:])
61
+	m.objects[key] = memObject{
62
+		body:         buf,
63
+		etag:         etag,
64
+		contentType:  opts.ContentType,
65
+		lastModified: time.Now().UTC(),
66
+	}
67
+	return PutResult{ETag: etag, Size: int64(len(buf))}, nil
68
+}
69
+
70
+// Get implements ObjectStore.
71
+func (m *MemoryStore) Get(_ context.Context, key string) (io.ReadCloser, ObjectMeta, error) {
72
+	m.mu.RLock()
73
+	defer m.mu.RUnlock()
74
+	o, ok := m.objects[key]
75
+	if !ok {
76
+		return nil, ObjectMeta{}, ErrNotFound
77
+	}
78
+	return io.NopCloser(bytes.NewReader(o.body)), m.metaOf(key, o), nil
79
+}
80
+
81
+// Stat implements ObjectStore.
82
+func (m *MemoryStore) Stat(_ context.Context, key string) (ObjectMeta, error) {
83
+	m.mu.RLock()
84
+	defer m.mu.RUnlock()
85
+	o, ok := m.objects[key]
86
+	if !ok {
87
+		return ObjectMeta{}, ErrNotFound
88
+	}
89
+	return m.metaOf(key, o), nil
90
+}
91
+
92
+// Delete implements ObjectStore. Idempotent.
93
+func (m *MemoryStore) Delete(_ context.Context, key string) error {
94
+	m.mu.Lock()
95
+	defer m.mu.Unlock()
96
+	delete(m.objects, key)
97
+	return nil
98
+}
99
+
100
+// List implements ObjectStore. ContinuationToken is the last key returned
101
+// in the previous page; results are sorted lexicographically.
102
+func (m *MemoryStore) List(_ context.Context, prefix string, opts ListOpts) (ListResult, error) {
103
+	m.mu.RLock()
104
+	defer m.mu.RUnlock()
105
+
106
+	keys := make([]string, 0, len(m.objects))
107
+	for k := range m.objects {
108
+		if strings.HasPrefix(k, prefix) {
109
+			keys = append(keys, k)
110
+		}
111
+	}
112
+	sort.Strings(keys)
113
+
114
+	if opts.ContinuationToken != "" {
115
+		i := sort.SearchStrings(keys, opts.ContinuationToken)
116
+		if i < len(keys) && keys[i] == opts.ContinuationToken {
117
+			i++
118
+		}
119
+		keys = keys[i:]
120
+	}
121
+
122
+	maxKeys := opts.MaxKeys
123
+	if maxKeys <= 0 {
124
+		maxKeys = 1000
125
+	}
126
+
127
+	var (
128
+		out      []ObjectMeta
129
+		prefixes []string
130
+	)
131
+	seenPrefix := map[string]struct{}{}
132
+
133
+	for _, k := range keys {
134
+		if !opts.Recursive {
135
+			rest := strings.TrimPrefix(k, prefix)
136
+			if i := strings.Index(rest, "/"); i >= 0 {
137
+				cp := prefix + rest[:i+1]
138
+				if _, ok := seenPrefix[cp]; !ok {
139
+					seenPrefix[cp] = struct{}{}
140
+					prefixes = append(prefixes, cp)
141
+				}
142
+				continue
143
+			}
144
+		}
145
+		o := m.objects[k]
146
+		out = append(out, m.metaOf(k, o))
147
+		if len(out) >= maxKeys {
148
+			break
149
+		}
150
+	}
151
+
152
+	res := ListResult{Objects: out, CommonPrefixes: prefixes}
153
+	if len(out) >= maxKeys && len(out) > 0 {
154
+		res.IsTruncated = true
155
+		res.NextContinuationToken = out[len(out)-1].Key
156
+	}
157
+	return res, nil
158
+}
159
+
160
+// SignedURL implements ObjectStore. Tests can rely on the prefix to
161
+// distinguish memory-backed URLs from real ones.
162
+func (m *MemoryStore) SignedURL(_ context.Context, key string, ttl time.Duration, method string) (string, error) {
163
+	switch method {
164
+	case "GET", "PUT":
165
+	default:
166
+		return "", fmt.Errorf("storage: signed url: unsupported method %q", method)
167
+	}
168
+	if key == "" {
169
+		return "", fmt.Errorf("storage: signed url: %w: empty key", ErrInvalidPath)
170
+	}
171
+	return fmt.Sprintf("%s%s?method=%s&ttl=%s", m.signedURLBase, key, method, ttl), nil
172
+}
173
+
174
+func (m *MemoryStore) metaOf(key string, o memObject) ObjectMeta {
175
+	return ObjectMeta{
176
+		Key:          key,
177
+		Size:         int64(len(o.body)),
178
+		ETag:         o.etag,
179
+		ContentType:  o.contentType,
180
+		LastModified: o.lastModified,
181
+	}
182
+}
internal/infra/storage/memory_test.goadded
@@ -0,0 +1,187 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package storage
4
+
5
+import (
6
+	"bytes"
7
+	"context"
8
+	"errors"
9
+	"io"
10
+	"strings"
11
+	"testing"
12
+	"time"
13
+)
14
+
15
+func TestMemoryStore_PutGetStat(t *testing.T) {
16
+	t.Parallel()
17
+	ctx := context.Background()
18
+	m := NewMemoryStore()
19
+
20
+	res, err := m.Put(ctx, "k1", strings.NewReader("hello"), PutOpts{ContentType: "text/plain"})
21
+	if err != nil {
22
+		t.Fatalf("Put: %v", err)
23
+	}
24
+	if res.Size != 5 || res.ETag == "" {
25
+		t.Fatalf("unexpected put result: %+v", res)
26
+	}
27
+
28
+	rc, meta, err := m.Get(ctx, "k1")
29
+	if err != nil {
30
+		t.Fatalf("Get: %v", err)
31
+	}
32
+	defer func() { _ = rc.Close() }()
33
+	body, _ := io.ReadAll(rc)
34
+	if string(body) != "hello" {
35
+		t.Fatalf("body = %q, want hello", body)
36
+	}
37
+	if meta.ContentType != "text/plain" || meta.Size != 5 {
38
+		t.Fatalf("meta = %+v", meta)
39
+	}
40
+
41
+	stat, err := m.Stat(ctx, "k1")
42
+	if err != nil {
43
+		t.Fatalf("Stat: %v", err)
44
+	}
45
+	if stat.ETag != res.ETag {
46
+		t.Fatalf("etag mismatch: %s vs %s", stat.ETag, res.ETag)
47
+	}
48
+}
49
+
50
+func TestMemoryStore_GetMissing(t *testing.T) {
51
+	t.Parallel()
52
+	m := NewMemoryStore()
53
+	if _, _, err := m.Get(context.Background(), "absent"); !errors.Is(err, ErrNotFound) {
54
+		t.Fatalf("expected ErrNotFound, got %v", err)
55
+	}
56
+}
57
+
58
+func TestMemoryStore_PutIfNoneMatch(t *testing.T) {
59
+	t.Parallel()
60
+	ctx := context.Background()
61
+	m := NewMemoryStore()
62
+
63
+	if _, err := m.Put(ctx, "k", strings.NewReader("first"), PutOpts{}); err != nil {
64
+		t.Fatalf("first put: %v", err)
65
+	}
66
+	_, err := m.Put(ctx, "k", strings.NewReader("second"), PutOpts{IfNoneMatch: "*"})
67
+	if !errors.Is(err, ErrPreconditionFailed) {
68
+		t.Fatalf("expected ErrPreconditionFailed, got %v", err)
69
+	}
70
+
71
+	// Without IfNoneMatch, overwrite succeeds.
72
+	if _, err := m.Put(ctx, "k", strings.NewReader("third"), PutOpts{}); err != nil {
73
+		t.Fatalf("overwrite: %v", err)
74
+	}
75
+	rc, _, _ := m.Get(ctx, "k")
76
+	defer func() { _ = rc.Close() }()
77
+	body, _ := io.ReadAll(rc)
78
+	if string(body) != "third" {
79
+		t.Fatalf("got %q, want third", body)
80
+	}
81
+}
82
+
83
+func TestMemoryStore_DeleteIdempotent(t *testing.T) {
84
+	t.Parallel()
85
+	ctx := context.Background()
86
+	m := NewMemoryStore()
87
+	if err := m.Delete(ctx, "missing"); err != nil {
88
+		t.Fatalf("delete missing: %v", err)
89
+	}
90
+	_, _ = m.Put(ctx, "k", strings.NewReader("x"), PutOpts{})
91
+	if err := m.Delete(ctx, "k"); err != nil {
92
+		t.Fatalf("delete: %v", err)
93
+	}
94
+	if _, err := m.Stat(ctx, "k"); !errors.Is(err, ErrNotFound) {
95
+		t.Fatalf("post-delete stat = %v, want ErrNotFound", err)
96
+	}
97
+}
98
+
99
+func TestMemoryStore_ListRecursiveAndDelimited(t *testing.T) {
100
+	t.Parallel()
101
+	ctx := context.Background()
102
+	m := NewMemoryStore()
103
+	for _, k := range []string{
104
+		"avatars/alice/64.png",
105
+		"avatars/alice/128.png",
106
+		"avatars/bob/64.png",
107
+		"attachments/issue-1/x.txt",
108
+	} {
109
+		if _, err := m.Put(ctx, k, strings.NewReader("x"), PutOpts{}); err != nil {
110
+			t.Fatalf("seed %s: %v", k, err)
111
+		}
112
+	}
113
+
114
+	rec, err := m.List(ctx, "avatars/", ListOpts{Recursive: true})
115
+	if err != nil {
116
+		t.Fatalf("recursive list: %v", err)
117
+	}
118
+	if len(rec.Objects) != 3 {
119
+		t.Fatalf("recursive: got %d objects, want 3", len(rec.Objects))
120
+	}
121
+
122
+	del, err := m.List(ctx, "avatars/", ListOpts{})
123
+	if err != nil {
124
+		t.Fatalf("delimited list: %v", err)
125
+	}
126
+	if len(del.CommonPrefixes) != 2 {
127
+		t.Fatalf("delimited: got %d common prefixes, want 2: %v", len(del.CommonPrefixes), del.CommonPrefixes)
128
+	}
129
+}
130
+
131
+func TestMemoryStore_LargeRoundTrip(t *testing.T) {
132
+	t.Parallel()
133
+	ctx := context.Background()
134
+	m := NewMemoryStore()
135
+	body := bytes.Repeat([]byte{0xcd}, 5*1024*1024) // 5 MiB
136
+	if _, err := m.Put(ctx, "big", bytes.NewReader(body), PutOpts{ContentLength: int64(len(body))}); err != nil {
137
+		t.Fatalf("Put big: %v", err)
138
+	}
139
+	rc, meta, err := m.Get(ctx, "big")
140
+	if err != nil {
141
+		t.Fatalf("Get big: %v", err)
142
+	}
143
+	defer func() { _ = rc.Close() }()
144
+	got, _ := io.ReadAll(rc)
145
+	if !bytes.Equal(got, body) {
146
+		t.Fatalf("body mismatch (len got=%d want=%d)", len(got), len(body))
147
+	}
148
+	if meta.Size != int64(len(body)) {
149
+		t.Fatalf("meta size = %d, want %d", meta.Size, len(body))
150
+	}
151
+}
152
+
153
+func TestMemoryStore_SignedURL(t *testing.T) {
154
+	t.Parallel()
155
+	m := NewMemoryStore()
156
+	u, err := m.SignedURL(context.Background(), "k1", time.Minute, "GET")
157
+	if err != nil {
158
+		t.Fatalf("SignedURL: %v", err)
159
+	}
160
+	if !strings.HasPrefix(u, "mem://k1") {
161
+		t.Fatalf("unexpected url: %s", u)
162
+	}
163
+	if _, err := m.SignedURL(context.Background(), "k1", time.Minute, "POST"); err == nil {
164
+		t.Fatal("expected error for unsupported method")
165
+	}
166
+}
167
+
168
+func TestQuota(t *testing.T) {
169
+	t.Parallel()
170
+	q := Quota{Used: 100, Limit: 1000}
171
+	if q.Available() != 900 {
172
+		t.Fatalf("Available = %d, want 900", q.Available())
173
+	}
174
+	if q.WouldExceed(800) {
175
+		t.Fatal("WouldExceed(800) = true, want false")
176
+	}
177
+	if !q.WouldExceed(901) {
178
+		t.Fatal("WouldExceed(901) = false, want true")
179
+	}
180
+	unlimited := Quota{Used: 1 << 40}
181
+	if unlimited.Available() != -1 {
182
+		t.Fatalf("unlimited Available = %d, want -1", unlimited.Available())
183
+	}
184
+	if unlimited.WouldExceed(1 << 50) {
185
+		t.Fatal("unlimited WouldExceed = true, want false")
186
+	}
187
+}