Skip to content

Commit

Permalink
Support using S3 API directly for tiff output. #329
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Nov 4, 2024
1 parent 41ab3dd commit f199970
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
package org.openeo.geotrellis.creo

import geotrellis.store.s3.AmazonS3URI
import org.apache.commons.io.FileUtils
import org.openeo.geotrelliss3.S3Utils
import org.slf4j.LoggerFactory
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.awscore.retry.conditions.RetryOnErrorCodeCondition
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy
import software.amazon.awssdk.core.retry.conditions.{OrRetryCondition, RetryCondition}
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model._
import software.amazon.awssdk.services.s3.{S3AsyncClient, S3Client, S3Configuration}

import java.net.URI
import java.nio.file.{FileAlreadyExistsException, Files, Path}
import java.time.Duration
import scala.collection.JavaConverters._
import scala.collection.immutable.Iterable
import scala.util.control.Breaks.{break, breakable}

object CreoS3Utils {
private val logger = LoggerFactory.getLogger(getClass)

private val cloudFerroRegion: Region = Region.of("RegionOne")

Expand Down Expand Up @@ -66,8 +76,175 @@ object CreoS3Utils {
overrideConfig
}

def deleteCreoSubFolder(bucket_name: String, subfolder: String) = {
/**
 * Deletes a sub folder of a bucket by delegating to `S3Utils.deleteSubFolder`, using a client
 * obtained from `getCreoS3Client()`.
 *
 * @param bucket_name name of the bucket that contains the sub folder
 * @param subfolder   sub folder to remove, passed to `S3Utils.deleteSubFolder` unchanged
 */
//noinspection ScalaWeakerAccess
def deleteCreoSubFolder(bucket_name: String, subfolder: String): Unit =
  S3Utils.deleteSubFolder(getCreoS3Client(), bucket_name, subfolder)

/**
 * Tells whether a path refers to S3 rather than to the local file system.
 *
 * The comparison ignores letter case and only requires a single slash after the scheme, so both
 * `s3://bucket/key` and `s3:/bucket/key` are recognised.
 *
 * @param path path to inspect
 * @return `true` when the lower-cased path starts with `s3:/`
 */
def isS3(path: String): Boolean = {
  val lowered = path.toLowerCase
  lowered.startsWith("s3:/")
}

/**
 * Parses an S3 path into an `AmazonS3URI`.
 *
 * The first occurrence of the scheme followed by exactly one slash (`s3:/x`, in any letter case) is
 * rewritten to `s3://` before parsing; a path that already has two slashes is passed on unchanged.
 *
 * @param path S3 path, with either one or two slashes after the scheme
 */
private def toAmazonS3URI(path: String): AmazonS3URI =
  new AmazonS3URI(path.replaceFirst("(?i)s3:/(?!/)", "s3://"))

// In the following functions an asset path could be a local path or an S3 path.

/**
 * Deletes every folder in `paths`; each entry may be an S3 path or a local directory.
 *
 * S3 does not have folders, so an S3 path is interpreted as a prefix and handed to
 * `deleteCreoSubFolder`. A local path that does not exist is skipped without error.
 *
 * @param paths folders to delete
 * @throws IllegalArgumentException when a local path exists but is not a directory
 */
def assetDeleteFolders(paths: Iterable[String]): Unit = {
  paths.foreach { path =>
    if (isS3(path)) {
      val uri = toAmazonS3URI(path)
      deleteCreoSubFolder(uri.getBucket, uri.getKey)
    } else {
      val localPath = Path.of(path)
      if (Files.exists(localPath)) {
        // Guard first: only directories may be removed through this function.
        if (!Files.isDirectory(localPath)) {
          throw new IllegalArgumentException(f"Can only delete directory here: $path")
        }
        FileUtils.deleteDirectory(localPath.toFile)
      }
    }
  }
}

/**
 * Deletes a single asset, which may be an S3 object or a local file.
 *
 * For an S3 path a `DeleteObjectsRequest` is sent for the bucket and key parsed from the path.
 * For a local path the file is removed if it exists; a missing file is not an error.
 *
 * @param path S3 path or local file path of the asset to delete
 * @throws IllegalArgumentException when `path` is a local directory
 */
def assetDelete(path: String): Unit = {
  if (isS3(path)) {
    val s3Uri = toAmazonS3URI(path)
    // The object identifier must carry the key inside the bucket, not the whole "s3://bucket/key" path.
    val keys = Seq(s3Uri.getKey)
    val deleteObjectsRequest = DeleteObjectsRequest.builder
      .bucket(s3Uri.getBucket)
      .delete(Delete.builder.objects(keys.map(key => ObjectIdentifier.builder.key(key).build).asJavaCollection).build)
      .build
    getCreoS3Client().deleteObjects(deleteObjectsRequest)
  } else {
    val p = Path.of(path)
    if (Files.isDirectory(p)) {
      throw new IllegalArgumentException(f"Cannot delete directory like this: $path")
    } else {
      Files.deleteIfExists(p)
    }
  }
}

/**
 * Lists the entries found under `path`, which may be an S3 path or a local directory.
 *
 * For S3, the key of `path` is used as a prefix in a single `listObjects` call and every returned
 * key is rendered as `s3://bucket/key`.
 * NOTE(review): no delimiter is set and only one response is read, so the S3 result is presumably
 * not limited to direct children and may be truncated for large listings — confirm against callers.
 *
 * For a local directory, the string form of each entry returned by `Files.list` is collected.
 *
 * @param path S3 prefix or local directory to list
 * @return the entries as strings
 */
def asseetPathListDirectChildren(path: String): Set[String] = {
  if (isS3(path)) {
    val s3Uri = toAmazonS3URI(path)
    val listObjectsRequest = ListObjectsRequest.builder
      .bucket(s3Uri.getBucket)
      .prefix(s3Uri.getKey)
      .build
    val listObjectsResponse = getCreoS3Client().listObjects(listObjectsRequest)
    listObjectsResponse.contents.asScala.map(o => f"s3://${s3Uri.getBucket}/${o.key}").toSet
  } else {
    val children = Files.list(Path.of(path))
    try {
      // Iterate over the stream's elements; wrapping the stream itself in a List would only
      // yield the stream object's own toString.
      children.iterator().asScala.map(_.toString).toSet
    } finally {
      // Files.list holds an open directory handle until the stream is closed.
      children.close()
    }
  }
}

/**
 * Checks whether an asset exists, either as an S3 object or as a local path.
 *
 * For S3 a `HeadObjectRequest` is issued; a `NoSuchKeyException` is taken to mean the object is
 * absent. Any other exception propagates to the caller.
 *
 * @param path S3 path or local path to check
 * @return `true` when the asset exists
 */
def assetExists(path: String): Boolean = {
  if (!isS3(path)) {
    Files.exists(Path.of(path))
  } else {
    try {
      // https://stackoverflow.com/a/56038360/1448736
      val uri = toAmazonS3URI(path)
      val headRequest = HeadObjectRequest.builder.bucket(uri.getBucket).key(uri.getKey).build
      getCreoS3Client().headObject(headRequest)
      true
    } catch {
      case _: NoSuchKeyException => false
    }
  }
}

/**
 * Copies an asset between two locations, each of which may be on S3 or on the local file system.
 *
 *  - S3 to S3: a `CopyObjectRequest` between the parsed buckets and keys.
 *  - local to local: `Files.copy`, called without copy options.
 *  - local to S3: delegated to `uploadToS3`.
 *  - S3 to local: not implemented.
 *
 * @param pathOrigin      path of the asset to copy
 * @param pathDestination path to copy the asset to
 * @throws IllegalArgumentException when the origin is on S3 and the destination is local
 */
def copyAsset(pathOrigin: String, pathDestination: String): Unit = {
  // Classify both paths once; the four cases below are exhaustive, so no fallback branch is needed.
  (isS3(pathOrigin), isS3(pathDestination)) match {
    case (true, true) =>
      val s3UriOrigin = toAmazonS3URI(pathOrigin)
      val s3UriDestination = toAmazonS3URI(pathDestination)
      val copyRequest = CopyObjectRequest.builder
        .sourceBucket(s3UriOrigin.getBucket)
        .sourceKey(s3UriOrigin.getKey)
        .destinationBucket(s3UriDestination.getBucket)
        .destinationKey(s3UriDestination.getKey)
        .build
      getCreoS3Client().copyObject(copyRequest)
    case (false, false) =>
      Files.copy(Path.of(pathOrigin), Path.of(pathDestination))
    case (false, true) =>
      uploadToS3(Path.of(pathOrigin), pathDestination)
    case (true, false) =>
      // TODO: Download
      throw new IllegalArgumentException(f"S3->local not supported here yet ($pathOrigin, $pathDestination)")
  }
}

/**
 * Moves an asset by copying it to `pathDestination` and then deleting `pathOrigin`.
 *
 * Both steps are delegated: `copyAsset` performs the copy and `assetDelete` removes the origin.
 * The origin is only deleted after `copyAsset` has returned without throwing.
 *
 * @param pathOrigin      path of the asset to move
 * @param pathDestination path the asset should end up at
 */
def moveAsset(pathOrigin: String, pathDestination: String): Unit = {
// This could be optimized using move when on file system.
copyAsset(pathOrigin, pathDestination)
assetDelete(pathOrigin)
}

/**
 * Blocks until `path` exists according to `assetExists`, polling at a fixed interval.
 *
 * After each failed check the thread sleeps 5 seconds, for at most 20 sleeps. If the path is still
 * missing after that, a warning is logged and the method returns normally; it does not throw.
 *
 * @param path path to wait for; its string form is what gets checked
 */
def waitTillPathAvailable(path: Path): Unit = {
  val maxTries = 20
  val seconds = 5

  @scala.annotation.tailrec
  def poll(previousTries: Int): Unit = {
    if (!assetExists(path.toString)) {
      if (previousTries < maxTries) {
        val retry = previousTries + 1
        logger.info(f"Waiting for path to be available. Try $retry/$maxTries (sleep:$seconds seconds): $path")
        Thread.sleep(seconds * 1000)
        poll(retry)
      } else {
        logger.warn(f"Path is not available after $maxTries tries: $path")
        // Throw error instead?
      }
    }
  }

  poll(0)
}

/**
 * Moves `oldPath` to `newPath` on the file system, replacing anything already at `newPath`.
 *
 * Each attempt deletes `newPath` if present and then moves `oldPath` there. When the move fails
 * with a `FileAlreadyExistsException` the attempt is repeated, up to 5 attempts in total; the
 * exception of the fifth failed attempt is rethrown. Other exceptions propagate immediately.
 *
 * @param oldPath file to move
 * @param newPath location the file should end up at
 */
def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = {
  @scala.annotation.tailrec
  def attempt(try_count: Int): Unit = {
    val moved =
      try {
        if (assetExists(newPath.toString)) {
          // It might be a partial result of a previous failing task.
          logger.info(f"Will replace $newPath. (try $try_count)")
        }
        Files.deleteIfExists(newPath)
        Files.move(oldPath, newPath)
        true
      } catch {
        case e: FileAlreadyExistsException =>
          // Here if another executor wrote the file between the delete and the move statement.
          if (try_count + 1 > 5) {
            throw e
          }
          false
      }
    if (!moved) attempt(try_count + 1)
  }

  attempt(1)
}

/**
 * Uploads a local file to S3 with a `PutObjectRequest` for the bucket and key parsed from `s3Path`.
 *
 * @param localFile file whose content is uploaded
 * @param s3Path    destination S3 path
 * @return `s3Path`, unchanged
 */
def uploadToS3(localFile: Path, s3Path: String): String = {
  val target = toAmazonS3URI(s3Path)
  val putRequest = PutObjectRequest.builder.bucket(target.getBucket).key(target.getKey).build
  getCreoS3Client().putObject(putRequest, RequestBody.fromFile(localFile))
  s3Path
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,16 @@ import org.openeo.geotrellis.netcdf.NetCDFRDDWriter.fixedTimeOffset
import org.openeo.geotrellis.stac.STACItem
import org.openeo.geotrellis.tile_grid.TileGrid
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.model.PutObjectRequest
import spire.math.Integral
import spire.syntax.cfor.cfor

import java.io.IOException
import java.nio.channels.FileChannel
import java.nio.file.{FileAlreadyExistsException, Files, NoSuchFileException, Path, Paths}
import java.nio.file.{Files, Path, Paths}
import java.time.Duration
import java.time.format.DateTimeFormatter
import java.util.{ArrayList, Collections, Map, List => JList}
import scala.collection.JavaConverters._
import scala.reflect._
import scala.util.control.Breaks.{break, breakable}

package object geotiff {

Expand Down Expand Up @@ -118,7 +114,9 @@ package object geotiff {
val rand = new java.security.SecureRandom().nextLong()
val uniqueFolderName = executorAttemptDirectoryPrefix + java.lang.Long.toUnsignedString(rand)
val executorAttemptDirectory = Paths.get(parentDirectory + "/" + uniqueFolderName)
Files.createDirectories(executorAttemptDirectory)
if (!CreoS3Utils.isS3(parentDirectory.toString)) {
Files.createDirectories(executorAttemptDirectory)
}
executorAttemptDirectory
}

Expand All @@ -128,18 +126,17 @@ package object geotiff {
if (!relativePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception()
// Remove the executorAttemptDirectory part from the path:
val destinationPath = parentDirectory.resolve(relativePath.substring(relativePath.indexOf("/") + 1))
waitTillPathAvailable(Path.of(absolutePath))
Files.createDirectories(destinationPath.getParent)
Files.move(Path.of(absolutePath), destinationPath)
CreoS3Utils.waitTillPathAvailable(Path.of(absolutePath))
if (!CreoS3Utils.isS3(parentDirectory.toString)) {
Files.createDirectories(destinationPath.getParent)
}
CreoS3Utils.moveAsset(absolutePath, destinationPath.toString) // TODO: Use move instead of copy
destinationPath
}

private def cleanUpExecutorAttemptDirectory(parentDirectory: Path): Unit = {
Files.list(parentDirectory).forEach { p =>
if (Files.isDirectory(p) && p.getFileName.toString.startsWith(executorAttemptDirectoryPrefix)) {
FileUtils.deleteDirectory(p.toFile)
}
}
/**
 * Deletes the executor-attempt folders found under `parentDirectory`.
 *
 * The entries listed by `CreoS3Utils.asseetPathListDirectChildren` are kept when their path
 * contains `executorAttemptDirectoryPrefix`, and those are passed to `CreoS3Utils.assetDeleteFolders`.
 *
 * @param parentDirectory S3 path or local directory to clean up
 */
private def cleanUpExecutorAttemptDirectory(parentDirectory: String): Unit = {
  val attemptDirectories = CreoS3Utils
    .asseetPathListDirectChildren(parentDirectory)
    .filter(child => child.contains(executorAttemptDirectoryPrefix))
  CreoS3Utils.assetDeleteFolders(attemptDirectories)
}

/**
Expand Down Expand Up @@ -238,7 +235,7 @@ package object geotiff {
(destinationPath.toString, timestamp, croppedExtent, bandIndices)
}.toList.asJava

cleanUpExecutorAttemptDirectory(Path.of(path))
cleanUpExecutorAttemptDirectory(path)

res
}
Expand Down Expand Up @@ -324,7 +321,7 @@ package object geotiff {

if (path.endsWith("out")) {
val beforeOut = path.substring(0, path.length - "out".length)
cleanUpExecutorAttemptDirectory(Path.of(beforeOut))
cleanUpExecutorAttemptDirectory(beforeOut)
}

res
Expand Down Expand Up @@ -778,7 +775,7 @@ package object geotiff {
(destinationPath.toString, croppedExtent)
}.toList.asJava

cleanUpExecutorAttemptDirectory(Path.of(path).getParent)
cleanUpExecutorAttemptDirectory(Path.of(path).getParent.toString)

res
}
Expand Down Expand Up @@ -921,65 +918,14 @@ package object geotiff {

if (path.startsWith("s3:/")) {
val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
uploadToS3(tempFile, correctS3Path)
CreoS3Utils.uploadToS3(tempFile, correctS3Path)
} else {
// Retry should not be needed at this point, but it is almost free to keep it.
moveOverwriteWithRetries(tempFile, Path.of(path))
CreoS3Utils.moveOverwriteWithRetries(tempFile, Path.of(path))
path
}
}

/**
 * Blocks until `path` exists on the file system, polling at a fixed interval.
 *
 * After each failed check the thread sleeps 5 seconds, for at most 20 sleeps. If the path is still
 * missing after that, a warning is logged and the method returns normally; it does not throw.
 *
 * @param path local path to wait for
 */
private def waitTillPathAvailable(path: Path): Unit = {
  val maxTries = 20
  val seconds = 5

  @scala.annotation.tailrec
  def poll(previousTries: Int): Unit = {
    if (!path.toFile.exists()) {
      if (previousTries < maxTries) {
        val retry = previousTries + 1
        logger.info(f"Waiting for path to be available. Try $retry/$maxTries (sleep:$seconds seconds): $path")
        Thread.sleep(seconds * 1000)
        poll(retry)
      } else {
        logger.warn(f"Path is not available after $maxTries tries: $path")
        // Throw error instead?
      }
    }
  }

  poll(0)
}

/**
 * Moves `oldPath` to `newPath` on the file system, replacing anything already at `newPath`.
 *
 * Each attempt deletes `newPath` if present and then moves `oldPath` there. When the move fails
 * with a `FileAlreadyExistsException` the attempt is repeated, up to 5 attempts in total; the
 * exception of the fifth failed attempt is rethrown. Other exceptions propagate immediately.
 *
 * @param oldPath file to move
 * @param newPath location the file should end up at
 */
def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = {
  @scala.annotation.tailrec
  def attempt(try_count: Int): Unit = {
    val moved =
      try {
        if (newPath.toFile.exists()) {
          // It might be a partial result of a previous failing task.
          logger.info(f"Will replace $newPath. (try $try_count)")
        }
        Files.deleteIfExists(newPath)
        Files.move(oldPath, newPath)
        true
      } catch {
        case e: FileAlreadyExistsException =>
          // Here if another executor wrote the file between the delete and the move statement.
          if (try_count + 1 > 5) {
            throw e
          }
          false
      }
    if (!moved) attempt(try_count + 1)
  }

  attempt(1)
}

/**
 * Uploads a local file to S3 with a `PutObjectRequest` for the bucket and key parsed from `s3Path`,
 * using the client from `CreoS3Utils.getCreoS3Client()`.
 *
 * @param localFile file whose content is uploaded
 * @param s3Path    destination S3 path, parsed as-is by `AmazonS3URI`
 * @return `s3Path`, unchanged
 */
def uploadToS3(localFile: Path, s3Path: String): String = {
  val target = new AmazonS3URI(s3Path)
  val putRequest = PutObjectRequest.builder.bucket(target.getBucket).key(target.getKey).build
  CreoS3Utils.getCreoS3Client().putObject(putRequest, RequestBody.fromFile(localFile))
  s3Path
}

case class ContextSeq[K, V, M](tiles: Iterable[(K, V)], metadata: LayoutDefinition) extends Seq[(K, V)] with Metadata[LayoutDefinition] {
override def length: Int = tiles.size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import geotrellis.raster.render.RGBA
import geotrellis.raster.{MultibandTile, UByteCellType}
import geotrellis.spark._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.openeo.geotrellis.geotiff.{SRDD, uploadToS3}
import org.openeo.geotrellis.creo.CreoS3Utils.uploadToS3
import org.openeo.geotrellis.geotiff.SRDD

import java.io.File
import java.nio.file.{Files, Paths}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.openeo.geotrellis.stac

import org.openeo.geotrellis.geotiff.uploadToS3
import org.openeo.geotrellis.creo.CreoS3Utils.uploadToS3
import org.openeo.geotrellis.getTempFile
import org.slf4j.LoggerFactory

Expand Down

0 comments on commit f199970

Please sign in to comment.