Skip to content

Commit

Permalink
fixes after refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
debebantur committed Aug 6, 2024
1 parent 804de05 commit ec7ea9a
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 374 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mockgen:
mockgen -source=pkg/proc/yrreader.go -destination=pkg/mock/proc/yrreader.go -package=mock
mockgen -source=pkg/database/database.go -destination=pkg/mock/database.go -package=mock
mockgen -source=pkg/backups/backups.go -destination=pkg/mock/backups.go -package=mock
mockgen -source=pkg/storage/storage_interractor.go -destination=pkg/mock/storage.go -package=mock
mockgen -source=pkg/storage/storage.go -destination=pkg/mock/storage.go -package=mock

version = $(shell git describe --tags --abbrev=0)
package:
Expand Down
18 changes: 9 additions & 9 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var offset uint64
var segmentPort int
var segmentNum int
var confirm bool
var garbage bool

// TODOV
func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error {
Expand Down Expand Up @@ -182,12 +183,10 @@ func listFunc(con net.Conn, instanceCnf *config.Instance, args []string) error {
meta.Decode(body)

res = append(res, meta.Content...)
break
case message.MessageTypeReadyForQuery:
done = true
break
default:
return fmt.Errorf("Incorrect message type: %s", tp.String())
return fmt.Errorf("incorrect message type: %s", tp.String())
}
}

Expand Down Expand Up @@ -217,10 +216,10 @@ var copyCmd = &cobra.Command{
}

var deleteCmd = &cobra.Command{
Use: "delete_garbage",
Short: "delete_garbage",
Use: "delete",
Short: "delete",
RunE: func(cmd *cobra.Command, args []string) error {
ylogger.Zero.Info().Msg("Execute delete_garbage command")
ylogger.Zero.Info().Msg("Execute delete command")
err := config.LoadInstanceConfig(cfgPath)
if err != nil {
return err
Expand All @@ -231,10 +230,10 @@ var deleteCmd = &cobra.Command{
if err != nil {
return err
}

defer con.Close()

ylogger.Zero.Info().Str("name", args[0]).Msg("delete")
msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm).Encode()
msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm, garbage).Encode()
_, err = con.Write(msg)
if err != nil {
return err
Expand Down Expand Up @@ -292,8 +291,9 @@ func init() {
rootCmd.AddCommand(listCmd)

deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on")
deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "number of the segment")
deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "logical number of a segment")
deleteCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion")
deleteCmd.PersistentFlags().BoolVarP(&garbage, "garbage", "g", false, "delete garbage")
rootCmd.AddCommand(deleteCmd)
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/backups/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type BackupInterractor interface {
GetFirstLSN(int) (uint64, error)
}

type WalgBackupInterractor struct {
type WalgBackupInterractor struct { //TODO: rewrite to using s3 instead of wal-g cmd
}

// get lsn of the oldest backup
Expand All @@ -34,29 +34,29 @@ func (b *WalgBackupInterractor) GetFirstLSN(seg int) (uint64, error) {
ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls")
return 0, err
}
p1 := strings.Split(out.String(), "\n")
lines := strings.Split(out.String(), "\n")

minLSN := BackupLSN{Lsn: ^uint64(0)}
for _, line := range p1 {
for _, line := range lines {
if !strings.Contains(line, ".json") {
continue
}
p2 := strings.Split(line, " ")
p3 := p2[len(p2)-1]
parts := strings.Split(line, " ")
fileName := parts[len(parts)-1]

ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3)).Msg("check lsn in file")
cmd2 := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3), "--config=/etc/wal-g/wal-g.yaml")
ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, fileName)).Msg("check lsn in file")
catCmd := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, fileName), "--config=/etc/wal-g/wal-g.yaml")

var out2 bytes.Buffer
cmd2.Stdout = &out2
var catOut bytes.Buffer
catCmd.Stdout = &catOut

err = cmd2.Run()
err = catCmd.Run()
if err != nil {
ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat")
return 0, err
}
lsn := BackupLSN{}
err = json.Unmarshal(out2.Bytes(), &lsn)
err = json.Unmarshal(catOut.Bytes(), &lsn)

if lsn.Lsn < minLSN.Lsn {
minLSN.Lsn = lsn.Lsn
Expand Down
10 changes: 9 additions & 1 deletion pkg/message/delete_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ type DeleteMessage struct { //seg port
Port int
Segnum int
Confirm bool
Garbage bool
}

var _ ProtoMessage = &DeleteMessage{}

func NewDeleteMessage(name string, port int, seg int, confirm bool) *DeleteMessage {
func NewDeleteMessage(name string, port int, seg int, confirm bool, garbage bool) *DeleteMessage {
return &DeleteMessage{
Name: name,
Port: port,
Segnum: seg,
Confirm: confirm,
Garbage: garbage,
}
}

Expand All @@ -34,6 +36,9 @@ func (c *DeleteMessage) Encode() []byte {
if c.Confirm {
bt[1] = 1
}
if c.Garbage {
bt[2] = 1
}

bt = append(bt, []byte(c.Name)...)
bt = append(bt, 0)
Expand All @@ -56,6 +61,9 @@ func (c *DeleteMessage) Decode(body []byte) {
if body[1] == 1 {
c.Confirm = true
}
if body[2] == 1 {
c.Garbage = true
}
c.Name = c.GetDeleteName(body[4:])
c.Port = int(binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8]))
c.Segnum = int(binary.BigEndian.Uint64(body[len(body)-8:]))
Expand Down
3 changes: 2 additions & 1 deletion pkg/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestCopyMsg(t *testing.T) {
func TestDeleteMsg(t *testing.T) {
assert := assert.New(t)

msg := message.NewDeleteMessage("myname/mynextname", 5432, 42, true)
msg := message.NewDeleteMessage("myname/mynextname", 5432, 42, true, true)
body := msg.Encode()

assert.Equal(body[8], byte(message.MessageTypeDelete))
Expand All @@ -197,4 +197,5 @@ func TestDeleteMsg(t *testing.T) {
assert.Equal(5432, msg2.Port)
assert.Equal(42, msg2.Segnum)
assert.True(msg2.Confirm)
assert.True(msg2.Garbage)
}
Loading

0 comments on commit ec7ea9a

Please sign in to comment.