Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: spatial join #580

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
394 changes: 339 additions & 55 deletions python/core/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions python/core/python/geoarrow/rust/core/_rust.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,7 @@ def total_bounds(
# Top-level table functions

def explode(input: ArrowStreamExportable) -> GeoTable: ...
def sjoin(left: ArrowStreamExportable, right: ArrowStreamExportable) -> GeoTable: ...

# I/O

Expand Down
1 change: 1 addition & 0 deletions python/core/src/algorithm/geo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ pub mod line_locate_point;
pub mod rotate;
pub mod scale;
pub mod simplify;
pub mod sjoin;
pub mod skew;
pub mod translate;
9 changes: 9 additions & 0 deletions python/core/src/algorithm/geo/sjoin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::error::PyGeoArrowResult;
use crate::table::GeoTable;
use geoarrow::algorithm::geo::spatial_join;
use pyo3::prelude::*;

#[pyfunction]
pub fn sjoin(left: GeoTable, right: GeoTable) -> PyGeoArrowResult<GeoTable> {
Ok(GeoTable(spatial_join(&left.0, &right.0)?))
}
1 change: 1 addition & 0 deletions python/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ fn _rust(_py: Python, m: &PyModule) -> PyResult<()> {
crate::algorithm::native::explode::explode,
m
)?)?;
m.add_function(wrap_pyfunction!(crate::algorithm::geo::sjoin::sjoin, m)?)?;

// IO

Expand Down
5 changes: 5 additions & 0 deletions src/algorithm/geo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ pub use simplify_vw::SimplifyVw;
mod simplify_vw_preserve;
pub use simplify_vw_preserve::SimplifyVwPreserve;

#[cfg(feature = "rayon")]
mod sjoin;
#[cfg(feature = "rayon")]
pub use sjoin::spatial_join;

/// Skew geometries by shearing it at angles along the x and y dimensions
mod skew;
pub use skew::Skew;
Expand Down
237 changes: 237 additions & 0 deletions src/algorithm/geo/sjoin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::Array;
use arrow::array::UInt32Builder;
use arrow::compute::take_record_batch;
use arrow_array::RecordBatch;
use arrow_schema::SchemaBuilder;
use geo::Intersects;
use geo_index::rtree::sort::HilbertSort;
use geo_index::rtree::{OwnedRTree, RTreeBuilder, RTreeIndex};
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};

use crate::error::Result;
use crate::indexed::array::{create_indexed_array, IndexedGeometryArrayTrait};
use crate::table::GeoTable;
use crate::GeometryArrayTrait;

/// For now, only inner, intersects join
pub fn spatial_join(left: &GeoTable, right: &GeoTable) -> Result<GeoTable> {
let left_indexed_chunks = create_indexed_chunks(&left.geometry()?.geometry_chunks())?;
let right_indexed_chunks = create_indexed_chunks(&right.geometry()?.geometry_chunks())?;

let left_indexed_chunks_refs = left_indexed_chunks
.iter()
.map(|chunk| chunk.as_ref())
.collect::<Vec<_>>();
let right_indexed_chunks_refs = right_indexed_chunks
.iter()
.map(|chunk| chunk.as_ref())
.collect::<Vec<_>>();

let chunk_candidates = get_chunk_candidates(
left_indexed_chunks_refs.as_slice(),
right_indexed_chunks_refs.as_slice(),
);

let new_batches = chunk_candidates
.into_par_iter()
.map(|(left_chunk_idx, right_chunk_idx)| {
let left_chunk = left_indexed_chunks_refs[left_chunk_idx];
let right_chunk = right_indexed_chunks_refs[right_chunk_idx];

let left_batch = &left.batches()[left_chunk_idx];
let right_batch = &right.batches()[right_chunk_idx];

let (left_indices, right_indices) = chunk_intersects(left_chunk, right_chunk);
let new_left_batch = take_record_batch(left_batch, &left_indices)?;
let new_right_batch = take_record_batch(right_batch, &right_indices)?;
Ok((new_left_batch, new_right_batch))
})
.collect::<Result<Vec<_>>>()?;

assemble_table(new_batches, left, right)
}

fn create_indexed_chunks(
chunks: &[&dyn GeometryArrayTrait],
) -> Result<Vec<Arc<dyn IndexedGeometryArrayTrait>>> {
chunks
.par_iter()
.map(|chunk| create_indexed_array(*chunk))
.collect()
}

