-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathstores.go
284 lines (236 loc) · 8.88 KB
/
stores.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package dstore
import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strings"
)
// ErrNotFound is the sentinel error with message "not found". Compare against it with
// errors.Is.
var ErrNotFound error = errors.New("not found")
// Store is the common interface implemented by every storage backend returned by NewStore.
// A Store is rooted at a base directory; object names given to its methods are relative to
// that base (NewStoreFromFileURL, for instance, splits a file URL into a store plus the
// filename that is then passed to OpenObject).
type Store interface {
	OpenObject(ctx context.Context, name string) (out io.ReadCloser, err error)
	FileExists(ctx context.Context, base string) (bool, error)
	ObjectPath(base string) string
	ObjectURL(base string) string
	ObjectAttributes(ctx context.Context, base string) (*ObjectAttributes, error)

	WriteObject(ctx context.Context, base string, f io.Reader) (err error)
	PushLocalFile(ctx context.Context, localFile, toBaseName string) (err error)
	CopyObject(ctx context.Context, src, dest string) error

	Overwrite() bool
	SetOverwrite(enabled bool)

	// Walk recursively visits all files starting with the given prefix within this store,
	// invoking the `f` callback for each file found.
	//
	// If your callback returns `dstore.StopIteration`, iteration stops right away and `Walk`
	// returns `nil`. If your callback returns any other error, iteration stops right away and
	// that error is returned by `Walk`.
	Walk(ctx context.Context, prefix string, f func(filename string) (err error)) error

	// WalkFrom is a variant of Walk taking an inclusive lower bound `inclusiveFrom`.
	// NOTE(review): exact bound semantics are implementation-defined; confirm per backend.
	WalkFrom(ctx context.Context, prefix, inclusiveFrom string, f func(filename string) (err error)) error

	// WalkFromTo is a variant of Walk taking an inclusive lower bound `inclusiveFrom` and an
	// exclusive upper bound `exclusiveTo`.
	// NOTE(review): exact bound semantics are implementation-defined; confirm per backend.
	WalkFromTo(ctx context.Context, prefix, inclusiveFrom, exclusiveTo string, f func(filename string) (err error)) error

	ListFiles(ctx context.Context, prefix string, max int) ([]string, error)
	DeleteObject(ctx context.Context, base string) error

	// BaseURL is used to retrieve the original query parameters, allowing further
	// configurability of the consumers of this store.
	BaseURL() *url.URL

	SubStore(subFolder string) (Store, error)

	// Deprecated: Use the WithCompressedReadCallback, WithUncompressedReadCallback,
	// WithCompressedWriteCallback and WithUncompressedWriteCallback options to inject
	// metering from the upstream code instead.
	SetMeter(meter Meter)
}
// Clonable is an optional interface for stores that can create a new Store from themselves,
// accepting the same Option values as the store constructors.
// NOTE(review): how opts combine with the source store's settings is up to each
// implementation; confirm per backend.
type Clonable interface {
	// Clone returns a new Store derived from the receiver.
	Clone(ctx context.Context, opts ...Option) (Store, error)
}
// StopIteration may be returned from a Walk callback to end the iteration early; per the
// Store.Walk contract, Walk then returns nil instead of this error.
var StopIteration error = errors.New("stop iteration")
// NewDBinStore creates a Store for objects with the `dbin.zst` extension, compressed with
// zstd. It passes overwrite=false to NewStore; opts are forwarded unchanged.
func NewDBinStore(baseURL string, opts ...Option) (Store, error) {
	const (
		dbinExtension   = "dbin.zst"
		dbinCompression = "zstd"
	)
	return NewStore(baseURL, dbinExtension, dbinCompression, false, opts...)
}
// NewJSONLStore creates a Store for objects with the `jsonl.gz` extension, compressed with
// gzip. It passes overwrite=false to NewStore; opts are forwarded unchanged.
//
// It replaces NewSimpleArchiveStore() from before.
func NewJSONLStore(baseURL string, opts ...Option) (Store, error) {
	const (
		jsonlExtension   = "jsonl.gz"
		jsonlCompression = "gzip"
	)
	return NewStore(baseURL, jsonlExtension, jsonlCompression, false, opts...)
}
// NewSimpleStore creates a Store with no extension and no default compression, passing
// overwrite=true to NewStore; opts are forwarded unchanged.
//
// It replaces NewSimpleGStore and also supports local stores.
func NewSimpleStore(baseURL string, opts ...Option) (Store, error) {
	const allowOverwrite = true
	noExtension, noCompression := "", ""
	return NewStore(baseURL, noExtension, noCompression, allowOverwrite, opts...)
}
// NewStore creates a new Store instance. The baseURL is always a directory, and must not end
// with a `/`.
//
// The scheme of baseURL selects the backend constructor:
//   - gs://      NewGSStore
//   - az://      NewAzureStore
//   - s3://      NewS3Store
//   - file://    NewLocalStore
//   - memory://  NewMemoryStore
//   - no scheme  NewLocalStore (baseURL is taken as an absolute or relative path)
//
// extension, compressionType and overwrite are forwarded to that constructor, together with
// opts. A non-empty Compression option overrides compressionType.
//
// WARN: if you were passing `jsonl` as an extension, you should now add `.gz` if you intend
// to enable compression.
func NewStore(baseURL, extension, compressionType string, overwrite bool, opts ...Option) (Store, error) {
	if strings.HasSuffix(baseURL, "/") {
		return nil, fmt.Errorf("baseURL shouldn't end with a /")
	}

	base, err := url.Parse(baseURL)
	if err != nil {
		return nil, err
	}

	// Named cfg (not config) so the variable does not shadow the config type.
	var cfg config
	for _, opt := range opts {
		opt.apply(&cfg)
	}
	if cfg.compression != "" {
		compressionType = cfg.compression
	}

	switch base.Scheme {
	case "gs":
		return NewGSStore(base, extension, compressionType, overwrite, opts...)
	case "az":
		return NewAzureStore(base, extension, compressionType, overwrite, opts...)
	case "s3":
		return NewS3Store(base, extension, compressionType, overwrite, opts...)
	case "memory":
		return NewMemoryStore(base, extension, compressionType, overwrite, opts...)
	case "file", "":
		// An empty scheme means baseURL was a plain absolute/relative path rather than an actual URL.
		return NewLocalStore(base, extension, compressionType, overwrite, opts...)
	}

	return nil, fmt.Errorf("unsupported store scheme %q: only gs://, az://, s3://, file://, memory:// or a local path are supported", base.Scheme)
}
// config collects the settings that Option values write when a store is being created.
type config struct {
	compression string
	overwrite   bool

	compressedWriteCallback   func(ctx context.Context, size int)
	compressedReadCallback    func(ctx context.Context, size int)
	uncompressedWriteCallback func(ctx context.Context, size int)
	uncompressedReadCallback  func(ctx context.Context, size int)
}

