diff --git a/api/data/tree.go b/api/data/tree.go
index 7809c98e..7796a69b 100644
--- a/api/data/tree.go
+++ b/api/data/tree.go
@@ -75,6 +75,7 @@ type MultipartInfo struct {
 	Created      time.Time
 	Meta         map[string]string
 	CopiesNumber uint32
+	SplitID      string
 }
 
 // PartInfo is upload information about part.
@@ -89,6 +90,13 @@ type PartInfo struct {
 	Created time.Time
 	// Server creation time.
 	ServerCreated time.Time
+
+	// MultipartHash contains the internal state of the [hash.Hash] used to calculate the whole object payload hash.
+	MultipartHash []byte
+	// HomoHash contains the internal state of the [hash.Hash] used to calculate the whole object homomorphic payload hash.
+	HomoHash []byte
+	// Elements contains the [oid.ID] list of the objects the current part consists of.
+	Elements []oid.ID
 }
 
 // ToHeaderString form short part representation to use in S3-Completed-Parts header.
diff --git a/api/handler/acl.go b/api/handler/acl.go
index bc1cd39e..0c39e7d0 100644
--- a/api/handler/acl.go
+++ b/api/handler/acl.go
@@ -8,7 +8,6 @@ import (
 	"encoding/json"
 	"encoding/xml"
 	"errors"
-	stderrors "errors"
 	"fmt"
 	"net/http"
 	"sort"
@@ -1464,7 +1463,7 @@ func bucketACLToTable(acp *AccessControlPolicy) (*eacl.Table, error) {
 
 	for _, grant := range acp.AccessControlList {
 		if !isValidGrant(grant) {
-			return nil, stderrors.New("unsupported grantee")
+			return nil, errors.New("unsupported grantee")
 		}
 		if grant.Grantee.ID == acp.Owner.ID {
 			found = true
diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go
index 3cfa13d3..285e18a6 100644
--- a/api/handler/handlers_test.go
+++ b/api/handler/handlers_test.go
@@ -78,14 +78,14 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
 	require.NoError(t, err)
 	anonSigner := user.NewAutoIDSignerRFC6979(anonKey.PrivateKey)
 
+	signer := user.NewAutoIDSignerRFC6979(key.PrivateKey)
+	owner := signer.UserID()
+
 	l := zap.NewExample()
-	tp := layer.NewTestNeoFS()
+	tp := layer.NewTestNeoFS(signer)
 	testResolver := &contResolver{layer: tp}
 
-	signer := user.NewAutoIDSignerRFC6979(key.PrivateKey)
-	owner := signer.UserID()
-
 	layerCfg := &layer.Config{
 		Caches:  layer.DefaultCachesConfigs(zap.NewExample()),
 		GateKey: key,
diff --git a/api/layer/layer.go b/api/layer/layer.go
index 16149098..1cae32ea 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -9,6 +9,7 @@ import (
 	"net/url"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/nats-io/nats.go"
@@ -50,6 +51,7 @@ type (
 		ncontroller EventListener
 		cache       *Cache
 		treeService TreeService
		buffers     *sync.Pool
 	}
 
 	Config struct {
@@ -266,6 +268,12 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error
 
 // NewLayer creates an instance of a layer. It checks credentials
 // and establishes gRPC connection with the node.
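+// A pool of MaxObjectSize-sized byte buffers is prepared here so that the
+// multipart payload slicing in uploadPart can reuse allocations across parts.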
func NewLayer(log *zap.Logger, neoFS NeoFS, config *Config) Client {
+	buffers := sync.Pool{}
+	buffers.New = func() any {
+		b := make([]byte, neoFS.MaxObjectSize())
+		return &b
+	}
+
 	return &layer{
 		neoFS: neoFS,
 		log:   log,
@@ -273,6 +281,7 @@ func NewLayer(log *zap.Logger, neoFS NeoFS, config *Config) Client {
 		resolver:    config.Resolver,
 		cache:       NewCache(config.Caches),
 		treeService: config.TreeService,
+		buffers:     &buffers,
 	}
 }
 
diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go
index 7a51aa92..141caf0b 100644
--- a/api/layer/multipart_upload.go
+++ b/api/layer/multipart_upload.go
@@ -1,10 +1,14 @@
 package layer
 
 import (
+	"bytes"
 	"context"
+	"crypto/sha256"
+	"encoding"
 	"encoding/hex"
-	stderrors "errors"
+	"errors"
 	"fmt"
+	"hash"
 	"io"
 	"sort"
 	"strconv"
@@ -16,16 +20,16 @@ import (
 	"github.com/nspcc-dev/neofs-s3-gw/api/data"
 	"github.com/nspcc-dev/neofs-s3-gw/api/layer/encryption"
 	"github.com/nspcc-dev/neofs-s3-gw/api/s3errors"
+	"github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"github.com/nspcc-dev/neofs-sdk-go/user"
+	"github.com/nspcc-dev/tzhash/tz"
 	"go.uber.org/zap"
 	"golang.org/x/exp/slices"
 )
 
 const (
-	UploadIDAttributeName         = "S3-Upload-Id"
-	UploadPartNumberAttributeName = "S3-Upload-Part-Number"
-	UploadCompletedParts          = "S3-Completed-Parts"
+	UploadCompletedParts = "S3-Completed-Parts"
 
 	metaPrefix = "meta-"
 	aclPrefix  = "acl-"
@@ -148,6 +152,7 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
 		Created:      TimeNow(ctx),
 		Meta:         make(map[string]string, metaSize),
 		CopiesNumber: p.CopiesNumber,
+		SplitID:      object.NewSplitID().String(),
 	}
 
 	for key, val := range p.Header {
@@ -176,7 +181,7 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
 func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) {
 	multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
 	if err != nil {
-		if stderrors.Is(err, ErrNodeNotFound) {
+		if errors.Is(err, ErrNodeNotFound) {
 			return "", s3errors.GetAPIError(s3errors.ErrNoSuchUpload)
 		}
 		return "", err
@@ -191,6 +196,10 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er
 		return "", err
 	}
 
+	if err = n.reUploadFollowingParts(ctx, *p, p.PartNumber, p.Info.Bkt, multipartInfo); err != nil {
+		return "", fmt.Errorf("reuploading parts: %w", err)
+	}
+
 	return objInfo.HashSum, nil
 }
 
@@ -201,34 +210,121 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
 		return nil, s3errors.GetAPIError(s3errors.ErrInvalidEncryptionParameters)
 	}
 
-	bktInfo := p.Info.Bkt
-	prm := PrmObjectCreate{
-		Container:    bktInfo.CID,
-		Creator:      bktInfo.Owner,
-		Attributes:   make([][2]string, 2),
-		Payload:      p.Reader,
-		CreationTime: TimeNow(ctx),
-		CopiesNumber: multipartInfo.CopiesNumber,
-	}
+	var (
+		bktInfo       = p.Info.Bkt
+		payloadReader = p.Reader
+		decSize       = p.Size
+		attributes    [][2]string
+	)
 
-	decSize := p.Size
 	if p.Info.Encryption.Enabled() {
 		r, encSize, err := encryptionReader(p.Reader, uint64(p.Size), p.Info.Encryption.Key())
 		if err != nil {
 			return nil, fmt.Errorf("failed to create encrypted reader: %w", err)
 		}
-		prm.Attributes = append(prm.Attributes, [2]string{AttributeDecryptedSize, strconv.FormatInt(p.Size, 10)})
-		prm.Payload = r
+		attributes = append(attributes, [2]string{AttributeDecryptedSize, strconv.FormatInt(p.Size, 10)})
+		payloadReader = r
 		p.Size = int64(encSize)
 	}
 
-	prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
-	prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
+	var (
+		splitPreviousID      oid.ID
+		isSetSplitPreviousID bool
+		multipartHash        = sha256.New()
+		tzHash               hash.Hash
+	)
+
+	if n.neoFS.IsHomomorphicHashingEnabled() {
+		tzHash = tz.New()
+	}
 
-	id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
+	lastPart, err := n.treeService.GetLastPart(ctx, bktInfo, multipartInfo.ID)
 	if err != nil {
-		return nil, err
+		// if it is ErrPartListIsEmpty, this is the first part of the multipart upload.
+		if !errors.Is(err, ErrPartListIsEmpty) {
+			return nil, fmt.Errorf("getLastPart: %w", err)
+		}
+	} else {
+		// try to restore the hash state from the last part.
+		// the required interface is guaranteed according to the docs, so just cast without checks.
+		binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler)
+		if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil {
+			return nil, fmt.Errorf("unmarshal previous part hash: %w", err)
+		}
+
+		if tzHash != nil {
+			binaryUnmarshaler = tzHash.(encoding.BinaryUnmarshaler)
+			if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil {
+				return nil, fmt.Errorf("unmarshal previous part homo hash: %w", err)
+			}
+		}
+
+		isSetSplitPreviousID = true
+		splitPreviousID = lastPart.OID
+	}
+
+	var (
+		id           oid.ID
+		elements     []oid.ID
+		creationTime = TimeNow(ctx)
+		// A user may upload a part larger than maxObjectSize in NeoFS. From the user's
+		// point of view it is still a single object, so its hash is calculated separately.
+		currentPartHash = sha256.New()
+	)
+
+	objHashes := []hash.Hash{multipartHash, currentPartHash}
+	if tzHash != nil {
+		objHashes = append(objHashes, tzHash)
+	}
+
+	prm := PrmObjectCreate{
+		Container:    bktInfo.CID,
+		Creator:      bktInfo.Owner,
+		Attributes:   attributes,
+		CreationTime: creationTime,
+		CopiesNumber: multipartInfo.CopiesNumber,
+		Multipart: &Multipart{
+			SplitID:         multipartInfo.SplitID,
+			MultipartHashes: objHashes,
+		},
+	}
+
+	chunk := n.buffers.Get().(*[]byte)
+
+	// Slice the part into NeoFS objects manually, while still treating the part
+	// as a single object from the user's point of view.
+	for {
+		if isSetSplitPreviousID {
+			prm.Multipart.SplitPreviousID = &splitPreviousID
+		}
+
+		nBts, readErr := io.ReadAtLeast(payloadReader, *chunk, len(*chunk))
+		if nBts > 0 {
+			prm.Payload = bytes.NewReader((*chunk)[:nBts])
+			prm.PayloadSize = uint64(nBts)
+
+			id, _, err = n.objectPutAndHash(ctx, prm, bktInfo)
+			if err != nil {
+				return nil, err
+			}
+
+			isSetSplitPreviousID = true
+			splitPreviousID = id
+			elements = append(elements, id)
+		}
+
+		if readErr == nil {
+			continue
+		}
+
+		// If an EOF happens after reading fewer than min bytes, ReadAtLeast returns ErrUnexpectedEOF.
+		// In both cases we have the whole payload.
+		if !errors.Is(readErr, io.EOF) && !errors.Is(readErr, io.ErrUnexpectedEOF) {
+			return nil, fmt.Errorf("read payload chunk: %w", readErr)
+		}
+
+		break
 	}
+	n.buffers.Put(chunk)
 
 	reqInfo := api.GetReqInfo(ctx)
 	n.log.Debug("upload part",
@@ -243,12 +339,30 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
 		Number:  p.PartNumber,
 		OID:     id,
 		Size:    decSize,
-		ETag:    hex.EncodeToString(hash),
+		ETag:    hex.EncodeToString(currentPartHash.Sum(nil)),
 		Created: prm.CreationTime,
+		Elements: elements,
+	}
+
+	// encoding the hash.Hash state so it can be saved in the tree service.
+	// the required interface is guaranteed according to the docs, so just cast without checks.
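+	// For illustration (payload names hypothetical), the save/restore cycle relies
+	// on sha256 and tz states implementing encoding.BinaryMarshaler/BinaryUnmarshaler:
+	//
+	//	h := sha256.New()
+	//	h.Write(partOnePayload)
+	//	state, _ := h.(encoding.BinaryMarshaler).MarshalBinary() // persisted in the tree
+	//
+	//	h2 := sha256.New()
+	//	_ = h2.(encoding.BinaryUnmarshaler).UnmarshalBinary(state)
+	//	h2.Write(partTwoPayload) // continues exactly where h left off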
+	binaryMarshaler := multipartHash.(encoding.BinaryMarshaler)
+	partInfo.MultipartHash, err = binaryMarshaler.MarshalBinary()
+	if err != nil {
+		return nil, fmt.Errorf("marshalBinary: %w", err)
+	}
+
+	if tzHash != nil {
+		binaryMarshaler = tzHash.(encoding.BinaryMarshaler)
+		partInfo.HomoHash, err = binaryMarshaler.MarshalBinary()
+
+		if err != nil {
+			return nil, fmt.Errorf("marshalBinary: %w", err)
+		}
 	}
 
 	oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
-	oldPartIDNotFound := stderrors.Is(err, ErrNoNodeToRemove)
+	oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
 	if err != nil && !oldPartIDNotFound {
 		return nil, err
 	}
@@ -275,10 +389,53 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
 	return objInfo, nil
 }
 
+func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadPartParams, partID int, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
+	parts, err := n.treeService.GetPartsAfter(ctx, bktInfo, multipartInfo.ID, partID)
+	if err != nil {
+		// nothing to re-upload.
+		if errors.Is(err, ErrPartListIsEmpty) {
+			return nil
+		}
+
+		return fmt.Errorf("get parts after: %w", err)
+	}
+
+	for _, part := range parts {
+		uploadParams.PartNumber = part.Number
+
+		if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil {
+			return fmt.Errorf("reupload number=%d: %w", part.Number, err)
+		}
+	}
+
+	return nil
+}
+
+func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams, id oid.ID, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
+	obj, err := n.objectGet(ctx, bktInfo, id)
+	if err != nil {
+		return fmt.Errorf("get id=%s: %w", id.String(), err)
+	}
+
+	uploadParams.Size = int64(obj.PayloadSize())
+	uploadParams.Reader = bytes.NewReader(obj.Payload())
+
+	if _, err = n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil {
+		return fmt.Errorf("upload id=%s: %w", id.String(), err)
+	}
+
+	// remove old object, we just re-uploaded a new one.
+	if err = n.objectDelete(ctx, bktInfo, id); err != nil {
+		return fmt.Errorf("delete old id=%s: %w", id.String(), err)
+	}
+
+	return nil
+}
+
 func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) {
 	multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
 	if err != nil {
-		if stderrors.Is(err, ErrNodeNotFound) {
+		if errors.Is(err, ErrNodeNotFound) {
 			return nil, s3errors.GetAPIError(s3errors.ErrNoSuchUpload)
 		}
 		return nil, err
@@ -317,46 +474,16 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
 		Reader: pr,
 	}
 
-	return n.uploadPart(ctx, multipartInfo, params)
-}
-
-// implements io.Reader of payloads of the object list stored in the NeoFS network.
-type multiObjectReader struct {
-	ctx context.Context
-
-	layer *layer
-
-	prm getParams
-
-	curReader io.Reader
-
-	parts []*data.PartInfo
-}
-
-func (x *multiObjectReader) Read(p []byte) (n int, err error) {
-	if x.curReader != nil {
-		n, err = x.curReader.Read(p)
-		if !stderrors.Is(err, io.EOF) {
-			return n, err
-		}
-	}
-
-	if len(x.parts) == 0 {
-		return n, io.EOF
-	}
-
-	x.prm.oid = x.parts[0].OID
-
-	x.curReader, err = x.layer.initObjectPayloadReader(x.ctx, x.prm)
+	objInfo, err := n.uploadPart(ctx, multipartInfo, params)
 	if err != nil {
-		return n, fmt.Errorf("init payload reader for the next part: %w", err)
+		return nil, fmt.Errorf("upload part: %w", err)
 	}
 
-	x.parts = x.parts[1:]
-
-	next, err := x.Read(p[n:])
+	if err = n.reUploadFollowingParts(ctx, *params, p.PartNumber, p.Info.Bkt, multipartInfo); err != nil {
+		return nil, fmt.Errorf("reuploading parts: %w", err)
+	}
 
-	return n + next, err
+	return objInfo, nil
 }
 
 func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) {
@@ -378,8 +505,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
 	var multipartObjetSize int64
 	var encMultipartObjectSize uint64
-	parts := make([]*data.PartInfo, 0, len(p.Parts))
-
+	var lastPartID int
+	var children []oid.ID
 	var completedPartsHeader strings.Builder
 	for i, part := range p.Parts {
 		partInfo := partsInfo[part.PartNumber]
@@ -390,7 +517,6 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
 		if i != len(p.Parts)-1 && partInfo.Size < uploadMinSize {
 			return nil, nil, s3errors.GetAPIError(s3errors.ErrEntityTooSmall)
 		}
-		parts = append(parts, partInfo)
 		multipartObjetSize += partInfo.Size // even if encryption is enabled size is actual (decrypted)
 
 		if encInfo.Enabled {
@@ -408,6 +534,44 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
 		if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil {
 			return nil, nil, err
 		}
+
+		if part.PartNumber > lastPartID {
+			lastPartID = part.PartNumber
+		}
+
+		children = append(children, partInfo.Elements...)
+	}
+
+	multipartHash := sha256.New()
+	var homoHash hash.Hash
+	var splitPreviousID oid.ID
+
+	if lastPartID > 0 {
+		lastPart := partsInfo[lastPartID]
+
+		if lastPart != nil {
+			if len(lastPart.MultipartHash) > 0 {
+				splitPreviousID = lastPart.OID
+
+				binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler)
+				if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil {
+					return nil, nil, fmt.Errorf("unmarshal last part hash: %w", err)
+				}
+			}
+
+			if n.neoFS.IsHomomorphicHashingEnabled() && len(lastPart.HomoHash) > 0 {
+				homoHash = tz.New()
+
+				binaryUnmarshaler := homoHash.(encoding.BinaryUnmarshaler)
+				if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil {
+					return nil, nil, fmt.Errorf("unmarshal last part homo hash: %w", err)
+				}
+			}
+		}
+	}
 
 	initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
@@ -435,45 +599,108 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
 		multipartObjetSize = int64(encMultipartObjectSize)
 	}
 
-	r := &multiObjectReader{
-		ctx:   ctx,
-		layer: n,
-		parts: parts,
-	}
-
-	r.prm.bktInfo = p.Info.Bkt
-
-	extObjInfo, err := n.PutObject(ctx, &PutObjectParams{
+	// This is our "big object". It doesn't have any payload.
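+	//
+	// For illustration, the split hierarchy assembled below looks roughly like:
+	//
+	//	header ("big object"):  attributes + payload checksums, no payload
+	//	part elements:          E1 <- E2 <- ... <- En (chained via SplitPreviousID)
+	//	last split part:        empty payload, references the header as its parent
+	//	linking object:         empty payload, lists all children E1..En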
+	prmHeaderObject := &PutObjectParams{
 		BktInfo:      p.Info.Bkt,
 		Object:       p.Info.Key,
-		Reader:       r,
+		Reader:       bytes.NewBuffer(nil),
 		Header:       initMetadata,
 		Size:         multipartObjetSize,
 		Encryption:   p.Info.Encryption,
 		CopiesNumber: multipartInfo.CopiesNumber,
-	})
+	}
+
+	header, err := n.prepareMultipartHeadObject(ctx, prmHeaderObject, multipartHash, homoHash, uint64(multipartObjetSize))
 	if err != nil {
-		n.log.Error("could not put a completed object (multipart upload)",
-			zap.String("uploadID", p.Info.UploadID),
-			zap.String("uploadKey", p.Info.Key),
-			zap.Error(err))
+		return nil, nil, err
+	}
 
-		return nil, nil, s3errors.GetAPIError(s3errors.ErrInternalError)
+	// the last part of the split chain: it closes the chain and carries the header object as parent.
+	prm := PrmObjectCreate{
+		Container:    p.Info.Bkt.CID,
+		Creator:      p.Info.Bkt.Owner,
+		Filepath:     p.Info.Key,
+		CreationTime: TimeNow(ctx),
+		CopiesNumber: multipartInfo.CopiesNumber,
+		Multipart: &Multipart{
+			SplitID:         multipartInfo.SplitID,
+			SplitPreviousID: &splitPreviousID,
+			HeaderObject:    header,
+		},
+		Payload: bytes.NewBuffer(nil),
 	}
 
-	var addr oid.Address
-	addr.SetContainer(p.Info.Bkt.CID)
-	for _, partInfo := range partsInfo {
-		if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
-			n.log.Warn("could not delete upload part",
-				zap.Stringer("object id", &partInfo.OID),
-				zap.Stringer("bucket id", p.Info.Bkt.CID),
-				zap.Error(err))
-		}
-		addr.SetObject(partInfo.OID)
-		n.cache.DeleteObject(addr)
+	lastPartObjID, _, err := n.objectPutAndHash(ctx, prm, p.Info.Bkt)
+	if err != nil {
+		return nil, nil, err
 	}
+	children = append(children, lastPartObjID)
+
+	// the linking object that ties all children together.
+	prm = PrmObjectCreate{
+		Container:    p.Info.Bkt.CID,
+		Creator:      p.Info.Bkt.Owner,
+		CreationTime: TimeNow(ctx),
+		CopiesNumber: multipartInfo.CopiesNumber,
+		Multipart: &Multipart{
+			SplitID:      multipartInfo.SplitID,
+			HeaderObject: header,
+			Children:     children,
+		},
+		Payload: bytes.NewBuffer(nil),
+	}
+
+	_, _, err = n.objectPutAndHash(ctx, prm, p.Info.Bkt)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	bktSettings, err := n.GetBucketSettings(ctx, p.Info.Bkt)
+	if err != nil {
+		return nil, nil, fmt.Errorf("couldn't get versioning settings object: %w", err)
+	}
+
+	headerObjectID, _ := header.ID()
+
+	// the "big object" is not present in the system as such, but we have to store
+	// correct info about it and its version.
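+	// Readers later fetch the object by this header ID; the storage side
+	// reassembles the payload from the linking object's children (the mock's
+	// ReadObject shows the same resolution), so no payload is written here.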
+	newVersion := &data.NodeVersion{
+		BaseNodeVersion: data.BaseNodeVersion{
+			FilePath: p.Info.Key,
+			Size:     multipartObjetSize,
+			OID:      headerObjectID,
+			ETag:     hex.EncodeToString(multipartHash.Sum(nil)),
+		},
+		IsUnversioned: !bktSettings.VersioningEnabled(),
+	}
+
+	if newVersion.ID, err = n.treeService.AddVersion(ctx, p.Info.Bkt, newVersion); err != nil {
+		return nil, nil, fmt.Errorf("couldn't add multipart new version to tree service: %w", err)
+	}
+
+	n.cache.CleanListCacheEntriesContainingObject(p.Info.Key, p.Info.Bkt.CID)
+
+	objInfo := &data.ObjectInfo{
+		ID:          headerObjectID,
+		CID:         p.Info.Bkt.CID,
+		Owner:       p.Info.Bkt.Owner,
+		Bucket:      p.Info.Bkt.Name,
+		Name:        p.Info.Key,
+		Size:        multipartObjetSize,
+		Created:     prm.CreationTime,
+		Headers:     initMetadata,
+		ContentType: initMetadata[api.ContentType],
+		HashSum:     newVersion.ETag,
+	}
+
+	extObjInfo := &data.ExtendedObjectInfo{
+		ObjectInfo:  objInfo,
+		NodeVersion: newVersion,
+	}
+
+	n.cache.PutObjectWithName(p.Info.Bkt.Owner, extObjInfo)
+
 	return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID)
 }
 
@@ -606,7 +833,7 @@ func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
 func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) {
 	multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
 	if err != nil {
-		if stderrors.Is(err, ErrNodeNotFound) {
+		if errors.Is(err, ErrNodeNotFound) {
 			return nil, nil, s3errors.GetAPIError(s3errors.ErrNoSuchUpload)
 		}
 		return nil, nil, err
diff --git a/api/layer/neofs.go b/api/layer/neofs.go
index 402f6860..dd1fee0c 100644
--- a/api/layer/neofs.go
+++ b/api/layer/neofs.go
@@ -3,6 +3,7 @@ package layer
 import (
 	"context"
 	"errors"
+	"hash"
 	"io"
 	"time"
 
@@ -114,6 +115,23 @@ type PrmObjectCreate struct {
 
 	// Number of object copies that is enough to consider put successful.
 	CopiesNumber uint32
+
+	Multipart *Multipart
+}
+
+// Multipart contains info for local object slicing inside the s3-gate during a multipart upload operation.
+type Multipart struct {
+	// MultipartHashes contains hashes for the multipart object payload calculation (optional).
+	MultipartHashes []hash.Hash
+	// SplitID contains the splitID of the multipart object (optional).
+	SplitID string
+	// SplitPreviousID contains the [oid.ID] of the previous object in the chain (optional).
+	SplitPreviousID *oid.ID
+	// Children contains all objects in the multipart chain, for the linking object (optional).
+	Children []oid.ID
+	// HeaderObject is a virtual representation of the complete multipart object (optional). It is used
+	// to set the Parent in the linking object.
+	HeaderObject *object.Object
 }
 
 // PrmObjectDelete groups parameters of NeoFS.DeleteObject operation.
@@ -208,6 +226,9 @@ type NeoFS interface {
 	// prevented the container from being created.
 	CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
 
+	// FinalizeObjectWithPayloadChecksums fills and signs the header object for the complete multipart object.
+	FinalizeObjectWithPayloadChecksums(context.Context, object.Object, hash.Hash, hash.Hash, uint64) (*object.Object, error)
+
 	// DeleteObject marks the object to be removed from the NeoFS container by identifier.
 	// Successful return does not guarantee actual removal.
 	//
@@ -223,4 +244,10 @@ type NeoFS interface {
 	//
 	// It returns any error encountered which prevented computing epochs.
 	TimeToEpoch(ctx context.Context, now time.Time, future time.Time) (uint64, uint64, error)
+
+	// MaxObjectSize returns configured payload size limit for object slicing when enabled.
+	MaxObjectSize() int64
+
+	// IsHomomorphicHashingEnabled shows if homomorphic hashing is enabled in config.
+	IsHomomorphicHashingEnabled() bool
 }
diff --git a/api/layer/neofs_mock.go b/api/layer/neofs_mock.go
index 868e5717..6ac92f23 100644
--- a/api/layer/neofs_mock.go
+++ b/api/layer/neofs_mock.go
@@ -8,6 +8,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"hash"
 	"io"
 	"time"
 
@@ -16,11 +17,13 @@ import (
 	"github.com/nspcc-dev/neofs-sdk-go/checksum"
 	"github.com/nspcc-dev/neofs-sdk-go/container"
 	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
+	neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
 	"github.com/nspcc-dev/neofs-sdk-go/eacl"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"github.com/nspcc-dev/neofs-sdk-go/session"
 	"github.com/nspcc-dev/neofs-sdk-go/user"
+	"github.com/nspcc-dev/tzhash/tz"
 )
 
 type TestNeoFS struct {
@@ -30,13 +33,15 @@ type TestNeoFS struct {
 	containers   map[string]*container.Container
 	eaclTables   map[string]*eacl.Table
 	currentEpoch uint64
+	signer       neofscrypto.Signer
 }
 
-func NewTestNeoFS() *TestNeoFS {
+func NewTestNeoFS(signer neofscrypto.Signer) *TestNeoFS {
 	return &TestNeoFS{
 		objects:    make(map[string]*object.Object),
 		containers: make(map[string]*container.Container),
 		eaclTables: make(map[string]*eacl.Table),
+		signer:     signer,
 	}
 }
 
@@ -143,26 +148,97 @@ func (t *TestNeoFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*ObjectP
 
 	sAddr := addr.EncodeToString()
 
-	if obj, ok := t.objects[sAddr]; ok {
-		owner := getOwner(ctx)
-		if !obj.OwnerID().Equals(owner) {
-			return nil, ErrAccessDenied
+	obj, ok := t.objects[sAddr]
+	if !ok {
+		// trying to find linking object.
+		for _, o := range t.objects {
+			parentID, isSet := o.ParentID()
+			if !isSet {
+				continue
+			}
+
+			if !parentID.Equals(prm.Object) {
+				continue
+			}
+
+			if len(o.Children()) == 0 {
+				continue
+			}
+
+			// linking object is found.
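+			// From here the mock mimics a storage node: it takes the parent
+			// header from the linking object and concatenates the children's
+			// payloads in order (see constructMultipartObject below).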
+			objPart, err := t.constructMultipartObject(ctx, prm.Container, o)
+			if err != nil {
+				return nil, err
+			}
+
+			obj = objPart.Head
+
+			pl, err := io.ReadAll(objPart.Payload)
+			if err != nil {
+				return nil, err
+			}
+
+			obj.SetPayload(pl)
+			ok = true
+			break
 		}
+	}
+
+	if !ok {
+		return nil, fmt.Errorf("object not found %s", addr)
+	}
+
+	owner := getOwner(ctx)
+	if !obj.OwnerID().Equals(owner) {
+		return nil, ErrAccessDenied
+	}
+
+	payload := obj.Payload()
+
+	if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 {
+		off := prm.PayloadRange[0]
+		payload = payload[off : off+prm.PayloadRange[1]]
+	}
+
+	return &ObjectPart{
+		Head:    obj,
+		Payload: io.NopCloser(bytes.NewReader(payload)),
+	}, nil
+}
+
+func (t *TestNeoFS) constructMultipartObject(ctx context.Context, containerID cid.ID, linkingObject *object.Object) (*ObjectPart, error) {
+	if _, isSet := linkingObject.ParentID(); !isSet {
+		return nil, fmt.Errorf("linking object is invalid")
+	}
+
+	var (
+		addr           oid.Address
+		headObject     = linkingObject.Parent()
+		payloadReaders = make([]io.Reader, 0, len(linkingObject.Children()))
+		childList      = linkingObject.Children()
+	)
 
-	payload := obj.Payload()
+	addr.SetContainer(containerID)
 
-	if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 {
-		off := prm.PayloadRange[0]
-		payload = payload[off : off+prm.PayloadRange[1]]
+	for _, c := range childList {
+		addr.SetObject(c)
+
+		objPart, err := t.ReadObject(ctx, PrmObjectRead{
+			Container: containerID,
+			Object:    c,
+		})
+
+		if err != nil {
+			return nil, fmt.Errorf("child read: %w", err)
 		}
 
-		return &ObjectPart{
-			Head:    obj,
-			Payload: io.NopCloser(bytes.NewReader(payload)),
-		}, nil
+		payloadReaders = append(payloadReaders, objPart.Payload)
 	}
 
-	return nil, fmt.Errorf("object not found %s", addr)
+	return &ObjectPart{
+		Head:    headObject,
+		Payload: io.NopCloser(io.MultiReader(payloadReaders...)),
+	}, nil
 }
 
 func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
@@ -194,6 +270,32 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID
 	obj.SetOwnerID(&prm.Creator)
 	t.currentEpoch++
 
+	if prm.Multipart != nil && prm.Multipart.SplitID != "" {
+		var split object.SplitID
+		if err := split.Parse(prm.Multipart.SplitID); err != nil {
+			return oid.ID{}, fmt.Errorf("split parse: %w", err)
+		}
+		obj.SetSplitID(&split)
+
+		if prm.Multipart.SplitPreviousID != nil {
+			obj.SetPreviousID(*prm.Multipart.SplitPreviousID)
+		}
+
+		if len(prm.Multipart.Children) > 0 {
+			obj.SetChildren(prm.Multipart.Children...)
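+			// only the linking object carries the full children list; regular
+			// chain elements reference each other via SetPreviousID instead.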
+		}
+
+		if prm.Multipart.HeaderObject != nil {
+			id, isSet := prm.Multipart.HeaderObject.ID()
+			if !isSet {
+				return oid.ID{}, errors.New("HeaderObject id is not set")
+			}
+
+			obj.SetParentID(id)
+			obj.SetParent(prm.Multipart.HeaderObject)
+		}
+	}
+
 	if len(prm.Locks) > 0 {
 		var lock object.Lock
 		lock.WriteMembers(prm.Locks)
@@ -220,6 +322,32 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID
 	return objID, nil
 }
 
+func (t *TestNeoFS) FinalizeObjectWithPayloadChecksums(_ context.Context, header object.Object, metaChecksum hash.Hash, homomorphicChecksum hash.Hash, payloadLength uint64) (*object.Object, error) {
+	header.SetCreationEpoch(t.currentEpoch)
+
+	var cs checksum.Checksum
+
+	var csBytes [sha256.Size]byte
+	copy(csBytes[:], metaChecksum.Sum(nil))
+
+	cs.SetSHA256(csBytes)
+	header.SetPayloadChecksum(cs)
+
+	if homomorphicChecksum != nil {
+		var csHomoBytes [tz.Size]byte
+		copy(csHomoBytes[:], homomorphicChecksum.Sum(nil))
+
+		cs.SetTillichZemor(csHomoBytes)
+		header.SetPayloadHomomorphicHash(cs)
+	}
+
+	header.SetPayloadSize(payloadLength)
+	if err := header.SetIDWithSignature(t.signer); err != nil {
+		return nil, fmt.Errorf("setIDWithSignature: %w", err)
+	}
+
+	return &header, nil
+}
+
 func (t *TestNeoFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
 	var addr oid.Address
 	addr.SetContainer(prm.Container)
@@ -241,6 +369,15 @@ func (t *TestNeoFS) TimeToEpoch(_ context.Context, now, futureTime time.Time) (u
 	return t.currentEpoch, t.currentEpoch + uint64(futureTime.Sub(now).Seconds()), nil
 }
 
+func (t *TestNeoFS) MaxObjectSize() int64 {
+	// 64 MB
+	return 67108864
+}
+
+func (t *TestNeoFS) IsHomomorphicHashingEnabled() bool {
+	return false
+}
+
 func (t *TestNeoFS) AllObjects(cnrID cid.ID) []oid.ID {
 	result := make([]oid.ID, 0, len(t.objects))
 
diff --git a/api/layer/object.go b/api/layer/object.go
index 173d9188..7ec1ecf2 100644
--- a/api/layer/object.go
+++ b/api/layer/object.go
@@ -6,6 +6,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"hash"
 	"io"
 	"mime"
 	"path/filepath"
@@ -13,6 +14,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/minio/sio"
 	"github.com/nspcc-dev/neofs-s3-gw/api"
@@ -23,6 +25,7 @@ import (
 	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"github.com/nspcc-dev/neofs-sdk-go/version"
 	"github.com/panjf2000/ants/v2"
 	"go.uber.org/zap"
 )
@@ -309,6 +312,62 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
 	return extendedObjInfo, nil
 }
 
+func (n *layer) prepareMultipartHeadObject(ctx context.Context, p *PutObjectParams, payloadHash hash.Hash, homoHash hash.Hash, payloadLength uint64) (*object.Object, error) {
+	var (
+		err   error
+		owner = n.Owner(ctx)
+	)
+
+	if p.Encryption.Enabled() {
+		p.Header[AttributeDecryptedSize] = strconv.FormatInt(p.Size, 10)
+		if err = addEncryptionHeaders(p.Header, p.Encryption); err != nil {
+			return nil, fmt.Errorf("add encryption header: %w", err)
+		}
+
+		var encSize uint64
+		if _, encSize, err = encryptionReader(p.Reader, uint64(p.Size), p.Encryption.Key()); err != nil {
+			return nil, fmt.Errorf("create encrypter: %w", err)
+		}
+		p.Size = int64(encSize)
+	}
+
+	var headerObject object.Object
+	headerObject.SetContainerID(p.BktInfo.CID)
+	headerObject.SetType(object.TypeRegular)
+	headerObject.SetOwnerID(&owner)
+
+	currentVersion := version.Current()
+	headerObject.SetVersion(&currentVersion)
+
+	attributes := make([]object.Attribute, 0, len(p.Header))
+	for k, v := range p.Header {
+		if v == "" {
+			return nil, ErrMetaEmptyParameterValue
+		}
+
+		attributes = append(attributes, *object.NewAttribute(k, v))
+	}
+
+	creationTime := TimeNow(ctx)
+	if creationTime.IsZero() {
+		creationTime = time.Now()
+	}
+	attributes = append(attributes, *object.NewAttribute(object.AttributeTimestamp, strconv.FormatInt(creationTime.Unix(), 10)))
+
+	if p.Object != "" {
+		attributes = append(attributes, *object.NewAttribute(object.AttributeFilePath, p.Object))
+	}
+
+	headerObject.SetAttributes(attributes...)
+
+	multipartHeader, err := n.neoFS.FinalizeObjectWithPayloadChecksums(ctx, headerObject, payloadHash, homoHash, payloadLength)
+	if err != nil {
+		return nil, fmt.Errorf("FinalizeObjectWithPayloadChecksums: %w", err)
+	}
+
+	return multipartHeader, nil
+}
+
 func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ExtendedObjectInfo, error) {
 	owner := n.Owner(ctx)
 	if extObjInfo := n.cache.GetLastObject(owner, bkt.Name, objectName); extObjInfo != nil {
@@ -416,6 +475,11 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
 	hash := sha256.New()
 	prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
 		hash.Write(buf)
+		if prm.Multipart != nil {
+			for _, h := range prm.Multipart.MultipartHashes {
+				h.Write(buf)
+			}
+		}
 	})
 	id, err := n.neoFS.CreateObject(ctx, prm)
 	if err != nil {
diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go
index e871c415..59ab5fdd 100644
--- a/api/layer/tree_mock.go
+++ b/api/layer/tree_mock.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/nspcc-dev/neofs-s3-gw/api/data"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"golang.org/x/exp/slices"
 )
 
 type TreeServiceMock struct {
@@ -362,6 +363,90 @@ LOOP:
 	return result, nil
 }
 
+func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) {
+	parts, err := t.GetParts(ctx, bktInfo, multipartNodeID)
+	if err != nil {
+		return nil, fmt.Errorf("get parts: %w", err)
+	}
+
+	if len(parts) == 0 {
+		return nil, ErrPartListIsEmpty
+	}
+
+	// Sort parts by part number first; ServerCreated only breaks ties between
+	// re-uploaded versions of the same part, so the latest version ends up last.
+	slices.SortFunc(parts, func(a, b *data.PartInfo) int {
+		if a.Number != b.Number {
+			if a.Number < b.Number {
+				return -1
+			}
+			return 1
+		}
+
+		if a.ServerCreated.Before(b.ServerCreated) {
+			return -1
+		}
+
+		if a.ServerCreated.Equal(b.ServerCreated) {
+			return 0
+		}
+
+		return 1
+	})
+
+	return parts[len(parts)-1], nil
+}
+
+func (t *TreeServiceMock) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) {
+	parts, err := t.GetParts(ctx, bktInfo, multipartNodeID)
+	if err != nil {
+		return nil, err
+	}
+
+	mp := make(map[int]*data.PartInfo)
+	for _, partInfo := range parts {
+		if partInfo.Number <= partID {
+			continue
+		}
+
+		mapped, ok := mp[partInfo.Number]
+		if !ok {
+			mp[partInfo.Number] = partInfo
+			continue
+		}
+
+		if mapped.ServerCreated.After(partInfo.ServerCreated) {
+			continue
+		}
+
+		mp[partInfo.Number] = partInfo
+	}
+
+	if len(mp) == 0 {
+		return nil, ErrPartListIsEmpty
+	}
+
+	result := make([]*data.PartInfo, 0, len(mp))
+	for _, p := range mp {
+		result = append(result, p)
+	}
+
+	// Sort the result by part number; numbers are unique after the de-duplication above.
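+	// The comparator deliberately defines a consistent total order for
+	// slices.SortFunc: the part number is compared first, and creation time is
+	// consulted only for equal numbers.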
+	slices.SortFunc(result, func(a, b *data.PartInfo) int {
+		if a.Number != b.Number {
+			if a.Number < b.Number {
+				return -1
+			}
+			return 1
+		}
+
+		if a.ServerCreated.Before(b.ServerCreated) {
+			return -1
+		}
+
+		if a.ServerCreated.Equal(b.ServerCreated) {
+			return 0
+		}
+
+		return 1
+	})
+
+	return result, nil
+}
+
 func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
 	cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
 
diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go
index 4a30910d..4494cf3b 100644
--- a/api/layer/tree_service.go
+++ b/api/layer/tree_service.go
@@ -74,6 +74,12 @@ type TreeService interface {
 	// If object id to remove is not found returns ErrNoNodeToRemove error.
 	AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error)
 	GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error)
+	// GetLastPart returns the latest uploaded part.
+	//
+	// Return errors:
+	//  - [ErrPartListIsEmpty] if there are no parts for the upload ID.
+	GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error)
+	GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error)
 
 	// Compound methods for optimizations
 
@@ -90,4 +96,7 @@ var (
 
 	// ErrNoNodeToRemove is returned from Tree service in case of the lack of node with OID to remove.
 	ErrNoNodeToRemove = errors.New("no node to remove")
+
+	// ErrPartListIsEmpty is returned if no parts are available for the upload.
+	ErrPartListIsEmpty = errors.New("part list is empty")
 )
diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go
index fc79ccb3..147a94e2 100644
--- a/api/layer/versioning_test.go
+++ b/api/layer/versioning_test.go
@@ -152,7 +152,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
 			GateKey: key.PublicKey(),
 		},
 	})
-	tp := NewTestNeoFS()
+	tp := NewTestNeoFS(signer)
 
 	bktName := "testbucket1"
 	bktID, err := tp.CreateContainer(ctx, PrmContainerCreate{
diff --git a/go.mod b/go.mod
index 024391ed..8910d79c 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
 	github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240125143754-70b1ffbd8141
 	github.com/nspcc-dev/neofs-contract v0.19.1
 	github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130135633-cb11d035a4ed
+	github.com/nspcc-dev/tzhash v1.8.0
 	github.com/panjf2000/ants/v2 v2.5.0
 	github.com/prometheus/client_golang v1.13.0
 	github.com/spf13/pflag v1.0.5
@@ -65,7 +66,6 @@ require (
 	github.com/nats-io/nuid v1.0.1 // indirect
 	github.com/nspcc-dev/neofs-crypto v0.4.0 // indirect
 	github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
-	github.com/nspcc-dev/tzhash v1.7.1 // indirect
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
 	github.com/prometheus/client_model v0.2.0 // indirect
 	github.com/prometheus/common v0.37.0 // indirect
diff --git a/go.sum b/go.sum
index bd24db96..421f6ad6 100644
--- a/go.sum
+++ b/go.sum
@@ -248,8 +248,8 @@ github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130135633-cb11d035a4ed h1:
 github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130135633-cb11d035a4ed/go.mod h1:dQzqPhx+7TUeEXCpOThNHxJqNgSoJzCfTcfstTlEQEA=
 github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE=
 github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
-github.com/nspcc-dev/tzhash v1.7.1 h1:6zmexLqdTF/ssbUAh7XJS7RxgKWaw28kdNpE/4UFdEU=
-github.com/nspcc-dev/tzhash v1.7.1/go.mod h1:cIZAGSF8wA9Q8I9Yj9Ytc/IFpsdA54ZAQ5dLXijq178=
+github.com/nspcc-dev/tzhash v1.8.0 h1:pJvzME2mZzP/h5rcy/Wb6amT9FJBFeKbJ3HEnWEeUpY=
+github.com/nspcc-dev/tzhash v1.8.0/go.mod h1:oHiH0qwmTsZkeVs7pvCS5cVXUaLhXxSFvnmnZ++ijm4=
 github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
 github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go
index 44a2c20f..40993933 100644
--- a/internal/neofs/neofs.go
+++ b/internal/neofs/neofs.go
@@ -3,9 +3,11 @@ package neofs
 import (
 	"bytes"
 	"context"
+	"crypto/sha256"
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"hash"
 	"io"
 	"math"
 	"strconv"
@@ -16,6 +18,7 @@ import (
 	"github.com/nspcc-dev/neofs-s3-gw/api/layer"
 	"github.com/nspcc-dev/neofs-s3-gw/authmate"
 	"github.com/nspcc-dev/neofs-s3-gw/creds/tokens"
+	"github.com/nspcc-dev/neofs-sdk-go/checksum"
 	"github.com/nspcc-dev/neofs-sdk-go/client"
 	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
 	"github.com/nspcc-dev/neofs-sdk-go/container"
@@ -30,6 +33,7 @@ import (
 	"github.com/nspcc-dev/neofs-sdk-go/stat"
 	"github.com/nspcc-dev/neofs-sdk-go/user"
 	"github.com/nspcc-dev/neofs-sdk-go/waiter"
+	"github.com/nspcc-dev/tzhash/tz"
 )
 
 // Config allows to configure some [NeoFS] parameters.
@@ -270,6 +274,32 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
 	obj.SetAttributes(attrs...)
 	obj.SetPayloadSize(prm.PayloadSize)
 
+	if prm.Multipart != nil && prm.Multipart.SplitID != "" {
+		var split object.SplitID
+		if err := split.Parse(prm.Multipart.SplitID); err != nil {
+			return oid.ID{}, fmt.Errorf("parse split ID: %w", err)
+		}
+		obj.SetSplitID(&split)
+
+		if prm.Multipart.SplitPreviousID != nil {
+			obj.SetPreviousID(*prm.Multipart.SplitPreviousID)
+		}
+
+		if len(prm.Multipart.Children) > 0 {
+			obj.SetChildren(prm.Multipart.Children...)
+		}
+
+		if prm.Multipart.HeaderObject != nil {
+			id, isSet := prm.Multipart.HeaderObject.ID()
+			if !isSet {
+				return oid.ID{}, errors.New("HeaderObject id is not set")
+			}
+
+			obj.SetParentID(id)
+			obj.SetParent(prm.Multipart.HeaderObject)
+		}
+	}
+
 	if len(prm.Locks) > 0 {
 		var lock object.Lock
 		lock.WriteMembers(prm.Locks)
@@ -345,6 +375,34 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
 	return writer.GetResult().StoredObjectID(), nil
 }
 
+// FinalizeObjectWithPayloadChecksums implements neofs.NeoFS interface method.
+func (x *NeoFS) FinalizeObjectWithPayloadChecksums(ctx context.Context, header object.Object, metaChecksum hash.Hash, homomorphicChecksum hash.Hash, payloadLength uint64) (*object.Object, error) {
+	header.SetCreationEpoch(x.epochGetter.CurrentEpoch())
+
+	var cs checksum.Checksum
+
+	var csBytes [sha256.Size]byte
+	copy(csBytes[:], metaChecksum.Sum(nil))
+
+	cs.SetSHA256(csBytes)
+	header.SetPayloadChecksum(cs)
+
+	if homomorphicChecksum != nil {
+		var csHomoBytes [tz.Size]byte
+		copy(csHomoBytes[:], homomorphicChecksum.Sum(nil))
+
+		cs.SetTillichZemor(csHomoBytes)
+		header.SetPayloadHomomorphicHash(cs)
+	}
+
+	header.SetPayloadSize(payloadLength)
+	if err := header.SetIDWithSignature(x.signer(ctx)); err != nil {
+		return nil, fmt.Errorf("setIDWithSignature: %w", err)
+	}
+
+	return &header, nil
+}
+
 // wraps io.ReadCloser and transforms Read errors related to access violation
 // to neofs.ErrAccessDenied.
 type payloadReader struct {
@@ -468,6 +526,16 @@ func (x *NeoFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) err
 	return nil
 }
 
+// MaxObjectSize returns configured payload size limit for object slicing when enabled.
+func (x *NeoFS) MaxObjectSize() int64 {
+	return x.cfg.MaxObjectSize
+}
+
+// IsHomomorphicHashingEnabled shows if homomorphic hashing is enabled in config.
+func (x *NeoFS) IsHomomorphicHashingEnabled() bool {
+	return x.cfg.IsHomomorphicEnabled
+}
+
 func isErrAccessDenied(err error) (string, bool) {
 	unwrappedErr := errors.Unwrap(err)
 	for unwrappedErr != nil {
diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go
index 5534daff..76fa7aba 100644
--- a/internal/neofs/tree.go
+++ b/internal/neofs/tree.go
@@ -17,6 +17,7 @@ import (
 	"github.com/nspcc-dev/neofs-s3-gw/internal/neofs/services/tree"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"github.com/nspcc-dev/neofs-sdk-go/user"
+	"golang.org/x/exp/slices"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 )
@@ -55,9 +56,13 @@ const (
 	isUnversionedKV = "IsUnversioned"
 	isTagKV         = "IsTag"
 	uploadIDKV      = "UploadId"
+	splitIDKV       = "SplitId"
 	partNumberKV    = "Number"
 	sizeKV          = "Size"
 	etagKV          = "ETag"
+	multipartHashKV = "MultipartHashes"
+	homoHashKV      = "HomoHash"
+	elementsKV      = "Elements"
 
 	// keys for lock.
 	isLockKV = "IsLock"
@@ -221,6 +226,8 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
 			}
 		case ownerKV:
 			_ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
+		case splitIDKV:
+			multipartInfo.SplitID = string(kv.GetValue())
 		default:
 			multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
 		}
@@ -266,6 +273,21 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
 				return nil, fmt.Errorf("invalid server created timestamp: %w", err)
 			}
 			partInfo.ServerCreated = time.UnixMilli(utcMilli)
+		case multipartHashKV:
+			partInfo.MultipartHash = []byte(value)
+		case homoHashKV:
+			partInfo.HomoHash = []byte(value)
+		case elementsKV:
+			elements := strings.Split(value, ",")
+			partInfo.Elements = make([]oid.ID, len(elements))
+			for i, e := range elements {
+				var id oid.ID
+				if err = id.DecodeString(e); err != nil {
+					return nil, fmt.Errorf("invalid oid: %w", err)
+				}
+
+				partInfo.Elements[i] = id
+			}
 		}
 	}
 
@@ -909,6 +931,11 @@ func (c *TreeClient) AddPart(ctx context.Context, bktInfo *data.BucketInfo, mult
 		return oid.ID{}, err
 	}
 
+	elements := make([]string, len(info.Elements))
+	for i, e := range info.Elements {
+		elements[i] = e.String()
+	}
+
 	meta := map[string]string{
 		partNumberKV:    strconv.Itoa(info.Number),
 		oidKV:           info.OID.EncodeToString(),
@@ -916,6 +943,9 @@ func (c *TreeClient) AddPart(ctx context.Context, bktInfo *data.BucketInfo, mult
 		createdKV:       strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
 		serverCreatedKV: strconv.FormatInt(time.Now().UTC().UnixMilli(), 10),
 		etagKV:          info.ETag,
+		multipartHashKV: string(info.MultipartHash),
+		homoHashKV:      string(info.HomoHash),
+		elementsKV:      strings.Join(elements, ","),
 	}
 
 	var foundPartID uint64
@@ -965,6 +995,105 @@ func (c *TreeClient) GetParts(ctx context.Context, bktInfo *data.BucketInfo, mul
 	return result, nil
 }
 
+func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) {
+	parts, err := c.GetParts(ctx, bktInfo, multipartNodeID)
+	if err != nil {
+		return nil, fmt.Errorf("get parts: %w", err)
+	}
+
+	if len(parts) == 0 {
+		return nil, layer.ErrPartListIsEmpty
+	}
+
+	// Sort parts by part number first; ServerCreated only breaks ties between
+	// re-uploaded versions of the same part, so the latest version ends up last.
+	slices.SortFunc(parts, func(a, b *data.PartInfo) int {
+		if a.Number != b.Number {
+			if a.Number < b.Number {
+				return -1
+			}
+			return 1
+		}
+
+		if a.ServerCreated.Before(b.ServerCreated) {
+			return -1
+		}
+
+		if a.ServerCreated.Equal(b.ServerCreated) {
+			return 0
+		}
+
+		return 1
+	})
+
+	return parts[len(parts)-1], nil
+}
+
+// GetPartsAfter returns the parts uploaded after partID, sorted and de-duplicated by creation time.
+// If a part was re-uploaded (has several versions), the list contains only its latest version.
+func (c *TreeClient) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) {
+	parts, err := c.getSubTree(ctx, bktInfo, systemTree, multipartNodeID, 2)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(parts) == 0 {
+		return nil, layer.ErrPartListIsEmpty
+	}
+
+	mp := make(map[int]*data.PartInfo)
+	for _, part := range parts {
+		if part.GetNodeId() == multipartNodeID {
+			continue
+		}
+
+		partInfo, err := newPartInfo(part)
+		if err != nil {
+			continue
+		}
+
+		if partInfo.Number <= partID {
+			continue
+		}
+
+		mapped, ok := mp[partInfo.Number]
+		if !ok {
+			mp[partInfo.Number] = partInfo
+			continue
+		}
+
+		if mapped.ServerCreated.After(partInfo.ServerCreated) {
+			continue
+		}
+
+		mp[partInfo.Number] = partInfo
+	}
+
+	if len(mp) == 0 {
+		return nil, layer.ErrPartListIsEmpty
+	}
+
+	result := make([]*data.PartInfo, 0, len(mp))
+	for _, p := range mp {
+		result = append(result, p)
+	}
+
+	// Sort the result by part number; numbers are unique after the de-duplication above.
+	slices.SortFunc(result, func(a, b *data.PartInfo) int {
+		if a.Number != b.Number {
+			if a.Number < b.Number {
+				return -1
+			}
+			return 1
+		}
+
+		if a.ServerCreated.Before(b.ServerCreated) {
+			return -1
+		}
+
+		if a.ServerCreated.Equal(b.ServerCreated) {
+			return 0
+		}
+
+		return 1
+	})
+
+	return result, nil
+}
+
 func (c *TreeClient) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
 	return c.removeNode(ctx, bktInfo, systemTree, multipartNodeID)
 }
@@ -1191,6 +1320,7 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
 	info.Meta[uploadIDKV] = info.UploadID
 	info.Meta[ownerKV] = info.Owner.EncodeToString()
 	info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
+	info.Meta[splitIDKV] = info.SplitID
 
 	return info.Meta
 }
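+
+// For reference (values illustrative), a part's elements round-trip through the
+// tree service as a single comma-joined KV entry:
+//
+//	elementsKV: "<oid1>,<oid2>,...,<oidN>"
+//
+// and are decoded back in newPartInfo:
+//
+//	for _, e := range strings.Split(value, ",") {
+//		var id oid.ID
+//		_ = id.DecodeString(e) // restores the oid.ID list, in upload order
+//	}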