diff --git a/Makefile b/Makefile index 8644c32..ca4b0b5 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ mockgen: mockgen -source=pkg/proc/yrreader.go -destination=pkg/mock/proc/yrreader.go -package=mock mockgen -source=pkg/database/database.go -destination=pkg/mock/database.go -package=mock mockgen -source=pkg/backups/backups.go -destination=pkg/mock/backups.go -package=mock - mockgen -source=pkg/storage/storage_interractor.go -destination=pkg/mock/storage.go -package=mock + mockgen -source=pkg/storage/storage.go -destination=pkg/mock/storage.go -package=mock version = $(shell git describe --tags --abbrev=0) package: diff --git a/cmd/client/main.go b/cmd/client/main.go index 0570078..0a65828 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -25,6 +25,7 @@ var offset uint64 var segmentPort int var segmentNum int var confirm bool +var garbage bool // TODOV func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error { @@ -182,12 +183,10 @@ func listFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { meta.Decode(body) res = append(res, meta.Content...) - break case message.MessageTypeReadyForQuery: done = true - break default: - return fmt.Errorf("Incorrect message type: %s", tp.String()) + return fmt.Errorf("incorrect message type: %s", tp.String()) } } @@ -217,10 +216,10 @@ var copyCmd = &cobra.Command{ } var deleteCmd = &cobra.Command{ - Use: "delete_garbage", - Short: "delete_garbage", + Use: "delete", + Short: "delete", RunE: func(cmd *cobra.Command, args []string) error { - ylogger.Zero.Info().Msg("Execute delete_garbage command") + ylogger.Zero.Info().Msg("Execute delete command") err := config.LoadInstanceConfig(cfgPath) if err != nil { return err @@ -231,10 +230,10 @@ var deleteCmd = &cobra.Command{ if err != nil { return err } - defer con.Close() + ylogger.Zero.Info().Str("name", args[0]).Msg("delete") - msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm).Encode() + msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm, garbage).Encode() _, err = con.Write(msg) if err != nil { return err @@ -292,8 +291,9 @@ func init() { rootCmd.AddCommand(listCmd) deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on") - deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "number of the segment") + deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "logical number of a segment") deleteCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion") + deleteCmd.PersistentFlags().BoolVarP(&garbage, "garbage", "g", false, "delete garbage") rootCmd.AddCommand(deleteCmd) } diff --git a/pkg/backups/backups.go b/pkg/backups/backups.go index 27bf1c6..6245ad1 100644 --- a/pkg/backups/backups.go +++ b/pkg/backups/backups.go @@ -19,7 +19,7 @@ type BackupInterractor interface { GetFirstLSN(int) (uint64, error) } -type WalgBackupInterractor struct { +type WalgBackupInterractor struct { //TODO: rewrite to using s3 instead of wal-g cmd } // get lsn of the oldest backup @@ -34,29 +34,29 @@ func (b *WalgBackupInterractor) GetFirstLSN(seg int) (uint64, error) { ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls") return 0, err } - p1 := strings.Split(out.String(), "\n") + lines := strings.Split(out.String(), "\n") minLSN := BackupLSN{Lsn: ^uint64(0)} - for _, line := range p1 { + for _, line := range lines { if !strings.Contains(line, ".json") { continue } - p2 := strings.Split(line, " ") - p3 := p2[len(p2)-1] + parts := strings.Split(line, " ") + fileName := parts[len(parts)-1] - ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3)).Msg("check lsn in file") - cmd2 := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3), "--config=/etc/wal-g/wal-g.yaml") + ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, fileName)).Msg("check lsn in file") + catCmd := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, fileName), "--config=/etc/wal-g/wal-g.yaml") - var out2 bytes.Buffer - cmd2.Stdout = &out2 + var catOut bytes.Buffer + catCmd.Stdout = &catOut - err = cmd2.Run() + err = catCmd.Run() if err != nil { ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat") return 0, err } lsn := BackupLSN{} - err = json.Unmarshal(out2.Bytes(), &lsn) + err = json.Unmarshal(catOut.Bytes(), &lsn) if lsn.Lsn < minLSN.Lsn { minLSN.Lsn = lsn.Lsn diff --git a/pkg/message/delete_message.go b/pkg/message/delete_message.go index 2af9881..599ba1c 100644 --- a/pkg/message/delete_message.go +++ b/pkg/message/delete_message.go @@ -10,16 +10,18 @@ type DeleteMessage struct { //seg port Port int Segnum int Confirm bool + Garbage bool } var _ ProtoMessage = &DeleteMessage{} -func NewDeleteMessage(name string, port int, seg int, confirm bool) *DeleteMessage { +func NewDeleteMessage(name string, port int, seg int, confirm bool, garbage bool) *DeleteMessage { return &DeleteMessage{ Name: name, Port: port, Segnum: seg, Confirm: confirm, + Garbage: garbage, } } @@ -34,6 +36,9 @@ func (c *DeleteMessage) Encode() []byte { if c.Confirm { bt[1] = 1 } + if c.Garbage { + bt[2] = 1 + } bt = append(bt, []byte(c.Name)...) bt = append(bt, 0) @@ -56,6 +61,9 @@ func (c *DeleteMessage) Decode(body []byte) { if body[1] == 1 { c.Confirm = true } + if body[2] == 1 { + c.Garbage = true + } c.Name = c.GetDeleteName(body[4:]) c.Port = int(binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8])) c.Segnum = int(binary.BigEndian.Uint64(body[len(body)-8:])) diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index a522ccf..8cb8734 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -185,7 +185,7 @@ func TestCopyMsg(t *testing.T) { func TestDeleteMsg(t *testing.T) { assert := assert.New(t) - msg := message.NewDeleteMessage("myname/mynextname", 5432, 42, true) + msg := message.NewDeleteMessage("myname/mynextname", 5432, 42, true, true) body := msg.Encode() assert.Equal(body[8], byte(message.MessageTypeDelete)) @@ -197,4 +197,5 @@ func TestDeleteMsg(t *testing.T) { assert.Equal(5432, msg2.Port) assert.Equal(42, msg2.Segnum) assert.True(msg2.Confirm) + assert.True(msg2.Garbage) } diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go index e0bc880..a733f0d 100644 --- a/pkg/mock/storage.go +++ b/pkg/mock/storage.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: pkg/storage/storage_interractor.go +// Source: pkg/storage/storage.go // Package mock is a generated GoMock package. package mock @@ -12,115 +12,6 @@ import ( storage "github.com/yezzey-gp/yproxy/pkg/storage" ) -// MockStorageInteractor is a mock of StorageInteractor interface. -type MockStorageInteractor struct { - ctrl *gomock.Controller - recorder *MockStorageInteractorMockRecorder -} - -// MockStorageInteractorMockRecorder is the mock recorder for MockStorageInteractor. -type MockStorageInteractorMockRecorder struct { - mock *MockStorageInteractor -} - -// NewMockStorageInteractor creates a new mock instance. -func NewMockStorageInteractor(ctrl *gomock.Controller) *MockStorageInteractor { - mock := &MockStorageInteractor{ctrl: ctrl} - mock.recorder = &MockStorageInteractorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockStorageInteractor) EXPECT() *MockStorageInteractorMockRecorder { - return m.recorder -} - -// CatFileFromStorage mocks base method. -func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) - ret0, _ := ret[0].(io.ReadCloser) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// CatFileFromStorage indicates an expected call of CatFileFromStorage. -func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset) -} - -// DeleteObject mocks base method. -func (m *MockStorageInteractor) DeleteObject(key string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteObject", key) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteObject indicates an expected call of DeleteObject. -func (mr *MockStorageInteractorMockRecorder) DeleteObject(key interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) -} - -// ListPath mocks base method. -func (m *MockStorageInteractor) ListPath(name string) ([]*storage.S3ObjectMeta, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListPath", name) - ret0, _ := ret[0].([]*storage.S3ObjectMeta) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListPath indicates an expected call of ListPath. -func (mr *MockStorageInteractorMockRecorder) ListPath(name interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageInteractor)(nil).ListPath), name) -} - -// MoveObject mocks base method. -func (m *MockStorageInteractor) MoveObject(from, to string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MoveObject", from, to) - ret0, _ := ret[0].(error) - return ret0 -} - -// MoveObject indicates an expected call of MoveObject. -func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageInteractor)(nil).MoveObject), from, to) -} - -// PatchFile mocks base method. -func (m *MockStorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PatchFile", name, r, startOffste) - ret0, _ := ret[0].(error) - return ret0 -} - -// PatchFile indicates an expected call of PatchFile. -func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffste interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageInteractor)(nil).PatchFile), name, r, startOffste) -} - -// PutFileToDest mocks base method. -func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PutFileToDest", name, r) - ret0, _ := ret[0].(error) - return ret0 -} - -// PutFileToDest indicates an expected call of PutFileToDest. -func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r) -} - // MockStorageReader is a mock of StorageReader interface. type MockStorageReader struct { ctrl *gomock.Controller @@ -159,21 +50,6 @@ func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageReader)(nil).CatFileFromStorage), name, offset) } -// ListPath mocks base method. -func (m *MockStorageReader) ListPath(name string) ([]*storage.S3ObjectMeta, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListPath", name) - ret0, _ := ret[0].([]*storage.S3ObjectMeta) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListPath indicates an expected call of ListPath. -func (mr *MockStorageReaderMockRecorder) ListPath(name interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageReader)(nil).ListPath), name) -} - // MockStorageWriter is a mock of StorageWriter interface. type MockStorageWriter struct { ctrl *gomock.Controller @@ -198,17 +74,17 @@ func (m *MockStorageWriter) EXPECT() *MockStorageWriterMockRecorder { } // PatchFile mocks base method. -func (m *MockStorageWriter) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { +func (m *MockStorageWriter) PatchFile(name string, r io.ReadSeeker, startOffset int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PatchFile", name, r, startOffste) + ret := m.ctrl.Call(m, "PatchFile", name, r, startOffset) ret0, _ := ret[0].(error) return ret0 } // PatchFile indicates an expected call of PatchFile. -func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffste interface{}) *gomock.Call { +func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffset interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageWriter)(nil).PatchFile), name, r, startOffste) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageWriter)(nil).PatchFile), name, r, startOffset) } // PutFileToDest mocks base method. @@ -249,10 +125,10 @@ func (m *MockStorageLister) EXPECT() *MockStorageListerMockRecorder { } // ListPath mocks base method. -func (m *MockStorageLister) ListPath(prefix string) ([]*storage.S3ObjectMeta, error) { +func (m *MockStorageLister) ListPath(prefix string) ([]*storage.ObjectInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListPath", prefix) - ret0, _ := ret[0].([]*storage.S3ObjectMeta) + ret0, _ := ret[0].([]*storage.ObjectInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -313,3 +189,112 @@ func (mr *MockStorageMoverMockRecorder) MoveObject(from, to interface{}) *gomock mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageMover)(nil).MoveObject), from, to) } + +// MockStorageInteractor is a mock of StorageInteractor interface. +type MockStorageInteractor struct { + ctrl *gomock.Controller + recorder *MockStorageInteractorMockRecorder +} + +// MockStorageInteractorMockRecorder is the mock recorder for MockStorageInteractor. +type MockStorageInteractorMockRecorder struct { + mock *MockStorageInteractor +} + +// NewMockStorageInteractor creates a new mock instance. +func NewMockStorageInteractor(ctrl *gomock.Controller) *MockStorageInteractor { + mock := &MockStorageInteractor{ctrl: ctrl} + mock.recorder = &MockStorageInteractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageInteractor) EXPECT() *MockStorageInteractorMockRecorder { + return m.recorder +} + +// CatFileFromStorage mocks base method. +func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CatFileFromStorage indicates an expected call of CatFileFromStorage. +func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset) +} + +// DeleteObject mocks base method. +func (m *MockStorageInteractor) DeleteObject(key string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", key) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *MockStorageInteractorMockRecorder) DeleteObject(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) +} + +// ListPath mocks base method. +func (m *MockStorageInteractor) ListPath(prefix string) ([]*storage.ObjectInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListPath", prefix) + ret0, _ := ret[0].([]*storage.ObjectInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPath indicates an expected call of ListPath. +func (mr *MockStorageInteractorMockRecorder) ListPath(prefix interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageInteractor)(nil).ListPath), prefix) +} + +// MoveObject mocks base method. +func (m *MockStorageInteractor) MoveObject(from, to string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MoveObject", from, to) + ret0, _ := ret[0].(error) + return ret0 +} + +// MoveObject indicates an expected call of MoveObject. +func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageInteractor)(nil).MoveObject), from, to) +} + +// PatchFile mocks base method. +func (m *MockStorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffset int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PatchFile", name, r, startOffset) + ret0, _ := ret[0].(error) + return ret0 +} + +// PatchFile indicates an expected call of PatchFile. +func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageInteractor)(nil).PatchFile), name, r, startOffset) +} + +// PutFileToDest mocks base method. +func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutFileToDest indicates an expected call of PutFileToDest. +func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r) +} diff --git a/pkg/proc/delete_handler_test.go b/pkg/proc/delete_handler_test.go index 895faff..a37995d 100644 --- a/pkg/proc/delete_handler_test.go +++ b/pkg/proc/delete_handler_test.go @@ -60,7 +60,7 @@ func TestFilesToDeletion(t *testing.T) { Confirm: false, } - filesInStorage := []*storage.S3ObjectMeta{ + filesInStorage := []*storage.ObjectInfo{ {Path: "1663_16530_not-deleted_18002_"}, {Path: "1663_16530_deleted-after-backup_18002_"}, {Path: "1663_16530_deleted-when-backup-start_18002_"}, diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index f357f1a..b3f81da 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -335,10 +335,18 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl BackupInterractor: backupHandler, } - err = dh.HandleDeleteGarbage(msg) - if err != nil { - _ = ycl.ReplyError(err, "failed to finish operation") - return nil + if msg.Garbage { + err = dh.HandleDeleteGarbage(msg) + if err != nil { + _ = ycl.ReplyError(err, "failed to finish operation") + return nil + } + } else { + err = dh.HandleDeleteFile(msg) + if err != nil { + _ = ycl.ReplyError(err, "failed to finish operation") + return nil + } } if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index f190718..bbf6e2b 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "path" + "strings" "github.com/yezzey-gp/aws-sdk-go/aws" "github.com/yezzey-gp/aws-sdk-go/service/s3" @@ -136,3 +137,29 @@ func (s *S3StorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { } return metas, nil } + +func (s *S3StorageInteractor) DeleteObject(key string) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return err + } + ylogger.Zero.Debug().Msg("aquired session") + + if !strings.HasPrefix(key, s.cnf.StoragePrefix) { + key = path.Join(s.cnf.StoragePrefix, key) + } + + input2 := s3.DeleteObjectInput{ + Bucket: &s.cnf.StorageBucket, + Key: aws.String(key), + } + + _, err = sess.DeleteObject(&input2) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to delete old object") + return err + } + ylogger.Zero.Debug().Msg("deleted object") + return nil +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 5a6a1eb..6f02559 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,18 +1,10 @@ package storage import ( - "context" "fmt" "io" - "path" - "strings" - - "github.com/yezzey-gp/aws-sdk-go/aws" - "github.com/yezzey-gp/aws-sdk-go/service/s3" - "github.com/yezzey-gp/aws-sdk-go/service/s3/s3manager" "github.com/yezzey-gp/yproxy/config" - "github.com/yezzey-gp/yproxy/pkg/ylogger" ) type StorageReader interface { @@ -30,9 +22,10 @@ type StorageLister interface { type StorageMover interface { MoveObject(from string, to string) error - DeleteObject(key string) error } + +//go:generate mockgen -destination=pkg/mock/storage.go -package=mock type StorageInteractor interface { StorageReader StorageWriter @@ -53,181 +46,5 @@ func NewStorage(cnf *config.Storage) (StorageInteractor, error) { }, nil default: return nil, fmt.Errorf("wrong storage type " + cnf.StorageType) - - } -} - -func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { - // XXX: fix this - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil, err - } - - objectPath := path.Join(s.cnf.StoragePrefix, name) - input := &s3.GetObjectInput{ - Bucket: &s.cnf.StorageBucket, - Key: aws.String(objectPath), - Range: aws.String(fmt.Sprintf("bytes=%d-", offset)), - } - - ylogger.Zero.Debug().Str("key", objectPath).Int64("offset", offset).Str("bucket", - s.cnf.StorageBucket).Msg("requesting external storage") - - object, err := sess.GetObject(input) - return object.Body, err -} - -func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil - } - - objectPath := path.Join(s.cnf.StoragePrefix, name) - - up := s3manager.NewUploaderWithClient(sess, func(uploader *s3manager.Uploader) { - uploader.PartSize = int64(1 << 24) - uploader.Concurrency = 1 - }) - - _, err = up.Upload( - &s3manager.UploadInput{ - Bucket: aws.String(s.cnf.StorageBucket), - Key: aws.String(objectPath), - Body: r, - StorageClass: aws.String("STANDARD"), - }, - ) - - return err -} - -func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil - } - - objectPath := path.Join(s.cnf.StoragePrefix, name) - - input := &s3.PatchObjectInput{ - Bucket: &s.cnf.StorageBucket, - Key: aws.String(objectPath), - Body: r, - ContentRange: aws.String(fmt.Sprintf("bytes %d-18446744073709551615", startOffste)), - } - - _, err = sess.PatchObject(input) - - ylogger.Zero.Debug().Str("key", objectPath).Str("bucket", - s.cnf.StorageBucket).Msg("modifying file in external storage") - - return err -} - -type S3ObjectMeta struct { - Path string - Size int64 -} - -func (s *S3StorageInteractor) ListPath(prefix string) ([]*S3ObjectMeta, error) { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil, err - } - - var continuationToken *string - prefix = path.Join(s.cnf.StoragePrefix, prefix) - metas := make([]*S3ObjectMeta, 0) - - for { - input := &s3.ListObjectsV2Input{ - Bucket: &s.cnf.StorageBucket, - Prefix: aws.String(prefix), - ContinuationToken: continuationToken, - } - - out, err := sess.ListObjectsV2(input) - if err != nil { - fmt.Printf("list error: %v\n", err) - } - - for _, obj := range out.Contents { - metas = append(metas, &S3ObjectMeta{ - Path: *obj.Key, - Size: *obj.Size, - }) - } - - if !*out.IsTruncated { - break - } - - continuationToken = out.NextContinuationToken - } - return metas, nil -} - -func (s *S3StorageInteractor) MoveObject(from string, to string) error { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return err - } - ylogger.Zero.Debug().Msg("aquired session") - - fromPath := from - toPath := path.Join(s.cnf.StoragePrefix, to) - ylogger.Zero.Debug().Str("to", toPath).Msg("to path") - - input := s3.CopyObjectInput{ - Bucket: &s.cnf.StorageBucket, - CopySource: aws.String(s.cnf.StorageBucket + "/" + fromPath), - Key: aws.String(toPath), - } - - out, err := sess.CopyObject(&input) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to copy object") - return err - } - ylogger.Zero.Debug().Str("", out.GoString()).Msg("copied object") - - err = s.DeleteObject(fromPath) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to delete old object") - } - ylogger.Zero.Debug().Msg("deleted object") - return err -} - -func (s *S3StorageInteractor) DeleteObject(key string) error { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return err - } - ylogger.Zero.Debug().Msg("aquired session") - - if !strings.HasPrefix(key, s.cnf.StoragePrefix) { - key = path.Join(s.cnf.StoragePrefix, key) - } - - input2 := s3.DeleteObjectInput{ - Bucket: &s.cnf.StorageBucket, - Key: aws.String(key), - } - - _, err = sess.DeleteObject(&input2) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to delete old object") - return err } - ylogger.Zero.Debug().Msg("deleted object") - return nil } diff --git a/pkg/storage/storage_interractor.go b/pkg/storage/storage_interractor.go deleted file mode 100644 index c23603e..0000000 --- a/pkg/storage/storage_interractor.go +++ /dev/null @@ -1,30 +0,0 @@ -package storage - -import "io" - -//go:generate mockgen -destination=pkg/mock/storage.go -package=mock -type StorageInteractor interface { - StorageReader - StorageWriter - StorageLister - StorageMover -} - -type StorageReader interface { - CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) - ListPath(name string) ([]*S3ObjectMeta, error) -} - -type StorageWriter interface { - PutFileToDest(name string, r io.Reader) error - PatchFile(name string, r io.ReadSeeker, startOffste int64) error -} - -type StorageLister interface { - ListPath(prefix string) ([]*S3ObjectMeta, error) -} - -type StorageMover interface { - MoveObject(from string, to string) error - DeleteObject(key string) error -}