Skip to content

Commit

Permalink
Adjust vacuum API
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Oct 3, 2024
1 parent 5ddca9c commit dd2a90c
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 17 deletions.
2 changes: 2 additions & 0 deletions config/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Instance struct {

CryptoCnf Crypto `json:"crypto" toml:"crypto" yaml:"crypto"`

VacuumCnf Vacuum `json:"vacuum" toml:"vacuum" yaml:"vacuum`

LogPath string `json:"log_path" toml:"log_path" yaml:"log_path"`
LogLevel string `json:"log_level" toml:"log_level" yaml:"log_level"`
SocketPath string `json:"socket_path" toml:"socket_path" yaml:"socket_path"`
Expand Down
5 changes: 5 additions & 0 deletions config/vacuum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package config

type Vacuum struct {
CheckBackup bool `json:"check_backup,default=true" toml:"check_backup,default=true" yaml:"check_backup,default=true"`
}
2 changes: 1 addition & 1 deletion pkg/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (i *Instance) Run(instanceCnf *config.Instance) error {
defer clConn.Close()
ycl := client.NewYClient(clConn)
i.pool.Put(ycl)
if err := proc.ProcConn(s, cr, ycl); err != nil {
if err := proc.ProcConn(s, cr, ycl, &instanceCnf.VacuumCnf); err != nil {
ylogger.Zero.Debug().Uint("id", ycl.ID()).Err(err).Msg("got error serving client")
}
_, err := i.pool.Pop(ycl.ID())
Expand Down
17 changes: 12 additions & 5 deletions pkg/message/delete_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
)

type DeleteMessage struct { //seg port
Name string
Port uint64
Segnum uint64
Confirm bool
Garbage bool
Name string
Port uint64
Segnum uint64
Confirm bool
Garbage bool
CrazyDrop bool
}

var _ ProtoMessage = &DeleteMessage{}
Expand Down Expand Up @@ -39,6 +40,9 @@ func (c *DeleteMessage) Encode() []byte {
if c.Garbage {
bt[2] = 1
}
if c.CrazyDrop {
bt[3] = 1
}

bt = append(bt, []byte(c.Name)...)
bt = append(bt, 0)
Expand All @@ -64,6 +68,9 @@ func (c *DeleteMessage) Decode(body []byte) {
if body[2] == 1 {
c.Garbage = true
}
if body[3] == 1 {
c.CrazyDrop = true
}
c.Name = c.GetDeleteName(body[4:])
c.Port = binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8])
c.Segnum = binary.BigEndian.Uint64(body[len(body)-8:])
Expand Down
52 changes: 43 additions & 9 deletions pkg/proc/delete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package proc

import (
"fmt"
"path"
"strings"

"github.com/pkg/errors"
"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/backups"
"github.com/yezzey-gp/yproxy/pkg/database"
"github.com/yezzey-gp/yproxy/pkg/message"
Expand All @@ -22,6 +24,8 @@ type BasicDeleteHandler struct {
BackupInterractor backups.BackupInterractor
DbInterractor database.DatabaseInterractor
StorageInterractor storage.StorageInteractor

cnf *config.Vacuum
}

func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) error {
Expand All @@ -39,10 +43,26 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err
for len(fileList) > 0 && retryCount < 10 {
retryCount++
for i := 0; i < len(fileList); i++ {
filePathParts := strings.Split(fileList[i], "/")
err = dh.StorageInterractor.MoveObject(fileList[i], fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1]))

if msg.CrazyDrop {
ylogger.Zero.Debug().Str("path", fileList[i]).Msg("simply delete without any 'plan B'")
err = dh.StorageInterractor.DeleteObject(fileList[i])

} else {

filePathParts := strings.Split(fileList[i], "/")

destPath := path.Join(
"segments_005",
fmt.Sprintf("seg%d", msg.Segnum),
"basebackups_005",
"yezzey",
"trash", filePathParts[len(filePathParts)-1])

err = dh.StorageInterractor.MoveObject(fileList[i], destPath)
}
if err != nil {
ylogger.Zero.Warn().AnErr("err", err).Str("file", fileList[i]).Msg("failed to move file")
ylogger.Zero.Warn().AnErr("err", err).Str("file", fileList[i]).Msg("failed to obsolete file")
failed = append(failed, fileList[i])
}
}
Expand Down Expand Up @@ -70,11 +90,19 @@ func (dh *BasicDeleteHandler) HandleDeleteFile(msg message.DeleteMessage) error

func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]string, error) {
//get firsr backup lsn
firstBackupLSN, err := dh.BackupInterractor.GetFirstLSN(msg.Segnum)
if err != nil {
ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups?
var firstBackupLSN uint64
var err error

if dh.cnf.CheckBackup {
firstBackupLSN, err = dh.BackupInterractor.GetFirstLSN(msg.Segnum)
if err != nil {
ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups?
}
ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN")
} else {
firstBackupLSN = ^uint64(0)
ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("omit first backup LSN")
}
ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN")

//list files in storage
ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path")
Expand All @@ -96,9 +124,15 @@ func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]str
filesToDelete := make([]string, 0)
for i := 0; i < len(objectMetas); i++ {
reworkedName := ReworkFileName(objectMetas[i].Path)
ylogger.Zero.Debug().Str("reworked name", reworkedName).Msg("lookup chunk")

if vi[reworkedName] {
continue
}

lsn, ok := ei[reworkedName]
ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn")
if !vi[reworkedName] && (lsn < firstBackupLSN || !ok) {
ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Str("path", objectMetas[i].Path).Msg("comparing lsn")
if lsn < firstBackupLSN || !ok {
ylogger.Zero.Debug().Str("file", objectMetas[i].Path).
Bool("file in expire index", ok).
Bool("lsn is less than in first backup", lsn < firstBackupLSN).
Expand Down
5 changes: 3 additions & 2 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func ProcessPutExtended(
return nil
}

func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient) error {
func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient, cnf *config.Vacuum) error {

defer func() {
_ = ycl.Close()
Expand Down Expand Up @@ -280,7 +280,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
if err != nil {
return err
}
fmt.Printf("ok new conf: %v\n", instanceCnf)
ylogger.Zero.Info().Interface("cnf", instanceCnf).Msg("loaded new config")

//list objects
objectMetas, err := oldStorage.ListPath(msg.Name)
Expand Down Expand Up @@ -396,6 +396,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
StorageInterractor: s,
DbInterractor: dbInterractor,
BackupInterractor: backupHandler,
cnf: cnf,
}

if msg.Garbage {
Expand Down

0 comments on commit dd2a90c

Please sign in to comment.