Skip to content

Commit

Permalink
Added S3 listing (#23)
Browse files Browse the repository at this point in the history
* Added new message types

* Added storage.ListPath method

* Added ListMessage processing

* Added list command to client

* Fixes & improvements

* Changed chunk size

* Incorrect message tyoe processing

* Path -> Prefix in ListMessage

* Added test for ListMessage encoding/decoding

---------

Co-authored-by: Yury Frolov <[email protected]>
  • Loading branch information
EinKrebs and Yury Frolov authored Dec 28, 2023
1 parent 375657b commit abf0f7c
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 0 deletions.
68 changes: 68 additions & 0 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net"
"os"

"github.com/yezzey-gp/yproxy/pkg/storage"

"github.com/spf13/cobra"
"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/client"
Expand Down Expand Up @@ -143,6 +145,70 @@ var putCmd = &cobra.Command{
},
}

var listCmd = &cobra.Command{
Use: "list",
Short: "list",
RunE: func(cmd *cobra.Command, args []string) error {

err := config.LoadInstanceConfig(cfgPath)
if err != nil {
return err
}

instanceCnf := config.InstanceConfig()

con, err := net.Dial("unix", instanceCnf.SocketPath)

if err != nil {
return err
}

defer con.Close()
msg := message.NewListMessage(args[0]).Encode()
_, err = con.Write(msg)
if err != nil {
return err
}

ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed message")

ycl := client.NewYClient(con)
r := proc.NewProtoReader(ycl)

done := false
res := make([]*storage.S3ObjectMeta, 0)
for {
if done {
break
}
tp, body, err := r.ReadPacket()
if err != nil {
return err
}

switch tp {
case message.MessageTypeObjectMeta:
meta := message.ObjectMetaMessage{}
meta.Decode(body)

res = append(res, meta.Content...)
break
case message.MessageTypeReadyForQuery:
done = true
break
default:
return fmt.Errorf("Incorrect message type: %s", tp.String())
}
}

for _, meta := range res {
fmt.Printf("Object: {Name: \"%s\", size: %d}\n", meta.Path, meta.Size)
}

return nil
},
}

func init() {
rootCmd.PersistentFlags().StringVarP(&cfgPath, "config", "c", "/etc/yproxy/yproxy.yaml", "path to yproxy config file")
rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", "", "log level")
Expand All @@ -152,6 +218,8 @@ func init() {

putCmd.PersistentFlags().BoolVarP(&encrypt, "encrypt", "e", false, "encrypt external object before put")
rootCmd.AddCommand(putCmd)

rootCmd.AddCommand(listCmd)
}

func main() {
Expand Down
52 changes: 52 additions & 0 deletions pkg/message/list_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package message

import (
"bytes"
"encoding/binary"
)

type ListMessage struct {
Prefix string
}

var _ ProtoMessage = &ListMessage{}

func NewListMessage(name string) *ListMessage {
return &ListMessage{
Prefix: name,
}
}

func (c *ListMessage) Encode() []byte {
bt := []byte{
byte(MessageTypeList),
0,
0,
0,
}

bt = append(bt, []byte(c.Prefix)...)
bt = append(bt, 0)
ln := len(bt) + 8

bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, uint64(ln))
return append(bs, bt...)
}

func (c *ListMessage) Decode(body []byte) {
c.Prefix = c.GetListName(body[4:])
}

func (c *ListMessage) GetListName(b []byte) string {
buff := bytes.NewBufferString("")

for i := 0; i < len(b); i++ {
if b[i] == 0 {
break
}
buff.WriteByte(b[i])
}

return buff.String()
}
6 changes: 6 additions & 0 deletions pkg/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (
MessageTypeReadyForQuery = MessageType(45)
MessageTypeCopyData = MessageType(46)
MessageTypeDelete = MessageType(47)
MessageTypeList = MessageType(48)
MessageTypeObjectMeta = MessageType(49)

DecryptMessage = RequestEncryption(1)
NoDecryptMessage = RequestEncryption(0)
Expand All @@ -38,6 +40,10 @@ func (m MessageType) String() string {
return "COPY DATA"
case MessageTypeDelete:
return "DELETE"
case MessageTypeList:
return "LIST"
case MessageTypeObjectMeta:
return "OBJECT META"
}
return "UNKNOWN"
}
24 changes: 24 additions & 0 deletions pkg/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,27 @@ func TestCopyDataMsg(t *testing.T) {
assert.Equal(msg.Sz, msg2.Sz)
}
}

