From 64d4812b5851a87b7bda128defe11f6584daebd0 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Tue, 20 Aug 2024 10:28:25 +0200 Subject: [PATCH] improve FileLayerProvider resilience * retryForever: no delay after final failure https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * smaller job runs successfully locally https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * simple GDALRasterSource.read is also successful https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * optimize retryForever - remove outer retryForever in favor of more attempts for inner retryForever - optimization: implement with exponential back-off https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * disable test https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * restore retry of RasterSource.reproject() as it can fail at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328) at org.apache.spark.rdd.RDD.count(RDD.scala:1266) at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.cacheAndRepartition(NetCDFRDDWriter.scala:267) at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.saveSingleNetCDFGeneric(NetCDFRDDWriter.scala:126) at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.saveSingleNetCDFGeneric(NetCDFRDDWriter.scala:108) at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.writeRasters(NetCDFRDDWriter.scala:80) at org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters(NetCDFRDDWriter.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.io.IOException: load_collection/load_stac: error while reading from: /vsis3/EODATA/Sentinel-2/MSI/L2A_N0500/2018/03/27/S2A_MSIL2A_20180327T114351_N0500_R123_T29UNV_20230828T122340.SAFE/GRANULE/L2A_T29UNV_A014420_20180327T114351/IMG_DATA/R10m/T29UNV_20180327T114351_B08_10m.jp2. Detailed error: Unable to parse projection as CRS. GDAL Error Code: 4 at org.openeo.geotrellis.layers.FileLayerProvider$.$anonfun$loadPartitionBySource$1(FileLayerProvider.scala:663) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:139) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1 more Caused by: geotrellis.raster.gdal.MalformedProjectionException: Unable to parse projection as CRS. GDAL Error Code: 4 at geotrellis.raster.gdal.GDALDataset$.$anonfun$crs$1(GDALDataset.scala:293) at geotrellis.raster.gdal.GDALDataset$.$anonfun$crs$1$adapted(GDALDataset.scala:290) at geotrellis.raster.gdal.GDALDataset$.errorHandler$extension(GDALDataset.scala:422) at geotrellis.raster.gdal.GDALDataset$.crs$extension1(GDALDataset.scala:290) at geotrellis.raster.gdal.GDALDataset$.crs$extension0(GDALDataset.scala:282) at geotrellis.raster.gdal.GDALRasterSource.crs$lzycompute(GDALRasterSource.scala:84) at geotrellis.raster.gdal.GDALRasterSource.crs(GDALRasterSource.scala:84) at org.openeo.geotrellis.layers.ValueOffsetRasterSource.crs(ValueOffsetRasterSource.scala:93) at geotrellis.raster.RasterSource.reproject(RasterSource.scala:54) at org.openeo.geotrellis.layers.BandCompositeRasterSource.$anonfun$reprojectedSources$2(FileLayerProvider.scala:84) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.openeo.geotrellis.layers.BandCompositeRasterSource.reprojectedSources(FileLayerProvider.scala:84) at org.openeo.geotrellis.layers.BandCompositeRasterSource.read(FileLayerProvider.scala:129) at geotrellis.raster.RasterSource.read(RasterSource.scala:128) at org.openeo.geotrellis.layers.FileLayerProvider$.$anonfun$loadPartitionBySource$6(FileLayerProvider.scala:661) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator.toStream(Iterator.scala:1417) at scala.collection.Iterator.toStream$(Iterator.scala:1416) at scala.collection.AbstractIterator.toStream(Iterator.scala:1431) at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354) at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354) at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431) at org.openeo.geotrellis.layers.FileLayerProvider$.$anonfun$loadPartitionBySource$1(FileLayerProvider.scala:661) ... 14 more https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * make GDALRasterSource fail with an error https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * add test https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * support soft errors https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * restore number-of-attempts and disable test * make attempts argument explicit https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * cleanup https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * cleanup https://github.com/eu-cdse/openeo-cdse-infra/issues/196 * cleanup https://github.com/eu-cdse/openeo-cdse-infra/issues/196 --- geotrellis-common/pom.xml | 4 +- .../org/openeo/geotrelliscommon/package.scala | 6 +- .../openeo/geotrelliscommon/PackageTest.scala | 38 ++++++++ .../geotrellis/file/PyramidFactory.scala | 7 +- .../geotrellis/layers/FileLayerProvider.scala | 93 ++++++++++++------- .../layers/FileLayerProviderTest.scala | 16 +++- 6 files changed, 124 insertions(+), 40 deletions(-) create mode 100644 geotrellis-common/src/test/scala/org/openeo/geotrelliscommon/PackageTest.scala diff --git a/geotrellis-common/pom.xml b/geotrellis-common/pom.xml index a3d7e81d3..66acd81db 100644 --- a/geotrellis-common/pom.xml +++ b/geotrellis-common/pom.xml @@ -55,13 +55,13 @@ org.junit.jupiter junit-jupiter-api - 5.3.2 + 5.10.3 test org.junit.vintage junit-vintage-engine - 5.3.2 + 5.10.3 test diff --git a/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/package.scala b/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/package.scala index de10848d4..4d00127c0 100644 --- a/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/package.scala +++ b/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/package.scala @@ -231,16 +231,16 @@ package object geotrelliscommon { import java.util.concurrent.TimeUnit - def retryForever[R](delay: Duration, retries: Int = 20, onAttemptFailed: Exception => Unit = _ => ())(f: => R): R = { + def retryForever[R](delay: Duration, attempts: Int = 20, onAttemptFailed: Exception => Unit = _ => ())(f: => R): R = { var lastException: Exception = null - var countDown = retries + var countDown = attempts while (countDown>0) { try return f catch { case e: Exception => onAttemptFailed(e) lastException = e - TimeUnit.SECONDS.sleep(delay.getSeconds) + if (countDown > 1) TimeUnit.SECONDS.sleep(delay.getSeconds) } countDown = countDown - 1 } diff --git a/geotrellis-common/src/test/scala/org/openeo/geotrelliscommon/PackageTest.scala b/geotrellis-common/src/test/scala/org/openeo/geotrelliscommon/PackageTest.scala new file mode 100644 index 000000000..806731885 --- /dev/null +++ b/geotrellis-common/src/test/scala/org/openeo/geotrelliscommon/PackageTest.scala @@ -0,0 +1,38 @@ +package org.openeo.geotrelliscommon + +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrowsExactly, fail} +import org.junit.jupiter.api.{Test, Timeout} + +import java.time.Duration + +class PackageTest { + class FailedAttempt extends Exception + + @Test + def retryForeverNumberOfAttempts(): Unit = { + var attempts = 0 + + try { + retryForever(delay = Duration.ZERO, attempts = 3, onAttemptFailed = _ => attempts += 1) { + println("attempting...") + throw new FailedAttempt + } + + fail("should have thrown a FailedAttempt") + } catch { + case _: FailedAttempt => + } + + // count the number of failures to get the number of attempts + assertEquals(3, attempts) + } + + @Test + @Timeout(5) // less than RetryForever's delay below + def retryForeverNoDelayAfterFinalFailure(): Unit = + assertThrowsExactly(classOf[FailedAttempt], () => + retryForever(delay = Duration.ofSeconds(60), attempts = 1) { + println("attempting...") + throw new FailedAttempt + }) +} diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/PyramidFactory.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/PyramidFactory.scala index 7182a04fb..34d4778d9 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/PyramidFactory.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/PyramidFactory.scala @@ -43,7 +43,9 @@ class PyramidFactory(openSearchClient: OpenSearchClient, openSearchLinkTitles: util.List[String], rootPath: String, maxSpatialResolution: CellSize, - experimental: Boolean = false) { + experimental: Boolean = false, + maxSoftErrorsRatio: Double = 0.0, + ) { require(openSearchLinkTitles.size() > 0) import PyramidFactory._ @@ -75,7 +77,8 @@ class PyramidFactory(openSearchClient: OpenSearchClient, metadataProperties, layoutScheme, correlationId = correlationId, - experimental = experimental + experimental = experimental, + maxSoftErrorsRatio = maxSoftErrorsRatio, ) def datacube_seq(polygons:ProjectedPolygons, from_date: String, to_date: String, diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/FileLayerProvider.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/FileLayerProvider.scala index 0c43ae216..8643538f5 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/FileLayerProvider.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/FileLayerProvider.scala @@ -20,16 +20,17 @@ import geotrellis.spark.partition.SpacePartitioner import geotrellis.vector import geotrellis.vector.Extent.toPolygon import geotrellis.vector._ +import net.jodah.failsafe.{Failsafe, RetryPolicy} +import net.jodah.failsafe.event.ExecutionAttemptedEvent import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.locationtech.jts.geom.Geometry import org.openeo.geotrellis.OpenEOProcessScriptBuilder.AnyProcess import org.openeo.geotrellis.file.{AbstractPyramidFactory, FixedFeaturesOpenSearchClient} -import org.openeo.geotrellis.tile_grid.TileGrid import org.openeo.geotrellis.{OpenEOProcessScriptBuilder, sortableSourceName} import org.openeo.geotrelliscommon.DatacubeSupport.prepareMask -import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ByKeyPartitioner, CloudFilterStrategy, ConfigurableSpatialPartitioner, DataCubeParameters, DatacubeSupport, L1CCloudFilterStrategy, MaskTileLoader, NoCloudFilterStrategy, ResampledTile, SCLConvolutionFilterStrategy, SpaceTimeByMonthPartitioner, SparseSpaceTimePartitioner, autoUtmEpsg, retryForever} +import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ByKeyPartitioner, CloudFilterStrategy, ConfigurableSpatialPartitioner, DataCubeParameters, DatacubeSupport, L1CCloudFilterStrategy, MaskTileLoader, NoCloudFilterStrategy, ResampledTile, SCLConvolutionFilterStrategy, SpaceTimeByMonthPartitioner, SparseSpaceTimePartitioner, autoUtmEpsg} import org.openeo.opensearch.OpenSearchClient import org.openeo.opensearch.OpenSearchResponses.{Feature, Link} import org.slf4j.LoggerFactory @@ -38,11 +39,11 @@ import java.io.{IOException, Serializable} import java.net.URI import java.nio.file.{Path, Paths} import java.time._ -import java.time.temporal.ChronoUnit +import java.time.temporal.ChronoUnit.{DAYS, SECONDS} +import java.util import java.util.concurrent.TimeUnit import scala.collection.GenSeq import scala.collection.JavaConverters._ -import scala.collection.parallel.immutable.{ParMap, ParSeq} import scala.reflect.ClassTag import scala.util.matching.Regex @@ -63,6 +64,19 @@ private class LayoutTileSourceFixed[K: SpatialComponent]( object BandCompositeRasterSource { private val logger = LoggerFactory.getLogger(classOf[BandCompositeRasterSource]) + + private def retryWithBackoff[R](maxAttempts: Int = 20, onAttemptFailed: Exception => Unit = _ => ())(f: => R): R = { + val retryPolicy = new RetryPolicy[R] + .handle(classOf[Exception]) // will otherwise retry Error + .withMaxAttempts(maxAttempts) + .withBackoff(1, 16, SECONDS) + .onFailedAttempt((attempt: ExecutionAttemptedEvent[R]) => + onAttemptFailed(attempt.getLastFailure.asInstanceOf[Exception])) + + Failsafe + .`with`(util.Collections.singletonList(retryPolicy)) + .get(f _) + } } @@ -72,21 +86,29 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource] override val crs: CRS, override val attributes: Map[String, String] = Map.empty, val predefinedExtent: Option[GridExtent[Long]] = None, - val parallelRead: Boolean = true + parallelRead: Boolean = true, + softErrors: Boolean = false, ) extends MosaicRasterSource { // TODO: don't inherit? import BandCompositeRasterSource._ - private val maxRetries = sys.env.getOrElse("GDALREAD_MAXRETRIES", "10").toInt + private val maxRetries = sys.env.getOrElse("GDALREAD_MAXRETRIES", "20").toInt protected def reprojectedSources: NonEmptyList[RasterSource] = sources map { _.reproject(crs) } protected def reprojectedSources(bands: Seq[Int]): Seq[RasterSource] = { - val selectedBands = bands.map(sources.toList) + def reprojectRasterSourceAttemptFailed(source: RasterSource)(e: Exception): Unit = + logger.warn(s"attempt to reproject ${source.name} to $crs failed", e) - selectedBands map { rs => - try retryForever(Duration.ofSeconds(10), maxRetries)(rs.reproject(crs)) + val selectedBands = bands.map(sources.toList) + selectedBands flatMap { rs => + try Some(retryWithBackoff(maxRetries, reprojectRasterSourceAttemptFailed(rs))(rs.reproject(crs))) catch { - case e: Exception => throw new IOException(s"Error while reading: ${rs.name.toString}", e) + // reading the CRS from a GDALRasterSource can fail + case e: Exception => + if (softErrors) { + logger.warn(s"ignoring soft error for ${rs.name}", e) + None + } else throw new IOException(s"Error while reading: ${rs.name}", e) } } } @@ -146,7 +168,11 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource] logger.debug(s"finished reading $bounds from ${source.name}") raster } catch { - case e: Exception => throw new IOException(s"Error while reading $bounds from ${source.name}", e) + case e: Exception => + if (softErrors) { + logger.warn(s"ignoring soft error for ${source.name}", e) + None + } else throw new IOException(s"Error while reading $bounds from ${source.name}", e) } } @@ -154,7 +180,7 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource] logger.warn(s"attempt to read $bounds from ${source.name} failed", e) val singleBandRasters = selectedSources - .map(rs => retryForever(Duration.ofSeconds(10), maxRetries, readBoundsAttemptFailed(rs)) { + .map(rs => retryWithBackoff(maxRetries, readBoundsAttemptFailed(rs)) { readBounds(rs) }) .collect { case Some(raster) => raster } @@ -187,14 +213,16 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource] method: ResampleMethod, strategy: OverviewStrategy ): RasterSource = new BandCompositeRasterSource( - reprojectedSources map { _.resample(resampleTarget, method, strategy) }, crs) + reprojectedSources map { _.resample(resampleTarget, method, strategy) }, crs, parallelRead = parallelRead, + softErrors = softErrors) override def convert(targetCellType: TargetCellType): RasterSource = - new BandCompositeRasterSource(reprojectedSources map { _.convert(targetCellType) }, crs, parallelRead = parallelRead) + new BandCompositeRasterSource(reprojectedSources map { _.convert(targetCellType) }, crs, + parallelRead = parallelRead, softErrors = softErrors) override def reprojection(targetCRS: CRS, resampleTarget: ResampleTarget, method: ResampleMethod, strategy: OverviewStrategy): RasterSource = new BandCompositeRasterSource(reprojectedSources map { _.reproject(targetCRS, resampleTarget, method, strategy) }, - crs, parallelRead = parallelRead) + crs, parallelRead = parallelRead, softErrors = softErrors) } // TODO: is this class necessary? Looks like a more general case of BandCompositeRasterSource so maybe the inheritance @@ -248,7 +276,6 @@ class MultibandCompositeRasterSource(val sourcesListWithBandIds: NonEmptyList[(R object FileLayerProvider { private val logger = LoggerFactory.getLogger(classOf[FileLayerProvider]) - private val maxRetries = sys.env.getOrElse("GDALREAD_MAXRETRIES", "10").toInt @@ -281,9 +308,9 @@ object FileLayerProvider { def apply(openSearch: OpenSearchClient, openSearchCollectionId: String, openSearchLinkTitles: NonEmptyList[String], rootPath: String, maxSpatialResolution: CellSize, pathDateExtractor: PathDateExtractor, attributeValues: Map[String, Any] = Map(), layoutScheme: LayoutScheme = ZoomedLayoutScheme(WebMercator, 256), bandIndices: Seq[Int] = Seq(), correlationId: String = "", experimental: Boolean = false, - retainNoDataTiles: Boolean = false): FileLayerProvider = new FileLayerProvider( + retainNoDataTiles: Boolean = false, maxSoftErrorsRatio: Double = 0.0): FileLayerProvider = new FileLayerProvider( openSearch, openSearchCollectionId, openSearchLinkTitles, rootPath, maxSpatialResolution, pathDateExtractor, - attributeValues, layoutScheme, bandIndices, correlationId, experimental, retainNoDataTiles, + attributeValues, layoutScheme, bandIndices, correlationId, experimental, retainNoDataTiles, maxSoftErrorsRatio, disambiguateConstructors = null ) @@ -303,7 +330,7 @@ object FileLayerProvider { def rasterSourceRDD(rasterSources: Seq[RasterSource], metadata: TileLayerMetadata[SpaceTimeKey], maxSpatialResolution: CellSize, collection: String)(implicit sc: SparkContext): RDD[LayoutTileSource[SpaceTimeKey]] = { val keyExtractor = new TemporalKeyExtractor { - def getMetadata(rs: RasterMetadata): ZonedDateTime = ZonedDateTime.parse(rs.attributes("date")).truncatedTo(ChronoUnit.DAYS) + def getMetadata(rs: RasterMetadata): ZonedDateTime = ZonedDateTime.parse(rs.attributes("date")).truncatedTo(DAYS) } val sources = sc.parallelize(rasterSources,rasterSources.size) @@ -502,13 +529,14 @@ object FileLayerProvider { private val PIXEL_COUNTER = "InputPixels" private def rasterRegionsToTilesLoadPerProductStrategy(rasterRegionRDD: RDD[(SpaceTimeKey, (RasterRegion, SourceName))], - metadata: TileLayerMetadata[SpaceTimeKey], - retainNoDataTiles: Boolean, - cloudFilterStrategy: CloudFilterStrategy = NoCloudFilterStrategy, - partitionerOption: Option[SpacePartitioner[SpaceTimeKey]] = None, - datacubeParams : Option[DataCubeParameters] = None, + metadata: TileLayerMetadata[SpaceTimeKey], + retainNoDataTiles: Boolean, + cloudFilterStrategy: CloudFilterStrategy = NoCloudFilterStrategy, + partitionerOption: Option[SpacePartitioner[SpaceTimeKey]] = None, + datacubeParams : Option[DataCubeParameters] = None, expectedBandCount : Int = -1, - sources: Seq[(RasterSource, Feature)] + sources: Seq[(RasterSource, Feature)], + softErrors: Boolean, ): RDD[(SpaceTimeKey, MultibandTile)] with Metadata[TileLayerMetadata[SpaceTimeKey]] = { if(cloudFilterStrategy!=NoCloudFilterStrategy) { @@ -538,7 +566,7 @@ object FileLayerProvider { case source1: BandCompositeRasterSource => //decompose into individual bands - source1.sources.map(s => (s.name, GridBoundsRasterRegion(new BandCompositeRasterSource(NonEmptyList.one(s),source1.crs,source1.attributes,source1.predefinedExtent, parallelRead = datacubeParams.forall(!_.loadPerProduct)), bounds))).zipWithIndex.map(t => (t._1._1, (Seq(t._2), key_region_sourcename._1, t._1._2))).toList.toSeq + source1.sources.map(s => (s.name, GridBoundsRasterRegion(new BandCompositeRasterSource(NonEmptyList.one(s),source1.crs,source1.attributes,source1.predefinedExtent, parallelRead = datacubeParams.forall(!_.loadPerProduct), softErrors = softErrors), bounds))).zipWithIndex.map(t => (t._1._1, (Seq(t._2), key_region_sourcename._1, t._1._2))).toList.toSeq case _ => Seq((source.name, (Seq(0), key_region_sourcename._1, key_region_sourcename._2._1))) @@ -663,7 +691,7 @@ object FileLayerProvider { val allRasters = try{ - bounds.toIterator.flatMap(b => retryForever(Duration.ofSeconds(10),maxRetries)(source.read(b).iterator)).map(_.mapTile(_.convert(cellType))).toSeq + bounds.toIterator.flatMap(b => source.read(b).iterator).map(_.mapTile(_.convert(cellType))).toSeq } catch { case e: Exception => throw new IOException(s"load_collection/load_stac: error while reading from: ${source.name.toString}. Detailed error: ${e.getMessage}", e) } @@ -871,7 +899,7 @@ object FileLayerProvider { class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollectionId: String, openSearchLinkTitles: NonEmptyList[String], rootPath: String, maxSpatialResolution: CellSize, pathDateExtractor: PathDateExtractor, attributeValues: Map[String, Any], layoutScheme: LayoutScheme, bandIndices: Seq[Int], correlationId: String, experimental: Boolean, - retainNoDataTiles: Boolean, + retainNoDataTiles: Boolean, maxSoftErrorsRatio: Double, disambiguateConstructors: Null) extends LayerProvider { // workaround for: constructors have the same type after erasure import DatacubeSupport._ @@ -882,7 +910,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti def this(openSearch: OpenSearchClient, openSearchCollectionId: String, openSearchLinkTitles: NonEmptyList[String], rootPath: String, maxSpatialResolution: CellSize, pathDateExtractor: PathDateExtractor, attributeValues: Map[String, Any] = Map(), layoutScheme: LayoutScheme = ZoomedLayoutScheme(WebMercator, 256), bandIds: Seq[Seq[Int]] = Seq(), correlationId: String = "", experimental: Boolean = false, - retainNoDataTiles: Boolean = false) = this(openSearch, openSearchCollectionId, + retainNoDataTiles: Boolean = false, maxSoftErrorsRatio: Double = 0.0) = this(openSearch, openSearchCollectionId, openSearchLinkTitles = NonEmptyList.fromListUnsafe(for { (title, bandIndices) <- openSearchLinkTitles.toList.zipAll(bandIds, thisElem = "", thatElem = Seq(0)) _ <- bandIndices @@ -890,7 +918,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti rootPath, maxSpatialResolution, pathDateExtractor, attributeValues, layoutScheme, bandIndices = bandIds.flatten, correlationId, experimental, - retainNoDataTiles, disambiguateConstructors = null) + retainNoDataTiles, maxSoftErrorsRatio, disambiguateConstructors = null) assert(bandIndices.isEmpty || bandIndices.size == openSearchLinkTitles.size) @@ -900,6 +928,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti private val _rootPath = if(rootPath != null) Paths.get(rootPath) else null private val fromLoadStac = openSearch.isInstanceOf[FixedFeaturesOpenSearchClient] + private val softErrors = maxSoftErrorsRatio > 0.0 private val openSearchLinkTitlesWithBandId: Seq[(String, Int)] = { if (bandIndices.nonEmpty) { @@ -1240,7 +1269,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti if(!datacubeParams.map(_.loadPerProduct).getOrElse(false) || theMaskStrategy != NoCloudFilterStrategy ){ rasterRegionsToTiles(regions, metadata, retainNoDataTiles, theMaskStrategy, partitioner, datacubeParams) }else{ - rasterRegionsToTilesLoadPerProductStrategy(regions, metadata, retainNoDataTiles, NoCloudFilterStrategy, partitioner, datacubeParams, openSearchLinkTitlesWithBandId.size,readKeysToRasterSourcesResult._4) + rasterRegionsToTilesLoadPerProductStrategy(regions, metadata, retainNoDataTiles, NoCloudFilterStrategy, partitioner, datacubeParams, openSearchLinkTitlesWithBandId.size,readKeysToRasterSourcesResult._4, softErrors) } logger.info(s"Created cube for ${openSearchCollectionId} with metadata ${cube.metadata} and partitioner ${cube.partitioner}") cube @@ -1511,7 +1540,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti return None } - Some((new BandCompositeRasterSource(sources.map { case (rasterSource, _) => rasterSource }, targetExtent.crs, attributes, predefinedExtent = predefinedExtent), feature)) + Some((new BandCompositeRasterSource(sources.map { case (rasterSource, _) => rasterSource }, targetExtent.crs, attributes, predefinedExtent = predefinedExtent, softErrors = softErrors), feature)) } else Some((new MultibandCompositeRasterSource(sources.map { case (rasterSource, bandIndex) => (rasterSource, Seq(bandIndex))}, targetExtent.crs, attributes), feature)) } } diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala index 4590d27ea..f4ac18df8 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala @@ -3,6 +3,7 @@ package org.openeo.geotrellis.layers import cats.data.NonEmptyList import geotrellis.layer.{FloatingLayoutScheme, LayoutTileSource, SpaceTimeKey, SpatialKey, TileLayerMetadata} import geotrellis.proj4.{CRS, LatLng} +import geotrellis.raster.gdal.{GDALIOException, GDALRasterSource} import geotrellis.raster.io.geotiff.GeoTiff import geotrellis.raster.summary.polygonal.Summary import geotrellis.raster.summary.polygonal.visitors.MeanVisitor @@ -16,7 +17,7 @@ import geotrellis.vector._ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.junit.jupiter.api.Assertions.{assertEquals, assertNotSame, assertSame, assertTrue} -import org.junit.jupiter.api.{AfterAll, BeforeAll, Test, Timeout} +import org.junit.jupiter.api.{AfterAll, BeforeAll, Disabled, Test, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.openeo.geotrellis.TestImplicits._ @@ -1303,4 +1304,17 @@ class FileLayerProviderTest extends RasterMatchers{ assertEquals(Some((Link(URI.create("NETCDF:http://openeo.vito.be/job-xxx/results/result.nc:dry_matter_productivity"),Some("DMP")),0)),httpResult) } + + @Disabled("temporarily disabled: lowering geotrellis.raster.gdal.number-of-attempts does not work") + @Test + def readGDALRasterSourceFromCorruptTileThrows(): Unit = { + val rs = GDALRasterSource("https://artifactory.vgt.vito.be/artifactory/testdata-public/T29UMV_20180327T114351_B04_10m.jp2") + + try { + rs.read() + fail(s"should have thrown a GDALIOException (geotrellis.raster.gdal.numberOfAttempts is ${geotrellis.raster.gdal.numberOfAttempts})") + } catch { + case _: GDALIOException => // OK + } + } }