Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: move message definition and util functions to separate pkg #21

Merged
merged 2 commits into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ build:
####################### TESTS #######################

unittest:
go test -race ./pkg/proc/...
go test -race ./pkg/message/...

11 changes: 6 additions & 5 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/spf13/cobra"
"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/proc"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
)
Expand Down Expand Up @@ -42,7 +43,7 @@ var catCmd = &cobra.Command{
}

defer con.Close()
msg := proc.NewCatMessage(args[0], decrypt).Encode()
msg := message.NewCatMessage(args[0], decrypt).Encode()
_, err = con.Write(msg)
if err != nil {
return err
Expand Down Expand Up @@ -81,7 +82,7 @@ var putCmd = &cobra.Command{
r := proc.NewProtoReader(ycl)

defer con.Close()
msg := proc.NewPutMessage(args[0], encrypt).Encode()
msg := message.NewPutMessage(args[0], encrypt).Encode()
_, err = con.Write(msg)
if err != nil {
return err
Expand All @@ -94,7 +95,7 @@ var putCmd = &cobra.Command{
for {
n, err := os.Stdin.Read(chunk)
if n > 0 {
msg := proc.NewCopyDataMessage()
msg := message.NewCopyDataMessage()
msg.Sz = uint64(n)
msg.Data = make([]byte, msg.Sz)
copy(msg.Data, chunk[:n])
Expand All @@ -119,7 +120,7 @@ var putCmd = &cobra.Command{

ylogger.Zero.Debug().Msg("send command complete msg")

msg = proc.NewCommandCompleteMessage().Encode()
msg = message.NewCommandCompleteMessage().Encode()
_, err = con.Write(msg)
if err != nil {
return err
Expand All @@ -130,7 +131,7 @@ var putCmd = &cobra.Command{
return err
}

if tp == proc.MessageTypeReadyForQuery {
if tp == message.MessageTypeReadyForQuery {
// ok

ylogger.Zero.Debug().Msg("got rfq")
Expand Down
2 changes: 1 addition & 1 deletion pkg/proc/cat_message.go → pkg/message/cat_message.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

import "encoding/binary"

Expand Down
2 changes: 1 addition & 1 deletion pkg/proc/copy_data.go → pkg/message/copy_data.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

import (
"encoding/binary"
Expand Down
52 changes: 52 additions & 0 deletions pkg/message/delete_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package message

import (
"bytes"
"encoding/binary"
)

type DeleteMessage struct {
Name string
}

var _ ProtoMessage = &DeleteMessage{}

func NewDeleteMessage(name string) *DeleteMessage {
return &DeleteMessage{
Name: name,
}
}

func (c *DeleteMessage) Encode() []byte {
bt := []byte{
byte(MessageTypeDelete),
0,
0,
0,
}

bt = append(bt, []byte(c.Name)...)
bt = append(bt, 0)
ln := len(bt) + 8

bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, uint64(ln))
return append(bs, bt...)
}

func (c *DeleteMessage) Decode(body []byte) {
c.Name = c.GetDeleteName(body[4:])
}

func (c *DeleteMessage) GetDeleteName(b []byte) string {
buff := bytes.NewBufferString("")

for i := 0; i < len(b); i++ {
if b[i] == 0 {
break
}
buff.WriteByte(b[i])
}

return buff.String()
}
5 changes: 4 additions & 1 deletion pkg/proc/message.go → pkg/message/message.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

type ProtoMessage interface {
Decode([]byte)
Expand All @@ -15,6 +15,7 @@ const (
MessageTypeCommandComplete = MessageType(44)
MessageTypeReadyForQuery = MessageType(45)
MessageTypeCopyData = MessageType(46)
MessageTypeDelete = MessageType(47)

DecryptMessage = RequestEncryption(1)
NoDecryptMessage = RequestEncryption(0)
Expand All @@ -35,6 +36,8 @@ func (m MessageType) String() string {
return "READY FOR QUERY"
case MessageTypeCopyData:
return "COPY DATA"
case MessageTypeDelete:
return "DELETE"
}
return "UNKNOWN"
}
16 changes: 8 additions & 8 deletions pkg/proc/message_test.go → pkg/message/message_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package proc_test
package message_test

import (
"encoding/binary"
"testing"

"github.com/stretchr/testify/assert"
"github.com/yezzey-gp/yproxy/pkg/proc"
"github.com/yezzey-gp/yproxy/pkg/message"
)

func TestCatMsg(t *testing.T) {
Expand All @@ -25,10 +25,10 @@ func TestCatMsg(t *testing.T) {
},
} {

msg := proc.NewCatMessage(tt.name, tt.decrypt)
msg := message.NewCatMessage(tt.name, tt.decrypt)
body := msg.Encode()

msg2 := proc.CatMessage{}
msg2 := message.CatMessage{}

msg2.Decode(body[8:])

Expand All @@ -54,10 +54,10 @@ func TestPutMsg(t *testing.T) {
},
} {

msg := proc.NewPutMessage(tt.name, tt.encrypt)
msg := message.NewPutMessage(tt.name, tt.encrypt)
body := msg.Encode()

msg2 := proc.CatMessage{}
msg2 := message.CatMessage{}

msg2.Decode(body[8:])

Expand All @@ -83,12 +83,12 @@ func TestCopyDataMsg(t *testing.T) {
},
} {

msg := proc.NewCopyDataMessage()
msg := message.NewCopyDataMessage()
msg.Data = tt.body
msg.Sz = uint64(len(tt.body))
body := msg.Encode()

msg2 := proc.CopyDataMessage{}
msg2 := message.CopyDataMessage{}

msg2.Decode(body[8:])

Expand Down
2 changes: 1 addition & 1 deletion pkg/proc/put_message.go → pkg/message/put_message.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

import "encoding/binary"

Expand Down
19 changes: 10 additions & 9 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/crypt"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/storage"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
)
Expand All @@ -24,9 +25,9 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request")

switch tp {
case MessageTypeCat:
case message.MessageTypeCat:
// omit first byte
msg := CatMessage{}
msg := message.CatMessage{}
msg.Decode(body)
ylogger.Zero.Debug().Str("object-path", msg.Name).Msg("cat object")
r, err := s.CatFileFromStorage(msg.Name)
Expand All @@ -48,9 +49,9 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient

_ = ycl.Conn.Close()

case MessageTypePut:
case message.MessageTypePut:

msg := PutMessage{}
msg := message.PutMessage{}
msg.Decode(body)

var w io.WriteCloser
Expand Down Expand Up @@ -89,8 +90,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request")

switch tp {
case MessageTypeCopyData:
msg := CopyDataMessage{}
case message.MessageTypeCopyData:
msg := message.CopyDataMessage{}
msg.Decode(body)
if n, err := ww.Write(msg.Data); err != nil {
_ = ycl.ReplyError(err, "failed to compelete request")
Expand All @@ -104,8 +105,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
_ = ycl.Conn.Close()
return
}
case MessageTypeCommandComplete:
msg := CommandCompleteMessage{}
case message.MessageTypeCommandComplete:
msg := message.CommandCompleteMessage{}
msg.Decode(body)

if err := ww.Close(); err != nil {
Expand All @@ -131,7 +132,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
return ycl.Conn.Close()
}

_, err = ycl.Conn.Write(NewReadyForQueryMessage().Encode())
_, err = ycl.Conn.Write(message.NewReadyForQueryMessage().Encode())

if err != nil {
_ = ycl.ReplyError(err, "failed to upload")
Expand Down
5 changes: 3 additions & 2 deletions pkg/proc/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"

"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
)

Expand All @@ -22,7 +23,7 @@ func NewProtoReader(ycl *client.YClient) *ProtoReader {

const maxMsgLen = 1 << 20

func (r *ProtoReader) ReadPacket() (MessageType, []byte, error) {
func (r *ProtoReader) ReadPacket() (message.MessageType, []byte, error) {
msgLenBuf := make([]byte, 8)
_, err := io.ReadFull(r.c, msgLenBuf)
if err != nil {
Expand All @@ -49,6 +50,6 @@ func (r *ProtoReader) ReadPacket() (MessageType, []byte, error) {
return 0, nil, err
}

msgType := MessageType(data[0])
msgType := message.MessageType(data[0])
return msgType, data, nil
}
9 changes: 9 additions & 0 deletions pkg/vacuum/vacuum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package vacuum

// Greenplum + yezzey specific logic for external storage vacuuming
// Yezzey stores AO/AOCS relations data in one or several files in external
// storage (chunks). Chunk considered to be obsolete when
// table referencing this chunk was deleted and all backups (WAL-G) which
// contains this table was deleted

// TODO: implement