diff --git a/src/main/scala/com/nicta/scoobi/impl/io/Files.scala b/src/main/scala/com/nicta/scoobi/impl/io/Files.scala index 36fa3a263..f9c36ab03 100644 --- a/src/main/scala/com/nicta/scoobi/impl/io/Files.scala +++ b/src/main/scala/com/nicta/scoobi/impl/io/Files.scala @@ -45,22 +45,22 @@ trait Files { moveTo(new Path(dirPath(dir))).apply(path, new Path(path.getName)) /** @return a function moving a Path to a given directory */ - def moveTo(dir: Path)(implicit configuration: Configuration): (Path, Path) => Boolean = (path: Path, newPath: Path) => { + def moveTo(toPath: Path)(implicit configuration: Configuration): (Path, Path) => Boolean = (path: Path, newPath: Path) => { !pathExists(path) || { - val from = fileSystem(path) - val to = fileSystem(dir) + val fromFS = fileSystem(path) + val toFS = fileSystem(toPath) - val destPath = to.makeQualified(new Path(dirPath(dir.toString) + newPath)) - if (!pathExists(destPath.getParent)) to.mkdirs(destPath.getParent) + val destPath = toFS.makeQualified(new Path(dirPath(toPath.toString) + newPath)) + if (!pathExists(destPath.getParent)) toFS.mkdirs(destPath.getParent) - if (List("s3n", "s3").contains(to.getScheme.toLowerCase)) + if (List("s3n", "s3").contains(toFS.getScheme.toLowerCase)) // s3 has special cases (can't rename, can't copy/rename dir simultaneously, ...) - moveToS3(from, to, path, destPath) + moveToS3(fromFS, toFS, path, destPath) - else if (sameFileSystem(from, to)) + else if (sameFileSystem(fromFS, toFS)) (path == destPath) || // same files - (from.isDirectory(path) && - to.isDirectory(destPath) && + (fromFS.isDirectory(path) && + toFS.isDirectory(destPath) && path.toUri.getPath.startsWith(destPath.toUri.getPath)) || // nested directories { logger.debug(s"renaming $path to $destPath") @@ -70,7 +70,7 @@ trait Files { } else { logger.debug(s"copying $path to $destPath") - FileUtil.copy(from, path, to, destPath, + FileUtil.copy(fromFS, path, toFS, destPath, true /* deleteSource */, true /* overwrite */, configuration) } } @@ -80,22 +80,21 @@ trait Files { * @return true if copy to S3 is successful. S3 is a special cases * because it can't rename, can't copy/rename dir simultaneously, ... */ - def moveToS3(from: FileSystem, to: FileSystem, - path: Path, destPath: Path)(implicit configuration: Configuration) = { - if (from.getFileStatus(path).isDirectory) { + def moveToS3(fromFS: FileSystem, toFS: FileSystem, + fromPath: Path, toPath: Path)(implicit configuration: Configuration) = { + if (fromFS.getFileStatus(fromPath).isDirectory) { // copying from a dir/ to s3 requires copying individual dir/* files - val sourceFiles = FileSystem.get (path.toUri, configuration).listStatus(path) - .toSeq.map (_.getPath).toList - sourceFiles.forall { sourceFile => - // TODO: do parallel S3 copy to speed up process - logger.debug(s"individually copying $sourceFile to $destPath (S3)") - FileUtil.copy(from, sourceFile, to, destPath, + val sourceFiles = FileSystem.get(fromPath.toUri, configuration).listStatus(fromPath) + .toSeq.map(_.getPath).toList + sourceFiles.par.forall { _fromPath => + logger.debug(s"Parallel copying ${_fromPath} to $toPath (S3)") + FileUtil.copy(fromFS, _fromPath, toFS, toPath, true /* deleteSource */, true /* overwrite */, configuration) } } else { - // copying from one dir to S3 - logger.debug(s"copying $path to $destPath (S3)") - FileUtil.copy(from, path, to, destPath, + // move one file into S3 + logger.debug(s"Copying $fromPath to $toPath (S3)") + FileUtil.copy(fromFS, fromPath, toFS, toPath, true /* deleteSource */, true /* overwrite */, configuration) } }