diff --git a/config/instance.go b/config/instance.go index 0e8b0e7..7164e50 100644 --- a/config/instance.go +++ b/config/instance.go @@ -17,6 +17,8 @@ type Instance struct { CryptoCnf Crypto `json:"crypto" toml:"crypto" yaml:"crypto"` + VacuumCnf Vacuum `json:"vacuum" toml:"vacuum" yaml:"vacuum"` + LogPath string `json:"log_path" toml:"log_path" yaml:"log_path"` LogLevel string `json:"log_level" toml:"log_level" yaml:"log_level"` SocketPath string `json:"socket_path" toml:"socket_path" yaml:"socket_path"` diff --git a/config/vacuum.go b/config/vacuum.go new file mode 100644 index 0000000..a66c6e5 --- /dev/null +++ b/config/vacuum.go @@ -0,0 +1,5 @@ +package config + +type Vacuum struct { + CheckBackup bool `json:"check_backup,default=true" toml:"check_backup,default=true" yaml:"check_backup,default=true"` +} diff --git a/pkg/core/core.go b/pkg/core/core.go index 9306b74..c0dbcf6 100644 --- a/pkg/core/core.go +++ b/pkg/core/core.go @@ -134,7 +134,7 @@ func (i *Instance) Run(instanceCnf *config.Instance) error { defer clConn.Close() ycl := client.NewYClient(clConn) i.pool.Put(ycl) - if err := proc.ProcConn(s, cr, ycl); err != nil { + if err := proc.ProcConn(s, cr, ycl, &instanceCnf.VacuumCnf); err != nil { ylogger.Zero.Debug().Uint("id", ycl.ID()).Err(err).Msg("got error serving client") } _, err := i.pool.Pop(ycl.ID()) diff --git a/pkg/message/delete_message.go b/pkg/message/delete_message.go index 596e9c7..79f8ee1 100644 --- a/pkg/message/delete_message.go +++ b/pkg/message/delete_message.go @@ -6,11 +6,12 @@ import ( ) type DeleteMessage struct { //seg port - Name string - Port uint64 - Segnum uint64 - Confirm bool - Garbage bool + Name string + Port uint64 + Segnum uint64 + Confirm bool + Garbage bool + CrazyDrop bool } var _ ProtoMessage = &DeleteMessage{} @@ -39,6 +40,9 @@ func (c *DeleteMessage) Encode() []byte { if c.Garbage { bt[2] = 1 } + if c.CrazyDrop { + bt[3] = 1 + } bt = append(bt, []byte(c.Name)...) bt = append(bt, 0) @@ -64,6 +68,9 @@ func (c *DeleteMessage) Decode(body []byte) { if body[2] == 1 { c.Garbage = true } + if body[3] == 1 { + c.CrazyDrop = true + } c.Name = c.GetDeleteName(body[4:]) c.Port = binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8]) c.Segnum = binary.BigEndian.Uint64(body[len(body)-8:]) diff --git a/pkg/mock/backups.go b/pkg/mock/backups.go index 6172b3c..3ee263f 100644 --- a/pkg/mock/backups.go +++ b/pkg/mock/backups.go @@ -3,7 +3,7 @@ // // Generated by this command: // -// mockgen -destination=./pkg/mock/backups.go -source=pkg/backups/backups.go -package mock +// mockgen -source=pkg/backups/backups.go -destination=pkg/mock/backups.go -package=mock // // Package mock is a generated GoMock package. diff --git a/pkg/mock/database.go b/pkg/mock/database.go index 93e768a..817e9f4 100644 --- a/pkg/mock/database.go +++ b/pkg/mock/database.go @@ -1,9 +1,9 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: ./pkg/database/database.go +// Source: pkg/database/database.go // // Generated by this command: // -// mockgen -destination=./pkg/mock/database.go -source=./pkg/database/database.go -package mock +// mockgen -source=pkg/database/database.go -destination=pkg/mock/database.go -package=mock // // Package mock is a generated GoMock package. diff --git a/pkg/proc/delete_handler.go b/pkg/proc/delete_handler.go index 312f7ca..5d01709 100644 --- a/pkg/proc/delete_handler.go +++ b/pkg/proc/delete_handler.go @@ -2,9 +2,11 @@ package proc import ( "fmt" + "path" "strings" "github.com/pkg/errors" + "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/backups" "github.com/yezzey-gp/yproxy/pkg/database" "github.com/yezzey-gp/yproxy/pkg/message" @@ -22,6 +24,8 @@ type BasicDeleteHandler struct { BackupInterractor backups.BackupInterractor DbInterractor database.DatabaseInterractor StorageInterractor storage.StorageInteractor + + Cnf *config.Vacuum } func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) error { @@ -39,10 +43,26 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err for len(fileList) > 0 && retryCount < 10 { retryCount++ for i := 0; i < len(fileList); i++ { - filePathParts := strings.Split(fileList[i], "/") - err = dh.StorageInterractor.MoveObject(fileList[i], fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1])) + + if msg.CrazyDrop { + ylogger.Zero.Debug().Str("path", fileList[i]).Msg("simply delete without any 'plan B'") + err = dh.StorageInterractor.DeleteObject(fileList[i]) + + } else { + + filePathParts := strings.Split(fileList[i], "/") + + destPath := path.Join( + "trash", + "segments_005", + fmt.Sprintf("seg%d", msg.Segnum), + "basebackups_005", + "yezzey", filePathParts[len(filePathParts)-1]) + + err = dh.StorageInterractor.MoveObject(fileList[i], destPath) + } if err != nil { - ylogger.Zero.Warn().AnErr("err", err).Str("file", fileList[i]).Msg("failed to move file") + ylogger.Zero.Warn().AnErr("err", err).Str("file", fileList[i]).Msg("failed to obsolete file") failed = append(failed, fileList[i]) } } @@ -70,11 +90,19 @@ func (dh *BasicDeleteHandler) HandleDeleteFile(msg message.DeleteMessage) error func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]string, error) { //get firsr backup lsn - firstBackupLSN, err := dh.BackupInterractor.GetFirstLSN(msg.Segnum) - if err != nil { - ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups? + var firstBackupLSN uint64 + var err error + + if dh.Cnf.CheckBackup { + firstBackupLSN, err = dh.BackupInterractor.GetFirstLSN(msg.Segnum) + if err != nil { + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups? + } + ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") + } else { + firstBackupLSN = ^uint64(0) + ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("omit first backup LSN") } - ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") //list files in storage ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path") @@ -96,13 +124,19 @@ func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]str filesToDelete := make([]string, 0) for i := 0; i < len(objectMetas); i++ { reworkedName := ReworkFileName(objectMetas[i].Path) + ylogger.Zero.Debug().Str("reworked name", reworkedName).Msg("lookup chunk") + + if vi[reworkedName] { + continue + } + lsn, ok := ei[reworkedName] - ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn") - if !vi[reworkedName] && (lsn < firstBackupLSN || !ok) { + ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Str("path", objectMetas[i].Path).Msg("comparing lsn") + if lsn < firstBackupLSN || !ok { ylogger.Zero.Debug().Str("file", objectMetas[i].Path). Bool("file in expire index", ok). Bool("lsn is less than in first backup", lsn < firstBackupLSN). - Msg("file does not persisnt in virtual index, not needed for PITR, so will be deleted") + Msg("file does not persisnt in virtual index, nor needed for PITR, so will be deleted") filesToDelete = append(filesToDelete, objectMetas[i].Path) } } diff --git a/pkg/proc/delete_handler_test.go b/pkg/proc/delete_handler_test.go index 9f44ede..5e535df 100644 --- a/pkg/proc/delete_handler_test.go +++ b/pkg/proc/delete_handler_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/message" mock "github.com/yezzey-gp/yproxy/pkg/mock" "github.com/yezzey-gp/yproxy/pkg/object" @@ -90,6 +91,7 @@ func TestFilesToDeletion(t *testing.T) { StorageInterractor: storage, DbInterractor: database, BackupInterractor: backup, + Cnf: &config.Vacuum{CheckBackup: true}, } list, err := handler.ListGarbageFiles(msg) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 05d6164..3d24881 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -173,7 +173,7 @@ func ProcessPutExtended( return nil } -func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient) error { +func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient, cnf *config.Vacuum) error { defer func() { _ = ycl.Close() @@ -280,7 +280,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl if err != nil { return err } - fmt.Printf("ok new conf: %v\n", instanceCnf) + ylogger.Zero.Info().Interface("cnf", instanceCnf).Msg("loaded new config") //list objects objectMetas, err := oldStorage.ListPath(msg.Name) @@ -396,6 +396,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl StorageInterractor: s, DbInterractor: dbInterractor, BackupInterractor: backupHandler, + Cnf: cnf, } if msg.Garbage {