Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allows files bigger than 4GiB by sharding #4

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 111 additions & 99 deletions uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,25 @@ import (
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
ucanto_car "github.com/web3-storage/go-ucanto/core/car"
"github.com/web3-storage/go-ucanto/core/delegation"
"github.com/web3-storage/go-ucanto/did"
"github.com/web3-storage/go-ucanto/principal"
"github.com/web3-storage/go-ucanto/principal/ed25519/signer"
"github.com/web3-storage/go-w3up/capability/storeadd"
"github.com/web3-storage/go-w3up/capability/uploadadd"
"github.com/web3-storage/go-w3up/car/sharding"
"github.com/web3-storage/go-w3up/client"
"github.com/web3-storage/go-w3up/cmd/util"
w3sdelegation "github.com/web3-storage/go-w3up/delegation"
)

// w3s interface to make it easier to mock w3s.
type w3s interface {
upload(cid.Cid, string) (cid.Cid, error)
upload(cid.Cid, string) (cid.Cid, []ipld.Link, error)
}

// Uploader ...
Expand Down Expand Up @@ -63,56 +64,34 @@ func NewUploader(spaceID string, sk string, proofBytes []byte, tmpDir string) (*

// Upload uploads the content of a io.Reader.
func (u *Uploader) Upload(ctx context.Context, r io.Reader) (_ UploadResult, err error) {
dest, err := u.saveTmp(r)
if err != nil {
return UploadResult{}, fmt.Errorf("failed saving into tmp: %s", err)
}
randBytes := make([]byte, 16)
_, _ = rand.Read(randBytes)
dest := filepath.Join(u.tmpDir, hex.EncodeToString(randBytes))
dest = fmt.Sprintf("%s.car", dest)

defer func() {
if cErr := u.removeTmp(dest); err == nil {
err = cErr
}
}()

root, err := u.createCar(ctx, dest)
root, err := u.createCar(ctx, dest, r)
if err != nil {
return UploadResult{}, fmt.Errorf("failed generating CAR: %s", err)
}

shard, err := u.w3s.upload(root, dest)
root, shards, err := u.w3s.upload(root, dest)
if err != nil {
return UploadResult{}, fmt.Errorf("failed archiving CAR: %s", err)
}

return UploadResult{
Root: root,
Shard: shard,
Shard: cid.MustParse(shards[0].String()),
}, nil
}

func (u *Uploader) saveTmp(r io.Reader) (_ string, err error) {
randBytes := make([]byte, 16)
_, _ = rand.Read(randBytes)
dest := filepath.Join(u.tmpDir, hex.EncodeToString(randBytes))

f, err := os.Create(dest)
if err != nil {
return "", err
}
defer func() {
// Close file and override return error type if it is nil.
if cerr := f.Close(); err == nil {
err = cerr
}
}()

if _, err := io.Copy(f, r); err != nil {
return "", err
}

return dest, nil
}

func (u *Uploader) createCar(ctx context.Context, dest string) (cid.Cid, error) {
func (u *Uploader) createCar(ctx context.Context, dest string, r io.Reader) (cid.Cid, error) {
hasher, err := multihash.GetHasher(multihash.SHA2_256)
if err != nil {
return cid.Cid{}, err
Expand All @@ -125,14 +104,14 @@ func (u *Uploader) createCar(ctx context.Context, dest string) (cid.Cid, error)
proxyRoot := cid.NewCidV1(uint64(multicodec.DagPb), hash)

cdest, err := blockstore.OpenReadWrite(
fmt.Sprintf("%s.car", dest), []cid.Cid{proxyRoot}, []car.Option{blockstore.WriteAsCarV1(true)}...,
dest, []cid.Cid{proxyRoot}, []car.Option{blockstore.WriteAsCarV1(true)}...,
)
if err != nil {
return cid.Cid{}, err
}

// Write the unixfs blocks into the store.
root, _, err := writeFile(ctx, cdest, dest)
root, _, err := writeFile(ctx, cdest, r)
if err != nil {
return cid.Cid{}, err
}
Expand All @@ -141,7 +120,7 @@ func (u *Uploader) createCar(ctx context.Context, dest string) (cid.Cid, error)
return cid.Cid{}, err
}
// re-open/finalize with the final root.
if err := car.ReplaceRootsInFile(fmt.Sprintf("%s.car", dest), []cid.Cid{root}); err != nil {
if err := car.ReplaceRootsInFile(dest, []cid.Cid{root}); err != nil {
return cid.Cid{}, err
}

Expand All @@ -152,15 +131,10 @@ func (*Uploader) removeTmp(dest string) error {
if err := os.Remove(dest); err != nil {
return fmt.Errorf("failed to remove file: %s", err)
}

if err := os.Remove(fmt.Sprintf("%s.car", dest)); err != nil {
return fmt.Errorf("failed to remove car file: %s", err)
}

return nil
}

func writeFile(ctx context.Context, bs *blockstore.ReadWrite, path string) (_ cid.Cid, sz uint64, err error) {
func writeFile(ctx context.Context, bs *blockstore.ReadWrite, reader io.Reader) (_ cid.Cid, sz uint64, err error) {
ls := cidlink.DefaultLinkSystem()
ls.TrustedStorage = true
ls.StorageReadOpener = func(_ ipld.LinkContext, l ipld.Link) (io.Reader, error) {
Expand Down Expand Up @@ -192,18 +166,7 @@ func writeFile(ctx context.Context, bs *blockstore.ReadWrite, path string) (_ ci
}, nil
}

f, err := os.Open(path)
if err != nil {
return cid.Undef, 0, err
}
defer func() {
// Close file and override return error type if it is nil.
if cerr := f.Close(); err == nil {
err = cerr
}
}()

l, size, err := builder.BuildUnixFSFile(f, "", &ls)
l, size, err := builder.BuildUnixFSFile(reader, "", &ls)
if err != nil {
return cid.Undef, 0, err
}
Expand Down Expand Up @@ -246,49 +209,115 @@ func newW3sclient(spaceID string, sk string, proofBytes []byte) (*w3sclient, err
}, nil
}

func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, err error) {
func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, _ []ipld.Link, err error) {
// no need to close the file because the http client is doing that
f, err := os.Open(fmt.Sprintf("%s.car", dest))
f, err := os.Open(dest)
if err != nil {
return cid.Undef, err
return cid.Undef, []ipld.Link{}, err
}
defer func() {
// Close file and override return error type if it is nil.
if cerr := f.Close(); err == nil {
err = cerr
}
}()

stat, err := f.Stat()
if err != nil {
return cid.Undef, err
return cid.Undef, []ipld.Link{}, err
}

var shdlnks []ipld.Link

size := uint64(stat.Size())
mh, err := multihash.SumStream(f, multihash.SHA2_256, -1)
if err != nil {
return cid.Undef, err
if size < sharding.ShardSize {
link, err := storeShard(c.issuer, c.space, f, []delegation.Delegation{c.proof})
if err != nil {
return cid.Undef, []ipld.Link{}, err
}
shdlnks = append(shdlnks, link)
} else {
_, blocks, err := ucanto_car.Decode(f)
if err != nil {
return cid.Undef, []ipld.Link{}, fmt.Errorf("decoding CAR: %s", err)
}
shds, err := sharding.NewSharder([]ipld.Link{}, blocks)
if err != nil {
return cid.Undef, []ipld.Link{}, fmt.Errorf("sharding CAR: %s", err)
}

for {
shd, err := shds.Next()
if err != nil {
if err == io.EOF {
break
}
return cid.Undef, []ipld.Link{}, err
}
link, err := storeShard(c.issuer, c.space, shd, []delegation.Delegation{c.proof})
if err != nil {
return cid.Undef, []ipld.Link{}, err
}
shdlnks = append(shdlnks, link)
}
}

shardLink := cidlink.Link{Cid: cid.NewCidV1(uint64(multicodec.Car), mh)}
rcpt, err := client.StoreAdd(
rcpt2, err := client.UploadAdd(
c.issuer,
c.space,
&storeadd.Caveat{Link: shardLink, Size: size},
&uploadadd.Caveat{Root: cidlink.Link{Cid: root}, Shards: shdlnks},
client.WithConnection(util.MustGetConnection()),
client.WithProofs([]delegation.Delegation{c.proof}),
)
if err != nil {
return cid.Undef, err
return cid.Undef, []ipld.Link{}, err
}

if rcpt2.Out().Error() != nil {
return cid.Undef, []ipld.Link{}, fmt.Errorf("%s", rcpt2.Out().Error().Message)
}

return root, shdlnks, nil
}

func storeShard(
issuer principal.Signer, space did.DID, shard io.Reader, proofs []delegation.Delegation,
) (ipld.Link, error) {
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(shard)
if err != nil {
return nil, fmt.Errorf("reading CAR: %s", err)
}

mh, err := multihash.Sum(buf.Bytes(), multihash.SHA2_256, -1)
if err != nil {
return nil, fmt.Errorf("hashing CAR: %s", err)
}

link := cidlink.Link{Cid: cid.NewCidV1(0x0202, mh)}

rcpt, err := client.StoreAdd(
issuer,
space,
&storeadd.Caveat{
Link: link,
Size: uint64(buf.Len()),
},
client.WithConnection(util.MustGetConnection()),
client.WithProofs(proofs),
)
if err != nil {
return nil, fmt.Errorf("store/add %s: %s", link, err)
}

if rcpt.Out().Error() != nil {
return cid.Undef, fmt.Errorf(rcpt.Out().Error().Message)
return nil, fmt.Errorf("%+v", rcpt.Out().Error())
}

if rcpt.Out().Ok().Status == "upload" {
_, err := f.Seek(0, io.SeekStart)
if err != nil {
return cid.Undef, err
}

hr, err := http.NewRequest("PUT", *rcpt.Out().Ok().Url, f)
hr, err := http.NewRequest("PUT", *rcpt.Out().Ok().Url, bytes.NewReader(buf.Bytes()))
if err != nil {
return cid.Undef, err
return nil, fmt.Errorf("creating HTTP request: %s", err)
}

hdr := map[string][]string{}
Expand All @@ -298,39 +327,22 @@ func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, err error) {
}
hdr[k] = []string{v}
}

hr.Header = hdr
hr.ContentLength = int64(size)
httpClient := http.Client{
Timeout: 0,
}
hr.ContentLength = int64(buf.Len())
httpClient := http.Client{}
res, err := httpClient.Do(hr)
if err != nil {
return cid.Undef, err
return nil, fmt.Errorf("doing HTTP request: %s", err)
}

if res.StatusCode != 200 {
return cid.Undef, fmt.Errorf("status code: %d", res.StatusCode)
return nil, fmt.Errorf("non-200 status code while uploading file: %d", res.StatusCode)
}

if err := res.Body.Close(); err != nil {
return cid.Undef, fmt.Errorf("closing request body: %s", err)
err = res.Body.Close()
if err != nil {
return nil, fmt.Errorf("closing request body: %s", err)
}
}

rcpt2, err := client.UploadAdd(
c.issuer,
c.space,
&uploadadd.Caveat{Root: cidlink.Link{Cid: root}, Shards: []datamodel.Link{shardLink}},
client.WithConnection(util.MustGetConnection()),
client.WithProofs([]delegation.Delegation{c.proof}),
)
if err != nil {
return cid.Undef, err
}

if rcpt2.Out().Error() != nil {
return cid.Undef, fmt.Errorf(rcpt2.Out().Error().Message)
}

return shardLink.Cid, nil
return link, nil
}
Loading
Loading