Update utilities for NAN codes:
* update export utility to export, validate, and test the missing cols
* add a nancodes utility, which contains NAN constants
* add deletion coding to the archiver and make it expect missing cols
dshemetov committed Apr 20, 2021
1 parent 45ea5f1 commit 773fe08
Showing 4 changed files with 219 additions and 30 deletions.
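Note: the fourth changed file — the new nancodes utility — did not load in this view. Only the names Nans.NOT_MISSING and Nans.DELETED appear in the diffs below, so the following is a minimal sketch of what delphi_utils/nancodes.py plausibly contains; the use of IntEnum and the integer values are assumptions, not confirmed by this commit.

# Hypothetical sketch of delphi_utils/nancodes.py. Only the member names
# NOT_MISSING and DELETED are confirmed by this commit; the base class and
# the integer values are assumptions.
from enum import IntEnum

class Nans(IntEnum):
    """Codes stored in the missing_* columns explaining why a value is NaN."""
    NOT_MISSING = 0  # the value is present; nothing is missing
    DELETED = 4      # the value existed in an earlier issue and was deleted

An IntEnum (rather than a plain Enum) would keep the codes integer-valued, which matches the int dtypes declared for the missing_* columns below.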
19 changes: 14 additions & 5 deletions _delphi_utils_python/delphi_utils/archive.py
@@ -40,9 +40,11 @@
from git import Repo
from git.refs.head import Head
import pandas as pd
import numpy as np

from .utils import read_params
from .logger import get_structured_logger
from .nancodes import Nans

Files = List[str]
FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
changed_df is the pd.DataFrame of common rows from after_csv with changed values.
added_df is the pd.DataFrame of added rows from after_csv.
"""
export_csv_dtypes = {"geo_id": str, "val": float,
"se": float, "sample_size": float}
export_csv_dtypes = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se":int, "missing_sample_size": int
}

before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
before_df.set_index("geo_id", inplace=True)
@@ -93,8 +97,13 @@
same_mask = before_df_cmn == after_df_cmn
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)

# Code deleted entries as NaNs with the DELETED missing code
deleted_df = before_df.loc[deleted_idx, :].copy()
deleted_df[["val", "se", "sample_size"]] = np.nan
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED

return (
before_df.loc[deleted_idx, :],
deleted_df,
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
after_df.loc[added_idx, :])
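To illustrate the new deletion coding, here is a quick sketch (hypothetical file names and data; Nans imported the same way the tests below import it):

# Hypothetical example: a geo_id present in the before file but absent in
# the after file is now returned in deleted_df, coded as NaN + Nans.DELETED.
import pandas as pd
from delphi_utils.archive import diff_export_csv, Nans

before = pd.DataFrame({
    "geo_id": ["1", "2"],
    "val": [1.0, 2.0],
    "se": [0.1, 0.2],
    "sample_size": [10.0, 20.0],
    "missing_val": [Nans.NOT_MISSING] * 2,
    "missing_se": [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.NOT_MISSING] * 2,
})
before.to_csv("before.csv", index=False)
before.iloc[[0]].to_csv("after.csv", index=False)  # geo_id "2" deleted

deleted_df, changed_df, added_df = diff_export_csv("before.csv", "after.csv")
# deleted_df: one row for geo_id "2" with val/se/sample_size == NaN
# and all three missing_* columns == Nans.DELETED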

@@ -227,11 +236,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:

deleted_df, changed_df, added_df = diff_export_csv(
before_file, after_file)
new_issues_df = pd.concat([changed_df, added_df], axis=0)
new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)

if len(deleted_df) > 0:
print(
f"Warning, diff has deleted indices in {after_file} that will be ignored")
f"Diff has deleted indices in {after_file} that have been coded as nans.")

# Write the diffs to diff_file, if applicable
if len(new_issues_df) > 0:
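Because deletions now flow into new_issues_df instead of being dropped, a downstream consumer can recover them from the emitted diff. A hypothetical sketch — the ".diff" file name here is an assumption about the archiver's output naming, not confirmed by this hunk:

# Hypothetical: pull deletion rows back out of an emitted diff CSV.
import pandas as pd
from delphi_utils.nancodes import Nans

diff_df = pd.read_csv("20210420_state_sensor.csv.diff", dtype={"geo_id": str})
deleted_rows = diff_df[diff_df["missing_val"] == Nans.DELETED]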
63 changes: 61 additions & 2 deletions _delphi_utils_python/delphi_utils/export.py
@@ -3,10 +3,53 @@
from datetime import datetime
from os.path import join
from typing import Optional
import logging

import numpy as np
import pandas as pd

from .nancodes import Nans

def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
"""Find values with contradictory missingness codes, filter them, and log."""
val_contradictory_missing_mask = (
(df["val"].isna() & df["missing_val"].eq(Nans.NOT_MISSING))
|
(df["val"].notna() & df["missing_val"].ne(Nans.NOT_MISSING))
)
se_contradictory_missing_mask = (
(df["se"].isna() & df["missing_se"].eq(Nans.NOT_MISSING))
|
(df["se"].notna() & df["missing_se"].ne(Nans.NOT_MISSING))
)
sample_size_contradictory_missing_mask = (
(df["sample_size"].isna() & df["missing_sample_size"].eq(Nans.NOT_MISSING))
|
(df["sample_size"].notna() & df["missing_sample_size"].ne(Nans.NOT_MISSING))
)
if df.loc[val_contradictory_missing_mask].size > 0:
    if logger is not None:
        logger.info(
            "Filtering contradictory missing code in val for " +
            "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
        )
    df = df.loc[~val_contradictory_missing_mask]
if df.loc[se_contradictory_missing_mask].size > 0:
    if logger is not None:
        logger.info(
            "Filtering contradictory missing code in se for " +
            "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
        )
    df = df.loc[~se_contradictory_missing_mask]
if df.loc[sample_size_contradictory_missing_mask].size > 0:
    if logger is not None:
        logger.info(
            "Filtering contradictory missing code in sample_size for " +
            "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
        )
    # Drop rows whose sample_size contradicts its missing code
    df = df.loc[~sample_size_contradictory_missing_mask]
return df
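For example (hypothetical data; assumes the helper is importable from delphi_utils.export as defined above):

# Hypothetical example of the contradiction check: geo "b" reports a value
# but codes it as missing, so its row is dropped.
from datetime import datetime
import pandas as pd
from delphi_utils.export import filter_contradicting_missing_codes
from delphi_utils.nancodes import Nans

df = pd.DataFrame({
    "geo_id": ["a", "b"],
    "val": [1.0, 2.0],
    "se": [0.1, 0.2],
    "sample_size": [10.0, 20.0],
    "missing_val": [Nans.NOT_MISSING, Nans.DELETED],
    "missing_se": [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.NOT_MISSING] * 2,
})
filtered = filter_contradicting_missing_codes(
    df, sensor="sensor", metric="metric", date=datetime(2021, 4, 20)
)
assert list(filtered["geo_id"]) == ["a"]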

def create_export_csv(
df: pd.DataFrame,
export_dir: str,
@@ -15,7 +58,8 @@ def create_export_csv(
metric: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
remove_null_samples: Optional[bool] = False
remove_null_samples: Optional[bool] = False,
logger: Optional[logging.Logger] = None
):
"""Export data in the format expected by the Delphi API.
@@ -39,6 +83,8 @@
Latest date to export or None if no maximum date restrictions should be applied.
remove_null_samples: Optional[bool]
Whether to remove entries whose sample sizes are null.
logger: Optional[logging.Logger]
Pass a logger object here to log information about contradictory missing codes.
Returns
---------
@@ -64,7 +110,20 @@
else:
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
export_file = join(export_dir, export_filename)
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
expected_columns = [
"geo_id",
"val",
"se",
"sample_size",
"missing_val",
"missing_se",
"missing_sample_size"
]
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
if "missing_val" in export_df.columns:
export_df = filter_contradicting_missing_codes(
export_df, sensor, metric, date, logger=logger
)
if remove_null_samples:
export_df = export_df[export_df["sample_size"].notnull()]
export_df = export_df.round({"val": 7, "se": 7})
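A sketch of calling the updated exporter end to end (hypothetical data; the geo_res and sensor parameter names are inferred from the filename templates above and are not shown in this hunk):

# Hypothetical example: exporting a frame that carries the missing_* columns.
# With a logger supplied, contradictory rows are logged and filtered.
import logging
import pandas as pd
from delphi_utils.export import create_export_csv
from delphi_utils.nancodes import Nans

df = pd.DataFrame({
    "timestamp": pd.to_datetime(["2021-04-20", "2021-04-20"]),
    "geo_id": ["ca", "tx"],
    "val": [1.5, 2.5],
    "se": [0.1, 0.2],
    "sample_size": [100.0, 200.0],
    "missing_val": [Nans.NOT_MISSING] * 2,
    "missing_se": [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.NOT_MISSING] * 2,
})
create_export_csv(
    df, export_dir=".", geo_res="state", sensor="smoothed_cli",
    logger=logging.getLogger(__name__),
)
# Expected output: ./20210420_state_smoothed_cli.csv with all seven columns.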
91 changes: 69 additions & 22 deletions _delphi_utils_python/tests/test_archive.py
@@ -13,30 +13,45 @@
import pytest

from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\
archiver_from_params
archiver_from_params, Nans

CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}
CSV_DTYPES = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se":int, "missing_sample_size": int
}

CSVS_BEFORE = {
# Common
"csv0": pd.DataFrame({
"geo_id": ["1", "2", "3"],
"val": [1.000000001, 2.00000002, 3.00000003],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

"csv1": pd.DataFrame({
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [np.nan, 0.20000002, 0.30000003],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

# Deleted
"csv2": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0]}),
"sample_size": [10.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),
}

CSVS_AFTER = {
@@ -45,20 +60,32 @@
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.10000001, 0.20000002, 0.30000003],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

"csv1": pd.DataFrame({
"geo_id": ["1", "2", "4"],
"val": [1.0, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [10.0, 21.0, 40.0]}),
"sample_size": [10.0, 21.0, 40.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

# Added
"csv3": pd.DataFrame({
"geo_id": ["2"],
"val": [2.0000002],
"se": [0.2],
"sample_size": [20.0]}),
"sample_size": [20.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),
}


Expand All @@ -80,10 +107,14 @@ def test_diff_and_filter_exports(self, tmp_path):
mkdir(export_dir)

csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})

arch_diff = ArchiveDiffer(cache_dir, export_dir)

@@ -261,10 +292,14 @@ def test_run(self, tmp_path, s3_client):
# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)
@@ -346,7 +381,11 @@ def test_diff_exports(self, tmp_path):
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]})
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
})

# Write exact same CSV into cache and export, so no diffs expected
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -383,7 +422,11 @@ def test_archive_exports(self, tmp_path):
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]})
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
})

# csv1.csv is now a dirty edit in the repo, and to be exported too
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -462,10 +505,14 @@ def test_run(self, tmp_path):
# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)
_delphi_utils_python/delphi_utils/nancodes.py — new file added by this commit; its diff did not load in this view (see the sketch near the top).
