Skip to content

Commit

Permalink
Save output from executors to unique folders. Move the successful results to the output folder from driver, and clean up afterwards. #329
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Oct 17, 2024
1 parent 244ba1e commit 3e67e96
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import geotrellis.spark.pyramid.Pyramid
import geotrellis.store.s3._
import geotrellis.util._
import geotrellis.vector.{ProjectedExtent, _}
import org.apache.commons.io.FilenameUtils
import org.apache.commons.io.{FileUtils, FilenameUtils}
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -134,7 +134,7 @@ package object geotiff {
val bandSegmentCount = totalCols * totalRows
val bandLabels = formatOptions.tags.bandTags.map(_("DESCRIPTION"))

preprocessedRdd.flatMap { case (key: SpaceTimeKey, multibandTile: MultibandTile) =>
val res = preprocessedRdd.flatMap { case (key: SpaceTimeKey, multibandTile: MultibandTile) =>
var bandIndex = -1
//Warning: for deflate compression, the segmentcount and index is not really used, making it stateless.
//Not sure how this works out for other types of compression!!!
Expand Down Expand Up @@ -173,7 +173,12 @@ package object geotiff {
val bandIndices = sequence.map(_._3).toSet.toList.asJava

val segmentCount = bandSegmentCount * tiffBands
val thePath = Paths.get(path).resolve(filename).toString

// Each executor writes to a unique folder to avoid conflicts:
val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong())
val base = Paths.get(path + "/" + uniqueFolderName)
Files.createDirectories(base)
val thePath = base.resolve(filename).toString

// filter band tags that match bandIndices
val fo = formatOptions.deepClone()
Expand All @@ -186,8 +191,22 @@ package object geotiff {
tileLayout, compression, cellTypes.head, tiffBands, segmentCount, fo,
)
(correctedPath, timestamp, croppedExtent, bandIndices)
}.collect().toList.asJava

}.collect().map({
case (absolutePath, timestamp, croppedExtent, bandIndices) =>
// Move output file to standard location:
val relativePath = Path.of(path).relativize(Path.of(absolutePath)).toString
val destinationPath = Path.of(path).resolve(relativePath.substring(relativePath.indexOf("/") + 1))
Files.move(Path.of(absolutePath), destinationPath)
(destinationPath.toString, timestamp, croppedExtent, bandIndices)
}).toList.asJava

// Clean up failed tasks:
Files.list(Path.of(path)).forEach { p =>
if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) {
FileUtils.deleteDirectory(p.toFile)
}
}
res
}


Expand Down Expand Up @@ -232,10 +251,13 @@ package object geotiff {
((name, bandIndex), (key, t))
}
}
rdd_per_band.groupByKey().map { case ((name, bandIndex), tiles) =>
val res = rdd_per_band.groupByKey().map { case ((name, bandIndex), tiles) =>
val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong())
val fixedPath =
if (path.endsWith("out")) {
path.substring(0, path.length - 3) + name
val base = path.substring(0, path.length - 3) + uniqueFolderName + "/"
Files.createDirectories(Path.of(base))
base + name
}
else {
path
Expand All @@ -248,7 +270,27 @@ package object geotiff {

(stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)),
Collections.singletonList(bandIndex))
}.collect().toList.sortBy(_._1).asJava
}.collect().map({
case (absolutePath, y) =>
if (path.endsWith("out")) {
// Move output file to standard location:
val beforeOut = path.substring(0, path.length - "out".length)
val relativePath = Path.of(beforeOut).relativize(Path.of(absolutePath)).toString
val destinationPath = beforeOut + relativePath.substring(relativePath.indexOf("/") + 1)
Files.move(Path.of(absolutePath), Path.of(destinationPath))
(destinationPath, y)
} else {
(absolutePath, y)
}
}).toList.sortBy(_._1).asJava
// Clean up failed tasks:
val beforeOut = path.substring(0, path.length - "out".length)
Files.list(Path.of(beforeOut)).forEach { p =>
if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) {
FileUtils.deleteDirectory(p.toFile)
}
}
res
} else {
val tmp = saveRDDGeneric(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala
tmp.map(t => (t, (0 until bandCount).toList.asJava)).asJava
Expand Down Expand Up @@ -826,33 +868,16 @@ package object geotiff {
}

/**
 * Writes `geoTiff` to `path` and returns the location it ended up at.
 *
 * The GeoTIFF is serialised exactly once into a local temporary file, which is
 * then either uploaded (S3 destinations) or moved into place (every other path).
 *
 * @param geoTiff the multiband GeoTIFF to persist
 * @param path    destination; a value starting with "s3:/" is handed to `uploadToS3`,
 *                anything else is used as a `java.nio.file.Path`
 * @return the result of `uploadToS3` for S3 destinations, otherwise `path` unchanged
 */
def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = {
  // Write once and reuse the same temp file in both branches. Writing again into a
  // second, shadowing temp file doubles the encoding work and strands the first file.
  val tempFile = getTempFile(null, ".tif")
  geoTiff.write(tempFile.toString, optimizedOrder = true)

  if (path.startsWith("s3:/")) {
    // Normalise "s3:/bucket/..." to "s3://bucket/..."; the negative lookahead leaves
    // an already well-formed "s3://" prefix untouched.
    val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
    uploadToS3(tempFile, correctS3Path)
  } else {
    // TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy)
    // TODO: Write to unique path instead to avoid collisions between executors. Let the driver choose the paths.
    moveOverwriteWithRetries(tempFile, Path.of(path))

    // Call fsync on the destination so the fusemount is up-to-date.
    // The equivalent of Python's os.fsync
    try {
      val channel = FileChannel.open(Path.of(path))
      // Close the channel even when force() throws, so no file descriptor is leaked.
      try channel.force(true) finally channel.close()
    } catch {
      case _: NoSuchFileException => // Ignore. The file may already be deleted by another executor
    }
    path
  }
}

def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.openeo.geotrellis.{LayerFixtures, OpenEOProcesses, ProjectedPolygons}
import org.openeo.sparklisteners.GetInfoSparkListener
import org.slf4j.{Logger, LoggerFactory}

import java.nio.file.{Files, Paths}
import java.nio.file.{Files, Path, Paths}
import java.time.{LocalDate, LocalTime, ZoneOffset, ZonedDateTime}
import java.util
import java.util.zip.Deflater._
Expand Down Expand Up @@ -77,6 +77,7 @@ class WriteRDDToGeotiffTest {

val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile),TileLayout(layoutCols,layoutRows,256,256),LatLng)
val filename = "out.tif"
Files.deleteIfExists(Path.of(filename))

saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},1,filename,formatOptions = allOverviewOptions)

