From 5a50110752ede21190704e9e713d0637379236a2 Mon Sep 17 00:00:00 2001
From: Dani Arribas-Bel
Date: Sun, 10 Sep 2023 21:55:03 +0100
Subject: [PATCH 1/2] Add native Dask implementation

---
 tobler/area_weighted/area_interpolate_dask.py | 296 ++++++++++++++++++
 1 file changed, 296 insertions(+)

diff --git a/tobler/area_weighted/area_interpolate_dask.py b/tobler/area_weighted/area_interpolate_dask.py
index faf5e02b..70a2f7b6 100755
--- a/tobler/area_weighted/area_interpolate_dask.py
+++ b/tobler/area_weighted/area_interpolate_dask.py
@@ -264,3 +264,299 @@ def id_area_interpolate(
         pandas.DataFrame(index=estimates.index, columns=category_vars_to_add)
     )
     return estimates
+
+
+def area_interpolate_dask_native(
+    source_dgdf,
+    target_dgdf,
+    source_id,
+    target_id,
+    extensive_variables=None,
+    intensive_variables=None,
+    categorical_variables=None,
+    categorical_frequency=True
+):
+    # Categoricals must be Dask's known categoricals
+    if categorical_variables is not None:
+        category_vars = []
+        for cat_var in categorical_variables:
+            var_names = [f'{cat_var}_{c}' for c in source_dgdf[cat_var].cat.categories]
+            category_vars.extend(var_names)
+    else:
+        category_vars = None
+    #----------------------------------------
+    # Build cross-over table
+    #----------------------------------------
+    # Build tasks by joining pairs of intersecting chunks from source/target
+    dsk = {}
+    new_spatial_partitions = []
+    parts = geopandas.sjoin(
+        source_dgdf.spatial_partitions.to_frame('geometry'),
+        target_dgdf.spatial_partitions.to_frame('geometry'),
+        how='inner',
+        predicate='intersects'
+    )
+    parts_left = np.asarray(parts.index)
+    parts_right = np.asarray(parts['index_right'].values)
+    name = 'area_interpolate-' + tokenize(
+        target_dgdf, source_dgdf
+    )
+    # Create computation items for workers
+    for i, (l, r) in enumerate(zip(parts_left, parts_right)):
+        dsk[(name, i)] = (
+            id_area_table,
+            (source_dgdf._name, l),
+            (target_dgdf._name, r),
+            source_id,
+            target_id,
+            'auto',
+        )
+        lr = source_dgdf.spatial_partitions.iloc[l]
+        rr = target_dgdf.spatial_partitions.iloc[r]
+        extent = lr.intersection(rr)
+        new_spatial_partitions.append(extent)
+    # Create geometries for new spatial partitions
+    new_spatial_partitions = geopandas.GeoSeries(
+        data=new_spatial_partitions, crs=source_dgdf.crs
+    )
+    # Build Dask graph
+    graph = HighLevelGraph.from_collections(
+        name, dsk, dependencies=[source_dgdf, target_dgdf]
+    )
+    # Get metadata for the outcome table
+    meta = id_area_table(
+        source_dgdf._meta,
+        target_dgdf._meta,
+        source_id,
+        target_id,
+        spatial_index='auto',
+    )
+    #----------------------------------------
+    # Build output table
+    areas = dask_geopandas.GeoDataFrame(
+        graph,
+        name,
+        meta,
+        [None] * (len(dsk) + 1),
+        new_spatial_partitions
+    )
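+    # Each partition of `areas` holds one cross-over table, as returned by
+    # `id_area_table` below: one row per intersecting source/target pair,
+    # indexed by a unique source-target id. For example (hypothetical ids
+    # and areas):
+    #
+    #        sid tid  overlap
+    #   uid
+    #   a-x    a   x      2.0
+    #   a-y    a   y      2.0
+    #   b-y    b   y      4.0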
+    # Merge chunks
+    out = target_dgdf[[target_id, 'geometry']]
+    ## Extensive --> Not implemented (DAB: the below does not match single-core)
+    if extensive_variables is not None:
+        # Share of each source geometry's area falling in each target geometry
+        weights = areas.assign(
+            weight=areas['overlap']
+            / areas.groupby(source_id)['overlap'].transform('sum', meta=('overlap', 'f8'))
+        )
+        tmp = (
+            weights
+            .join(
+                source_dgdf.set_index(source_id)[extensive_variables],
+                on=source_id
+            )
+        )
+        for ev in extensive_variables:
+            tmp[ev] = tmp[[ev, 'weight']].prod(axis='columns')
+        out_extensive = tmp.groupby(target_id)[extensive_variables].sum()
+        out = out.join(out_extensive, on=target_id)
+    '''
+    ## Intensive --> Weight by area of the chunk (Not implemented)
+    ## Categorical --> Add up proportions
+    if categorical_variables is not None:
+        out_categorical = (
+            transferred
+            [category_vars]
+            .astype(float)
+            .groupby(transferred[target_id])
+            .agg({v: 'sum' for v in category_vars})
+        )
+        out = out.join(out_categorical, on=target_id)
+        if categorical_frequency is True:
+            cols = out_categorical.columns.tolist()
+            out[cols] = out[cols].div(
+                out.area, axis='index'
+            )
+    '''
+    return out
+
+def id_area_table(
+    source_df,
+    target_df,
+    source_id,
+    target_id,
+    spatial_index='auto',
+):
+    df1 = source_df.copy()
+    df2 = target_df.copy()
+
+    # It is generally more performant to use the longer df as the spatial index
+    if spatial_index == "auto":
+        if df1.shape[0] > df2.shape[0]:
+            spatial_index = "source"
+        else:
+            spatial_index = "target"
+
+    if spatial_index == "source":
+        ids_tgt, ids_src = df1.sindex.query(df2.geometry, predicate="intersects")
+    elif spatial_index == "target":
+        ids_src, ids_tgt = df2.sindex.query(df1.geometry, predicate="intersects")
+    else:
+        raise ValueError(
+            f"'{spatial_index}' is not a valid option. Use 'auto', 'source' or 'target'."
+        )
+
+    areas = df1.geometry.values[ids_src].intersection(df2.geometry.values[ids_tgt]).area
+
+    table = pandas.DataFrame({
+        source_id: source_df[source_id].iloc[ids_src].values,
+        target_id: target_df[target_id].iloc[ids_tgt].values,
+        'overlap': areas
+    })
+    table['uid'] = table[source_id].astype(str) + '-' + table[target_id].astype(str)
+    return table.set_index('uid')
+
+def area_interpolate_dask_native(
+    source_dgdf,
+    target_dgdf,
+    source_id,
+    target_id,
+    extensive_variables=None,
+    intensive_variables=None,
+    categorical_variables=None,
+    categorical_frequency=True
+):
+    # Categoricals must be Dask's known categoricals
+    if categorical_variables is not None:
+        category_vars = []
+        for cat_var in categorical_variables:
+            var_names = [f'{cat_var}_{c}' for c in source_dgdf[cat_var].cat.categories]
+            category_vars.extend(var_names)
+    else:
+        category_vars = None
+    #----------------------------------------
+    # Build cross-over table
+    #----------------------------------------
+    # Build tasks by joining pairs of intersecting chunks from source/target
+    dsk = {}
+    new_spatial_partitions = []
+    parts = geopandas.sjoin(
+        source_dgdf.spatial_partitions.to_frame('geometry'),
+        target_dgdf.spatial_partitions.to_frame('geometry'),
+        how='inner',
+        predicate='intersects'
+    )
+    parts_left = np.asarray(parts.index)
+    parts_right = np.asarray(parts['index_right'].values)
+    name = 'area_interpolate-' + tokenize(
+        target_dgdf, source_dgdf
+    )
+    # Create computation items for workers
+    for i, (l, r) in enumerate(zip(parts_left, parts_right)):
+        dsk[(name, i)] = (
+            id_area_table,
+            (source_dgdf._name, l),
+            (target_dgdf._name, r),
+            source_id,
+            target_id,
+            'auto',
+        )
+        lr = source_dgdf.spatial_partitions.iloc[l]
+        rr = target_dgdf.spatial_partitions.iloc[r]
+        extent = lr.intersection(rr)
+        new_spatial_partitions.append(extent)
+    # Create geometries for new spatial partitions
+    new_spatial_partitions = geopandas.GeoSeries(
+        data=new_spatial_partitions, crs=source_dgdf.crs
+    )
+    # Build Dask graph
+    graph = HighLevelGraph.from_collections(
+        name, dsk, dependencies=[source_dgdf, target_dgdf]
+    )
+    # Get metadata for the outcome table
+    meta = id_area_table(
+        source_dgdf._meta,
+        target_dgdf._meta,
+        source_id,
+        target_id,
+        spatial_index='auto',
+    )
+    #----------------------------------------
+    # Build output table
+    areas = dask_geopandas.GeoDataFrame(
+        graph,
+        name,
+        meta,
+        [None] * (len(dsk) + 1),
+        new_spatial_partitions
+    )
+    # Merge chunks
+    out = target_dgdf[[target_id, 'geometry']]
+    ## Extensive --> Not implemented (DAB: the below does not match single-core)
+    if extensive_variables is not None:
+        # Share of each source geometry's area falling in each target geometry
+        weights = areas.assign(
+            weight=areas['overlap']
+            / areas.groupby(source_id)['overlap'].transform('sum', meta=('overlap', 'f8'))
+        )
+        tmp = (
+            weights
+            .join(
+                source_dgdf.set_index(source_id)[extensive_variables],
+                on=source_id
+            )
+        )
+        for ev in extensive_variables:
+            tmp[ev] = tmp[[ev, 'weight']].prod(axis='columns')
+        out_extensive = tmp.groupby(target_id)[extensive_variables].sum()
+        out = out.join(out_extensive, on=target_id)
+    '''
+    ## Intensive --> Weight by area of the chunk (Not implemented)
+    ## Categorical --> Add up proportions
+    if categorical_variables is not None:
+        out_categorical = (
+            transferred
+            [category_vars]
+            .astype(float)
+            .groupby(transferred[target_id])
+            .agg({v: 'sum' for v in category_vars})
+        )
+        out = out.join(out_categorical, on=target_id)
+        if categorical_frequency is True:
+            cols = out_categorical.columns.tolist()
+            out[cols] = out[cols].div(
+                out.area, axis='index'
+            )
+    '''
+    return out
+
+def id_area_table(
+    source_df,
+    target_df,
+    source_id,
+    target_id,
+    spatial_index='auto',
+):
+    df1 = source_df.copy()
+    df2 = target_df.copy()
+
+    # It is generally more performant to use the longer df as the spatial index
+    if spatial_index == "auto":
+        if df1.shape[0] > df2.shape[0]:
+            spatial_index = "source"
+        else:
+            spatial_index = "target"
+
+    if spatial_index == "source":
+        ids_tgt, ids_src = df1.sindex.query(df2.geometry, predicate="intersects")
+    elif spatial_index == "target":
+        ids_src, ids_tgt = df2.sindex.query(df1.geometry, predicate="intersects")
+    else:
+        raise ValueError(
+            f"'{spatial_index}' is not a valid option. Use 'auto', 'source' or 'target'."
+        )
+
+    areas = df1.geometry.values[ids_src].intersection(df2.geometry.values[ids_tgt]).area
+
+    table = pandas.DataFrame({
+        source_id: source_df[source_id].iloc[ids_src].values,
+        target_id: target_df[target_id].iloc[ids_tgt].values,
+        'overlap': areas
+    })
+    table['uid'] = table[source_id].astype(str) + '-' + table[target_id].astype(str)
+    return table.set_index('uid')
\ No newline at end of file
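
Note: a minimal usage sketch of area_interpolate_dask_native() as added above.
The file paths, partition counts, and id/variable names below are hypothetical;
the requirements the code imposes are two dask_geopandas.GeoDataFrames in a
shared CRS, each with a unique id column and with spatial_partitions populated
(the sjoin over partition bounds depends on them). The extensive branch is
flagged above as not yet matching the single-core implementation.

    import geopandas
    import dask_geopandas
    from tobler.area_weighted.area_interpolate_dask import (
        area_interpolate_dask_native,
    )

    # Hypothetical polygon layers with unique id columns
    src = dask_geopandas.from_geopandas(
        geopandas.read_file('tracts.gpkg'), npartitions=16
    )
    tgt = dask_geopandas.from_geopandas(
        geopandas.read_file('hexgrid.gpkg'), npartitions=16
    )
    # Populate the per-partition bounding geometries used by the sjoin
    src.calculate_spatial_partitions()
    tgt.calculate_spatial_partitions()

    est = area_interpolate_dask_native(
        src,
        tgt,
        source_id='tract_id',
        target_id='hex_id',
        extensive_variables=['population'],
    )
    result = est.compute()  # trigger the computation on the Dask graph
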
From 5ce79e71a89fc4ede93ec22f6eb84b1acf884e4a Mon Sep 17 00:00:00 2001
From: Dani Arribas-Bel
Date: Sun, 10 Sep 2023 22:08:13 +0100
Subject: [PATCH 2/2] De-dup methods (typo)

---
 tobler/area_weighted/area_interpolate_dask.py | 149 +----------------
 1 file changed, 1 insertion(+), 148 deletions(-)

diff --git a/tobler/area_weighted/area_interpolate_dask.py b/tobler/area_weighted/area_interpolate_dask.py
index 70a2f7b6..6522bffc 100755
--- a/tobler/area_weighted/area_interpolate_dask.py
+++ b/tobler/area_weighted/area_interpolate_dask.py
@@ -412,151 +412,4 @@ def id_area_table(
         'overlap': areas
     })
     table['uid'] = table[source_id].astype(str) + '-' + table[target_id].astype(str)
-    return table.set_index('uid')
-
-def area_interpolate_dask_native(
-    source_dgdf,
-    target_dgdf,
-    source_id,
-    target_id,
-    extensive_variables=None,
-    intensive_variables=None,
-    categorical_variables=None,
-    categorical_frequency=True
-):
-    # Categoricals must be Dask's known categoricals
-    if categorical_variables is not None:
-        category_vars = []
-        for cat_var in categorical_variables:
-            var_names = [f'{cat_var}_{c}' for c in source_dgdf[cat_var].cat.categories]
-            category_vars.extend(var_names)
-    else:
-        category_vars = None
-    #----------------------------------------
-    # Build cross-over table
-    #----------------------------------------
-    # Build tasks by joining pairs of intersecting chunks from source/target
-    dsk = {}
-    new_spatial_partitions = []
-    parts = geopandas.sjoin(
-        source_dgdf.spatial_partitions.to_frame('geometry'),
-        target_dgdf.spatial_partitions.to_frame('geometry'),
-        how='inner',
-        predicate='intersects'
-    )
-    parts_left = np.asarray(parts.index)
-    parts_right = np.asarray(parts['index_right'].values)
-    name = 'area_interpolate-' + tokenize(
-        target_dgdf, source_dgdf
-    )
-    # Create computation items for workers
-    for i, (l, r) in enumerate(zip(parts_left, parts_right)):
-        dsk[(name, i)] = (
-            id_area_table,
-            (source_dgdf._name, l),
-            (target_dgdf._name, r),
-            source_id,
-            target_id,
-            'auto',
-        )
-        lr = source_dgdf.spatial_partitions.iloc[l]
-        rr = target_dgdf.spatial_partitions.iloc[r]
-        extent = lr.intersection(rr)
-        new_spatial_partitions.append(extent)
-    # Create geometries for new spatial partitions
-    new_spatial_partitions = geopandas.GeoSeries(
-        data=new_spatial_partitions, crs=source_dgdf.crs
-    )
-    # Build Dask graph
-    graph = HighLevelGraph.from_collections(
-        name, dsk, dependencies=[source_dgdf, target_dgdf]
-    )
-    # Get metadata for the outcome table
-    meta = id_area_table(
-        source_dgdf._meta,
-        target_dgdf._meta,
-        source_id,
-        target_id,
-        spatial_index='auto',
-    )
-    #----------------------------------------
-    # Build output table
-    areas = dask_geopandas.GeoDataFrame(
-        graph,
-        name,
-        meta,
-        [None] * (len(dsk) + 1),
-        new_spatial_partitions
-    )
-    # Merge chunks
-    out = target_dgdf[[target_id, 'geometry']]
-    ## Extensive --> Not implemented (DAB: the below does not match single-core)
-    if extensive_variables is not None:
-        # Share of each source geometry's area falling in each target geometry
-        weights = areas.assign(
-            weight=areas['overlap']
-            / areas.groupby(source_id)['overlap'].transform('sum', meta=('overlap', 'f8'))
-        )
-        tmp = (
-            weights
-            .join(
-                source_dgdf.set_index(source_id)[extensive_variables],
-                on=source_id
-            )
-        )
-        for ev in extensive_variables:
-            tmp[ev] = tmp[[ev, 'weight']].prod(axis='columns')
-        out_extensive = tmp.groupby(target_id)[extensive_variables].sum()
-        out = out.join(out_extensive, on=target_id)
-    '''
-    ## Intensive --> Weight by area of the chunk (Not implemented)
-    ## Categorical --> Add up proportions
-    if categorical_variables is not None:
-        out_categorical = (
-            transferred
-            [category_vars]
-            .astype(float)
-            .groupby(transferred[target_id])
-            .agg({v: 'sum' for v in category_vars})
-        )
-        out = out.join(out_categorical, on=target_id)
-        if categorical_frequency is True:
-            cols = out_categorical.columns.tolist()
-            out[cols] = out[cols].div(
-                out.area, axis='index'
-            )
-    '''
-    return out
-
-def id_area_table(
-    source_df,
-    target_df,
-    source_id,
-    target_id,
-    spatial_index='auto',
-):
-    df1 = source_df.copy()
-    df2 = target_df.copy()
-
-    # It is generally more performant to use the longer df as the spatial index
-    if spatial_index == "auto":
-        if df1.shape[0] > df2.shape[0]:
-            spatial_index = "source"
-        else:
-            spatial_index = "target"
-
-    if spatial_index == "source":
-        ids_tgt, ids_src = df1.sindex.query(df2.geometry, predicate="intersects")
-    elif spatial_index == "target":
-        ids_src, ids_tgt = df2.sindex.query(df1.geometry, predicate="intersects")
-    else:
-        raise ValueError(
-            f"'{spatial_index}' is not a valid option. Use 'auto', 'source' or 'target'."
-        )
-
-    areas = df1.geometry.values[ids_src].intersection(df2.geometry.values[ids_tgt]).area
-
-    table = pandas.DataFrame({
-        source_id: source_df[source_id].iloc[ids_src].values,
-        target_id: target_df[target_id].iloc[ids_tgt].values,
-        'overlap': areas
-    })
-    table['uid'] = table[source_id].astype(str) + '-' + table[target_id].astype(str)
-    return table.set_index('uid')
\ No newline at end of file
+    return table.set_index('uid')
\ No newline at end of file
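
Note: the extensive branch above works towards standard areal weighting, where
each source value is split across targets in proportion to shared area, as in
the single-core interpolator it is flagged against. A toy pandas illustration
(ids, areas, and values all hypothetical):

    import pandas

    # Cross-over table: source 'a' overlaps targets 'x' and 'y' equally,
    # source 'b' falls entirely within 'y'
    table = pandas.DataFrame({
        'sid': ['a', 'a', 'b'],
        'tid': ['x', 'y', 'y'],
        'overlap': [2.0, 2.0, 4.0],
    })
    # Share of each source polygon assigned to each target polygon
    table['weight'] = (
        table['overlap'] / table.groupby('sid')['overlap'].transform('sum')
    )
    # Transfer a hypothetical extensive variable
    pop = pandas.Series({'a': 100.0, 'b': 10.0}, name='population')
    est = (
        table.join(pop, on='sid')
        .assign(population=lambda t: t['population'] * t['weight'])
        .groupby('tid')['population']
        .sum()
    )
    # est: x -> 50.0, y -> 60.0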