Skip to content

Commit

Permalink
Dont use tree for tags (#1066)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Jan 31, 2025
2 parents 7141b84 + d833f8a commit 83c6195
Show file tree
Hide file tree
Showing 15 changed files with 279 additions and 633 deletions.
32 changes: 29 additions & 3 deletions api/handler/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,27 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

settingsSrc, err := h.obj.GetBucketSettings(r.Context(), srcObjPrm.BktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
return
}

if settingsSrc.VersioningEnabled() && srcObjPrm.VersionID == "" {
headObjectPrm := &layer.HeadObjectParams{
BktInfo: srcObjPrm.BktInfo,
Object: reqInfo.ObjectName,
}

ei, err := h.obj.GetExtendedObjectInfo(r.Context(), headObjectPrm)
if err != nil {
h.logAndSendError(w, "could not find object", reqInfo, err)
return
}

srcObjPrm.VersionID = ei.ObjectInfo.VersionID()
}

dstBktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "couldn't get target bucket", reqInfo, err)
Expand Down Expand Up @@ -157,6 +178,10 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
NodeVersion: extendedSrcObjInfo.NodeVersion,
}

if !settingsSrc.VersioningEnabled() {
tagPrm.ObjectVersion.VersionID = ""
}

_, tagSet, err = h.obj.GetObjectTagging(r.Context(), tagPrm)
if err != nil {
h.logAndSendError(w, "could not get object tagging", reqInfo, err)
Expand Down Expand Up @@ -252,10 +277,11 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
ObjectName: reqInfo.ObjectName,
VersionID: dstObjInfo.VersionID(),
},
TagSet: tagSet,
NodeVersion: extendedDstObjInfo.NodeVersion,
TagSet: tagSet,
NodeVersion: extendedDstObjInfo.NodeVersion,
CopiesNumber: h.cfg.CopiesNumber,
}
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}
Expand Down
16 changes: 10 additions & 6 deletions api/handler/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
VersionID: info.VersionID(),
}

bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
return
}

if !bktSettings.VersioningEnabled() {
t.VersionID = ""
}

tagSet, lockInfo, err := h.obj.GetObjectTaggingAndLock(r.Context(), t, extendedInfo.NodeVersion)
if err != nil && !s3errors.IsS3Error(err, s3errors.ErrNoSuchKey) {
h.logAndSendError(w, "could not get object meta data", reqInfo, err)
Expand Down Expand Up @@ -103,12 +113,6 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
return
}

writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
w.WriteHeader(http.StatusOK)
}
Expand Down
7 changes: 4 additions & 3 deletions api/handler/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,11 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
ObjectName: objInfo.Name,
VersionID: objInfo.VersionID(),
},
TagSet: uploadData.TagSet,
NodeVersion: extendedObjInfo.NodeVersion,
TagSet: uploadData.TagSet,
NodeVersion: extendedObjInfo.NodeVersion,
CopiesNumber: h.cfg.CopiesNumber,
}
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
h.logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...)
return
}
Expand Down
17 changes: 12 additions & 5 deletions api/handler/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,16 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
ObjectName: objInfo.Name,
VersionID: objInfo.VersionID(),
},
TagSet: tagSet,
NodeVersion: extendedObjInfo.NodeVersion,
TagSet: tagSet,
NodeVersion: extendedObjInfo.NodeVersion,
CopiesNumber: h.cfg.CopiesNumber,
}
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {

if !settings.VersioningEnabled() {
tagPrm.ObjectVersion.VersionID = ""
}

if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}
Expand Down Expand Up @@ -540,10 +546,11 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
ObjectName: objInfo.Name,
VersionID: objInfo.VersionID(),
},
NodeVersion: extendedObjInfo.NodeVersion,
NodeVersion: extendedObjInfo.NodeVersion,
CopiesNumber: h.cfg.CopiesNumber,
}

if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}
Expand Down
72 changes: 61 additions & 11 deletions api/handler/tagging.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,39 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request
ObjectName: reqInfo.ObjectName,
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
},
TagSet: tagSet,
TagSet: tagSet,
CopiesNumber: h.cfg.CopiesNumber,
}
nodeVersion, err := h.obj.PutObjectTagging(r.Context(), tagPrm)
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
return
}

if settings.VersioningEnabled() && tagPrm.ObjectVersion.VersionID == "" {
headObjectPrm := &layer.HeadObjectParams{
BktInfo: bktInfo,
Object: reqInfo.ObjectName,
}

ei, err := h.obj.GetExtendedObjectInfo(r.Context(), headObjectPrm)
if err != nil {
h.logAndSendError(w, "could not find object", reqInfo, err)
return
}

tagPrm.ObjectVersion.VersionID = ei.ObjectInfo.VersionID()
}

if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
h.logAndSendError(w, "could not put object tagging", reqInfo, err)
return
}

s := &SendNotificationParams{
Event: EventObjectTaggingPut,
NotificationInfo: &data.NotificationInfo{
Name: nodeVersion.FilePath,
Size: nodeVersion.Size,
Version: nodeVersion.OID.EncodeToString(),
HashSum: nodeVersion.ETag,
Name: reqInfo.ObjectName,
},
BktInfo: bktInfo,
ReqInfo: reqInfo,
Expand Down Expand Up @@ -94,6 +112,21 @@ func (h *handler) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request
},
}

