tenseleyflow/shithub / 75cbeea

Browse files

Add S3 ObjectStore via minio-go with If-None-Match precondition support

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
75cbeead6c36fdf75c626e5dd164296fa54e99c2
Parents
50fc718
Tree
4c5bc61

2 changed files

StatusFile+-
A internal/infra/storage/s3.go 207 0
A internal/infra/storage/s3_test.go 130 0
internal/infra/storage/s3.goadded
@@ -0,0 +1,207 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package storage
4
+
5
+import (
6
+	"context"
7
+	"errors"
8
+	"fmt"
9
+	"io"
10
+	"net/url"
11
+	"time"
12
+
13
+	"github.com/minio/minio-go/v7"
14
+	"github.com/minio/minio-go/v7/pkg/credentials"
15
+)
16
+
17
+// S3Config configures the S3-compatible client. Mirrors config.S3StorageConfig.
18
+type S3Config struct {
19
+	Endpoint        string // host[:port], no scheme
20
+	Region          string
21
+	AccessKeyID     string
22
+	SecretAccessKey string
23
+	Bucket          string
24
+	UseSSL          bool
25
+	ForcePathStyle  bool
26
+}
27
+
28
+// S3Store is an ObjectStore backed by any S3-compatible endpoint.
29
+type S3Store struct {
30
+	client *minio.Client
31
+	bucket string
32
+}
33
+
34
+// NewS3Store constructs the client. The bucket must already exist; this
35
+// constructor does not create it (deploy/dev scripts seed buckets).
36
+func NewS3Store(cfg S3Config) (*S3Store, error) {
37
+	if cfg.Endpoint == "" {
38
+		return nil, errors.New("storage: s3: endpoint required")
39
+	}
40
+	if cfg.Bucket == "" {
41
+		return nil, errors.New("storage: s3: bucket required")
42
+	}
43
+	mc, err := minio.New(cfg.Endpoint, &minio.Options{
44
+		Creds:        credentials.NewStaticV4(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
45
+		Secure:       cfg.UseSSL,
46
+		Region:       cfg.Region,
47
+		BucketLookup: bucketLookup(cfg.ForcePathStyle),
48
+	})
49
+	if err != nil {
50
+		return nil, fmt.Errorf("storage: s3: client: %w", err)
51
+	}
52
+	return &S3Store{client: mc, bucket: cfg.Bucket}, nil
53
+}
54
+
55
+func bucketLookup(forcePathStyle bool) minio.BucketLookupType {
56
+	if forcePathStyle {
57
+		return minio.BucketLookupPath
58
+	}
59
+	return minio.BucketLookupDNS
60
+}
61
+
62
+// Put implements ObjectStore.
63
+func (s *S3Store) Put(ctx context.Context, key string, body io.Reader, opts PutOpts) (PutResult, error) {
64
+	if opts.IfNoneMatch == "*" {
65
+		if _, err := s.client.StatObject(ctx, s.bucket, key, minio.StatObjectOptions{}); err == nil {
66
+			return PutResult{}, ErrPreconditionFailed
67
+		} else if !isNotFound(err) {
68
+			return PutResult{}, fmt.Errorf("storage: s3: precondition stat: %w", err)
69
+		}
70
+	}
71
+	putOpts := minio.PutObjectOptions{ContentType: opts.ContentType}
72
+	size := opts.ContentLength
73
+	if size <= 0 {
74
+		size = -1
75
+	}
76
+	info, err := s.client.PutObject(ctx, s.bucket, key, body, size, putOpts)
77
+	if err != nil {
78
+		return PutResult{}, fmt.Errorf("storage: s3: put %s: %w", key, err)
79
+	}
80
+	return PutResult{ETag: info.ETag, Size: info.Size}, nil
81
+}
82
+
83
+// Get implements ObjectStore.
84
+func (s *S3Store) Get(ctx context.Context, key string) (io.ReadCloser, ObjectMeta, error) {
85
+	obj, err := s.client.GetObject(ctx, s.bucket, key, minio.GetObjectOptions{})
86
+	if err != nil {
87
+		return nil, ObjectMeta{}, fmt.Errorf("storage: s3: get %s: %w", key, err)
88
+	}
89
+	stat, err := obj.Stat()
90
+	if err != nil {
91
+		_ = obj.Close()
92
+		if isNotFound(err) {
93
+			return nil, ObjectMeta{}, ErrNotFound
94
+		}
95
+		return nil, ObjectMeta{}, fmt.Errorf("storage: s3: stat %s: %w", key, err)
96
+	}
97
+	return obj, metaFromStat(key, stat), nil
98
+}
99
+
100
+// Stat implements ObjectStore.
101
+func (s *S3Store) Stat(ctx context.Context, key string) (ObjectMeta, error) {
102
+	stat, err := s.client.StatObject(ctx, s.bucket, key, minio.StatObjectOptions{})
103
+	if err != nil {
104
+		if isNotFound(err) {
105
+			return ObjectMeta{}, ErrNotFound
106
+		}
107
+		return ObjectMeta{}, fmt.Errorf("storage: s3: stat %s: %w", key, err)
108
+	}
109
+	return metaFromStat(key, stat), nil
110
+}
111
+
112
+// Delete implements ObjectStore. Returns nil for missing keys (idempotent).
113
+func (s *S3Store) Delete(ctx context.Context, key string) error {
114
+	err := s.client.RemoveObject(ctx, s.bucket, key, minio.RemoveObjectOptions{})
115
+	if err != nil && !isNotFound(err) {
116
+		return fmt.Errorf("storage: s3: delete %s: %w", key, err)
117
+	}
118
+	return nil
119
+}
120
+
121
+// List implements ObjectStore. Recursive=false uses "/" as a delimiter.
122
+func (s *S3Store) List(ctx context.Context, prefix string, opts ListOpts) (ListResult, error) {
123
+	listOpts := minio.ListObjectsOptions{
124
+		Prefix:     prefix,
125
+		Recursive:  opts.Recursive,
126
+		MaxKeys:    opts.MaxKeys,
127
+		StartAfter: opts.ContinuationToken,
128
+	}
129
+	var (
130
+		out      []ObjectMeta
131
+		prefixes []string
132
+	)
133
+	for o := range s.client.ListObjects(ctx, s.bucket, listOpts) {
134
+		if o.Err != nil {
135
+			return ListResult{}, fmt.Errorf("storage: s3: list: %w", o.Err)
136
+		}
137
+		// minio-go signals common prefixes with size==0 and key ending in "/".
138
+		// In delimited mode they appear in the same channel.
139
+		if o.Size == 0 && len(o.Key) > 0 && o.Key[len(o.Key)-1] == '/' {
140
+			prefixes = append(prefixes, o.Key)
141
+			continue
142
+		}
143
+		out = append(out, ObjectMeta{
144
+			Key:          o.Key,
145
+			Size:         o.Size,
146
+			ETag:         o.ETag,
147
+			ContentType:  o.ContentType,
148
+			LastModified: o.LastModified,
149
+		})
150
+		if opts.MaxKeys > 0 && len(out) >= opts.MaxKeys {
151
+			break
152
+		}
153
+	}
154
+	res := ListResult{Objects: out, CommonPrefixes: prefixes}
155
+	if opts.MaxKeys > 0 && len(out) >= opts.MaxKeys && len(out) > 0 {
156
+		res.IsTruncated = true
157
+		res.NextContinuationToken = out[len(out)-1].Key
158
+	}
159
+	return res, nil
160
+}
161
+
162
+// SignedURL implements ObjectStore.
163
+func (s *S3Store) SignedURL(ctx context.Context, key string, ttl time.Duration, method string) (string, error) {
164
+	switch method {
165
+	case "GET":
166
+		u, err := s.client.PresignedGetObject(ctx, s.bucket, key, ttl, url.Values{})
167
+		if err != nil {
168
+			return "", fmt.Errorf("storage: s3: presign get: %w", err)
169
+		}
170
+		return u.String(), nil
171
+	case "PUT":
172
+		u, err := s.client.PresignedPutObject(ctx, s.bucket, key, ttl)
173
+		if err != nil {
174
+			return "", fmt.Errorf("storage: s3: presign put: %w", err)
175
+		}
176
+		return u.String(), nil
177
+	default:
178
+		return "", fmt.Errorf("storage: s3: unsupported signed-url method %q", method)
179
+	}
180
+}
181
+
182
+func isNotFound(err error) bool {
183
+	if err == nil {
184
+		return false
185
+	}
186
+	var resp minio.ErrorResponse
187
+	if errors.As(err, &resp) {
188
+		switch resp.Code {
189
+		case "NoSuchKey", "NoSuchBucket", "NotFound":
190
+			return true
191
+		}
192
+		if resp.StatusCode == 404 {
193
+			return true
194
+		}
195
+	}
196
+	return false
197
+}
198
+
199
+func metaFromStat(key string, s minio.ObjectInfo) ObjectMeta {
200
+	return ObjectMeta{
201
+		Key:          key,
202
+		Size:         s.Size,
203
+		ETag:         s.ETag,
204
+		ContentType:  s.ContentType,
205
+		LastModified: s.LastModified,
206
+	}
207
+}
internal/infra/storage/s3_test.goadded
@@ -0,0 +1,130 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package storage
4
+
5
+import (
6
+	"bytes"
7
+	"context"
8
+	"errors"
9
+	"io"
10
+	"os"
11
+	"strings"
12
+	"testing"
13
+	"time"
14
+)
15
+
16
+// s3FromEnv returns an S3Store configured from SHITHUB_TEST_S3_* env vars,
17
+// or skips the test when those aren't set. Tests that exercise the s3
18
+// backend end-to-end run only when CI or a developer has wired up MinIO.
19
+//
20
+//	SHITHUB_TEST_S3_ENDPOINT          (e.g. 127.0.0.1:9000)
21
+//	SHITHUB_TEST_S3_ACCESS_KEY_ID
22
+//	SHITHUB_TEST_S3_SECRET_ACCESS_KEY
23
+//	SHITHUB_TEST_S3_BUCKET            (e.g. shithub-dev)
24
+func s3FromEnv(t *testing.T) *S3Store {
25
+	t.Helper()
26
+	endpoint := os.Getenv("SHITHUB_TEST_S3_ENDPOINT")
27
+	if endpoint == "" {
28
+		t.Skip("SHITHUB_TEST_S3_ENDPOINT not set; skipping s3 integration test")
29
+	}
30
+	store, err := NewS3Store(S3Config{
31
+		Endpoint:        endpoint,
32
+		Region:          envOr("SHITHUB_TEST_S3_REGION", "us-east-1"),
33
+		AccessKeyID:     os.Getenv("SHITHUB_TEST_S3_ACCESS_KEY_ID"),
34
+		SecretAccessKey: os.Getenv("SHITHUB_TEST_S3_SECRET_ACCESS_KEY"),
35
+		Bucket:          os.Getenv("SHITHUB_TEST_S3_BUCKET"),
36
+		UseSSL:          false,
37
+		ForcePathStyle:  true,
38
+	})
39
+	if err != nil {
40
+		t.Fatalf("NewS3Store: %v", err)
41
+	}
42
+	return store
43
+}
44
+
45
+func envOr(key, def string) string {
46
+	if v := os.Getenv(key); v != "" {
47
+		return v
48
+	}
49
+	return def
50
+}
51
+
52
+func TestS3Store_RoundTrip(t *testing.T) {
53
+	t.Parallel()
54
+	s := s3FromEnv(t)
55
+	ctx := context.Background()
56
+	key := "test/round-trip-" + time.Now().UTC().Format("20060102-150405.000000000")
57
+
58
+	body := strings.NewReader("integration body")
59
+	res, err := s.Put(ctx, key, body, PutOpts{ContentType: "text/plain"})
60
+	if err != nil {
61
+		t.Fatalf("Put: %v", err)
62
+	}
63
+	t.Cleanup(func() { _ = s.Delete(ctx, key) })
64
+
65
+	if res.Size != int64(len("integration body")) {
66
+		t.Fatalf("size = %d, want %d", res.Size, len("integration body"))
67
+	}
68
+
69
+	rc, meta, err := s.Get(ctx, key)
70
+	if err != nil {
71
+		t.Fatalf("Get: %v", err)
72
+	}
73
+	defer func() { _ = rc.Close() }()
74
+	got, _ := io.ReadAll(rc)
75
+	if string(got) != "integration body" {
76
+		t.Fatalf("body mismatch: got %q", got)
77
+	}
78
+	if meta.Size != res.Size {
79
+		t.Fatalf("meta size mismatch")
80
+	}
81
+}
82
+
83
+func TestS3Store_PutIfNoneMatch(t *testing.T) {
84
+	t.Parallel()
85
+	s := s3FromEnv(t)
86
+	ctx := context.Background()
87
+	key := "test/inm-" + time.Now().UTC().Format("20060102-150405.000000000")
88
+
89
+	if _, err := s.Put(ctx, key, strings.NewReader("first"), PutOpts{}); err != nil {
90
+		t.Fatalf("first put: %v", err)
91
+	}
92
+	t.Cleanup(func() { _ = s.Delete(ctx, key) })
93
+
94
+	_, err := s.Put(ctx, key, strings.NewReader("second"), PutOpts{IfNoneMatch: "*"})
95
+	if !errors.Is(err, ErrPreconditionFailed) {
96
+		t.Fatalf("expected ErrPreconditionFailed, got %v", err)
97
+	}
98
+}
99
+
100
+func TestS3Store_LargeRoundTrip(t *testing.T) {
101
+	t.Parallel()
102
+	s := s3FromEnv(t)
103
+	ctx := context.Background()
104
+	key := "test/large-" + time.Now().UTC().Format("20060102-150405.000000000")
105
+
106
+	body := bytes.Repeat([]byte{0xab}, 5*1024*1024) // 5 MiB
107
+	if _, err := s.Put(ctx, key, bytes.NewReader(body), PutOpts{ContentLength: int64(len(body))}); err != nil {
108
+		t.Fatalf("Put large: %v", err)
109
+	}
110
+	t.Cleanup(func() { _ = s.Delete(ctx, key) })
111
+
112
+	rc, _, err := s.Get(ctx, key)
113
+	if err != nil {
114
+		t.Fatalf("Get large: %v", err)
115
+	}
116
+	defer func() { _ = rc.Close() }()
117
+	got, _ := io.ReadAll(rc)
118
+	if !bytes.Equal(got, body) {
119
+		t.Fatalf("body mismatch (len got=%d want=%d)", len(got), len(body))
120
+	}
121
+}
122
+
123
+func TestS3Store_GetMissing(t *testing.T) {
124
+	t.Parallel()
125
+	s := s3FromEnv(t)
126
+	_, _, err := s.Get(context.Background(), "test/should-not-exist-xyz-"+time.Now().UTC().Format("150405.000"))
127
+	if !errors.Is(err, ErrNotFound) {
128
+		t.Fatalf("expected ErrNotFound, got %v", err)
129
+	}
130
+}