// Option customizes store creation; pass options to NewStore and the helpers built on it.
type Option interface {
	apply(cfg *config)
}

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(cfg *config)

func (f optionFunc) apply(cfg *config) {
	f(cfg)
}

// Compression defines which kind of compression to use when creating the store
// instance.
//
// Valid `compressionType` values:
//   - <empty> No compression
//   - zstd Use ZSTD compression
//   - gzip Use GZIP compression
func Compression(compressionType string) Option {
	return optionFunc(func(cfg *config) {
		cfg.compression = compressionType
	})
}

// AllowOverwrite allows files to be overwritten when they already exist at a given
// location.
func AllowOverwrite() Option {
	return optionFunc(func(cfg *config) {
		cfg.overwrite = true
	})
}

// WithCompressedReadCallback allows you to set a callback function that is invoked
// when a compressed read operation is performed.
func WithCompressedReadCallback(cb func(context.Context, int)) Option {
	return optionFunc(func(cfg *config) {
		cfg.compressedReadCallback = cb
	})
}

// WithUncompressedReadCallback allows you to set a callback function that is invoked
// when an uncompressed read operation is performed.
func WithUncompressedReadCallback(cb func(context.Context, int)) Option {
	return optionFunc(func(cfg *config) {
		cfg.uncompressedReadCallback = cb
	})
}

// WithCompressedWriteCallback allows you to set a callback function that is invoked
// when a compressed write operation is performed.
func WithCompressedWriteCallback(cb func(context.Context, int)) Option {
	return optionFunc(func(cfg *config) {
		cfg.compressedWriteCallback = cb
	})
}

