Skip to content

Commit

Permalink
Merge pull request #304 from Open-EO/723_agg_spatial_bands
Browse files Browse the repository at this point in the history
723 agg spatial bands
  • Loading branch information
jdries authored Jun 12, 2024
2 parents 05f0fab + c10ee1b commit 731ae89
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,16 @@ class OpenEOProcesses extends Serializable {
return None
}
}

def maybeBandLabels[K](cube: RDD[(K, MultibandTile)]): Option[Seq[String]] = {
if (cube.isInstanceOf[OpenEORasterCube[K]] && cube.asInstanceOf[OpenEORasterCube[K]].openEOMetadata.bandCount > 0) {
val labels = cube.asInstanceOf[OpenEORasterCube[K]].openEOMetadata.bands
return Some(labels)
}else{
return None
}
}

/**
* Get band count used in RDD (each tile in RDD should have same band count)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.apache.spark.rdd._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, Row, SaveMode, SparkSession}
import org.openeo.geotrellis.SpatialToSpacetimeJoinRdd
import org.openeo.geotrellis.{OpenEOProcesses, SpatialToSpacetimeJoinRdd}
import org.openeo.geotrellis.aggregate_polygon.intern.PixelRateValidator.exceedsTreshold
import org.openeo.geotrellis.aggregate_polygon.intern._
import org.openeo.geotrellis.layers.LayerProvider
Expand Down Expand Up @@ -153,7 +153,8 @@ class AggregatePolygonProcess() {
}
}
val cellType = datacube.metadata.cellType
aggregateByDateAndPolygon(pixelRDD, scriptBuilder, bandCount, cellType, outputPath)
val maybeLabels = new OpenEOProcesses().maybeBandLabels(datacube)
aggregateByDateAndPolygon(pixelRDD, scriptBuilder, bandCount, cellType, outputPath,maybeLabels)
}

def aggregateSpatialForGeometryWithSpatialCube(scriptBuilder: SparkAggregateScriptBuilder,
Expand Down Expand Up @@ -305,22 +306,23 @@ class AggregatePolygonProcess() {
}
}
val cellType = datacube.metadata.cellType
aggregateByDateAndPolygon(pixelRDD, scriptBuilder, bandCount, cellType, outputPath)
val maybeLabels = new OpenEOProcesses().maybeBandLabels(datacube)
aggregateByDateAndPolygon(pixelRDD, scriptBuilder, bandCount, cellType, outputPath,maybeLabels)

}finally{
byIndexMask.unpersist()
}
}

private def aggregateByDateAndPolygon(pixelRDD: RDD[Row], scriptBuilder: SparkAggregateScriptBuilder, bandCount: Int, cellType: CellType, outputPath: String) = {
private def aggregateByDateAndPolygon(pixelRDD: RDD[Row], scriptBuilder: SparkAggregateScriptBuilder, bandCount: Int, cellType: CellType, outputPath: String, maybeBandLabels: Option[Seq[String]] = Option.empty[Seq[String]]) = {
val session = SparkSession.builder().config(pixelRDD.sparkContext.getConf).getOrCreate()
val dataType =
if (cellType.isFloatingPoint) {
DoubleType
} else {
IntegerType
}
val bandColumns = (0 until bandCount).map(b => f"band_$b")
val bandColumns = maybeBandLabels.getOrElse (0 until bandCount).map(b => f"band_$b")

val bandStructs = bandColumns.map(StructField(_, dataType, true))

Expand Down

0 comments on commit 731ae89

Please sign in to comment.