From 182829fd9589e4e84222b3849f2d17565a8e2927 Mon Sep 17 00:00:00 2001 From: Stepan Filippov <43007025+debebantur@users.noreply.github.com> Date: Tue, 7 May 2024 11:27:12 +0500 Subject: [PATCH] Add command for copy files (#36) --- cmd/client/main.go | 42 ++++++++++++- config/instance.go | 39 ++++++------ pkg/message/copy_message.go | 73 +++++++++++++++++++++++ pkg/message/message.go | 3 + pkg/message/message_test.go | 17 ++++++ pkg/proc/interaction.go | 115 +++++++++++++++++++++++++++++++++++- pkg/storage/storage.go | 39 ++++++++---- 7 files changed, 294 insertions(+), 34 deletions(-) create mode 100644 pkg/message/copy_message.go diff --git a/cmd/client/main.go b/cmd/client/main.go index fd70893..eacb630 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -17,6 +17,7 @@ import ( ) var cfgPath string +var oldCfgPath string var logLevel string var decrypt bool var encrypt bool @@ -51,7 +52,7 @@ var catCmd = &cobra.Command{ return err } - ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed message") + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed cat message") _, err = io.Copy(os.Stdout, con) if err != nil { @@ -62,6 +63,36 @@ var catCmd = &cobra.Command{ }, } +var copyCmd = &cobra.Command{ + Use: "copy", + Short: "copy", + RunE: func(cmd *cobra.Command, args []string) error { + ylogger.Zero.Info().Msg("Execute copy command") + err := config.LoadInstanceConfig(cfgPath) + if err != nil { + return err + } + instanceCnf := config.InstanceConfig() + + con, err := net.Dial("unix", instanceCnf.SocketPath) + if err != nil { + return err + } + + defer con.Close() + ylogger.Zero.Info().Str("name", args[0]).Msg("copy") + msg := message.NewCopyMessage(args[0], oldCfgPath, encrypt, decrypt).Encode() + _, err = con.Write(msg) + if err != nil { + return err + } + + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed copy msg") + + return nil + }, +} + var putCmd = &cobra.Command{ Use: "put", Short: "put", @@ -90,7 +121,7 @@ var putCmd = &cobra.Command{ return err } - ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed message") + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed put message") const SZ = 65536 chunk := make([]byte, SZ) @@ -170,7 +201,7 @@ var listCmd = &cobra.Command{ return err } - ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed message") + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed list message") ycl := client.NewYClient(con) r := proc.NewProtoReader(ycl) @@ -216,6 +247,11 @@ func init() { catCmd.PersistentFlags().BoolVarP(&decrypt, "decrypt", "d", false, "decrypt external object or not") rootCmd.AddCommand(catCmd) + copyCmd.PersistentFlags().BoolVarP(&decrypt, "decrypt", "d", false, "decrypt external object or not") + copyCmd.PersistentFlags().BoolVarP(&encrypt, "encrypt", "e", false, "encrypt external object before put") + copyCmd.PersistentFlags().StringVarP(&oldCfgPath, "old-config", "", "/etc/yproxy/yproxy.yaml", "path to old yproxy config file") + rootCmd.AddCommand(copyCmd) + putCmd.PersistentFlags().BoolVarP(&encrypt, "encrypt", "e", false, "encrypt external object before put") rootCmd.AddCommand(putCmd) diff --git a/config/instance.go b/config/instance.go index 8d0f3ea..bcb8abe 100644 --- a/config/instance.go +++ b/config/instance.go @@ -66,12 +66,29 @@ func EmbedDefaults(cfgInstance *Instance) { } } -func LoadInstanceConfig(cfgPath string) error { +func LoadInstanceConfig(cfgPath string) (err error) { + cfgInstance, err = ReadInstanceConfig(cfgPath) + if err != nil { + return + } + + cfgInstance.ReadSystemdSocketPath() + EmbedDefaults(&cfgInstance) + + configBytes, err := json.MarshalIndent(cfgInstance, "", " ") + if err != nil { + return + } + + log.Println("Running config:", string(configBytes)) + return +} + +func ReadInstanceConfig(cfgPath string) (Instance, error) { var cfg Instance file, err := os.Open(cfgPath) if err != nil { - cfgInstance = cfg - return err + return cfg, err } defer func(file *os.File) { err := file.Close() @@ -81,20 +98,8 @@ func LoadInstanceConfig(cfgPath string) error { }(file) if err := initInstanceConfig(file, &cfg); err != nil { - cfgInstance = cfg - return err + return cfg, err } - configBytes, err := json.MarshalIndent(cfg, "", " ") - if err != nil { - cfgInstance = cfg - return err - } - - cfg.ReadSystemdSocketPath() - EmbedDefaults(&cfg) - - log.Println("Running config:", string(configBytes)) - cfgInstance = cfg - return nil + return cfg, nil } diff --git a/pkg/message/copy_message.go b/pkg/message/copy_message.go new file mode 100644 index 0000000..3a1e1cc --- /dev/null +++ b/pkg/message/copy_message.go @@ -0,0 +1,73 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +type CopyMessage struct { + Decrypt bool + Encrypt bool + Name string + OldCfgPath string +} + +var _ ProtoMessage = &CopyMessage{} + +func NewCopyMessage(name, oldCfgPath string, encrypt, decrypt bool) *CopyMessage { + return &CopyMessage{ + Name: name, + Encrypt: encrypt, + Decrypt: decrypt, + OldCfgPath: oldCfgPath, + } +} + +func (message *CopyMessage) Encode() []byte { + encodedMessage := []byte{ + byte(MessageTypeCopy), + byte(NoDecryptMessage), + byte(NoEncryptMessage), + 0, + } + + if message.Decrypt { + encodedMessage[1] = byte(DecryptMessage) + } + + if message.Encrypt { + encodedMessage[2] = byte(EncryptMessage) + } + + byteName := []byte(message.Name) + byteLen := make([]byte, 8) + binary.BigEndian.PutUint64(byteLen, uint64(len(byteName))) + encodedMessage = append(encodedMessage, byteLen...) + encodedMessage = append(encodedMessage, byteName...) + + byteOldCfg := []byte(message.OldCfgPath) + binary.BigEndian.PutUint64(byteLen, uint64(len(byteOldCfg))) + encodedMessage = append(encodedMessage, byteLen...) + encodedMessage = append(encodedMessage, byteOldCfg...) + + binary.BigEndian.PutUint64(byteLen, uint64(len(encodedMessage)+8)) + fmt.Printf("send: %v\n", MessageType(encodedMessage[0])) + ylogger.Zero.Debug().Str("object-path", MessageType(encodedMessage[0]).String()).Msg("decrypt object") + return append(byteLen, encodedMessage...) +} + +func (encodedMessage *CopyMessage) Decode(data []byte) { + if data[1] == byte(DecryptMessage) { + encodedMessage.Decrypt = true + } + if data[2] == byte(EncryptMessage) { + encodedMessage.Encrypt = true + } + + nameLen := binary.BigEndian.Uint64(data[4:12]) + encodedMessage.Name = string(data[12 : 12+nameLen]) + oldConfLen := binary.BigEndian.Uint64(data[12+nameLen : 12+nameLen+8]) + encodedMessage.OldCfgPath = string(data[12+nameLen+8 : 12+nameLen+8+oldConfLen]) +} diff --git a/pkg/message/message.go b/pkg/message/message.go index 11513d6..06e813a 100644 --- a/pkg/message/message.go +++ b/pkg/message/message.go @@ -19,6 +19,7 @@ const ( MessageTypeList = MessageType(48) MessageTypeObjectMeta = MessageType(49) MessageTypePatch = MessageType(50) + MessageTypeCopy = MessageType(51) DecryptMessage = RequestEncryption(1) NoDecryptMessage = RequestEncryption(0) @@ -45,6 +46,8 @@ func (m MessageType) String() string { return "LIST" case MessageTypeObjectMeta: return "OBJECT META" + case MessageTypeCopy: + return "COPY" } return "UNKNOWN" } diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 14549e1..515019d 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -156,3 +156,20 @@ func TestListMsg(t *testing.T) { assert.Equal(msg.Prefix, msg2.Prefix) } } + +func TestCopyMsg(t *testing.T) { + assert := assert.New(t) + + msg := message.NewCopyMessage("myname/mynextname", "myoldcfg/path", true, true) + body := msg.Encode() + + assert.Equal(body[8], byte(message.MessageTypeCopy)) + + msg2 := message.CopyMessage{} + msg2.Decode(body[8:]) + + assert.Equal("myname/mynextname", msg2.Name) + assert.Equal("myoldcfg/path", msg2.OldCfgPath) + assert.True(msg2.Decrypt) + assert.True(msg2.Encrypt) +} diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index eff3215..7df615f 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -3,8 +3,10 @@ package proc import ( "fmt" "io" + "strings" "sync" + "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/client" "github.com/yezzey-gp/yproxy/pkg/crypt" "github.com/yezzey-gp/yproxy/pkg/message" @@ -48,10 +50,11 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient return err } } - _, err = io.Copy(ycl.Conn, contentReader) + n, err := io.Copy(ycl.Conn, contentReader) if err != nil { _ = ycl.ReplyError(err, "copy failed to complete") } + ylogger.Zero.Debug().Int64("copied bytes", n).Msg("decrypt object") case message.MessageTypePut: @@ -169,7 +172,117 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient return nil } + case message.MessageTypeCopy: + msg := message.CopyMessage{} + msg.Decode(body) + + //get config for old bucket + instanceCnf, err := config.ReadInstanceConfig(msg.OldCfgPath) + if err != nil { + _ = ycl.ReplyError(fmt.Errorf("could not read old config: %s", err), "failed to compelete request") + return nil + } + config.EmbedDefaults(&instanceCnf) + oldStorage := storage.NewStorage(&instanceCnf.StorageCnf) + fmt.Printf("ok new conf: %v\n", instanceCnf) + + //list objects + objectMetas, err := oldStorage.ListPath(msg.Name) + if err != nil { + _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") + return nil + } + + var failed []*storage.S3ObjectMeta + retryCount := 0 + for len(objectMetas) > 0 && retryCount < 10 { + retryCount++ + for i := 0; i < len(objectMetas); i++ { + path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix) + + //get reader + yr := NewYRetryReader(NewRestartReader(oldStorage, path)) + var fromReader io.Reader + fromReader = yr + defer yr.Close() + + if msg.Decrypt { + fromReader, err = cr.Decrypt(yr) + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to decrypt object") + failed = append(failed, objectMetas[i]) + continue + } + } + + //reencrypt + r, w := io.Pipe() + + go func() { + defer func() { + if err := w.Close(); err != nil { + ylogger.Zero.Warn().Err(err).Msg("failed to close writer") + } + }() + + var ww io.WriteCloser = w + + if msg.Encrypt { + var err error + ww, err = cr.Encrypt(w) + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to encrypt object") + failed = append(failed, objectMetas[i]) + return + } + } + + if _, err := io.Copy(ww, fromReader); err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to copy data") + failed = append(failed, objectMetas[i]) + return + } + + if err := ww.Close(); err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to close writer") + failed = append(failed, objectMetas[i]) + return + } + }() + + //write file + err = s.PutFileToDest(path, r) + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to upload file") + failed = append(failed, objectMetas[i]) + continue + } + } + objectMetas = failed + fmt.Printf("failed files count: %d\n", len(objectMetas)) + failed = make([]*storage.S3ObjectMeta, 0) + } + + if _, err = ycl.Conn.Write(message.NewReadyForQueryMessage().Encode()); err != nil { + _ = ycl.ReplyError(err, "failed to upload") + return nil + } + + if len(objectMetas) > 0 { + fmt.Printf("failed files count: %d\n", len(objectMetas)) + fmt.Printf("failed files: %v\n", objectMetas) + ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("failed to upload some files") + ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to upload some files") + + _ = ycl.ReplyError(err, "failed to copy some files") + return nil + } else { + fmt.Println("Copy finished successfully") + ylogger.Zero.Info().Msg("Copy finished successfully") + } + default: + ylogger.Zero.Error().Any("type", tp).Msg("what tip is it") _ = ycl.ReplyError(nil, "wrong request type") return nil diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index e63c27b..176620d 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -132,21 +132,34 @@ func (s *S3StorageInteractor) ListPath(prefix string) ([]*S3ObjectMeta, error) { return nil, err } + var continuationToken *string prefix = path.Join(s.cnf.StoragePrefix, prefix) - input := &s3.ListObjectsInput{ - Bucket: &s.cnf.StorageBucket, - Prefix: aws.String(prefix), - } - - out, err := sess.ListObjects(input) - metas := make([]*S3ObjectMeta, 0) - for _, obj := range out.Contents { - metas = append(metas, &S3ObjectMeta{ - Path: *obj.Key, - Size: *obj.Size, - }) - } + for { + input := &s3.ListObjectsV2Input{ + Bucket: &s.cnf.StorageBucket, + Prefix: aws.String(prefix), + ContinuationToken: continuationToken, + } + + out, err := sess.ListObjectsV2(input) + if err != nil { + fmt.Printf("list error: %v\n", err) + } + + for _, obj := range out.Contents { + metas = append(metas, &S3ObjectMeta{ + Path: *obj.Key, + Size: *obj.Size, + }) + } + + if !*out.IsTruncated { + break + } + + continuationToken = out.NextContinuationToken + } return metas, nil }