// WithUncompressedWriteCallback allows you to set a callback function that is invoked
// when an uncompressed write operation is performed.
func WithUncompressedWriteCallback(cb func(context.Context, int)) Option {
	return optionFunc(func(cfg *config) {
		cfg.uncompressedWriteCallback = cb
	})
}
var (
	// NewStoreFromURL is the former name of NewStoreFromFileURL, kept so existing callers
	// keep compiling; it refers to the very same function.
	//
	// Deprecated: Use NewStoreFromFileURL
	NewStoreFromURL = NewStoreFromFileURL
)
// NewStoreFromFileURL works against a full file URL to derive the store from it as well as
// the filename it points to. Use this method **only and only if** the input points to a file directly,
// if your input is to build a store, use NewStore instead.
//
// This is a shortcut helper function that make it simpler to get store from a single file
// url.
//
// If os.Stat does not report fileURL as missing, fileURL is treated as a local path and split
// with OS-specific rules (package filepath). Otherwise it is parsed as a URL and its
// slash-separated path is split with package path. The returned filename is the last path
// element. The store is created with NewStore from the remaining directory, passing the
// Compression/AllowOverwrite settings found in opts, and opts themselves.
func NewStoreFromFileURL(fileURL string, opts ...Option) (store Store, filename string, err error) {
	var storeURL string
	if _, statErr := os.Stat(fileURL); !os.IsNotExist(statErr) {
		cleaned := filepath.Clean(fileURL)
		filename = filepath.Base(cleaned)
		storeURL = filepath.Dir(cleaned)
	} else {
		parsed, parseErr := url.Parse(fileURL)
		if parseErr != nil {
			return nil, "", fmt.Errorf("parse file url: %w", parseErr)
		}

		// URL paths are always '/'-separated, so use package path rather than filepath.
		// On Windows, filepath.Dir would rewrite them with '\' separators, corrupting the store URL.
		filename = path.Base(parsed.Path)
		parsed.Path = strings.TrimSuffix(path.Dir(parsed.Path), "/")
		storeURL = parsed.String()
	}

	var cfg config
	for _, opt := range opts {
		opt.apply(&cfg)
	}

	store, err = NewStore(storeURL, "", cfg.compression, cfg.overwrite, opts...)
	if err != nil {
		return nil, filename, fmt.Errorf("open store: %w", err)
	}

	return store, filename, nil
}
// OpenObject is a shortcut that opens a single file URL in one call.
//
// It derives the store and the filename from fileURL with NewStoreFromFileURL and then
// opens that filename on the store. The store and filename are returned alongside the
// reader so the caller can keep using them. If the store cannot be created, the error is
// wrapped with "new store: " and no reader is returned. An error from the store's own
// OpenObject is returned as-is.
func OpenObject(ctx context.Context, fileURL string, opts ...Option) (out io.ReadCloser, store Store, filename string, err error) {
	s, name, storeErr := NewStoreFromFileURL(fileURL, opts...)
	if storeErr != nil {
		return nil, s, name, fmt.Errorf("new store: %w", storeErr)
	}

	reader, openErr := s.OpenObject(ctx, name)
	return reader, s, name, openErr
}
// ReadObject is a shortcut that returns the full content of the object at fileURL.
//
// It opens the object through OpenObject, wrapping any failure with "open object: ". It then
// reads the reader to EOF with io.ReadAll and closes it before returning. The error from
// Close is not reported.
func ReadObject(ctx context.Context, fileURL string, opts ...Option) ([]byte, error) {
	rc, _, _, openErr := OpenObject(ctx, fileURL, opts...)
	if openErr != nil {
		return nil, fmt.Errorf("open object: %w", openErr)
	}
	defer rc.Close()

	content, readErr := io.ReadAll(rc)
	return content, readErr
}
//
// Buffered ReadCloser
//

// BufferedFileReadCloser reads from an *os.File through a bufio.Reader and closes the
// underlying file on Close.
type BufferedFileReadCloser struct {
	file   *os.File
	reader io.Reader
}

// Compile-time check that BufferedFileReadCloser satisfies io.ReadCloser.
var _ io.ReadCloser = (*BufferedFileReadCloser)(nil)

// NewBufferedFileReadCloser wraps file in a bufio.Reader with the default buffer size.
// Ownership of file passes to the returned value; call Close to close it.
func NewBufferedFileReadCloser(file *os.File) *BufferedFileReadCloser {
	b := &BufferedFileReadCloser{file: file}
	b.reader = bufio.NewReader(file)
	return b
}

// Read reads buffered data from the wrapped file.
func (b *BufferedFileReadCloser) Read(p []byte) (n int, err error) {
	n, err = b.reader.Read(p)
	return n, err
}

// Close closes the wrapped file. Any data still buffered is discarded.
func (b *BufferedFileReadCloser) Close() error {
	f := b.file
	return f.Close()
}