func TestListMsg(t *testing.T) {
assert := assert.New(t)

type tcase struct {
prefix string
}

for _, tt := range []tcase{
{
"nam1",
},
} {

msg := message.NewListMessage(tt.prefix)
body := msg.Encode()

msg2 := message.ListMessage{}

msg2.Decode(body[8:])

assert.Equal(msg.Prefix, msg2.Prefix)
}
}
72 changes: 72 additions & 0 deletions pkg/message/object_meta_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package message

import (
"bytes"
"encoding/binary"

"github.com/yezzey-gp/yproxy/pkg/storage"
)

type ObjectMetaMessage struct {
Content []*storage.S3ObjectMeta
}

var _ ProtoMessage = &ObjectMetaMessage{}

func NewObjectMetaMessage(content []*storage.S3ObjectMeta) *ObjectMetaMessage {
return &ObjectMetaMessage{
Content: content,
}
}

func (c *ObjectMetaMessage) Encode() []byte {
bt := []byte{
byte(MessageTypeObjectMeta),
0,
0,
0,
}

for _, objMeta := range c.Content {
bt = append(bt, []byte(objMeta.Path)...)
bt = append(bt, 0)

bn := make([]byte, 8)
binary.BigEndian.PutUint64(bn, uint64(objMeta.Size))
bt = append(bt, bn...)
}

ln := len(bt) + 8
bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, uint64(ln))
return append(bs, bt...)
}

func (c *ObjectMetaMessage) Decode(body []byte) {
body = body[4:]
c.Content = make([]*storage.S3ObjectMeta, 0)
for len(body) > 0 {
name, index := c.GetString(body)
size := int64(binary.BigEndian.Uint64(body[index : index+8]))

c.Content = append(c.Content, &storage.S3ObjectMeta{
Path: name,
Size: size,
})
body = body[index+8:]
}
}

func (c *ObjectMetaMessage) GetString(b []byte) (string, int) {
buff := bytes.NewBufferString("")

i := 0
for ; i < len(b); i++ {
if b[i] == 0 {
break
}
buff.WriteByte(b[i])
}

return buff.String(), i + 1
}
31 changes: 31 additions & 0 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,37 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
return ycl.Conn.Close()
}

case message.MessageTypeList:
msg := message.ListMessage{}
msg.Decode(body)

objectMetas, err := s.ListPath(msg.Prefix)
if err != nil {
_ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request")

_ = ycl.Conn.Close()
}

const chunkSize = 1000

for i := 0; i < len(objectMetas); i += chunkSize {
_, err = ycl.Conn.Write(message.NewObjectMetaMessage(objectMetas[i:min(i+chunkSize, len(objectMetas))]).Encode())
if err != nil {
_ = ycl.ReplyError(err, "failed to upload")

return ycl.Conn.Close()
}

}

_, err = ycl.Conn.Write(message.NewReadyForQueryMessage().Encode())

if err != nil {
_ = ycl.ReplyError(err, "failed to upload")

return ycl.Conn.Close()
}

default:

_ = ycl.ReplyError(nil, "wrong request type")
Expand Down
32 changes: 32 additions & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

type StorageReader interface {
CatFileFromStorage(name string) (io.Reader, error)
ListPath(name string) ([]*S3ObjectMeta, error)
}

type StorageWriter interface {
Expand Down Expand Up @@ -87,3 +88,34 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error {

return err
}

type S3ObjectMeta struct {
Path string
Size int64
}

func (s *S3StorageInteractor) ListPath(prefix string) ([]*S3ObjectMeta, error) {
sess, err := s.pool.GetSession(context.TODO())
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
return nil, err
}

prefix = path.Join(s.cnf.StoragePrefix, prefix)
input := &s3.ListObjectsInput{
Bucket: &s.cnf.StorageBucket,
Prefix: aws.String(prefix),
}

out, err := sess.ListObjects(input)

metas := make([]*S3ObjectMeta, 0)
for _, obj := range out.Contents {
metas = append(metas, &S3ObjectMeta{
Path: *obj.Key,
Size: *obj.Size,
})
}

return metas, nil
}

0 comments on commit abf0f7c

Please sign in to comment.