/// Get the _chunks_ from the left and right sides whose children need to be compared with each
/// other.
///
/// That is, if the left dataset has 5 chunks and the right dataset has 10 chunks. If there is
/// _any_ spatial sorting happening, not all of the 5 left chunks will intersect with every single
/// one of the 5 right chunks. So instead of having to perform _50_ child calls (`left * right`),
/// we can perform less than that: only those whose bounding boxes actually intersect.
///
/// We use the root bounding box of each RTree to infer a "higher level" RTree, which itself we
/// search through to find these candidates.
fn get_chunk_candidates(
left_chunks: &[&dyn IndexedGeometryArrayTrait],
right_chunks: &[&dyn IndexedGeometryArrayTrait],
) -> Vec<(usize, usize)> {
if left_chunks.len() == 1 && right_chunks.len() == 1 {
return vec![(0, 0)];
}

let left_chunk_tree = create_tree_from_chunks(left_chunks);
let right_chunk_tree = create_tree_from_chunks(right_chunks);

match (left_chunk_tree, right_chunk_tree) {
(None, None) => panic!("should be covered above"),
(Some(left), None) => {
let right_bbox = right_chunks[0].total_bounds();
let left_chunk_idxs = left.search(
right_bbox.minx,
right_bbox.miny,
right_bbox.maxx,
right_bbox.maxy,
);
left_chunk_idxs.iter().map(|idx| (*idx, 0)).collect()
}
(None, Some(right)) => {
let left_bbox = left_chunks[0].total_bounds();
let right_chunk_idxs = right.search(
left_bbox.minx,
left_bbox.miny,
left_bbox.maxx,
left_bbox.maxy,
);
right_chunk_idxs.iter().map(|idx| (0, *idx)).collect()
}
(Some(left), Some(right)) => left
.intersection_candidates_with_other_tree(&right)
.collect(),
}
}

fn create_tree_from_chunks(chunks: &[&dyn IndexedGeometryArrayTrait]) -> Option<OwnedRTree<f64>> {
if chunks.len() == 1 {
return None;
}

let mut tree = RTreeBuilder::<f64>::new(chunks.len());
chunks.iter().for_each(|chunk| {
let bounding_rect = chunk.total_bounds();
tree.add(
bounding_rect.minx,
bounding_rect.miny,
bounding_rect.maxx,
bounding_rect.maxy,
);
});
Some(tree.finish::<HilbertSort>())
}

/// Call [Intersects][geo::Intersects] on the pairs within the given chunks whose bounding boxes
/// intersect.
///
/// Returns the left and right indexes that pairwise intersect.
fn chunk_intersects(
left_chunk: &dyn IndexedGeometryArrayTrait,
right_chunk: &dyn IndexedGeometryArrayTrait,
) -> (Arc<dyn Array>, Arc<dyn Array>) {
let left_array = left_chunk.array();
let right_array = right_chunk.array();

let mut left_idxs = UInt32Builder::new();
let mut right_idxs = UInt32Builder::new();
for (left_idx, right_idx) in left_chunk
.index()
.intersection_candidates_with_other_tree(right_chunk.index())
{
let left_geom = left_array.value_as_geo_geometry(left_idx);
let right_geom = right_array.value_as_geo_geometry(right_idx);
if left_geom.intersects(&right_geom) {
left_idxs.append_value(left_idx.try_into().unwrap());
right_idxs.append_value(right_idx.try_into().unwrap());
}
}

(Arc::new(left_idxs.finish()), Arc::new(right_idxs.finish()))
}

fn assemble_table(
new_batches: Vec<(RecordBatch, RecordBatch)>,
prev_left_table: &GeoTable,
prev_right_table: &GeoTable,
) -> Result<GeoTable> {
let prev_left_schema = prev_left_table.schema();
let prev_right_schema = prev_right_table.schema();
let right_geom_col_idx = prev_right_table.geometry_column_index();

let mut left_names = HashSet::with_capacity(prev_left_schema.fields().len());
prev_left_schema.fields().iter().for_each(|field| {
left_names.insert(field.name());
});

let mut overlapping_field_names = HashSet::new();
prev_right_schema
.fields()
.iter()
.enumerate()
.for_each(|(idx, field)| {
if idx != right_geom_col_idx {
if left_names.contains(field.name()) {
overlapping_field_names.insert(field.name());
}
}
});

let mut new_schema = SchemaBuilder::with_capacity(
prev_left_schema.fields().len() + prev_right_schema.fields.len() - 1,
);
prev_left_schema.fields().iter().for_each(|field| {
if overlapping_field_names.contains(field.name()) {
let new_field = field
.as_ref()
.clone()
.with_name(format!("{}_left", field.name()));
new_schema.push(Arc::new(new_field))
} else {
new_schema.push(field.clone())
}
});
prev_right_schema
.fields()
.iter()
.enumerate()
.for_each(|(idx, field)| {
if idx == right_geom_col_idx {
()
} else if overlapping_field_names.contains(field.name()) {
let new_field = field
.as_ref()
.clone()
.with_name(format!("{}_right", field.name()));
new_schema.push(Arc::new(new_field))
} else {
new_schema.push(field.clone())
}
});
let new_schema = Arc::new(new_schema.finish());

let new_batches = new_batches
.into_iter()
.map(|(left_batch, right_batch)| {
let mut left_columns = left_batch.columns().to_vec();
let mut right_columns = right_batch.columns().to_vec();
right_columns.remove(right_geom_col_idx);
left_columns.extend_from_slice(&right_columns);
RecordBatch::try_new(new_schema.clone(), left_columns).unwrap()
})
.collect::<Vec<_>>();

GeoTable::try_new(
new_schema,
new_batches,
prev_left_table.geometry_column_index(),
)
}
4 changes: 4 additions & 0 deletions src/array/binary/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ impl<O: OffsetSizeTrait> GeometryArrayTrait for WKBArray<O> {
fn as_ref(&self) -> &dyn GeometryArrayTrait {
self
}

fn value_as_geo_geometry(&self, i: usize) -> geo::Geometry {
self.value_as_geo(i)
}
}

