From f916512e659af48819bf56753b91b84e19a5f4e5 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Tue, 29 Oct 2024 10:13:02 +0100 Subject: [PATCH] Add waitTillPathAvailable when accessing file from driver that was written in executor. https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329 --- .../openeo/geotrellis/geotiff/package.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 575c968a..a50b727f 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -196,6 +196,7 @@ package object geotiff { // Move output file to standard location. (On S3, a move is more a copy and delete): val relativePath = Path.of(path).relativize(Path.of(absolutePath)).toString val destinationPath = Path.of(path).resolve(relativePath.substring(relativePath.indexOf("/") + 1)) + waitTillPathAvailable(Path.of(absolutePath)) Files.move(Path.of(absolutePath), destinationPath) (destinationPath.toString, timestamp, croppedExtent, bandIndices) }).toList.asJava @@ -277,6 +278,7 @@ package object geotiff { val beforeOut = path.substring(0, path.length - "out".length) val relativePath = Path.of(beforeOut).relativize(Path.of(absolutePath)).toString val destinationPath = beforeOut + relativePath.substring(relativePath.indexOf("/") + 1) + waitTillPathAvailable(Path.of(absolutePath)) Files.move(Path.of(absolutePath), Path.of(destinationPath)) (destinationPath, y) } else { @@ -875,11 +877,29 @@ package object geotiff { val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://") uploadToS3(tempFile, correctS3Path) } else { + // Retry should not be needed at this point, but it is almost free to keep it. moveOverwriteWithRetries(tempFile, Path.of(path)) path } } + private def waitTillPathAvailable(path: Path): Unit = { + var retry = 0 + val maxTries = 20 + while (!path.toFile.exists()) { + if (retry < maxTries) { + retry += 1 + val seconds = 5 + logger.info(f"Waiting for path to be available. Try $retry/$maxTries (sleep:$seconds seconds): $path") + Thread.sleep(seconds * 1000) + } else { + logger.warn(f"Path is not available after $maxTries tries: $path") + // Throw error instead? + return + } + } + } + def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = { var try_count = 1 breakable {