Skip to content

Commit

Permalink
s3: add support for passing object metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Janusz Marcinkiewicz <[email protected]>
  • Loading branch information
VirrageS committed Jan 14, 2025
1 parent ce849f3 commit c429346
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 18 deletions.
17 changes: 16 additions & 1 deletion ais/backend/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,9 @@ func _getCustom(lom *core.LOM, obj *s3.GetObjectOutput) (md5 *cos.Cksum) {
md5 = cos.NewCksum(cos.ChecksumMD5, v)
lom.SetCustomKey(cmn.MD5ObjMD, v)
}
for k, v := range h.EncodeMetadata(obj.Metadata) {
lom.SetCustomKey(k, v)
}
mtime := *(obj.LastModified)
lom.SetCustomKey(cmn.LastModified, fmtTime(mtime))
return
Expand Down Expand Up @@ -662,6 +665,11 @@ func (*s3bp) PutObj(r io.ReadCloser, lom *core.LOM, oreq *http.Request) (ecode i

md[cos.S3MetadataChecksumType] = cksumType
md[cos.S3MetadataChecksumVal] = cksumValue
if oreq != nil {
for k, v := range cmn.BackendHelpers.Amazon.DecodeMetadata(oreq.Header) {
md[k] = v
}
}

uploader = s3manager.NewUploader(svc)
uploadOutput, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
Expand All @@ -677,7 +685,7 @@ func (*s3bp) PutObj(r io.ReadCloser, lom *core.LOM, oreq *http.Request) (ecode i
}

exit:
// compare with setCustomS3() above
// compare with _getCustom() above
if v, ok := h.EncodeVersion(uploadOutput.VersionID); ok {
lom.SetCustomKey(cmn.VersionObjMD, v)
lom.SetVersion(v)
Expand All @@ -688,6 +696,13 @@ exit:
if v, ok := h.EncodeCksum(uploadOutput.ETag); ok {
lom.SetCustomKey(cmn.MD5ObjMD, v)
}
if oreq != nil {
for header := range oreq.Header {
if strings.HasPrefix(header, aiss3.HeaderMetaPrefix) {
lom.SetCustomKey(header, oreq.Header.Get(header))
}
}
}
if cmn.Rom.FastV(5, cos.SmoduleBackend) {
nlog.Infoln(tag, lom.String())
}
Expand Down
10 changes: 8 additions & 2 deletions ais/backend/awsmpt.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ func StartMpt(lom *core.LOM, oreq *http.Request, oq url.Values) (id string, ecod
}
}

var metadata map[string]string
if oreq != nil {
metadata = cmn.BackendHelpers.Amazon.DecodeMetadata(oreq.Header)
}

var (
cloudBck = lom.Bck().RemoteBck()
sessConf = sessConf{bck: cloudBck}
input = s3.CreateMultipartUploadInput{
Bucket: aws.String(cloudBck.Name),
Key: aws.String(lom.ObjName),
Bucket: aws.String(cloudBck.Name),
Key: aws.String(lom.ObjName),
Metadata: metadata,
}
)
svc, errN := sessConf.s3client("[start_mpt]")
Expand Down
8 changes: 7 additions & 1 deletion ais/s3/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/
package s3

import "github.com/NVIDIA/aistore/cmn"

const (
// AWS URL params
QparamVersioning = "versioning"
Expand All @@ -30,7 +32,11 @@ const (
QparamSignature = "Signature"
QparamXID = "x-id"

HeaderPrefix = "X-Amz-"
HeaderPrefix = "X-Amz-"

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html#UserMetadata
HeaderMetaPrefix = cmn.AwsHeaderMetaPrefix

HeaderCredentials = "X-Amz-Credential" //nolint:gosec // This is just a header name definition...

versioningEnabled = "Enabled"
Expand Down
30 changes: 21 additions & 9 deletions ais/s3/mpt.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ type (
Num int32 // part number (*)
}
mpt struct {
ctime time.Time // InitUpload time
bckName string
objName string
parts []*MptPart // by part number
ctime time.Time // InitUpload time
bckName string
objName string
parts []*MptPart // by part number
metadata map[string]string
}
uploads map[string]*mpt // by upload ID
)
Expand All @@ -43,16 +44,17 @@ var (
)

// Start miltipart upload
func InitUpload(id, bckName, objName string) {
func InitUpload(id, bckName, objName string, metadata map[string]string) {
mu.Lock()
if ups == nil {
ups = make(uploads, 8)
}
ups[id] = &mpt{
bckName: bckName,
objName: objName,
parts: make([]*MptPart, 0, iniCapParts),
ctime: time.Now(),
bckName: bckName,
objName: objName,
parts: make([]*MptPart, 0, iniCapParts),
ctime: time.Now(),
metadata: metadata,
}
mu.Unlock()
}
Expand Down Expand Up @@ -122,6 +124,16 @@ func ObjSize(id string) (size int64, err error) {
return
}

func GetUploadMetadata(id string) (metadata map[string]string) {
mu.RLock()
defer mu.RUnlock()
mpt, ok := ups[id]
if !ok {
return nil
}
return mpt.metadata
}

// remove all temp files and delete from the map
// if completed (i.e., not aborted): store xattr
func CleanupUpload(id, fqn string, aborted bool) (exists bool) {
Expand Down
6 changes: 6 additions & 0 deletions ais/s3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"path"
"strconv"
"strings"
"time"

"github.com/NVIDIA/aistore/api/apc"
Expand Down Expand Up @@ -219,6 +220,11 @@ func SetS3Headers(hdr http.Header, lom *core.LOM) {
hdr.Set(cos.S3VersionHeader, v)
}
}
for k, v := range lom.GetCustomMD() {
if strings.HasPrefix(k, HeaderMetaPrefix) {
hdr.Set(k, v)
}
}
}

func (r *CopyObjectResult) MustMarshal(sgl *memsys.SGL) {
Expand Down
98 changes: 98 additions & 0 deletions ais/test/s3_compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

aiss3 "github.com/NVIDIA/aistore/ais/s3"
"github.com/NVIDIA/aistore/api"
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/api/env"
Expand Down Expand Up @@ -390,3 +391,100 @@ func TestS3ETag(t *testing.T) {
tassert.Errorf(t, *multipartOutput.ETag == *headOutput.ETag, "ETag for PUT does not match (local: %v != remote: %v)", *multipartOutput.ETag, *headOutput.ETag)
})
}

// export AWS_PROFILE=default; export AIS_ENDPOINT="http://localhost:8080"; export BUCKET="aws://..."; go test -v -run="TestS3ObjMetadata" -count=1 ./ais/test/.
func TestS3ObjMetadata(t *testing.T) {
tools.CheckSkip(t, &tools.SkipTestArgs{Long: true, Bck: cliBck, RequiredCloudProvider: apc.AWS})

var (
bck = cliBck
objName = "object.txt"
objSize = 50 * cos.KiB
)

_, err := api.HeadBucket(baseParams, bck, false)
tassert.CheckFatal(t, err)

cfg, err := config.LoadDefaultConfig(
context.Background(),
loadCredentials(t),
)
tassert.CheckFatal(t, err)
s3Client := s3.NewFromConfig(cfg)

metadata := map[string]string{
"User": "guest",
"name": "test",
}

t.Run("PutObject", func(t *testing.T) {
reader, err := readers.NewRand(int64(objSize), cos.ChecksumNone)
tassert.CheckFatal(t, err)
header := make(http.Header)
for k, v := range metadata {
header.Set(aiss3.HeaderMetaPrefix+k, v)
}
_, err = api.PutObject(&api.PutArgs{
BaseParams: baseParams,
Bck: bck,
ObjName: objName,
Reader: reader,
Header: header,
})
tassert.CheckFatal(t, err)

output, err := s3Client.HeadObject(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String(bck.Name),
Key: aws.String(objName),
})
tassert.CheckFatal(t, err)

for k, v := range metadata {
tassert.Errorf(t, output.Metadata[strings.ToLower(k)] == v, `Metadata does not match (key: %q, local: %q != remote: %q)`, k, v, output.Metadata[k])
}
})

t.Run("PutObjectMultipart", func(t *testing.T) {
const (
objSize = 20 * cos.MiB
partSize = 5 * cos.MiB
)

reader, err := readers.NewRand(int64(objSize), cos.ChecksumNone)
tassert.CheckFatal(t, err)
cfg.HTTPClient = newS3Client(true /*pathStyle*/)
aisClient := s3.NewFromConfig(cfg)
uploader := s3manager.NewUploader(aisClient, func(uploader *s3manager.Uploader) {
uploader.PartSize = partSize
uploader.ClientOptions = []func(*s3.Options){
func(opts *s3.Options) {
opts.BaseEndpoint = aws.String(proxyURL)
},
}
}, func(uploader *s3manager.Uploader) {
uploader.ClientOptions = append(uploader.ClientOptions, func(options *s3.Options) {
options.APIOptions = append(options.APIOptions, func(stack *middleware.Stack) error {
return stack.Finalize.Add(&addGetBodyMiddleware{}, middleware.After)
})
})
})
_, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(bck.Name),
Key: aws.String(objName),
Body: reader,
ContentLength: aws.Int64(objSize),
Metadata: metadata,
})
tassert.CheckFatal(t, err)

headOutput, err := s3Client.HeadObject(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String(bck.Name),
Key: aws.String(objName),
})
tassert.CheckFatal(t, err)

