Skip to content

Commit

Permalink
Add waitTillPathAvailable when accessing file from driver that was wr…
Browse files Browse the repository at this point in the history
…itten in executor. #329
  • Loading branch information
EmileSonneveld committed Oct 29, 2024
1 parent 003736c commit f916512
Showing 1 changed file with 20 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ package object geotiff {
// Move output file to standard location. (On S3, a move is more a copy and delete):
val relativePath = Path.of(path).relativize(Path.of(absolutePath)).toString
val destinationPath = Path.of(path).resolve(relativePath.substring(relativePath.indexOf("/") + 1))
waitTillPathAvailable(Path.of(absolutePath))
Files.move(Path.of(absolutePath), destinationPath)
(destinationPath.toString, timestamp, croppedExtent, bandIndices)
}).toList.asJava
Expand Down Expand Up @@ -277,6 +278,7 @@ package object geotiff {
val beforeOut = path.substring(0, path.length - "out".length)
val relativePath = Path.of(beforeOut).relativize(Path.of(absolutePath)).toString
val destinationPath = beforeOut + relativePath.substring(relativePath.indexOf("/") + 1)
waitTillPathAvailable(Path.of(absolutePath))
Files.move(Path.of(absolutePath), Path.of(destinationPath))
(destinationPath, y)
} else {
Expand Down Expand Up @@ -875,11 +877,29 @@ package object geotiff {
val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
uploadToS3(tempFile, correctS3Path)
} else {
// Retry should not be needed at this point, but it is almost free to keep it.
moveOverwriteWithRetries(tempFile, Path.of(path))
path
}
}

private def waitTillPathAvailable(path: Path): Unit = {
var retry = 0
val maxTries = 20
while (!path.toFile.exists()) {
if (retry < maxTries) {
retry += 1
val seconds = 5
logger.info(f"Waiting for path to be available. Try $retry/$maxTries (sleep:$seconds seconds): $path")
Thread.sleep(seconds * 1000)
} else {
logger.warn(f"Path is not available after $maxTries tries: $path")
// Throw error instead?
return
}
}
}

def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = {
var try_count = 1
breakable {
Expand Down

0 comments on commit f916512

Please sign in to comment.