From edeb8219ac5667b485f20cbf3bd3e6dcd7c2a493 Mon Sep 17 00:00:00 2001 From: Victor <87538976+visill@users.noreply.github.com> Date: Sun, 4 Aug 2024 16:01:14 +0700 Subject: [PATCH] Refactoring (#45) * Refactoring + Filestorage * Naming * Remove filestorage * Persistency * ArgCheck * cobra usage --- cmd/client/main.go | 313 +++++++++++++---------------- pkg/message/object_meta_message.go | 20 +- pkg/proc/interaction.go | 4 +- pkg/storage/s3storage.go | 138 +++++++++++++ pkg/storage/storage.go | 134 +----------- 5 files changed, 294 insertions(+), 315 deletions(-) create mode 100644 pkg/storage/s3storage.go diff --git a/cmd/client/main.go b/cmd/client/main.go index 92bd551..f396d07 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -23,15 +23,10 @@ var decrypt bool var encrypt bool var offset uint64 -var rootCmd = &cobra.Command{ - Use: "", - Short: "", -} +// TODOV +func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error { -var catCmd = &cobra.Command{ - Use: "cat", - Short: "cat", - RunE: func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { err := config.LoadInstanceConfig(cfgPath) if err != nil { @@ -47,211 +42,189 @@ var catCmd = &cobra.Command{ } defer con.Close() - msg := message.NewCatMessage(args[0], decrypt, offset).Encode() - _, err = con.Write(msg) - if err != nil { - return err - } - - ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed cat message") - - _, err = io.Copy(os.Stdout, con) - if err != nil { - return err - } - - return nil - }, + return f(con, instanceCnf, args) + } } -var copyCmd = &cobra.Command{ - Use: "copy", - Short: "copy", - RunE: func(cmd *cobra.Command, args []string) error { - ylogger.Zero.Info().Msg("Execute copy command") - err := config.LoadInstanceConfig(cfgPath) - if err != nil { - return err - } - instanceCnf := config.InstanceConfig() - - con, err := net.Dial("unix", instanceCnf.SocketPath) - if err != nil { - return err - } - - defer con.Close() - ylogger.Zero.Info().Str("name", args[0]).Msg("copy") - msg := message.NewCopyMessage(args[0], oldCfgPath, encrypt, decrypt).Encode() - _, err = con.Write(msg) - if err != nil { - return err - } - - ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed copy msg") - - client := client.NewYClient(con) - protoReader := proc.NewProtoReader(client) +func catFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { + msg := message.NewCatMessage(args[0], decrypt, offset).Encode() + _, err := con.Write(msg) + if err != nil { + return err + } - ansType, body, err := protoReader.ReadPacket() - if err != nil { - ylogger.Zero.Debug().Err(err).Msg("error while ans") - return err - } + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed cat message") - if ansType != message.MessageTypeReadyForQuery { - return fmt.Errorf("failed to copy, msg: %v", body) - } + _, err = io.Copy(os.Stdout, con) + if err != nil { + return err + } - return nil - }, + return nil } -var putCmd = &cobra.Command{ - Use: "put", - Short: "put", - RunE: func(cmd *cobra.Command, args []string) error { - - err := config.LoadInstanceConfig(cfgPath) - if err != nil { - return err - } - - instanceCnf := config.InstanceConfig() +func copyFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { + ylogger.Zero.Info().Msg("Execute copy command") + ylogger.Zero.Info().Str("name", args[0]).Msg("copy") + msg := message.NewCopyMessage(args[0], oldCfgPath, encrypt, decrypt).Encode() + _, err := con.Write(msg) + if err != nil { + return err + } - con, err := net.Dial("unix", instanceCnf.SocketPath) + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed copy msg") - if err != nil { - return err - } + client := client.NewYClient(con) + protoReader := proc.NewProtoReader(client) - ycl := client.NewYClient(con) - r := proc.NewProtoReader(ycl) + ansType, body, err := protoReader.ReadPacket() + if err != nil { + ylogger.Zero.Debug().Err(err).Msg("error while ans") + return err + } - defer con.Close() - msg := message.NewPutMessage(args[0], encrypt).Encode() - _, err = con.Write(msg) - if err != nil { - return err - } + if ansType != message.MessageTypeReadyForQuery { + return fmt.Errorf("failed to copy, msg: %v", body) + } + return nil +} - ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed put message") +func putFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { + ycl := client.NewYClient(con) + r := proc.NewProtoReader(ycl) - const SZ = 65536 - chunk := make([]byte, SZ) - for { - n, err := os.Stdin.Read(chunk) - if n > 0 { - msg := message.NewCopyDataMessage() - msg.Sz = uint64(n) - msg.Data = make([]byte, msg.Sz) - copy(msg.Data, chunk[:n]) + msg := message.NewPutMessage(args[0], encrypt).Encode() + _, err := con.Write(msg) + if err != nil { + return err + } - nwr, err := con.Write(msg.Encode()) - if err != nil { - return err - } + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed put message") - ylogger.Zero.Debug().Int("len", nwr).Msg("written copy data msg") - } + const SZ = 65536 + chunk := make([]byte, SZ) + for { + n, err := os.Stdin.Read(chunk) + if n > 0 { + msg := message.NewCopyDataMessage() + msg.Sz = uint64(n) + msg.Data = make([]byte, msg.Sz) + copy(msg.Data, chunk[:n]) - if err == nil { - continue - } - if err == io.EOF { - break - } else { + nwr, err := con.Write(msg.Encode()) + if err != nil { return err } - } - - ylogger.Zero.Debug().Msg("send command complete msg") - msg = message.NewCommandCompleteMessage().Encode() - _, err = con.Write(msg) - if err != nil { - return err + ylogger.Zero.Debug().Int("len", nwr).Msg("written copy data msg") } - tp, _, err := r.ReadPacket() - if err != nil { + if err == nil { + continue + } + if err == io.EOF { + break + } else { return err } + } - if tp == message.MessageTypeReadyForQuery { - // ok + ylogger.Zero.Debug().Msg("send command complete msg") - ylogger.Zero.Debug().Msg("got rfq") - } else { - return fmt.Errorf("failed to get rfq") - } + msg = message.NewCommandCompleteMessage().Encode() + _, err = con.Write(msg) + if err != nil { + return err + } + tp, _, err := r.ReadPacket() + if err != nil { + return err + } + + if tp == message.MessageTypeReadyForQuery { + ylogger.Zero.Debug().Msg("got rfq") return nil - }, + } else { + return fmt.Errorf("failed to get rfq") + } } -var listCmd = &cobra.Command{ - Use: "list", - Short: "list", - RunE: func(cmd *cobra.Command, args []string) error { - - err := config.LoadInstanceConfig(cfgPath) - if err != nil { - return err - } +func listFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { + msg := message.NewListMessage(args[0]).Encode() + _, err := con.Write(msg) + if err != nil { + return err + } - instanceCnf := config.InstanceConfig() + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed list message") - con, err := net.Dial("unix", instanceCnf.SocketPath) + ycl := client.NewYClient(con) + r := proc.NewProtoReader(ycl) + done := false + res := make([]*storage.ObjectInfo, 0) + for { + if done { + break + } + tp, body, err := r.ReadPacket() if err != nil { return err } - defer con.Close() - msg := message.NewListMessage(args[0]).Encode() - _, err = con.Write(msg) - if err != nil { - return err + switch tp { + case message.MessageTypeObjectMeta: + meta := message.ObjectInfoMessage{} + meta.Decode(body) + + res = append(res, meta.Content...) + break + case message.MessageTypeReadyForQuery: + done = true + break + default: + return fmt.Errorf("Incorrect message type: %s", tp.String()) } + } - ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed list message") + for _, meta := range res { + fmt.Printf("Object: {Name: \"%s\", size: %d}\n", meta.Path, meta.Size) + } + return nil +} - ycl := client.NewYClient(con) - r := proc.NewProtoReader(ycl) +var rootCmd = &cobra.Command{ + Use: "", + Short: "", +} - done := false - res := make([]*storage.S3ObjectMeta, 0) - for { - if done { - break - } - tp, body, err := r.ReadPacket() - if err != nil { - return err - } +var catCmd = &cobra.Command{ + Use: "cat", + Short: "cat", + Args: cobra.ExactArgs(1), + RunE: Runner(catFunc), +} - switch tp { - case message.MessageTypeObjectMeta: - meta := message.ObjectMetaMessage{} - meta.Decode(body) - - res = append(res, meta.Content...) - break - case message.MessageTypeReadyForQuery: - done = true - break - default: - return fmt.Errorf("Incorrect message type: %s", tp.String()) - } - } +var copyCmd = &cobra.Command{ + Use: "copy", + Short: "copy", + Args: cobra.ExactArgs(1), + RunE: Runner(copyFunc), +} - for _, meta := range res { - fmt.Printf("Object: {Name: \"%s\", size: %d}\n", meta.Path, meta.Size) - } +var putCmd = &cobra.Command{ + Use: "put", + Short: "put", + Args: cobra.ExactArgs(1), + RunE: Runner(putFunc), +} - return nil - }, +var listCmd = &cobra.Command{ + Use: "list", + Short: "list", + Args: cobra.ExactArgs(1), + RunE: Runner(listFunc), } func init() { diff --git a/pkg/message/object_meta_message.go b/pkg/message/object_meta_message.go index 62e31ab..6f8bfc4 100644 --- a/pkg/message/object_meta_message.go +++ b/pkg/message/object_meta_message.go @@ -7,19 +7,19 @@ import ( "github.com/yezzey-gp/yproxy/pkg/storage" ) -type ObjectMetaMessage struct { - Content []*storage.S3ObjectMeta +type ObjectInfoMessage struct { + Content []*storage.ObjectInfo } -var _ ProtoMessage = &ObjectMetaMessage{} +var _ ProtoMessage = &ObjectInfoMessage{} -func NewObjectMetaMessage(content []*storage.S3ObjectMeta) *ObjectMetaMessage { - return &ObjectMetaMessage{ +func NewObjectMetaMessage(content []*storage.ObjectInfo) *ObjectInfoMessage { + return &ObjectInfoMessage{ Content: content, } } -func (c *ObjectMetaMessage) Encode() []byte { +func (c *ObjectInfoMessage) Encode() []byte { bt := []byte{ byte(MessageTypeObjectMeta), 0, @@ -42,14 +42,14 @@ func (c *ObjectMetaMessage) Encode() []byte { return append(bs, bt...) } -func (c *ObjectMetaMessage) Decode(body []byte) { +func (c *ObjectInfoMessage) Decode(body []byte) { body = body[4:] - c.Content = make([]*storage.S3ObjectMeta, 0) + c.Content = make([]*storage.ObjectInfo, 0) for len(body) > 0 { name, index := c.GetString(body) size := int64(binary.BigEndian.Uint64(body[index : index+8])) - c.Content = append(c.Content, &storage.S3ObjectMeta{ + c.Content = append(c.Content, &storage.ObjectInfo{ Path: name, Size: size, }) @@ -57,7 +57,7 @@ func (c *ObjectMetaMessage) Decode(body []byte) { } } -func (c *ObjectMetaMessage) GetString(b []byte) (string, int) { +func (c *ObjectInfoMessage) GetString(b []byte) (string, int) { buff := bytes.NewBufferString("") i := 0 diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index d78fc0c..60e9d6d 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -221,7 +221,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl return nil } - var failed []*storage.S3ObjectMeta + var failed []*storage.ObjectInfo retryCount := 0 for len(objectMetas) > 0 && retryCount < 10 { retryCount++ @@ -294,7 +294,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl } objectMetas = failed fmt.Printf("failed files count: %d\n", len(objectMetas)) - failed = make([]*storage.S3ObjectMeta, 0) + failed = make([]*storage.ObjectInfo, 0) } if len(objectMetas) > 0 { diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go new file mode 100644 index 0000000..bb22474 --- /dev/null +++ b/pkg/storage/s3storage.go @@ -0,0 +1,138 @@ +package storage + +import ( + "context" + "fmt" + "io" + "path" + + "github.com/yezzey-gp/aws-sdk-go/aws" + "github.com/yezzey-gp/aws-sdk-go/service/s3" + "github.com/yezzey-gp/aws-sdk-go/service/s3/s3manager" + "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +type S3StorageInteractor struct { + StorageInteractor + + pool SessionPool + + cnf *config.Storage +} + +func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + // XXX: fix this + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return nil, err + } + + objectPath := path.Join(s.cnf.StoragePrefix, name) + input := &s3.GetObjectInput{ + Bucket: &s.cnf.StorageBucket, + Key: aws.String(objectPath), + Range: aws.String(fmt.Sprintf("bytes=%d-", offset)), + } + + ylogger.Zero.Debug().Str("key", objectPath).Int64("offset", offset).Str("bucket", + s.cnf.StorageBucket).Msg("requesting external storage") + + object, err := sess.GetObject(input) + return object.Body, err +} + +func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return nil + } + + objectPath := path.Join(s.cnf.StoragePrefix, name) + + up := s3manager.NewUploaderWithClient(sess, func(uploader *s3manager.Uploader) { + uploader.PartSize = int64(1 << 24) + uploader.Concurrency = 1 + }) + + _, err = up.Upload( + &s3manager.UploadInput{ + Bucket: aws.String(s.cnf.StorageBucket), + Key: aws.String(objectPath), + Body: r, + StorageClass: aws.String("STANDARD"), + }, + ) + + return err +} + +func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return nil + } + + objectPath := path.Join(s.cnf.StoragePrefix, name) + + input := &s3.PatchObjectInput{ + Bucket: &s.cnf.StorageBucket, + Key: aws.String(objectPath), + Body: r, + ContentRange: aws.String(fmt.Sprintf("bytes %d-18446744073709551615", startOffste)), + } + + _, err = sess.PatchObject(input) + + ylogger.Zero.Debug().Str("key", objectPath).Str("bucket", + s.cnf.StorageBucket).Msg("modifying file in external storage") + + return err +} + +type ObjectInfo struct { + Path string + Size int64 +} + +func (s *S3StorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return nil, err + } + + var continuationToken *string + prefix = path.Join(s.cnf.StoragePrefix, prefix) + metas := make([]*ObjectInfo, 0) + + for { + input := &s3.ListObjectsV2Input{ + Bucket: &s.cnf.StorageBucket, + Prefix: aws.String(prefix), + ContinuationToken: continuationToken, + } + + out, err := sess.ListObjectsV2(input) + if err != nil { + fmt.Printf("list error: %v\n", err) + } + + for _, obj := range out.Contents { + metas = append(metas, &ObjectInfo{ + Path: *obj.Key, + Size: *obj.Size, + }) + } + + if !*out.IsTruncated { + break + } + + continuationToken = out.NextContinuationToken + } + return metas, nil +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 176620d..293a04b 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,21 +1,13 @@ package storage import ( - "context" - "fmt" "io" - "path" - "github.com/yezzey-gp/aws-sdk-go/aws" - "github.com/yezzey-gp/aws-sdk-go/service/s3" - "github.com/yezzey-gp/aws-sdk-go/service/s3/s3manager" "github.com/yezzey-gp/yproxy/config" - "github.com/yezzey-gp/yproxy/pkg/ylogger" ) type StorageReader interface { CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) - ListPath(name string) ([]*S3ObjectMeta, error) } type StorageWriter interface { @@ -24,7 +16,7 @@ type StorageWriter interface { } type StorageLister interface { - ListPath(prefix string) ([]*S3ObjectMeta, error) + ListPath(prefix string) ([]*ObjectInfo, error) } type StorageInteractor interface { @@ -33,133 +25,9 @@ type StorageInteractor interface { StorageLister } -type S3StorageInteractor struct { - StorageInteractor - - pool SessionPool - - cnf *config.Storage -} - func NewStorage(cnf *config.Storage) StorageInteractor { return &S3StorageInteractor{ pool: NewSessionPool(cnf), cnf: cnf, } } - -func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { - // XXX: fix this - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil, err - } - - objectPath := path.Join(s.cnf.StoragePrefix, name) - input := &s3.GetObjectInput{ - Bucket: &s.cnf.StorageBucket, - Key: aws.String(objectPath), - Range: aws.String(fmt.Sprintf("bytes=%d-", offset)), - } - - ylogger.Zero.Debug().Str("key", objectPath).Int64("offset", offset).Str("bucket", - s.cnf.StorageBucket).Msg("requesting external storage") - - object, err := sess.GetObject(input) - return object.Body, err -} - -func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil - } - - objectPath := path.Join(s.cnf.StoragePrefix, name) - - up := s3manager.NewUploaderWithClient(sess, func(uploader *s3manager.Uploader) { - uploader.PartSize = int64(1 << 24) - uploader.Concurrency = 1 - }) - - _, err = up.Upload( - &s3manager.UploadInput{ - Bucket: aws.String(s.cnf.StorageBucket), - Key: aws.String(objectPath), - Body: r, - StorageClass: aws.String("STANDARD"), - }, - ) - - return err -} - -func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil - } - - objectPath := path.Join(s.cnf.StoragePrefix, name) - - input := &s3.PatchObjectInput{ - Bucket: &s.cnf.StorageBucket, - Key: aws.String(objectPath), - Body: r, - ContentRange: aws.String(fmt.Sprintf("bytes %d-18446744073709551615", startOffste)), - } - - _, err = sess.PatchObject(input) - - ylogger.Zero.Debug().Str("key", objectPath).Str("bucket", - s.cnf.StorageBucket).Msg("modifying file in external storage") - - return err -} - -type S3ObjectMeta struct { - Path string - Size int64 -} - -func (s *S3StorageInteractor) ListPath(prefix string) ([]*S3ObjectMeta, error) { - sess, err := s.pool.GetSession(context.TODO()) - if err != nil { - ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil, err - } - - var continuationToken *string - prefix = path.Join(s.cnf.StoragePrefix, prefix) - metas := make([]*S3ObjectMeta, 0) - - for { - input := &s3.ListObjectsV2Input{ - Bucket: &s.cnf.StorageBucket, - Prefix: aws.String(prefix), - ContinuationToken: continuationToken, - } - - out, err := sess.ListObjectsV2(input) - if err != nil { - fmt.Printf("list error: %v\n", err) - } - - for _, obj := range out.Contents { - metas = append(metas, &S3ObjectMeta{ - Path: *obj.Key, - Size: *obj.Size, - }) - } - - if !*out.IsTruncated { - break - } - - continuationToken = out.NextContinuationToken - } - return metas, nil -}