Expand Down Expand Up @@ -151,6 +152,10 @@ class WriteRDDToGeotiffTest {

@Test
def testWriteRDD_apply_neighborhood(): Unit ={
val outDir = Paths.get("tmp/testWriteRDD_apply_neighborhood/")
new Directory(outDir.toFile).deepList().foreach(_.delete())
Files.createDirectories(outDir)

val layoutCols = 8
val layoutRows = 4

Expand All @@ -164,10 +169,11 @@ class WriteRDDToGeotiffTest {
val buffered: MultibandTileLayerRDD[SpaceTimeKey] = p.remove_overlap(p.retileGeneric(tileLayerRDD,224,224,16,16),224,224,16,16)

val cropBounds = Extent(-115, -65, 5.0, 56)
saveRDDTemporal(buffered,"./",cropBounds = Some(cropBounds))
saveRDDTemporal(buffered, outDir.toString, cropBounds = Some(cropBounds))

val croppedRaster: Raster[MultibandTile] = tileLayerRDD.toSpatial().stitch().crop(cropBounds)
val referenceFile = "croppedRaster.tif"
Files.deleteIfExists(Path.of(referenceFile))
GeoTiff(croppedRaster,LatLng).write(referenceFile)

val result = GeoTiff.readMultiband(filename).raster
Expand All @@ -190,6 +196,7 @@ class WriteRDDToGeotiffTest {

val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile,secondBand,thirdBand),TileLayout(layoutCols,layoutRows,256,256),LatLng)
val filename = "outRGB.tif"
Files.deleteIfExists(Path.of(filename))
saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},3,filename)
val result = GeoTiff.readMultiband(filename).raster.tile
assertArrayEquals(imageTile.toArray(),result.band(0).toArray())
Expand All @@ -216,8 +223,10 @@ class WriteRDDToGeotiffTest {

val croppedRaster: Raster[MultibandTile] = tileLayerRDD.stitch().crop(cropBounds)
val referenceFile = "croppedRaster.tif"
Files.deleteIfExists(Path.of(referenceFile))
GeoTiff(croppedRaster,LatLng).write(referenceFile)
val filename = "outRGBCropped3.tif"
Files.deleteIfExists(Path.of(filename))
saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},3,filename,cropBounds = Some(cropBounds))
val result = GeoTiff.readMultiband(filename).raster
val reference = GeoTiff.readMultiband(referenceFile).raster
Expand Down Expand Up @@ -248,6 +257,7 @@ class WriteRDDToGeotiffTest {
GeoTiff(croppedRaster,LatLng).write(referenceFile)

val filename = "outCropped.tif"
Files.deleteIfExists(Path.of(filename))
saveRDD(tileLayerRDD.withContext{_.repartition(tileLayerRDD.count().toInt)},3,filename,cropBounds = Some(cropBounds))
val resultRaster = GeoTiff.readMultiband(filename).raster

Expand All @@ -269,6 +279,7 @@ class WriteRDDToGeotiffTest {
val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile),TileLayout(layoutCols,layoutRows,256,256),LatLng)
val empty = tileLayerRDD.withContext{_.filter(_ => false)}
val filename = "outEmpty.tif"
Files.deleteIfExists(Path.of(filename))
val cropBounds = Extent(-115, -65, 5.0, 56)
saveRDD(empty,-1,filename,cropBounds = Some(cropBounds))

Expand Down Expand Up @@ -311,7 +322,7 @@ class WriteRDDToGeotiffTest {
val (imageTile: ByteArrayTile, filtered: MultibandTileLayerRDD[SpatialKey]) = LayerFixtures.createLayerWithGaps(layoutCols, layoutRows)

val outDir = Paths.get("tmp/testWriteMultibandRDDWithGapsSeparateAssetPerBand/")
new Directory(outDir.toFile).deepFiles.foreach(_.delete())
new Directory(outDir.toFile).deepList().foreach(_.delete())
Files.createDirectories(outDir)

val filename = outDir + "/out"
Expand Down

0 comments on commit 3e67e96

Please sign in to comment.