From 9e70298b6ffcb1a8abd778f2a7c0b1b1911f849d Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Wed, 16 Oct 2024 11:33:01 +0200 Subject: [PATCH 1/6] Move tiff file to result, to avoid arriving empty on fusemount. https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329 --- .../openeo/geotrellis/geotiff/package.scala | 38 ++++++++++++------- .../org/openeo/geotrellis/PackageTest.scala | 23 +++++++++++ 2 files changed, 48 insertions(+), 13 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index b6cc19226..6d98706a9 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -33,12 +33,13 @@ import spire.math.Integral import spire.syntax.cfor.cfor import java.nio.channels.FileChannel -import java.nio.file.{FileAlreadyExistsException, Path, Paths} +import java.nio.file.{FileAlreadyExistsException, Files, Path, Paths} import java.time.Duration import java.time.format.DateTimeFormatter import java.util.{ArrayList, Collections, Map, List => JList} import scala.collection.JavaConverters._ import scala.reflect._ +import scala.util.control.Breaks.break package object geotiff { @@ -827,7 +828,7 @@ package object geotiff { .toList.asJava } - private def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = { + def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = { import java.nio.file.Files if (path.startsWith("s3:/")) { val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://") @@ -842,17 +843,7 @@ package object geotiff { // TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy) geoTiff.write(tempFile.toString, optimizedOrder = true) - // Geotrellis writes the file piecewise and sometimes files are only partially written. 
- // Maybe a move operation is easier for the fusemount: - try { - Files.move(tempFile, Path.of(path)) - } catch { - case e: FileAlreadyExistsException => - logger.info("FileAlreadyExistsException. Will overwrite file: " + e.getMessage) - // The existing file could be a partial result of a previous failing Spark task. - Files.deleteIfExists(Path.of(path)) - Files.move(tempFile, Path.of(path)) - } + moveOverwriteWithRetries(tempFile, Path.of(path)) // Call fsync on the parent path to assure the fusemount is up-to-date. // The equivalent of Python's os.fsync @@ -863,6 +854,27 @@ package object geotiff { } + def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = { + var try_count = 1 + while (true) { + try { + if (newPath.toFile.exists()) { + // It might be a partial result of a previous failing task. + logger.info(f"Will replace $newPath. (try $try_count)") + } + Files.deleteIfExists(newPath) + Files.move(oldPath, newPath) + break + } catch { + case e: FileAlreadyExistsException => + // Here if another executor wrote the file between the delete and the move statement. 
+ try_count += 1 + if (try_count > 5) { + throw e + } + } + } + } def uploadToS3(localFile: Path, s3Path: String) = { val s3Uri = new AmazonS3URI(s3Path) diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala index 1eae19c94..316d9dbf7 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala @@ -1,8 +1,12 @@ package org.openeo.geotrellis +import geotrellis.raster.io.geotiff.GeoTiff import geotrellis.raster.{ByteCellType, ByteUserDefinedNoDataCellType, FloatUserDefinedNoDataCellType, UByteCellType, UByteUserDefinedNoDataCellType} import org.junit.Assert._ import org.junit.Test +import org.openeo.geotrellis.geotiff._ + +import java.nio.file.{Files, Path} class PackageTest { @Test @@ -12,4 +16,23 @@ class PackageTest { assertEquals(FloatUserDefinedNoDataCellType(42), toSigned(FloatUserDefinedNoDataCellType(42))) assertEquals(ByteUserDefinedNoDataCellType(42), toSigned(ByteUserDefinedNoDataCellType(42))) } + + @Test + def testFileMove(): Unit = { + val refFile = Thread.currentThread().getContextClassLoader.getResource("org/openeo/geotrellis/Sentinel2FileLayerProvider_multiband_reference.tif") + val refTiff = GeoTiff.readMultiband(refFile.getPath) + val p = Path.of(f"tmp/testFileMove/") + Files.createDirectories(p) + + (1 to 20).foreach { i => + val dst = Path.of(p + f"/$i.tif") + // Limit the amount of parallel jobs to avoid getting over the max retries + (1 to 4).par.foreach { _ => + writeGeoTiff(refTiff, dst.toString) + assertTrue(Files.exists(dst)) + } + val refTiff2 = GeoTiff.readMultiband(dst.toString) + assertEquals(refTiff2.cellSize, refTiff.cellSize) + } + } } From 758286fb9daa28aa5186401e16deaf245b32b22d Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Wed, 16 Oct 2024 12:36:09 +0200 Subject: [PATCH 2/6] Fix error: scala.util.control.BreakControl 
--- .../openeo/geotrellis/geotiff/package.scala | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 6d98706a9..dcce32a9f 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -39,7 +39,7 @@ import java.time.format.DateTimeFormatter import java.util.{ArrayList, Collections, Map, List => JList} import scala.collection.JavaConverters._ import scala.reflect._ -import scala.util.control.Breaks.break +import scala.util.control.Breaks.{break, breakable} package object geotiff { @@ -856,22 +856,24 @@ package object geotiff { def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = { var try_count = 1 - while (true) { - try { - if (newPath.toFile.exists()) { - // It might be a partial result of a previous failing task. - logger.info(f"Will replace $newPath. (try $try_count)") - } - Files.deleteIfExists(newPath) - Files.move(oldPath, newPath) - break - } catch { - case e: FileAlreadyExistsException => - // Here if another executor wrote the file between the delete and the move statement. - try_count += 1 - if (try_count > 5) { - throw e + breakable { + while (true) { + try { + if (newPath.toFile.exists()) { + // It might be a partial result of a previous failing task. + logger.info(f"Will replace $newPath. (try $try_count)") } + Files.deleteIfExists(newPath) + Files.move(oldPath, newPath) + break + } catch { + case e: FileAlreadyExistsException => + // Here if another executor wrote the file between the delete and the move statement. 
+ try_count += 1 + if (try_count > 5) { + throw e + } + } } } } From c9019ab8bb62cd1266be642883b9c2e21c22e163 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Wed, 16 Oct 2024 13:30:15 +0200 Subject: [PATCH 3/6] File may also be deleted by other executor, so try-catch around FileChannel.force. --- .../scala/org/openeo/geotrellis/geotiff/package.scala | 9 ++++++--- .../test/scala/org/openeo/geotrellis/PackageTest.scala | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index dcce32a9f..86fe1dacd 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -33,7 +33,7 @@ import spire.math.Integral import spire.syntax.cfor.cfor import java.nio.channels.FileChannel -import java.nio.file.{FileAlreadyExistsException, Files, Path, Paths} +import java.nio.file.{FileAlreadyExistsException, Files, NoSuchFileException, Path, Paths} import java.time.Duration import java.time.format.DateTimeFormatter import java.util.{ArrayList, Collections, Map, List => JList} @@ -847,8 +847,11 @@ package object geotiff { // Call fsync on the parent path to assure the fusemount is up-to-date. // The equivalent of Python's os.fsync - FileChannel.open(Path.of(path)).force(true) - + try { + FileChannel.open(Path.of(path)).force(true) + } catch { + case _: NoSuchFileException => // Ignore. 
The file may already be deleted by another executor + } path } diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala index 316d9dbf7..3144aedb5 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala @@ -29,8 +29,8 @@ class PackageTest { // Limit the amount of parallel jobs to avoid getting over the max retries (1 to 4).par.foreach { _ => writeGeoTiff(refTiff, dst.toString) - assertTrue(Files.exists(dst)) } + assertTrue(Files.exists(dst)) val refTiff2 = GeoTiff.readMultiband(dst.toString) assertEquals(refTiff2.cellSize, refTiff.cellSize) } From 945d360768bb459147ffca8626e94d5830f71984 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Thu, 17 Oct 2024 00:53:50 +0200 Subject: [PATCH 4/6] Clean up logging. --- .../main/scala/org/openeo/geotrellis/geotiff/package.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 86fe1dacd..7fb433c12 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -700,7 +700,6 @@ package object geotiff { croppedExtent: Option[Extent], cropDimensions: Option[java.util.ArrayList[Int]], compression: Compression, formatOptions: Option[GTiffOptions] = None ) = { - this.logger.info("stitchAndWriteToTiff tiles.size: " + tiles.size) // Remove before release val raster: Raster[MultibandTile] = ContextSeq(tiles, layout).stitch() val re = raster.rasterExtent @@ -743,9 +742,7 @@ package object geotiff { ) { geotiff = geotiff.withOverviews(NearestNeighbor, List(4, 8, 16)) } - val res = writeGeoTiff(geotiff, filePath) - 
this.logger.info("stitchAndWriteToTiff writeGeoTiff done. filePath: " + filePath) // Remove before release - res + writeGeoTiff(geotiff, filePath) } def saveSamples(rdd: MultibandTileLayerRDD[SpaceTimeKey], From 704ddb3572f1a457f1539371cb35b88dda6bf369 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Thu, 17 Oct 2024 05:35:23 +0200 Subject: [PATCH 5/6] Fix file permissions problem. https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329 --- .../main/scala/org/openeo/geotrellis/geotiff/package.scala | 2 +- .../src/main/scala/org/openeo/geotrellis/package.scala | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 7fb433c12..2067799fb 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -836,7 +836,7 @@ package object geotiff { uploadToS3(tempFile, correctS3Path) } else { - val tempFile = Files.createTempFile(null, ".tif") + val tempFile = getTempFile(null, ".tif") // TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy) geoTiff.write(tempFile.toString, optimizedOrder = true) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/package.scala index 685f6781a..73fac17fd 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/package.scala @@ -128,7 +128,9 @@ package object geotrellis { } /** - * Inspired on 'Files.createTempFile' + * Inspired by 'Files.createTempFile', but does not create an empty file. + * The default permissions of 'createTempFile' are also too strict: 600, which makes the file inaccessible to other users. 
+ * This function could, for example, default to 664. */ def getTempFile(prefix: String, suffix: String): Path = { val prefixNonNull = if (prefix == null) "" else suffix From 6819023a80111bed09e63930add26d203dd17bd8 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Thu, 17 Oct 2024 14:39:56 +0200 Subject: [PATCH 6/6] Comment --- .../src/main/scala/org/openeo/geotrellis/geotiff/package.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 2067799fb..98ac2c12a 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -840,6 +840,7 @@ package object geotiff { // TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy) geoTiff.write(tempFile.toString, optimizedOrder = true) + // TODO: Write to a unique path instead, to avoid collisions between executors. Let the driver choose the paths. moveOverwriteWithRetries(tempFile, Path.of(path)) // Call fsync on the parent path to assure the fusemount is up-to-date.