Skip to content

Commit

Permalink
Merge pull request #335 from Open-EO/move_file_to_results
Browse files Browse the repository at this point in the history
Move file to results
  • Loading branch information
EmileSonneveld authored Oct 17, 2024
2 parents 4d44e53 + 6819023 commit 244ba1e
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ import spire.math.Integral
import spire.syntax.cfor.cfor

import java.nio.channels.FileChannel
import java.nio.file.{FileAlreadyExistsException, Path, Paths}
import java.nio.file.{FileAlreadyExistsException, Files, NoSuchFileException, Path, Paths}
import java.time.Duration
import java.time.format.DateTimeFormatter
import java.util.{ArrayList, Collections, Map, List => JList}
import scala.collection.JavaConverters._
import scala.reflect._
import scala.util.control.Breaks.{break, breakable}

package object geotiff {

Expand Down Expand Up @@ -699,7 +700,6 @@ package object geotiff {
croppedExtent: Option[Extent], cropDimensions: Option[java.util.ArrayList[Int]],
compression: Compression, formatOptions: Option[GTiffOptions] = None
) = {
this.logger.info("stitchAndWriteToTiff tiles.size: " + tiles.size) // Remove before release
val raster: Raster[MultibandTile] = ContextSeq(tiles, layout).stitch()

val re = raster.rasterExtent
Expand Down Expand Up @@ -742,9 +742,7 @@ package object geotiff {
) {
geotiff = geotiff.withOverviews(NearestNeighbor, List(4, 8, 16))
}
val res = writeGeoTiff(geotiff, filePath)
this.logger.info("stitchAndWriteToTiff writeGeoTiff done. filePath: " + filePath) // Remove before release
res
writeGeoTiff(geotiff, filePath)
}

def saveSamples(rdd: MultibandTileLayerRDD[SpaceTimeKey],
Expand Down Expand Up @@ -827,7 +825,7 @@ package object geotiff {
.toList.asJava
}

private def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = {
def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = {
import java.nio.file.Files
if (path.startsWith("s3:/")) {
val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
Expand All @@ -838,31 +836,48 @@ package object geotiff {
uploadToS3(tempFile, correctS3Path)

} else {
val tempFile = Files.createTempFile(null, ".tif")
val tempFile = getTempFile(null, ".tif")
// TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy)
geoTiff.write(tempFile.toString, optimizedOrder = true)

// Geotrellis writes the file piecewise and sometimes files are only partially written.
// Maybe a move operation is easier for the fusemount:
try {
Files.move(tempFile, Path.of(path))
} catch {
case e: FileAlreadyExistsException =>
logger.info("FileAlreadyExistsException. Will overwrite file: " + e.getMessage)
// The existing file could be a partial result of a previous failing Spark task.
Files.deleteIfExists(Path.of(path))
Files.move(tempFile, Path.of(path))
}
// TODO: Write to unique path instead to avoid collisions between executors. Let the driver choose the paths.
moveOverwriteWithRetries(tempFile, Path.of(path))

// Call fsync on the parent path to assure the fusemount is up-to-date.
// The equivalent of Python's os.fsync
FileChannel.open(Path.of(path)).force(true)

try {
FileChannel.open(Path.of(path)).force(true)
} catch {
case _: NoSuchFileException => // Ignore. The file may already be deleted by another executor
}
path
}

}

/**
 * Moves `oldPath` onto `newPath`, replacing whatever is already at `newPath`.
 *
 * Each attempt deletes `newPath` if it exists and then moves `oldPath` into place. When the
 * move fails with a `FileAlreadyExistsException` the attempt is repeated; the exception raised
 * by the fifth failed attempt is rethrown to the caller.
 *
 * @param oldPath file to move
 * @param newPath destination; a file already at this location is deleted first
 * @throws java.nio.file.FileAlreadyExistsException when the fifth attempt still fails on an occupied destination
 */
def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = {
  val maxTries = 5

  // One delete-then-move attempt. Yields the exception when the move found the destination
  // occupied again, None when the move succeeded.
  def attemptMove(tryCount: Int): Option[FileAlreadyExistsException] =
    try {
      // Files.exists works with any filesystem provider, whereas Path.toFile throws
      // UnsupportedOperationException for paths outside the default filesystem.
      if (Files.exists(newPath)) {
        // It might be a partial result of a previous failing task.
        logger.info(s"Will replace $newPath. (try $tryCount)")
      }
      Files.deleteIfExists(newPath)
      Files.move(oldPath, newPath)
      None
    } catch {
      // Here if another executor wrote the file between the delete and the move statement.
      case e: FileAlreadyExistsException => Some(e)
    }

  // The recursive call sits outside the try/catch so that it stays in tail position.
  @scala.annotation.tailrec
  def retry(tryCount: Int): Unit =
    attemptMove(tryCount) match {
      case None => ()
      case Some(e) if tryCount >= maxTries => throw e
      case Some(_) => retry(tryCount + 1)
    }

  retry(1)
}

def uploadToS3(localFile: Path, s3Path: String) = {
val s3Uri = new AmazonS3URI(s3Path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ package object geotrellis {
}

/**
* Inspired on 'Files.createTempFile'
* Inspired by 'Files.createTempFile', but does not create an empty file.
* The default permissions of 'createTempFile' are also rather strict: 600, which is not accessible by other users.
* This function could default to 664, for example.
*/
def getTempFile(prefix: String, suffix: String): Path = {
val prefixNonNull = if (prefix == null) "" else suffix
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.openeo.geotrellis

import geotrellis.raster.io.geotiff.GeoTiff
import geotrellis.raster.{ByteCellType, ByteUserDefinedNoDataCellType, FloatUserDefinedNoDataCellType, UByteCellType, UByteUserDefinedNoDataCellType}
import org.junit.Assert._
import org.junit.Test
import org.openeo.geotrellis.geotiff._

import java.nio.file.{Files, Path}

class PackageTest {
@Test
Expand All @@ -12,4 +16,23 @@ class PackageTest {
assertEquals(FloatUserDefinedNoDataCellType(42), toSigned(FloatUserDefinedNoDataCellType(42)))
assertEquals(ByteUserDefinedNoDataCellType(42), toSigned(ByteUserDefinedNoDataCellType(42)))
}

/**
 * Writes the same reference GeoTiff to one destination path from a parallel collection and
 * checks that a readable file with the reference cell size exists at that path afterwards.
 * Repeated for 20 destination files.
 */
@Test
def testFileMove(): Unit = {
  val refFile = Thread.currentThread().getContextClassLoader.getResource("org/openeo/geotrellis/Sentinel2FileLayerProvider_multiband_reference.tif")
  val refTiff = GeoTiff.readMultiband(refFile.getPath)
  val outputDir = Path.of("tmp/testFileMove/")
  Files.createDirectories(outputDir)

  (1 to 20).foreach { i =>
    val dst = outputDir.resolve(s"$i.tif")
    // Limit the amount of parallel jobs to avoid getting over the max retries
    (1 to 4).par.foreach { _ =>
      writeGeoTiff(refTiff, dst.toString)
    }
    assertTrue(Files.exists(dst))
    val writtenTiff = GeoTiff.readMultiband(dst.toString)
    // JUnit's assertEquals takes (expected, actual): the reference value goes first.
    assertEquals(refTiff.cellSize, writtenTiff.cellSize)
  }
}
}

0 comments on commit 244ba1e

Please sign in to comment.