From 333e29579d5be5f1e945aefcba9abc236ce2e9de Mon Sep 17 00:00:00 2001 From: reshke Date: Sun, 14 Jan 2024 13:46:49 +0000 Subject: [PATCH] Patch object message defitions --- pkg/message/message.go | 1 + pkg/message/message_test.go | 36 +++++++++++++++++-- pkg/message/patch_message.go | 70 ++++++++++++++++++++++++++++++++++++ pkg/storage/storage.go | 35 ++++++++++++++++-- 4 files changed, 137 insertions(+), 5 deletions(-) create mode 100644 pkg/message/patch_message.go diff --git a/pkg/message/message.go b/pkg/message/message.go index 4f4a4ac..11513d6 100644 --- a/pkg/message/message.go +++ b/pkg/message/message.go @@ -18,6 +18,7 @@ const ( MessageTypeDelete = MessageType(47) MessageTypeList = MessageType(48) MessageTypeObjectMeta = MessageType(49) + MessageTypePatch = MessageType(50) DecryptMessage = RequestEncryption(1) NoDecryptMessage = RequestEncryption(0) diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 8ae1fb7..14549e1 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -57,12 +57,44 @@ func TestPutMsg(t *testing.T) { msg := message.NewPutMessage(tt.name, tt.encrypt) body := msg.Encode() - msg2 := message.CatMessage{} + msg2 := message.PutMessage{} + + msg2.Decode(body[8:]) + + assert.Equal(msg.Name, msg2.Name) + assert.Equal(msg.Encrypt, msg2.Encrypt) + } +} + +func TestPatchMsg(t *testing.T) { + assert := assert.New(t) + + type tcase struct { + name string + encrypt bool + off uint64 + err error + } + + for _, tt := range []tcase{ + { + "nam1", + true, + 1235, + nil, + }, + } { + + msg := message.NewPatchMessage(tt.name, tt.off, tt.encrypt) + body := msg.Encode() + + msg2 := message.PatchMessage{} msg2.Decode(body[8:]) assert.Equal(msg.Name, msg2.Name) - assert.Equal(msg.Encrypt, msg2.Decrypt) + assert.Equal(msg.Encrypt, msg2.Encrypt) + assert.Equal(msg.Offset, msg2.Offset) } } diff --git a/pkg/message/patch_message.go b/pkg/message/patch_message.go new file mode 100644 index 0000000..2d5d6ea --- /dev/null +++ b/pkg/message/patch_message.go @@ -0,0 +1,70 @@ +package message + +import ( + "bytes" + "encoding/binary" +) + +type PatchMessage struct { + Encrypt bool + Offset uint64 + Name string +} + +var _ ProtoMessage = &PatchMessage{} + +func NewPatchMessage(name string, offset uint64, encrypt bool) *PatchMessage { + return &PatchMessage{ + Encrypt: encrypt, + Offset: offset, + Name: name, + } +} + +func (c *PatchMessage) Encode() []byte { + bt := []byte{ + byte(MessageTypePatch), + 0, + 0, + 0, + } + + if c.Encrypt { + bt[1] = byte(EncryptMessage) + } else { + bt[1] = byte(NoEncryptMessage) + } + offset := make([]byte, 8) + binary.BigEndian.PutUint64(offset, uint64(c.Offset)) + + bt = append(bt, offset...) + bt = append(bt, []byte(c.Name)...) + bt = append(bt, 0) + ln := len(bt) + 8 + + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, uint64(ln)) + return append(bs, bt...) +} + +func (c *PatchMessage) GetPatchName(b []byte) string { + buff := bytes.NewBufferString("") + + for i := 0; i < len(b); i++ { + if b[i] == 0 { + break + } + buff.WriteByte(b[i]) + } + + return buff.String() +} + +func (c *PatchMessage) Decode(body []byte) { + if body[1] == byte(EncryptMessage) { + c.Encrypt = true + } + + c.Offset = binary.BigEndian.Uint64(body[4:12]) + c.Name = c.GetPatchName(body[12:]) +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 6df6818..1824609 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -2,6 +2,7 @@ package storage import ( "context" + "fmt" "io" "path" @@ -19,16 +20,21 @@ type StorageReader interface { type StorageWriter interface { PutFileToDest(name string, r io.Reader) error + PatchFile(name string, r io.ReadSeeker, startOffste int64) error +} + +type StorageLister interface { + ListPath(prefix string) ([]*S3ObjectMeta, error) } type StorageInteractor interface { StorageReader StorageWriter + StorageLister } type S3StorageInteractor struct { - StorageReader - StorageWriter + StorageInteractor pool SessionPool cnf *config.Storage @@ -78,7 +84,6 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error { _, err = up.Upload( &s3manager.UploadInput{ - Bucket: aws.String(s.cnf.StorageBucket), Key: aws.String(objectPath), Body: r, @@ -89,6 +94,30 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error { return err } +func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return nil + } + + objectPath := path.Join(s.cnf.StoragePrefix, name) + + input := &s3.PatchObjectInput{ + Bucket: &s.cnf.StorageBucket, + Key: aws.String(objectPath), + Body: r, + ContentRange: aws.String(fmt.Sprintf("bytes %d-18446744073709551615", startOffste)), + } + + _, err = sess.PatchObject(input) + + ylogger.Zero.Debug().Str("key", objectPath).Str("bucket", + s.cnf.StorageBucket).Msg("modifying file in external storage") + + return err +} + type S3ObjectMeta struct { Path string Size int64