Skip to content

Commit

Permalink
Try uploadToS3TryFirstWithStreaming. Only on CDSE-dev and CDSE-stagin…
Browse files Browse the repository at this point in the history
…g with extra logging and a try-catch. #347
  • Loading branch information
EmileSonneveld committed Nov 7, 2024
1 parent 8f51db3 commit 554590c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.openeo.geotrellis.creo

import geotrellis.store.s3.AmazonS3URI
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkContext
import org.openeo.geotrelliss3.S3Utils
import org.slf4j.LoggerFactory
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
Expand All @@ -14,9 +15,11 @@ import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model._
import software.amazon.awssdk.services.s3.{S3AsyncClient, S3Client, S3Configuration}
import software.amazon.awssdk.transfer.s3.S3TransferManager
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest

import java.net.URI
import java.nio.file.{FileAlreadyExistsException, Files, Path}
import java.nio.file.{Files, Path}
import java.time.Duration
import scala.collection.JavaConverters._
import scala.collection.immutable.Iterable
Expand Down Expand Up @@ -246,4 +249,46 @@ object CreoS3Utils {
getCreoS3Client().putObject(objectRequest, RequestBody.fromFile(localFile))
s3Path
}


def uploadToS3LargeFile(localPath: Path, s3Path: String): String = {
val s3Uri = toAmazonS3URI(s3Path)

val putRequest = PutObjectRequest.builder
.bucket(s3Uri.getBucket)
.key(s3Uri.getKey)
.build
val uploadFileRequest = UploadFileRequest.builder
.putObjectRequest(putRequest)
.source(localPath)
.build

val transferManager = S3TransferManager.builder
.s3Client(CreoS3Utils.getAsyncClient())
.build
val fileUpload = transferManager.uploadFile(uploadFileRequest)

fileUpload.completionFuture.join
s3Path
}

def uploadToS3TryFirstWithStreaming(localPath: Path, s3Path: String): String = {
val context = SparkContext.getOrCreate
if (Seq("spark-jobs-staging", "spark-jobs-dev").contains(context.getConf.get("spark.kubernetes.namespace", "nothing"))) {
// TODO: Streaming to s3 could cause error, so disable on prod for the moment
// py4j.protocol.Py4JJavaError: An error occurred while calling z:org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters.
//: java.util.concurrent.CompletionException: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: tx0000000000000613fcf8d-00655f6998-84eddc61-default)
try {
logger.info(f"uploadToS3TryFirstWithStreaming: Try to upload with streaming")
uploadToS3LargeFile(localPath, s3Path)
} catch {
case e: Exception =>
logger.warn(f"uploadToS3TryFirstWithStreaming: Failed to upload with streaming, trying with regular upload: $e")
uploadToS3(localPath, s3Path)
}
uploadToS3LargeFile(localPath, s3Path)
} else {
uploadToS3(localPath, s3Path)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ package object geotiff {
// The following line corrects this:
val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
if (fileExists) {
CreoS3Utils.uploadToS3(tempFile, correctS3Path)
CreoS3Utils.uploadToS3TryFirstWithStreaming(tempFile, path)
}
(correctS3Path, fileExists)
} else {
Expand Down

0 comments on commit 554590c

Please sign in to comment.