Calibration and Validation Function Improvements (#41)
rileyhales authored Oct 24, 2022
1 parent e5c988c commit fee9466
Showing 12 changed files with 745 additions and 437 deletions.
8 changes: 4 additions & 4 deletions docs/user-guide/visualization.md
@@ -9,13 +9,13 @@ import saber as saber
workdir = '/path/to/project/directory/'
assign_table = saber.table.read(workdir)
drain_shape = '/my/file/path/'
-saber.gis.clip_by_assignment(workdir, assign_table, drain_shape)
-saber.gis.clip_by_cluster(workdir, assign_table, drain_shape)
-saber.gis.clip_by_unassigned(workdir, assign_table, drain_shape)
+saber.gis.map_by_reason(workdir, assign_table, drain_shape)
+saber.gis.map_by_cluster(workdir, assign_table, drain_shape)
+saber.gis.map_unassigned(workdir, assign_table, drain_shape)

# or if you have a specific set of ID's to check on
list_of_model_ids = [123, 456, 789]
-saber.gis.clip_by_ids(workdir, list_of_model_ids, drain_shape)
+saber.gis.map_ids(workdir, list_of_model_ids, drain_shape)
```

After this step, your project directory should look like this:
118 changes: 118 additions & 0 deletions examples/saber_script_long.py
@@ -0,0 +1,118 @@
import logging
from multiprocessing import Pool

import geopandas as gpd
import numpy as np
import pandas as pd

import saber

np.seterr(all="ignore")

logging.basicConfig(
    level=logging.INFO,
    filename='',
    filemode='a',
    datefmt='%Y-%m-%d %X',
    format='%(asctime)s: %(name)s - %(levelname)s - %(message)s'
)

if __name__ == "__main__":
    logger = logging.getLogger(__name__)

    # USER INPUTS - POPULATE THESE PATHS
    workdir = ''
    x_fdc_train = ''
    x_fdc_all = ''
    drain_gis = ''
    gauge_data = ''
    hindcast_zarr = ''
    n_processes = 1  # number of processes to use for multiprocessing
    # END USER INPUTS

    logger.info('Generate Clusters')
    x_fdc_train = pd.read_parquet(x_fdc_train).values
    x_fdc_all = pd.read_parquet(x_fdc_all)
    saber.cluster.generate(workdir, x=x_fdc_train)
    saber.cluster.summarize_fit(workdir)
    saber.cluster.calc_silhouette(workdir, x=x_fdc_train, n_clusters=range(2, 10))

    logger.info('Create Plots')
    saber.cluster.plot_clusters(workdir, x=x_fdc_train)
    saber.cluster.plot_centers(workdir)
    saber.cluster.plot_fit_metrics(workdir)
    saber.cluster.plot_silhouettes(workdir)

    # After this step, you should evaluate the many tables and figures that were generated by the clustering steps.
    # Determine the number of clusters which best represents the modeled discharge data.

    n_clusters = 5
    saber.cluster.predict_labels(workdir, n_clusters, x=x_fdc_all)

    # Generate assignments table
    logger.info('Generate Assignment Table')
    assign_df = saber.assign.generate(workdir)

    # Assign gauged basins
    logger.info('Assign Gauged Basins')
    assign_df = saber.assign.assign_gauged(assign_df)
    gauged_mids = assign_df[assign_df[saber.io.gid_col].notna()][saber.io.mid_col].values

    with Pool(n_processes) as p:
        logger.info('Assign by Hydraulic Connectivity')
        df_prop_down = pd.concat(p.starmap(saber.assign.map_propagate, [(assign_df, x, 'down') for x in gauged_mids]))
        df_prop_up = pd.concat(p.starmap(saber.assign.map_propagate, [(assign_df, x, 'up') for x in gauged_mids]))
        df_prop = pd.concat([df_prop_down, df_prop_up]).reset_index(drop=True)
        df_prop = pd.concat(
            p.starmap(saber.assign.map_resolve_props, [(df_prop, x) for x in df_prop[saber.io.mid_col].unique()])
        )

        logger.info('Resolve Propagation Assignments')
        assign_df = saber.assign.assign_propagation(assign_df, df_prop)

        logger.info('Assign Remaining Basins by Cluster, Spatial, and Physical Decisions')
        for cluster_number in range(n_clusters):
            logger.info(f'Assigning basins in cluster {cluster_number}')
            # limit by cluster number
            c_df = assign_df[assign_df[saber.io.clbl_col] == cluster_number]
            # keep a list of the unassigned basins in the cluster
            mids = c_df[c_df[saber.io.reason_col] == 'unassigned'][saber.io.mid_col].values
            # filter cluster dataframe to find only gauged basins
            c_df = c_df[c_df[saber.io.gid_col].notna()]
            assign_df = pd.concat([
                pd.concat(p.starmap(saber.assign.map_assign_ungauged, [(assign_df, c_df, x) for x in mids])),
                assign_df[~assign_df[saber.io.mid_col].isin(mids)]
            ]).reset_index(drop=True)

    # Cache the completed propagation tables for inspection later
    logger.info('Caching Completed Tables')
    saber.io.write_table(df_prop, workdir, 'prop_resolved')
    saber.io.write_table(df_prop_down, workdir, 'prop_downstream')
    saber.io.write_table(df_prop_up, workdir, 'prop_upstream')
    saber.io.write_table(assign_df, workdir, 'assign_table')

    logger.info('SABER Assignment Analysis Completed')

    # Recommended Optional - Generate GIS files to visually inspect the assignments
    logger.info('Generating GIS files')
    drain_gis = gpd.read_file(drain_gis)
    saber.gis.map_by_reason(workdir, assign_df, drain_gis)
    saber.gis.map_by_cluster(workdir, assign_df, drain_gis)
    saber.gis.map_unassigned(workdir, assign_df, drain_gis)

    # Recommended Optional - Compute performance metrics
    logger.info('Compute Performance Metrics')
    saber.validate.mp_bootstrap(workdir, assign_df, gauge_data, hindcast_zarr, n_processes=n_processes)
    saber.validate.bootstrap_figures(workdir)

    # Optional - Compute the corrected simulation data
    logger.info('Computing Bias Corrected Simulations')
    with Pool(n_processes) as p:
        p.starmap(
            saber.calibrate.map_saber,
            # iterate over rows of (model id, assigned model id, assigned gauge id)
            [[mid, asgn_mid, asgn_gid, hindcast_zarr, gauge_data] for mid, asgn_mid, asgn_gid in
             assign_df[[saber.io.mid_col, saber.io.asgn_mid_col, saber.io.gid_col]].values]
        )
    logger.info('SABER Calibration Completed')

    logger.info('SABER Completed')
55 changes: 55 additions & 0 deletions examples/saber_script_short.py
@@ -0,0 +1,55 @@
import logging

import numpy as np
import pandas as pd

import saber

np.seterr(all="ignore")

log_path = '/Users/rchales/Projects/geoglows_saber/log.log'
logging.basicConfig(
    level=logging.INFO,
    filename=log_path,
    filemode='a',
    datefmt='%Y-%m-%d %X',
    format='%(asctime)s: %(name)s - %(levelname)s - %(message)s'
)

if __name__ == "__main__":
    logger = logging.getLogger(__name__)

    # USER INPUTS - POPULATE THESE PATHS
    workdir = ''
    x_fdc_train = ''
    x_fdc_all = ''
    drain_gis = ''
    gauge_data = ''
    hindcast_zarr = ''
    # END USER INPUTS

    # Generate Clusters and Plots
    logger.info('Create Clusters and Plots')
    saber.cluster.cluster(workdir, x_fdc_train)
    # Before continuing, review the clustering results and select the best n_clusters for the next function
    saber.cluster.predict_labels(workdir, n_clusters=5, x=pd.read_parquet(x_fdc_all))

    # Generate Assignments Table and Make Assignments
    logger.info('Make Assignments of Ungauged Basins to Gauges')
    assign_df = saber.assign.generate(workdir)
    assign_df = saber.assign.mp_assign_all(workdir, assign_df)

    # Recommended Optional - Generate GIS files to visually inspect the assignments
    logger.info('Generating GIS files')
    saber.gis.create_maps(workdir, assign_df, drain_gis)

    # Recommended Optional - Compute performance metrics
    logger.info('Compute Performance Metrics')
    saber.validate.mp_bootstrap(workdir, assign_df, gauge_data, hindcast_zarr, n_processes=6)
    saber.validate.bootstrap_figures(workdir)

    # # Optional - Compute the Corrected Simulation Data
    # logger.info('Compute Corrected Simulation Data')
    # saber.calibrate.mp_saber(assign_df, hindcast_zarr, gauge_data)

    logger.info('SABER Completed')
10 changes: 3 additions & 7 deletions saber/__init__.py
@@ -1,18 +1,14 @@
import saber.assign
+import saber.calibrate
import saber.cluster
import saber.gis
import saber.io
-import saber.prep
import saber.validate
-from saber._calibrate import calibrate, calibrate_region

__all__ = [
-    # individual functions
-    'calibrate', 'calibrate_region',
-    # modules
-    'io', 'prep', 'cluster', 'assign', 'gis', 'validate',
+    'io', 'cluster', 'assign', 'gis', 'calibrate', 'validate',
]

__author__ = 'Riley C. Hales'
-__version__ = '0.6.0'
+__version__ = '0.7.0'
__license__ = 'BSD 3 Clause Clear'
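The package interface change is worth a quick illustration: the top-level `calibrate`/`calibrate_region` functions and the `prep` module are gone, and everything is now reached through submodules. A minimal sketch of the 0.7.0-style call pattern, assuming placeholder paths and using only functions that appear elsewhere in this commit:

```python
import saber

workdir = '/path/to/project/directory/'  # placeholder path

# assignment is now driven by a single multiprocessing entry point on saber.assign
assign_df = saber.assign.generate(workdir)
assign_df = saber.assign.mp_assign_all(workdir, assign_df)

# bias correction moved from the package root to the saber.calibrate module,
# e.g. saber.calibrate.map_saber / saber.calibrate.mp_saber as used in the
# example scripts above
```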
66 changes: 60 additions & 6 deletions saber/assign.py
@@ -1,4 +1,5 @@
import logging
+from multiprocessing import Pool

import numpy as np
import pandas as pd
@@ -16,11 +17,64 @@
from .io import x_col
from .io import y_col

-__all__ = ['generate', 'assign_gauged', 'map_propagate', 'map_resolve_propagations', 'map_assign_ungauged', ]
+__all__ = ['mp_assign_all', 'generate', 'assign_gauged', 'map_propagate', 'map_resolve_props', 'map_assign_ungauged', ]

logger = logging.getLogger(__name__)


+def mp_assign_all(workdir: str, assign_df: pd.DataFrame, n_processes: int or None = None) -> pd.DataFrame:
+    """
+    Makes all assignments in sequence: gauged basins, hydraulic connectivity, then clusters
+
+    Args:
+        workdir: path to the working directory
+        assign_df: the assignments table dataframe with the clustering labels already applied
+        n_processes: number of processes to use for multiprocessing, passed to Pool
+
+    Returns:
+        pd.DataFrame
+    """
+    gauged_mids = assign_df[assign_df[gid_col].notna()][mid_col].values
+
+    # assign gauged basins
+    logger.info('Assigning Gauged Basins')
+    assign_df = assign_gauged(assign_df)
+
+    logger.info('Assigning by Hydraulic Connectivity')
+    with Pool(n_processes) as p:
+        logger.info('Finding Downstream Assignments')
+        df_prop_down = pd.concat(p.starmap(map_propagate, [(assign_df, x, 'down') for x in gauged_mids]))
+        logger.info('Caching Downstream Assignments')
+        write_table(df_prop_down, workdir, 'prop_downstream')
+
+        logger.info('Finding Upstream Assignments')
+        df_prop_up = pd.concat(p.starmap(map_propagate, [(assign_df, x, 'up') for x in gauged_mids]))
+        logger.info('Caching Upstream Assignments')
+        write_table(df_prop_up, workdir, 'prop_upstream')
+
+        logger.info('Resolving Propagation Assignments')
+        df_prop = pd.concat([df_prop_down, df_prop_up]).reset_index(drop=True)
+        df_prop = pd.concat(p.starmap(map_resolve_props, [(df_prop, x) for x in df_prop[mid_col].unique()]))
+        logger.info('Caching Propagation Assignments')
+        write_table(df_prop, workdir, 'prop_resolved')
+
+        logger.info('Assign Remaining Basins by Cluster, Spatial, and Physical Decisions')
+        for cluster_number in range(assign_df[clbl_col].max() + 1):
+            logger.info(f'Assigning basins in cluster {cluster_number}')
+            # limit by cluster number
+            c_df = assign_df[assign_df[clbl_col] == cluster_number]
+            # keep a list of the unassigned basins in the cluster
+            mids = c_df[c_df[reason_col] == 'unassigned'][mid_col].values
+            # filter cluster dataframe to find only gauged basins
+            c_df = c_df[c_df[gid_col].notna()]
+            assign_df = pd.concat([
+                pd.concat(p.starmap(map_assign_ungauged, [(assign_df, c_df, x) for x in mids])),
+                assign_df[~assign_df[mid_col].isin(mids)]
+            ]).reset_index(drop=True)
+
+    logger.info('SABER Assignment Analysis Completed')
+    return assign_df


def generate(workdir: str, labels_df: pd.DataFrame = None, drain_table: pd.DataFrame = None,
             gauge_table: pd.DataFrame = None, cache: bool = True) -> pd.DataFrame:
    """
@@ -100,7 +154,6 @@ def map_propagate(df: pd.DataFrame, start_mid: int, direction: str) -> pd.DataFrame:
    Returns:
        pd.DataFrame
    """
-    # logger.info(f'Prop {direction} from {start_mid}')
    assigned_rows = []
    start_order = df[df[mid_col] == start_mid][order_col].values[0]

@@ -151,7 +204,7 @@ def map_propagate(df: pd.DataFrame, start_mid: int, direction: str) -> pd.DataFrame:
    return pd.DataFrame(columns=df.columns)


-def map_resolve_propagations(df_props: pd.DataFrame, mid: str) -> pd.DataFrame:
+def map_resolve_props(df_props: pd.DataFrame, mid: str) -> pd.DataFrame:
    """
    Resolves the propagation assignments by choosing the assignment with the fewest steps
@@ -169,7 +222,7 @@ def map_resolve_propagations(df_props: pd.DataFrame, mid: str) -> pd.DataFrame:
    df_mid['n_steps'] = df_mid['n_steps'].astype(float)
    # sort by n_steps then by reason
    df_mid = df_mid.sort_values(['n_steps', 'direction'], ascending=[True, True])
-    # return the first row which is the fewest steps and preferring downstream to upstream)
+    # return the first row which is the fewest steps and preferring downstream to upstream
    return df_mid.head(1).drop(columns=['direction', 'n_steps'])


@@ -203,13 +256,14 @@ def map_assign_ungauged(assign_df: pd.DataFrame, gauges_df: np.array, mid: str)
        a new row for the given mid with the assignments made
    """
    try:
+        # todo account for physical parameters if they are included
        # find the closest gauge using euclidean distance without accounting for projection/map distortion
-        mid_x, mid_y = assign_df.loc[assign_df[mid_col] == mid, [x_col, y_col]].values.flatten()
+        mid_x, mid_y = assign_df.loc[assign_df[mid_col] == mid, [x_col, y_col]].head(1).values.flatten()
        row_idx_to_assign = pd.Series(
            np.sqrt(np.power(gauges_df[x_col] - mid_x, 2) + np.power(gauges_df[y_col] - mid_y, 2))
        ).idxmin()

-        asgn_mid, asgn_gid = gauges_df.loc[row_idx_to_assign, [asgn_mid_col, asgn_gid_col]]
+        asgn_mid, asgn_gid = gauges_df.loc[row_idx_to_assign, [mid_col, gid_col]]
        asgn_reason = f'cluster-{gauges_df[clbl_col].values[0]}'

        new_row = assign_df[assign_df[mid_col] == mid].copy()
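The `map_assign_ungauged` changes above are small but load-bearing: `.head(1)` keeps the two-value unpacking safe when a model id matches more than one row, and the assigned columns are now read from `mid_col`/`gid_col` of the gauged subset. A self-contained sketch of the nearest-gauge selection that function performs, using made-up coordinates and illustrative column names in place of the `saber.io` constants:

```python
import numpy as np
import pandas as pd

# Hypothetical mini-table: three gauged basins in one cluster. The columns
# (model_id, gauge_id, x, y) are stand-ins for the saber.io column constants.
gauges_df = pd.DataFrame({
    'model_id': [101, 202, 303],
    'gauge_id': ['A', 'B', 'C'],
    'x': [0.0, 10.0, 3.0],
    'y': [0.0, 10.0, 4.0],
})
mid_x, mid_y = 2.0, 2.0  # outlet coordinates of the ungauged basin

# same planar Euclidean distance as the function above (no projection handling)
distances = np.sqrt(np.power(gauges_df['x'] - mid_x, 2) + np.power(gauges_df['y'] - mid_y, 2))
asgn_mid, asgn_gid = gauges_df.loc[distances.idxmin(), ['model_id', 'gauge_id']]
print(asgn_mid, asgn_gid)  # 303 C -> the gauge at (3, 4) is closest
```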
