diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index fe538f9..adb6968 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -26,13 +26,24 @@ func ProcessPutExtended( ycl.SetExternalFilePath(name) var w io.WriteCloser - r, w := io.Pipe() + err := s.PutFileToDest(name, r, settings) + + if err != nil { + _ = ycl.ReplyError(err, "failed to upload") + + return err + } + + defer r.Close() + defer w.Close() + wg := sync.WaitGroup{} wg.Add(1) go func() { + defer wg.Done() var ww io.WriteCloser = w if encrypt { @@ -54,13 +65,6 @@ func ProcessPutExtended( ylogger.Zero.Debug().Str("path", name).Msg("omit encryption for upload chunks") } - for _, set := range settings { - ylogger.Zero.Debug().Str("setting name", set.Name).Str("value", set.Value).Msg("setting for chunk") - } - - defer w.Close() - defer wg.Done() - for { tp, body, err := pr.ReadPacket() if err != nil { @@ -99,16 +103,8 @@ func ProcessPutExtended( } }() - err := s.PutFileToDest(name, r, settings) - wg.Wait() - if err != nil { - _ = ycl.ReplyError(err, "failed to upload") - - return err - } - _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()) if err != nil { @@ -224,7 +220,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl if err != nil { _ = ycl.ReplyError(err, "failed to upload") - return nil + return err } case message.MessageTypeCopy: @@ -340,7 +336,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { _ = ycl.ReplyError(err, "failed to upload") - return nil + return err } fmt.Println("Copy finished successfully") ylogger.Zero.Info().Msg("Copy finished successfully") @@ -366,19 +362,19 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl err = dh.HandleDeleteGarbage(msg) if err != nil { _ = ycl.ReplyError(err, "failed to finish operation") - return nil + return err } } else { err = dh.HandleDeleteFile(msg) if err != nil { _ = ycl.ReplyError(err, "failed to finish operation") - return nil + return err } } if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { _ = ycl.ReplyError(err, "failed to upload") - return nil + return err } ylogger.Zero.Info().Msg("Deleted garbage successfully") if !msg.Confirm { diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index a644201..38d3196 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -68,7 +68,8 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ bucket, ok := s.bucketMap[tableSpace] if !ok { - ylogger.Zero.Err(err).Msg(fmt.Sprintf("failed to match tablespace %s to s3 bucket.", tableSpace)) + err := fmt.Errorf("failed to match tablespace %s to s3 bucket.", tableSpace) + ylogger.Zero.Err(err) return err }