if settings.VersioningEnabled() && tagPrm.ObjectVersion.VersionID == "" {
headObjectPrm := &layer.HeadObjectParams{
BktInfo: bktInfo,
Object: reqInfo.ObjectName,
}

ei, err := h.obj.GetExtendedObjectInfo(r.Context(), headObjectPrm)
if err != nil {
h.logAndSendError(w, "could not find object", reqInfo, err)
return
}

tagPrm.ObjectVersion.VersionID = ei.ObjectInfo.VersionID()
}

versionID, tagSet, err := h.obj.GetObjectTagging(r.Context(), tagPrm)
if err != nil {
h.logAndSendError(w, "could not get object tagging", reqInfo, err)
Expand Down Expand Up @@ -123,19 +156,36 @@ func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Requ
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}

nodeVersion, err := h.obj.DeleteObjectTagging(r.Context(), p)
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
return
}

if settings.VersioningEnabled() && p.VersionID == "" {
headObjectPrm := &layer.HeadObjectParams{
BktInfo: bktInfo,
Object: reqInfo.ObjectName,
}

ei, err := h.obj.GetExtendedObjectInfo(r.Context(), headObjectPrm)
if err != nil {
h.logAndSendError(w, "could not find object", reqInfo, err)
return
}

p.VersionID = ei.ObjectInfo.VersionID()
}

if err = h.obj.DeleteObjectTagging(r.Context(), p); err != nil {
h.logAndSendError(w, "could not delete object tagging", reqInfo, err)
return
}

s := &SendNotificationParams{
Event: EventObjectTaggingDelete,
NotificationInfo: &data.NotificationInfo{
Name: nodeVersion.FilePath,
Size: nodeVersion.Size,
Version: nodeVersion.OID.EncodeToString(),
HashSum: nodeVersion.ETag,
Name: reqInfo.ObjectName,
},
BktInfo: bktInfo,
ReqInfo: reqInfo,
Expand Down
9 changes: 3 additions & 6 deletions api/layer/compound.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ func (n *layer) GetObjectTaggingAndLock(ctx context.Context, objVersion *ObjectV
return tags, lockInfo, nil
}

if nodeVersion == nil {
nodeVersion, err = n.getNodeVersion(ctx, objVersion)
if err != nil {
return nil, nil, err
}
var p = GetObjectTaggingParams{
ObjectVersion: objVersion,
}

tags, err = n.treeService.GetObjectTagging(ctx, objVersion.BktInfo, nodeVersion)
_, tags, err = n.GetObjectTagging(ctx, &p)
if err != nil {
if errorsStd.Is(err, ErrNodeNotFound) {
return nil, nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey)
Expand Down
4 changes: 2 additions & 2 deletions api/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ type (
DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error

GetObjectTagging(ctx context.Context, p *GetObjectTaggingParams) (string, map[string]string, error)
PutObjectTagging(ctx context.Context, p *PutObjectTaggingParams) (*data.NodeVersion, error)
DeleteObjectTagging(ctx context.Context, p *ObjectVersion) (*data.NodeVersion, error)
PutObjectTagging(ctx context.Context, p *PutObjectTaggingParams) error
DeleteObjectTagging(ctx context.Context, p *ObjectVersion) error

PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error)

Expand Down
4 changes: 0 additions & 4 deletions api/layer/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,10 +1027,6 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
IsUnversioned: !bktSettings.VersioningEnabled(),
}

if newVersion.ID, err = n.treeService.AddVersion(ctx, p.Info.Bkt, newVersion); err != nil {
return nil, nil, fmt.Errorf("couldn't add multipart new verion to tree service: %w", err)
}

n.cache.CleanListCacheEntriesContainingObject(p.Info.Key, p.Info.Bkt.CID)

objInfo := &data.ObjectInfo{
Expand Down
64 changes: 33 additions & 31 deletions api/layer/neofs_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,40 +491,42 @@ func (t *TestNeoFS) SearchObjects(_ context.Context, prm PrmObjectSearch) ([]oid
}

for _, obj := range t.objects {
var isOk = true

for _, attr := range obj.Attributes() {
for _, f := range prm.Filters {
if attr.Key() == f.Header() {
switch f.Operation() {
case object.MatchStringEqual:
if f.Header() == "$Object:objectType" {
isOk = isOk && obj.Type().String() == f.Value()
} else {
isOk = isOk && attr.Value() == f.Value()
}
case object.MatchStringNotEqual:
isOk = isOk && attr.Value() != f.Value()
case object.MatchCommonPrefix:
isOk = isOk && strings.HasPrefix(attr.Value(), f.Value())
case object.MatchNotPresent:
isOk = false
default:
isOk = false
}
}

if f.Header() == "$Object:objectType" {
isOk = isOk && obj.Type().String() == f.Value()
}
}
}

// all filters are valid for obj.
if isOk {
if checkFilters(obj, prm.Filters) {
oids = append(oids, obj.GetID())
}
}

return oids, nil
}

func checkFilters(obj *object.Object, filters object.SearchFilters) bool {
var isOk = true

attrs := make(map[string]string, len(obj.Attributes()))
for _, attr := range obj.Attributes() {
attrs[attr.Key()] = attr.Value()
}

for _, f := range filters {
val, present := attrs[f.Header()]

switch f.Operation() {
case object.MatchStringEqual:
if f.Header() == "$Object:objectType" {
isOk = isOk && obj.Type().String() == f.Value()
} else {
isOk = isOk && val == f.Value()
}
case object.MatchStringNotEqual:
isOk = isOk && val != f.Value()
case object.MatchCommonPrefix:
isOk = isOk && strings.HasPrefix(val, f.Value())
case object.MatchNotPresent:
isOk = !present
default:
isOk = false
}
}

return isOk
}
Loading

0 comments on commit 83c6195

Please sign in to comment.