From fe241f0e2ccd9840c16bbc199078cee8e4b31c05 Mon Sep 17 00:00:00 2001 From: debebantur Date: Tue, 30 Jul 2024 10:40:34 +0000 Subject: [PATCH] working delete garbadge --- cmd/client/main.go | 4 +- pkg/message/delete_message.go | 23 ++++++--- pkg/message/message_test.go | 3 +- pkg/proc/interaction.go | 87 ++++++++++++++++++++++++++++------- 4 files changed, 92 insertions(+), 25 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index 97f6fba..0570078 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -24,6 +24,7 @@ var encrypt bool var offset uint64 var segmentPort int var segmentNum int +var confirm bool // TODOV func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error { @@ -233,7 +234,7 @@ var deleteCmd = &cobra.Command{ defer con.Close() ylogger.Zero.Info().Str("name", args[0]).Msg("delete") - msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum).Encode() + msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm).Encode() _, err = con.Write(msg) if err != nil { return err @@ -292,6 +293,7 @@ func init() { deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on") deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "number of the segment") + deleteCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion") rootCmd.AddCommand(deleteCmd) } diff --git a/pkg/message/delete_message.go b/pkg/message/delete_message.go index 592601a..2af9881 100644 --- a/pkg/message/delete_message.go +++ b/pkg/message/delete_message.go @@ -6,18 +6,20 @@ import ( ) type DeleteMessage struct { //seg port - Name string - Port int - Segnum int + Name string + Port int + Segnum int + Confirm bool } var _ ProtoMessage = &DeleteMessage{} -func NewDeleteMessage(name string, port int, seg int) *DeleteMessage { +func NewDeleteMessage(name string, port int, seg int, confirm bool) *DeleteMessage { return &DeleteMessage{ - Name: name, - Port: port, - Segnum: seg, + Name: name, + Port: port, + Segnum: seg, + Confirm: confirm, } } @@ -29,6 +31,10 @@ func (c *DeleteMessage) Encode() []byte { 0, } + if c.Confirm { + bt[1] = 1 + } + bt = append(bt, []byte(c.Name)...) bt = append(bt, 0) @@ -47,6 +53,9 @@ func (c *DeleteMessage) Encode() []byte { } func (c *DeleteMessage) Decode(body []byte) { + if body[1] == 1 { + c.Confirm = true + } c.Name = c.GetDeleteName(body[4:]) c.Port = int(binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8])) c.Segnum = int(binary.BigEndian.Uint64(body[len(body)-8:])) diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 1681685..a522ccf 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -185,7 +185,7 @@ func TestCopyMsg(t *testing.T) { func TestDeleteMsg(t *testing.T) { assert := assert.New(t) - msg := message.NewDeleteMessage("myname/mynextname", 5432, 42) + msg := message.NewDeleteMessage("myname/mynextname", 5432, 42, true) body := msg.Encode() assert.Equal(body[8], byte(message.MessageTypeDelete)) @@ -196,4 +196,5 @@ func TestDeleteMsg(t *testing.T) { assert.Equal("myname/mynextname", msg2.Name) assert.Equal(5432, msg2.Port) assert.Equal(42, msg2.Segnum) + assert.True(msg2.Confirm) } diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 4011727..970a113 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -333,25 +333,31 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl if err != nil { fmt.Printf("error in gettime %v", err) // delete this } - fmt.Printf("backup lsn %v", firstBackupLSN) // delete this + ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") //залистить файлы + ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path") objectMetas, err := s.ListPath(msg.Name) if err != nil { _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") return nil } + ylogger.Zero.Info().Int("metas count", len(objectMetas)).Msg("got object metas") // проверить yezzey vi и ei - может прочитать зарание //conect pgx to port and select from yezzey schema vi, ei, err := getVirtualIndex(msg.Port) //TODO rename method if err != nil { fmt.Printf("error in copy %v", err) // delete this } + ylogger.Zero.Info().Msg("recieved virtual index and expire index") + ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count") + ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count") //проверить и скопировать var failed []*storage.S3ObjectMeta retryCount := 0 + amount := 0 for len(objectMetas) > 0 && retryCount < 10 { retryCount++ for i := 0; i < len(objectMetas); i++ { @@ -361,8 +367,14 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) lsn, ok := ei[ans] if !vi[ans] && (lsn < firstBackupLSN || !ok) { + amount++ + if !msg.Confirm { + ylogger.Zero.Info().Str("file: %s", objectMetas[i].Path).Msg("File will be deleted") + continue + } err = s.MoveObject(objectMetas[i].Path, objectMetas[i].Path+"_trash") if err != nil { + amount-- fmt.Printf("error in copy %v", err) // delete this failed = append(failed, objectMetas[i]) } @@ -371,6 +383,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl objectMetas = failed fmt.Printf("failed files count: %d\n", len(objectMetas)) failed = make([]*storage.S3ObjectMeta, 0) + ylogger.Zero.Info().Int("amount", amount).Msg("deleted files count") } default: @@ -385,31 +398,47 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl // get lsn of the oldest backup func getLSN(seg int) (uint64, error) { - cmd := exec.Command("wal-g", "st ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg)) - + cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml") + ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args") var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { + ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls") return 0, err } p1 := strings.Split(out.String(), "\n") - p2 := p1[len(p1)-1] //TODO not really last - cmd2 := exec.Command("wal-g", "st cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p2)) + minLSN := BackupLSN{Lsn: ^uint64(0)} + for _, line := range p1 { + if !strings.Contains(line, ".json") { + continue + } + p2 := strings.Split(line, " ") + p3 := p2[len(p2)-1] - var out2 bytes.Buffer - cmd2.Stdout = &out2 + ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3)).Msg("check lsn in file") + cmd2 := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3), "--config=/etc/wal-g/wal-g.yaml") + + var out2 bytes.Buffer + cmd2.Stdout = &out2 + + err = cmd2.Run() + if err != nil { + ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat") + return 0, err + } + lsn := BackupLSN{} + err = json.Unmarshal(out2.Bytes(), &lsn) + + if lsn.Lsn < minLSN.Lsn { + minLSN.Lsn = lsn.Lsn + } - err = cmd2.Run() - if err != nil { - return 0, err } - lsn := BackupLSN{} - err = json.Unmarshal(out2.Bytes(), &lsn) - return lsn.Lsn, err + return minLSN.Lsn, err } func connectToDatabase(port int, database string) (*pgx.Conn, error) { @@ -439,17 +468,20 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T if err != nil { return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix } + ylogger.Zero.Debug().Str("database name", db.name).Msg("recieved database") conn, err := connectToDatabase(port, db.name) if err != nil { return nil, nil, err } defer conn.Close() //error + ylogger.Zero.Debug().Msg("connected to database") - rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index;`) + rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index WHERE expire_lsn != '0/0';`) if err != nil { return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix } defer rows.Close() + ylogger.Zero.Debug().Msg("executed select") c := make(map[string]uint64, 0) for rows.Next() { @@ -465,15 +497,17 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T c[fmt.Sprintf("%d_%d_%s_%s_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn } + ylogger.Zero.Debug().Msg("read 1") rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`) if err != nil { return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix } defer rows2.Close() + ylogger.Zero.Debug().Msg("read 2") c2 := make(map[string]bool, 0) - for rows.Next() { + for rows2.Next() { xpath := "" if err := rows2.Scan(&xpath); err != nil { return nil, nil, fmt.Errorf("unable to parse query output %v", err) @@ -483,7 +517,9 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T p3 := strings.Split(p2, "_") ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) c2[ans] = true + ylogger.Zero.Debug().Str("file", ans).Msg("added") } + ylogger.Zero.Debug().Msg("read 3") return c2, c, err } @@ -494,34 +530,53 @@ func getDatabase(port int) (DB, error) { return DB{}, err } defer conn.Close() //error + ylogger.Zero.Debug().Msg("connected to db") rows, err := conn.Query(`SELECT datname, dattablespace, oid FROM pg_database WHERE datallowconn;`) if err != nil { return DB{}, err } defer rows.Close() + ylogger.Zero.Debug().Msg("recieved db list") for rows.Next() { row := DB{} + ylogger.Zero.Debug().Msg("cycle 1") if err := rows.Scan(&row.name, &row.tablespace, &row.oid); err != nil { return DB{}, err } + ylogger.Zero.Debug().Msg("cycle 2") + if row.name == "postgres" { + continue + } + ylogger.Zero.Debug().Str("db", row.name).Msg("check database") connDb, err := connectToDatabase(port, row.name) if err != nil { return DB{}, err } defer connDb.Close() //error + ylogger.Zero.Debug().Msg("cycle 3") rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`) if err != nil { return DB{}, err } defer rowsdb.Close() + ylogger.Zero.Debug().Msg("cycle 4") var ans bool - rowsdb.Scan(&ans) + rowsdb.Next() + err = rowsdb.Scan(&ans) + if err != nil { + ylogger.Zero.Error().AnErr("error", err).Msg("error during yezzey check") + return DB{}, err + } + ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema") if ans { + ylogger.Zero.Debug().Str("db: %s", row.name).Msg("found yezzey schema in database") return row, nil } + + ylogger.Zero.Debug().Str("db: %s", row.name).Msg("no yezzey schema in database") } return DB{}, fmt.Errorf("no yezzey schema across databases") }