diff --git a/CMakeLists.txt b/CMakeLists.txt index a62fe52..f81dedf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,6 +42,7 @@ option(BUILD_CLI "Build the exactextract cli binary" ON) #requires gdal, cli11 option(BUILD_TEST "Build the exactextract tests" ON) #requires catch option(BUILD_PYTHON "Build the exactextract Python bindings" ON) # requires pybind11 option(BUILD_DOC "Build documentation" ON) #requires doxygen +option(BUILD_PARALLEL_TBB "Build with parallel support" OFF) #requires libtbb if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin") # workaround for MacOS 10.13 and below which does not fully support std::variant @@ -220,6 +221,14 @@ set(PROJECT_SOURCES ${SRC_GENERATED}/version.h ) +if (BUILD_PARALLEL_TBB) + set(PROJECT_SOURCES + ${PROJECT_SOURCES} + src/raster_parallel_processor.cpp + src/raster_parallel_processor.h + ) +endif() # BUILD_PARALLEL_TBB + add_library(${LIB_NAME} ${PROJECT_SOURCES}) @@ -277,8 +286,17 @@ if(BUILD_DOC) else (DOXYGEN_FOUND) message("Doxygen need to be installed to generate the doxygen documentation") - - - endif (DOXYGEN_FOUND) endif() #BUILD_DOC + +if (BUILD_PARALLEL_TBB) + find_package(TBB REQUIRED) + + if (TBB_FOUND) + message(STATUS "TBB version: " ${TBB_VERSION}) + target_link_libraries(${LIB_NAME} PUBLIC TBB::tbb) + add_definitions(-DEE_PARALLEL) + else() + message(FATAL_ERROR "TBB is required.") + endif() +endif() # BUILD_PARALLEL_TBB diff --git a/src/exactextract.cpp b/src/exactextract.cpp index 81b0ec1..cb4646d 100644 --- a/src/exactextract.cpp +++ b/src/exactextract.cpp @@ -32,6 +32,10 @@ #include "utils_cli.h" #include "version.h" +#ifdef EE_PARALLEL +#include "raster_parallel_processor.h" +#endif + using exactextract::GDALDatasetWrapper; using exactextract::GDALRasterWrapper; using exactextract::Operation; @@ -54,6 +58,7 @@ main(int argc, char** argv) std::vector weight_descriptors; std::vector include_cols; size_t max_cells_in_memory = 30; + size_t threads = 4; bool progress = false; bool nested_output = false; @@ -68,6 +73,7 @@ main(int argc, char** argv) app.add_option("-s,--stat", stats, "statistics")->required(false)->expected(-1); app.add_option("--max-cells", max_cells_in_memory, "maximum number of raster cells to read in memory at once, in millions")->required(false)->default_val("30"); app.add_option("--strategy", strategy, "processing strategy")->required(false)->default_val("feature-sequential"); + app.add_option("--threads", threads, "maximum number of parallel items that can processed at one time, default 4")->required(false)->default_val(4); app.add_option("--id-type", dst_id_type, "override type of id field in output")->required(false); app.add_option("--id-name", dst_id_name, "override name of id field in output")->required(false); app.add_flag("--nested-output", nested_output, "nested output"); @@ -133,6 +139,12 @@ main(int argc, char** argv) proc = std::make_unique(shp, *writer); } else if (strategy == "raster-sequential") { proc = std::make_unique(shp, *writer); + } else if (strategy == "raster-parallel") { +#ifdef EE_PARALLEL + proc = std::make_unique(shp, *writer, threads); +#else + throw std::runtime_error("Parallel processor not supported."); +#endif } else { throw std::runtime_error("Unknown processing strategy: " + strategy); } diff --git a/src/raster_parallel_processor.cpp b/src/raster_parallel_processor.cpp new file mode 100644 index 0000000..b30d187 --- /dev/null +++ b/src/raster_parallel_processor.cpp @@ -0,0 +1,220 @@ + +// Copyright (c) 2024 ISciences, LLC. +// All rights reserved. +// +// This software is licensed under the Apache License, Version 2.0 (the "License"). +// You may not use this file except in compliance with the License. You may +// obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "raster_parallel_processor.h" +#include "operation.h" +#include "raster_cell_intersection.h" +#include "raster_source.h" + +#include +#include +#include +#include +#include +#include + +#include + +namespace { + +void +errorHandlerParallel(const char* fmt, ...) +{ + + char buf[BUFSIZ], *p; + va_list ap; + va_start(ap, fmt); + vsprintf(buf, fmt, ap); + va_end(ap); + p = buf + strlen(buf) - 1; + if (strlen(buf) > 0 && *p == '\n') + *p = '\0'; + + std::cout << "Error: " << buf << std::endl; +} + +} + +namespace exactextract { + +typedef std::vector FeatureHits; +typedef std::shared_ptr StatsRegistryPtr; + +struct ZonalStatsCalc +{ + RasterSource* source = nullptr; + Grid subgrid = Grid::make_empty(); + FeatureHits hits = std::vector(); + std::unique_ptr values = nullptr; +}; + +void +RasterParallelProcessor::read_features() +{ + while (m_shp.next()) { + const Feature& feature = m_shp.feature(); + MapFeature mf(feature); + m_features.push_back(std::move(mf)); + } +} + +void +RasterParallelProcessor::populate_index() +{ + assert(m_feature_tree != nullptr); + + for (const auto& f : m_features) { + // TODO compute envelope of dataset, and crop raster by that extent before processing? + GEOSSTRtree_insert_r(m_geos_context, m_feature_tree.get(), f.geometry(), (void*)&f); + } +} + +void +RasterParallelProcessor::process() +{ + read_features(); + populate_index(); + + std::set raster_sources; + for (const auto& op : m_operations) { + if (op->weighted()) { + throw std::runtime_error("Weighted operations not yet supported in raster-parallel strategy."); + } else if (op->requires_variance()) { + throw std::runtime_error("Variance operations not yet supported in raster-parallel strategy."); + } + + raster_sources.insert(op->values); + m_reg.prepare(*op); + } + + std::vector raster_grids; + for (auto& source : raster_sources) { + auto subgrids = subdivide(source->grid(), m_max_cells_in_memory); + for (auto& subgrid : subgrids) { + raster_grids.push_back({ .source = source, + .subgrid = subgrid }); + } + } + const auto total_subgrids = raster_grids.size(); + size_t processed_subgrids = 0; + + oneapi::tbb::enumerable_thread_specific geos_context([]() -> GEOSContextHandle_t { + return initGEOS_r(errorHandlerParallel, errorHandlerParallel); + }); + + // clang-format off + oneapi::tbb::parallel_pipeline(m_threads, + oneapi::tbb::make_filter(oneapi::tbb::filter_mode::serial_in_order, + [&raster_grids] (oneapi::tbb::flow_control& fc) -> ZonalStatsCalc { + //TODO: split subgridding by raster to optimise IO based on raster block size per input? + //logic below currently assumes that all raster inputs have the same total geospatial coverage + if (raster_grids.empty()) { + fc.stop(); + return {nullptr, Grid::make_empty()}; + } + + auto sg = std::move(raster_grids.back()); + raster_grids.pop_back(); + return sg; + }) & + oneapi::tbb::make_filter(oneapi::tbb::filter_mode::parallel, + [&geos_context, this] (ZonalStatsCalc context) -> ZonalStatsCalc { + std::vector hits; + auto query_rect = geos_make_box_polygon(geos_context.local(), context.subgrid.extent()); + + GEOSSTRtree_query_r( + geos_context.local(), m_feature_tree.get(), query_rect.get(), [](void* hit, void* userdata) { + auto feature = static_cast(hit); + auto vec = static_cast*>(userdata); + + vec->push_back(feature); + }, + &hits); + + if (hits.empty()) { + return context; + } + + context.hits = hits; + return context; + }) & + oneapi::tbb::make_filter(oneapi::tbb::filter_mode::serial_out_of_order, + [raster_sources, this] (ZonalStatsCalc context) -> ZonalStatsCalc { + if (context.subgrid.empty() || context.hits.empty()) { + return context; + } + + auto values = std::make_unique(context.source->read_box(context.subgrid.extent().intersection(context.source->grid().extent()))); + context.values = std::move(values); + return context; + }) & + oneapi::tbb::make_filter, StatsRegistryPtr>>(oneapi::tbb::filter_mode::parallel, + [&geos_context, this] (ZonalStatsCalc context) -> std::tuple, StatsRegistryPtr> { + if (context.subgrid.empty() || context.hits.empty() || !context.source || !context.values) { + return {context.subgrid, nullptr}; + } + + auto block_registry = std::make_shared(); + std::vector trimmed_ops; + + for (const auto& op : m_operations) { + if (op->values == context.source) { + block_registry->prepare(*op); + trimmed_ops.push_back(op.get()); + } + } + + for (const Feature* f : context.hits) { + auto coverage = std::make_unique>(raster_cell_intersection(context.subgrid, geos_context.local(), f->geometry())); + std::set processed; + + for (const auto op : trimmed_ops) { + //TODO: push this earlier and remove duplicate operations entirely + if (processed.find(op->key()) != processed.end()) { + continue; + } else { + processed.insert(op->key()); + } + + block_registry->update_stats(*f, *op, *coverage, *context.values); + } + } + + return {context.subgrid, block_registry}; + }) & + oneapi::tbb::make_filter, StatsRegistryPtr>, void>(oneapi::tbb::filter_mode::serial_out_of_order, + [&processed_subgrids, total_subgrids, this] (std::tuple, StatsRegistryPtr> registry) { + auto& [registryGrid, regptr] = registry; + if (regptr) { + this->m_reg.merge(*regptr); + } + + if (m_show_progress && !registryGrid.empty()) { + processed_subgrids++; + + std::stringstream ss; + ss << registryGrid.extent(); + + progress(static_cast(processed_subgrids) / static_cast(total_subgrids), ss.str()); + } + }) + ); + // clang-format on + + for (const auto& f_in : m_features) { + write_result(f_in); + } +} + +} diff --git a/src/raster_parallel_processor.h b/src/raster_parallel_processor.h new file mode 100644 index 0000000..c364391 --- /dev/null +++ b/src/raster_parallel_processor.h @@ -0,0 +1,45 @@ + +// Copyright (c) 2024 ISciences, LLC. +// All rights reserved. +// +// This software is licensed under the Apache License, Version 2.0 (the "License"). +// You may not use this file except in compliance with the License. You may +// obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "geos_utils.h" +#include "map_feature.h" +#include "processor.h" + +namespace exactextract { + +class RasterParallelProcessor : public Processor +{ + public: + using Processor::Processor; + + RasterParallelProcessor(FeatureSource& ds, OutputWriter& out, size_t threads) + : Processor(ds, out) + , m_threads(threads) + { + } + + void read_features(); + void populate_index(); + + void process() override; + + private: + size_t m_threads; + std::vector m_features; + tree_ptr_r m_feature_tree{ geos_ptr(m_geos_context, GEOSSTRtree_create_r(m_geos_context, 10)) }; +}; + +} diff --git a/src/raster_stats.h b/src/raster_stats.h index b10d0a4..4095e43 100644 --- a/src/raster_stats.h +++ b/src/raster_stats.h @@ -82,6 +82,48 @@ class RasterStats { } + RasterStats(RasterStats&& other) = default; + RasterStats& operator=(RasterStats&& other) = default; + + void combine(const RasterStats& source) + { + m_min = std::min(m_min, source.m_min); + m_max = std::max(m_max, source.m_max); + + if (m_options.store_xy) { + double minX = std::min(m_min_xy.first, source.m_min_xy.first); + double minY = std::min(m_min_xy.second, source.m_min_xy.second); + m_min_xy = { minX, minY }; + + double maxX = std::min(m_max_xy.first, source.m_max_xy.first); + double maxY = std::min(m_max_xy.second, source.m_max_xy.second); + m_max_xy = { maxX, maxY }; + } + + m_sum_ciwi += source.m_sum_ciwi; + m_sum_ci += source.m_sum_ci; + m_sum_xici += source.m_sum_xici; + m_sum_xiciwi += source.m_sum_xiciwi; + + for (auto& v : source.m_freq) { + auto it = m_freq.find(v.first); + if (it != m_freq.end()) { + it->second.m_sum_ci += v.second.m_sum_ci; + it->second.m_sum_ciwi += v.second.m_sum_ciwi; + } else { + m_freq.insert(v); + } + } + + m_cell_cov.insert(m_cell_cov.end(), source.m_cell_cov.begin(), source.m_cell_cov.end()); + m_cell_values.insert(m_cell_values.end(), source.m_cell_values.begin(), source.m_cell_values.end()); + m_cell_weights.insert(m_cell_weights.end(), source.m_cell_weights.begin(), source.m_cell_weights.end()); + m_cell_x.insert(m_cell_x.end(), source.m_cell_x.begin(), source.m_cell_x.end()); + m_cell_y.insert(m_cell_y.end(), source.m_cell_y.begin(), source.m_cell_y.end()); + m_cell_values_defined.insert(m_cell_values_defined.end(), source.m_cell_values_defined.begin(), source.m_cell_values_defined.end()); + m_cell_weights_defined.insert(m_cell_weights_defined.end(), source.m_cell_weights_defined.begin(), source.m_cell_weights_defined.end()); + } + static bool get_or_default(const AbstractRaster& r, std::size_t i, std::size_t j, T& val, const std::optional& default_value) { if (r.get(i, j, val)) { diff --git a/src/stats_registry.cpp b/src/stats_registry.cpp index 425f27c..646f9f5 100644 --- a/src/stats_registry.cpp +++ b/src/stats_registry.cpp @@ -62,6 +62,34 @@ StatsRegistry::update_stats(const Feature& f, const Operation& op, const Raster< weights); } +void +StatsRegistry::merge(StatsRegistry& source) +{ + for (auto& feature : source.m_feature_stats) { + auto& my_feature = this->m_feature_stats[feature.first]; + + for (auto& op : feature.second) { + const auto& feature_key = op.first; + auto& src_rsv = op.second; + + auto my_feature_rsv = my_feature.find(feature_key); + if (my_feature_rsv == my_feature.end()) { + std::visit([&my_feature, feature_key](auto& src) { + my_feature.emplace(feature_key, std::move(src)); + }, + src_rsv); + } else { + std::visit([&my_feature_rsv](auto& src) { + using value_type = typename std::remove_reference_t::ValueType; + RasterStats& dest = std::get>(my_feature_rsv->second); + dest.combine(src); + }, + src_rsv); + } + } + } +} + StatsRegistry::RasterStatsVariant& StatsRegistry::stats(const Feature& feature, const Operation& op) { diff --git a/src/stats_registry.h b/src/stats_registry.h index 24d30d4..dcac5be 100644 --- a/src/stats_registry.h +++ b/src/stats_registry.h @@ -70,6 +70,8 @@ class StatsRegistry void update_stats(const Feature& f, const Operation& op, const Raster& coverage, const RasterVariant& values, const RasterVariant& weights); + void merge(StatsRegistry& source); + private: template static bool requires_stored_values(const T& ops) diff --git a/test/test_stats.cpp b/test/test_stats.cpp index 07b956a..b5fb36b 100644 --- a/test/test_stats.cpp +++ b/test/test_stats.cpp @@ -575,4 +575,55 @@ TEST_CASE("min_coverage_frac is respected", "[stats]") CHECK(stats.mode().value() == 2); } +TEST_CASE("Stats are combined") +{ + Box extent{ 0.0, 0.0, 1.0, 1.0 }; + Grid ex{ extent, 0.5, 0.5 }; + GEOSContextHandle_t context = init_geos(); + auto g = GEOSGeom_read_r(context, "POLYGON ((0.0 0.0, 1.0 0.0, 1.0 1.0, 0.0 1.0, 0.0 0.0))"); + Raster areas = raster_cell_intersection(ex, context, g.get()); + Raster values{ Matrix{ { { 1, 1 }, { 1, 1 } } }, extent }; + + auto opts = RasterStatsOptions{ + .store_histogram = true, + .store_values = true, + .store_weights = true, + .store_coverage_fraction = true, + .store_xy = true, + .include_nodata = true, + }; + RasterStats a(opts); + a.process(areas, values, values); + + RasterStats b(opts); + b.combine(a); + + CHECK(b.mean() == 1); + CHECK(b.weighted_mean() == 1); + CHECK(b.weighted_fraction() == 1); + CHECK(b.mode() == 1); + CHECK(b.min() == 1); + CHECK(b.min_xy() == std::make_pair(0.0, 0.0)); + CHECK(b.max() == 1); + CHECK(b.max_xy() == std::make_pair(0.0, 0.0)); + CHECK(b.quantile(0) == 1); + CHECK(b.sum() == 4.0); + CHECK(b.weighted_sum() == 4.0); + CHECK(b.count() == 4.0); + CHECK(b.count(1) == 4.0); + CHECK(b.frac(1) == 1.0); + CHECK(b.weighted_frac(1) == 1.0); + CHECK(b.weighted_count() == 4.0); + CHECK(b.weighted_count(1) == 4.0); + CHECK(b.minority() == 1); + CHECK(b.variety() == 1); + CHECK(b.values() == std::vector{ 1, 1, 1, 1 }); + CHECK(b.values_defined() == std::vector{ true, true, true, true }); + CHECK(b.coverage_fractions() == std::vector{ 1.0f, 1.0f, 1.0f, 1.0f }); + CHECK(b.weights() == std::vector{ 1.0, 1.0, 1.0, 1.0 }); + CHECK(b.weights_defined() == std::vector{ true, true, true, true }); + CHECK(b.center_x() == std::vector{ 0.25, 0.75, 0.25, 0.75 }); + CHECK(b.center_y() == std::vector{ 0.75, 0.75, 0.25, 0.25 }); +} + }