From 047be2883398fd240e9991bc01cd454dc467d5a8 Mon Sep 17 00:00:00 2001 From: Fred Thomas Date: Fri, 16 Feb 2024 16:42:30 +0000 Subject: [PATCH] Factor out damages code common to events & RPs --- src/open_gira/direct_damages.py | 104 +++++++++++++++++- .../event_set_direct_damages.py | 84 +------------- .../return_period_direct_damages.py | 85 +------------- 3 files changed, 112 insertions(+), 161 deletions(-) diff --git a/src/open_gira/direct_damages.py b/src/open_gira/direct_damages.py index 6a73d30d..33c7edc2 100644 --- a/src/open_gira/direct_damages.py +++ b/src/open_gira/direct_damages.py @@ -3,12 +3,13 @@ """ from abc import ABC, abstractmethod +import logging import re from typing import Union import geopandas as gpd import pandas as pd -import snkit +from scipy.interpolate import interp1d from open_gira import fields from open_gira.utils import natural_sort @@ -288,3 +289,104 @@ def annotate_rehab_cost(edges: gpd.GeoDataFrame, network_type: str, rehab_cost: raise ValueError(f"No lookup function available for {network_type=}") return edges + + +def direct_damage( + exposure: gpd.GeoDataFrame, + damage_curves: dict[str, pd.DataFrame], + hazard_columns: list[str], + non_hazard_columns: list[str], +) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: + """ + Calculate direct damages for exposed edge assets. Take hazard intensity, + lookup damage fraction from a damage curve, multiply by a rehabilitation + cost. + + Args: + exposure: Table containing exposed assets, likely edges split on a + raster grid (i.e. `edge_id` is not necessarily unique). + damage_curves: Relationship between hazard intensity and damage + fraction, keyed by `asset_type`. + hazard_columns: Columns in `exposure` which denote hazard intensities. + non_hazard_columns: Columns in `exposure` which do not denote hazard + intensities. + + Returns: + Direct damage fraction, rows are splits of edges. + Direct damage cost, rows are edges and `edge_id` should now be unique. + """ + + ########################################################## + # DAMAGE FRACTIONS (per split geometry, for all rasters) # + ########################################################## + + # calculate damages for assets we have damage curves for + damage_fraction_by_asset_type = [] + logging.info(f"Exposed assets {natural_sort(set(exposure.asset_type))}") + for asset_type in natural_sort(set(exposure.asset_type) & set(damage_curves.keys())): + + damage_curve: pd.DataFrame = damage_curves[asset_type] + + # pick out rows of asset type and columns of hazard intensity + asset_type_mask: gpd.GeoDataFrame = exposure.asset_type == asset_type + asset_exposure: pd.DataFrame = pd.DataFrame(exposure.loc[asset_type_mask, hazard_columns]) + + logging.info(f"Processing {asset_type=}, with n={len(asset_exposure)}") + + # create interpolated damage curve for given asset type + hazard_intensity, damage_fraction = damage_curve.iloc[:, 0], damage_curve.iloc[:, 1] + # if curve of length n, where x < x_0, y = y_0 and where x > x_n, y = y_n + bounds = tuple(f(damage_fraction) for f in (min, max)) + interpolated_damage_curve = interp1d( + hazard_intensity, + damage_fraction, + kind='linear', + fill_value=bounds, + bounds_error=False + ) + + # apply damage_curve function to exposure table + # the return value of interpolated_damage_curve is a numpy array + damage_fraction_for_asset_type = pd.DataFrame( + interpolated_damage_curve(asset_exposure), + index=asset_exposure.index, + columns=asset_exposure.columns + ) + + # store the computed direct damages and any columns we started with + # (other than exposure) + damage_fraction_by_asset_type.append( + pd.concat( + [ + damage_fraction_for_asset_type, + exposure.loc[asset_type_mask, non_hazard_columns] + ], + axis="columns" + ) + ) + + # concatenate damage fractions for different asset types into single dataframe + damage_fraction: gpd.GeoDataFrame = gpd.GeoDataFrame(pd.concat(damage_fraction_by_asset_type)) + + ################################################################### + # DAMAGE COST (for split, then grouped geometry, for all rasters) # + ################################################################### + + # multiply the damage fraction estimates by a cost to rebuild the asset + # units are: 1 * USD/km * km = USD + logging.info("Calculating direct damage costs") + direct_damages_only = damage_fraction[hazard_columns] \ + .multiply(damage_fraction[fields.REHAB_COST], axis="index") \ + .multiply(damage_fraction[fields.SPLIT_LENGTH], axis="index") + + # join the other fields with the direct damage estimates + logging.info("Unifying rasterised segments and summing damage costs") + + # grouping on edge_id, sum all direct damage estimates to give a total dollar cost per edge + direct_damages = pd.concat( + [direct_damages_only, damage_fraction["edge_id"]], + axis="columns" + ).set_index("edge_id") + grouped_direct_damages = direct_damages.groupby(direct_damages.index).sum() + + return damage_fraction, grouped_direct_damages \ No newline at end of file diff --git a/workflow/transport-flood/event_set_direct_damages.py b/workflow/transport-flood/event_set_direct_damages.py index 94a382d0..b6ca23a0 100644 --- a/workflow/transport-flood/event_set_direct_damages.py +++ b/workflow/transport-flood/event_set_direct_damages.py @@ -8,13 +8,10 @@ import warnings import geopandas as gpd -import pandas as pd -from scipy.interpolate import interp1d from open_gira import fields -from open_gira.direct_damages import annotate_rehab_cost +from open_gira.direct_damages import annotate_rehab_cost, direct_damage from open_gira.io import write_empty_frames, read_damage_curves, read_rehab_costs -from open_gira.utils import natural_sort if __name__ == "__main__": @@ -69,87 +66,12 @@ hazard_columns = [col for col in exposure.columns if col.startswith(fields.HAZARD_PREFIX)] non_hazard_columns = list(set(exposure.columns) - set(hazard_columns)) - ########################################################## - # DAMAGE FRACTIONS (per split geometry, for all rasters) # - ########################################################## - - # calculate damages for assets we have damage curves for - damage_fraction_by_asset_type = [] - logging.info(f"Exposed assets {natural_sort(set(exposure.asset_type))}") - for asset_type in natural_sort(set(exposure.asset_type) & set(damage_curves.keys())): - - damage_curve: pd.DataFrame = damage_curves[asset_type] - - # pick out rows of asset type and columns of hazard intensity - asset_type_mask: gpd.GeoDataFrame = exposure.asset_type == asset_type - asset_exposure: pd.DataFrame = pd.DataFrame(exposure.loc[asset_type_mask, hazard_columns]) - - logging.info(f"Processing {asset_type=}, with n={len(asset_exposure)}") - - # create interpolated damage curve for given asset type - hazard_intensity, damage_fraction = damage_curve.iloc[:, 0], damage_curve.iloc[:, 1] - # if curve of length n, where x < x_0, y = y_0 and where x > x_n, y = y_n - bounds = tuple(f(damage_fraction) for f in (min, max)) - interpolated_damage_curve = interp1d( - hazard_intensity, - damage_fraction, - kind='linear', - fill_value=bounds, - bounds_error=False - ) - - # apply damage_curve function to exposure table - # the return value of interpolated_damage_curve is a numpy array - damage_fraction_for_asset_type = pd.DataFrame( - interpolated_damage_curve(asset_exposure), - index=asset_exposure.index, - columns=asset_exposure.columns - ) - - # store the computed direct damages and any columns we started with - # (other than exposure) - damage_fraction_by_asset_type.append( - pd.concat( - [ - damage_fraction_for_asset_type, - exposure.loc[asset_type_mask, non_hazard_columns] - ], - axis="columns" - ) - ) - - # concatenate damage fractions for different asset types into single dataframe - damage_fraction: gpd.GeoDataFrame = gpd.GeoDataFrame(pd.concat(damage_fraction_by_asset_type)) - - ################################################################### - # DAMAGE COST (for split, then grouped geometry, for all rasters) # - ################################################################### - - # multiply the damage fraction estimates by a cost to rebuild the asset - # units are: 1 * USD/km * km = USD - logging.info("Calculating direct damage costs") - direct_damages_only = damage_fraction[hazard_columns] \ - .multiply(damage_fraction[fields.REHAB_COST], axis="index") \ - .multiply(damage_fraction[fields.SPLIT_LENGTH], axis="index") + damage_fraction, grouped_direct_damages = direct_damage(exposure, damage_curves, hazard_columns, non_hazard_columns) logging.info("Reading raw network data for unsplit geometry") unsplit: gpd.GeoDataFrame = gpd.read_parquet(unsplit_path) logging.info(f"{unsplit.shape=}") - # join the other fields with the direct damage estimates - logging.info("Unifying rasterised segments and summing damage costs") - - # grouping on edge_id, sum all direct damage estimates to give a total dollar cost per edge - direct_damages = pd.concat( - [direct_damages_only, damage_fraction["edge_id"]], - axis="columns" - ).set_index("edge_id") - grouped_direct_damages = direct_damages.groupby(direct_damages.index).sum() - - ######################################### - # JOINING, VALIDATION AND SERIALIZATION # - ######################################### - # lose columns like "cell_indicies" or rastered length measures that are specific to _rastered_ edges non_hazard_output_columns = list(set(non_hazard_columns) & set(unsplit.columns)) unsplit_subset = unsplit[non_hazard_output_columns].set_index("edge_id", drop=False) @@ -174,4 +96,4 @@ logging.info(f"Writing out {direct_damages.shape=} (per unified geometry, event raster)") direct_damages.to_parquet(damage_cost_path) - logging.info("Done calculating direct damages") + logging.info("Done calculating direct damages") \ No newline at end of file diff --git a/workflow/transport-flood/return_period_direct_damages.py b/workflow/transport-flood/return_period_direct_damages.py index c81f9546..95f73106 100644 --- a/workflow/transport-flood/return_period_direct_damages.py +++ b/workflow/transport-flood/return_period_direct_damages.py @@ -10,13 +10,11 @@ import geopandas as gpd import pandas as pd -from scipy.interpolate import interp1d from scipy.integrate import simpson from open_gira import fields -from open_gira.direct_damages import ReturnPeriodMap, generate_rp_maps, annotate_rehab_cost +from open_gira.direct_damages import ReturnPeriodMap, generate_rp_maps, annotate_rehab_cost, direct_damage from open_gira.io import write_empty_frames, read_damage_curves, read_rehab_costs -from open_gira.utils import natural_sort if __name__ == "__main__": @@ -75,82 +73,7 @@ hazard_columns = [col for col in exposure.columns if col.startswith(fields.HAZARD_PREFIX)] non_hazard_columns = list(set(exposure.columns) - set(hazard_columns)) - ########################################################## - # DAMAGE FRACTIONS (per split geometry, for all rasters) # - ########################################################## - - # calculate damages for assets we have damage curves for - damage_fraction_by_asset_type = [] - logging.info(f"Exposed assets {natural_sort(set(exposure.asset_type))}") - for asset_type in natural_sort(set(exposure.asset_type) & set(damage_curves.keys())): - - damage_curve: pd.DataFrame = damage_curves[asset_type] - - # pick out rows of asset type and columns of hazard intensity - asset_type_mask: gpd.GeoDataFrame = exposure.asset_type == asset_type - asset_exposure: pd.DataFrame = pd.DataFrame(exposure.loc[asset_type_mask, hazard_columns]) - - logging.info(f"Processing {asset_type=}, with n={len(asset_exposure)}") - - # create interpolated damage curve for given asset type - hazard_intensity, damage_fraction = damage_curve.iloc[:, 0], damage_curve.iloc[:, 1] - # if curve of length n, where x < x_0, y = y_0 and where x > x_n, y = y_n - bounds = tuple(f(damage_fraction) for f in (min, max)) - interpolated_damage_curve = interp1d( - hazard_intensity, - damage_fraction, - kind='linear', - fill_value=bounds, - bounds_error=False - ) - - # apply damage_curve function to exposure table - # the return value of interpolated_damage_curve is a numpy array - damage_fraction_for_asset_type = pd.DataFrame( - interpolated_damage_curve(asset_exposure), - index=asset_exposure.index, - columns=asset_exposure.columns - ) - - # store the computed direct damages and any columns we started with - # (other than exposure) - damage_fraction_by_asset_type.append( - pd.concat( - [ - damage_fraction_for_asset_type, - exposure.loc[asset_type_mask, non_hazard_columns] - ], - axis="columns" - ) - ) - - # concatenate damage fractions for different asset types into single dataframe - damage_fraction: gpd.GeoDataFrame = gpd.GeoDataFrame(pd.concat(damage_fraction_by_asset_type)) - - ################################################################### - # DAMAGE COST (for split, then grouped geometry, for all rasters) # - ################################################################### - - # multiply the damage fraction estimates by a cost to rebuild the asset - # units are: 1 * USD/km * km = USD - logging.info("Calculating direct damage costs") - direct_damages_only = damage_fraction[hazard_columns] \ - .multiply(damage_fraction[fields.REHAB_COST], axis="index") \ - .multiply(damage_fraction[fields.SPLIT_LENGTH], axis="index") - - logging.info("Reading raw network data for unsplit geometry") - unsplit: gpd.GeoDataFrame = gpd.read_parquet(unsplit_path) - logging.info(f"{unsplit.shape=}") - - # join the other fields with the direct damage estimates - logging.info("Unifying rasterised segments and summing damage costs") - - # grouping on edge_id, sum all direct damage estimates to give a total dollar cost per edge - direct_damages = pd.concat( - [direct_damages_only, damage_fraction["edge_id"]], - axis="columns" - ).set_index("edge_id") - grouped_direct_damages = direct_damages.groupby(direct_damages.index).sum() + damage_fraction, grouped_direct_damages = direct_damage(exposure, damage_curves, hazard_columns, non_hazard_columns) ############################################################################### # EXPECTED ANNUAL DAMAGE COST (for grouped geometry, aggregations of rasters) # @@ -201,6 +124,10 @@ # JOINING, VALIDATION AND SERIALIZATION # ######################################### + logging.info("Reading raw network data for unsplit geometry") + unsplit: gpd.GeoDataFrame = gpd.read_parquet(unsplit_path) + logging.info(f"{unsplit.shape=}") + # lose columns like "cell_indicies" or rastered length measures that are specific to _rastered_ edges non_hazard_output_columns = list(set(non_hazard_columns) & set(unsplit.columns)) unsplit_subset = unsplit[non_hazard_output_columns].set_index("edge_id", drop=False)