Skip to content

Commit

Permalink
Fix multipart (#1041)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Dec 10, 2024
2 parents 4e9ea88 + ed28289 commit cdd9ce6
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 54 deletions.
166 changes: 121 additions & 45 deletions api/layer/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/version"
"github.com/nspcc-dev/tzhash/tz"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -285,6 +286,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
if err != nil {
return nil, fmt.Errorf("getLastPart: %w", err)
}
reqInfo := api.GetReqInfo(ctx)

// The previous part is not uploaded yet.
if lastPart == nil {
Expand All @@ -304,6 +306,12 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
return nil, err
}

n.log.Debug("upload part as slot",
zap.String("reqId", reqInfo.RequestID),
zap.String("bucket", bktInfo.Name), zap.Stringer("cid", bktInfo.CID),
zap.String("multipart upload", p.Info.UploadID),
zap.Int("part number", p.PartNumber), zap.String("object", p.Info.Key), zap.Stringer("oid", objInfo.ID), zap.String("ETag", objInfo.HashSum), zap.Int64("decSize", decSize))

return objInfo, nil
}

Expand Down Expand Up @@ -372,13 +380,6 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
n.buffers.Put(chunk)
}

reqInfo := api.GetReqInfo(ctx)
n.log.Debug("upload part",
zap.String("reqId", reqInfo.RequestID),
zap.String("bucket", bktInfo.Name), zap.Stringer("cid", bktInfo.CID),
zap.String("multipart upload", p.Info.UploadID),
zap.Int("part number", p.PartNumber), zap.String("object", p.Info.Key), zap.Stringer("oid", id))

partInfo := &data.PartInfo{
Key: p.Info.Key,
UploadID: p.Info.UploadID,
Expand All @@ -390,6 +391,12 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
Elements: elements,
}

n.log.Debug("upload part",
zap.String("reqId", reqInfo.RequestID),
zap.String("bucket", bktInfo.Name), zap.Stringer("cid", bktInfo.CID),
zap.String("multipart upload", p.Info.UploadID),
zap.Int("part number", p.PartNumber), zap.String("object", p.Info.Key), zap.Stringer("oid", id), zap.String("ETag", partInfo.ETag), zap.Int64("decSize", decSize))

// encoding hash.Hash state to save it in tree service.
// the required interface is guaranteed according to the docs, so just cast without checks.
binaryMarshaler := multipartHash.(encoding.BinaryMarshaler)
Expand Down Expand Up @@ -589,9 +596,96 @@ func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadP
for _, part := range parts {
uploadParams.PartNumber = part.Number

if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil {
return fmt.Errorf("reupload number=%d: %w", part.Number, err)
if len(part.Elements) > 0 {
if err = n.reUploadSegmentedPart(ctx, uploadParams, part, bktInfo, multipartInfo); err != nil {
return fmt.Errorf("reupload number=%d: %w", part.Number, err)
}
} else {
if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil {
return fmt.Errorf("reupload number=%d: %w", part.Number, err)
}
}
}

return nil
}

