Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yezzey data files vacuum. WAL-G backup integration #44

Merged
merged 5 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ unittest:

mockgen:
mockgen -source=pkg/proc/yrreader.go -destination=pkg/mock/proc/yrreader.go -package=mock
mockgen -source=pkg/database/database.go -destination=pkg/mock/database.go -package=mock
mockgen -source=pkg/backups/backups.go -destination=pkg/mock/backups.go -package=mock
mockgen -source=pkg/storage/storage.go -destination=pkg/mock/storage.go -package=mock

version = $(shell git describe --tags --abbrev=0)
package:
Expand Down
57 changes: 54 additions & 3 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ var logLevel string
var decrypt bool
var encrypt bool
var offset uint64
var segmentPort int
var segmentNum int
var confirm bool
var garbage bool

// TODOV
func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error {
Expand Down Expand Up @@ -179,12 +183,10 @@ func listFunc(con net.Conn, instanceCnf *config.Instance, args []string) error {
meta.Decode(body)

res = append(res, meta.Content...)
break
case message.MessageTypeReadyForQuery:
done = true
break
default:
return fmt.Errorf("Incorrect message type: %s", tp.String())
return fmt.Errorf("incorrect message type: %s", tp.String())
}
}

Expand Down Expand Up @@ -213,6 +215,49 @@ var copyCmd = &cobra.Command{
RunE: Runner(copyFunc),
}

var deleteCmd = &cobra.Command{
debebantur marked this conversation as resolved.
Show resolved Hide resolved
Use: "delete",
Short: "delete",
RunE: func(cmd *cobra.Command, args []string) error {
ylogger.Zero.Info().Msg("Execute delete command")
err := config.LoadInstanceConfig(cfgPath)
if err != nil {
return err
}
instanceCnf := config.InstanceConfig()

con, err := net.Dial("unix", instanceCnf.SocketPath)
if err != nil {
return err
}
defer con.Close()

ylogger.Zero.Info().Str("name", args[0]).Msg("delete")
msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm, garbage).Encode()
_, err = con.Write(msg)
if err != nil {
return err
}

ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed delete msg")

client := client.NewYClient(con)
protoReader := proc.NewProtoReader(client)

ansType, body, err := protoReader.ReadPacket()
if err != nil {
ylogger.Zero.Debug().Err(err).Msg("error while recieving answer")
return err
}

if ansType != message.MessageTypeReadyForQuery {
return fmt.Errorf("failed to delete, msg: %v", body)
}

return nil
},
}

var putCmd = &cobra.Command{
Use: "put",
Short: "put",
Expand Down Expand Up @@ -244,6 +289,12 @@ func init() {
rootCmd.AddCommand(putCmd)

rootCmd.AddCommand(listCmd)

deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on")
deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "logical number of a segment")
deleteCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion")
deleteCmd.PersistentFlags().BoolVarP(&garbage, "garbage", "g", false, "delete garbage")
rootCmd.AddCommand(deleteCmd)
}

func main() {
Expand Down
11 changes: 8 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@ require (
gopkg.in/yaml.v2 v2.4.0
)

require golang.org/x/text v0.14.0 // indirect

require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/cockroachdb/apd v1.1.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
golang.org/x/text v0.14.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
)

require (
github.com/cloudflare/circl v1.3.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/mock v1.6.0
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgx v3.6.2+incompatible
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
Expand Down
15 changes: 12 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,29 @@ github.com/ProtonMail/go-crypto v0.0.0-20230923063757-afb1ddc0824c/go.mod h1:EjA
github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs=
github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o=
github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
Expand All @@ -30,6 +38,8 @@ github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand All @@ -45,13 +55,13 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yezzey-gp/aws-sdk-go v0.1.0 h1:as6ANEva14gKdhWPjZy6qaGR+/WhP0HN4UMzDHLDqmU=
Expand Down Expand Up @@ -127,6 +137,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
67 changes: 67 additions & 0 deletions pkg/backups/backups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package backups

import (
"bytes"
"encoding/json"
"fmt"
"os/exec"
"strings"

"github.com/yezzey-gp/yproxy/pkg/ylogger"
)

type BackupLSN struct {
Lsn uint64 `json:"LSN"`
}

//go:generate mockgen -destination=pkg/mock/backups.go -package=mock
type BackupInterractor interface {
GetFirstLSN(int) (uint64, error)
}

type WalgBackupInterractor struct { //TODO: rewrite to using s3 instead of wal-g cmd
}

// get lsn of the oldest backup
func (b *WalgBackupInterractor) GetFirstLSN(seg int) (uint64, error) {
cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml")
ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args")
var out bytes.Buffer
cmd.Stdout = &out

err := cmd.Run()
if err != nil {
ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls")
return 0, err
}
lines := strings.Split(out.String(), "\n")

minLSN := BackupLSN{Lsn: ^uint64(0)}
for _, line := range lines {
if !strings.Contains(line, ".json") {
continue
}
parts := strings.Split(line, " ")
fileName := parts[len(parts)-1]

ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, fileName)).Msg("check lsn in file")
catCmd := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, fileName), "--config=/etc/wal-g/wal-g.yaml")

var catOut bytes.Buffer
catCmd.Stdout = &catOut

err = catCmd.Run()
if err != nil {
ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat")
return 0, err
}
lsn := BackupLSN{}
err = json.Unmarshal(catOut.Bytes(), &lsn)

if lsn.Lsn < minLSN.Lsn {
minLSN.Lsn = lsn.Lsn
}
}

return minLSN.Lsn, err
}
Loading
Loading