Skip to content

Commit

Permalink
Refactor: move message definition and util functions to separate pkg (#…
Browse files Browse the repository at this point in the history
…21)

* Refactor: move message definition and util functions to separate pkg

Also define vacuum and delete_msg structures, TBD

* Fix makefile
  • Loading branch information
reshke authored Dec 25, 2023
1 parent 181dec2 commit 8105eda
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ build:
####################### TESTS #######################

unittest:
go test -race ./pkg/proc/...
go test -race ./pkg/message/...

11 changes: 6 additions & 5 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/spf13/cobra"
"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/proc"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
)
Expand Down Expand Up @@ -42,7 +43,7 @@ var catCmd = &cobra.Command{
}

defer con.Close()
msg := proc.NewCatMessage(args[0], decrypt).Encode()
msg := message.NewCatMessage(args[0], decrypt).Encode()
_, err = con.Write(msg)
if err != nil {
return err
Expand Down Expand Up @@ -81,7 +82,7 @@ var putCmd = &cobra.Command{
r := proc.NewProtoReader(ycl)

defer con.Close()
msg := proc.NewPutMessage(args[0], encrypt).Encode()
msg := message.NewPutMessage(args[0], encrypt).Encode()
_, err = con.Write(msg)
if err != nil {
return err
Expand All @@ -94,7 +95,7 @@ var putCmd = &cobra.Command{
for {
n, err := os.Stdin.Read(chunk)
if n > 0 {
msg := proc.NewCopyDataMessage()
msg := message.NewCopyDataMessage()
msg.Sz = uint64(n)
msg.Data = make([]byte, msg.Sz)
copy(msg.Data, chunk[:n])
Expand All @@ -119,7 +120,7 @@ var putCmd = &cobra.Command{

ylogger.Zero.Debug().Msg("send command complete msg")

msg = proc.NewCommandCompleteMessage().Encode()
msg = message.NewCommandCompleteMessage().Encode()
_, err = con.Write(msg)
if err != nil {
return err
Expand All @@ -130,7 +131,7 @@ var putCmd = &cobra.Command{
return err
}

if tp == proc.MessageTypeReadyForQuery {
if tp == message.MessageTypeReadyForQuery {
// ok

ylogger.Zero.Debug().Msg("got rfq")
Expand Down
2 changes: 1 addition & 1 deletion pkg/proc/cat_message.go → pkg/message/cat_message.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

import "encoding/binary"

Expand Down
2 changes: 1 addition & 1 deletion pkg/proc/copy_data.go → pkg/message/copy_data.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

import (
"encoding/binary"
Expand Down
52 changes: 52 additions & 0 deletions pkg/message/delete_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package message

import (
"bytes"
"encoding/binary"
)

type DeleteMessage struct {
Name string
}

var _ ProtoMessage = &DeleteMessage{}

func NewDeleteMessage(name string) *DeleteMessage {
return &DeleteMessage{
Name: name,
}
}

func (c *DeleteMessage) Encode() []byte {
bt := []byte{
byte(MessageTypeDelete),
0,
0,
0,
}

bt = append(bt, []byte(c.Name)...)
bt = append(bt, 0)
ln := len(bt) + 8

bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, uint64(ln))
return append(bs, bt...)
}

func (c *DeleteMessage) Decode(body []byte) {
c.Name = c.GetDeleteName(body[4:])
}

func (c *DeleteMessage) GetDeleteName(b []byte) string {
buff := bytes.NewBufferString("")

for i := 0; i < len(b); i++ {
if b[i] == 0 {
break
}
buff.WriteByte(b[i])
}

return buff.String()
}
5 changes: 4 additions & 1 deletion pkg/proc/message.go → pkg/message/message.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

type ProtoMessage interface {
Decode([]byte)
Expand All @@ -15,6 +15,7 @@ const (
MessageTypeCommandComplete = MessageType(44)
MessageTypeReadyForQuery = MessageType(45)
MessageTypeCopyData = MessageType(46)
MessageTypeDelete = MessageType(47)

DecryptMessage = RequestEncryption(1)
NoDecryptMessage = RequestEncryption(0)
Expand All @@ -35,6 +36,8 @@ func (m MessageType) String() string {
return "READY FOR QUERY"
case MessageTypeCopyData:
return "COPY DATA"
case MessageTypeDelete:
return "DELETE"
}
return "UNKNOWN"
}
16 changes: 8 additions & 8 deletions pkg/proc/message_test.go → pkg/message/message_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package proc_test
package message_test

import (
"encoding/binary"
"testing"

"github.com/stretchr/testify/assert"
"github.com/yezzey-gp/yproxy/pkg/proc"
"github.com/yezzey-gp/yproxy/pkg/message"
)

func TestCatMsg(t *testing.T) {
Expand All @@ -25,10 +25,10 @@ func TestCatMsg(t *testing.T) {
},
} {

msg := proc.NewCatMessage(tt.name, tt.decrypt)
msg := message.NewCatMessage(tt.name, tt.decrypt)
body := msg.Encode()

msg2 := proc.CatMessage{}
msg2 := message.CatMessage{}

msg2.Decode(body[8:])

Expand All @@ -54,10 +54,10 @@ func TestPutMsg(t *testing.T) {
},
} {

msg := proc.NewPutMessage(tt.name, tt.encrypt)
msg := message.NewPutMessage(tt.name, tt.encrypt)
body := msg.Encode()

msg2 := proc.CatMessage{}
msg2 := message.CatMessage{}

msg2.Decode(body[8:])

Expand All @@ -83,12 +83,12 @@ func TestCopyDataMsg(t *testing.T) {
},
} {

msg := proc.NewCopyDataMessage()
msg := message.NewCopyDataMessage()
msg.Data = tt.body
msg.Sz = uint64(len(tt.body))
body := msg.Encode()

msg2 := proc.CopyDataMessage{}
msg2 := message.CopyDataMessage{}

msg2.Decode(body[8:])

Expand Down
2 changes: 1 addition & 1 deletion pkg/proc/put_message.go → pkg/message/put_message.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proc
package message

import "encoding/binary"

Expand Down
19 changes: 10 additions & 9 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/crypt"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/storage"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
)
Expand All @@ -24,9 +25,9 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request")

switch tp {
case MessageTypeCat:
case message.MessageTypeCat:
// omit first byte
msg := CatMessage{}
msg := message.CatMessage{}
msg.Decode(body)
ylogger.Zero.Debug().Str("object-path", msg.Name).Msg("cat object")
r, err := s.CatFileFromStorage(msg.Name)
Expand All @@ -48,9 +49,9 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient

_ = ycl.Conn.Close()

case MessageTypePut:
case message.MessageTypePut:

msg := PutMessage{}
msg := message.PutMessage{}
msg.Decode(body)

var w io.WriteCloser
Expand Down Expand Up @@ -89,8 +90,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request")

switch tp {
case MessageTypeCopyData:
msg := CopyDataMessage{}
case message.MessageTypeCopyData:
msg := message.CopyDataMessage{}
msg.Decode(body)
if n, err := ww.Write(msg.Data); err != nil {
_ = ycl.ReplyError(err, "failed to compelete request")
Expand All @@ -104,8 +105,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
_ = ycl.Conn.Close()
return
}
case MessageTypeCommandComplete:
msg := CommandCompleteMessage{}
case message.MessageTypeCommandComplete:
msg := message.CommandCompleteMessage{}
msg.Decode(body)

if err := ww.Close(); err != nil {
Expand All @@ -131,7 +132,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
return ycl.Conn.Close()
}

_, err = ycl.Conn.Write(NewReadyForQueryMessage().Encode())
_, err = ycl.Conn.Write(message.NewReadyForQueryMessage().Encode())

if err != nil {
_ = ycl.ReplyError(err, "failed to upload")
Expand Down
5 changes: 3 additions & 2 deletions pkg/proc/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"

"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
)

Expand All @@ -22,7 +23,7 @@ func NewProtoReader(ycl *client.YClient) *ProtoReader {

const maxMsgLen = 1 << 20

func (r *ProtoReader) ReadPacket() (MessageType, []byte, error) {
func (r *ProtoReader) ReadPacket() (message.MessageType, []byte, error) {
msgLenBuf := make([]byte, 8)
_, err := io.ReadFull(r.c, msgLenBuf)
if err != nil {
Expand All @@ -49,6 +50,6 @@ func (r *ProtoReader) ReadPacket() (MessageType, []byte, error) {
return 0, nil, err
}

msgType := MessageType(data[0])
msgType := message.MessageType(data[0])
return msgType, data, nil
}
9 changes: 9 additions & 0 deletions pkg/vacuum/vacuum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package vacuum

// Greenplum + yezzey specific logic for external storage vacuuming
// Yezzey stores AO/AOCS relations data in one or several files in external
// storage (chunks). Chunk considered to be obsolete when
// table referencing this chunk was deleted and all backups (WAL-G) which
// contains this table was deleted

// TODO: implement

0 comments on commit 8105eda

Please sign in to comment.