diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala
index 560832192..4b1e07448 100644
--- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala
+++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala
@@ -1,19 +1,29 @@
 package org.openeo.geotrellis.creo
 
+import geotrellis.store.s3.AmazonS3URI
+import org.apache.commons.io.FileUtils
 import org.openeo.geotrelliss3.S3Utils
+import org.slf4j.LoggerFactory
 import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
 import software.amazon.awssdk.awscore.retry.conditions.RetryOnErrorCodeCondition
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
 import software.amazon.awssdk.core.retry.RetryPolicy
 import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy
 import software.amazon.awssdk.core.retry.conditions.{OrRetryCondition, RetryCondition}
+import software.amazon.awssdk.core.sync.RequestBody
 import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.s3.model._
 import software.amazon.awssdk.services.s3.{S3AsyncClient, S3Client, S3Configuration}
 
 import java.net.URI
+import java.nio.file.{FileAlreadyExistsException, Files, Path}
 import java.time.Duration
+import scala.collection.JavaConverters._
+import scala.collection.immutable.Iterable
+import scala.util.control.Breaks.{break, breakable}
 
 object CreoS3Utils {
+  private val logger = LoggerFactory.getLogger(getClass)
 
   private val cloudFerroRegion: Region = Region.of("RegionOne")
 
@@ -66,8 +76,213 @@ object CreoS3Utils {
     overrideConfig
   }
 
-  def deleteCreoSubFolder(bucket_name: String, subfolder: String) = {
+  //noinspection ScalaWeakerAccess
+  def deleteCreoSubFolder(bucket_name: String, subfolder: String): Unit = {
     val s3Client = getCreoS3Client()
     S3Utils.deleteSubFolder(s3Client, bucket_name, subfolder)
   }
+
+  /** True when `path` starts with "s3:/" (case-insensitive); this also matches "s3://". */
+  def isS3(path: String): Boolean = {
+    path.toLowerCase.startsWith("s3:/")
+  }
+
+  /** Builds an AmazonS3URI from `path`, first rewriting a single-slash "s3:/" prefix to "s3://". */
+  private def toAmazonS3URI(path: String): AmazonS3URI = {
+    val correctS3Path = path.replaceFirst("(?i)s3:/(?!/)", "s3://")
+    new AmazonS3URI(correctS3Path)
+  }
+
+  // In the following functions an asset path could be a local path or an S3 path.
+
+  /**
+   * Deletes the given folders. S3 does not have folders, so we interpret the path as a prefix.
+   * A local path that does not exist is skipped.
+   *
+   * @throws IllegalArgumentException if a local path exists but is not a directory
+   */
+  def assetDeleteFolders(paths: Iterable[String]): Unit = {
+    for (path <- paths) {
+      if (isS3(path)) {
+        val s3Uri = toAmazonS3URI(path)
+        deleteCreoSubFolder(s3Uri.getBucket, s3Uri.getKey)
+      } else {
+        val p = Path.of(path)
+        if (Files.exists(p)) {
+          if (Files.isDirectory(p)) {
+            FileUtils.deleteDirectory(p.toFile)
+          } else {
+            throw new IllegalArgumentException(f"Can only delete directory here: $path")
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes a single S3 object or a single local file; a missing local file is not an error.
+   *
+   * @throws IllegalArgumentException if `path` is a local directory
+   */
+  def assetDelete(path: String): Unit = {
+    if (isS3(path)) {
+      val s3Uri = toAmazonS3URI(path)
+      // The object key is the part after the bucket, not the full "s3://bucket/key" path.
+      val keys = Seq(s3Uri.getKey)
+      val deleteObjectsRequest = DeleteObjectsRequest.builder
+        .bucket(s3Uri.getBucket)
+        .delete(Delete.builder.objects(keys.map(key => ObjectIdentifier.builder.key(key).build).asJavaCollection).build)
+        .build
+      getCreoS3Client().deleteObjects(deleteObjectsRequest)
+    } else {
+      val p = Path.of(path)
+      if (Files.isDirectory(p)) {
+        throw new IllegalArgumentException(f"Cannot delete directory like this: $path")
+      } else {
+        Files.deleteIfExists(p)
+      }
+    }
+  }
+
+  /**
+   * Lists what lives under `path`. For S3, `path` is used as a key prefix and every object in the
+   * listing response is returned as an "s3://bucket/key" string; for a local directory the entries
+   * directly inside it are returned.
+   */
+  def asseetPathListDirectChildren(path: String): Set[String] = {
+    if (isS3(path)) {
+      val s3Uri = toAmazonS3URI(path)
+      val listObjectsRequest = ListObjectsRequest.builder
+        .bucket(s3Uri.getBucket)
+        .prefix(s3Uri.getKey)
+        .build
+      val listObjectsResponse = getCreoS3Client().listObjects(listObjectsRequest)
+      listObjectsResponse.contents.asScala.map(o => f"s3://${s3Uri.getBucket}/${o.key}").toSet
+    } else {
+      // Files.list keeps the directory open until the stream is closed, so close it explicitly.
+      val children = Files.list(Path.of(path))
+      try children.iterator().asScala.map(_.toString).toSet
+      finally children.close()
+    }
+  }
+
+  /** Checks for an S3 object with a HEAD request, or for a local file with `Files.exists`. */
+  def assetExists(path: String): Boolean = {
+    if (isS3(path)) {
+      try {
+        // https://stackoverflow.com/a/56038360/1448736
+        val s3Uri = toAmazonS3URI(path)
+        val objectRequest = HeadObjectRequest.builder
+          .bucket(s3Uri.getBucket)
+          .key(s3Uri.getKey)
+          .build
+        getCreoS3Client().headObject(objectRequest)
+        true
+      } catch {
+        case _: NoSuchKeyException => false
+      }
+    } else {
+      Files.exists(Path.of(path))
+    }
+  }
+
+  /**
+   * Copies one asset. Supported: S3 to S3, local to local and local to S3.
+   *
+   * @throws IllegalArgumentException for S3 to local, which is not implemented yet
+   */
+  def copyAsset(pathOrigin: String, pathDestination: String): Unit = {
+    if (isS3(pathOrigin) && isS3(pathDestination)) {
+      val s3UriOrigin = toAmazonS3URI(pathOrigin)
+      val s3UriDestination = toAmazonS3URI(pathDestination)
+      val copyRequest = CopyObjectRequest.builder
+        .sourceBucket(s3UriOrigin.getBucket)
+        .sourceKey(s3UriOrigin.getKey)
+        .destinationBucket(s3UriDestination.getBucket)
+        .destinationKey(s3UriDestination.getKey)
+        .build
+      getCreoS3Client().copyObject(copyRequest)
+    } else if (!isS3(pathOrigin) && !isS3(pathDestination)) {
+      Files.copy(Path.of(pathOrigin), Path.of(pathDestination))
+    } else if (!isS3(pathOrigin) && isS3(pathDestination)) {
+      uploadToS3(Path.of(pathOrigin), pathDestination)
+    } else if (isS3(pathOrigin) && !isS3(pathDestination)) {
+      // TODO: Download
+      throw new IllegalArgumentException(f"S3->local not supported here yet ($pathOrigin, $pathDestination)")
+    } else {
+      throw new IllegalArgumentException(f"Should be impossible to get here ($pathOrigin, $pathDestination)")
+    }
+  }
+
+  /** Copies the asset to `pathDestination` and then deletes `pathOrigin`. */
+  def moveAsset(pathOrigin: String, pathDestination: String): Unit = {
+    // This could be optimized using move when on file system.
+    copyAsset(pathOrigin, pathDestination)
+    assetDelete(pathOrigin)
+  }
+
+  /**
+   * Polls `assetExists` until `path` exists, sleeping 5 seconds between at most 20 retries.
+   * Only logs a warning (no exception) when the path is still missing afterwards.
+   */
+  def waitTillPathAvailable(path: Path): Unit = {
+    var retry = 0
+    val maxTries = 20
+    while (!assetExists(path.toString)) {
+      if (retry < maxTries) {
+        retry += 1
+        val seconds = 5
+        logger.info(f"Waiting for path to be available. Try $retry/$maxTries (sleep:$seconds seconds): $path")
+        Thread.sleep(seconds * 1000)
+      } else {
+        logger.warn(f"Path is not available after $maxTries tries: $path")
+        // Throw error instead?
+        return
+      }
+    }
+  }
+
+  /**
+   * Replaces `newPath` with `oldPath` on the local file system. The delete-then-move is attempted
+   * up to 5 times when a FileAlreadyExistsException occurs; after that the exception is rethrown.
+   */
+  def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = {
+    var try_count = 1
+    breakable {
+      while (true) {
+        try {
+          if (assetExists(newPath.toString)) {
+            // It might be a partial result of a previous failing task.
+            logger.info(f"Will replace $newPath. (try $try_count)")
+          }
+          Files.deleteIfExists(newPath)
+          Files.move(oldPath, newPath)
+          break
+        } catch {
+          case e: FileAlreadyExistsException =>
+            // Here if another executor wrote the file between the delete and the move statement.
+            try_count += 1
+            if (try_count > 5) {
+              throw e
+            }
+        }
+      }
+    }
+  }
+
+  /**
+   * Uploads a local file to the object denoted by `s3Path`.
+   *
+   * @return `s3Path`, unchanged
+   */
+  def uploadToS3(localFile: Path, s3Path: String): String = {
+    val s3Uri = toAmazonS3URI(s3Path)
+    val objectRequest = PutObjectRequest.builder
+      .bucket(s3Uri.getBucket)
+      .key(s3Uri.getKey)
+      .build
+
+    getCreoS3Client().putObject(objectRequest, RequestBody.fromFile(localFile))
+    s3Path
+  }
 }
diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala
index 75a52560d..09c71f563 100644
--- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala
+++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala
@@ -27,20 +27,16 @@ import org.openeo.geotrellis.netcdf.NetCDFRDDWriter.fixedTimeOffset
 import org.openeo.geotrellis.stac.STACItem
 import org.openeo.geotrellis.tile_grid.TileGrid
 import org.slf4j.LoggerFactory
-import software.amazon.awssdk.core.sync.RequestBody
-import software.amazon.awssdk.services.s3.model.PutObjectRequest
 import spire.math.Integral
 import spire.syntax.cfor.cfor
 
 import java.io.IOException
-import java.nio.channels.FileChannel
-import java.nio.file.{FileAlreadyExistsException, Files, NoSuchFileException, Path, Paths}
+import java.nio.file.{Files, Path, Paths}
 import java.time.Duration
 import java.time.format.DateTimeFormatter
 import java.util.{ArrayList, Collections, Map, List => JList}
 import scala.collection.JavaConverters._
 import scala.reflect._
-import scala.util.control.Breaks.{break, breakable}
 
 package object geotiff {
 
@@ -118,7 +114,9 @@ package object geotiff {
     val rand = new java.security.SecureRandom().nextLong()
     val uniqueFolderName = executorAttemptDirectoryPrefix + java.lang.Long.toUnsignedString(rand)
     val executorAttemptDirectory = Paths.get(parentDirectory + "/" + uniqueFolderName)
-    Files.createDirectories(executorAttemptDirectory)
+    if (!CreoS3Utils.isS3(parentDirectory.toString)) {
+      Files.createDirectories(executorAttemptDirectory)
+    }
     executorAttemptDirectory
   }
 
@@ -128,18 +126,17 @@ package object geotiff {
     if (!relativePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception()
     // Remove the executorAttemptDirectory part from the path:
     val destinationPath = parentDirectory.resolve(relativePath.substring(relativePath.indexOf("/") + 1))
-    waitTillPathAvailable(Path.of(absolutePath))
-    Files.createDirectories(destinationPath.getParent)
-    Files.move(Path.of(absolutePath), destinationPath)
+    CreoS3Utils.waitTillPathAvailable(Path.of(absolutePath))
+    if (!CreoS3Utils.isS3(parentDirectory.toString)) {
+      Files.createDirectories(destinationPath.getParent)
+    }
+    CreoS3Utils.moveAsset(absolutePath, destinationPath.toString) // TODO: Use move instead of copy
     destinationPath
   }
 
-  private def cleanUpExecutorAttemptDirectory(parentDirectory: Path): Unit = {
-    Files.list(parentDirectory).forEach { p =>
-      if (Files.isDirectory(p) && p.getFileName.toString.startsWith(executorAttemptDirectoryPrefix)) {
-        FileUtils.deleteDirectory(p.toFile)
-      }
-    }
+  private def cleanUpExecutorAttemptDirectory(parentDirectory: String): Unit = {
+    val list = CreoS3Utils.asseetPathListDirectChildren(parentDirectory).filter(_.contains(executorAttemptDirectoryPrefix))
+    CreoS3Utils.assetDeleteFolders(list)
   }
 
   /**
@@ -238,7 +235,7 @@ package object geotiff {
       (destinationPath.toString, timestamp, croppedExtent, bandIndices)
     }.toList.asJava
 
-    cleanUpExecutorAttemptDirectory(Path.of(path))
+    cleanUpExecutorAttemptDirectory(path)
     res
   }
 
@@ -324,7 +321,7 @@ package object geotiff {
 
     if (path.endsWith("out")) {
       val beforeOut = path.substring(0, path.length - "out".length)
-      cleanUpExecutorAttemptDirectory(Path.of(beforeOut))
+      cleanUpExecutorAttemptDirectory(beforeOut)
     }
 
     res
@@ -778,7 +775,7 @@ package object geotiff {
       (destinationPath.toString, croppedExtent)
     }.toList.asJava
 
-    cleanUpExecutorAttemptDirectory(Path.of(path).getParent)
+    cleanUpExecutorAttemptDirectory(Path.of(path).getParent.toString)
     res
   }
 
@@ -921,65 +918,14 @@ package object geotiff {
 
     if (path.startsWith("s3:/")) {
       val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
-      uploadToS3(tempFile, correctS3Path)
+      CreoS3Utils.uploadToS3(tempFile, correctS3Path)
     } else {
       // Retry should not be needed at this point, but it is almost free to keep it.
-      moveOverwriteWithRetries(tempFile, Path.of(path))
+      CreoS3Utils.moveOverwriteWithRetries(tempFile, Path.of(path))
       path
     }
   }
 
-  private def waitTillPathAvailable(path: Path): Unit = {
-    var retry = 0
-    val maxTries = 20
-    while (!path.toFile.exists()) {
-      if (retry < maxTries) {
-        retry += 1
-        val seconds = 5
-        logger.info(f"Waiting for path to be available. Try $retry/$maxTries (sleep:$seconds seconds): $path")
-        Thread.sleep(seconds * 1000)
-      } else {
-        logger.warn(f"Path is not available after $maxTries tries: $path")
-        // Throw error instead?
-        return
-      }
-    }
-  }
-
-  def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = {
-    var try_count = 1
-    breakable {
-      while (true) {
-        try {
-          if (newPath.toFile.exists()) {
-            // It might be a partial result of a previous failing task.
-            logger.info(f"Will replace $newPath. (try $try_count)")
-          }
-          Files.deleteIfExists(newPath)
-          Files.move(oldPath, newPath)
-          break
-        } catch {
-          case e: FileAlreadyExistsException =>
-            // Here if another executor wrote the file between the delete and the move statement.
-            try_count += 1
-            if (try_count > 5) {
-              throw e
-            }
-        }
-      }
-    }
-  }
-
-  def uploadToS3(localFile: Path, s3Path: String) = {
-    val s3Uri = new AmazonS3URI(s3Path)
-    val objectRequest = PutObjectRequest.builder
-      .bucket(s3Uri.getBucket)
-      .key(s3Uri.getKey)
-      .build
-
-    CreoS3Utils.getCreoS3Client().putObject(objectRequest, RequestBody.fromFile(localFile))
-    s3Path
-  }
 
   case class ContextSeq[K, V, M](tiles: Iterable[(K, V)], metadata: LayoutDefinition) extends Seq[(K, V)] with Metadata[LayoutDefinition] {
     override def length: Int = tiles.size
diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/png/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/png/package.scala
index 1da4f2b4b..acd5fcfec 100644
--- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/png/package.scala
+++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/png/package.scala
@@ -6,7 +6,8 @@ import geotrellis.raster.render.RGBA
 import geotrellis.raster.{MultibandTile, UByteCellType}
 import geotrellis.spark._
 import geotrellis.vector.{Extent, ProjectedExtent}
-import org.openeo.geotrellis.geotiff.{SRDD, uploadToS3}
+import org.openeo.geotrellis.creo.CreoS3Utils.uploadToS3
+import org.openeo.geotrellis.geotiff.SRDD
 
 import java.io.File
 import java.nio.file.{Files, Paths}
diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/STACItem.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/STACItem.scala
index 9abfeaf60..5b39bcfa7 100644
--- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/STACItem.scala
+++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/STACItem.scala
@@ -1,6 +1,6 @@
 package org.openeo.geotrellis.stac
 
-import org.openeo.geotrellis.geotiff.uploadToS3
+import org.openeo.geotrellis.creo.CreoS3Utils.uploadToS3
 import org.openeo.geotrellis.getTempFile
 import org.slf4j.LoggerFactory
 