Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-threaded raster strategy #161

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
936c126
Add libtbb as a dependency, not properly setup to be optional here.
shortcutman Nov 2, 2024
a8afe44
Running sequential logic through TBB pipeline with no parallelisation.
shortcutman Nov 2, 2024
77dce98
Refactored order of operations to be ready for more pipelining.
shortcutman Nov 3, 2024
e2a5d72
Feed subgrids into tbb pipeline.
shortcutman Nov 4, 2024
68f90a9
Isolate work to take subgrid and find feature hits.
shortcutman Nov 4, 2024
63d88e7
Make subgrid to feature intersection parallel.
shortcutman Nov 4, 2024
67eb00e
Addition of specific serial queue for raster read.
shortcutman Nov 4, 2024
364f079
Handle tasks where it has been determined there is no future work.
shortcutman Nov 4, 2024
bae0b03
Reaching end of input stream should return with dummy data that will …
shortcutman Nov 4, 2024
4b6f849
Process zonal stats in parallel into separate StatRegistry objects.
shortcutman Nov 4, 2024
fc09454
Stubbed merging of stats registries for simple ops for parallel execu…
shortcutman Nov 4, 2024
af49ebd
Added step to merge StatsRegistry and RasterStats. Some floating poin…
shortcutman Nov 4, 2024
cee81ef
Resolve race condition errors by using thread local GEOS contexts as …
shortcutman Nov 17, 2024
cb51593
Clean up checks at start of pipeline filters.
shortcutman Nov 17, 2024
d7cd157
Command line argument to configure parallel tokens.
shortcutman Nov 17, 2024
d90d2b1
Combine stats for all but variance.
shortcutman Nov 17, 2024
f86d643
Parallel processor OFF by default and made required dependencies opti…
shortcutman Nov 23, 2024
46c5439
Renamed parallel_raste_processor.* files for consistency.
shortcutman Nov 23, 2024
b1f9c4f
Restore progress functionality matching raster sequential processor o…
shortcutman Nov 23, 2024
68889fd
Simple clang-format fixes
shortcutman Nov 23, 2024
b010b3d
Clang format changes
shortcutman Dec 2, 2024
9545436
Added basic test for combining RasterStats
shortcutman Jan 10, 2025
391177a
Refactor to support multiple operations.
shortcutman Jan 12, 2025
40f8512
Fixes for clang-format
shortcutman Jan 12, 2025
2c5ff4f
Error out on unsupported weighted operations.
shortcutman Jan 31, 2025
ecbcbb0
Changed tokens to threads for easier adoption by users.
shortcutman Jan 31, 2025
3b2fc80
Error on unsupported variance operations in raster-parallel strategy.
shortcutman Jan 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ option(BUILD_CLI "Build the exactextract cli binary" ON) #requires gdal, cli11
option(BUILD_TEST "Build the exactextract tests" ON) #requires catch
option(BUILD_PYTHON "Build the exactextract Python bindings" ON) # requires pybind11
option(BUILD_DOC "Build documentation" ON) #requires doxygen
option(BUILD_PARALLEL_TBB "Build with parallel support" OFF) #requires libtbb

if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
# workaround for MacOS 10.13 and below which does not fully support std::variant
Expand Down Expand Up @@ -220,6 +221,14 @@ set(PROJECT_SOURCES
${SRC_GENERATED}/version.h
)

if (BUILD_PARALLEL_TBB)
set(PROJECT_SOURCES
${PROJECT_SOURCES}
src/raster_parallel_processor.cpp
src/raster_parallel_processor.h
)
endif() # BUILD_PARALLEL_TBB

add_library(${LIB_NAME} ${PROJECT_SOURCES})


Expand Down Expand Up @@ -277,8 +286,17 @@ if(BUILD_DOC)

else (DOXYGEN_FOUND)
message("Doxygen need to be installed to generate the doxygen documentation")



endif (DOXYGEN_FOUND)
endif() #BUILD_DOC

if (BUILD_PARALLEL_TBB)
find_package(TBB REQUIRED)

