From e4c268227b261d1208706404e9d3f6e7334300be Mon Sep 17 00:00:00 2001 From: Victor Date: Mon, 5 Aug 2024 07:28:48 +0000 Subject: [PATCH 1/2] Add filestorage, update interface, fix typo error --- config/storage.go | 3 ++ pkg/storage/filestorage.go | 71 ++++++++++++++++++++++++++++++++++++++ pkg/storage/s3storage.go | 4 +-- pkg/storage/storage.go | 20 ++++++++--- 4 files changed, 92 insertions(+), 6 deletions(-) create mode 100644 pkg/storage/filestorage.go diff --git a/config/storage.go b/config/storage.go index c46a5ff..0b54712 100644 --- a/config/storage.go +++ b/config/storage.go @@ -16,4 +16,7 @@ type Storage struct { StorageConcurrency int64 `json:"storage_concurrency" toml:"storage_concurrency" yaml:"storage_concurrency"` StorageRegion string `json:"storage_region" toml:"storage_region" yaml:"storage_region"` + + // File storage default s3. Available: s3, fs + StorageType string `json:"storage_type" toml:"storage_type" yaml:"storage_type"` } diff --git a/pkg/storage/filestorage.go b/pkg/storage/filestorage.go new file mode 100644 index 0000000..bcc6826 --- /dev/null +++ b/pkg/storage/filestorage.go @@ -0,0 +1,71 @@ +package storage + +import ( + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + + "github.com/yezzey-gp/yproxy/config" +) + +// Storage prefix uses as path to folder. +// "/path/to/folder/" + "path/to/file.txt" +type FileStorageInteractor struct { + StorageInteractor + cnf *config.Storage +} + +func (s *FileStorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + file, err := os.Open(s.cnf.StoragePrefix + name) + if err != nil { + return nil, err + } + _, err = io.CopyN(io.Discard, file, offset) + return file, err +} +func (s *FileStorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { + var data []*ObjectInfo + err := filepath.WalkDir(s.cnf.StoragePrefix+prefix, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + file, err := os.Open(path) + if err != nil { + return err + } + fileinfo, err := file.Stat() + if err != nil { + return err + } + data = append(data, &ObjectInfo{fileinfo.Name(), fileinfo.Size()}) + return nil + }) + return data, err +} + +func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader) error { + file, err := os.Create(s.cnf.StoragePrefix + name) + if err != nil { + return err + } + _, err = io.Copy(file, r) + return err +} + +func (s *FileStorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffset int64) error { + //UNUSED TODO + return fmt.Errorf("TODO") +} + +func (s *FileStorageInteractor) MoveObject(from string, to string) error { + return os.Rename(s.cnf.StoragePrefix+from, s.cnf.StoragePrefix+to) +} + +func (s *FileStorageInteractor) DeleteObject(key string) error { + return os.Remove(s.cnf.StoragePrefix + key) +} diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index bb22474..f190718 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -69,7 +69,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error { return err } -func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffste int64) error { +func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffset int64) error { sess, err := s.pool.GetSession(context.TODO()) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -82,7 +82,7 @@ func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffst Bucket: &s.cnf.StorageBucket, Key: aws.String(objectPath), Body: r, - ContentRange: aws.String(fmt.Sprintf("bytes %d-18446744073709551615", startOffste)), + ContentRange: aws.String(fmt.Sprintf("bytes %d-18446744073709551615", startOffset)), } _, err = sess.PatchObject(input) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 293a04b..b4c78ff 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -12,22 +12,34 @@ type StorageReader interface { type StorageWriter interface { PutFileToDest(name string, r io.Reader) error - PatchFile(name string, r io.ReadSeeker, startOffste int64) error + PatchFile(name string, r io.ReadSeeker, startOffset int64) error } type StorageLister interface { ListPath(prefix string) ([]*ObjectInfo, error) } +type StorageMover interface { + MoveObject(from string, to string) error + DeleteObject(key string) error +} type StorageInteractor interface { StorageReader StorageWriter StorageLister + StorageMover } func NewStorage(cnf *config.Storage) StorageInteractor { - return &S3StorageInteractor{ - pool: NewSessionPool(cnf), - cnf: cnf, + switch cnf.StorageType { + case "fs": + return &FileStorageInteractor{ + cnf: cnf, + } + default: + return &S3StorageInteractor{ + pool: NewSessionPool(cnf), + cnf: cnf, + } } } From e299a488860770d4c39c669b9c4c5d0a2e081978 Mon Sep 17 00:00:00 2001 From: Victor Date: Mon, 5 Aug 2024 10:34:30 +0000 Subject: [PATCH 2/2] 1 --- config/instance.go | 3 +++ config/proxy.go | 2 +- pkg/core/core.go | 6 ++++-- pkg/proc/interaction.go | 5 ++++- pkg/storage/storage.go | 11 +++++++---- 5 files changed, 19 insertions(+), 8 deletions(-) diff --git a/config/instance.go b/config/instance.go index fbdc883..f7083ab 100644 --- a/config/instance.go +++ b/config/instance.go @@ -64,6 +64,9 @@ const ( ) func EmbedDefaults(cfgInstance *Instance) { + if cfgInstance.StorageCnf.StorageType == "" { + cfgInstance.StorageCnf.StorageType = "s3" + } if cfgInstance.StorageCnf.StorageConcurrency == 0 { cfgInstance.StorageCnf.StorageConcurrency = DefaultStorageConcurrency } diff --git a/config/proxy.go b/config/proxy.go index e99d811..c78853a 100644 --- a/config/proxy.go +++ b/config/proxy.go @@ -1,5 +1,5 @@ package config type Proxy struct { - ConsolePort string `json:"console_port" toml:"console+port" yaml:"console_port"` + ConsolePort string `json:"console_port" toml:"console_port" yaml:"console_port"` } diff --git a/pkg/core/core.go b/pkg/core/core.go index cea946f..f629612 100644 --- a/pkg/core/core.go +++ b/pkg/core/core.go @@ -116,10 +116,12 @@ func (i *Instance) Run(instanceCnf *config.Instance) error { return err } - s := storage.NewStorage( + s, err := storage.NewStorage( &instanceCnf.StorageCnf, ) - + if err != nil { + return err + } var cr crypt.Crypter = nil if instanceCnf.CryptoCnf.GPGKeyPath != "" { cr, err = crypt.NewCrypto(&instanceCnf.CryptoCnf) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 60e9d6d..fbc4380 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -211,7 +211,10 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl return nil } config.EmbedDefaults(&instanceCnf) - oldStorage := storage.NewStorage(&instanceCnf.StorageCnf) + oldStorage, err := storage.NewStorage(&instanceCnf.StorageCnf) + if err != nil { + return err + } fmt.Printf("ok new conf: %v\n", instanceCnf) //list objects diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index b4c78ff..f12ea6f 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,6 +1,7 @@ package storage import ( + "fmt" "io" "github.com/yezzey-gp/yproxy/config" @@ -30,16 +31,18 @@ type StorageInteractor interface { StorageMover } -func NewStorage(cnf *config.Storage) StorageInteractor { +func NewStorage(cnf *config.Storage) (StorageInteractor, error) { switch cnf.StorageType { case "fs": return &FileStorageInteractor{ cnf: cnf, - } - default: + }, nil + case "s3": return &S3StorageInteractor{ pool: NewSessionPool(cnf), cnf: cnf, - } + }, nil + default: + return nil, fmt.Errorf("wrong storage type " + cnf.StorageType) } }