diff --git a/tobler/area_weighted/area_interpolate_dask.py b/tobler/area_weighted/area_interpolate_dask.py index faf5e02..6522bff 100755 --- a/tobler/area_weighted/area_interpolate_dask.py +++ b/tobler/area_weighted/area_interpolate_dask.py @@ -264,3 +264,152 @@ def id_area_interpolate( pandas.DataFrame(index=estimates.index, columns=category_vars_to_add) ) return estimates + + +def area_interpolate_dask_native( + source_dgdf, + target_dgdf, + source_id, + target_id, + extensive_variables=None, + intensive_variables=None, + categorical_variables=None, + categorical_frequency=True +): + # Categoricals must be Dask's known categorical + if categorical_variables is not None: + category_vars = [] + for cat_var in categorical_variables: + var_names = [f'{cat_var}_{c}' for c in source_dgdf[cat_var].cat.categories] + category_vars.extend(var_names) + else: + category_vars = None + #---------------------------------------- + # Build cross-over table + #---------------------------------------- + # Build tasks by joining pairs of chunks from left/right + dsk = {} + new_spatial_partitions = [] + parts = geopandas.sjoin( + source_dgdf.spatial_partitions.to_frame('geometry'), + target_dgdf.spatial_partitions.to_frame('geometry'), + how='inner', + predicate='intersects' + ) + parts_left = np.asarray(parts.index) + parts_right = np.asarray(parts['index_right'].values) + name = 'area_interpolate-' + tokenize( + target_dgdf, source_dgdf + ) + # Create computation items for workers + for i, (l, r) in enumerate(zip(parts_left, parts_right)): + dsk[(name, i)] = ( + id_area_table, + (source_dgdf._name, l), + (target_dgdf._name, r), + source_id, + target_id, + 'auto', + ) + lr = source_dgdf.spatial_partitions.iloc[l] + rr = target_dgdf.spatial_partitions.iloc[r] + extent = lr.intersection(rr) + new_spatial_partitions.append(extent) + # Create geometries for new spatial partitions + new_spatial_partitions = geopandas.GeoSeries( + data=new_spatial_partitions, crs=source_dgdf.crs + ) + # Build Dask graph + graph = HighLevelGraph.from_collections( + name, dsk, dependencies=[source_dgdf, target_dgdf] + ) + # Get metadata for the outcome table + meta = id_area_table( + source_dgdf._meta, + target_dgdf._meta, + source_id, + target_id, + spatial_index='auto', + ) + #---------------------------------------- + # Build output table + areas = dask_geopandas.GeoDataFrame( + graph, + name, + meta, + [None] * (len(dsk) + 1), + new_spatial_partitions + ) + # Merge chunks + out = target_dgdf[[target_id, 'geometry']] + ## Extensive --> Not implemented (DAB: the below does not match single-core) + if extensive_variables is not None: + weights = areas / ( + areas.groupby(source_id).transform('sum', meta=areas._meta) + ) + tmp = ( + weights + .join( + source_df.set_index(source_id)[extensive_variables], + on=source_id + ) + ) + for ev in extensive_variables: + tmp[ev] = tmp[[ev, 'weight']].prod(axis='columns') + out_extensive = tmp.groupby(target_id)[extensive_variables].sum() + out = out.join(out_extensive, on=target_id) + ''' + ## Intensive --> Weight by area of the chunk (Not implemented) + ## Categorical --> Add up proportions + if categorical_variables is not None: + out_categorical = ( + transferred + [category_vars] + .astype(float) + .groupby(transferred[target_id]) + .agg({v: 'sum' for v in category_vars}) + ) + out = out.join(out_categorical, on=target_id) + if categorical_frequency is True: + cols = out_categorical.columns.tolist() + out[cols] = out[cols].div( + out.area, axis='index' + ) + ''' + return out + +def id_area_table( + source_df, + target_df, + source_id, + target_id, + spatial_index='auto', +): + df1 = source_df.copy() + df2 = target_df.copy() + + # it is generally more performant to use the longer df as spatial index + if spatial_index == "auto": + if df1.shape[0] > df2.shape[0]: + spatial_index = "source" + else: + spatial_index = "target" + + if spatial_index == "source": + ids_tgt, ids_src = df1.sindex.query(df2.geometry, predicate="intersects") + elif spatial_index == "target": + ids_src, ids_tgt = df2.sindex.query(df1.geometry, predicate="intersects") + else: + raise ValueError( + f"'{spatial_index}' is not a valid option. Use 'auto', 'source' or 'target'." + ) + + areas = df1.geometry.values[ids_src].intersection(df2.geometry.values[ids_tgt]).area + + table = pandas.DataFrame({ + source_id: source_df[source_id].iloc[ids_src].values, + target_id: target_df[target_id].iloc[ids_tgt].values, + 'overlap': areas + }) + table['uid'] = table[source_id].astype(str) + '-' + table[source_id].astype(str) + \ No newline at end of file