if (TBB_FOUND)
message(STATUS "TBB version: " ${TBB_VERSION})
target_link_libraries(${LIB_NAME} PUBLIC TBB::tbb)
add_definitions(-DEE_PARALLEL)
else()
message(FATAL_ERROR "TBB is required.")
endif()
endif() # BUILD_PARALLEL_TBB
12 changes: 12 additions & 0 deletions src/exactextract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
#include "utils_cli.h"
#include "version.h"

#ifdef EE_PARALLEL
#include "raster_parallel_processor.h"
#endif

using exactextract::GDALDatasetWrapper;
using exactextract::GDALRasterWrapper;
using exactextract::Operation;
Expand All @@ -54,6 +58,7 @@ main(int argc, char** argv)
std::vector<std::string> weight_descriptors;
std::vector<std::string> include_cols;
size_t max_cells_in_memory = 30;
size_t threads = 4;

bool progress = false;
bool nested_output = false;
Expand All @@ -68,6 +73,7 @@ main(int argc, char** argv)
app.add_option("-s,--stat", stats, "statistics")->required(false)->expected(-1);
app.add_option("--max-cells", max_cells_in_memory, "maximum number of raster cells to read in memory at once, in millions")->required(false)->default_val("30");
app.add_option("--strategy", strategy, "processing strategy")->required(false)->default_val("feature-sequential");
app.add_option("--threads", threads, "maximum number of parallel items that can processed at one time, default 4")->required(false)->default_val(4);
app.add_option("--id-type", dst_id_type, "override type of id field in output")->required(false);
app.add_option("--id-name", dst_id_name, "override name of id field in output")->required(false);
app.add_flag("--nested-output", nested_output, "nested output");
Expand Down Expand Up @@ -133,6 +139,12 @@ main(int argc, char** argv)
proc = std::make_unique<exactextract::FeatureSequentialProcessor>(shp, *writer);
} else if (strategy == "raster-sequential") {
proc = std::make_unique<exactextract::RasterSequentialProcessor>(shp, *writer);
} else if (strategy == "raster-parallel") {
#ifdef EE_PARALLEL
proc = std::make_unique<exactextract::RasterParallelProcessor>(shp, *writer, threads);
#else
throw std::runtime_error("Parallel processor not supported.");
#endif
} else {
throw std::runtime_error("Unknown processing strategy: " + strategy);
}
Expand Down
220 changes: 220 additions & 0 deletions src/raster_parallel_processor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@

// Copyright (c) 2024 ISciences, LLC.
// All rights reserved.
//
// This software is licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License. You may
// obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "raster_parallel_processor.h"
#include "operation.h"
#include "raster_cell_intersection.h"
#include "raster_source.h"

#include <cassert>
#include <map>
#include <memory>
#include <ranges>
#include <set>
#include <sstream>

#include <oneapi/tbb.h>

namespace {

void
errorHandlerParallel(const char* fmt, ...)
{

char buf[BUFSIZ], *p;
va_list ap;
va_start(ap, fmt);
vsprintf(buf, fmt, ap);
va_end(ap);
p = buf + strlen(buf) - 1;
if (strlen(buf) > 0 && *p == '\n')
*p = '\0';

std::cout << "Error: " << buf << std::endl;
}

}

