From 8f56a9b7df95a7a4b7db010a90002099352aec18 Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Sat, 24 Aug 2024 08:03:14 +0000 Subject: [PATCH] Refactor client-server interaction facilities --- cmd/client/main.go | 5 +- pkg/message/object_meta_message.go | 10 +- pkg/mock/backups.go | 9 +- pkg/mock/database.go | 9 +- pkg/mock/proc/yrreader.go | 11 +- pkg/mock/storage.go | 54 ++++---- pkg/object/objectInfo.go | 6 + pkg/proc/delete_handler_test.go | 4 +- pkg/proc/interaction.go | 197 ++++++++++++++++------------- pkg/storage/filestorage.go | 10 +- pkg/storage/s3storage.go | 15 +-- pkg/storage/storage.go | 6 +- 12 files changed, 193 insertions(+), 143 deletions(-) create mode 100644 pkg/object/objectInfo.go diff --git a/cmd/client/main.go b/cmd/client/main.go index 151da04..9f1ed15 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -6,12 +6,11 @@ import ( "net" "os" - "github.com/yezzey-gp/yproxy/pkg/storage" - "github.com/spf13/cobra" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/client" "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/proc" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -167,7 +166,7 @@ func listFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { r := proc.NewProtoReader(ycl) done := false - res := make([]*storage.ObjectInfo, 0) + res := make([]*object.ObjectInfo, 0) for { if done { break diff --git a/pkg/message/object_meta_message.go b/pkg/message/object_meta_message.go index 6f8bfc4..2c3cf2e 100644 --- a/pkg/message/object_meta_message.go +++ b/pkg/message/object_meta_message.go @@ -4,16 +4,16 @@ import ( "bytes" "encoding/binary" - "github.com/yezzey-gp/yproxy/pkg/storage" + "github.com/yezzey-gp/yproxy/pkg/object" ) type ObjectInfoMessage struct { - Content []*storage.ObjectInfo + Content []*object.ObjectInfo } var _ ProtoMessage = &ObjectInfoMessage{} -func NewObjectMetaMessage(content []*storage.ObjectInfo) *ObjectInfoMessage { +func NewObjectMetaMessage(content []*object.ObjectInfo) *ObjectInfoMessage { return &ObjectInfoMessage{ Content: content, } @@ -44,12 +44,12 @@ func (c *ObjectInfoMessage) Encode() []byte { func (c *ObjectInfoMessage) Decode(body []byte) { body = body[4:] - c.Content = make([]*storage.ObjectInfo, 0) + c.Content = make([]*object.ObjectInfo, 0) for len(body) > 0 { name, index := c.GetString(body) size := int64(binary.BigEndian.Uint64(body[index : index+8])) - c.Content = append(c.Content, &storage.ObjectInfo{ + c.Content = append(c.Content, &object.ObjectInfo{ Path: name, Size: size, }) diff --git a/pkg/mock/backups.go b/pkg/mock/backups.go index 530bb1b..8ec8647 100644 --- a/pkg/mock/backups.go +++ b/pkg/mock/backups.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: pkg/backups/backups.go +// +// Generated by this command: +// +// mockgen -source=pkg/backups/backups.go -destination=pkg/mock/backups.go -package=mock +// // Package mock is a generated GoMock package. package mock @@ -7,7 +12,7 @@ package mock import ( reflect "reflect" - gomock "github.com/golang/mock/gomock" + gomock "go.uber.org/mock/gomock" ) // MockBackupInterractor is a mock of BackupInterractor interface. @@ -43,7 +48,7 @@ func (m *MockBackupInterractor) GetFirstLSN(arg0 int) (uint64, error) { } // GetFirstLSN indicates an expected call of GetFirstLSN. -func (mr *MockBackupInterractorMockRecorder) GetFirstLSN(arg0 interface{}) *gomock.Call { +func (mr *MockBackupInterractorMockRecorder) GetFirstLSN(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFirstLSN", reflect.TypeOf((*MockBackupInterractor)(nil).GetFirstLSN), arg0) } diff --git a/pkg/mock/database.go b/pkg/mock/database.go index d04c573..04408c2 100644 --- a/pkg/mock/database.go +++ b/pkg/mock/database.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: pkg/database/database.go +// +// Generated by this command: +// +// mockgen -source=pkg/database/database.go -destination=pkg/mock/database.go -package=mock +// // Package mock is a generated GoMock package. package mock @@ -7,7 +12,7 @@ package mock import ( reflect "reflect" - gomock "github.com/golang/mock/gomock" + gomock "go.uber.org/mock/gomock" ) // MockDatabaseInterractor is a mock of DatabaseInterractor interface. @@ -44,7 +49,7 @@ func (m *MockDatabaseInterractor) GetVirtualExpireIndexes(arg0 int) (map[string] } // GetVirtualExpireIndexes indicates an expected call of GetVirtualExpireIndexes. -func (mr *MockDatabaseInterractorMockRecorder) GetVirtualExpireIndexes(arg0 interface{}) *gomock.Call { +func (mr *MockDatabaseInterractorMockRecorder) GetVirtualExpireIndexes(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVirtualExpireIndexes", reflect.TypeOf((*MockDatabaseInterractor)(nil).GetVirtualExpireIndexes), arg0) } diff --git a/pkg/mock/proc/yrreader.go b/pkg/mock/proc/yrreader.go index d0f6975..f125507 100644 --- a/pkg/mock/proc/yrreader.go +++ b/pkg/mock/proc/yrreader.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: pkg/proc/yrreader.go +// +// Generated by this command: +// +// mockgen -source=pkg/proc/yrreader.go -destination=pkg/mock/proc/yrreader.go -package=mock +// // Package mock is a generated GoMock package. package mock @@ -7,7 +12,7 @@ package mock import ( reflect "reflect" - gomock "github.com/golang/mock/gomock" + gomock "go.uber.org/mock/gomock" ) // MockRestartReader is a mock of RestartReader interface. @@ -57,7 +62,7 @@ func (m *MockRestartReader) Read(p []byte) (int, error) { } // Read indicates an expected call of Read. -func (mr *MockRestartReaderMockRecorder) Read(p interface{}) *gomock.Call { +func (mr *MockRestartReaderMockRecorder) Read(p any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockRestartReader)(nil).Read), p) } @@ -71,7 +76,7 @@ func (m *MockRestartReader) Restart(offsetStart int64) error { } // Restart indicates an expected call of Restart. -func (mr *MockRestartReaderMockRecorder) Restart(offsetStart interface{}) *gomock.Call { +func (mr *MockRestartReaderMockRecorder) Restart(offsetStart any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Restart", reflect.TypeOf((*MockRestartReader)(nil).Restart), offsetStart) } diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go index a733f0d..3dbe9cc 100644 --- a/pkg/mock/storage.go +++ b/pkg/mock/storage.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: pkg/storage/storage.go +// +// Generated by this command: +// +// mockgen -source=pkg/storage/storage.go -destination=pkg/mock/storage.go -package=mock +// // Package mock is a generated GoMock package. package mock @@ -8,8 +13,9 @@ import ( io "io" reflect "reflect" - gomock "github.com/golang/mock/gomock" - storage "github.com/yezzey-gp/yproxy/pkg/storage" + message "github.com/yezzey-gp/yproxy/pkg/message" + object "github.com/yezzey-gp/yproxy/pkg/object" + gomock "go.uber.org/mock/gomock" ) // MockStorageReader is a mock of StorageReader interface. @@ -45,7 +51,7 @@ func (m *MockStorageReader) CatFileFromStorage(name string, offset int64) (io.Re } // CatFileFromStorage indicates an expected call of CatFileFromStorage. -func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { +func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageReader)(nil).CatFileFromStorage), name, offset) } @@ -82,23 +88,23 @@ func (m *MockStorageWriter) PatchFile(name string, r io.ReadSeeker, startOffset } // PatchFile indicates an expected call of PatchFile. -func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffset interface{}) *gomock.Call { +func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffset any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageWriter)(nil).PatchFile), name, r, startOffset) } // PutFileToDest mocks base method. -func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader) error { +func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret := m.ctrl.Call(m, "PutFileToDest", name, r, settings) ret0, _ := ret[0].(error) return ret0 } // PutFileToDest indicates an expected call of PutFileToDest. -func (mr *MockStorageWriterMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { +func (mr *MockStorageWriterMockRecorder) PutFileToDest(name, r, settings any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageWriter)(nil).PutFileToDest), name, r) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageWriter)(nil).PutFileToDest), name, r, settings) } // MockStorageLister is a mock of StorageLister interface. @@ -125,16 +131,16 @@ func (m *MockStorageLister) EXPECT() *MockStorageListerMockRecorder { } // ListPath mocks base method. -func (m *MockStorageLister) ListPath(prefix string) ([]*storage.ObjectInfo, error) { +func (m *MockStorageLister) ListPath(prefix string) ([]*object.ObjectInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListPath", prefix) - ret0, _ := ret[0].([]*storage.ObjectInfo) + ret0, _ := ret[0].([]*object.ObjectInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // ListPath indicates an expected call of ListPath. -func (mr *MockStorageListerMockRecorder) ListPath(prefix interface{}) *gomock.Call { +func (mr *MockStorageListerMockRecorder) ListPath(prefix any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageLister)(nil).ListPath), prefix) } @@ -171,7 +177,7 @@ func (m *MockStorageMover) DeleteObject(key string) error { } // DeleteObject indicates an expected call of DeleteObject. -func (mr *MockStorageMoverMockRecorder) DeleteObject(key interface{}) *gomock.Call { +func (mr *MockStorageMoverMockRecorder) DeleteObject(key any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageMover)(nil).DeleteObject), key) } @@ -185,7 +191,7 @@ func (m *MockStorageMover) MoveObject(from, to string) error { } // MoveObject indicates an expected call of MoveObject. -func (mr *MockStorageMoverMockRecorder) MoveObject(from, to interface{}) *gomock.Call { +func (mr *MockStorageMoverMockRecorder) MoveObject(from, to any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageMover)(nil).MoveObject), from, to) } @@ -223,7 +229,7 @@ func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64) (i } // CatFileFromStorage indicates an expected call of CatFileFromStorage. -func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset) } @@ -237,22 +243,22 @@ func (m *MockStorageInteractor) DeleteObject(key string) error { } // DeleteObject indicates an expected call of DeleteObject. -func (mr *MockStorageInteractorMockRecorder) DeleteObject(key interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) DeleteObject(key any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) } // ListPath mocks base method. -func (m *MockStorageInteractor) ListPath(prefix string) ([]*storage.ObjectInfo, error) { +func (m *MockStorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListPath", prefix) - ret0, _ := ret[0].([]*storage.ObjectInfo) + ret0, _ := ret[0].([]*object.ObjectInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // ListPath indicates an expected call of ListPath. -func (mr *MockStorageInteractorMockRecorder) ListPath(prefix interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) ListPath(prefix any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageInteractor)(nil).ListPath), prefix) } @@ -266,7 +272,7 @@ func (m *MockStorageInteractor) MoveObject(from, to string) error { } // MoveObject indicates an expected call of MoveObject. -func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageInteractor)(nil).MoveObject), from, to) } @@ -280,21 +286,21 @@ func (m *MockStorageInteractor) PatchFile(name string, r io.ReadSeeker, startOff } // PatchFile indicates an expected call of PatchFile. -func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffset interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffset any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageInteractor)(nil).PatchFile), name, r, startOffset) } // PutFileToDest mocks base method. -func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader) error { +func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret := m.ctrl.Call(m, "PutFileToDest", name, r, settings) ret0, _ := ret[0].(error) return ret0 } // PutFileToDest indicates an expected call of PutFileToDest. -func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r, settings any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r, settings) } diff --git a/pkg/object/objectInfo.go b/pkg/object/objectInfo.go new file mode 100644 index 0000000..ae7f549 --- /dev/null +++ b/pkg/object/objectInfo.go @@ -0,0 +1,6 @@ +package object + +type ObjectInfo struct { + Path string + Size int64 +} diff --git a/pkg/proc/delete_handler_test.go b/pkg/proc/delete_handler_test.go index a37995d..c0fabd2 100644 --- a/pkg/proc/delete_handler_test.go +++ b/pkg/proc/delete_handler_test.go @@ -7,8 +7,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/yezzey-gp/yproxy/pkg/message" mock "github.com/yezzey-gp/yproxy/pkg/mock" + "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/proc" - "github.com/yezzey-gp/yproxy/pkg/storage" ) func TestReworkingName(t *testing.T) { @@ -60,7 +60,7 @@ func TestFilesToDeletion(t *testing.T) { Confirm: false, } - filesInStorage := []*storage.ObjectInfo{ + filesInStorage := []*object.ObjectInfo{ {Path: "1663_16530_not-deleted_18002_"}, {Path: "1663_16530_deleted-after-backup_18002_"}, {Path: "1663_16530_deleted-when-backup-start_18002_"}, diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index ba2207d..a37b3d9 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -12,10 +12,110 @@ import ( "github.com/yezzey-gp/yproxy/pkg/crypt" "github.com/yezzey-gp/yproxy/pkg/database" "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) +func ProcessPutExtended( + s storage.StorageInteractor, + pr *ProtoReader, + name string, + encrypt bool, settings []message.PutSetting, cr crypt.Crypter, ycl client.YproxyClient) error { + + ycl.SetExternalFilePath(name) + + var w io.WriteCloser + + r, w := io.Pipe() + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + + var ww io.WriteCloser = w + if encrypt { + if cr == nil { + _ = ycl.ReplyError(fmt.Errorf("failed to encrypt, crypter not configured"), "connection aborted") + ycl.Close() + return + } + + var err error + ww, err = cr.Encrypt(w) + if err != nil { + _ = ycl.ReplyError(err, "failed to encrypt") + + ycl.Close() + return + } + } else { + ylogger.Zero.Debug().Str("path", name).Msg("omit encryption for chunk") + } + + defer w.Close() + defer wg.Done() + + for { + tp, body, err := pr.ReadPacket() + if err != nil { + _ = ycl.ReplyError(err, "failed to read chunk of data") + return + } + + ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request") + + switch tp { + case message.MessageTypeCopyData: + msg := message.CopyDataMessage{} + msg.Decode(body) + if n, err := ww.Write(msg.Data); err != nil { + _ = ycl.ReplyError(err, "failed to write copy data") + + return + } else if n != int(msg.Sz) { + + _ = ycl.ReplyError(fmt.Errorf("unfull write"), "failed to compelete request") + + return + } + case message.MessageTypeCommandComplete: + msg := message.CommandCompleteMessage{} + msg.Decode(body) + + if err := ww.Close(); err != nil { + _ = ycl.ReplyError(err, "failed to close connection") + return + } + + ylogger.Zero.Debug().Msg("closing msg writer") + return + } + } + }() + + err := s.PutFileToDest(name, r, settings) + + wg.Wait() + + if err != nil { + _ = ycl.ReplyError(err, "failed to upload") + + return err + } + + _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()) + + if err != nil { + _ = ycl.ReplyError(err, "failed to upload") + + return err + } + + return nil +} + func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient) error { defer func() { @@ -77,94 +177,17 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl msg := message.PutMessage{} msg.Decode(body) - ycl.SetExternalFilePath(msg.Name) - - var w io.WriteCloser - - r, w := io.Pipe() - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - - var ww io.WriteCloser = w - if msg.Encrypt { - if cr == nil { - _ = ycl.ReplyError(err, "failed to encrypt, crypter not configured") - ycl.Close() - return - } - - var err error - ww, err = cr.Encrypt(w) - if err != nil { - _ = ycl.ReplyError(err, "failed to encrypt") - - ycl.Close() - return - } - } else { - ylogger.Zero.Debug().Str("path", msg.Name).Msg("omit encryption for chunk") - } - - defer w.Close() - defer wg.Done() - - for { - tp, body, err := pr.ReadPacket() - if err != nil { - _ = ycl.ReplyError(err, "failed to read chunk of data") - return - } - - ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request") - - switch tp { - case message.MessageTypeCopyData: - msg := message.CopyDataMessage{} - msg.Decode(body) - if n, err := ww.Write(msg.Data); err != nil { - _ = ycl.ReplyError(err, "failed to write copy data") - - return - } else if n != int(msg.Sz) { - - _ = ycl.ReplyError(fmt.Errorf("unfull write"), "failed to compelete request") - - return - } - case message.MessageTypeCommandComplete: - msg := message.CommandCompleteMessage{} - msg.Decode(body) - - if err := ww.Close(); err != nil { - _ = ycl.ReplyError(err, "failed to close connection") - return - } - - ylogger.Zero.Debug().Msg("closing msg writer") - return - } - } - }() - - err := s.PutFileToDest(msg.Name, r) - - wg.Wait() - - if err != nil { - _ = ycl.ReplyError(err, "failed to upload") - - return nil + if err := ProcessPutExtended(s, pr, msg.Name, msg.Encrypt, nil, cr, ycl); err != nil { + return err } - _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()) + case message.MessageTypePutV2: - if err != nil { - _ = ycl.ReplyError(err, "failed to upload") + msg := message.PutMessageV2{} + msg.Decode(body) - return nil + if err := ProcessPutExtended(s, pr, msg.Name, msg.Encrypt, msg.Settings, cr, ycl); err != nil { + return err } case message.MessageTypeList: @@ -226,7 +249,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl return nil } - var failed []*storage.ObjectInfo + var failed []*object.ObjectInfo retryCount := 0 for len(objectMetas) > 0 && retryCount < 10 { retryCount++ @@ -289,7 +312,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl }() //write file - err = s.PutFileToDest(path, readerEncrypt) + err = s.PutFileToDest(path, readerEncrypt, nil) if err != nil { ylogger.Zero.Error().Err(err).Msg("failed to upload file") failed = append(failed, objectMetas[i]) @@ -298,7 +321,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl } objectMetas = failed fmt.Printf("failed files count: %d\n", len(objectMetas)) - failed = make([]*storage.ObjectInfo, 0) + failed = make([]*object.ObjectInfo, 0) } if len(objectMetas) > 0 { diff --git a/pkg/storage/filestorage.go b/pkg/storage/filestorage.go index 4b4d970..d066220 100644 --- a/pkg/storage/filestorage.go +++ b/pkg/storage/filestorage.go @@ -9,6 +9,8 @@ import ( "path/filepath" "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" ) // Storage prefix uses as path to folder. @@ -26,8 +28,8 @@ func (s *FileStorageInteractor) CatFileFromStorage(name string, offset int64) (i _, err = io.CopyN(io.Discard, file, offset) return file, err } -func (s *FileStorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { - var data []*ObjectInfo +func (s *FileStorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, error) { + var data []*object.ObjectInfo err := filepath.WalkDir(s.cnf.StoragePrefix+prefix, func(path string, d fs.DirEntry, err error) error { if err != nil { return err @@ -43,13 +45,13 @@ func (s *FileStorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { if err != nil { return err } - data = append(data, &ObjectInfo{fileinfo.Name(), fileinfo.Size()}) + data = append(data, &object.ObjectInfo{Path: fileinfo.Name(), Size: fileinfo.Size()}) return nil }) return data, err } -func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader) error { +func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader, _ []message.PutSetting) error { fPath := path.Join(s.cnf.StoragePrefix, name) fDir := path.Dir(fPath) os.MkdirAll(fDir, 0700) diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index bbf6e2b..4f078d4 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -11,6 +11,8 @@ import ( "github.com/yezzey-gp/aws-sdk-go/service/s3" "github.com/yezzey-gp/aws-sdk-go/service/s3/s3manager" "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -44,7 +46,7 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io. return object.Body, err } -func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error { +func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error { sess, err := s.pool.GetSession(context.TODO()) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -94,12 +96,7 @@ func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffse return err } -type ObjectInfo struct { - Path string - Size int64 -} - -func (s *S3StorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { +func (s *S3StorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, error) { sess, err := s.pool.GetSession(context.TODO()) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -108,7 +105,7 @@ func (s *S3StorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { var continuationToken *string prefix = path.Join(s.cnf.StoragePrefix, prefix) - metas := make([]*ObjectInfo, 0) + metas := make([]*object.ObjectInfo, 0) for { input := &s3.ListObjectsV2Input{ @@ -123,7 +120,7 @@ func (s *S3StorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { } for _, obj := range out.Contents { - metas = append(metas, &ObjectInfo{ + metas = append(metas, &object.ObjectInfo{ Path: *obj.Key, Size: *obj.Size, }) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 6f02559..d5e3e98 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -5,6 +5,8 @@ import ( "io" "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" ) type StorageReader interface { @@ -12,12 +14,12 @@ type StorageReader interface { } type StorageWriter interface { - PutFileToDest(name string, r io.Reader) error + PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error PatchFile(name string, r io.ReadSeeker, startOffset int64) error } type StorageLister interface { - ListPath(prefix string) ([]*ObjectInfo, error) + ListPath(prefix string) ([]*object.ObjectInfo, error) } type StorageMover interface {