Skip to content

Commit

Permalink
I put the code of deletion and copying into separate functions
Browse files Browse the repository at this point in the history
  • Loading branch information
visill committed Oct 7, 2024
1 parent e2e23c0 commit f001a10
Showing 1 changed file with 177 additions and 166 deletions.
343 changes: 177 additions & 166 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,181 @@ func ProcessPutExtended(

return nil
}
func procCopy(msg message.CopyMessage, s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient) error {

ycl.SetExternalFilePath(msg.Name)

//get config for old bucket
instanceCnf, err := config.ReadInstanceConfig(msg.OldCfgPath)
if err != nil {
_ = ycl.ReplyError(fmt.Errorf("could not read old config: %s", err), "failed to compelete request")
return nil
}
config.EmbedDefaults(&instanceCnf)
oldStorage, err := storage.NewStorage(&instanceCnf.StorageCnf)
if err != nil {
return err
}
ylogger.Zero.Info().Interface("cnf", instanceCnf).Msg("loaded new config")

//list objects
objectMetas, err := oldStorage.ListPath(msg.Name)
if err != nil {
_ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request")
return nil
}

var failed []*object.ObjectInfo
retryCount := 0
for len(objectMetas) > 0 && retryCount < 10 {
retryCount++
for i := 0; i < len(objectMetas); i++ {
path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix)
//get reader
readerFromOldBucket := NewYRetryReader(NewRestartReader(oldStorage, path, nil))
var fromReader io.Reader
fromReader = readerFromOldBucket
defer readerFromOldBucket.Close()

if msg.Decrypt {
oldCr, err := crypt.NewCrypto(&instanceCnf.CryptoCnf)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to configure decrypter")
failed = append(failed, objectMetas[i])
continue
}
fromReader, err = oldCr.Decrypt(readerFromOldBucket)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to decrypt object")
failed = append(failed, objectMetas[i])
continue
}
}

//reencrypt
readerEncrypt, writerEncrypt := io.Pipe()

go func() {
defer func() {
if err := writerEncrypt.Close(); err != nil {
ylogger.Zero.Warn().Err(err).Msg("failed to close writer")
}
}()

var writerToNewBucket io.WriteCloser = writerEncrypt

if msg.Encrypt {
var err error
writerToNewBucket, err = cr.Encrypt(writerEncrypt)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to encrypt object")
failed = append(failed, objectMetas[i])
return
}
}

if _, err := io.Copy(writerToNewBucket, fromReader); err != nil {
ylogger.Zero.Error().Str("path", path).Err(err).Msg("failed to copy data")
failed = append(failed, objectMetas[i])
return
}

if err := writerToNewBucket.Close(); err != nil {
ylogger.Zero.Error().Str("path", path).Err(err).Msg("failed to close writer")
failed = append(failed, objectMetas[i])
return
}
}()

//write file
err = s.PutFileToDest(path, readerEncrypt, nil)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to upload file")
failed = append(failed, objectMetas[i])
continue
}
}
objectMetas = failed
fmt.Printf("failed files count: %d\n", len(objectMetas))
failed = make([]*object.ObjectInfo, 0)
}

if len(objectMetas) > 0 {
fmt.Printf("failed files count: %d\n", len(objectMetas))
fmt.Printf("failed files: %v\n", objectMetas)
ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("failed to upload some files")
ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to upload some files")

// _ = ycl.ReplyError(err, "failed to copy some files")
// return nil
}

