From a310d9545838529892563cbd9fa9533699d57e3e Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Mon, 26 Aug 2024 12:21:04 +0000 Subject: [PATCH 1/6] Support cat msg v2 --- pkg/message/cat_message_v2.go | 97 +++++++++++++++++++++++++++++++++++ pkg/message/message_test.go | 64 ++++++++++++++++++++++- pkg/message/put_message_v2.go | 35 +++---------- pkg/message/utils.go | 18 +++++++ pkg/mock/storage.go | 6 +-- pkg/proc/interaction.go | 3 +- pkg/settings/settings.go | 6 +++ pkg/storage/filestorage.go | 4 +- pkg/storage/s3storage.go | 3 +- pkg/storage/storage.go | 4 +- pkg/storage/utils.go | 4 +- pkg/storage/utils_test.go | 8 +-- 12 files changed, 208 insertions(+), 44 deletions(-) create mode 100644 pkg/message/cat_message_v2.go create mode 100644 pkg/message/utils.go create mode 100644 pkg/settings/settings.go diff --git a/pkg/message/cat_message_v2.go b/pkg/message/cat_message_v2.go new file mode 100644 index 0000000..1018459 --- /dev/null +++ b/pkg/message/cat_message_v2.go @@ -0,0 +1,97 @@ +package message + +import ( + "encoding/binary" + + "github.com/yezzey-gp/yproxy/pkg/settings" +) + +type CatMessageV2 struct { + Decrypt bool + Name string + StartOffset uint64 + + Settings []settings.StorageSettings +} + +var _ ProtoMessage = &CatMessage{} + +func NewCatMessageV2(name string, decrypt bool, StartOffset uint64) *CatMessageV2 { + return &CatMessageV2{ + Name: name, + Decrypt: decrypt, + StartOffset: StartOffset, + } +} + +func (c *CatMessageV2) Encode() []byte { + bt := []byte{ + byte(MessageTypeCat), + 0, + 0, + 0, + } + + if c.Decrypt { + bt[1] = byte(DecryptMessage) + } else { + bt[1] = byte(NoDecryptMessage) + } + + if c.StartOffset != 0 { + bt[2] = byte(ExtendedMesssage) + } + + bt = append(bt, []byte(c.Name)...) + bt = append(bt, 0) + if c.StartOffset != 0 { + bt = binary.BigEndian.AppendUint64(bt, c.StartOffset) + } + + slen := make([]byte, 8) + binary.BigEndian.PutUint64(slen, uint64(len(c.Settings))) + bt = append(bt, slen...) + + for _, s := range c.Settings { + + bt = append(bt, []byte(s.Name)...) + bt = append(bt, 0) + + bt = append(bt, []byte(s.Value)...) + bt = append(bt, 0) + } + + ln := len(bt) + 8 + + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, uint64(ln)) + return append(bs, bt...) +} + +func (c *CatMessageV2) Decode(body []byte) { + var off uint64 + c.Name, off = GetCstring(body[4:]) + if body[1] == byte(DecryptMessage) { + c.Decrypt = true + } + if body[2] == byte(ExtendedMesssage) { + c.StartOffset = binary.BigEndian.Uint64(body[4+len(c.Name)+1:]) + } + + settLen := binary.BigEndian.Uint64(body[4+off : 4+off+8]) + + totalOff := 4 + off + 8 + + c.Settings = make([]settings.StorageSettings, settLen) + + for i := 0; i < int(settLen); i++ { + + var currOff uint64 + + c.Settings[i].Name, currOff = GetCstring(body[totalOff:]) + totalOff += currOff + + c.Settings[i].Value, currOff = GetCstring(body[totalOff:]) + totalOff += currOff + } +} diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 287972e..35898e0 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/settings" ) func TestCatMsg(t *testing.T) { @@ -81,7 +82,7 @@ func TestPutV2Msg(t *testing.T) { name string encrypt bool err error - settings []message.PutSettings + settings []settings.StorageSettings } for _, tt := range []tcase{ @@ -89,7 +90,7 @@ func TestPutV2Msg(t *testing.T) { "nam1", true, nil, - []message.PutSettings{ + []settings.StorageSettings{ { Name: "a", Value: "b", @@ -114,6 +115,65 @@ func TestPutV2Msg(t *testing.T) { } } +func TestCatMsgV2(t *testing.T) { + assert := assert.New(t) + + type tcase struct { + name string + decrypt bool + off uint64 + + settings []settings.StorageSettings + err error + } + + for _, tt := range []tcase{ + { + "nam1", + true, + 0, + []settings.StorageSettings{ + { + Name: "a", + Value: "b", + }, + { + Name: "cdsdsd", + Value: "ds", + }, + }, + nil, + }, + { + "nam1", + true, + 10, + []settings.StorageSettings{ + { + Name: "a", + Value: "b", + }, + { + Name: "cdsdsd", + Value: "ds", + }, + }, + nil, + }, + } { + + msg := message.NewCatMessage(tt.name, tt.decrypt, tt.off) + body := msg.Encode() + + msg2 := message.CatMessage{} + + msg2.Decode(body[8:]) + + assert.Equal(msg.Name, msg2.Name) + assert.Equal(msg.Decrypt, msg2.Decrypt) + } +} + func TestPatchMsg(t *testing.T) { assert := assert.New(t) diff --git a/pkg/message/put_message_v2.go b/pkg/message/put_message_v2.go index 1bc4186..28fc247 100644 --- a/pkg/message/put_message_v2.go +++ b/pkg/message/put_message_v2.go @@ -1,28 +1,24 @@ package message import ( - "bytes" "encoding/binary" + + "github.com/yezzey-gp/yproxy/pkg/settings" ) const StorageClassSetting = "StorageClass" const TableSpaceSetting = "TableSpace" -type PutSettings struct { - Name string - Value string -} - type PutMessageV2 struct { Encrypt bool Name string - Settings []PutSettings + Settings []settings.StorageSettings } var _ ProtoMessage = &PutMessageV2{} -func NewPutMessageV2(name string, encrypt bool, settings []PutSettings) *PutMessageV2 { +func NewPutMessageV2(name string, encrypt bool, settings []settings.StorageSettings) *PutMessageV2 { return &PutMessageV2{ Name: name, Encrypt: encrypt, @@ -67,42 +63,27 @@ func (c *PutMessageV2) Encode() []byte { return append(bs, bt...) } -func (c *PutMessageV2) GetCstring(b []byte) (string, uint64) { - offset := uint64(0) - buff := bytes.NewBufferString("") - - for i := 0; i < len(b); i++ { - offset++ - if b[i] == 0 { - break - } - buff.WriteByte(b[i]) - } - - return buff.String(), offset -} - func (c *PutMessageV2) Decode(body []byte) { if body[1] == byte(EncryptMessage) { c.Encrypt = true } var off uint64 - c.Name, off = c.GetCstring(body[4:]) + c.Name, off = GetCstring(body[4:]) settLen := binary.BigEndian.Uint64(body[4+off : 4+off+8]) totalOff := 4 + off + 8 - c.Settings = make([]PutSettings, settLen) + c.Settings = make([]settings.StorageSettings, settLen) for i := 0; i < int(settLen); i++ { var currOff uint64 - c.Settings[i].Name, currOff = c.GetCstring(body[totalOff:]) + c.Settings[i].Name, currOff = GetCstring(body[totalOff:]) totalOff += currOff - c.Settings[i].Value, currOff = c.GetCstring(body[totalOff:]) + c.Settings[i].Value, currOff = GetCstring(body[totalOff:]) totalOff += currOff } } diff --git a/pkg/message/utils.go b/pkg/message/utils.go new file mode 100644 index 0000000..05431a5 --- /dev/null +++ b/pkg/message/utils.go @@ -0,0 +1,18 @@ +package message + +import "bytes" + +func GetCstring(b []byte) (string, uint64) { + offset := uint64(0) + buff := bytes.NewBufferString("") + + for i := 0; i < len(b); i++ { + offset++ + if b[i] == 0 { + break + } + buff.WriteByte(b[i]) + } + + return buff.String(), offset +} diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go index 8048012..47902f3 100644 --- a/pkg/mock/storage.go +++ b/pkg/mock/storage.go @@ -13,8 +13,8 @@ import ( io "io" reflect "reflect" - message "github.com/yezzey-gp/yproxy/pkg/message" object "github.com/yezzey-gp/yproxy/pkg/object" + settings "github.com/yezzey-gp/yproxy/pkg/settings" gomock "go.uber.org/mock/gomock" ) @@ -94,7 +94,7 @@ func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffset any) *go } // PutFileToDest mocks base method. -func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error { +func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader, settings []settings.StorageSettings) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutFileToDest", name, r, settings) ret0, _ := ret[0].(error) @@ -292,7 +292,7 @@ func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffset any) } // PutFileToDest mocks base method. -func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error { +func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader, settings []settings.StorageSettings) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutFileToDest", name, r, settings) ret0, _ := ret[0].(error) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 591ec49..a733054 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -13,6 +13,7 @@ import ( "github.com/yezzey-gp/yproxy/pkg/database" "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" + "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -21,7 +22,7 @@ func ProcessPutExtended( s storage.StorageInteractor, pr *ProtoReader, name string, - encrypt bool, settings []message.PutSettings, cr crypt.Crypter, ycl client.YproxyClient) error { + encrypt bool, settings []settings.StorageSettings, cr crypt.Crypter, ycl client.YproxyClient) error { ycl.SetExternalFilePath(name) diff --git a/pkg/settings/settings.go b/pkg/settings/settings.go new file mode 100644 index 0000000..16b29da --- /dev/null +++ b/pkg/settings/settings.go @@ -0,0 +1,6 @@ +package settings + +type StorageSettings struct { + Name string + Value string +} diff --git a/pkg/storage/filestorage.go b/pkg/storage/filestorage.go index 3e1da2d..4f64436 100644 --- a/pkg/storage/filestorage.go +++ b/pkg/storage/filestorage.go @@ -9,8 +9,8 @@ import ( "path/filepath" "github.com/yezzey-gp/yproxy/config" - "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" + "github.com/yezzey-gp/yproxy/pkg/settings" ) // Storage prefix uses as path to folder. @@ -51,7 +51,7 @@ func (s *FileStorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, e return data, err } -func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader, _ []message.PutSettings) error { +func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader, _ []settings.StorageSettings) error { fPath := path.Join(s.cnf.StoragePrefix, name) fDir := path.Dir(fPath) os.MkdirAll(fDir, 0700) diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 38d3196..3c472f1 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -13,6 +13,7 @@ import ( "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" + "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/tablespace" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -49,7 +50,7 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io. return object.Body, err } -func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error { +func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings []settings.StorageSettings) error { sess, err := s.pool.GetSession(context.TODO()) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 86270b4..bb5cd0d 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -5,8 +5,8 @@ import ( "io" "github.com/yezzey-gp/yproxy/config" - "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" + "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/tablespace" ) @@ -15,7 +15,7 @@ type StorageReader interface { } type StorageWriter interface { - PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error + PutFileToDest(name string, r io.Reader, settings []settings.StorageSettings) error PatchFile(name string, r io.ReadSeeker, startOffset int64) error } diff --git a/pkg/storage/utils.go b/pkg/storage/utils.go index 80dd1b8..961ac15 100644 --- a/pkg/storage/utils.go +++ b/pkg/storage/utils.go @@ -1,8 +1,8 @@ package storage -import "github.com/yezzey-gp/yproxy/pkg/message" +import "github.com/yezzey-gp/yproxy/pkg/settings" -func ResolveStorageSetting(settings []message.PutSettings, name, defaultVal string) string { +func ResolveStorageSetting(settings []settings.StorageSettings, name, defaultVal string) string { for _, s := range settings { if s.Name == name { diff --git a/pkg/storage/utils_test.go b/pkg/storage/utils_test.go index 2e67cc2..bca91ca 100644 --- a/pkg/storage/utils_test.go +++ b/pkg/storage/utils_test.go @@ -5,7 +5,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/storage" ) @@ -17,7 +17,7 @@ func TestResolveSettings(t *testing.T) { name string defaultV string exp string - settings []message.PutSettings + settings []settings.StorageSettings } for _, tt := range []tcase{ @@ -31,7 +31,7 @@ func TestResolveSettings(t *testing.T) { "ababa", "aboba", "aboba", - []message.PutSettings{ + []settings.StorageSettings{ { Name: "djewikdeowp", Value: "jdoiwejoidew", @@ -43,7 +43,7 @@ func TestResolveSettings(t *testing.T) { "ababa", "aboba", "valval", - []message.PutSettings{ + []settings.StorageSettings{ { Name: "ababa", Value: "valval", From 9891be719afd2598c6f3eb0720c4c9eaaf0c9636 Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Mon, 26 Aug 2024 12:24:56 +0000 Subject: [PATCH 2/6] fixes --- pkg/message/cat_message_v2.go | 5 +++-- pkg/message/message.go | 1 + pkg/message/message_test.go | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/message/cat_message_v2.go b/pkg/message/cat_message_v2.go index 1018459..11abf53 100644 --- a/pkg/message/cat_message_v2.go +++ b/pkg/message/cat_message_v2.go @@ -16,17 +16,18 @@ type CatMessageV2 struct { var _ ProtoMessage = &CatMessage{} -func NewCatMessageV2(name string, decrypt bool, StartOffset uint64) *CatMessageV2 { +func NewCatMessageV2(name string, decrypt bool, StartOffset uint64, Settings []settings.StorageSettings) *CatMessageV2 { return &CatMessageV2{ Name: name, Decrypt: decrypt, StartOffset: StartOffset, + Settings: Settings, } } func (c *CatMessageV2) Encode() []byte { bt := []byte{ - byte(MessageTypeCat), + byte(MessageTypeCatV2), 0, 0, 0, diff --git a/pkg/message/message.go b/pkg/message/message.go index 9fd8a65..2e114f6 100644 --- a/pkg/message/message.go +++ b/pkg/message/message.go @@ -22,6 +22,7 @@ const ( MessageTypeCopy = MessageType(51) MessageTypeGool = MessageType(52) MessageTypePutV2 = MessageType(53) + MessageTypeCatV2 = MessageType(54) DecryptMessage = RequestEncryption(1) NoDecryptMessage = RequestEncryption(0) diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 35898e0..d5ec9ad 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -162,10 +162,10 @@ func TestCatMsgV2(t *testing.T) { }, } { - msg := message.NewCatMessage(tt.name, tt.decrypt, tt.off) + msg := message.NewCatMessageV2(tt.name, tt.decrypt, tt.off, tt.settings) body := msg.Encode() - msg2 := message.CatMessage{} + msg2 := message.CatMessageV2{} msg2.Decode(body[8:]) From 3d646af3ee42f23f3204e3509b1e7a98c91f30c8 Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Mon, 26 Aug 2024 12:25:36 +0000 Subject: [PATCH 3/6] Fixes 2 --- cmd/client/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index afd3845..a715679 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -12,6 +12,7 @@ import ( "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/proc" + "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/tablespace" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -105,7 +106,7 @@ func putFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { ycl := client.NewYClient(con) r := proc.NewProtoReader(ycl) - msg := message.NewPutMessageV2(args[0], encrypt, []message.PutSettings{ + msg := message.NewPutMessageV2(args[0], encrypt, []settings.StorageSettings{ { Name: message.StorageClassSetting, Value: storageClass, From 90337ded049ba33c8a12facc9d7aa1673b2c4ff7 Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Mon, 26 Aug 2024 12:31:57 +0000 Subject: [PATCH 4/6] Fix unit --- pkg/message/cat_message_v2.go | 20 +++++--------------- pkg/message/message_test.go | 3 +++ 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/pkg/message/cat_message_v2.go b/pkg/message/cat_message_v2.go index 11abf53..e3db6d9 100644 --- a/pkg/message/cat_message_v2.go +++ b/pkg/message/cat_message_v2.go @@ -39,19 +39,11 @@ func (c *CatMessageV2) Encode() []byte { bt[1] = byte(NoDecryptMessage) } - if c.StartOffset != 0 { - bt[2] = byte(ExtendedMesssage) - } - bt = append(bt, []byte(c.Name)...) bt = append(bt, 0) - if c.StartOffset != 0 { - bt = binary.BigEndian.AppendUint64(bt, c.StartOffset) - } + bt = binary.BigEndian.AppendUint64(bt, c.StartOffset) - slen := make([]byte, 8) - binary.BigEndian.PutUint64(slen, uint64(len(c.Settings))) - bt = append(bt, slen...) + bt = binary.BigEndian.AppendUint64(bt, uint64(len(c.Settings))) for _, s := range c.Settings { @@ -75,13 +67,11 @@ func (c *CatMessageV2) Decode(body []byte) { if body[1] == byte(DecryptMessage) { c.Decrypt = true } - if body[2] == byte(ExtendedMesssage) { - c.StartOffset = binary.BigEndian.Uint64(body[4+len(c.Name)+1:]) - } + c.StartOffset = binary.BigEndian.Uint64(body[4+len(c.Name)+1:]) - settLen := binary.BigEndian.Uint64(body[4+off : 4+off+8]) + settLen := binary.BigEndian.Uint64(body[4+8+off : 4+off+8+8]) - totalOff := 4 + off + 8 + totalOff := 4 + off + 8 + 8 c.Settings = make([]settings.StorageSettings, settLen) diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index d5ec9ad..096b88a 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -112,6 +112,7 @@ func TestPutV2Msg(t *testing.T) { assert.Equal(msg.Name, msg2.Name) assert.Equal(msg.Encrypt, msg2.Encrypt) + assert.Equal(msg.Settings, msg2.Settings) } } @@ -171,6 +172,8 @@ func TestCatMsgV2(t *testing.T) { assert.Equal(msg.Name, msg2.Name) assert.Equal(msg.Decrypt, msg2.Decrypt) + assert.Equal(msg.StartOffset, msg2.StartOffset) + assert.Equal(msg.Settings, msg2.Settings) } } From b832705b983fa59e4dc418aade856d967b9fd3c3 Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Mon, 26 Aug 2024 12:49:49 +0000 Subject: [PATCH 5/6] handle settings in storage cat interactions --- pkg/mock/storage.go | 16 ++++---- pkg/proc/interaction.go | 83 +++++++++++++++++++++++++------------- pkg/proc/yrreader.go | 11 +++-- pkg/storage/filestorage.go | 2 +- pkg/storage/s3storage.go | 14 ++++++- pkg/storage/storage.go | 2 +- 6 files changed, 83 insertions(+), 45 deletions(-) diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go index 47902f3..6f2488b 100644 --- a/pkg/mock/storage.go +++ b/pkg/mock/storage.go @@ -42,18 +42,18 @@ func (m *MockStorageReader) EXPECT() *MockStorageReaderMockRecorder { } // CatFileFromStorage mocks base method. -func (m *MockStorageReader) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { +func (m *MockStorageReader) CatFileFromStorage(name string, offset int64, setts []settings.StorageSettings) (io.ReadCloser, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) + ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset, setts) ret0, _ := ret[0].(io.ReadCloser) ret1, _ := ret[1].(error) return ret0, ret1 } // CatFileFromStorage indicates an expected call of CatFileFromStorage. -func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset any) *gomock.Call { +func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset, setts any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageReader)(nil).CatFileFromStorage), name, offset) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageReader)(nil).CatFileFromStorage), name, offset, setts) } // MockStorageWriter is a mock of StorageWriter interface. @@ -220,18 +220,18 @@ func (m *MockStorageInteractor) EXPECT() *MockStorageInteractorMockRecorder { } // CatFileFromStorage mocks base method. -func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { +func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64, setts []settings.StorageSettings) (io.ReadCloser, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) + ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset, setts) ret0, _ := ret[0].(io.ReadCloser) ret1, _ := ret[1].(error) return ret0, ret1 } // CatFileFromStorage indicates an expected call of CatFileFromStorage. -func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset any) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset, setts any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset, setts) } // DeleteObject mocks base method. diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index a733054..d9dae5b 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -18,6 +18,50 @@ import ( "github.com/yezzey-gp/yproxy/pkg/ylogger" ) +func ProcessCatExtended( + s storage.StorageInteractor, + pr *ProtoReader, + name string, + decrypt bool, startOffset uint64, settings []settings.StorageSettings, cr crypt.Crypter, ycl client.YproxyClient) error { + + ycl.SetExternalFilePath(name) + + yr := NewYRetryReader(NewRestartReader(s, name, settings)) + + var contentReader io.Reader + contentReader = yr + defer yr.Close() + var err error + + if decrypt { + if cr == nil { + err := fmt.Errorf("failed to decrypt object, decrypter not configured") + _ = ycl.ReplyError(err, "cat failed") + ycl.Close() + return err + } + ylogger.Zero.Debug().Str("object-path", name).Msg("decrypt object") + contentReader, err = cr.Decrypt(yr) + if err != nil { + _ = ycl.ReplyError(err, "failed to decrypt object") + + return err + } + } + + if startOffset != 0 { + io.CopyN(io.Discard, contentReader, int64(startOffset)) + } + + n, err := io.Copy(ycl.GetRW(), contentReader) + if err != nil { + _ = ycl.ReplyError(err, "copy failed to complete") + } + ylogger.Zero.Debug().Int64("copied bytes", n).Msg("decrypt object") + + return nil +} + func ProcessPutExtended( s storage.StorageInteractor, pr *ProtoReader, @@ -141,42 +185,23 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl switch tp { case message.MessageTypeCat: + // omit first byte msg := message.CatMessage{} msg.Decode(body) - ycl.SetExternalFilePath(msg.Name) - - yr := NewYRetryReader(NewRestartReader(s, msg.Name)) - - var contentReader io.Reader - contentReader = yr - defer yr.Close() - - if msg.Decrypt { - if cr == nil { - _ = ycl.ReplyError(err, "failed to decrypt object, decrypter not configured") - ycl.Close() - return nil - } - ylogger.Zero.Debug().Str("object-path", msg.Name).Msg("decrypt object") - contentReader, err = cr.Decrypt(yr) - if err != nil { - _ = ycl.ReplyError(err, "failed to decrypt object") - - return err - } + if err := ProcessCatExtended(s, pr, msg.Name, msg.Decrypt, msg.StartOffset, nil, cr, ycl); err != nil { + return err } - if msg.StartOffset != 0 { - io.CopyN(io.Discard, contentReader, int64(msg.StartOffset)) - } + case message.MessageTypeCatV2: + // omit first byte + msg := message.CatMessageV2{} + msg.Decode(body) - n, err := io.Copy(ycl.GetRW(), contentReader) - if err != nil { - _ = ycl.ReplyError(err, "copy failed to complete") + if err := ProcessCatExtended(s, pr, msg.Name, msg.Decrypt, msg.StartOffset, msg.Settings, cr, ycl); err != nil { + return err } - ylogger.Zero.Debug().Int64("copied bytes", n).Msg("decrypt object") case message.MessageTypePut: @@ -264,7 +289,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl for i := 0; i < len(objectMetas); i++ { path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix) //get reader - readerFromOldBucket := NewYRetryReader(NewRestartReader(oldStorage, path)) + readerFromOldBucket := NewYRetryReader(NewRestartReader(oldStorage, path, nil)) var fromReader io.Reader fromReader = readerFromOldBucket defer readerFromOldBucket.Close() diff --git a/pkg/proc/yrreader.go b/pkg/proc/yrreader.go index 12f63a7..45d5790 100644 --- a/pkg/proc/yrreader.go +++ b/pkg/proc/yrreader.go @@ -5,6 +5,7 @@ import ( "io" "time" + "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -18,6 +19,7 @@ type YRestartReader struct { underlying io.ReadCloser s storage.StorageInteractor name string + settings []settings.StorageSettings } // Close implements RestartReader. @@ -34,11 +36,12 @@ func (y *YRestartReader) Read(p []byte) (n int, err error) { } func NewRestartReader(s storage.StorageInteractor, - name string) RestartReader { + name string, setts []settings.StorageSettings) RestartReader { return &YRestartReader{ - s: s, - name: name, + s: s, + name: name, + settings: setts, } } @@ -51,7 +54,7 @@ func (y *YRestartReader) Restart(offsetStart int64) error { } else { ylogger.Zero.Error().Str("object-path", y.name).Int64("offset", offsetStart).Msg("cat object with offset after possible error") } - r, err := y.s.CatFileFromStorage(y.name, offsetStart) + r, err := y.s.CatFileFromStorage(y.name, offsetStart, y.settings) if err != nil { return err } diff --git a/pkg/storage/filestorage.go b/pkg/storage/filestorage.go index 4f64436..3f39350 100644 --- a/pkg/storage/filestorage.go +++ b/pkg/storage/filestorage.go @@ -20,7 +20,7 @@ type FileStorageInteractor struct { cnf *config.Storage } -func (s *FileStorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { +func (s *FileStorageInteractor) CatFileFromStorage(name string, offset int64, _ []settings.StorageSettings) (io.ReadCloser, error) { file, err := os.Open(path.Join(s.cnf.StoragePrefix, name)) if err != nil { return nil, err diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 3c472f1..6c5c14c 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -28,7 +28,7 @@ type S3StorageInteractor struct { bucketMap map[string]string } -func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { +func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64, setts []settings.StorageSettings) (io.ReadCloser, error) { // XXX: fix this sess, err := s.pool.GetSession(context.TODO()) if err != nil { @@ -37,8 +37,18 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io. } objectPath := path.Join(s.cnf.StoragePrefix, name) + + tableSpace := ResolveStorageSetting(setts, message.TableSpaceSetting, tablespace.DefaultTableSpace) + + bucket, ok := s.bucketMap[tableSpace] + if !ok { + err := fmt.Errorf("failed to match tablespace %s to s3 bucket.", tableSpace) + ylogger.Zero.Err(err) + return nil, err + } + input := &s3.GetObjectInput{ - Bucket: &s.cnf.StorageBucket, + Bucket: &bucket, Key: aws.String(objectPath), Range: aws.String(fmt.Sprintf("bytes=%d-", offset)), } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index bb5cd0d..3e00c4c 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -11,7 +11,7 @@ import ( ) type StorageReader interface { - CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) + CatFileFromStorage(name string, offset int64, setts []settings.StorageSettings) (io.ReadCloser, error) } type StorageWriter interface { From 0e37661190caf6a499cee382e77dffc157ef373a Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Mon, 26 Aug 2024 12:59:29 +0000 Subject: [PATCH 6/6] Fxies --- pkg/message/message.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/message/message.go b/pkg/message/message.go index 2e114f6..4392171 100644 --- a/pkg/message/message.go +++ b/pkg/message/message.go @@ -37,6 +37,8 @@ func (m MessageType) String() string { switch m { case MessageTypeCat: return "CAT" + case MessageTypeCatV2: + return "CATV2" case MessageTypePut: return "PUT" case MessageTypePutV2: