Skip to content

Commit

Permalink
Factor out damages code common to events & RPs
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-fred committed Feb 16, 2024
1 parent 94e9fe9 commit 047be28
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 161 deletions.
104 changes: 103 additions & 1 deletion src/open_gira/direct_damages.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
"""

from abc import ABC, abstractmethod
import logging
import re
from typing import Union

import geopandas as gpd
import pandas as pd
import snkit
from scipy.interpolate import interp1d

from open_gira import fields
from open_gira.utils import natural_sort
Expand Down Expand Up @@ -288,3 +289,104 @@ def annotate_rehab_cost(edges: gpd.GeoDataFrame, network_type: str, rehab_cost:
raise ValueError(f"No lookup function available for {network_type=}")

return edges


def direct_damage(
    exposure: gpd.GeoDataFrame,
    damage_curves: dict[str, pd.DataFrame],
    hazard_columns: list[str],
    non_hazard_columns: list[str],
) -> tuple[gpd.GeoDataFrame, pd.DataFrame]:
    """
    Calculate direct damages for exposed edge assets. Take hazard intensity,
    lookup damage fraction from a damage curve, multiply by a rehabilitation
    cost.

    Args:
        exposure: Table containing exposed assets, likely edges split on a
            raster grid (i.e. `edge_id` is not necessarily unique). Expected
            to contain `asset_type`, `edge_id`, rehabilitation cost and split
            length columns in addition to `hazard_columns`.
        damage_curves: Relationship between hazard intensity and damage
            fraction, keyed by `asset_type`. Each curve is read positionally:
            first column is hazard intensity, second is damage fraction.
        hazard_columns: Columns in `exposure` which denote hazard intensities.
        non_hazard_columns: Columns in `exposure` which do not denote hazard
            intensities.

    Returns:
        Direct damage fraction, rows are splits of edges.
        Direct damage cost (damage fraction * rehab cost per length * split
            length), grouped and summed so rows are edges and `edge_id`
            should now be unique.
    """

    ##########################################################
    # DAMAGE FRACTIONS (per split geometry, for all rasters) #
    ##########################################################

    # calculate damages for assets we have damage curves for
    damage_fraction_by_asset_type = []
    logging.info(f"Exposed assets {natural_sort(set(exposure.asset_type))}")
    for asset_type in natural_sort(set(exposure.asset_type) & set(damage_curves.keys())):

        damage_curve: pd.DataFrame = damage_curves[asset_type]

        # pick out rows of asset type and columns of hazard intensity
        # N.B. this comparison yields a boolean Series, not a GeoDataFrame
        asset_type_mask: pd.Series = exposure.asset_type == asset_type
        asset_exposure: pd.DataFrame = pd.DataFrame(exposure.loc[asset_type_mask, hazard_columns])

        logging.info(f"Processing {asset_type=}, with n={len(asset_exposure)}")

        # create interpolated damage curve for given asset type
        # name the curve columns distinctly so they cannot shadow the
        # `damage_fraction` result table assembled after this loop
        curve_intensity, curve_damage = damage_curve.iloc[:, 0], damage_curve.iloc[:, 1]
        # if curve of length n, where x < x_0, y = y_0 and where x > x_n, y = y_n
        bounds = (min(curve_damage), max(curve_damage))
        interpolated_damage_curve = interp1d(
            curve_intensity,
            curve_damage,
            kind='linear',
            fill_value=bounds,
            bounds_error=False
        )

        # apply damage_curve function to exposure table
        # the return value of interpolated_damage_curve is a numpy array
        damage_fraction_for_asset_type = pd.DataFrame(
            interpolated_damage_curve(asset_exposure),
            index=asset_exposure.index,
            columns=asset_exposure.columns
        )

        # store the computed direct damages and any columns we started with
        # (other than exposure)
        damage_fraction_by_asset_type.append(
            pd.concat(
                [
                    damage_fraction_for_asset_type,
                    exposure.loc[asset_type_mask, non_hazard_columns]
                ],
                axis="columns"
            )
        )

    # concatenate damage fractions for different asset types into single dataframe
    # N.B. raises ValueError if no asset type had a damage curve (empty concat)
    damage_fraction: gpd.GeoDataFrame = gpd.GeoDataFrame(pd.concat(damage_fraction_by_asset_type))

    ###################################################################
    # DAMAGE COST (for split, then grouped geometry, for all rasters) #
    ###################################################################

    # multiply the damage fraction estimates by a cost to rebuild the asset
    # units are: 1 * USD/km * km = USD
    logging.info("Calculating direct damage costs")
    direct_damages_only = damage_fraction[hazard_columns] \
        .multiply(damage_fraction[fields.REHAB_COST], axis="index") \
        .multiply(damage_fraction[fields.SPLIT_LENGTH], axis="index")

    # join the other fields with the direct damage estimates
    logging.info("Unifying rasterised segments and summing damage costs")

    # grouping on edge_id, sum all direct damage estimates to give a total dollar cost per edge
    # geometry is dropped by the groupby sum, hence the plain DataFrame return type
    direct_damages = pd.concat(
        [direct_damages_only, damage_fraction["edge_id"]],
        axis="columns"
    ).set_index("edge_id")
    grouped_direct_damages = direct_damages.groupby(direct_damages.index).sum()

    return damage_fraction, grouped_direct_damages
84 changes: 3 additions & 81 deletions workflow/transport-flood/event_set_direct_damages.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@
import warnings

import geopandas as gpd
import pandas as pd
from scipy.interpolate import interp1d

from open_gira import fields
from open_gira.direct_damages import annotate_rehab_cost
from open_gira.direct_damages import annotate_rehab_cost, direct_damage
from open_gira.io import write_empty_frames, read_damage_curves, read_rehab_costs
from open_gira.utils import natural_sort


if __name__ == "__main__":
Expand Down Expand Up @@ -69,87 +66,12 @@
hazard_columns = [col for col in exposure.columns if col.startswith(fields.HAZARD_PREFIX)]
non_hazard_columns = list(set(exposure.columns) - set(hazard_columns))

##########################################################
# DAMAGE FRACTIONS (per split geometry, for all rasters) #
##########################################################

# calculate damages for assets we have damage curves for
damage_fraction_by_asset_type = []
logging.info(f"Exposed assets {natural_sort(set(exposure.asset_type))}")
for asset_type in natural_sort(set(exposure.asset_type) & set(damage_curves.keys())):

damage_curve: pd.DataFrame = damage_curves[asset_type]

# pick out rows of asset type and columns of hazard intensity
asset_type_mask: gpd.GeoDataFrame = exposure.asset_type == asset_type
asset_exposure: pd.DataFrame = pd.DataFrame(exposure.loc[asset_type_mask, hazard_columns])

logging.info(f"Processing {asset_type=}, with n={len(asset_exposure)}")

# create interpolated damage curve for given asset type
hazard_intensity, damage_fraction = damage_curve.iloc[:, 0], damage_curve.iloc[:, 1]
# if curve of length n, where x < x_0, y = y_0 and where x > x_n, y = y_n
bounds = tuple(f(damage_fraction) for f in (min, max))
interpolated_damage_curve = interp1d(
hazard_intensity,
damage_fraction,
kind='linear',
fill_value=bounds,
bounds_error=False
)

# apply damage_curve function to exposure table
# the return value of interpolated_damage_curve is a numpy array
damage_fraction_for_asset_type = pd.DataFrame(
interpolated_damage_curve(asset_exposure),
index=asset_exposure.index,
columns=asset_exposure.columns
)

# store the computed direct damages and any columns we started with
# (other than exposure)
damage_fraction_by_asset_type.append(
pd.concat(
[
damage_fraction_for_asset_type,
exposure.loc[asset_type_mask, non_hazard_columns]
],
axis="columns"
)
)

# concatenate damage fractions for different asset types into single dataframe
damage_fraction: gpd.GeoDataFrame = gpd.GeoDataFrame(pd.concat(damage_fraction_by_asset_type))

###################################################################
# DAMAGE COST (for split, then grouped geometry, for all rasters) #
###################################################################

# multiply the damage fraction estimates by a cost to rebuild the asset
# units are: 1 * USD/km * km = USD
logging.info("Calculating direct damage costs")
direct_damages_only = damage_fraction[hazard_columns] \
.multiply(damage_fraction[fields.REHAB_COST], axis="index") \
.multiply(damage_fraction[fields.SPLIT_LENGTH], axis="index")
damage_fraction, grouped_direct_damages = direct_damage(exposure, damage_curves, hazard_columns, non_hazard_columns)

logging.info("Reading raw network data for unsplit geometry")
unsplit: gpd.GeoDataFrame = gpd.read_parquet(unsplit_path)
logging.info(f"{unsplit.shape=}")

# join the other fields with the direct damage estimates
logging.info("Unifying rasterised segments and summing damage costs")

# grouping on edge_id, sum all direct damage estimates to give a total dollar cost per edge
direct_damages = pd.concat(
[direct_damages_only, damage_fraction["edge_id"]],
axis="columns"
).set_index("edge_id")
grouped_direct_damages = direct_damages.groupby(direct_damages.index).sum()

#########################################
# JOINING, VALIDATION AND SERIALIZATION #
#########################################

    # lose columns like "cell_indices" or rastered length measures that are specific to _rastered_ edges
non_hazard_output_columns = list(set(non_hazard_columns) & set(unsplit.columns))
unsplit_subset = unsplit[non_hazard_output_columns].set_index("edge_id", drop=False)
Expand All @@ -174,4 +96,4 @@
logging.info(f"Writing out {direct_damages.shape=} (per unified geometry, event raster)")
direct_damages.to_parquet(damage_cost_path)

logging.info("Done calculating direct damages")
logging.info("Done calculating direct damages")
85 changes: 6 additions & 79 deletions workflow/transport-flood/return_period_direct_damages.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@

import geopandas as gpd
import pandas as pd
from scipy.interpolate import interp1d
from scipy.integrate import simpson

from open_gira import fields
from open_gira.direct_damages import ReturnPeriodMap, generate_rp_maps, annotate_rehab_cost
from open_gira.direct_damages import ReturnPeriodMap, generate_rp_maps, annotate_rehab_cost, direct_damage
from open_gira.io import write_empty_frames, read_damage_curves, read_rehab_costs
from open_gira.utils import natural_sort


if __name__ == "__main__":
Expand Down Expand Up @@ -75,82 +73,7 @@
hazard_columns = [col for col in exposure.columns if col.startswith(fields.HAZARD_PREFIX)]
non_hazard_columns = list(set(exposure.columns) - set(hazard_columns))

##########################################################
# DAMAGE FRACTIONS (per split geometry, for all rasters) #
##########################################################

# calculate damages for assets we have damage curves for
damage_fraction_by_asset_type = []
logging.info(f"Exposed assets {natural_sort(set(exposure.asset_type))}")
for asset_type in natural_sort(set(exposure.asset_type) & set(damage_curves.keys())):

damage_curve: pd.DataFrame = damage_curves[asset_type]

# pick out rows of asset type and columns of hazard intensity
asset_type_mask: gpd.GeoDataFrame = exposure.asset_type == asset_type
asset_exposure: pd.DataFrame = pd.DataFrame(exposure.loc[asset_type_mask, hazard_columns])

logging.info(f"Processing {asset_type=}, with n={len(asset_exposure)}")

# create interpolated damage curve for given asset type
hazard_intensity, damage_fraction = damage_curve.iloc[:, 0], damage_curve.iloc[:, 1]
# if curve of length n, where x < x_0, y = y_0 and where x > x_n, y = y_n
bounds = tuple(f(damage_fraction) for f in (min, max))
interpolated_damage_curve = interp1d(
hazard_intensity,
damage_fraction,
kind='linear',
fill_value=bounds,
bounds_error=False
)

# apply damage_curve function to exposure table
# the return value of interpolated_damage_curve is a numpy array
damage_fraction_for_asset_type = pd.DataFrame(
interpolated_damage_curve(asset_exposure),
index=asset_exposure.index,
columns=asset_exposure.columns
)

# store the computed direct damages and any columns we started with
# (other than exposure)
damage_fraction_by_asset_type.append(
pd.concat(
[
damage_fraction_for_asset_type,
exposure.loc[asset_type_mask, non_hazard_columns]
],
axis="columns"
)
)

# concatenate damage fractions for different asset types into single dataframe
damage_fraction: gpd.GeoDataFrame = gpd.GeoDataFrame(pd.concat(damage_fraction_by_asset_type))

###################################################################
# DAMAGE COST (for split, then grouped geometry, for all rasters) #
###################################################################

# multiply the damage fraction estimates by a cost to rebuild the asset
# units are: 1 * USD/km * km = USD
logging.info("Calculating direct damage costs")
direct_damages_only = damage_fraction[hazard_columns] \
.multiply(damage_fraction[fields.REHAB_COST], axis="index") \
.multiply(damage_fraction[fields.SPLIT_LENGTH], axis="index")

logging.info("Reading raw network data for unsplit geometry")
unsplit: gpd.GeoDataFrame = gpd.read_parquet(unsplit_path)
logging.info(f"{unsplit.shape=}")

# join the other fields with the direct damage estimates
logging.info("Unifying rasterised segments and summing damage costs")

# grouping on edge_id, sum all direct damage estimates to give a total dollar cost per edge
direct_damages = pd.concat(
[direct_damages_only, damage_fraction["edge_id"]],
axis="columns"
).set_index("edge_id")
grouped_direct_damages = direct_damages.groupby(direct_damages.index).sum()
damage_fraction, grouped_direct_damages = direct_damage(exposure, damage_curves, hazard_columns, non_hazard_columns)

###############################################################################
# EXPECTED ANNUAL DAMAGE COST (for grouped geometry, aggregations of rasters) #
Expand Down Expand Up @@ -201,6 +124,10 @@
# JOINING, VALIDATION AND SERIALIZATION #
#########################################

logging.info("Reading raw network data for unsplit geometry")
unsplit: gpd.GeoDataFrame = gpd.read_parquet(unsplit_path)
logging.info(f"{unsplit.shape=}")

    # lose columns like "cell_indices" or rastered length measures that are specific to _rastered_ edges
non_hazard_output_columns = list(set(non_hazard_columns) & set(unsplit.columns))
unsplit_subset = unsplit[non_hazard_output_columns].set_index("edge_id", drop=False)
Expand Down

0 comments on commit 047be28

Please sign in to comment.