Skip to content

Commit

Permalink
Patch object message defitions
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Jan 14, 2024
1 parent 14edfd6 commit 333e295
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
MessageTypeDelete = MessageType(47)
MessageTypeList = MessageType(48)
MessageTypeObjectMeta = MessageType(49)
MessageTypePatch = MessageType(50)

DecryptMessage = RequestEncryption(1)
NoDecryptMessage = RequestEncryption(0)
Expand Down
36 changes: 34 additions & 2 deletions pkg/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,44 @@ func TestPutMsg(t *testing.T) {
msg := message.NewPutMessage(tt.name, tt.encrypt)
body := msg.Encode()

msg2 := message.CatMessage{}
msg2 := message.PutMessage{}

msg2.Decode(body[8:])

assert.Equal(msg.Name, msg2.Name)
assert.Equal(msg.Encrypt, msg2.Encrypt)
}
}

func TestPatchMsg(t *testing.T) {
assert := assert.New(t)

type tcase struct {
name string
encrypt bool
off uint64
err error
}

for _, tt := range []tcase{
{
"nam1",
true,
1235,
nil,
},
} {

msg := message.NewPatchMessage(tt.name, tt.off, tt.encrypt)
body := msg.Encode()

msg2 := message.PatchMessage{}

msg2.Decode(body[8:])

assert.Equal(msg.Name, msg2.Name)
assert.Equal(msg.Encrypt, msg2.Decrypt)
assert.Equal(msg.Encrypt, msg2.Encrypt)
assert.Equal(msg.Offset, msg2.Offset)
}
}

Expand Down
70 changes: 70 additions & 0 deletions pkg/message/patch_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package message

import (
"bytes"
"encoding/binary"
)

type PatchMessage struct {
Encrypt bool
Offset uint64
Name string
}

var _ ProtoMessage = &PatchMessage{}

func NewPatchMessage(name string, offset uint64, encrypt bool) *PatchMessage {
return &PatchMessage{
Encrypt: encrypt,
Offset: offset,
Name: name,
}
}

func (c *PatchMessage) Encode() []byte {
bt := []byte{
byte(MessageTypePatch),
0,
0,
0,
}

if c.Encrypt {
bt[1] = byte(EncryptMessage)
} else {
bt[1] = byte(NoEncryptMessage)
}
offset := make([]byte, 8)
binary.BigEndian.PutUint64(offset, uint64(c.Offset))

bt = append(bt, offset...)
bt = append(bt, []byte(c.Name)...)
bt = append(bt, 0)
ln := len(bt) + 8

bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, uint64(ln))
return append(bs, bt...)
}

func (c *PatchMessage) GetPatchName(b []byte) string {
buff := bytes.NewBufferString("")

for i := 0; i < len(b); i++ {
if b[i] == 0 {
break
}
buff.WriteByte(b[i])
}

return buff.String()
}

func (c *PatchMessage) Decode(body []byte) {
if body[1] == byte(EncryptMessage) {
c.Encrypt = true
}

c.Offset = binary.BigEndian.Uint64(body[4:12])
c.Name = c.GetPatchName(body[12:])
}
35 changes: 32 additions & 3 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"context"
"fmt"
"io"
"path"

Expand All @@ -19,16 +20,21 @@ type StorageReader interface {

type StorageWriter interface {
PutFileToDest(name string, r io.Reader) error
PatchFile(name string, r io.ReadSeeker, startOffste int64) error
}

type StorageLister interface {
ListPath(prefix string) ([]*S3ObjectMeta, error)
}

type StorageInteractor interface {
StorageReader
StorageWriter
StorageLister
}

type S3StorageInteractor struct {
StorageReader
StorageWriter
StorageInteractor

pool SessionPool
cnf *config.Storage
Expand Down Expand Up @@ -78,7 +84,6 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error {

_, err = up.Upload(
&s3manager.UploadInput{

Bucket: aws.String(s.cnf.StorageBucket),
Key: aws.String(objectPath),
Body: r,
Expand All @@ -89,6 +94,30 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error {
return err
}

func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error {
sess, err := s.pool.GetSession(context.TODO())
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
return nil
}

objectPath := path.Join(s.cnf.StoragePrefix, name)

input := &s3.PatchObjectInput{
Bucket: &s.cnf.StorageBucket,
Key: aws.String(objectPath),
Body: r,
ContentRange: aws.String(fmt.Sprintf("bytes %d-18446744073709551615", startOffste)),
}

_, err = sess.PatchObject(input)

ylogger.Zero.Debug().Str("key", objectPath).Str("bucket",
s.cnf.StorageBucket).Msg("modifying file in external storage")

return err
}

type S3ObjectMeta struct {
Path string
Size int64
Expand Down

0 comments on commit 333e295

Please sign in to comment.