From 8105eda605d1dd8c2685d350586e803bc14a599e Mon Sep 17 00:00:00 2001 From: reshke Date: Tue, 26 Dec 2023 00:07:33 +0500 Subject: [PATCH] Refactor: move message definition and util functions to separate pkg (#21) * Refactor: move message definition and util functions to separate pkg Also define vacuum and delete_msg structures, TBD * Fix makefile --- Makefile | 2 +- cmd/client/main.go | 11 ++--- pkg/{proc => message}/cat_message.go | 2 +- pkg/{proc => message}/command_complete.go | 2 +- pkg/{proc => message}/copy_data.go | 2 +- pkg/message/delete_message.go | 52 +++++++++++++++++++++++ pkg/{proc => message}/message.go | 5 ++- pkg/{proc => message}/message_test.go | 16 +++---- pkg/{proc => message}/put_message.go | 2 +- pkg/{proc => message}/ready_for_query.go | 2 +- pkg/proc/interaction.go | 19 +++++---- pkg/proc/proto.go | 5 ++- pkg/vacuum/vacuum.go | 9 ++++ 13 files changed, 98 insertions(+), 31 deletions(-) rename pkg/{proc => message}/cat_message.go (98%) rename pkg/{proc => message}/command_complete.go (96%) rename pkg/{proc => message}/copy_data.go (97%) create mode 100644 pkg/message/delete_message.go rename pkg/{proc => message}/message.go (88%) rename pkg/{proc => message}/message_test.go (80%) rename pkg/{proc => message}/put_message.go (98%) rename pkg/{proc => message}/ready_for_query.go (96%) create mode 100644 pkg/vacuum/vacuum.go diff --git a/Makefile b/Makefile index 6e9be7f..67b0770 100644 --- a/Makefile +++ b/Makefile @@ -13,5 +13,5 @@ build: ####################### TESTS ####################### unittest: - go test -race ./pkg/proc/... + go test -race ./pkg/message/... diff --git a/cmd/client/main.go b/cmd/client/main.go index aab7838..4288a75 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -9,6 +9,7 @@ import ( "github.com/spf13/cobra" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/client" + "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/proc" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -42,7 +43,7 @@ var catCmd = &cobra.Command{ } defer con.Close() - msg := proc.NewCatMessage(args[0], decrypt).Encode() + msg := message.NewCatMessage(args[0], decrypt).Encode() _, err = con.Write(msg) if err != nil { return err @@ -81,7 +82,7 @@ var putCmd = &cobra.Command{ r := proc.NewProtoReader(ycl) defer con.Close() - msg := proc.NewPutMessage(args[0], encrypt).Encode() + msg := message.NewPutMessage(args[0], encrypt).Encode() _, err = con.Write(msg) if err != nil { return err @@ -94,7 +95,7 @@ var putCmd = &cobra.Command{ for { n, err := os.Stdin.Read(chunk) if n > 0 { - msg := proc.NewCopyDataMessage() + msg := message.NewCopyDataMessage() msg.Sz = uint64(n) msg.Data = make([]byte, msg.Sz) copy(msg.Data, chunk[:n]) @@ -119,7 +120,7 @@ var putCmd = &cobra.Command{ ylogger.Zero.Debug().Msg("send command complete msg") - msg = proc.NewCommandCompleteMessage().Encode() + msg = message.NewCommandCompleteMessage().Encode() _, err = con.Write(msg) if err != nil { return err @@ -130,7 +131,7 @@ var putCmd = &cobra.Command{ return err } - if tp == proc.MessageTypeReadyForQuery { + if tp == message.MessageTypeReadyForQuery { // ok ylogger.Zero.Debug().Msg("got rfq") diff --git a/pkg/proc/cat_message.go b/pkg/message/cat_message.go similarity index 98% rename from pkg/proc/cat_message.go rename to pkg/message/cat_message.go index b0acc05..10ff96f 100644 --- a/pkg/proc/cat_message.go +++ b/pkg/message/cat_message.go @@ -1,4 +1,4 @@ -package proc +package message import ( "bytes" diff --git a/pkg/proc/command_complete.go b/pkg/message/command_complete.go similarity index 96% rename from pkg/proc/command_complete.go rename to pkg/message/command_complete.go index 7bcfb61..d8c705c 100644 --- a/pkg/proc/command_complete.go +++ b/pkg/message/command_complete.go @@ -1,4 +1,4 @@ -package proc +package message import "encoding/binary" diff --git a/pkg/proc/copy_data.go b/pkg/message/copy_data.go similarity index 97% rename from pkg/proc/copy_data.go rename to pkg/message/copy_data.go index acd5d13..e8c5329 100644 --- a/pkg/proc/copy_data.go +++ b/pkg/message/copy_data.go @@ -1,4 +1,4 @@ -package proc +package message import ( "encoding/binary" diff --git a/pkg/message/delete_message.go b/pkg/message/delete_message.go new file mode 100644 index 0000000..ccab831 --- /dev/null +++ b/pkg/message/delete_message.go @@ -0,0 +1,52 @@ +package message + +import ( + "bytes" + "encoding/binary" +) + +type DeleteMessage struct { + Name string +} + +var _ ProtoMessage = &DeleteMessage{} + +func NewDeleteMessage(name string) *DeleteMessage { + return &DeleteMessage{ + Name: name, + } +} + +func (c *DeleteMessage) Encode() []byte { + bt := []byte{ + byte(MessageTypeDelete), + 0, + 0, + 0, + } + + bt = append(bt, []byte(c.Name)...) + bt = append(bt, 0) + ln := len(bt) + 8 + + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, uint64(ln)) + return append(bs, bt...) +} + +func (c *DeleteMessage) Decode(body []byte) { + c.Name = c.GetDeleteName(body[4:]) +} + +func (c *DeleteMessage) GetDeleteName(b []byte) string { + buff := bytes.NewBufferString("") + + for i := 0; i < len(b); i++ { + if b[i] == 0 { + break + } + buff.WriteByte(b[i]) + } + + return buff.String() +} diff --git a/pkg/proc/message.go b/pkg/message/message.go similarity index 88% rename from pkg/proc/message.go rename to pkg/message/message.go index 4272391..ee83e4c 100644 --- a/pkg/proc/message.go +++ b/pkg/message/message.go @@ -1,4 +1,4 @@ -package proc +package message type ProtoMessage interface { Decode([]byte) @@ -15,6 +15,7 @@ const ( MessageTypeCommandComplete = MessageType(44) MessageTypeReadyForQuery = MessageType(45) MessageTypeCopyData = MessageType(46) + MessageTypeDelete = MessageType(47) DecryptMessage = RequestEncryption(1) NoDecryptMessage = RequestEncryption(0) @@ -35,6 +36,8 @@ func (m MessageType) String() string { return "READY FOR QUERY" case MessageTypeCopyData: return "COPY DATA" + case MessageTypeDelete: + return "DELETE" } return "UNKNOWN" } diff --git a/pkg/proc/message_test.go b/pkg/message/message_test.go similarity index 80% rename from pkg/proc/message_test.go rename to pkg/message/message_test.go index 960d77a..8a22462 100644 --- a/pkg/proc/message_test.go +++ b/pkg/message/message_test.go @@ -1,11 +1,11 @@ -package proc_test +package message_test import ( "encoding/binary" "testing" "github.com/stretchr/testify/assert" - "github.com/yezzey-gp/yproxy/pkg/proc" + "github.com/yezzey-gp/yproxy/pkg/message" ) func TestCatMsg(t *testing.T) { @@ -25,10 +25,10 @@ func TestCatMsg(t *testing.T) { }, } { - msg := proc.NewCatMessage(tt.name, tt.decrypt) + msg := message.NewCatMessage(tt.name, tt.decrypt) body := msg.Encode() - msg2 := proc.CatMessage{} + msg2 := message.CatMessage{} msg2.Decode(body[8:]) @@ -54,10 +54,10 @@ func TestPutMsg(t *testing.T) { }, } { - msg := proc.NewPutMessage(tt.name, tt.encrypt) + msg := message.NewPutMessage(tt.name, tt.encrypt) body := msg.Encode() - msg2 := proc.CatMessage{} + msg2 := message.CatMessage{} msg2.Decode(body[8:]) @@ -83,12 +83,12 @@ func TestCopyDataMsg(t *testing.T) { }, } { - msg := proc.NewCopyDataMessage() + msg := message.NewCopyDataMessage() msg.Data = tt.body msg.Sz = uint64(len(tt.body)) body := msg.Encode() - msg2 := proc.CopyDataMessage{} + msg2 := message.CopyDataMessage{} msg2.Decode(body[8:]) diff --git a/pkg/proc/put_message.go b/pkg/message/put_message.go similarity index 98% rename from pkg/proc/put_message.go rename to pkg/message/put_message.go index b29aaef..d541858 100644 --- a/pkg/proc/put_message.go +++ b/pkg/message/put_message.go @@ -1,4 +1,4 @@ -package proc +package message import ( "bytes" diff --git a/pkg/proc/ready_for_query.go b/pkg/message/ready_for_query.go similarity index 96% rename from pkg/proc/ready_for_query.go rename to pkg/message/ready_for_query.go index 99e6a2f..9a3886a 100644 --- a/pkg/proc/ready_for_query.go +++ b/pkg/message/ready_for_query.go @@ -1,4 +1,4 @@ -package proc +package message import "encoding/binary" diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 6d3a63c..19bea47 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -7,6 +7,7 @@ import ( "github.com/yezzey-gp/yproxy/pkg/client" "github.com/yezzey-gp/yproxy/pkg/crypt" + "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -24,9 +25,9 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request") switch tp { - case MessageTypeCat: + case message.MessageTypeCat: // omit first byte - msg := CatMessage{} + msg := message.CatMessage{} msg.Decode(body) ylogger.Zero.Debug().Str("object-path", msg.Name).Msg("cat object") r, err := s.CatFileFromStorage(msg.Name) @@ -48,9 +49,9 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient _ = ycl.Conn.Close() - case MessageTypePut: + case message.MessageTypePut: - msg := PutMessage{} + msg := message.PutMessage{} msg.Decode(body) var w io.WriteCloser @@ -89,8 +90,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request") switch tp { - case MessageTypeCopyData: - msg := CopyDataMessage{} + case message.MessageTypeCopyData: + msg := message.CopyDataMessage{} msg.Decode(body) if n, err := ww.Write(msg.Data); err != nil { _ = ycl.ReplyError(err, "failed to compelete request") @@ -104,8 +105,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient _ = ycl.Conn.Close() return } - case MessageTypeCommandComplete: - msg := CommandCompleteMessage{} + case message.MessageTypeCommandComplete: + msg := message.CommandCompleteMessage{} msg.Decode(body) if err := ww.Close(); err != nil { @@ -131,7 +132,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient return ycl.Conn.Close() } - _, err = ycl.Conn.Write(NewReadyForQueryMessage().Encode()) + _, err = ycl.Conn.Write(message.NewReadyForQueryMessage().Encode()) if err != nil { _ = ycl.ReplyError(err, "failed to upload") diff --git a/pkg/proc/proto.go b/pkg/proc/proto.go index 234476c..6779b67 100644 --- a/pkg/proc/proto.go +++ b/pkg/proc/proto.go @@ -7,6 +7,7 @@ import ( "net" "github.com/yezzey-gp/yproxy/pkg/client" + "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -22,7 +23,7 @@ func NewProtoReader(ycl *client.YClient) *ProtoReader { const maxMsgLen = 1 << 20 -func (r *ProtoReader) ReadPacket() (MessageType, []byte, error) { +func (r *ProtoReader) ReadPacket() (message.MessageType, []byte, error) { msgLenBuf := make([]byte, 8) _, err := io.ReadFull(r.c, msgLenBuf) if err != nil { @@ -49,6 +50,6 @@ func (r *ProtoReader) ReadPacket() (MessageType, []byte, error) { return 0, nil, err } - msgType := MessageType(data[0]) + msgType := message.MessageType(data[0]) return msgType, data, nil } diff --git a/pkg/vacuum/vacuum.go b/pkg/vacuum/vacuum.go new file mode 100644 index 0000000..2c866ff --- /dev/null +++ b/pkg/vacuum/vacuum.go @@ -0,0 +1,9 @@ +package vacuum + +// Greenplum + yezzey specific logic for external storage vacuuming +// Yezzey stores AO/AOCS relations data in one or several files in external +// storage (chunks). Chunk considered to be obsolete when +// table referencing this chunk was deleted and all backups (WAL-G) which +// contains this table was deleted + +// TODO: implement