Skip to content

Commit

Permalink
working delete garbadge
Browse files Browse the repository at this point in the history
  • Loading branch information
debebantur committed Aug 6, 2024
1 parent 6746827 commit fe241f0
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 25 deletions.
4 changes: 3 additions & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var encrypt bool
var offset uint64
var segmentPort int
var segmentNum int
var confirm bool

// TODOV
func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error {
Expand Down Expand Up @@ -233,7 +234,7 @@ var deleteCmd = &cobra.Command{

defer con.Close()
ylogger.Zero.Info().Str("name", args[0]).Msg("delete")
msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum).Encode()
msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm).Encode()
_, err = con.Write(msg)
if err != nil {
return err
Expand Down Expand Up @@ -292,6 +293,7 @@ func init() {

deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on")
deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "number of the segment")
deleteCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion")
rootCmd.AddCommand(deleteCmd)
}

Expand Down
23 changes: 16 additions & 7 deletions pkg/message/delete_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import (
)

type DeleteMessage struct { //seg port
Name string
Port int
Segnum int
Name string
Port int
Segnum int
Confirm bool
}

var _ ProtoMessage = &DeleteMessage{}

func NewDeleteMessage(name string, port int, seg int) *DeleteMessage {
func NewDeleteMessage(name string, port int, seg int, confirm bool) *DeleteMessage {
return &DeleteMessage{
Name: name,
Port: port,
Segnum: seg,
Name: name,
Port: port,
Segnum: seg,
Confirm: confirm,
}
}

Expand All @@ -29,6 +31,10 @@ func (c *DeleteMessage) Encode() []byte {
0,
}

if c.Confirm {
bt[1] = 1
}

bt = append(bt, []byte(c.Name)...)
bt = append(bt, 0)

Expand All @@ -47,6 +53,9 @@ func (c *DeleteMessage) Encode() []byte {
}

func (c *DeleteMessage) Decode(body []byte) {
if body[1] == 1 {
c.Confirm = true
}
c.Name = c.GetDeleteName(body[4:])
c.Port = int(binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8]))
c.Segnum = int(binary.BigEndian.Uint64(body[len(body)-8:]))
Expand Down
3 changes: 2 additions & 1 deletion pkg/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestCopyMsg(t *testing.T) {
func TestDeleteMsg(t *testing.T) {
assert := assert.New(t)

msg := message.NewDeleteMessage("myname/mynextname", 5432, 42)
msg := message.NewDeleteMessage("myname/mynextname", 5432, 42, true)
body := msg.Encode()

assert.Equal(body[8], byte(message.MessageTypeDelete))
Expand All @@ -196,4 +196,5 @@ func TestDeleteMsg(t *testing.T) {
assert.Equal("myname/mynextname", msg2.Name)
assert.Equal(5432, msg2.Port)
assert.Equal(42, msg2.Segnum)
assert.True(msg2.Confirm)
}
87 changes: 71 additions & 16 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,25 +333,31 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
if err != nil {
fmt.Printf("error in gettime %v", err) // delete this
}
fmt.Printf("backup lsn %v", firstBackupLSN) // delete this
ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN")

//залистить файлы
ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path")
objectMetas, err := s.ListPath(msg.Name)
if err != nil {
_ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request")

return nil
}
ylogger.Zero.Info().Int("metas count", len(objectMetas)).Msg("got object metas")
// проверить yezzey vi и ei - может прочитать зарание
//conect pgx to port and select from yezzey schema
vi, ei, err := getVirtualIndex(msg.Port) //TODO rename method
if err != nil {
fmt.Printf("error in copy %v", err) // delete this
}
ylogger.Zero.Info().Msg("recieved virtual index and expire index")
ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count")
ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count")

//проверить и скопировать
var failed []*storage.S3ObjectMeta
retryCount := 0
amount := 0
for len(objectMetas) > 0 && retryCount < 10 {
retryCount++
for i := 0; i < len(objectMetas); i++ {
Expand All @@ -361,8 +367,14 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3])
lsn, ok := ei[ans]
if !vi[ans] && (lsn < firstBackupLSN || !ok) {
amount++
if !msg.Confirm {
ylogger.Zero.Info().Str("file: %s", objectMetas[i].Path).Msg("File will be deleted")
continue
}
err = s.MoveObject(objectMetas[i].Path, objectMetas[i].Path+"_trash")
if err != nil {
amount--
fmt.Printf("error in copy %v", err) // delete this
failed = append(failed, objectMetas[i])
}
Expand All @@ -371,6 +383,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
objectMetas = failed
fmt.Printf("failed files count: %d\n", len(objectMetas))
failed = make([]*storage.S3ObjectMeta, 0)
ylogger.Zero.Info().Int("amount", amount).Msg("deleted files count")
}

default:
Expand All @@ -385,31 +398,47 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl

// get lsn of the oldest backup
func getLSN(seg int) (uint64, error) {
cmd := exec.Command("wal-g", "st ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg))

cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml")
ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args")
var out bytes.Buffer
cmd.Stdout = &out

err := cmd.Run()
if err != nil {
ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls")
return 0, err
}
p1 := strings.Split(out.String(), "\n")
p2 := p1[len(p1)-1] //TODO not really last

cmd2 := exec.Command("wal-g", "st cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p2))
minLSN := BackupLSN{Lsn: ^uint64(0)}
for _, line := range p1 {
if !strings.Contains(line, ".json") {
continue
}
p2 := strings.Split(line, " ")
p3 := p2[len(p2)-1]

var out2 bytes.Buffer
cmd2.Stdout = &out2
ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3)).Msg("check lsn in file")
cmd2 := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3), "--config=/etc/wal-g/wal-g.yaml")

var out2 bytes.Buffer
cmd2.Stdout = &out2

err = cmd2.Run()
if err != nil {
ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat")
return 0, err
}
lsn := BackupLSN{}
err = json.Unmarshal(out2.Bytes(), &lsn)

if lsn.Lsn < minLSN.Lsn {
minLSN.Lsn = lsn.Lsn
}

err = cmd2.Run()
if err != nil {
return 0, err
}
lsn := BackupLSN{}
err = json.Unmarshal(out2.Bytes(), &lsn)

return lsn.Lsn, err
return minLSN.Lsn, err
}

func connectToDatabase(port int, database string) (*pgx.Conn, error) {
Expand Down Expand Up @@ -439,17 +468,20 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T
if err != nil {
return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix
}
ylogger.Zero.Debug().Str("database name", db.name).Msg("recieved database")
conn, err := connectToDatabase(port, db.name)
if err != nil {
return nil, nil, err
}
defer conn.Close() //error
ylogger.Zero.Debug().Msg("connected to database")

rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index;`)
rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index WHERE expire_lsn != '0/0';`)
if err != nil {
return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix
}
defer rows.Close()
ylogger.Zero.Debug().Msg("executed select")

c := make(map[string]uint64, 0)
for rows.Next() {
Expand All @@ -465,15 +497,17 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T

c[fmt.Sprintf("%d_%d_%s_%s_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn
}
ylogger.Zero.Debug().Msg("read 1")

rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`)
if err != nil {
return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix
}
defer rows2.Close()
ylogger.Zero.Debug().Msg("read 2")

c2 := make(map[string]bool, 0)
for rows.Next() {
for rows2.Next() {
xpath := ""
if err := rows2.Scan(&xpath); err != nil {
return nil, nil, fmt.Errorf("unable to parse query output %v", err)
Expand All @@ -483,7 +517,9 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T
p3 := strings.Split(p2, "_")
ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3])
c2[ans] = true
ylogger.Zero.Debug().Str("file", ans).Msg("added")
}
ylogger.Zero.Debug().Msg("read 3")

return c2, c, err
}
Expand All @@ -494,34 +530,53 @@ func getDatabase(port int) (DB, error) {
return DB{}, err
}
defer conn.Close() //error
ylogger.Zero.Debug().Msg("connected to db")
rows, err := conn.Query(`SELECT datname, dattablespace, oid FROM pg_database WHERE datallowconn;`)
if err != nil {
return DB{}, err
}
defer rows.Close()
ylogger.Zero.Debug().Msg("recieved db list")

for rows.Next() {
row := DB{}
ylogger.Zero.Debug().Msg("cycle 1")
if err := rows.Scan(&row.name, &row.tablespace, &row.oid); err != nil {
return DB{}, err
}
ylogger.Zero.Debug().Msg("cycle 2")
if row.name == "postgres" {
continue
}

ylogger.Zero.Debug().Str("db", row.name).Msg("check database")
connDb, err := connectToDatabase(port, row.name)
if err != nil {
return DB{}, err
}
defer connDb.Close() //error
ylogger.Zero.Debug().Msg("cycle 3")

rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`)
if err != nil {
return DB{}, err
}
defer rowsdb.Close()
ylogger.Zero.Debug().Msg("cycle 4")
var ans bool
rowsdb.Scan(&ans)
rowsdb.Next()
err = rowsdb.Scan(&ans)
if err != nil {
ylogger.Zero.Error().AnErr("error", err).Msg("error during yezzey check")
return DB{}, err
}
ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema")
if ans {
ylogger.Zero.Debug().Str("db: %s", row.name).Msg("found yezzey schema in database")
return row, nil
}

ylogger.Zero.Debug().Str("db: %s", row.name).Msg("no yezzey schema in database")
}
return DB{}, fmt.Errorf("no yezzey schema across databases")
}
Expand Down

0 comments on commit fe241f0

Please sign in to comment.