namespace exactextract {

typedef std::vector<const Feature*> FeatureHits;
typedef std::shared_ptr<StatsRegistry> StatsRegistryPtr;

struct ZonalStatsCalc
{
RasterSource* source = nullptr;
Grid<bounded_extent> subgrid = Grid<bounded_extent>::make_empty();
FeatureHits hits = std::vector<const Feature*>();
std::unique_ptr<RasterVariant> values = nullptr;
};

void
RasterParallelProcessor::read_features()
{
while (m_shp.next()) {
const Feature& feature = m_shp.feature();
MapFeature mf(feature);
m_features.push_back(std::move(mf));
}
}

void
RasterParallelProcessor::populate_index()
{
assert(m_feature_tree != nullptr);

for (const auto& f : m_features) {
// TODO compute envelope of dataset, and crop raster by that extent before processing?
GEOSSTRtree_insert_r(m_geos_context, m_feature_tree.get(), f.geometry(), (void*)&f);
}
}

void
RasterParallelProcessor::process()
{
read_features();
populate_index();

std::set<RasterSource*> raster_sources;
for (const auto& op : m_operations) {
if (op->weighted()) {
throw std::runtime_error("Weighted operations not yet supported in raster-parallel strategy.");
} else if (op->requires_variance()) {
throw std::runtime_error("Variance operations not yet supported in raster-parallel strategy.");
}

raster_sources.insert(op->values);
m_reg.prepare(*op);
}

std::vector<ZonalStatsCalc> raster_grids;
for (auto& source : raster_sources) {
auto subgrids = subdivide(source->grid(), m_max_cells_in_memory);
for (auto& subgrid : subgrids) {
raster_grids.push_back({ .source = source,
.subgrid = subgrid });
}
}
const auto total_subgrids = raster_grids.size();
size_t processed_subgrids = 0;

oneapi::tbb::enumerable_thread_specific<GEOSContextHandle_t> geos_context([]() -> GEOSContextHandle_t {
return initGEOS_r(errorHandlerParallel, errorHandlerParallel);
});

// clang-format off
oneapi::tbb::parallel_pipeline(m_threads,
oneapi::tbb::make_filter<void, ZonalStatsCalc>(oneapi::tbb::filter_mode::serial_in_order,
[&raster_grids] (oneapi::tbb::flow_control& fc) -> ZonalStatsCalc {
//TODO: split subgridding by raster to optimise IO based on raster block size per input?
//logic below currently assumes that all raster inputs have the same total geospatial coverage
if (raster_grids.empty()) {
fc.stop();
return {nullptr, Grid<bounded_extent>::make_empty()};
}

auto sg = std::move(raster_grids.back());
raster_grids.pop_back();
return sg;
}) &
oneapi::tbb::make_filter<ZonalStatsCalc, ZonalStatsCalc>(oneapi::tbb::filter_mode::parallel,
[&geos_context, this] (ZonalStatsCalc context) -> ZonalStatsCalc {
std::vector<const Feature*> hits;
auto query_rect = geos_make_box_polygon(geos_context.local(), context.subgrid.extent());

GEOSSTRtree_query_r(
geos_context.local(), m_feature_tree.get(), query_rect.get(), [](void* hit, void* userdata) {
auto feature = static_cast<const Feature*>(hit);
auto vec = static_cast<std::vector<const Feature*>*>(userdata);

vec->push_back(feature);
},
&hits);

if (hits.empty()) {
return context;
}

context.hits = hits;
return context;
}) &
oneapi::tbb::make_filter<ZonalStatsCalc, ZonalStatsCalc>(oneapi::tbb::filter_mode::serial_out_of_order,
[raster_sources, this] (ZonalStatsCalc context) -> ZonalStatsCalc {
if (context.subgrid.empty() || context.hits.empty()) {
return context;
}

auto values = std::make_unique<RasterVariant>(context.source->read_box(context.subgrid.extent().intersection(context.source->grid().extent())));
context.values = std::move(values);
return context;
}) &
oneapi::tbb::make_filter<ZonalStatsCalc, std::tuple<Grid<bounded_extent>, StatsRegistryPtr>>(oneapi::tbb::filter_mode::parallel,
[&geos_context, this] (ZonalStatsCalc context) -> std::tuple<Grid<bounded_extent>, StatsRegistryPtr> {
if (context.subgrid.empty() || context.hits.empty() || !context.source || !context.values) {
return {context.subgrid, nullptr};
}

auto block_registry = std::make_shared<StatsRegistry>();
std::vector<Operation*> trimmed_ops;

for (const auto& op : m_operations) {
if (op->values == context.source) {
block_registry->prepare(*op);
trimmed_ops.push_back(op.get());
}
}

for (const Feature* f : context.hits) {
auto coverage = std::make_unique<Raster<float>>(raster_cell_intersection(context.subgrid, geos_context.local(), f->geometry()));
std::set<std::string> processed;

for (const auto op : trimmed_ops) {
//TODO: push this earlier and remove duplicate operations entirely
if (processed.find(op->key()) != processed.end()) {
continue;
} else {
processed.insert(op->key());
}

block_registry->update_stats(*f, *op, *coverage, *context.values);
}
}

return {context.subgrid, block_registry};
}) &
oneapi::tbb::make_filter<std::tuple<Grid<bounded_extent>, StatsRegistryPtr>, void>(oneapi::tbb::filter_mode::serial_out_of_order,
[&processed_subgrids, total_subgrids, this] (std::tuple<Grid<bounded_extent>, StatsRegistryPtr> registry) {
auto& [registryGrid, regptr] = registry;
if (regptr) {
this->m_reg.merge(*regptr);
}

if (m_show_progress && !registryGrid.empty()) {
processed_subgrids++;

std::stringstream ss;
ss << registryGrid.extent();

progress(static_cast<double>(processed_subgrids) / static_cast<double>(total_subgrids), ss.str());
}
})
);
// clang-format on

for (const auto& f_in : m_features) {
write_result(f_in);
}
}

}
45 changes: 45 additions & 0 deletions src/raster_parallel_processor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@