for k, v := range metadata {
tassert.Errorf(t, headOutput.Metadata[strings.ToLower(k)] == v, `Metadata does not match (key: %q, local: %q != remote: %q)`, k, v, headOutput.Metadata[k])
}
})
}
8 changes: 7 additions & 1 deletion ais/tgts3mpt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (t *target) startMpt(w http.ResponseWriter, r *http.Request, items []string
var (
objName = s3.ObjName(items)
lom = &core.LOM{ObjName: objName}
metadata map[string]string
uploadID string
ecode int
)
Expand All @@ -65,6 +66,7 @@ func (t *target) startMpt(w http.ResponseWriter, r *http.Request, items []string
return
}
if bck.IsRemoteS3() {
metadata = cmn.BackendHelpers.Amazon.DecodeMetadata(r.Header)
uploadID, ecode, err = backend.StartMpt(lom, r, q)
if err != nil {
s3.WriteErr(w, r, err, ecode)
Expand All @@ -74,7 +76,7 @@ func (t *target) startMpt(w http.ResponseWriter, r *http.Request, items []string
uploadID = cos.GenUUID()
}

s3.InitUpload(uploadID, bck.Name, objName)
s3.InitUpload(uploadID, bck.Name, objName, metadata)
result := &s3.InitiateMptUploadResult{Bucket: bck.Name, Key: objName, UploadID: uploadID}

sgl := t.gmm.NewSGL(0)
Expand Down Expand Up @@ -354,6 +356,10 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str
if version != "" {
lom.SetCustomKey(cmn.VersionObjMD, version)
}
metadata := s3.GetUploadMetadata(uploadID)
for k, v := range cmn.BackendHelpers.Amazon.EncodeMetadata(metadata) {
lom.SetCustomKey(k, v)
}
}
lom.SetCustomKey(cmn.ETag, etag)

Expand Down
3 changes: 3 additions & 0 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type (
// - we massively write a new content into a bucket, and/or
// - we simply don't care.
SkipVC bool

Header http.Header
}
)

Expand Down Expand Up @@ -301,6 +303,7 @@ func PutObject(args *PutArgs) (oah ObjAttrs, err error) {
reqArgs.Path = apc.URLPathObjects.Join(args.Bck.Name, args.ObjName)
reqArgs.Query = query
reqArgs.BodyR = args.Reader
reqArgs.Header = args.Header
}
resp, err = DoWithRetry(args.BaseParams.Client, args.put, reqArgs) //nolint:bodyclose // is closed inside
cmn.FreeHra(reqArgs)
Expand Down
Loading

0 comments on commit c429346

Please sign in to comment.