From 6746827dd19a984b672ac198d7a49c37323a5384 Mon Sep 17 00:00:00 2001 From: debebantur Date: Fri, 19 Jul 2024 15:38:35 +0000 Subject: [PATCH 1/5] some changes --- cmd/client/main.go | 49 ++++++++ go.mod | 11 +- go.sum | 15 ++- pkg/message/delete_message.go | 24 +++- pkg/message/message_test.go | 16 +++ pkg/proc/interaction.go | 218 +++++++++++++++++++++++++++++++++- pkg/storage/storage.go | 161 +++++++++++++++++++++++++ 7 files changed, 482 insertions(+), 12 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index f396d07..97f6fba 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -22,6 +22,8 @@ var logLevel string var decrypt bool var encrypt bool var offset uint64 +var segmentPort int +var segmentNum int // TODOV func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error { @@ -213,6 +215,49 @@ var copyCmd = &cobra.Command{ RunE: Runner(copyFunc), } +var deleteCmd = &cobra.Command{ + Use: "delete_garbage", + Short: "delete_garbage", + RunE: func(cmd *cobra.Command, args []string) error { + ylogger.Zero.Info().Msg("Execute delete_garbage command") + err := config.LoadInstanceConfig(cfgPath) + if err != nil { + return err + } + instanceCnf := config.InstanceConfig() + + con, err := net.Dial("unix", instanceCnf.SocketPath) + if err != nil { + return err + } + + defer con.Close() + ylogger.Zero.Info().Str("name", args[0]).Msg("delete") + msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum).Encode() + _, err = con.Write(msg) + if err != nil { + return err + } + + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed delete msg") + + client := client.NewYClient(con) + protoReader := proc.NewProtoReader(client) + + ansType, body, err := protoReader.ReadPacket() + if err != nil { + ylogger.Zero.Debug().Err(err).Msg("error while recieving answer") + return err + } + + if ansType != message.MessageTypeReadyForQuery { + return fmt.Errorf("failed to delete, msg: %v", body) + } + + return nil + }, +} + var putCmd = &cobra.Command{ Use: "put", Short: "put", @@ -244,6 +289,10 @@ func init() { rootCmd.AddCommand(putCmd) rootCmd.AddCommand(listCmd) + + deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on") + deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "number of the segment") + rootCmd.AddCommand(deleteCmd) } func main() { diff --git a/go.mod b/go.mod index b824141..d226e8b 100644 --- a/go.mod +++ b/go.mod @@ -15,12 +15,16 @@ require ( gopkg.in/yaml.v2 v2.4.0 ) +require golang.org/x/text v0.14.0 // indirect + require ( - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/cockroachdb/apd v1.1.0 // indirect + github.com/gofrs/uuid v4.4.0+incompatible // indirect + github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect github.com/kr/text v0.2.0 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect - golang.org/x/text v0.14.0 // indirect + github.com/shopspring/decimal v1.4.0 // indirect ) require ( @@ -28,6 +32,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/mock v1.6.0 github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/jackc/pgx v3.6.2+incompatible github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect diff --git a/go.sum b/go.sum index c528825..ca60523 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/ProtonMail/go-crypto v0.0.0-20230923063757-afb1ddc0824c/go.mod h1:EjA github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs= github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -12,14 +14,20 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= +github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc= +github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= +github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -30,6 +38,8 @@ github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -45,13 +55,13 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yezzey-gp/aws-sdk-go v0.1.0 h1:as6ANEva14gKdhWPjZy6qaGR+/WhP0HN4UMzDHLDqmU= @@ -127,6 +137,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/message/delete_message.go b/pkg/message/delete_message.go index ccab831..592601a 100644 --- a/pkg/message/delete_message.go +++ b/pkg/message/delete_message.go @@ -5,15 +5,19 @@ import ( "encoding/binary" ) -type DeleteMessage struct { - Name string +type DeleteMessage struct { //seg port + Name string + Port int + Segnum int } var _ ProtoMessage = &DeleteMessage{} -func NewDeleteMessage(name string) *DeleteMessage { +func NewDeleteMessage(name string, port int, seg int) *DeleteMessage { return &DeleteMessage{ - Name: name, + Name: name, + Port: port, + Segnum: seg, } } @@ -27,8 +31,16 @@ func (c *DeleteMessage) Encode() []byte { bt = append(bt, []byte(c.Name)...) bt = append(bt, 0) - ln := len(bt) + 8 + p := make([]byte, 8) + binary.BigEndian.PutUint64(p, uint64(c.Port)) + bt = append(bt, p...) + + p = make([]byte, 8) + binary.BigEndian.PutUint64(p, uint64(c.Segnum)) + bt = append(bt, p...) + + ln := len(bt) + 8 bs := make([]byte, 8) binary.BigEndian.PutUint64(bs, uint64(ln)) return append(bs, bt...) @@ -36,6 +48,8 @@ func (c *DeleteMessage) Encode() []byte { func (c *DeleteMessage) Decode(body []byte) { c.Name = c.GetDeleteName(body[4:]) + c.Port = int(binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8])) + c.Segnum = int(binary.BigEndian.Uint64(body[len(body)-8:])) } func (c *DeleteMessage) GetDeleteName(b []byte) string { diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 19ea27b..1681685 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -181,3 +181,19 @@ func TestCopyMsg(t *testing.T) { assert.True(msg2.Decrypt) assert.True(msg2.Encrypt) } + +func TestDeleteMsg(t *testing.T) { + assert := assert.New(t) + + msg := message.NewDeleteMessage("myname/mynextname", 5432, 42) + body := msg.Encode() + + assert.Equal(body[8], byte(message.MessageTypeDelete)) + + msg2 := message.DeleteMessage{} + msg2.Decode(body[8:]) + + assert.Equal("myname/mynextname", msg2.Name) + assert.Equal(5432, msg2.Port) + assert.Equal(42, msg2.Segnum) +} diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index fbc4380..4011727 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -1,11 +1,16 @@ package proc import ( + "bytes" + "encoding/json" "fmt" "io" + "os/exec" "strings" "sync" + "github.com/jackc/pgx" + "github.com/pkg/errors" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/client" "github.com/yezzey-gp/yproxy/pkg/crypt" @@ -230,7 +235,6 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl retryCount++ for i := 0; i < len(objectMetas); i++ { path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix) - //get reader readerFromOldBucket := NewYRetryReader(NewRestartReader(oldStorage, path)) var fromReader io.Reader @@ -317,6 +321,58 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl fmt.Println("Copy finished successfully") ylogger.Zero.Info().Msg("Copy finished successfully") + case message.MessageTypeDelete: + //recieve message + msg := message.DeleteMessage{} + msg.Decode(body) + + ycl.SetExternalFilePath(msg.Name) + + // получить время первого бэкапа + firstBackupLSN, err := getLSN(msg.Segnum) + if err != nil { + fmt.Printf("error in gettime %v", err) // delete this + } + fmt.Printf("backup lsn %v", firstBackupLSN) // delete this + + //залистить файлы + objectMetas, err := s.ListPath(msg.Name) + if err != nil { + _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") + + return nil + } + // проверить yezzey vi и ei - может прочитать зарание + //conect pgx to port and select from yezzey schema + vi, ei, err := getVirtualIndex(msg.Port) //TODO rename method + if err != nil { + fmt.Printf("error in copy %v", err) // delete this + } + + //проверить и скопировать + var failed []*storage.S3ObjectMeta + retryCount := 0 + for len(objectMetas) > 0 && retryCount < 10 { + retryCount++ + for i := 0; i < len(objectMetas); i++ { + p1 := strings.Split(objectMetas[i].Path, "/") + p2 := p1[len(p1)-1] + p3 := strings.Split(p2, "_") + ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + lsn, ok := ei[ans] + if !vi[ans] && (lsn < firstBackupLSN || !ok) { + err = s.MoveObject(objectMetas[i].Path, objectMetas[i].Path+"_trash") + if err != nil { + fmt.Printf("error in copy %v", err) // delete this + failed = append(failed, objectMetas[i]) + } + } + } + objectMetas = failed + fmt.Printf("failed files count: %d\n", len(objectMetas)) + failed = make([]*storage.S3ObjectMeta, 0) + } + default: ylogger.Zero.Error().Any("type", tp).Msg("what type is it") _ = ycl.ReplyError(nil, "wrong request type") @@ -326,3 +382,163 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl return nil } + +// get lsn of the oldest backup +func getLSN(seg int) (uint64, error) { + cmd := exec.Command("wal-g", "st ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg)) + + var out bytes.Buffer + cmd.Stdout = &out + + err := cmd.Run() + if err != nil { + return 0, err + } + p1 := strings.Split(out.String(), "\n") + p2 := p1[len(p1)-1] //TODO not really last + + cmd2 := exec.Command("wal-g", "st cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p2)) + + var out2 bytes.Buffer + cmd2.Stdout = &out2 + + err = cmd2.Run() + if err != nil { + return 0, err + } + lsn := BackupLSN{} + err = json.Unmarshal(out2.Bytes(), &lsn) + + return lsn.Lsn, err +} + +func connectToDatabase(port int, database string) (*pgx.Conn, error) { + config, err := pgx.ParseEnvLibpq() + if err != nil { + return nil, errors.Wrap(err, "Connect: unable to read environment variables") + } + + config.Port = uint16(port) + config.Database = database + + config.RuntimeParams["gp_role"] = "utility" + conn, err := pgx.Connect(config) + if err != nil { + config.RuntimeParams["gp_session_role"] = "utility" + conn, err = pgx.Connect(config) + if err != nil { + fmt.Printf("error in connection %v", err) // delete this + return nil, err + } + } + return conn, nil +} + +func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз + db, err := getDatabase(port) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + conn, err := connectToDatabase(port, db.name) + if err != nil { + return nil, nil, err + } + defer conn.Close() //error + + rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index;`) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + defer rows.Close() + + c := make(map[string]uint64, 0) + for rows.Next() { + row := Ei{} + if err := rows.Scan(&row.reloid, &row.relfileoid, &row.expireLsn, &row.fqnmd5); err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + + lsn, err := pgx.ParseLSN(row.expireLsn) + if err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + + c[fmt.Sprintf("%d_%d_%s_%s_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn + } + + rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + defer rows2.Close() + + c2 := make(map[string]bool, 0) + for rows.Next() { + xpath := "" + if err := rows2.Scan(&xpath); err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + p1 := strings.Split(xpath, "/") + p2 := p1[len(p1)-1] + p3 := strings.Split(p2, "_") + ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + c2[ans] = true + } + + return c2, c, err +} + +func getDatabase(port int) (DB, error) { + conn, err := connectToDatabase(port, "postgres") + if err != nil { + return DB{}, err + } + defer conn.Close() //error + rows, err := conn.Query(`SELECT datname, dattablespace, oid FROM pg_database WHERE datallowconn;`) + if err != nil { + return DB{}, err + } + defer rows.Close() + + for rows.Next() { + row := DB{} + if err := rows.Scan(&row.name, &row.tablespace, &row.oid); err != nil { + return DB{}, err + } + + connDb, err := connectToDatabase(port, row.name) + if err != nil { + return DB{}, err + } + defer connDb.Close() //error + + rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`) + if err != nil { + return DB{}, err + } + defer rowsdb.Close() + var ans bool + rowsdb.Scan(&ans) + if ans { + return row, nil + } + } + return DB{}, fmt.Errorf("no yezzey schema across databases") +} + +type Ei struct { + reloid string + relfileoid string + expireLsn string + fqnmd5 string +} + +type DB struct { + name string + tablespace int + oid int +} + +type BackupLSN struct { + Lsn uint64 `json:"FinishLSN"` +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index f12ea6f..e0ffd39 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,10 +1,18 @@ package storage import ( + "context" "fmt" "io" + "path" + "time" + + "github.com/yezzey-gp/aws-sdk-go/aws" + "github.com/yezzey-gp/aws-sdk-go/service/s3" + "github.com/yezzey-gp/aws-sdk-go/service/s3/s3manager" "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/ylogger" ) type StorageReader interface { @@ -22,6 +30,7 @@ type StorageLister interface { type StorageMover interface { MoveObject(from string, to string) error + DeleteObject(key string) error } type StorageInteractor interface { @@ -46,3 +55,155 @@ func NewStorage(cnf *config.Storage) (StorageInteractor, error) { return nil, fmt.Errorf("wrong storage type " + cnf.StorageType) } } + +func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + // XXX: fix this + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return nil, err + } + + objectPath := path.Join(s.cnf.StoragePrefix, name) + input := &s3.GetObjectInput{ + Bucket: &s.cnf.StorageBucket, + Key: aws.String(objectPath), + Range: aws.String(fmt.Sprintf("bytes=%d-", offset)), + } + + ylogger.Zero.Debug().Str("key", objectPath).Int64("offset", offset).Str("bucket", + s.cnf.StorageBucket).Msg("requesting external storage") + + object, err := sess.GetObject(input) + return object.Body, err +} + +func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return nil + } + + objectPath := path.Join(s.cnf.StoragePrefix, name) + + up := s3manager.NewUploaderWithClient(sess, func(uploader *s3manager.Uploader) { + uploader.PartSize = int64(1 << 24) + uploader.Concurrency = 1 + }) + + _, err = up.Upload( + &s3manager.UploadInput{ + Bucket: aws.String(s.cnf.StorageBucket), + Key: aws.String(objectPath), + Body: r, + StorageClass: aws.String("STANDARD"), + }, + ) + + return err +} + +func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return nil + } + + objectPath := path.Join(s.cnf.StoragePrefix, name) + + input := &s3.PatchObjectInput{ + Bucket: &s.cnf.StorageBucket, + Key: aws.String(objectPath), + Body: r, + ContentRange: aws.String(fmt.Sprintf("bytes %d-18446744073709551615", startOffste)), + } + + _, err = sess.PatchObject(input) + + ylogger.Zero.Debug().Str("key", objectPath).Str("bucket", + s.cnf.StorageBucket).Msg("modifying file in external storage") + + return err +} + +type S3ObjectMeta struct { + Path string + Size int64 + LastModified time.Time +} + +func (s *S3StorageInteractor) ListPath(prefix string) ([]*S3ObjectMeta, error) { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return nil, err + } + + var continuationToken *string + prefix = path.Join(s.cnf.StoragePrefix, prefix) + metas := make([]*S3ObjectMeta, 0) + + for { + input := &s3.ListObjectsV2Input{ + Bucket: &s.cnf.StorageBucket, + Prefix: aws.String(prefix), + ContinuationToken: continuationToken, + } + + out, err := sess.ListObjectsV2(input) + if err != nil { + fmt.Printf("list error: %v\n", err) + } + + for _, obj := range out.Contents { + metas = append(metas, &S3ObjectMeta{ + Path: *obj.Key, + Size: *obj.Size, + LastModified: *obj.LastModified, + }) + } + + if !*out.IsTruncated { + break + } + + continuationToken = out.NextContinuationToken + } + return metas, nil +} + +func (s *S3StorageInteractor) MoveObject(from string, to string) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return err + } + + fromPath := path.Join(s.cnf.StoragePrefix, from) + toPath := path.Join(s.cnf.StoragePrefix, to) + + input := s3.CopyObjectInput{ + Bucket: &s.cnf.StorageBucket, + CopySource: aws.String(fromPath), + Key: aws.String(toPath), + } + + _, err = sess.CopyObject(&input) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to copy object") + return err + } + + input2 := s3.DeleteObjectInput{ + Bucket: &s.cnf.StorageBucket, + Key: aws.String(fromPath), + } + + _, err = sess.DeleteObject(&input2) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to delete old object") + } + return err +} From fe241f0e2ccd9840c16bbc199078cee8e4b31c05 Mon Sep 17 00:00:00 2001 From: debebantur Date: Tue, 30 Jul 2024 10:40:34 +0000 Subject: [PATCH 2/5] working delete garbadge --- cmd/client/main.go | 4 +- pkg/message/delete_message.go | 23 ++++++--- pkg/message/message_test.go | 3 +- pkg/proc/interaction.go | 87 ++++++++++++++++++++++++++++------- 4 files changed, 92 insertions(+), 25 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index 97f6fba..0570078 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -24,6 +24,7 @@ var encrypt bool var offset uint64 var segmentPort int var segmentNum int +var confirm bool // TODOV func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error { @@ -233,7 +234,7 @@ var deleteCmd = &cobra.Command{ defer con.Close() ylogger.Zero.Info().Str("name", args[0]).Msg("delete") - msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum).Encode() + msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm).Encode() _, err = con.Write(msg) if err != nil { return err @@ -292,6 +293,7 @@ func init() { deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on") deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "number of the segment") + deleteCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion") rootCmd.AddCommand(deleteCmd) } diff --git a/pkg/message/delete_message.go b/pkg/message/delete_message.go index 592601a..2af9881 100644 --- a/pkg/message/delete_message.go +++ b/pkg/message/delete_message.go @@ -6,18 +6,20 @@ import ( ) type DeleteMessage struct { //seg port - Name string - Port int - Segnum int + Name string + Port int + Segnum int + Confirm bool } var _ ProtoMessage = &DeleteMessage{} -func NewDeleteMessage(name string, port int, seg int) *DeleteMessage { +func NewDeleteMessage(name string, port int, seg int, confirm bool) *DeleteMessage { return &DeleteMessage{ - Name: name, - Port: port, - Segnum: seg, + Name: name, + Port: port, + Segnum: seg, + Confirm: confirm, } } @@ -29,6 +31,10 @@ func (c *DeleteMessage) Encode() []byte { 0, } + if c.Confirm { + bt[1] = 1 + } + bt = append(bt, []byte(c.Name)...) bt = append(bt, 0) @@ -47,6 +53,9 @@ func (c *DeleteMessage) Encode() []byte { } func (c *DeleteMessage) Decode(body []byte) { + if body[1] == 1 { + c.Confirm = true + } c.Name = c.GetDeleteName(body[4:]) c.Port = int(binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8])) c.Segnum = int(binary.BigEndian.Uint64(body[len(body)-8:])) diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 1681685..a522ccf 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -185,7 +185,7 @@ func TestCopyMsg(t *testing.T) { func TestDeleteMsg(t *testing.T) { assert := assert.New(t) - msg := message.NewDeleteMessage("myname/mynextname", 5432, 42) + msg := message.NewDeleteMessage("myname/mynextname", 5432, 42, true) body := msg.Encode() assert.Equal(body[8], byte(message.MessageTypeDelete)) @@ -196,4 +196,5 @@ func TestDeleteMsg(t *testing.T) { assert.Equal("myname/mynextname", msg2.Name) assert.Equal(5432, msg2.Port) assert.Equal(42, msg2.Segnum) + assert.True(msg2.Confirm) } diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 4011727..970a113 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -333,25 +333,31 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl if err != nil { fmt.Printf("error in gettime %v", err) // delete this } - fmt.Printf("backup lsn %v", firstBackupLSN) // delete this + ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") //залистить файлы + ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path") objectMetas, err := s.ListPath(msg.Name) if err != nil { _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") return nil } + ylogger.Zero.Info().Int("metas count", len(objectMetas)).Msg("got object metas") // проверить yezzey vi и ei - может прочитать зарание //conect pgx to port and select from yezzey schema vi, ei, err := getVirtualIndex(msg.Port) //TODO rename method if err != nil { fmt.Printf("error in copy %v", err) // delete this } + ylogger.Zero.Info().Msg("recieved virtual index and expire index") + ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count") + ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count") //проверить и скопировать var failed []*storage.S3ObjectMeta retryCount := 0 + amount := 0 for len(objectMetas) > 0 && retryCount < 10 { retryCount++ for i := 0; i < len(objectMetas); i++ { @@ -361,8 +367,14 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) lsn, ok := ei[ans] if !vi[ans] && (lsn < firstBackupLSN || !ok) { + amount++ + if !msg.Confirm { + ylogger.Zero.Info().Str("file: %s", objectMetas[i].Path).Msg("File will be deleted") + continue + } err = s.MoveObject(objectMetas[i].Path, objectMetas[i].Path+"_trash") if err != nil { + amount-- fmt.Printf("error in copy %v", err) // delete this failed = append(failed, objectMetas[i]) } @@ -371,6 +383,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl objectMetas = failed fmt.Printf("failed files count: %d\n", len(objectMetas)) failed = make([]*storage.S3ObjectMeta, 0) + ylogger.Zero.Info().Int("amount", amount).Msg("deleted files count") } default: @@ -385,31 +398,47 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl // get lsn of the oldest backup func getLSN(seg int) (uint64, error) { - cmd := exec.Command("wal-g", "st ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg)) - + cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml") + ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args") var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { + ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls") return 0, err } p1 := strings.Split(out.String(), "\n") - p2 := p1[len(p1)-1] //TODO not really last - cmd2 := exec.Command("wal-g", "st cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p2)) + minLSN := BackupLSN{Lsn: ^uint64(0)} + for _, line := range p1 { + if !strings.Contains(line, ".json") { + continue + } + p2 := strings.Split(line, " ") + p3 := p2[len(p2)-1] - var out2 bytes.Buffer - cmd2.Stdout = &out2 + ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3)).Msg("check lsn in file") + cmd2 := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3), "--config=/etc/wal-g/wal-g.yaml") + + var out2 bytes.Buffer + cmd2.Stdout = &out2 + + err = cmd2.Run() + if err != nil { + ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat") + return 0, err + } + lsn := BackupLSN{} + err = json.Unmarshal(out2.Bytes(), &lsn) + + if lsn.Lsn < minLSN.Lsn { + minLSN.Lsn = lsn.Lsn + } - err = cmd2.Run() - if err != nil { - return 0, err } - lsn := BackupLSN{} - err = json.Unmarshal(out2.Bytes(), &lsn) - return lsn.Lsn, err + return minLSN.Lsn, err } func connectToDatabase(port int, database string) (*pgx.Conn, error) { @@ -439,17 +468,20 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T if err != nil { return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix } + ylogger.Zero.Debug().Str("database name", db.name).Msg("recieved database") conn, err := connectToDatabase(port, db.name) if err != nil { return nil, nil, err } defer conn.Close() //error + ylogger.Zero.Debug().Msg("connected to database") - rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index;`) + rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index WHERE expire_lsn != '0/0';`) if err != nil { return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix } defer rows.Close() + ylogger.Zero.Debug().Msg("executed select") c := make(map[string]uint64, 0) for rows.Next() { @@ -465,15 +497,17 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T c[fmt.Sprintf("%d_%d_%s_%s_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn } + ylogger.Zero.Debug().Msg("read 1") rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`) if err != nil { return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix } defer rows2.Close() + ylogger.Zero.Debug().Msg("read 2") c2 := make(map[string]bool, 0) - for rows.Next() { + for rows2.Next() { xpath := "" if err := rows2.Scan(&xpath); err != nil { return nil, nil, fmt.Errorf("unable to parse query output %v", err) @@ -483,7 +517,9 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T p3 := strings.Split(p2, "_") ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) c2[ans] = true + ylogger.Zero.Debug().Str("file", ans).Msg("added") } + ylogger.Zero.Debug().Msg("read 3") return c2, c, err } @@ -494,34 +530,53 @@ func getDatabase(port int) (DB, error) { return DB{}, err } defer conn.Close() //error + ylogger.Zero.Debug().Msg("connected to db") rows, err := conn.Query(`SELECT datname, dattablespace, oid FROM pg_database WHERE datallowconn;`) if err != nil { return DB{}, err } defer rows.Close() + ylogger.Zero.Debug().Msg("recieved db list") for rows.Next() { row := DB{} + ylogger.Zero.Debug().Msg("cycle 1") if err := rows.Scan(&row.name, &row.tablespace, &row.oid); err != nil { return DB{}, err } + ylogger.Zero.Debug().Msg("cycle 2") + if row.name == "postgres" { + continue + } + ylogger.Zero.Debug().Str("db", row.name).Msg("check database") connDb, err := connectToDatabase(port, row.name) if err != nil { return DB{}, err } defer connDb.Close() //error + ylogger.Zero.Debug().Msg("cycle 3") rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`) if err != nil { return DB{}, err } defer rowsdb.Close() + ylogger.Zero.Debug().Msg("cycle 4") var ans bool - rowsdb.Scan(&ans) + rowsdb.Next() + err = rowsdb.Scan(&ans) + if err != nil { + ylogger.Zero.Error().AnErr("error", err).Msg("error during yezzey check") + return DB{}, err + } + ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema") if ans { + ylogger.Zero.Debug().Str("db: %s", row.name).Msg("found yezzey schema in database") return row, nil } + + ylogger.Zero.Debug().Str("db: %s", row.name).Msg("no yezzey schema in database") } return DB{}, fmt.Errorf("no yezzey schema across databases") } From 658d9af75478f65d9b6506affd3ff7734882bead Mon Sep 17 00:00:00 2001 From: debebantur Date: Thu, 1 Aug 2024 08:24:37 +0000 Subject: [PATCH 3/5] fixed some cases --- pkg/proc/interaction.go | 109 +++++++++++++++++++++++++--------------- pkg/storage/storage.go | 10 ++-- 2 files changed, 76 insertions(+), 43 deletions(-) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 970a113..1a69b84 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/jackc/pgx" + "github.com/jackc/pgx/pgtype" "github.com/pkg/errors" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/client" @@ -328,66 +329,89 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl ycl.SetExternalFilePath(msg.Name) - // получить время первого бэкапа - firstBackupLSN, err := getLSN(msg.Segnum) + //get firsr backup lsn + firstBackupLSN, err := getFirstLSN(msg.Segnum) if err != nil { - fmt.Printf("error in gettime %v", err) // delete this + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups? } ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") - //залистить файлы + //list files in storage ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path") objectMetas, err := s.ListPath(msg.Name) if err != nil { - _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") - + _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to complete request") return nil } - ylogger.Zero.Info().Int("metas count", len(objectMetas)).Msg("got object metas") - // проверить yezzey vi и ei - может прочитать зарание - //conect pgx to port and select from yezzey schema - vi, ei, err := getVirtualIndex(msg.Port) //TODO rename method + ylogger.Zero.Info().Int("amount", len(objectMetas)).Msg("objects count") + + vi, ei, err := getVirtualExpireIndexes(msg.Port) if err != nil { - fmt.Printf("error in copy %v", err) // delete this + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get indexes") + _ = ycl.ReplyError(fmt.Errorf("could not get virtual and expire indexes: %s", err), "failed to compelete request") + return nil } ylogger.Zero.Info().Msg("recieved virtual index and expire index") ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count") ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count") - //проверить и скопировать var failed []*storage.S3ObjectMeta retryCount := 0 - amount := 0 + deletedFilesCount := 0 for len(objectMetas) > 0 && retryCount < 10 { retryCount++ for i := 0; i < len(objectMetas); i++ { - p1 := strings.Split(objectMetas[i].Path, "/") + p1 := strings.Split(objectMetas[i].Path, "/") //TODO fix this p2 := p1[len(p1)-1] p3 := strings.Split(p2, "_") - ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) - lsn, ok := ei[ans] - if !vi[ans] && (lsn < firstBackupLSN || !ok) { - amount++ - if !msg.Confirm { - ylogger.Zero.Info().Str("file: %s", objectMetas[i].Path).Msg("File will be deleted") + if len(p3) >= 4 { + p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + } + lsn, ok := ei[p2] + ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn") + if !vi[p2] && (lsn < firstBackupLSN || !ok) { + ylogger.Zero.Debug().Str("file", objectMetas[i].Path). + Bool("file in expire indexi", ok). + Bool("lsn is less than in first backup", lsn < firstBackupLSN). + Msg("file will be deleted") + deletedFilesCount++ + if !msg.Confirm { //do not delete files if no confirmation flag provided continue } - err = s.MoveObject(objectMetas[i].Path, objectMetas[i].Path+"_trash") + filePathParts := strings.Split(objectMetas[i].Path, "/") + err = s.MoveObject(objectMetas[i].Path, fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1])) if err != nil { - amount-- - fmt.Printf("error in copy %v", err) // delete this + deletedFilesCount-- + ylogger.Zero.Warn().AnErr("err", err).Str("file", objectMetas[i].Path).Msg("failed to move file") failed = append(failed, objectMetas[i]) } } } objectMetas = failed - fmt.Printf("failed files count: %d\n", len(objectMetas)) failed = make([]*storage.S3ObjectMeta, 0) - ylogger.Zero.Info().Int("amount", amount).Msg("deleted files count") + } + + ylogger.Zero.Info().Int("amount", deletedFilesCount).Msg("deleted files count") + + if len(objectMetas) > 0 { + ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("some files were not moved") + ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to move some files") + + _ = ycl.ReplyError(err, "failed to move some files") + return nil + } + + if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { + _ = ycl.ReplyError(err, "failed to upload") + return nil + } + ylogger.Zero.Info().Msg("Deleted garbage successfully") + if !msg.Confirm { + ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted") } default: - ylogger.Zero.Error().Any("type", tp).Msg("what type is it") + ylogger.Zero.Error().Any("type", tp).Msg("unknown message type") _ = ycl.ReplyError(nil, "wrong request type") return nil @@ -397,7 +421,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl } // get lsn of the oldest backup -func getLSN(seg int) (uint64, error) { +func getFirstLSN(seg int) (uint64, error) { cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml") ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args") var out bytes.Buffer @@ -463,7 +487,7 @@ func connectToDatabase(port int, database string) (*pgx.Conn, error) { return conn, nil } -func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз +func getVirtualExpireIndexes(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз db, err := getDatabase(port) if err != nil { return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix @@ -495,7 +519,8 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T return nil, nil, fmt.Errorf("unable to parse query output %v", err) } - c[fmt.Sprintf("%d_%d_%s_%s_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn + ylogger.Zero.Debug().Str("file", fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)).Msg("added file to ei") + c[fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn } ylogger.Zero.Debug().Msg("read 1") @@ -515,9 +540,11 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T p1 := strings.Split(xpath, "/") p2 := p1[len(p1)-1] p3 := strings.Split(p2, "_") - ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) - c2[ans] = true - ylogger.Zero.Debug().Str("file", ans).Msg("added") + if len(p3) >= 4 { + p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + } + c2[p2] = true + ylogger.Zero.Debug().Str("file", p2).Msg("added") } ylogger.Zero.Debug().Msg("read 3") @@ -531,7 +558,7 @@ func getDatabase(port int) (DB, error) { } defer conn.Close() //error ylogger.Zero.Debug().Msg("connected to db") - rows, err := conn.Query(`SELECT datname, dattablespace, oid FROM pg_database WHERE datallowconn;`) + rows, err := conn.Query(`SELECT dattablespace, oid, datname FROM pg_database WHERE datallowconn;`) if err != nil { return DB{}, err } @@ -541,10 +568,11 @@ func getDatabase(port int) (DB, error) { for rows.Next() { row := DB{} ylogger.Zero.Debug().Msg("cycle 1") - if err := rows.Scan(&row.name, &row.tablespace, &row.oid); err != nil { + if err := rows.Scan(&row.tablespace, &row.oid, &row.name); err != nil { return DB{}, err } ylogger.Zero.Debug().Msg("cycle 2") + ylogger.Zero.Debug().Str("db", row.name).Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("database") if row.name == "postgres" { continue } @@ -572,26 +600,27 @@ func getDatabase(port int) (DB, error) { } ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema") if ans { - ylogger.Zero.Debug().Str("db: %s", row.name).Msg("found yezzey schema in database") + ylogger.Zero.Debug().Str("db", row.name).Msg("found yezzey schema in database") + ylogger.Zero.Debug().Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("found yezzey schema in database") return row, nil } - ylogger.Zero.Debug().Str("db: %s", row.name).Msg("no yezzey schema in database") + ylogger.Zero.Debug().Str("db", row.name).Msg("no yezzey schema in database") } return DB{}, fmt.Errorf("no yezzey schema across databases") } type Ei struct { - reloid string - relfileoid string + reloid pgtype.OID + relfileoid pgtype.OID expireLsn string fqnmd5 string } type DB struct { name string - tablespace int - oid int + tablespace pgtype.OID + oid pgtype.OID } type BackupLSN struct { diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index e0ffd39..a4fed5a 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -180,21 +180,24 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") return err } + ylogger.Zero.Debug().Msg("aquired session") - fromPath := path.Join(s.cnf.StoragePrefix, from) + fromPath := from toPath := path.Join(s.cnf.StoragePrefix, to) + ylogger.Zero.Debug().Str("to", toPath).Msg("to path") input := s3.CopyObjectInput{ Bucket: &s.cnf.StorageBucket, - CopySource: aws.String(fromPath), + CopySource: aws.String(s.cnf.StorageBucket + "/" + fromPath), Key: aws.String(toPath), } - _, err = sess.CopyObject(&input) + out, err := sess.CopyObject(&input) if err != nil { ylogger.Zero.Err(err).Msg("failed to copy object") return err } + ylogger.Zero.Debug().Str("", out.GoString()).Msg("copied object") input2 := s3.DeleteObjectInput{ Bucket: &s.cnf.StorageBucket, @@ -205,5 +208,6 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error { if err != nil { ylogger.Zero.Err(err).Msg("failed to delete old object") } + ylogger.Zero.Debug().Msg("deleted object") return err } From 804de055aeda20c8299599d93ac8554b47736775 Mon Sep 17 00:00:00 2001 From: debebantur Date: Fri, 2 Aug 2024 11:56:53 +0000 Subject: [PATCH 4/5] added tests --- Makefile | 3 + pkg/backups/backups.go | 67 ++++++ pkg/database/database.go | 177 ++++++++++++++++ pkg/mock/backups.go | 49 +++++ pkg/mock/database.go | 50 +++++ pkg/mock/storage.go | 315 +++++++++++++++++++++++++++++ pkg/proc/delete_handler.go | 123 +++++++++++ pkg/proc/delete_handler_test.go | 101 +++++++++ pkg/proc/interaction.go | 289 +------------------------- pkg/storage/storage.go | 38 +++- pkg/storage/storage_interractor.go | 30 +++ 11 files changed, 955 insertions(+), 287 deletions(-) create mode 100644 pkg/backups/backups.go create mode 100644 pkg/database/database.go create mode 100644 pkg/mock/backups.go create mode 100644 pkg/mock/database.go create mode 100644 pkg/mock/storage.go create mode 100644 pkg/proc/delete_handler.go create mode 100644 pkg/proc/delete_handler_test.go create mode 100644 pkg/storage/storage_interractor.go diff --git a/Makefile b/Makefile index 689fa0c..8644c32 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,9 @@ unittest: mockgen: mockgen -source=pkg/proc/yrreader.go -destination=pkg/mock/proc/yrreader.go -package=mock + mockgen -source=pkg/database/database.go -destination=pkg/mock/database.go -package=mock + mockgen -source=pkg/backups/backups.go -destination=pkg/mock/backups.go -package=mock + mockgen -source=pkg/storage/storage_interractor.go -destination=pkg/mock/storage.go -package=mock version = $(shell git describe --tags --abbrev=0) package: diff --git a/pkg/backups/backups.go b/pkg/backups/backups.go new file mode 100644 index 0000000..27bf1c6 --- /dev/null +++ b/pkg/backups/backups.go @@ -0,0 +1,67 @@ +package backups + +import ( + "bytes" + "encoding/json" + "fmt" + "os/exec" + "strings" + + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +type BackupLSN struct { + Lsn uint64 `json:"LSN"` +} + +//go:generate mockgen -destination=pkg/mock/backups.go -package=mock +type BackupInterractor interface { + GetFirstLSN(int) (uint64, error) +} + +type WalgBackupInterractor struct { +} + +// get lsn of the oldest backup +func (b *WalgBackupInterractor) GetFirstLSN(seg int) (uint64, error) { + cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml") + ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args") + var out bytes.Buffer + cmd.Stdout = &out + + err := cmd.Run() + if err != nil { + ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls") + return 0, err + } + p1 := strings.Split(out.String(), "\n") + + minLSN := BackupLSN{Lsn: ^uint64(0)} + for _, line := range p1 { + if !strings.Contains(line, ".json") { + continue + } + p2 := strings.Split(line, " ") + p3 := p2[len(p2)-1] + + ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3)).Msg("check lsn in file") + cmd2 := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3), "--config=/etc/wal-g/wal-g.yaml") + + var out2 bytes.Buffer + cmd2.Stdout = &out2 + + err = cmd2.Run() + if err != nil { + ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat") + return 0, err + } + lsn := BackupLSN{} + err = json.Unmarshal(out2.Bytes(), &lsn) + + if lsn.Lsn < minLSN.Lsn { + minLSN.Lsn = lsn.Lsn + } + } + + return minLSN.Lsn, err +} diff --git a/pkg/database/database.go b/pkg/database/database.go new file mode 100644 index 0000000..a57a031 --- /dev/null +++ b/pkg/database/database.go @@ -0,0 +1,177 @@ +package database + +import ( + "fmt" + "strings" + + "github.com/jackc/pgx" + "github.com/jackc/pgx/pgtype" + "github.com/pkg/errors" + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +//go:generate mockgen -destination=../mock/mock_database_interractor.go -package mock +type DatabaseInterractor interface { + GetVirtualExpireIndexes(int) (map[string]bool, map[string]uint64, error) +} + +type DatabaseHandler struct { +} + +type DB struct { + name string + tablespace pgtype.OID + oid pgtype.OID +} + +type Ei struct { + reloid pgtype.OID + relfileoid pgtype.OID + expireLsn string + fqnmd5 string +} + +func (database *DatabaseHandler) GetVirtualExpireIndexes(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз + db, err := getDatabase(port) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + ylogger.Zero.Debug().Str("database name", db.name).Msg("recieved database") + conn, err := connectToDatabase(port, db.name) + if err != nil { + return nil, nil, err + } + defer conn.Close() //error + ylogger.Zero.Debug().Msg("connected to database") + + rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index WHERE expire_lsn != '0/0';`) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + defer rows.Close() + ylogger.Zero.Debug().Msg("executed select") + + c := make(map[string]uint64, 0) + for rows.Next() { + row := Ei{} + if err := rows.Scan(&row.reloid, &row.relfileoid, &row.expireLsn, &row.fqnmd5); err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + + lsn, err := pgx.ParseLSN(row.expireLsn) + if err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + + ylogger.Zero.Debug().Str("file", fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)).Msg("added file to ei") + c[fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn + } + ylogger.Zero.Debug().Msg("read 1") + + rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + defer rows2.Close() + ylogger.Zero.Debug().Msg("read 2") + + c2 := make(map[string]bool, 0) + for rows2.Next() { + xpath := "" + if err := rows2.Scan(&xpath); err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + p1 := strings.Split(xpath, "/") + p2 := p1[len(p1)-1] + p3 := strings.Split(p2, "_") + if len(p3) >= 4 { + p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + } + c2[p2] = true + ylogger.Zero.Debug().Str("file", p2).Msg("added") + } + ylogger.Zero.Debug().Msg("read 3") + + return c2, c, err +} + +func getDatabase(port int) (DB, error) { + conn, err := connectToDatabase(port, "postgres") + if err != nil { + return DB{}, err + } + defer conn.Close() //error + ylogger.Zero.Debug().Msg("connected to db") + rows, err := conn.Query(`SELECT dattablespace, oid, datname FROM pg_database WHERE datallowconn;`) + if err != nil { + return DB{}, err + } + defer rows.Close() + ylogger.Zero.Debug().Msg("recieved db list") + + for rows.Next() { + row := DB{} + ylogger.Zero.Debug().Msg("cycle 1") + if err := rows.Scan(&row.tablespace, &row.oid, &row.name); err != nil { + return DB{}, err + } + ylogger.Zero.Debug().Msg("cycle 2") + ylogger.Zero.Debug().Str("db", row.name).Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("database") + if row.name == "postgres" { + continue + } + + ylogger.Zero.Debug().Str("db", row.name).Msg("check database") + connDb, err := connectToDatabase(port, row.name) + if err != nil { + return DB{}, err + } + defer connDb.Close() //error + ylogger.Zero.Debug().Msg("cycle 3") + + rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`) + if err != nil { + return DB{}, err + } + defer rowsdb.Close() + ylogger.Zero.Debug().Msg("cycle 4") + var ans bool + rowsdb.Next() + err = rowsdb.Scan(&ans) + if err != nil { + ylogger.Zero.Error().AnErr("error", err).Msg("error during yezzey check") + return DB{}, err + } + ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema") + if ans { + ylogger.Zero.Debug().Str("db", row.name).Msg("found yezzey schema in database") + ylogger.Zero.Debug().Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("found yezzey schema in database") + return row, nil + } + + ylogger.Zero.Debug().Str("db", row.name).Msg("no yezzey schema in database") + } + return DB{}, fmt.Errorf("no yezzey schema across databases") +} + +func connectToDatabase(port int, database string) (*pgx.Conn, error) { + config, err := pgx.ParseEnvLibpq() + if err != nil { + return nil, errors.Wrap(err, "Connect: unable to read environment variables") + } + + config.Port = uint16(port) + config.Database = database + + config.RuntimeParams["gp_role"] = "utility" + conn, err := pgx.Connect(config) + if err != nil { + config.RuntimeParams["gp_session_role"] = "utility" + conn, err = pgx.Connect(config) + if err != nil { + fmt.Printf("error in connection %v", err) // delete this + return nil, err + } + } + return conn, nil +} diff --git a/pkg/mock/backups.go b/pkg/mock/backups.go new file mode 100644 index 0000000..530bb1b --- /dev/null +++ b/pkg/mock/backups.go @@ -0,0 +1,49 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/backups/backups.go + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockBackupInterractor is a mock of BackupInterractor interface. +type MockBackupInterractor struct { + ctrl *gomock.Controller + recorder *MockBackupInterractorMockRecorder +} + +// MockBackupInterractorMockRecorder is the mock recorder for MockBackupInterractor. +type MockBackupInterractorMockRecorder struct { + mock *MockBackupInterractor +} + +// NewMockBackupInterractor creates a new mock instance. +func NewMockBackupInterractor(ctrl *gomock.Controller) *MockBackupInterractor { + mock := &MockBackupInterractor{ctrl: ctrl} + mock.recorder = &MockBackupInterractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBackupInterractor) EXPECT() *MockBackupInterractorMockRecorder { + return m.recorder +} + +// GetFirstLSN mocks base method. +func (m *MockBackupInterractor) GetFirstLSN(arg0 int) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetFirstLSN", arg0) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetFirstLSN indicates an expected call of GetFirstLSN. +func (mr *MockBackupInterractorMockRecorder) GetFirstLSN(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFirstLSN", reflect.TypeOf((*MockBackupInterractor)(nil).GetFirstLSN), arg0) +} diff --git a/pkg/mock/database.go b/pkg/mock/database.go new file mode 100644 index 0000000..d04c573 --- /dev/null +++ b/pkg/mock/database.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/database/database.go + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockDatabaseInterractor is a mock of DatabaseInterractor interface. +type MockDatabaseInterractor struct { + ctrl *gomock.Controller + recorder *MockDatabaseInterractorMockRecorder +} + +// MockDatabaseInterractorMockRecorder is the mock recorder for MockDatabaseInterractor. +type MockDatabaseInterractorMockRecorder struct { + mock *MockDatabaseInterractor +} + +// NewMockDatabaseInterractor creates a new mock instance. +func NewMockDatabaseInterractor(ctrl *gomock.Controller) *MockDatabaseInterractor { + mock := &MockDatabaseInterractor{ctrl: ctrl} + mock.recorder = &MockDatabaseInterractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDatabaseInterractor) EXPECT() *MockDatabaseInterractorMockRecorder { + return m.recorder +} + +// GetVirtualExpireIndexes mocks base method. +func (m *MockDatabaseInterractor) GetVirtualExpireIndexes(arg0 int) (map[string]bool, map[string]uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetVirtualExpireIndexes", arg0) + ret0, _ := ret[0].(map[string]bool) + ret1, _ := ret[1].(map[string]uint64) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetVirtualExpireIndexes indicates an expected call of GetVirtualExpireIndexes. +func (mr *MockDatabaseInterractorMockRecorder) GetVirtualExpireIndexes(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVirtualExpireIndexes", reflect.TypeOf((*MockDatabaseInterractor)(nil).GetVirtualExpireIndexes), arg0) +} diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go new file mode 100644 index 0000000..e0bc880 --- /dev/null +++ b/pkg/mock/storage.go @@ -0,0 +1,315 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/storage/storage_interractor.go + +// Package mock is a generated GoMock package. +package mock + +import ( + io "io" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + storage "github.com/yezzey-gp/yproxy/pkg/storage" +) + +// MockStorageInteractor is a mock of StorageInteractor interface. +type MockStorageInteractor struct { + ctrl *gomock.Controller + recorder *MockStorageInteractorMockRecorder +} + +// MockStorageInteractorMockRecorder is the mock recorder for MockStorageInteractor. +type MockStorageInteractorMockRecorder struct { + mock *MockStorageInteractor +} + +// NewMockStorageInteractor creates a new mock instance. +func NewMockStorageInteractor(ctrl *gomock.Controller) *MockStorageInteractor { + mock := &MockStorageInteractor{ctrl: ctrl} + mock.recorder = &MockStorageInteractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageInteractor) EXPECT() *MockStorageInteractorMockRecorder { + return m.recorder +} + +// CatFileFromStorage mocks base method. +func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CatFileFromStorage indicates an expected call of CatFileFromStorage. +func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset) +} + +// DeleteObject mocks base method. +func (m *MockStorageInteractor) DeleteObject(key string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", key) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *MockStorageInteractorMockRecorder) DeleteObject(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) +} + +// ListPath mocks base method. +func (m *MockStorageInteractor) ListPath(name string) ([]*storage.S3ObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListPath", name) + ret0, _ := ret[0].([]*storage.S3ObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPath indicates an expected call of ListPath. +func (mr *MockStorageInteractorMockRecorder) ListPath(name interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageInteractor)(nil).ListPath), name) +} + +// MoveObject mocks base method. +func (m *MockStorageInteractor) MoveObject(from, to string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MoveObject", from, to) + ret0, _ := ret[0].(error) + return ret0 +} + +// MoveObject indicates an expected call of MoveObject. +func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageInteractor)(nil).MoveObject), from, to) +} + +// PatchFile mocks base method. +func (m *MockStorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PatchFile", name, r, startOffste) + ret0, _ := ret[0].(error) + return ret0 +} + +// PatchFile indicates an expected call of PatchFile. +func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffste interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageInteractor)(nil).PatchFile), name, r, startOffste) +} + +// PutFileToDest mocks base method. +func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutFileToDest indicates an expected call of PutFileToDest. +func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r) +} + +// MockStorageReader is a mock of StorageReader interface. +type MockStorageReader struct { + ctrl *gomock.Controller + recorder *MockStorageReaderMockRecorder +} + +// MockStorageReaderMockRecorder is the mock recorder for MockStorageReader. +type MockStorageReaderMockRecorder struct { + mock *MockStorageReader +} + +// NewMockStorageReader creates a new mock instance. +func NewMockStorageReader(ctrl *gomock.Controller) *MockStorageReader { + mock := &MockStorageReader{ctrl: ctrl} + mock.recorder = &MockStorageReaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageReader) EXPECT() *MockStorageReaderMockRecorder { + return m.recorder +} + +// CatFileFromStorage mocks base method. +func (m *MockStorageReader) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CatFileFromStorage indicates an expected call of CatFileFromStorage. +func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageReader)(nil).CatFileFromStorage), name, offset) +} + +// ListPath mocks base method. +func (m *MockStorageReader) ListPath(name string) ([]*storage.S3ObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListPath", name) + ret0, _ := ret[0].([]*storage.S3ObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPath indicates an expected call of ListPath. +func (mr *MockStorageReaderMockRecorder) ListPath(name interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageReader)(nil).ListPath), name) +} + +// MockStorageWriter is a mock of StorageWriter interface. +type MockStorageWriter struct { + ctrl *gomock.Controller + recorder *MockStorageWriterMockRecorder +} + +// MockStorageWriterMockRecorder is the mock recorder for MockStorageWriter. +type MockStorageWriterMockRecorder struct { + mock *MockStorageWriter +} + +// NewMockStorageWriter creates a new mock instance. +func NewMockStorageWriter(ctrl *gomock.Controller) *MockStorageWriter { + mock := &MockStorageWriter{ctrl: ctrl} + mock.recorder = &MockStorageWriterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageWriter) EXPECT() *MockStorageWriterMockRecorder { + return m.recorder +} + +// PatchFile mocks base method. +func (m *MockStorageWriter) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PatchFile", name, r, startOffste) + ret0, _ := ret[0].(error) + return ret0 +} + +// PatchFile indicates an expected call of PatchFile. +func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffste interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageWriter)(nil).PatchFile), name, r, startOffste) +} + +// PutFileToDest mocks base method. +func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutFileToDest indicates an expected call of PutFileToDest. +func (mr *MockStorageWriterMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageWriter)(nil).PutFileToDest), name, r) +} + +// MockStorageLister is a mock of StorageLister interface. +type MockStorageLister struct { + ctrl *gomock.Controller + recorder *MockStorageListerMockRecorder +} + +// MockStorageListerMockRecorder is the mock recorder for MockStorageLister. +type MockStorageListerMockRecorder struct { + mock *MockStorageLister +} + +// NewMockStorageLister creates a new mock instance. +func NewMockStorageLister(ctrl *gomock.Controller) *MockStorageLister { + mock := &MockStorageLister{ctrl: ctrl} + mock.recorder = &MockStorageListerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageLister) EXPECT() *MockStorageListerMockRecorder { + return m.recorder +} + +// ListPath mocks base method. +func (m *MockStorageLister) ListPath(prefix string) ([]*storage.S3ObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListPath", prefix) + ret0, _ := ret[0].([]*storage.S3ObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPath indicates an expected call of ListPath. +func (mr *MockStorageListerMockRecorder) ListPath(prefix interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageLister)(nil).ListPath), prefix) +} + +// MockStorageMover is a mock of StorageMover interface. +type MockStorageMover struct { + ctrl *gomock.Controller + recorder *MockStorageMoverMockRecorder +} + +// MockStorageMoverMockRecorder is the mock recorder for MockStorageMover. +type MockStorageMoverMockRecorder struct { + mock *MockStorageMover +} + +// NewMockStorageMover creates a new mock instance. +func NewMockStorageMover(ctrl *gomock.Controller) *MockStorageMover { + mock := &MockStorageMover{ctrl: ctrl} + mock.recorder = &MockStorageMoverMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageMover) EXPECT() *MockStorageMoverMockRecorder { + return m.recorder +} + +// DeleteObject mocks base method. +func (m *MockStorageMover) DeleteObject(key string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", key) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *MockStorageMoverMockRecorder) DeleteObject(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageMover)(nil).DeleteObject), key) +} + +// MoveObject mocks base method. +func (m *MockStorageMover) MoveObject(from, to string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MoveObject", from, to) + ret0, _ := ret[0].(error) + return ret0 +} + +// MoveObject indicates an expected call of MoveObject. +func (mr *MockStorageMoverMockRecorder) MoveObject(from, to interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageMover)(nil).MoveObject), from, to) +} diff --git a/pkg/proc/delete_handler.go b/pkg/proc/delete_handler.go new file mode 100644 index 0000000..cb0174e --- /dev/null +++ b/pkg/proc/delete_handler.go @@ -0,0 +1,123 @@ +package proc + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + "github.com/yezzey-gp/yproxy/pkg/backups" + "github.com/yezzey-gp/yproxy/pkg/database" + "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/storage" + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +//go:generate mockgen -destination=../../../test/mocks/mock_object.go -package mocks -build_flags -mod=readonly github.com/wal-g/wal-g/pkg/storages/storage Object +type DeleteHandler interface { + HandleDeleteGarbage(message.DeleteMessage) error + HandleDeleteFile(message.DeleteMessage) error +} + +type BasicDeleteHandler struct { + BackupInterractor backups.BackupInterractor + DbInterractor database.DatabaseInterractor + StorageInterractor storage.StorageInteractor +} + +func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) error { + fileList, err := dh.ListGarbageFiles(msg) + if err != nil { + return errors.Wrap(err, "failed to delete file") + } + + if !msg.Confirm { //do not delete files if no confirmation flag provided + return nil + } + + var failed []string + retryCount := 0 + for len(fileList) > 0 && retryCount < 10 { + retryCount++ + for i := 0; i < len(fileList); i++ { + filePathParts := strings.Split(fileList[i], "/") + err = dh.StorageInterractor.MoveObject(fileList[i], fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1])) + if err != nil { + ylogger.Zero.Warn().AnErr("err", err).Str("file", fileList[i]).Msg("failed to move file") + failed = append(failed, fileList[i]) + } + } + fileList = failed + failed = make([]string, 0) + } + + if len(fileList) > 0 { + ylogger.Zero.Error().Int("failed files count", len(fileList)).Msg("some files were not moved") + ylogger.Zero.Error().Any("failed files", fileList).Msg("failed to move some files") + return errors.Wrap(err, "failed to move some files") + } + + return nil +} + +func (dh *BasicDeleteHandler) HandleDeleteFile(msg message.DeleteMessage) error { + err := dh.StorageInterractor.DeleteObject(msg.Name) + if err != nil { + ylogger.Zero.Error().AnErr("err", err).Msg("failed to delete file") + return errors.Wrap(err, "failed to delete file") + } + return nil +} + +func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]string, error) { + //get firsr backup lsn + firstBackupLSN, err := dh.BackupInterractor.GetFirstLSN(msg.Segnum) + if err != nil { + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups? + } + ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") + + //list files in storage + ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path") + objectMetas, err := dh.StorageInterractor.ListPath(msg.Name) + if err != nil { + return nil, errors.Wrap(err, "could not list objects") + } + ylogger.Zero.Info().Int("amount", len(objectMetas)).Msg("objects count") + + vi, ei, err := dh.DbInterractor.GetVirtualExpireIndexes(msg.Port) + if err != nil { + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get indexes") + return nil, errors.Wrap(err, "could not get virtual and expire indexes") + } + ylogger.Zero.Info().Msg("recieved virtual index and expire index") + ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count") + ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count") + + filesToDelete := make([]string, 0) + for i := 0; i < len(objectMetas); i++ { + reworkedName := ReworkFileName(objectMetas[i].Path) + lsn, ok := ei[reworkedName] + ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn") + if !vi[reworkedName] && (lsn < firstBackupLSN || !ok) { + ylogger.Zero.Debug().Str("file", objectMetas[i].Path). + Bool("file in expire index", ok). + Bool("lsn is less than in first backup", lsn < firstBackupLSN). + Msg("file will be deleted") + filesToDelete = append(filesToDelete, objectMetas[i].Path) + } + } + + ylogger.Zero.Info().Int("amount", len(filesToDelete)).Msg("files will be deleted") + + return filesToDelete, nil +} + +func ReworkFileName(str string) string { + p1 := strings.Split(str, "/") + p2 := p1[len(p1)-1] + p3 := strings.Split(p2, "_") + if len(p3) >= 4 { + p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + } + return p2 +} diff --git a/pkg/proc/delete_handler_test.go b/pkg/proc/delete_handler_test.go new file mode 100644 index 0000000..895faff --- /dev/null +++ b/pkg/proc/delete_handler_test.go @@ -0,0 +1,101 @@ +package proc_test + +import ( + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/yezzey-gp/yproxy/pkg/message" + mock "github.com/yezzey-gp/yproxy/pkg/mock" + "github.com/yezzey-gp/yproxy/pkg/proc" + "github.com/yezzey-gp/yproxy/pkg/storage" +) + +func TestReworkingName(t *testing.T) { + type TestCase struct { + input string + expected string + } + + testCases := []TestCase{ + { + input: "/segments_005/seg1/basebackups_005/yezzey/1663_16530_a4c5ad8305b83f07200b020694c36563_17660_1__DY_1_xlog_19649822496", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_", + }, + { + input: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_1__DY_1_xlog_19649822496", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_", + }, + { + input: "seg1/basebackups_005/yezzey/1663_16530_a4c5ad8305b83f07200b020694c36563_17660_1__DY_1_xlog_19649822496", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_", + }, + { + input: "1663_16530_a4c5ad8305b83f07200b020694c36563", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563", + }, + { + input: "1663___a4c5ad8305b83f07200b020694c36563___", + expected: "1663___a4c5ad8305b83f07200b020694c36563_", + }, + { + input: "file", + expected: "file", + }, + } + + for _, testCase := range testCases { + ans := proc.ReworkFileName(testCase.input) + assert.Equal(t, testCase.expected, ans) + } +} + +func TestFilesToDeletion(t *testing.T) { + ctrl := gomock.NewController(t) + + msg := message.DeleteMessage{ + Name: "path", + Port: 6000, + Segnum: 0, + Confirm: false, + } + + filesInStorage := []*storage.S3ObjectMeta{ + {Path: "1663_16530_not-deleted_18002_"}, + {Path: "1663_16530_deleted-after-backup_18002_"}, + {Path: "1663_16530_deleted-when-backup-start_18002_"}, + {Path: "1663_16530_deleted-before-backup_18002_"}, + {Path: "some_trash"}, + } + storage := mock.NewMockStorageInteractor(ctrl) + storage.EXPECT().ListPath(msg.Name).Return(filesInStorage, nil) + + backup := mock.NewMockBackupInterractor(ctrl) + backup.EXPECT().GetFirstLSN(msg.Segnum).Return(uint64(1337), nil) + + vi := map[string]bool{ + "1663_16530_not-deleted_18002_": true, + "1663_16530_deleted-after-backup_18002_": true, + "1663_16530_deleted-when-backup-start_18002_": true, + } + ei := map[string]uint64{ + "1663_16530_deleted-after-backup_18002_": uint64(1400), + "1663_16530_deleted-when-backup-start_18002_": uint64(1337), + "1663_16530_deleted-before-backup_18002_": uint64(1300), + } + database := mock.NewMockDatabaseInterractor(ctrl) + database.EXPECT().GetVirtualExpireIndexes(msg.Port).Return(vi, ei, nil) + + handler := proc.BasicDeleteHandler{ + StorageInterractor: storage, + DbInterractor: database, + BackupInterractor: backup, + } + + list, err := handler.ListGarbageFiles(msg) + + assert.NoError(t, err) + assert.Equal(t, 2, len(list)) + assert.Equal(t, "1663_16530_deleted-before-backup_18002_", list[0]) + assert.Equal(t, "some_trash", list[1]) +} diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 1a69b84..f357f1a 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -1,20 +1,16 @@ package proc import ( - "bytes" - "encoding/json" "fmt" "io" - "os/exec" "strings" "sync" - "github.com/jackc/pgx" - "github.com/jackc/pgx/pgtype" - "github.com/pkg/errors" "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/backups" "github.com/yezzey-gp/yproxy/pkg/client" "github.com/yezzey-gp/yproxy/pkg/crypt" + "github.com/yezzey-gp/yproxy/pkg/database" "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" @@ -329,75 +325,19 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl ycl.SetExternalFilePath(msg.Name) - //get firsr backup lsn - firstBackupLSN, err := getFirstLSN(msg.Segnum) - if err != nil { - ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups? - } - ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") + dbInterractor := &database.DatabaseHandler{} + backupHandler := &backups.WalgBackupInterractor{} - //list files in storage - ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path") - objectMetas, err := s.ListPath(msg.Name) - if err != nil { - _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to complete request") - return nil + var dh DeleteHandler + dh = &BasicDeleteHandler{ + StorageInterractor: s, + DbInterractor: dbInterractor, + BackupInterractor: backupHandler, } - ylogger.Zero.Info().Int("amount", len(objectMetas)).Msg("objects count") - vi, ei, err := getVirtualExpireIndexes(msg.Port) + err = dh.HandleDeleteGarbage(msg) if err != nil { - ylogger.Zero.Error().AnErr("err", err).Msg("failed to get indexes") - _ = ycl.ReplyError(fmt.Errorf("could not get virtual and expire indexes: %s", err), "failed to compelete request") - return nil - } - ylogger.Zero.Info().Msg("recieved virtual index and expire index") - ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count") - ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count") - - var failed []*storage.S3ObjectMeta - retryCount := 0 - deletedFilesCount := 0 - for len(objectMetas) > 0 && retryCount < 10 { - retryCount++ - for i := 0; i < len(objectMetas); i++ { - p1 := strings.Split(objectMetas[i].Path, "/") //TODO fix this - p2 := p1[len(p1)-1] - p3 := strings.Split(p2, "_") - if len(p3) >= 4 { - p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) - } - lsn, ok := ei[p2] - ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn") - if !vi[p2] && (lsn < firstBackupLSN || !ok) { - ylogger.Zero.Debug().Str("file", objectMetas[i].Path). - Bool("file in expire indexi", ok). - Bool("lsn is less than in first backup", lsn < firstBackupLSN). - Msg("file will be deleted") - deletedFilesCount++ - if !msg.Confirm { //do not delete files if no confirmation flag provided - continue - } - filePathParts := strings.Split(objectMetas[i].Path, "/") - err = s.MoveObject(objectMetas[i].Path, fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1])) - if err != nil { - deletedFilesCount-- - ylogger.Zero.Warn().AnErr("err", err).Str("file", objectMetas[i].Path).Msg("failed to move file") - failed = append(failed, objectMetas[i]) - } - } - } - objectMetas = failed - failed = make([]*storage.S3ObjectMeta, 0) - } - - ylogger.Zero.Info().Int("amount", deletedFilesCount).Msg("deleted files count") - - if len(objectMetas) > 0 { - ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("some files were not moved") - ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to move some files") - - _ = ycl.ReplyError(err, "failed to move some files") + _ = ycl.ReplyError(err, "failed to finish operation") return nil } @@ -419,210 +359,3 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl return nil } - -// get lsn of the oldest backup -func getFirstLSN(seg int) (uint64, error) { - cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml") - ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args") - var out bytes.Buffer - cmd.Stdout = &out - - err := cmd.Run() - if err != nil { - ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls") - return 0, err - } - p1 := strings.Split(out.String(), "\n") - - minLSN := BackupLSN{Lsn: ^uint64(0)} - for _, line := range p1 { - if !strings.Contains(line, ".json") { - continue - } - p2 := strings.Split(line, " ") - p3 := p2[len(p2)-1] - - ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3)).Msg("check lsn in file") - cmd2 := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3), "--config=/etc/wal-g/wal-g.yaml") - - var out2 bytes.Buffer - cmd2.Stdout = &out2 - - err = cmd2.Run() - if err != nil { - ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat") - return 0, err - } - lsn := BackupLSN{} - err = json.Unmarshal(out2.Bytes(), &lsn) - - if lsn.Lsn < minLSN.Lsn { - minLSN.Lsn = lsn.Lsn - } - - } - - return minLSN.Lsn, err -} - -func connectToDatabase(port int, database string) (*pgx.Conn, error) { - config, err := pgx.ParseEnvLibpq() - if err != nil { - return nil, errors.Wrap(err, "Connect: unable to read environment variables") - } - - config.Port = uint16(port) - config.Database = database - - config.RuntimeParams["gp_role"] = "utility" - conn, err := pgx.Connect(config) - if err != nil { - config.RuntimeParams["gp_session_role"] = "utility" - conn, err = pgx.Connect(config) - if err != nil { - fmt.Printf("error in connection %v", err) // delete this - return nil, err - } - } - return conn, nil -} - -func getVirtualExpireIndexes(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз - db, err := getDatabase(port) - if err != nil { - return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix - } - ylogger.Zero.Debug().Str("database name", db.name).Msg("recieved database") - conn, err := connectToDatabase(port, db.name) - if err != nil { - return nil, nil, err - } - defer conn.Close() //error - ylogger.Zero.Debug().Msg("connected to database") - - rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index WHERE expire_lsn != '0/0';`) - if err != nil { - return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix - } - defer rows.Close() - ylogger.Zero.Debug().Msg("executed select") - - c := make(map[string]uint64, 0) - for rows.Next() { - row := Ei{} - if err := rows.Scan(&row.reloid, &row.relfileoid, &row.expireLsn, &row.fqnmd5); err != nil { - return nil, nil, fmt.Errorf("unable to parse query output %v", err) - } - - lsn, err := pgx.ParseLSN(row.expireLsn) - if err != nil { - return nil, nil, fmt.Errorf("unable to parse query output %v", err) - } - - ylogger.Zero.Debug().Str("file", fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)).Msg("added file to ei") - c[fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn - } - ylogger.Zero.Debug().Msg("read 1") - - rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`) - if err != nil { - return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix - } - defer rows2.Close() - ylogger.Zero.Debug().Msg("read 2") - - c2 := make(map[string]bool, 0) - for rows2.Next() { - xpath := "" - if err := rows2.Scan(&xpath); err != nil { - return nil, nil, fmt.Errorf("unable to parse query output %v", err) - } - p1 := strings.Split(xpath, "/") - p2 := p1[len(p1)-1] - p3 := strings.Split(p2, "_") - if len(p3) >= 4 { - p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) - } - c2[p2] = true - ylogger.Zero.Debug().Str("file", p2).Msg("added") - } - ylogger.Zero.Debug().Msg("read 3") - - return c2, c, err -} - -func getDatabase(port int) (DB, error) { - conn, err := connectToDatabase(port, "postgres") - if err != nil { - return DB{}, err - } - defer conn.Close() //error - ylogger.Zero.Debug().Msg("connected to db") - rows, err := conn.Query(`SELECT dattablespace, oid, datname FROM pg_database WHERE datallowconn;`) - if err != nil { - return DB{}, err - } - defer rows.Close() - ylogger.Zero.Debug().Msg("recieved db list") - - for rows.Next() { - row := DB{} - ylogger.Zero.Debug().Msg("cycle 1") - if err := rows.Scan(&row.tablespace, &row.oid, &row.name); err != nil { - return DB{}, err - } - ylogger.Zero.Debug().Msg("cycle 2") - ylogger.Zero.Debug().Str("db", row.name).Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("database") - if row.name == "postgres" { - continue - } - - ylogger.Zero.Debug().Str("db", row.name).Msg("check database") - connDb, err := connectToDatabase(port, row.name) - if err != nil { - return DB{}, err - } - defer connDb.Close() //error - ylogger.Zero.Debug().Msg("cycle 3") - - rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`) - if err != nil { - return DB{}, err - } - defer rowsdb.Close() - ylogger.Zero.Debug().Msg("cycle 4") - var ans bool - rowsdb.Next() - err = rowsdb.Scan(&ans) - if err != nil { - ylogger.Zero.Error().AnErr("error", err).Msg("error during yezzey check") - return DB{}, err - } - ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema") - if ans { - ylogger.Zero.Debug().Str("db", row.name).Msg("found yezzey schema in database") - ylogger.Zero.Debug().Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("found yezzey schema in database") - return row, nil - } - - ylogger.Zero.Debug().Str("db", row.name).Msg("no yezzey schema in database") - } - return DB{}, fmt.Errorf("no yezzey schema across databases") -} - -type Ei struct { - reloid pgtype.OID - relfileoid pgtype.OID - expireLsn string - fqnmd5 string -} - -type DB struct { - name string - tablespace pgtype.OID - oid pgtype.OID -} - -type BackupLSN struct { - Lsn uint64 `json:"FinishLSN"` -} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index a4fed5a..5a6a1eb 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -6,7 +6,7 @@ import ( "io" "path" - "time" + "strings" "github.com/yezzey-gp/aws-sdk-go/aws" "github.com/yezzey-gp/aws-sdk-go/service/s3" @@ -53,6 +53,7 @@ func NewStorage(cnf *config.Storage) (StorageInteractor, error) { }, nil default: return nil, fmt.Errorf("wrong storage type " + cnf.StorageType) + } } @@ -129,9 +130,8 @@ func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffst } type S3ObjectMeta struct { - Path string - Size int64 - LastModified time.Time + Path string + Size int64 } func (s *S3StorageInteractor) ListPath(prefix string) ([]*S3ObjectMeta, error) { @@ -159,9 +159,8 @@ func (s *S3StorageInteractor) ListPath(prefix string) ([]*S3ObjectMeta, error) { for _, obj := range out.Contents { metas = append(metas, &S3ObjectMeta{ - Path: *obj.Key, - Size: *obj.Size, - LastModified: *obj.LastModified, + Path: *obj.Key, + Size: *obj.Size, }) } @@ -199,15 +198,36 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error { } ylogger.Zero.Debug().Str("", out.GoString()).Msg("copied object") + err = s.DeleteObject(fromPath) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to delete old object") + } + ylogger.Zero.Debug().Msg("deleted object") + return err +} + +func (s *S3StorageInteractor) DeleteObject(key string) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return err + } + ylogger.Zero.Debug().Msg("aquired session") + + if !strings.HasPrefix(key, s.cnf.StoragePrefix) { + key = path.Join(s.cnf.StoragePrefix, key) + } + input2 := s3.DeleteObjectInput{ Bucket: &s.cnf.StorageBucket, - Key: aws.String(fromPath), + Key: aws.String(key), } _, err = sess.DeleteObject(&input2) if err != nil { ylogger.Zero.Err(err).Msg("failed to delete old object") + return err } ylogger.Zero.Debug().Msg("deleted object") - return err + return nil } diff --git a/pkg/storage/storage_interractor.go b/pkg/storage/storage_interractor.go new file mode 100644 index 0000000..c23603e --- /dev/null +++ b/pkg/storage/storage_interractor.go @@ -0,0 +1,30 @@ +package storage + +import "io" + +//go:generate mockgen -destination=pkg/mock/storage.go -package=mock +type StorageInteractor interface { + StorageReader + StorageWriter + StorageLister + StorageMover +} + +type StorageReader interface { + CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) + ListPath(name string) ([]*S3ObjectMeta, error) +} + +type StorageWriter interface { + PutFileToDest(name string, r io.Reader) error + PatchFile(name string, r io.ReadSeeker, startOffste int64) error +} + +type StorageLister interface { + ListPath(prefix string) ([]*S3ObjectMeta, error) +} + +type StorageMover interface { + MoveObject(from string, to string) error + DeleteObject(key string) error +} From ec7ea9a50d70980d139d470d4a2bf1aa0a26a1db Mon Sep 17 00:00:00 2001 From: debebantur Date: Tue, 6 Aug 2024 08:43:25 +0000 Subject: [PATCH 5/5] fixes after refactoring --- Makefile | 2 +- cmd/client/main.go | 18 +-- pkg/backups/backups.go | 22 +-- pkg/message/delete_message.go | 10 +- pkg/message/message_test.go | 3 +- pkg/mock/storage.go | 247 ++++++++++++++--------------- pkg/proc/delete_handler_test.go | 2 +- pkg/proc/interaction.go | 16 +- pkg/storage/s3storage.go | 27 ++++ pkg/storage/storage.go | 187 +--------------------- pkg/storage/storage_interractor.go | 30 ---- 11 files changed, 190 insertions(+), 374 deletions(-) delete mode 100644 pkg/storage/storage_interractor.go diff --git a/Makefile b/Makefile index 8644c32..ca4b0b5 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ mockgen: mockgen -source=pkg/proc/yrreader.go -destination=pkg/mock/proc/yrreader.go -package=mock mockgen -source=pkg/database/database.go -destination=pkg/mock/database.go -package=mock mockgen -source=pkg/backups/backups.go -destination=pkg/mock/backups.go -package=mock - mockgen -source=pkg/storage/storage_interractor.go -destination=pkg/mock/storage.go -package=mock + mockgen -source=pkg/storage/storage.go -destination=pkg/mock/storage.go -package=mock version = $(shell git describe --tags --abbrev=0) package: diff --git a/cmd/client/main.go b/cmd/client/main.go index 0570078..0a65828 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -25,6 +25,7 @@ var offset uint64 var segmentPort int var segmentNum int var confirm bool +var garbage bool // TODOV func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error { @@ -182,12 +183,10 @@ func listFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { meta.Decode(body) res = append(res, meta.Content...) - break case message.MessageTypeReadyForQuery: done = true - break default: - return fmt.Errorf("Incorrect message type: %s", tp.String()) + return fmt.Errorf("incorrect message type: %s", tp.String()) } } @@ -217,10 +216,10 @@ var copyCmd = &cobra.Command{ } var deleteCmd = &cobra.Command{ - Use: "delete_garbage", - Short: "delete_garbage", + Use: "delete", + Short: "delete", RunE: func(cmd *cobra.Command, args []string) error { - ylogger.Zero.Info().Msg("Execute delete_garbage command") + ylogger.Zero.Info().Msg("Execute delete command") err := config.LoadInstanceConfig(cfgPath) if err != nil { return err @@ -231,10 +230,10 @@ var deleteCmd = &cobra.Command{ if err != nil { return err } - defer con.Close() + ylogger.Zero.Info().Str("name", args[0]).Msg("delete") - msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm).Encode() + msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm, garbage).Encode() _, err = con.Write(msg) if err != nil { return err @@ -292,8 +291,9 @@ func init() { rootCmd.AddCommand(listCmd) deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on") - deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "number of the segment") + deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "logical number of a segment") deleteCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion") + deleteCmd.PersistentFlags().BoolVarP(&garbage, "garbage", "g", false, "delete garbage") rootCmd.AddCommand(deleteCmd) } diff --git a/pkg/backups/backups.go b/pkg/backups/backups.go index 27bf1c6..6245ad1 100644 --- a/pkg/backups/backups.go +++ b/pkg/backups/backups.go @@ -19,7 +19,7 @@ type BackupInterractor interface { GetFirstLSN(int) (uint64, error) } -type WalgBackupInterractor struct { +type WalgBackupInterractor struct { //TODO: rewrite to using s3 instead of wal-g cmd } // get lsn of the oldest backup @@ -34,29 +34,29 @@ func (b *WalgBackupInterractor) GetFirstLSN(seg int) (uint64, error) { ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls") return 0, err } - p1 := strings.Split(out.String(), "\n") + lines := strings.Split(out.String(), "\n") minLSN := BackupLSN{Lsn: ^uint64(0)} - for _, line := range p1 { + for _, line := range lines { if !strings.Contains(line, ".json") { continue } - p2 := strings.Split(line, " ") - p3 := p2[len(p2)-1] + parts := strings.Split(line, " ") + fileName := parts[len(parts)-1] - ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3)).Msg("check lsn in file") - cmd2 := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, p3), "--config=/etc/wal-g/wal-g.yaml") + ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, fileName)).Msg("check lsn in file") + catCmd := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, fileName), "--config=/etc/wal-g/wal-g.yaml") - var out2 bytes.Buffer - cmd2.Stdout = &out2 + var catOut bytes.Buffer + catCmd.Stdout = &catOut - err = cmd2.Run() + err = catCmd.Run() if err != nil { ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat") return 0, err } lsn := BackupLSN{} - err = json.Unmarshal(out2.Bytes(), &lsn) + err = json.Unmarshal(catOut.Bytes(), &lsn) if lsn.Lsn < minLSN.Lsn { minLSN.Lsn = lsn.Lsn diff --git a/pkg/message/delete_message.go b/pkg/message/delete_message.go index 2af9881..599ba1c 100644 --- a/pkg/message/delete_message.go +++ b/pkg/message/delete_message.go @@ -10,16 +10,18 @@ type DeleteMessage struct { //seg port Port int Segnum int Confirm bool + Garbage bool } var _ ProtoMessage = &DeleteMessage{} -func NewDeleteMessage(name string, port int, seg int, confirm bool) *DeleteMessage { +func NewDeleteMessage(name string, port int, seg int, confirm bool, garbage bool) *DeleteMessage { return &DeleteMessage{ Name: name, Port: port, Segnum: seg, Confirm: confirm, + Garbage: garbage, } } @@ -34,6 +36,9 @@ func (c *DeleteMessage) Encode() []byte { if c.Confirm { bt[1] = 1 } + if c.Garbage { + bt[2] = 1 + } bt = append(bt, []byte(c.Name)...) bt = append(bt, 0) @@ -56,6 +61,9 @@ func (c *DeleteMessage) Decode(body []byte) { if body[1] == 1 { c.Confirm = true } + if body[2] == 1 { + c.Garbage = true + } c.Name = c.GetDeleteName(body[4:]) c.Port = int(binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8])) c.Segnum = int(binary.BigEndian.Uint64(body[len(body)-8:])) diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index a522ccf..8cb8734 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -185,7 +185,7 @@ func TestCopyMsg(t *testing.T) { func TestDeleteMsg(t *testing.T) { assert := assert.New(t) - msg := message.NewDeleteMessage("myname/mynextname", 5432, 42, true) + msg := message.NewDeleteMessage("myname/mynextname", 5432, 42, true, true) body := msg.Encode() assert.Equal(body[8], byte(message.MessageTypeDelete)) @@ -197,4 +197,5 @@ func TestDeleteMsg(t *testing.T) { assert.Equal(5432, msg2.Port) assert.Equal(42, msg2.Segnum) assert.True(msg2.Confirm) + assert.True(msg2.Garbage) } diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go index e0bc880..a733f0d 100644 --- a/pkg/mock/storage.go +++ b/pkg/mock/storage.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: pkg/storage/storage_interractor.go +// Source: pkg/storage/storage.go // Package mock is a generated GoMock package. package mock @@ -12,115 +12,6 @@ import ( storage "github.com/yezzey-gp/yproxy/pkg/storage" ) -// MockStorageInteractor is a mock of StorageInteractor interface. -type MockStorageInteractor struct { - ctrl *gomock.Controller - recorder *MockStorageInteractorMockRecorder -} - -// MockStorageInteractorMockRecorder is the mock recorder for MockStorageInteractor. -type MockStorageInteractorMockRecorder struct { - mock *MockStorageInteractor -} - -// NewMockStorageInteractor creates a new mock instance. -func NewMockStorageInteractor(ctrl *gomock.Controller) *MockStorageInteractor { - mock := &MockStorageInteractor{ctrl: ctrl} - mock.recorder = &MockStorageInteractorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockStorageInteractor) EXPECT() *MockStorageInteractorMockRecorder { - return m.recorder -} - -// CatFileFromStorage mocks base method. -func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) - ret0, _ := ret[0].(io.ReadCloser) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// CatFileFromStorage indicates an expected call of CatFileFromStorage. -func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset) -} - -// DeleteObject mocks base method. -func (m *MockStorageInteractor) DeleteObject(key string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteObject", key) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteObject indicates an expected call of DeleteObject. -func (mr *MockStorageInteractorMockRecorder) DeleteObject(key interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) -} - -// ListPath mocks base method. -func (m *MockStorageInteractor) ListPath(name string) ([]*storage.S3ObjectMeta, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListPath", name) - ret0, _ := ret[0].([]*storage.S3ObjectMeta) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListPath indicates an expected call of ListPath. -func (mr *MockStorageInteractorMockRecorder) ListPath(name interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageInteractor)(nil).ListPath), name) -} - -// MoveObject mocks base method. -func (m *MockStorageInteractor) MoveObject(from, to string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MoveObject", from, to) - ret0, _ := ret[0].(error) - return ret0 -} - -// MoveObject indicates an expected call of MoveObject. -func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageInteractor)(nil).MoveObject), from, to) -} - -// PatchFile mocks base method. -func (m *MockStorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PatchFile", name, r, startOffste) - ret0, _ := ret[0].(error) - return ret0 -} - -// PatchFile indicates an expected call of PatchFile. -func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffste interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageInteractor)(nil).PatchFile), name, r, startOffste) -} - -// PutFileToDest mocks base method. -func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PutFileToDest", name, r) - ret0, _ := ret[0].(error) - return ret0 -} - -// PutFileToDest indicates an expected call of PutFileToDest. -func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r) -} - // MockStorageReader is a mock of StorageReader interface. type MockStorageReader struct { ctrl *gomock.Controller @@ -159,21 +50,6 @@ func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageReader)(nil).CatFileFromStorage), name, offset) } -// ListPath mocks base method. -func (m *MockStorageReader) ListPath(name string) ([]*storage.S3ObjectMeta, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListPath", name) - ret0, _ := ret[0].([]*storage.S3ObjectMeta) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListPath indicates an expected call of ListPath. -func (mr *MockStorageReaderMockRecorder) ListPath(name interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageReader)(nil).ListPath), name) -} - // MockStorageWriter is a mock of StorageWriter interface. type MockStorageWriter struct { ctrl *gomock.Controller @@ -198,17 +74,17 @@ func (m *MockStorageWriter) EXPECT() *MockStorageWriterMockRecorder { } // PatchFile mocks base method. -func (m *MockStorageWriter) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { +func (m *MockStorageWriter) PatchFile(name string, r io.ReadSeeker, startOffset int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PatchFile", name, r, startOffste) + ret := m.ctrl.Call(m, "PatchFile", name, r, startOffset) ret0, _ := ret[0].(error) return ret0 } // PatchFile indicates an expected call of PatchFile. -func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffste interface{}) *gomock.Call { +func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffset interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageWriter)(nil).PatchFile), name, r, startOffste) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageWriter)(nil).PatchFile), name, r, startOffset) } // PutFileToDest mocks base method. @@ -249,10 +125,10 @@ func (m *MockStorageLister) EXPECT() *MockStorageListerMockRecorder { } // ListPath mocks base method. -func (m *MockStorageLister) ListPath(prefix string) ([]*storage.S3ObjectMeta, error) { +func (m *MockStorageLister) ListPath(prefix string) ([]*storage.ObjectInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListPath", prefix) - ret0, _ := ret[0].([]*storage.S3ObjectMeta) + ret0, _ := ret[0].([]*storage.ObjectInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -313,3 +189,112 @@ func (mr *MockStorageMoverMockRecorder) MoveObject(from, to interface{}) *gomock mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageMover)(nil).MoveObject), from, to) } + +// MockStorageInteractor is a mock of StorageInteractor interface. +type MockStorageInteractor struct { + ctrl *gomock.Controller + recorder *MockStorageInteractorMockRecorder +} + +// MockStorageInteractorMockRecorder is the mock recorder for MockStorageInteractor. +type MockStorageInteractorMockRecorder struct { + mock *MockStorageInteractor +} + +// NewMockStorageInteractor creates a new mock instance. +func NewMockStorageInteractor(ctrl *gomock.Controller) *MockStorageInteractor { + mock := &MockStorageInteractor{ctrl: ctrl} + mock.recorder = &MockStorageInteractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageInteractor) EXPECT() *MockStorageInteractorMockRecorder { + return m.recorder +} + +// CatFileFromStorage mocks base method. +func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CatFileFromStorage indicates an expected call of CatFileFromStorage. +func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset) +} + +// DeleteObject mocks base method. +func (m *MockStorageInteractor) DeleteObject(key string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", key) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *MockStorageInteractorMockRecorder) DeleteObject(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) +} + +// ListPath mocks base method. +func (m *MockStorageInteractor) ListPath(prefix string) ([]*storage.ObjectInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListPath", prefix) + ret0, _ := ret[0].([]*storage.ObjectInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPath indicates an expected call of ListPath. +func (mr *MockStorageInteractorMockRecorder) ListPath(prefix interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageInteractor)(nil).ListPath), prefix) +} + +// MoveObject mocks base method. +func (m *MockStorageInteractor) MoveObject(from, to string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MoveObject", from, to) + ret0, _ := ret[0].(error) + return ret0 +} + +// MoveObject indicates an expected call of MoveObject. +func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageInteractor)(nil).MoveObject), from, to) +} + +// PatchFile mocks base method. +func (m *MockStorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffset int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PatchFile", name, r, startOffset) + ret0, _ := ret[0].(error) + return ret0 +} + +// PatchFile indicates an expected call of PatchFile. +func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageInteractor)(nil).PatchFile), name, r, startOffset) +} + +// PutFileToDest mocks base method. +func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutFileToDest indicates an expected call of PutFileToDest. +func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r) +} diff --git a/pkg/proc/delete_handler_test.go b/pkg/proc/delete_handler_test.go index 895faff..a37995d 100644 --- a/pkg/proc/delete_handler_test.go +++ b/pkg/proc/delete_handler_test.go @@ -60,7 +60,7 @@ func TestFilesToDeletion(t *testing.T) { Confirm: false, } - filesInStorage := []*storage.S3ObjectMeta{ + filesInStorage := []*storage.ObjectInfo{ {Path: "1663_16530_not-deleted_18002_"}, {Path: "1663_16530_deleted-after-backup_18002_"}, {Path: "1663_16530_deleted-when-backup-start_18002_"}, diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index f357f1a..b3f81da 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -335,10 +335,18 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl BackupInterractor: backupHandler, } - err = dh.HandleDeleteGarbage(msg) - if err != nil { - _ = ycl.ReplyError(err, "failed to finish operation") - return nil + if msg.Garbage { + err = dh.HandleDeleteGarbage(msg) + if err != nil { + _ = ycl.ReplyError(err, "failed to finish operation") + return nil + } + } else { + err = dh.HandleDeleteFile(msg) + if err != nil { + _ = ycl.ReplyError(err, "failed to finish operation") + return nil + } } if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index f190718..bbf6e2b 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "path" + "strings" "github.com/yezzey-gp/aws-sdk-go/aws" "github.com/yezzey-gp/aws-sdk-go/service/s3" @@ -136,3 +137,29 @@ func (s *S3StorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { } return metas, nil } + +func (s *S3StorageInteractor) DeleteObject(key string) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return err + } + ylogger.Zero.Debug().Msg("aquired session") + + if !strings.HasPrefix(key, s.cnf.StoragePrefix) { + key = path.Join(s.cnf.StoragePrefix, key) + } + + input2 := s3.DeleteObjectInput{ + Bucket: &s.cnf.StorageBucket, + Key: aws.String(key), + } + + _, err = sess.DeleteObject(&input2) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to delete old object") + return err + } + ylogger.Zero.Debug().Msg("deleted object") + return nil +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 5a6a1eb..6f02559 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,18 +1,10 @@ package storage import ( - "context" "fmt" "io" - "path" - "strings" - - "github.com/yezzey-gp/aws-sdk-go/aws" - "github.com/yezzey-gp/aws-sdk-go/service/s3" - "github.com/yezzey-gp/aws-sdk-go/service/s3/s3manager" "github.com/yezzey-gp/yproxy/config" - "github.com/yezzey-gp/yproxy/pkg/ylogger" ) type StorageReader interface { @@ -30,9 +22,10 @@ type StorageLister interface { type StorageMover interface { MoveObject(from string, to string) error - DeleteObject(key string) error } + +//go:generate mockgen -destination=pkg/mock/storage.go -package=mock type StorageInteractor interface { StorageReader StorageWriter @@ -53,181 +46,5 @@ func NewStorage(cnf *config.Storage) (StorageInteractor, error) { }, nil default: return nil, fmt.Errorf("wrong storage type " + cnf.StorageType) - - } -} - -func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { - // XXX: fix this - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil, err - } - - objectPath := path.Join(s.cnf.StoragePrefix, name) - input := &s3.GetObjectInput{ - Bucket: &s.cnf.StorageBucket, - Key: aws.String(objectPath), - Range: aws.String(fmt.Sprintf("bytes=%d-", offset)), - } - - ylogger.Zero.Debug().Str("key", objectPath).Int64("offset", offset).Str("bucket", - s.cnf.StorageBucket).Msg("requesting external storage") - - object, err := sess.GetObject(input) - return object.Body, err -} - -func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil - } - - objectPath := path.Join(s.cnf.StoragePrefix, name) - - up := s3manager.NewUploaderWithClient(sess, func(uploader *s3manager.Uploader) { - uploader.PartSize = int64(1 << 24) - uploader.Concurrency = 1 - }) - - _, err = up.Upload( - &s3manager.UploadInput{ - Bucket: aws.String(s.cnf.StorageBucket), - Key: aws.String(objectPath), - Body: r, - StorageClass: aws.String("STANDARD"), - }, - ) - - return err -} - -func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil - } - - objectPath := path.Join(s.cnf.StoragePrefix, name) - - input := &s3.PatchObjectInput{ - Bucket: &s.cnf.StorageBucket, - Key: aws.String(objectPath), - Body: r, - ContentRange: aws.String(fmt.Sprintf("bytes %d-18446744073709551615", startOffste)), - } - - _, err = sess.PatchObject(input) - - ylogger.Zero.Debug().Str("key", objectPath).Str("bucket", - s.cnf.StorageBucket).Msg("modifying file in external storage") - - return err -} - -type S3ObjectMeta struct { - Path string - Size int64 -} - -func (s *S3StorageInteractor) ListPath(prefix string) ([]*S3ObjectMeta, error) { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil, err - } - - var continuationToken *string - prefix = path.Join(s.cnf.StoragePrefix, prefix) - metas := make([]*S3ObjectMeta, 0) - - for { - input := &s3.ListObjectsV2Input{ - Bucket: &s.cnf.StorageBucket, - Prefix: aws.String(prefix), - ContinuationToken: continuationToken, - } - - out, err := sess.ListObjectsV2(input) - if err != nil { - fmt.Printf("list error: %v\n", err) - } - - for _, obj := range out.Contents { - metas = append(metas, &S3ObjectMeta{ - Path: *obj.Key, - Size: *obj.Size, - }) - } - - if !*out.IsTruncated { - break - } - - continuationToken = out.NextContinuationToken - } - return metas, nil -} - -func (s *S3StorageInteractor) MoveObject(from string, to string) error { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return err - } - ylogger.Zero.Debug().Msg("aquired session") - - fromPath := from - toPath := path.Join(s.cnf.StoragePrefix, to) - ylogger.Zero.Debug().Str("to", toPath).Msg("to path") - - input := s3.CopyObjectInput{ - Bucket: &s.cnf.StorageBucket, - CopySource: aws.String(s.cnf.StorageBucket + "/" + fromPath), - Key: aws.String(toPath), - } - - out, err := sess.CopyObject(&input) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to copy object") - return err - } - ylogger.Zero.Debug().Str("", out.GoString()).Msg("copied object") - - err = s.DeleteObject(fromPath) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to delete old object") - } - ylogger.Zero.Debug().Msg("deleted object") - return err -} - -func (s *S3StorageInteractor) DeleteObject(key string) error { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return err - } - ylogger.Zero.Debug().Msg("aquired session") - - if !strings.HasPrefix(key, s.cnf.StoragePrefix) { - key = path.Join(s.cnf.StoragePrefix, key) - } - - input2 := s3.DeleteObjectInput{ - Bucket: &s.cnf.StorageBucket, - Key: aws.String(key), - } - - _, err = sess.DeleteObject(&input2) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to delete old object") - return err } - ylogger.Zero.Debug().Msg("deleted object") - return nil } diff --git a/pkg/storage/storage_interractor.go b/pkg/storage/storage_interractor.go deleted file mode 100644 index c23603e..0000000 --- a/pkg/storage/storage_interractor.go +++ /dev/null @@ -1,30 +0,0 @@ -package storage - -import "io" - -//go:generate mockgen -destination=pkg/mock/storage.go -package=mock -type StorageInteractor interface { - StorageReader - StorageWriter - StorageLister - StorageMover -} - -type StorageReader interface { - CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) - ListPath(name string) ([]*S3ObjectMeta, error) -} - -type StorageWriter interface { - PutFileToDest(name string, r io.Reader) error - PatchFile(name string, r io.ReadSeeker, startOffste int64) error -} - -type StorageLister interface { - ListPath(prefix string) ([]*S3ObjectMeta, error) -} - -type StorageMover interface { - MoveObject(from string, to string) error - DeleteObject(key string) error -}