// Copyright (c) 2024 ISciences, LLC.
// All rights reserved.
//
// This software is licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License. You may
// obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "geos_utils.h"
#include "map_feature.h"
#include "processor.h"

namespace exactextract {

class RasterParallelProcessor : public Processor
{
public:
using Processor::Processor;

RasterParallelProcessor(FeatureSource& ds, OutputWriter& out, size_t threads)
: Processor(ds, out)
, m_threads(threads)
{
}

void read_features();
void populate_index();

void process() override;

private:
size_t m_threads;
std::vector<MapFeature> m_features;
tree_ptr_r m_feature_tree{ geos_ptr(m_geos_context, GEOSSTRtree_create_r(m_geos_context, 10)) };
};

}
42 changes: 42 additions & 0 deletions src/raster_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,48 @@ class RasterStats
{
}

RasterStats(RasterStats<T>&& other) = default;
RasterStats<T>& operator=(RasterStats<T>&& other) = default;

void combine(const RasterStats<T>& source)
{
m_min = std::min(m_min, source.m_min);
m_max = std::max(m_max, source.m_max);

if (m_options.store_xy) {
double minX = std::min(m_min_xy.first, source.m_min_xy.first);
double minY = std::min(m_min_xy.second, source.m_min_xy.second);
m_min_xy = { minX, minY };

double maxX = std::min(m_max_xy.first, source.m_max_xy.first);
double maxY = std::min(m_max_xy.second, source.m_max_xy.second);
m_max_xy = { maxX, maxY };
}

m_sum_ciwi += source.m_sum_ciwi;
m_sum_ci += source.m_sum_ci;
m_sum_xici += source.m_sum_xici;
m_sum_xiciwi += source.m_sum_xiciwi;

for (auto& v : source.m_freq) {
auto it = m_freq.find(v.first);
if (it != m_freq.end()) {
it->second.m_sum_ci += v.second.m_sum_ci;
it->second.m_sum_ciwi += v.second.m_sum_ciwi;
} else {
m_freq.insert(v);
}
}

m_cell_cov.insert(m_cell_cov.end(), source.m_cell_cov.begin(), source.m_cell_cov.end());
m_cell_values.insert(m_cell_values.end(), source.m_cell_values.begin(), source.m_cell_values.end());
m_cell_weights.insert(m_cell_weights.end(), source.m_cell_weights.begin(), source.m_cell_weights.end());
m_cell_x.insert(m_cell_x.end(), source.m_cell_x.begin(), source.m_cell_x.end());
m_cell_y.insert(m_cell_y.end(), source.m_cell_y.begin(), source.m_cell_y.end());
m_cell_values_defined.insert(m_cell_values_defined.end(), source.m_cell_values_defined.begin(), source.m_cell_values_defined.end());
m_cell_weights_defined.insert(m_cell_weights_defined.end(), source.m_cell_weights_defined.begin(), source.m_cell_weights_defined.end());
}

static bool get_or_default(const AbstractRaster<T>& r, std::size_t i, std::size_t j, T& val, const std::optional<T>& default_value)
{
if (r.get(i, j, val)) {
Expand Down
Loading