if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil {
_ = ycl.ReplyError(err, "failed to upload")
return err
}
fmt.Println("Copy finished successfully")
ylogger.Zero.Info().Msg("Copy finished successfully")
return nil
}
func procDelete(msg message.DeleteMessage, s storage.StorageInteractor, ycl client.YproxyClient, cnf *config.Vacuum) error {
ycl.SetExternalFilePath(msg.Name)

dbInterractor := &database.DatabaseHandler{}
backupHandler := &backups.WalgBackupInterractor{}

var dh = &BasicDeleteHandler{
StorageInterractor: s,
DbInterractor: dbInterractor,
BackupInterractor: backupHandler,
Cnf: cnf,
}

if msg.Garbage {
ylogger.Zero.Debug().
Str("Name", msg.Name).
Uint64("port", msg.Port).
Uint64("segment", msg.Segnum).
Bool("confirm", msg.Confirm).Msg("requested to perform external storage VACUUM")
} else {
ylogger.Zero.Debug().
Str("Name", msg.Name).
Uint64("port", msg.Port).
Uint64("segment", msg.Segnum).
Bool("confirm", msg.Confirm).Msg("requested to remove external chunk")
}

if msg.Garbage {
err := dh.HandleDeleteGarbage(msg)
if err != nil {
_ = ycl.ReplyError(err, "failed to finish operation")
return err
}
} else {
err := dh.HandleDeleteFile(msg)
if err != nil {
_ = ycl.ReplyError(err, "failed to finish operation")
return err
}
}

if _, err := ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil {
_ = ycl.ReplyError(err, "failed to upload")
return err
}

if msg.Garbage {
if !msg.Confirm {
ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted")
} else {
ylogger.Zero.Info().Msg("Deleted garbage successfully")
}
} else {
ylogger.Zero.Info().Msg("Deleted chunk successfully")
}

return nil
}
func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient, cnf *config.Vacuum) error {

defer func() {
Expand Down Expand Up @@ -267,181 +441,18 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
msg := message.CopyMessage{}
msg.Decode(body)

ycl.SetExternalFilePath(msg.Name)

//get config for old bucket
instanceCnf, err := config.ReadInstanceConfig(msg.OldCfgPath)
if err != nil {
_ = ycl.ReplyError(fmt.Errorf("could not read old config: %s", err), "failed to compelete request")
return nil
}
config.EmbedDefaults(&instanceCnf)
oldStorage, err := storage.NewStorage(&instanceCnf.StorageCnf)
err := procCopy(msg, s, cr, ycl)
if err != nil {
return err
}
ylogger.Zero.Info().Interface("cnf", instanceCnf).Msg("loaded new config")

//list objects
objectMetas, err := oldStorage.ListPath(msg.Name)
if err != nil {
_ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request")
return nil
}

var failed []*object.ObjectInfo
retryCount := 0
for len(objectMetas) > 0 && retryCount < 10 {
retryCount++
for i := 0; i < len(objectMetas); i++ {
path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix)
//get reader
readerFromOldBucket := NewYRetryReader(NewRestartReader(oldStorage, path, nil))
var fromReader io.Reader
fromReader = readerFromOldBucket
defer readerFromOldBucket.Close()

if msg.Decrypt {
oldCr, err := crypt.NewCrypto(&instanceCnf.CryptoCnf)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to configure decrypter")
failed = append(failed, objectMetas[i])
continue
}
fromReader, err = oldCr.Decrypt(readerFromOldBucket)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to decrypt object")
failed = append(failed, objectMetas[i])
continue
}
}

//reencrypt
readerEncrypt, writerEncrypt := io.Pipe()

go func() {
defer func() {
if err := writerEncrypt.Close(); err != nil {
ylogger.Zero.Warn().Err(err).Msg("failed to close writer")
}
}()

var writerToNewBucket io.WriteCloser = writerEncrypt

if msg.Encrypt {
var err error
writerToNewBucket, err = cr.Encrypt(writerEncrypt)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to encrypt object")
failed = append(failed, objectMetas[i])
return
}
}

if _, err := io.Copy(writerToNewBucket, fromReader); err != nil {
ylogger.Zero.Error().Str("path", path).Err(err).Msg("failed to copy data")
failed = append(failed, objectMetas[i])
return
}

if err := writerToNewBucket.Close(); err != nil {
ylogger.Zero.Error().Str("path", path).Err(err).Msg("failed to close writer")
failed = append(failed, objectMetas[i])
return
}
}()

//write file
err = s.PutFileToDest(path, readerEncrypt, nil)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to upload file")
failed = append(failed, objectMetas[i])
continue
}
}
objectMetas = failed
fmt.Printf("failed files count: %d\n", len(objectMetas))
failed = make([]*object.ObjectInfo, 0)
}

if len(objectMetas) > 0 {
fmt.Printf("failed files count: %d\n", len(objectMetas))
fmt.Printf("failed files: %v\n", objectMetas)
ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("failed to upload some files")
ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to upload some files")

// _ = ycl.ReplyError(err, "failed to copy some files")
// return nil
}

if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil {
_ = ycl.ReplyError(err, "failed to upload")
return err
}
fmt.Println("Copy finished successfully")
ylogger.Zero.Info().Msg("Copy finished successfully")

case message.MessageTypeDelete:
//recieve message
msg := message.DeleteMessage{}
msg.Decode(body)

ycl.SetExternalFilePath(msg.Name)

dbInterractor := &database.DatabaseHandler{}
backupHandler := &backups.WalgBackupInterractor{}

var dh DeleteHandler
dh = &BasicDeleteHandler{
StorageInterractor: s,
DbInterractor: dbInterractor,
BackupInterractor: backupHandler,
Cnf: cnf,
}

if msg.Garbage {
ylogger.Zero.Debug().
Str("Name", msg.Name).
Uint64("port", msg.Port).
Uint64("segment", msg.Segnum).
Bool("confirm", msg.Confirm).Msg("requested to perform external storage VACUUM")
} else {
ylogger.Zero.Debug().
Str("Name", msg.Name).
Uint64("port", msg.Port).
Uint64("segment", msg.Segnum).
Bool("confirm", msg.Confirm).Msg("requested to remove external chunk")
}

if msg.Garbage {
err = dh.HandleDeleteGarbage(msg)
if err != nil {
_ = ycl.ReplyError(err, "failed to finish operation")
return err
}
} else {
err = dh.HandleDeleteFile(msg)
if err != nil {
_ = ycl.ReplyError(err, "failed to finish operation")
return err
}
}

if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil {
_ = ycl.ReplyError(err, "failed to upload")
err := procDelete(msg, s, ycl, cnf)
if err != nil {
return err
}

if msg.Garbage {
if !msg.Confirm {
ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted")
} else {
ylogger.Zero.Info().Msg("Deleted garbage successfully")
}
} else {
ylogger.Zero.Info().Msg("Deleted chunk successfully")
}

case message.MessageTypeGool:
return ProcMotion(s, cr, ycl)

Expand Down

0 comments on commit f001a10

Please sign in to comment.