From b5217c23b5e7303e7f51cf6647c273009e5f1b36 Mon Sep 17 00:00:00 2001 From: Kirill Date: Wed, 9 Oct 2024 19:24:57 +0300 Subject: [PATCH] More S3 params (#63) * More s3 params * multipart parameters * rename to multipart_upload --- cmd/client/main.go | 16 ++++++++++--- pkg/message/put_message_v2.go | 2 ++ pkg/storage/s3storage.go | 43 +++++++++++++++++++++++++++-------- 3 files changed, 49 insertions(+), 12 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index ae534e6..2015943 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -24,9 +24,11 @@ var ( decrypt bool /* Put command flags */ - encrypt bool - storageClass string - tableSpace string + encrypt bool + storageClass string + tableSpace string + multipartChunksize int64 + multipartUpload bool offset uint64 @@ -115,6 +117,14 @@ func putFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { Name: message.TableSpaceSetting, Value: tableSpace, }, + { + Name: message.MultipartChunksize, + Value: fmt.Sprintf("%d", multipartChunksize), + }, + { + Name: message.MultipartUpload, + Value: fmt.Sprintf("%t", multipartUpload), + }, }).Encode() _, err := con.Write(msg) if err != nil { diff --git a/pkg/message/put_message_v2.go b/pkg/message/put_message_v2.go index 28fc247..b9ba21f 100644 --- a/pkg/message/put_message_v2.go +++ b/pkg/message/put_message_v2.go @@ -8,6 +8,8 @@ import ( const StorageClassSetting = "StorageClass" const TableSpaceSetting = "TableSpace" +const MultipartChunksize = "MultipartChunksize" +const MultipartUpload = "MultipartUpload" type PutMessageV2 struct { Encrypt bool diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index f1d4bd0..c2eae3c 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -1,10 +1,12 @@ package storage import ( + "bytes" "context" "fmt" "io" "path" + "strconv" "strings" "github.com/yezzey-gp/aws-sdk-go/aws" @@ -69,14 +71,23 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ objectPath := path.Join(s.cnf.StoragePrefix, name) + storageClass := ResolveStorageSetting(settings, message.StorageClassSetting, "STANDARD") + tableSpace := ResolveStorageSetting(settings, message.TableSpaceSetting, tablespace.DefaultTableSpace) + multipartChunksizeStr := ResolveStorageSetting(settings, message.MultipartChunksize, "") + multipartChunksize, err := strconv.ParseInt(multipartChunksizeStr, 10, 64) + if err != nil { + return err + } + multipartUpload, err := strconv.ParseBool(ResolveStorageSetting(settings, message.MultipartUpload, "1")) + if err != nil { + return err + } + up := s3manager.NewUploaderWithClient(sess, func(uploader *s3manager.Uploader) { - uploader.PartSize = int64(1 << 24) + uploader.PartSize = int64(multipartChunksize) uploader.Concurrency = 1 }) - storageClass := ResolveStorageSetting(settings, message.StorageClassSetting, "STANDARD") - tableSpace := ResolveStorageSetting(settings, message.TableSpaceSetting, tablespace.DefaultTableSpace) - bucket, ok := s.bucketMap[tableSpace] if !ok { err := fmt.Errorf("failed to match tablespace %s to s3 bucket.", tableSpace) @@ -84,14 +95,28 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ return err } - _, err = up.Upload( - &s3manager.UploadInput{ + if multipartUpload { + _, err = up.Upload( + &s3manager.UploadInput{ + Bucket: aws.String(bucket), + Key: aws.String(objectPath), + Body: r, + StorageClass: aws.String(storageClass), + }, + ) + } else { + var body []byte + body, err = io.ReadAll(r) + if err != nil { + return err + } + _, err = sess.PutObject(&s3.PutObjectInput{ Bucket: aws.String(bucket), Key: aws.String(objectPath), - Body: r, + Body: bytes.NewReader(body), StorageClass: aws.String(storageClass), - }, - ) + }) + } return err }