| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package storage |
| 4 | |
| 5 | import ( |
| 6 | "context" |
| 7 | "errors" |
| 8 | "fmt" |
| 9 | "io" |
| 10 | "net/url" |
| 11 | "time" |
| 12 | |
| 13 | "github.com/minio/minio-go/v7" |
| 14 | "github.com/minio/minio-go/v7/pkg/credentials" |
| 15 | ) |
| 16 | |
| 17 | // S3Config configures the S3-compatible client. Mirrors config.S3StorageConfig. |
| 18 | type S3Config struct { |
| 19 | Endpoint string // host[:port], no scheme |
| 20 | Region string |
| 21 | AccessKeyID string |
| 22 | SecretAccessKey string |
| 23 | Bucket string |
| 24 | UseSSL bool |
| 25 | ForcePathStyle bool |
| 26 | } |
| 27 | |
| 28 | // S3Store is an ObjectStore backed by any S3-compatible endpoint. |
| 29 | type S3Store struct { |
| 30 | client *minio.Client |
| 31 | bucket string |
| 32 | } |
| 33 | |
| 34 | // NewS3Store constructs the client. The bucket must already exist; this |
| 35 | // constructor does not create it (deploy/dev scripts seed buckets). |
| 36 | func NewS3Store(cfg S3Config) (*S3Store, error) { |
| 37 | if cfg.Endpoint == "" { |
| 38 | return nil, errors.New("storage: s3: endpoint required") |
| 39 | } |
| 40 | if cfg.Bucket == "" { |
| 41 | return nil, errors.New("storage: s3: bucket required") |
| 42 | } |
| 43 | mc, err := minio.New(cfg.Endpoint, &minio.Options{ |
| 44 | Creds: credentials.NewStaticV4(cfg.AccessKeyID, cfg.SecretAccessKey, ""), |
| 45 | Secure: cfg.UseSSL, |
| 46 | Region: cfg.Region, |
| 47 | BucketLookup: bucketLookup(cfg.ForcePathStyle), |
| 48 | }) |
| 49 | if err != nil { |
| 50 | return nil, fmt.Errorf("storage: s3: client: %w", err) |
| 51 | } |
| 52 | return &S3Store{client: mc, bucket: cfg.Bucket}, nil |
| 53 | } |
| 54 | |
| 55 | func bucketLookup(forcePathStyle bool) minio.BucketLookupType { |
| 56 | if forcePathStyle { |
| 57 | return minio.BucketLookupPath |
| 58 | } |
| 59 | return minio.BucketLookupDNS |
| 60 | } |
| 61 | |
| 62 | // Put implements ObjectStore. |
| 63 | func (s *S3Store) Put(ctx context.Context, key string, body io.Reader, opts PutOpts) (PutResult, error) { |
| 64 | if opts.IfNoneMatch == "*" { |
| 65 | if _, err := s.client.StatObject(ctx, s.bucket, key, minio.StatObjectOptions{}); err == nil { |
| 66 | return PutResult{}, ErrPreconditionFailed |
| 67 | } else if !isNotFound(err) { |
| 68 | return PutResult{}, fmt.Errorf("storage: s3: precondition stat: %w", err) |
| 69 | } |
| 70 | } |
| 71 | putOpts := minio.PutObjectOptions{ContentType: opts.ContentType} |
| 72 | size := opts.ContentLength |
| 73 | if size <= 0 { |
| 74 | size = -1 |
| 75 | } |
| 76 | info, err := s.client.PutObject(ctx, s.bucket, key, body, size, putOpts) |
| 77 | if err != nil { |
| 78 | return PutResult{}, fmt.Errorf("storage: s3: put %s: %w", key, err) |
| 79 | } |
| 80 | return PutResult{ETag: info.ETag, Size: info.Size}, nil |
| 81 | } |
| 82 | |
| 83 | // Get implements ObjectStore. |
| 84 | func (s *S3Store) Get(ctx context.Context, key string) (io.ReadCloser, ObjectMeta, error) { |
| 85 | obj, err := s.client.GetObject(ctx, s.bucket, key, minio.GetObjectOptions{}) |
| 86 | if err != nil { |
| 87 | return nil, ObjectMeta{}, fmt.Errorf("storage: s3: get %s: %w", key, err) |
| 88 | } |
| 89 | stat, err := obj.Stat() |
| 90 | if err != nil { |
| 91 | _ = obj.Close() |
| 92 | if isNotFound(err) { |
| 93 | return nil, ObjectMeta{}, ErrNotFound |
| 94 | } |
| 95 | return nil, ObjectMeta{}, fmt.Errorf("storage: s3: stat %s: %w", key, err) |
| 96 | } |
| 97 | return obj, metaFromStat(key, stat), nil |
| 98 | } |
| 99 | |
| 100 | // Stat implements ObjectStore. |
| 101 | func (s *S3Store) Stat(ctx context.Context, key string) (ObjectMeta, error) { |
| 102 | stat, err := s.client.StatObject(ctx, s.bucket, key, minio.StatObjectOptions{}) |
| 103 | if err != nil { |
| 104 | if isNotFound(err) { |
| 105 | return ObjectMeta{}, ErrNotFound |
| 106 | } |
| 107 | return ObjectMeta{}, fmt.Errorf("storage: s3: stat %s: %w", key, err) |
| 108 | } |
| 109 | return metaFromStat(key, stat), nil |
| 110 | } |
| 111 | |
| 112 | // Delete implements ObjectStore. Returns nil for missing keys (idempotent). |
| 113 | func (s *S3Store) Delete(ctx context.Context, key string) error { |
| 114 | err := s.client.RemoveObject(ctx, s.bucket, key, minio.RemoveObjectOptions{}) |
| 115 | if err != nil && !isNotFound(err) { |
| 116 | return fmt.Errorf("storage: s3: delete %s: %w", key, err) |
| 117 | } |
| 118 | return nil |
| 119 | } |
| 120 | |
| 121 | // List implements ObjectStore. Recursive=false uses "/" as a delimiter. |
| 122 | func (s *S3Store) List(ctx context.Context, prefix string, opts ListOpts) (ListResult, error) { |
| 123 | listOpts := minio.ListObjectsOptions{ |
| 124 | Prefix: prefix, |
| 125 | Recursive: opts.Recursive, |
| 126 | MaxKeys: opts.MaxKeys, |
| 127 | StartAfter: opts.ContinuationToken, |
| 128 | } |
| 129 | var ( |
| 130 | out []ObjectMeta |
| 131 | prefixes []string |
| 132 | ) |
| 133 | for o := range s.client.ListObjects(ctx, s.bucket, listOpts) { |
| 134 | if o.Err != nil { |
| 135 | return ListResult{}, fmt.Errorf("storage: s3: list: %w", o.Err) |
| 136 | } |
| 137 | // minio-go signals common prefixes with size==0 and key ending in "/". |
| 138 | // In delimited mode they appear in the same channel. |
| 139 | if o.Size == 0 && len(o.Key) > 0 && o.Key[len(o.Key)-1] == '/' { |
| 140 | prefixes = append(prefixes, o.Key) |
| 141 | continue |
| 142 | } |
| 143 | out = append(out, ObjectMeta{ |
| 144 | Key: o.Key, |
| 145 | Size: o.Size, |
| 146 | ETag: o.ETag, |
| 147 | ContentType: o.ContentType, |
| 148 | LastModified: o.LastModified, |
| 149 | }) |
| 150 | if opts.MaxKeys > 0 && len(out) >= opts.MaxKeys { |
| 151 | break |
| 152 | } |
| 153 | } |
| 154 | res := ListResult{Objects: out, CommonPrefixes: prefixes} |
| 155 | if opts.MaxKeys > 0 && len(out) >= opts.MaxKeys && len(out) > 0 { |
| 156 | res.IsTruncated = true |
| 157 | res.NextContinuationToken = out[len(out)-1].Key |
| 158 | } |
| 159 | return res, nil |
| 160 | } |
| 161 | |
| 162 | // SignedURL implements ObjectStore. |
| 163 | func (s *S3Store) SignedURL(ctx context.Context, key string, ttl time.Duration, method string) (string, error) { |
| 164 | switch method { |
| 165 | case "GET": |
| 166 | u, err := s.client.PresignedGetObject(ctx, s.bucket, key, ttl, url.Values{}) |
| 167 | if err != nil { |
| 168 | return "", fmt.Errorf("storage: s3: presign get: %w", err) |
| 169 | } |
| 170 | return u.String(), nil |
| 171 | case "PUT": |
| 172 | u, err := s.client.PresignedPutObject(ctx, s.bucket, key, ttl) |
| 173 | if err != nil { |
| 174 | return "", fmt.Errorf("storage: s3: presign put: %w", err) |
| 175 | } |
| 176 | return u.String(), nil |
| 177 | default: |
| 178 | return "", fmt.Errorf("storage: s3: unsupported signed-url method %q", method) |
| 179 | } |
| 180 | } |
| 181 | |
| 182 | func isNotFound(err error) bool { |
| 183 | if err == nil { |
| 184 | return false |
| 185 | } |
| 186 | var resp minio.ErrorResponse |
| 187 | if errors.As(err, &resp) { |
| 188 | switch resp.Code { |
| 189 | case "NoSuchKey", "NoSuchBucket", "NotFound": |
| 190 | return true |
| 191 | } |
| 192 | if resp.StatusCode == 404 { |
| 193 | return true |
| 194 | } |
| 195 | } |
| 196 | return false |
| 197 | } |
| 198 | |
| 199 | func metaFromStat(key string, s minio.ObjectInfo) ObjectMeta { |
| 200 | return ObjectMeta{ |
| 201 | Key: key, |
| 202 | Size: s.Size, |
| 203 | ETag: s.ETag, |
| 204 | ContentType: s.ContentType, |
| 205 | LastModified: s.LastModified, |
| 206 | } |
| 207 | } |
| 208 |