diff --git a/Makefile b/Makefile index 689fa0c..8644c32 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,9 @@ unittest: mockgen: mockgen -source=pkg/proc/yrreader.go -destination=pkg/mock/proc/yrreader.go -package=mock + mockgen -source=pkg/database/database.go -destination=pkg/mock/database.go -package=mock + mockgen -source=pkg/backups/backups.go -destination=pkg/mock/backups.go -package=mock + mockgen -source=pkg/storage/storage_interractor.go -destination=pkg/mock/storage.go -package=mock version = $(shell git describe --tags --abbrev=0) package: diff --git a/pkg/backups/backups.go b/pkg/backups/backups.go new file mode 100644 index 0000000..27bf1c6 --- /dev/null +++ b/pkg/backups/backups.go @@ -0,0 +1,67 @@ +package backups + +import ( + "bytes" + "encoding/json" + "fmt" + "os/exec" + "strings" + + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +type BackupLSN struct { + Lsn uint64 `json:"LSN"` +} + +//go:generate mockgen -destination=pkg/mock/backups.go -package=mock +type BackupInterractor interface { + GetFirstLSN(int) (uint64, error) +} + +type WalgBackupInterractor struct { +} + +// get lsn of the oldest backup +func (b *WalgBackupInterractor) GetFirstLSN(seg int) (uint64, error) { + cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml") + ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args") + var out bytes.Buffer + cmd.Stdout = &out + + err := cmd.Run() + if err != nil { + ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls") + return 0, err + } + p1 := strings.Split(out.String(), "\n") + + minLSN := BackupLSN{Lsn: ^uint64(0)} + for _, line := range p1 { + if !strings.Contains(line, ".json") { + continue + } + p2 := strings.Split(line, " ") + p3 := p2[len(p2)-1] + + ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3)).Msg("check lsn in file") + cmd2 := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3), "--config=/etc/wal-g/wal-g.yaml") + + var out2 bytes.Buffer + cmd2.Stdout = &out2 + + err = cmd2.Run() + if err != nil { + ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat") + return 0, err + } + lsn := BackupLSN{} + err = json.Unmarshal(out2.Bytes(), &lsn) + + if lsn.Lsn < minLSN.Lsn { + minLSN.Lsn = lsn.Lsn + } + } + + return minLSN.Lsn, err +} diff --git a/pkg/database/database.go b/pkg/database/database.go new file mode 100644 index 0000000..a57a031 --- /dev/null +++ b/pkg/database/database.go @@ -0,0 +1,177 @@ +package database + +import ( + "fmt" + "strings" + + "github.com/jackc/pgx" + "github.com/jackc/pgx/pgtype" + "github.com/pkg/errors" + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +//go:generate mockgen -destination=../mock/mock_database_interractor.go -package mock +type DatabaseInterractor interface { + GetVirtualExpireIndexes(int) (map[string]bool, map[string]uint64, error) +} + +type DatabaseHandler struct { +} + +type DB struct { + name string + tablespace pgtype.OID + oid pgtype.OID +} + +type Ei struct { + reloid pgtype.OID + relfileoid pgtype.OID + expireLsn string + fqnmd5 string +} + +func (database *DatabaseHandler) GetVirtualExpireIndexes(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз + db, err := getDatabase(port) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + ylogger.Zero.Debug().Str("database name", db.name).Msg("recieved database") + conn, err := connectToDatabase(port, db.name) + if err != nil { + return nil, nil, err + } + defer conn.Close() //error + ylogger.Zero.Debug().Msg("connected to database") + + rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index WHERE expire_lsn != '0/0';`) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + defer rows.Close() + ylogger.Zero.Debug().Msg("executed select") + + c := make(map[string]uint64, 0) + for rows.Next() { + row := Ei{} + if err := rows.Scan(&row.reloid, &row.relfileoid, &row.expireLsn, &row.fqnmd5); err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + + lsn, err := pgx.ParseLSN(row.expireLsn) + if err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + + ylogger.Zero.Debug().Str("file", fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)).Msg("added file to ei") + c[fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn + } + ylogger.Zero.Debug().Msg("read 1") + + rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + defer rows2.Close() + ylogger.Zero.Debug().Msg("read 2") + + c2 := make(map[string]bool, 0) + for rows2.Next() { + xpath := "" + if err := rows2.Scan(&xpath); err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + p1 := strings.Split(xpath, "/") + p2 := p1[len(p1)-1] + p3 := strings.Split(p2, "_") + if len(p3) >= 4 { + p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + } + c2[p2] = true + ylogger.Zero.Debug().Str("file", p2).Msg("added") + } + ylogger.Zero.Debug().Msg("read 3") + + return c2, c, err +} + +func getDatabase(port int) (DB, error) { + conn, err := connectToDatabase(port, "postgres") + if err != nil { + return DB{}, err + } + defer conn.Close() //error + ylogger.Zero.Debug().Msg("connected to db") + rows, err := conn.Query(`SELECT dattablespace, oid, datname FROM pg_database WHERE datallowconn;`) + if err != nil { + return DB{}, err + } + defer rows.Close() + ylogger.Zero.Debug().Msg("recieved db list") + + for rows.Next() { + row := DB{} + ylogger.Zero.Debug().Msg("cycle 1") + if err := rows.Scan(&row.tablespace, &row.oid, &row.name); err != nil { + return DB{}, err + } + ylogger.Zero.Debug().Msg("cycle 2") + ylogger.Zero.Debug().Str("db", row.name).Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("database") + if row.name == "postgres" { + continue + } + + ylogger.Zero.Debug().Str("db", row.name).Msg("check database") + connDb, err := connectToDatabase(port, row.name) + if err != nil { + return DB{}, err + } + defer connDb.Close() //error + ylogger.Zero.Debug().Msg("cycle 3") + + rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`) + if err != nil { + return DB{}, err + } + defer rowsdb.Close() + ylogger.Zero.Debug().Msg("cycle 4") + var ans bool + rowsdb.Next() + err = rowsdb.Scan(&ans) + if err != nil { + ylogger.Zero.Error().AnErr("error", err).Msg("error during yezzey check") + return DB{}, err + } + ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema") + if ans { + ylogger.Zero.Debug().Str("db", row.name).Msg("found yezzey schema in database") + ylogger.Zero.Debug().Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("found yezzey schema in database") + return row, nil + } + + ylogger.Zero.Debug().Str("db", row.name).Msg("no yezzey schema in database") + } + return DB{}, fmt.Errorf("no yezzey schema across databases") +} + +func connectToDatabase(port int, database string) (*pgx.Conn, error) { + config, err := pgx.ParseEnvLibpq() + if err != nil { + return nil, errors.Wrap(err, "Connect: unable to read environment variables") + } + + config.Port = uint16(port) + config.Database = database + + config.RuntimeParams["gp_role"] = "utility" + conn, err := pgx.Connect(config) + if err != nil { + config.RuntimeParams["gp_session_role"] = "utility" + conn, err = pgx.Connect(config) + if err != nil { + fmt.Printf("error in connection %v", err) // delete this + return nil, err + } + } + return conn, nil +} diff --git a/pkg/mock/backups.go b/pkg/mock/backups.go new file mode 100644 index 0000000..530bb1b --- /dev/null +++ b/pkg/mock/backups.go @@ -0,0 +1,49 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/backups/backups.go + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockBackupInterractor is a mock of BackupInterractor interface. +type MockBackupInterractor struct { + ctrl *gomock.Controller + recorder *MockBackupInterractorMockRecorder +} + +// MockBackupInterractorMockRecorder is the mock recorder for MockBackupInterractor. +type MockBackupInterractorMockRecorder struct { + mock *MockBackupInterractor +} + +// NewMockBackupInterractor creates a new mock instance. +func NewMockBackupInterractor(ctrl *gomock.Controller) *MockBackupInterractor { + mock := &MockBackupInterractor{ctrl: ctrl} + mock.recorder = &MockBackupInterractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBackupInterractor) EXPECT() *MockBackupInterractorMockRecorder { + return m.recorder +} + +// GetFirstLSN mocks base method. +func (m *MockBackupInterractor) GetFirstLSN(arg0 int) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetFirstLSN", arg0) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetFirstLSN indicates an expected call of GetFirstLSN. +func (mr *MockBackupInterractorMockRecorder) GetFirstLSN(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFirstLSN", reflect.TypeOf((*MockBackupInterractor)(nil).GetFirstLSN), arg0) +} diff --git a/pkg/mock/database.go b/pkg/mock/database.go new file mode 100644 index 0000000..d04c573 --- /dev/null +++ b/pkg/mock/database.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/database/database.go + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockDatabaseInterractor is a mock of DatabaseInterractor interface. +type MockDatabaseInterractor struct { + ctrl *gomock.Controller + recorder *MockDatabaseInterractorMockRecorder +} + +// MockDatabaseInterractorMockRecorder is the mock recorder for MockDatabaseInterractor. +type MockDatabaseInterractorMockRecorder struct { + mock *MockDatabaseInterractor +} + +// NewMockDatabaseInterractor creates a new mock instance. +func NewMockDatabaseInterractor(ctrl *gomock.Controller) *MockDatabaseInterractor { + mock := &MockDatabaseInterractor{ctrl: ctrl} + mock.recorder = &MockDatabaseInterractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDatabaseInterractor) EXPECT() *MockDatabaseInterractorMockRecorder { + return m.recorder +} + +// GetVirtualExpireIndexes mocks base method. +func (m *MockDatabaseInterractor) GetVirtualExpireIndexes(arg0 int) (map[string]bool, map[string]uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetVirtualExpireIndexes", arg0) + ret0, _ := ret[0].(map[string]bool) + ret1, _ := ret[1].(map[string]uint64) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetVirtualExpireIndexes indicates an expected call of GetVirtualExpireIndexes. +func (mr *MockDatabaseInterractorMockRecorder) GetVirtualExpireIndexes(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVirtualExpireIndexes", reflect.TypeOf((*MockDatabaseInterractor)(nil).GetVirtualExpireIndexes), arg0) +} diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go new file mode 100644 index 0000000..e0bc880 --- /dev/null +++ b/pkg/mock/storage.go @@ -0,0 +1,315 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/storage/storage_interractor.go + +// Package mock is a generated GoMock package. +package mock + +import ( + io "io" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + storage "github.com/yezzey-gp/yproxy/pkg/storage" +) + +// MockStorageInteractor is a mock of StorageInteractor interface. +type MockStorageInteractor struct { + ctrl *gomock.Controller + recorder *MockStorageInteractorMockRecorder +} + +// MockStorageInteractorMockRecorder is the mock recorder for MockStorageInteractor. +type MockStorageInteractorMockRecorder struct { + mock *MockStorageInteractor +} + +// NewMockStorageInteractor creates a new mock instance. +func NewMockStorageInteractor(ctrl *gomock.Controller) *MockStorageInteractor { + mock := &MockStorageInteractor{ctrl: ctrl} + mock.recorder = &MockStorageInteractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageInteractor) EXPECT() *MockStorageInteractorMockRecorder { + return m.recorder +} + +// CatFileFromStorage mocks base method. +func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CatFileFromStorage indicates an expected call of CatFileFromStorage. +func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset) +} + +// DeleteObject mocks base method. +func (m *MockStorageInteractor) DeleteObject(key string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", key) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *MockStorageInteractorMockRecorder) DeleteObject(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) +} + +// ListPath mocks base method. +func (m *MockStorageInteractor) ListPath(name string) ([]*storage.S3ObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListPath", name) + ret0, _ := ret[0].([]*storage.S3ObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPath indicates an expected call of ListPath. +func (mr *MockStorageInteractorMockRecorder) ListPath(name interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageInteractor)(nil).ListPath), name) +} + +// MoveObject mocks base method. +func (m *MockStorageInteractor) MoveObject(from, to string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MoveObject", from, to) + ret0, _ := ret[0].(error) + return ret0 +} + +// MoveObject indicates an expected call of MoveObject. +func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageInteractor)(nil).MoveObject), from, to) +} + +// PatchFile mocks base method. +func (m *MockStorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PatchFile", name, r, startOffste) + ret0, _ := ret[0].(error) + return ret0 +} + +// PatchFile indicates an expected call of PatchFile. +func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffste interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageInteractor)(nil).PatchFile), name, r, startOffste) +} + +// PutFileToDest mocks base method. +func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutFileToDest indicates an expected call of PutFileToDest. +func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r) +} + +// MockStorageReader is a mock of StorageReader interface. +type MockStorageReader struct { + ctrl *gomock.Controller + recorder *MockStorageReaderMockRecorder +} + +// MockStorageReaderMockRecorder is the mock recorder for MockStorageReader. +type MockStorageReaderMockRecorder struct { + mock *MockStorageReader +} + +// NewMockStorageReader creates a new mock instance. +func NewMockStorageReader(ctrl *gomock.Controller) *MockStorageReader { + mock := &MockStorageReader{ctrl: ctrl} + mock.recorder = &MockStorageReaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageReader) EXPECT() *MockStorageReaderMockRecorder { + return m.recorder +} + +// CatFileFromStorage mocks base method. +func (m *MockStorageReader) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CatFileFromStorage indicates an expected call of CatFileFromStorage. +func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageReader)(nil).CatFileFromStorage), name, offset) +} + +// ListPath mocks base method. +func (m *MockStorageReader) ListPath(name string) ([]*storage.S3ObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListPath", name) + ret0, _ := ret[0].([]*storage.S3ObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPath indicates an expected call of ListPath. +func (mr *MockStorageReaderMockRecorder) ListPath(name interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageReader)(nil).ListPath), name) +} + +// MockStorageWriter is a mock of StorageWriter interface. +type MockStorageWriter struct { + ctrl *gomock.Controller + recorder *MockStorageWriterMockRecorder +} + +// MockStorageWriterMockRecorder is the mock recorder for MockStorageWriter. +type MockStorageWriterMockRecorder struct { + mock *MockStorageWriter +} + +// NewMockStorageWriter creates a new mock instance. +func NewMockStorageWriter(ctrl *gomock.Controller) *MockStorageWriter { + mock := &MockStorageWriter{ctrl: ctrl} + mock.recorder = &MockStorageWriterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageWriter) EXPECT() *MockStorageWriterMockRecorder { + return m.recorder +} + +// PatchFile mocks base method. +func (m *MockStorageWriter) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PatchFile", name, r, startOffste) + ret0, _ := ret[0].(error) + return ret0 +} + +// PatchFile indicates an expected call of PatchFile. +func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffste interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageWriter)(nil).PatchFile), name, r, startOffste) +} + +// PutFileToDest mocks base method. +func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutFileToDest indicates an expected call of PutFileToDest. +func (mr *MockStorageWriterMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageWriter)(nil).PutFileToDest), name, r) +} + +// MockStorageLister is a mock of StorageLister interface. +type MockStorageLister struct { + ctrl *gomock.Controller + recorder *MockStorageListerMockRecorder +} + +// MockStorageListerMockRecorder is the mock recorder for MockStorageLister. +type MockStorageListerMockRecorder struct { + mock *MockStorageLister +} + +// NewMockStorageLister creates a new mock instance. +func NewMockStorageLister(ctrl *gomock.Controller) *MockStorageLister { + mock := &MockStorageLister{ctrl: ctrl} + mock.recorder = &MockStorageListerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageLister) EXPECT() *MockStorageListerMockRecorder { + return m.recorder +} + +// ListPath mocks base method. +func (m *MockStorageLister) ListPath(prefix string) ([]*storage.S3ObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListPath", prefix) + ret0, _ := ret[0].([]*storage.S3ObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPath indicates an expected call of ListPath. +func (mr *MockStorageListerMockRecorder) ListPath(prefix interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageLister)(nil).ListPath), prefix) +} + +// MockStorageMover is a mock of StorageMover interface. +type MockStorageMover struct { + ctrl *gomock.Controller + recorder *MockStorageMoverMockRecorder +} + +// MockStorageMoverMockRecorder is the mock recorder for MockStorageMover. +type MockStorageMoverMockRecorder struct { + mock *MockStorageMover +} + +// NewMockStorageMover creates a new mock instance. +func NewMockStorageMover(ctrl *gomock.Controller) *MockStorageMover { + mock := &MockStorageMover{ctrl: ctrl} + mock.recorder = &MockStorageMoverMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageMover) EXPECT() *MockStorageMoverMockRecorder { + return m.recorder +} + +// DeleteObject mocks base method. +func (m *MockStorageMover) DeleteObject(key string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", key) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *MockStorageMoverMockRecorder) DeleteObject(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageMover)(nil).DeleteObject), key) +} + +// MoveObject mocks base method. +func (m *MockStorageMover) MoveObject(from, to string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MoveObject", from, to) + ret0, _ := ret[0].(error) + return ret0 +} + +// MoveObject indicates an expected call of MoveObject. +func (mr *MockStorageMoverMockRecorder) MoveObject(from, to interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageMover)(nil).MoveObject), from, to) +} diff --git a/pkg/proc/delete_handler.go b/pkg/proc/delete_handler.go new file mode 100644 index 0000000..cb0174e --- /dev/null +++ b/pkg/proc/delete_handler.go @@ -0,0 +1,123 @@ +package proc + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + "github.com/yezzey-gp/yproxy/pkg/backups" + "github.com/yezzey-gp/yproxy/pkg/database" + "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/storage" + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +//go:generate mockgen -destination=../../../test/mocks/mock_object.go -package mocks -build_flags -mod=readonly github.com/wal-g/wal-g/pkg/storages/storage Object +type DeleteHandler interface { + HandleDeleteGarbage(message.DeleteMessage) error + HandleDeleteFile(message.DeleteMessage) error +} + +type BasicDeleteHandler struct { + BackupInterractor backups.BackupInterractor + DbInterractor database.DatabaseInterractor + StorageInterractor storage.StorageInteractor +} + +func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) error { + fileList, err := dh.ListGarbageFiles(msg) + if err != nil { + return errors.Wrap(err, "failed to delete file") + } + + if !msg.Confirm { //do not delete files if no confirmation flag provided + return nil + } + + var failed []string + retryCount := 0 + for len(fileList) > 0 && retryCount < 10 { + retryCount++ + for i := 0; i < len(fileList); i++ { + filePathParts := strings.Split(fileList[i], "/") + err = dh.StorageInterractor.MoveObject(fileList[i], fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1])) + if err != nil { + ylogger.Zero.Warn().AnErr("err", err).Str("file", fileList[i]).Msg("failed to move file") + failed = append(failed, fileList[i]) + } + } + fileList = failed + failed = make([]string, 0) + } + + if len(fileList) > 0 { + ylogger.Zero.Error().Int("failed files count", len(fileList)).Msg("some files were not moved") + ylogger.Zero.Error().Any("failed files", fileList).Msg("failed to move some files") + return errors.Wrap(err, "failed to move some files") + } + + return nil +} + +func (dh *BasicDeleteHandler) HandleDeleteFile(msg message.DeleteMessage) error { + err := dh.StorageInterractor.DeleteObject(msg.Name) + if err != nil { + ylogger.Zero.Error().AnErr("err", err).Msg("failed to delete file") + return errors.Wrap(err, "failed to delete file") + } + return nil +} + +func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]string, error) { + //get firsr backup lsn + firstBackupLSN, err := dh.BackupInterractor.GetFirstLSN(msg.Segnum) + if err != nil { + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups? + } + ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") + + //list files in storage + ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path") + objectMetas, err := dh.StorageInterractor.ListPath(msg.Name) + if err != nil { + return nil, errors.Wrap(err, "could not list objects") + } + ylogger.Zero.Info().Int("amount", len(objectMetas)).Msg("objects count") + + vi, ei, err := dh.DbInterractor.GetVirtualExpireIndexes(msg.Port) + if err != nil { + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get indexes") + return nil, errors.Wrap(err, "could not get virtual and expire indexes") + } + ylogger.Zero.Info().Msg("recieved virtual index and expire index") + ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count") + ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count") + + filesToDelete := make([]string, 0) + for i := 0; i < len(objectMetas); i++ { + reworkedName := ReworkFileName(objectMetas[i].Path) + lsn, ok := ei[reworkedName] + ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn") + if !vi[reworkedName] && (lsn < firstBackupLSN || !ok) { + ylogger.Zero.Debug().Str("file", objectMetas[i].Path). + Bool("file in expire index", ok). + Bool("lsn is less than in first backup", lsn < firstBackupLSN). + Msg("file will be deleted") + filesToDelete = append(filesToDelete, objectMetas[i].Path) + } + } + + ylogger.Zero.Info().Int("amount", len(filesToDelete)).Msg("files will be deleted") + + return filesToDelete, nil +} + +func ReworkFileName(str string) string { + p1 := strings.Split(str, "/") + p2 := p1[len(p1)-1] + p3 := strings.Split(p2, "_") + if len(p3) >= 4 { + p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + } + return p2 +} diff --git a/pkg/proc/delete_handler_test.go b/pkg/proc/delete_handler_test.go new file mode 100644 index 0000000..895faff --- /dev/null +++ b/pkg/proc/delete_handler_test.go @@ -0,0 +1,101 @@ +package proc_test + +import ( + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/yezzey-gp/yproxy/pkg/message" + mock "github.com/yezzey-gp/yproxy/pkg/mock" + "github.com/yezzey-gp/yproxy/pkg/proc" + "github.com/yezzey-gp/yproxy/pkg/storage" +) + +func TestReworkingName(t *testing.T) { + type TestCase struct { + input string + expected string + } + + testCases := []TestCase{ + { + input: "/segments_005/seg1/basebackups_005/yezzey/1663_16530_a4c5ad8305b83f07200b020694c36563_17660_1__DY_1_xlog_19649822496", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_", + }, + { + input: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_1__DY_1_xlog_19649822496", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_", + }, + { + input: "seg1/basebackups_005/yezzey/1663_16530_a4c5ad8305b83f07200b020694c36563_17660_1__DY_1_xlog_19649822496", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_", + }, + { + input: "1663_16530_a4c5ad8305b83f07200b020694c36563", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563", + }, + { + input: "1663___a4c5ad8305b83f07200b020694c36563___", + expected: "1663___a4c5ad8305b83f07200b020694c36563_", + }, + { + input: "file", + expected: "file", + }, + } + + for _, testCase := range testCases { + ans := proc.ReworkFileName(testCase.input) + assert.Equal(t, testCase.expected, ans) + } +} + +func TestFilesToDeletion(t *testing.T) { + ctrl := gomock.NewController(t) + + msg := message.DeleteMessage{ + Name: "path", + Port: 6000, + Segnum: 0, + Confirm: false, + } + + filesInStorage := []*storage.S3ObjectMeta{ + {Path: "1663_16530_not-deleted_18002_"}, + {Path: "1663_16530_deleted-after-backup_18002_"}, + {Path: "1663_16530_deleted-when-backup-start_18002_"}, + {Path: "1663_16530_deleted-before-backup_18002_"}, + {Path: "some_trash"}, + } + storage := mock.NewMockStorageInteractor(ctrl) + storage.EXPECT().ListPath(msg.Name).Return(filesInStorage, nil) + + backup := mock.NewMockBackupInterractor(ctrl) + backup.EXPECT().GetFirstLSN(msg.Segnum).Return(uint64(1337), nil) + + vi := map[string]bool{ + "1663_16530_not-deleted_18002_": true, + "1663_16530_deleted-after-backup_18002_": true, + "1663_16530_deleted-when-backup-start_18002_": true, + } + ei := map[string]uint64{ + "1663_16530_deleted-after-backup_18002_": uint64(1400), + "1663_16530_deleted-when-backup-start_18002_": uint64(1337), + "1663_16530_deleted-before-backup_18002_": uint64(1300), + } + database := mock.NewMockDatabaseInterractor(ctrl) + database.EXPECT().GetVirtualExpireIndexes(msg.Port).Return(vi, ei, nil) + + handler := proc.BasicDeleteHandler{ + StorageInterractor: storage, + DbInterractor: database, + BackupInterractor: backup, + } + + list, err := handler.ListGarbageFiles(msg) + + assert.NoError(t, err) + assert.Equal(t, 2, len(list)) + assert.Equal(t, "1663_16530_deleted-before-backup_18002_", list[0]) + assert.Equal(t, "some_trash", list[1]) +} diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 1a69b84..f357f1a 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -1,20 +1,16 @@ package proc import ( - "bytes" - "encoding/json" "fmt" "io" - "os/exec" "strings" "sync" - "github.com/jackc/pgx" - "github.com/jackc/pgx/pgtype" - "github.com/pkg/errors" "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/backups" "github.com/yezzey-gp/yproxy/pkg/client" "github.com/yezzey-gp/yproxy/pkg/crypt" + "github.com/yezzey-gp/yproxy/pkg/database" "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" @@ -329,75 +325,19 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl ycl.SetExternalFilePath(msg.Name) - //get firsr backup lsn - firstBackupLSN, err := getFirstLSN(msg.Segnum) - if err != nil { - ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups? - } - ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") + dbInterractor := &database.DatabaseHandler{} + backupHandler := &backups.WalgBackupInterractor{} - //list files in storage - ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path") - objectMetas, err := s.ListPath(msg.Name) - if err != nil { - _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to complete request") - return nil + var dh DeleteHandler + dh = &BasicDeleteHandler{ + StorageInterractor: s, + DbInterractor: dbInterractor, + BackupInterractor: backupHandler, } - ylogger.Zero.Info().Int("amount", len(objectMetas)).Msg("objects count") - vi, ei, err := getVirtualExpireIndexes(msg.Port) + err = dh.HandleDeleteGarbage(msg) if err != nil { - ylogger.Zero.Error().AnErr("err", err).Msg("failed to get indexes") - _ = ycl.ReplyError(fmt.Errorf("could not get virtual and expire indexes: %s", err), "failed to compelete request") - return nil - } - ylogger.Zero.Info().Msg("recieved virtual index and expire index") - ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count") - ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count") - - var failed []*storage.S3ObjectMeta - retryCount := 0 - deletedFilesCount := 0 - for len(objectMetas) > 0 && retryCount < 10 { - retryCount++ - for i := 0; i < len(objectMetas); i++ { - p1 := strings.Split(objectMetas[i].Path, "/") //TODO fix this - p2 := p1[len(p1)-1] - p3 := strings.Split(p2, "_") - if len(p3) >= 4 { - p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) - } - lsn, ok := ei[p2] - ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn") - if !vi[p2] && (lsn < firstBackupLSN || !ok) { - ylogger.Zero.Debug().Str("file", objectMetas[i].Path). - Bool("file in expire indexi", ok). - Bool("lsn is less than in first backup", lsn < firstBackupLSN). - Msg("file will be deleted") - deletedFilesCount++ - if !msg.Confirm { //do not delete files if no confirmation flag provided - continue - } - filePathParts := strings.Split(objectMetas[i].Path, "/") - err = s.MoveObject(objectMetas[i].Path, fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1])) - if err != nil { - deletedFilesCount-- - ylogger.Zero.Warn().AnErr("err", err).Str("file", objectMetas[i].Path).Msg("failed to move file") - failed = append(failed, objectMetas[i]) - } - } - } - objectMetas = failed - failed = make([]*storage.S3ObjectMeta, 0) - } - - ylogger.Zero.Info().Int("amount", deletedFilesCount).Msg("deleted files count") - - if len(objectMetas) > 0 { - ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("some files were not moved") - ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to move some files") - - _ = ycl.ReplyError(err, "failed to move some files") + _ = ycl.ReplyError(err, "failed to finish operation") return nil } @@ -419,210 +359,3 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl return nil } - -// get lsn of the oldest backup -func getFirstLSN(seg int) (uint64, error) { - cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml") - ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args") - var out bytes.Buffer - cmd.Stdout = &out - - err := cmd.Run() - if err != nil { - ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls") - return 0, err - } - p1 := strings.Split(out.String(), "\n") - - minLSN := BackupLSN{Lsn: ^uint64(0)} - for _, line := range p1 { - if !strings.Contains(line, ".json") { - continue - } - p2 := strings.Split(line, " ") - p3 := p2[len(p2)-1] - - ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3)).Msg("check lsn in file") - cmd2 := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3), "--config=/etc/wal-g/wal-g.yaml") - - var out2 bytes.Buffer - cmd2.Stdout = &out2 - - err = cmd2.Run() - if err != nil { - ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat") - return 0, err - } - lsn := BackupLSN{} - err = json.Unmarshal(out2.Bytes(), &lsn) - - if lsn.Lsn < minLSN.Lsn { - minLSN.Lsn = lsn.Lsn - } - - } - - return minLSN.Lsn, err -} - -func connectToDatabase(port int, database string) (*pgx.Conn, error) { - config, err := pgx.ParseEnvLibpq() - if err != nil { - return nil, errors.Wrap(err, "Connect: unable to read environment variables") - } - - config.Port = uint16(port) - config.Database = database - - config.RuntimeParams["gp_role"] = "utility" - conn, err := pgx.Connect(config) - if err != nil { - config.RuntimeParams["gp_session_role"] = "utility" - conn, err = pgx.Connect(config) - if err != nil { - fmt.Printf("error in connection %v", err) // delete this - return nil, err - } - } - return conn, nil -} - -func getVirtualExpireIndexes(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз - db, err := getDatabase(port) - if err != nil { - return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix - } - ylogger.Zero.Debug().Str("database name", db.name).Msg("recieved database") - conn, err := connectToDatabase(port, db.name) - if err != nil { - return nil, nil, err - } - defer conn.Close() //error - ylogger.Zero.Debug().Msg("connected to database") - - rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index WHERE expire_lsn != '0/0';`) - if err != nil { - return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix - } - defer rows.Close() - ylogger.Zero.Debug().Msg("executed select") - - c := make(map[string]uint64, 0) - for rows.Next() { - row := Ei{} - if err := rows.Scan(&row.reloid, &row.relfileoid, &row.expireLsn, &row.fqnmd5); err != nil { - return nil, nil, fmt.Errorf("unable to parse query output %v", err) - } - - lsn, err := pgx.ParseLSN(row.expireLsn) - if err != nil { - return nil, nil, fmt.Errorf("unable to parse query output %v", err) - } - - ylogger.Zero.Debug().Str("file", fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)).Msg("added file to ei") - c[fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn - } - ylogger.Zero.Debug().Msg("read 1") - - rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`) - if err != nil { - return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix - } - defer rows2.Close() - ylogger.Zero.Debug().Msg("read 2") - - c2 := make(map[string]bool, 0) - for rows2.Next() { - xpath := "" - if err := rows2.Scan(&xpath); err != nil { - return nil, nil, fmt.Errorf("unable to parse query output %v", err) - } - p1 := strings.Split(xpath, "/") - p2 := p1[len(p1)-1] - p3 := strings.Split(p2, "_") - if len(p3) >= 4 { - p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) - } - c2[p2] = true - ylogger.Zero.Debug().Str("file", p2).Msg("added") - } - ylogger.Zero.Debug().Msg("read 3") - - return c2, c, err -} - -func getDatabase(port int) (DB, error) { - conn, err := connectToDatabase(port, "postgres") - if err != nil { - return DB{}, err - } - defer conn.Close() //error - ylogger.Zero.Debug().Msg("connected to db") - rows, err := conn.Query(`SELECT dattablespace, oid, datname FROM pg_database WHERE datallowconn;`) - if err != nil { - return DB{}, err - } - defer rows.Close() - ylogger.Zero.Debug().Msg("recieved db list") - - for rows.Next() { - row := DB{} - ylogger.Zero.Debug().Msg("cycle 1") - if err := rows.Scan(&row.tablespace, &row.oid, &row.name); err != nil { - return DB{}, err - } - ylogger.Zero.Debug().Msg("cycle 2") - ylogger.Zero.Debug().Str("db", row.name).Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("database") - if row.name == "postgres" { - continue - } - - ylogger.Zero.Debug().Str("db", row.name).Msg("check database") - connDb, err := connectToDatabase(port, row.name) - if err != nil { - return DB{}, err - } - defer connDb.Close() //error - ylogger.Zero.Debug().Msg("cycle 3") - - rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`) - if err != nil { - return DB{}, err - } - defer rowsdb.Close() - ylogger.Zero.Debug().Msg("cycle 4") - var ans bool - rowsdb.Next() - err = rowsdb.Scan(&ans) - if err != nil { - ylogger.Zero.Error().AnErr("error", err).Msg("error during yezzey check") - return DB{}, err - } - ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema") - if ans { - ylogger.Zero.Debug().Str("db", row.name).Msg("found yezzey schema in database") - ylogger.Zero.Debug().Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("found yezzey schema in database") - return row, nil - } - - ylogger.Zero.Debug().Str("db", row.name).Msg("no yezzey schema in database") - } - return DB{}, fmt.Errorf("no yezzey schema across databases") -} - -type Ei struct { - reloid pgtype.OID - relfileoid pgtype.OID - expireLsn string - fqnmd5 string -} - -type DB struct { - name string - tablespace pgtype.OID - oid pgtype.OID -} - -type BackupLSN struct { - Lsn uint64 `json:"FinishLSN"` -} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index a4fed5a..5a6a1eb 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -6,7 +6,7 @@ import ( "io" "path" - "time" + "strings" "github.com/yezzey-gp/aws-sdk-go/aws" "github.com/yezzey-gp/aws-sdk-go/service/s3" @@ -53,6 +53,7 @@ func NewStorage(cnf *config.Storage) (StorageInteractor, error) { }, nil default: return nil, fmt.Errorf("wrong storage type " + cnf.StorageType) + } } @@ -129,9 +130,8 @@ func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffst } type S3ObjectMeta struct { - Path string - Size int64 - LastModified time.Time + Path string + Size int64 } func (s *S3StorageInteractor) ListPath(prefix string) ([]*S3ObjectMeta, error) { @@ -159,9 +159,8 @@ func (s *S3StorageInteractor) ListPath(prefix string) ([]*S3ObjectMeta, error) { for _, obj := range out.Contents { metas = append(metas, &S3ObjectMeta{ - Path: *obj.Key, - Size: *obj.Size, - LastModified: *obj.LastModified, + Path: *obj.Key, + Size: *obj.Size, }) } @@ -199,15 +198,36 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error { } ylogger.Zero.Debug().Str("", out.GoString()).Msg("copied object") + err = s.DeleteObject(fromPath) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to delete old object") + } + ylogger.Zero.Debug().Msg("deleted object") + return err +} + +func (s *S3StorageInteractor) DeleteObject(key string) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return err + } + ylogger.Zero.Debug().Msg("aquired session") + + if !strings.HasPrefix(key, s.cnf.StoragePrefix) { + key = path.Join(s.cnf.StoragePrefix, key) + } + input2 := s3.DeleteObjectInput{ Bucket: &s.cnf.StorageBucket, - Key: aws.String(fromPath), + Key: aws.String(key), } _, err = sess.DeleteObject(&input2) if err != nil { ylogger.Zero.Err(err).Msg("failed to delete old object") + return err } ylogger.Zero.Debug().Msg("deleted object") - return err + return nil } diff --git a/pkg/storage/storage_interractor.go b/pkg/storage/storage_interractor.go new file mode 100644 index 0000000..c23603e --- /dev/null +++ b/pkg/storage/storage_interractor.go @@ -0,0 +1,30 @@ +package storage + +import "io" + +//go:generate mockgen -destination=pkg/mock/storage.go -package=mock +type StorageInteractor interface { + StorageReader + StorageWriter + StorageLister + StorageMover +} + +type StorageReader interface { + CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) + ListPath(name string) ([]*S3ObjectMeta, error) +} + +type StorageWriter interface { + PutFileToDest(name string, r io.Reader) error + PatchFile(name string, r io.ReadSeeker, startOffste int64) error +} + +type StorageLister interface { + ListPath(prefix string) ([]*S3ObjectMeta, error) +} + +type StorageMover interface { + MoveObject(from string, to string) error + DeleteObject(key string) error +}