func (n *layer) reUploadSegmentedPart(ctx context.Context, uploadParams UploadPartParams, part *data.PartInfo, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
var (
eg errgroup.Group
pipeReader, pipeWriter = io.Pipe()
)

eg.Go(func() error {
var (
err error
elementObj *object.Object
)

for _, element := range part.Elements {
elementObj, err = n.objectGet(ctx, bktInfo, element.OID)
if err != nil {
err = fmt.Errorf("get part oid=%s, element oid=%s: %w", part.OID.String(), element.OID.String(), err)
break
}

if _, err = pipeWriter.Write(elementObj.Payload()); err != nil {
err = fmt.Errorf("write part oid=%s, element oid=%s: %w", part.OID.String(), element.OID.String(), err)
break
}

// The part contains all elements for Split chain and contains itself as well.
// We mustn't remove it here, it will be removed on MultipartComplete.
if part.OID == element.OID {
continue
}

if deleteErr := n.objectDelete(ctx, bktInfo, element.OID); deleteErr != nil {
n.log.Error(
"couldn't delete object",
zap.Error(deleteErr),
zap.String("cnrID", bktInfo.CID.EncodeToString()),
zap.String("uploadID", multipartInfo.UploadID),
zap.Int("partNumber", part.Number),
zap.String("part.OID", part.OID.String()),
zap.String("part element OID", element.OID.String()),
)
// no return intentionally.
}
}

pipeCloseErr := pipeWriter.Close()

if err != nil {
return fmt.Errorf("pipe: %w", err)
}

if pipeCloseErr != nil {
return fmt.Errorf("close writer part oid=%s: %w", part.OID.String(), err)
}

return nil
})

eg.Go(func() error {
uploadParams.Size = part.Size
uploadParams.Reader = pipeReader

n.log.Debug("reUploadPart", zap.String("oid", part.OID.String()), zap.Int64("payload size", uploadParams.Size))
if _, err := n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil {
return fmt.Errorf("upload id=%s: %w", part.OID.String(), err)
}

return nil
})

if err := eg.Wait(); err != nil {
return fmt.Errorf("upload part oid=%s: %w", part.OID.String(), err)
}

// remove old object, we just re-uploaded a new one.
if err := n.objectDelete(ctx, bktInfo, part.OID); err != nil {
return fmt.Errorf("delete old id=%s: %w", part.OID.String(), err)
}

return nil
Expand All @@ -606,6 +700,7 @@ func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams,
uploadParams.Size = int64(obj.PayloadSize())
uploadParams.Reader = bytes.NewReader(obj.Payload())

n.log.Debug("reUploadPart", zap.String("oid", id.String()), zap.Uint64("payload size", obj.PayloadSize()))
if _, err = n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil {
return fmt.Errorf("upload id=%s: %w", id.String(), err)
}
Expand Down Expand Up @@ -692,6 +787,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar

// There are no parts which were uploaded in arbitrary order.
if partNumber == 0 {
n.log.Debug("no arbitrary order parts", zap.String("uploadID", p.Info.UploadID))

// In case all parts were uploaded sequentially, but some of them were re-uploaded.
partNumber, err = n.getMinDuplicatedPartNumber(ctx, p.Info, multipartInfo)
if err != nil {
Expand All @@ -701,6 +798,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar

// We need to fix Split.
if partNumber > 0 {
n.log.Debug("split fix required", zap.String("uploadID", p.Info.UploadID))

var uploadPartParams = UploadPartParams{Info: p.Info}

// We should take the part which broke the multipart upload sequence and re-upload all parts including this one.
Expand All @@ -718,6 +817,11 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
encInfo := FormEncryptionInfo(multipartInfo.Meta)

if len(partsInfo) < len(p.Parts) {
n.log.Debug(
"parts amount mismatch",
zap.Int("partsInfo", len(partsInfo)),
zap.Int("p.Parts", len(p.Parts)),
)
return nil, nil, s3errors.GetAPIError(s3errors.ErrInvalidPart)
}

Expand Down Expand Up @@ -1238,27 +1342,12 @@ func (n *layer) manualSlice(ctx context.Context, bktInfo *data.BucketInfo, prm P
// uploadPartAsSlot uploads multipart part, but without correct link to previous part because we don't have it.
// It uses zero part as pivot. Actual link will be set on CompleteMultipart.
func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotParams) (*data.ObjectInfo, error) {
zeroPart, err := n.treeService.GetPartByNumber(ctx, params.bktInfo, params.multipartInfo.ID, 0)
if err != nil {
return nil, fmt.Errorf("get part by number: %w", err)
}

var (
id oid.ID
chunk *[]byte
elements []data.LinkObjectPayload
isReturnToPool bool
splitFirstID = zeroPart.OID
splitPreviousID = zeroPart.OID
multipartHash = sha256.New()
currentPartHash = sha256.New()
id oid.ID
elements []data.LinkObjectPayload
multipartHash = sha256.New()
)

objHashes := []hash.Hash{multipartHash, currentPartHash}
if params.tzHash != nil {
objHashes = append(objHashes, params.tzHash)
}

params.attributes = append(params.attributes,
[2]string{headerS3MultipartUpload, params.multipartInfo.UploadID},
[2]string{headerS3MultipartNumber, strconv.FormatInt(int64(params.uploadPartParams.PartNumber), 10)},
Expand All @@ -1271,26 +1360,13 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotPar
Attributes: params.attributes,
CreationTime: params.creationTime,
CopiesNumber: params.multipartInfo.CopiesNumber,
Multipart: &Multipart{
MultipartHashes: objHashes,
},
}

if params.uploadPartParams.Size > n.neoFS.MaxObjectSize()/2 {
chunk = n.buffers.Get().(*[]byte)
isReturnToPool = true
} else {
smallChunk := make([]byte, params.uploadPartParams.Size)
chunk = &smallChunk
}

id, elements, err = n.manualSlice(ctx, params.bktInfo, prm, splitFirstID, splitPreviousID, *chunk, params.payloadReader)
if isReturnToPool {
n.buffers.Put(chunk)
Payload: params.payloadReader,
PayloadSize: uint64(params.decSize),
}

id, objHashBts, err := n.objectPutAndHash(ctx, prm, params.bktInfo)
if err != nil {
return nil, fmt.Errorf("manual slice: %w", err)
return nil, fmt.Errorf("object put and hash: %w", err)
}

partInfo := &data.PartInfo{
Expand All @@ -1299,7 +1375,7 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotPar
Number: params.uploadPartParams.PartNumber,
OID: id,
Size: params.decSize,
ETag: hex.EncodeToString(currentPartHash.Sum(nil)),
ETag: hex.EncodeToString(objHashBts),
Created: prm.CreationTime,
Elements: elements,
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/urfave/cli/v2 v2.27.4
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.26.0
golang.org/x/sync v0.8.0
google.golang.org/grpc v1.66.0
google.golang.org/protobuf v1.34.2
)
Expand All @@ -47,7 +48,6 @@ require (
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
go.etcd.io/bbolt v1.3.11 // indirect
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
golang.org/x/sync v0.8.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
)

Expand Down
18 changes: 10 additions & 8 deletions internal/neofs/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,17 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
case homoHashKV:
partInfo.HomoHash = []byte(value)
case elementsKV:
elements := strings.Split(value, ",")
partInfo.Elements = make([]data.LinkObjectPayload, len(elements))
for i, e := range elements {
var element data.LinkObjectPayload
if err = element.Unmarshal(e); err != nil {
return nil, fmt.Errorf("invalid element: %w", err)
if value != "" {
elements := strings.Split(value, ",")
partInfo.Elements = make([]data.LinkObjectPayload, len(elements))
for i, e := range elements {
var element data.LinkObjectPayload
if err = element.Unmarshal(e); err != nil {
return nil, fmt.Errorf("invalid element: %w", err)
}

partInfo.Elements[i] = element
}

partInfo.Elements[i] = element
}
}
}
Expand Down

0 comments on commit cdd9ce6

Please sign in to comment.