Skip to content

Commit

Permalink
neofs: Deduplicate object attributes
Browse files Browse the repository at this point in the history
Closes #1054.

Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Jan 22, 2025
1 parent c163d4f commit f6b8e54
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 56 deletions.
18 changes: 8 additions & 10 deletions api/layer/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type (
bktInfo *data.BucketInfo
multipartInfo *data.MultipartInfo
tzHash hash.Hash
attributes [][2]string
attributes map[string]string
uploadPartParams *UploadPartParams
creationTime time.Time
payloadReader io.Reader
Expand Down Expand Up @@ -257,15 +257,15 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
bktInfo = p.Info.Bkt
payloadReader = p.Reader
decSize = p.Size
attributes [][2]string
attributes = make(map[string]string)
)

if p.Info.Encryption.Enabled() {
r, encSize, err := encryptionReader(p.Reader, uint64(p.Size), p.Info.Encryption.Key())
if err != nil {
return nil, fmt.Errorf("failed to create ecnrypted reader: %w", err)
}
attributes = append(attributes, [2]string{AttributeDecryptedSize, strconv.FormatInt(p.Size, 10)})
attributes[AttributeDecryptedSize] = strconv.FormatInt(p.Size, 10)
payloadReader = r
p.Size = int64(encSize)
}
Expand Down Expand Up @@ -452,7 +452,7 @@ func (n *layer) uploadZeroPart(ctx context.Context, multipartInfo *data.Multipar

var (
bktInfo = p.Bkt
attributes [][2]string
attributes = make(map[string]string)
multipartHash = sha256.New()
tzHash hash.Hash
id oid.ID
Expand All @@ -462,7 +462,7 @@ func (n *layer) uploadZeroPart(ctx context.Context, multipartInfo *data.Multipar
)

if p.Encryption.Enabled() {
attributes = append(attributes, [2]string{AttributeDecryptedSize, "0"})
attributes[AttributeDecryptedSize] = "0"
}

if n.neoFS.IsHomomorphicHashingEnabled() {
Expand Down Expand Up @@ -1348,11 +1348,9 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotPar
multipartHash = sha256.New()
)

params.attributes = append(params.attributes,
[2]string{headerS3MultipartUpload, params.multipartInfo.UploadID},
[2]string{headerS3MultipartNumber, strconv.FormatInt(int64(params.uploadPartParams.PartNumber), 10)},
[2]string{headerS3MultipartCreated, strconv.FormatInt(time.Now().UnixNano(), 10)},
)
params.attributes[headerS3MultipartUpload] = params.multipartInfo.UploadID
params.attributes[headerS3MultipartNumber] = strconv.FormatInt(int64(params.uploadPartParams.PartNumber), 10)
params.attributes[headerS3MultipartCreated] = strconv.FormatInt(time.Now().UnixNano(), 10)

prm := PrmObjectCreate{
Container: params.bktInfo.CID,
Expand Down
2 changes: 1 addition & 1 deletion api/layer/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type PrmObjectCreate struct {
Creator user.ID

// Key-value object attributes.
Attributes [][2]string
Attributes map[string]string

// Value for Timestamp attribute (optional).
CreationTime time.Time
Expand Down
4 changes: 2 additions & 2 deletions api/layer/neofs_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID
attrs = append(attrs, *a)
}

for i := range prm.Attributes {
a := object.NewAttribute(prm.Attributes[i][0], prm.Attributes[i][1])
for k, v := range prm.Attributes {
a := object.NewAttribute(k, v)
attrs = append(attrs, *a)
}

Expand Down
17 changes: 7 additions & 10 deletions api/layer/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
}
}

for _, v := range p.Header {
if v == "" {
return nil, ErrMetaEmptyParameterValue
}
}

prm := PrmObjectCreate{
Container: p.BktInfo.CID,
Creator: owner,
Expand All @@ -241,16 +247,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
Payload: r,
CreationTime: TimeNow(ctx),
CopiesNumber: p.CopiesNumber,
}

prm.Attributes = make([][2]string, 0, len(p.Header))

for k, v := range p.Header {
if v == "" {
return nil, ErrMetaEmptyParameterValue
}

prm.Attributes = append(prm.Attributes, [2]string{k, v})
Attributes: p.Header,
}

id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
Expand Down
10 changes: 4 additions & 6 deletions api/layer/system_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@ func (n *layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) err
return nil
}

func (n *layer) attributesFromLock(ctx context.Context, lock *data.ObjectLock) ([][2]string, error) {
func (n *layer) attributesFromLock(ctx context.Context, lock *data.ObjectLock) (map[string]string, error) {
var (
err error
expEpoch uint64
result [][2]string
result = make(map[string]string)
)

if lock.Retention != nil {
Expand All @@ -230,7 +230,7 @@ func (n *layer) attributesFromLock(ctx context.Context, lock *data.ObjectLock) (
}

if lock.Retention.IsCompliance {
result = append(result, [2]string{AttributeComplianceMode, "true"})
result[AttributeComplianceMode] = "true"
}
}

Expand All @@ -242,9 +242,7 @@ func (n *layer) attributesFromLock(ctx context.Context, lock *data.ObjectLock) (
}

if expEpoch != 0 {
result = append(result, [2]string{
object.AttributeExpirationEpoch, strconv.FormatUint(expEpoch, 10),
})
result[object.AttributeExpirationEpoch] = strconv.FormatUint(expEpoch, 10)
}

return result, nil
Expand Down
48 changes: 21 additions & 27 deletions internal/neofs/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"hash"
"io"
"maps"
"math"
"strconv"
"sync"
Expand Down Expand Up @@ -262,39 +263,33 @@ func (x *NeoFS) signMultipartObject(obj *object.Object, signer neofscrypto.Signe

// CreateObject implements neofs.NeoFS interface method.
func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oid.ID, error) {
attrNum := len(prm.Attributes) + 1 // + creation time

if prm.Filepath != "" {
attrNum++
}

attrs := make([]object.Attribute, 0, attrNum)

creationTime := prm.CreationTime
if creationTime.IsZero() {
creationTime = time.Now()
}
var a *object.Attribute
a = object.NewAttribute(object.AttributeTimestamp, strconv.FormatInt(creationTime.Unix(), 10))

attrs = append(attrs, *a)
nonce := make([]byte, objectNonceSize)
if _, err := rand.Read(nonce); err != nil {
return oid.ID{}, fmt.Errorf("object nonce: %w", err)
}

for i := range prm.Attributes {
a = object.NewAttribute(prm.Attributes[i][0], prm.Attributes[i][1])
attrs = append(attrs, *a)
uniqAttributes := maps.Clone(prm.Attributes)
if uniqAttributes == nil {
uniqAttributes = make(map[string]string)
}

uniqAttributes[object.AttributeTimestamp] = strconv.FormatInt(creationTime.Unix(), 10)
uniqAttributes[objectNonceAttribute] = base64.StdEncoding.EncodeToString(nonce)

if prm.Filepath != "" {
a = object.NewAttribute(object.AttributeFilePath, prm.Filepath)
attrs = append(attrs, *a)
uniqAttributes[object.AttributeFilePath] = prm.Filepath
}

nonce := make([]byte, objectNonceSize)
if _, err := rand.Read(nonce); err != nil {
return oid.ID{}, fmt.Errorf("object nonce: %w", err)
attrs := make([]object.Attribute, 0, len(uniqAttributes))
for k, v := range uniqAttributes {
attr := object.NewAttribute(k, v)
attrs = append(attrs, *attr)
}
objectNonceAttr := object.NewAttribute(objectNonceAttribute, base64.StdEncoding.EncodeToString(nonce))
attrs = append(attrs, *objectNonceAttr)

var obj object.Object
obj.SetContainerID(prm.Container)
Expand Down Expand Up @@ -688,12 +683,11 @@ func (x *AuthmateNeoFS) ReadObjectPayload(ctx context.Context, addr oid.Address)
// CreateObject implements authmate.NeoFS interface method.
func (x *AuthmateNeoFS) CreateObject(ctx context.Context, prm tokens.PrmObjectCreate) (oid.ID, error) {
return x.neoFS.CreateObject(ctx, layer.PrmObjectCreate{
Creator: prm.Creator,
Container: prm.Container,
Filepath: prm.Filepath,
Attributes: [][2]string{
{object.AttributeExpirationEpoch, strconv.FormatUint(prm.ExpirationEpoch, 10)}},
Payload: bytes.NewReader(prm.Payload),
Creator: prm.Creator,
Container: prm.Container,
Filepath: prm.Filepath,
Attributes: map[string]string{object.AttributeExpirationEpoch: strconv.FormatUint(prm.ExpirationEpoch, 10)},
Payload: bytes.NewReader(prm.Payload),
})
}

Expand Down

0 comments on commit f6b8e54

Please sign in to comment.