impl<O: OffsetSizeTrait> GeometryArraySelfMethods for WKBArray<O> {
Expand Down
4 changes: 4 additions & 0 deletions src/array/coord/combined/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ impl GeometryArrayTrait for CoordBuffer {
fn as_ref(&self) -> &dyn GeometryArrayTrait {
self
}

fn value_as_geo_geometry(&self, _i: usize) -> geo::Geometry {
panic!("coord arrays can't impl value_as_geo_geometry")
}
}

impl GeometryArraySelfMethods for CoordBuffer {
Expand Down
4 changes: 4 additions & 0 deletions src/array/coord/interleaved/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ impl GeometryArrayTrait for InterleavedCoordBuffer {
fn as_ref(&self) -> &dyn GeometryArrayTrait {
self
}

fn value_as_geo_geometry(&self, _i: usize) -> geo::Geometry {
panic!("coord arrays can't impl value_as_geo_geometry")
}
}

impl GeometryArraySelfMethods for InterleavedCoordBuffer {
Expand Down
4 changes: 4 additions & 0 deletions src/array/coord/separated/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ impl GeometryArrayTrait for SeparatedCoordBuffer {
fn as_ref(&self) -> &dyn GeometryArrayTrait {
self
}

fn value_as_geo_geometry(&self, _i: usize) -> geo::Geometry {
panic!("coord arrays can't impl value_as_geo_geometry")
}
}

impl GeometryArraySelfMethods for SeparatedCoordBuffer {
Expand Down
4 changes: 4 additions & 0 deletions src/array/geometry/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ impl<O: OffsetSizeTrait> GeometryArrayTrait for GeometryArray<O> {
self
}

fn value_as_geo_geometry(&self, i: usize) -> geo::Geometry {
self.value_as_geo(i)
}

// /// Clones this [`GeometryArray`] with a new assigned bitmap.
// /// # Panic
// /// This function panics iff `validity.len() != self.len()`.
Expand Down
4 changes: 4 additions & 0 deletions src/array/geometrycollection/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ impl<O: OffsetSizeTrait> GeometryArrayTrait for GeometryCollectionArray<O> {
fn as_ref(&self) -> &dyn GeometryArrayTrait {
self
}

fn value_as_geo_geometry(&self, i: usize) -> geo::Geometry {
geo::Geometry::GeometryCollection(self.value_as_geo(i))
}
}

impl<O: OffsetSizeTrait> GeometryArraySelfMethods for GeometryCollectionArray<O> {
Expand Down
4 changes: 4 additions & 0 deletions src/array/linestring/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ impl<O: OffsetSizeTrait> GeometryArrayTrait for LineStringArray<O> {
fn as_ref(&self) -> &dyn GeometryArrayTrait {
self
}

fn value_as_geo_geometry(&self, i: usize) -> geo::Geometry {
geo::Geometry::LineString(self.value_as_geo(i))
}
}

impl<O: OffsetSizeTrait> GeometryArraySelfMethods for LineStringArray<O> {
Expand Down
4 changes: 4 additions & 0 deletions src/array/mixed/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,10 @@ impl<O: OffsetSizeTrait> GeometryArrayTrait for MixedGeometryArray<O> {
fn as_ref(&self) -> &dyn GeometryArrayTrait {
self
}

fn value_as_geo_geometry(&self, i: usize) -> geo::Geometry {
self.value_as_geo(i)
}
}

impl<O: OffsetSizeTrait> GeometryArraySelfMethods for MixedGeometryArray<O> {
Expand Down
4 changes: 4 additions & 0 deletions src/array/multilinestring/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ impl<O: OffsetSizeTrait> GeometryArrayTrait for MultiLineStringArray<O> {
fn as_ref(&self) -> &dyn GeometryArrayTrait {
self
}

fn value_as_geo_geometry(&self, i: usize) -> geo::Geometry {
geo::Geometry::MultiLineString(self.value_as_geo(i))
}
}

impl<O: OffsetSizeTrait> GeometryArraySelfMethods for MultiLineStringArray<O> {
Expand Down
4 changes: 4 additions & 0 deletions src/array/multipoint/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ impl<O: OffsetSizeTrait> GeometryArrayTrait for MultiPointArray<O> {
fn as_ref(&self) -> &dyn GeometryArrayTrait {
self
}

fn value_as_geo_geometry(&self, i: usize) -> geo::Geometry {
geo::Geometry::MultiPoint(self.value_as_geo(i))
}
}

impl<O: OffsetSizeTrait> GeometryArraySelfMethods for MultiPointArray<O> {
Expand Down
Loading
Loading