first pass at handling the new nhsn format for weekly data #3

Merged: 4 commits, Nov 19, 2024
src/iddata/loader.py (78 additions, 16 deletions)
@@ -1,3 +1,4 @@
+import datetime
 from itertools import product
 from urllib.parse import urljoin

@@ -106,7 +107,7 @@ def load_one_us_census_file(self, f):
     def load_us_census(self, fillna = True):
         files = [
             self._construct_data_raw_url("us-census/nst-est2019-alldata.csv"),
-            self._construct_data_raw_url("us-census/NST-EST2022-ALLDATA.csv")]
+            self._construct_data_raw_url("us-census/NST-EST2023-ALLDATA.csv")]
         us_pops = pd.concat([self.load_one_us_census_file(f) for f in files], axis=0)

         fips_mappings = pd.read_csv(self._construct_data_raw_url("fips-mappings/fips_mappings.csv"))
@@ -128,7 +129,7 @@ def load_us_census(self, fillna = True):

         if fillna:
             all_locations = dat["location"].unique()
-            all_seasons = [str(y) + "/" + str(y+1)[-2:] for y in range(1997, 2024)]
+            all_seasons = [str(y) + "/" + str(y+1)[-2:] for y in range(1997, 2025)]
             full_result = pd.DataFrame.from_records(product(all_locations, all_seasons))
             full_result.columns = ["location", "season"]
             dat = full_result.merge(dat, how="left", on=["location", "season"]) \
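For reference, a standalone check of what the widened comprehension produces (illustrative only); the season list now runs through 2024/25:

    all_seasons = [str(y) + "/" + str(y+1)[-2:] for y in range(1997, 2025)]
    print(all_seasons[0], all_seasons[-1])  # 1997/98 2024/25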
@@ -303,21 +304,33 @@ def load_ilinet(self,


     def load_nhsn(self, rates=True, drop_pandemic_seasons=True, as_of=None):
-        if drop_pandemic_seasons:
-            if as_of is None:
-                file_path = "influenza-hhs/hhs.csv"
-            else:
-                # find the largest stored file dated on or before the as_of date
-                as_of_file_path = f"influenza-hhs/hhs-{str(as_of)}.csv"
-                glob_results = s3fs.S3FileSystem(anon=True) \
-                    .glob("infectious-disease-data/data-raw/influenza-hhs/hhs-????-??-??.csv")
-                all_file_paths = sorted([f[len("infectious-disease-data/data-raw/"):] for f in glob_results])
-                all_file_paths = [f for f in all_file_paths if f <= as_of_file_path]
-                file_path = all_file_paths[-1]
+        if not drop_pandemic_seasons:
+            raise NotImplementedError("Functionality for loading all seasons of NHSN data with specified as_of date is not implemented.")
+
+        if as_of is None:
+            as_of = datetime.date.today()
+
+        if isinstance(as_of, str):
+            as_of = datetime.date.fromisoformat(as_of)
@matthewcornell (Member) commented on Nov 18, 2024:

It wouldn't hurt to try/catch fromisoformat() in case the format is invalid. From https://github.com/reichlab/operational-models/blob/main/flu_ar2/main.py :

    try:
        today_date = datetime.date.fromisoformat(today_date)
    except (TypeError, ValueError):  # if today_date is None or a bad format
        today_date = datetime.date.today()
    reference_date = today_date + relativedelta.relativedelta(weekday=5)

Reply from the PR author (Collaborator):
I think it would be better to throw an error if this date is malformatted than to set it to a value the caller may not be expecting.
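A minimal standalone sketch of the fail-fast behavior the author describes (function name is illustrative):

    import datetime

    def parse_as_of(as_of):
        # Let fromisoformat() raise on malformed input rather than silently
        # substituting a default the caller may not expect.
        if isinstance(as_of, str):
            return datetime.date.fromisoformat(as_of)  # raises ValueError if invalid
        return as_of

    parse_as_of("2024-11-15")    # datetime.date(2024, 11, 15)
    # parse_as_of("11/15/2024")  # raises ValueError: Invalid isoformat string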


+        if as_of < datetime.date.fromisoformat("2024-11-15"):
+            return self.load_nhsn_from_hhs(rates=rates, as_of=as_of)
         else:
-            if as_of is not None:
-                raise NotImplementedError("Functionality for loading all seasons of NHSN data with specified as_of date is not implemented.")
-            file_path = "influenza-hhs/hhs_complete.csv"
+            return self.load_nhsn_from_nhsn(
+                rates=rates,
+                as_of=as_of,
+                drop_pandemic_seasons=drop_pandemic_seasons
+            )
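Hypothetical usage of the new dispatch (the loader construction below is an assumption; it does not appear in this diff): as_of dates before 2024-11-15 route to the legacy HHS snapshots, later dates to the new NHSN files.

    # loader = FluDataLoader()              # hypothetical constructor name
    # loader.load_nhsn(as_of="2024-11-01")  # -> load_nhsn_from_hhs
    # loader.load_nhsn(as_of="2024-11-22")  # -> load_nhsn_from_nhsn
    # loader.load_nhsn()                    # as_of defaults to today -> NHSN path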


+    def load_nhsn_from_hhs(self, rates=True, as_of=None):
+        # find the largest stored file dated on or before the as_of date
+        as_of_file_path = f"influenza-hhs/hhs-{str(as_of)}.csv"
+        glob_results = s3fs.S3FileSystem(anon=True) \
+            .glob("infectious-disease-data/data-raw/influenza-hhs/hhs-????-??-??.csv")
+        all_file_paths = sorted([f[len("infectious-disease-data/data-raw/"):] for f in glob_results])
+        all_file_paths = [f for f in all_file_paths if f <= as_of_file_path]
+        file_path = all_file_paths[-1]

         dat = pd.read_csv(self._construct_data_raw_url(file_path))
         dat.rename(columns={"date": "wk_end_date"}, inplace=True)
@@ -340,6 +353,55 @@ def load_nhsn(self, rates=True, drop_pandemic_seasons=True, as_of=None):
         return dat
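The snapshot lookup in both load_nhsn_from_* methods relies on ISO-8601 dates sorting lexicographically in chronological order, so plain string comparison selects the latest file dated on or before as_of. A standalone illustration with made-up paths:

    paths = [
        "influenza-nhsn/nhsn-2024-11-29.csv",
        "influenza-nhsn/nhsn-2024-11-15.csv",
        "influenza-nhsn/nhsn-2024-11-22.csv",
    ]
    cutoff = "influenza-nhsn/nhsn-2024-11-25.csv"
    # sort, keep everything on or before the cutoff, take the latest
    print([p for p in sorted(paths) if p <= cutoff][-1])
    # influenza-nhsn/nhsn-2024-11-22.csv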


+    def load_nhsn_from_nhsn(self, rates=True, as_of=None, drop_pandemic_seasons=True):
+        # find the largest stored file dated on or before the as_of date
+        as_of_file_path = f"influenza-nhsn/nhsn-{str(as_of)}.csv"
+        glob_results = s3fs.S3FileSystem(anon=True) \
+            .glob("infectious-disease-data/data-raw/influenza-nhsn/nhsn-????-??-??.csv")
+        all_file_paths = sorted([f[len("infectious-disease-data/data-raw/"):] for f in glob_results])
+        all_file_paths = [f for f in all_file_paths if f <= as_of_file_path]
+        file_path = all_file_paths[-1]
+
+        dat = pd.read_csv(self._construct_data_raw_url(file_path))
+        # Keeping Percent Hospitals Reporting field for now in case it's useful later.
+        dat = dat[["Geographic aggregation", "Week Ending Date", "Total Influenza Admissions", "Percent Hospitals Reporting Influenza Admissions"]]
+        dat.columns = ["abbreviation", "wk_end_date", "inc", "pct_report"]
+
+        # add us data
+        us_dat = (
+            dat
+            .groupby("wk_end_date")
+            ["inc"]
+            .sum()
+            .reset_index()
+        )
+        us_dat["abbreviation"] = "US"
+        dat = pd.concat([dat, us_dat], axis=0)
+
+        # get to location codes/FIPS
+        fips_mappings = self.load_fips_mappings()
+        dat = dat.merge(fips_mappings, on=["abbreviation"], how="left")
+
+        ew_str = dat.apply(utils.date_to_ew_str, axis=1)
+        dat["season"] = utils.convert_epiweek_to_season(ew_str)
+        dat["season_week"] = utils.convert_epiweek_to_season_week(ew_str)
+        dat = dat.sort_values(by=["season", "season_week"])
+
+        if drop_pandemic_seasons:
+            dat.loc[dat["season"].isin(["2020/21", "2021/22"]), "inc"] = np.nan
+
+        if rates:
+            pops = self.load_us_census()
+            dat = dat.merge(pops, on = ["location", "season"], how="left") \
+                .assign(inc=lambda x: x["inc"] / x["pop"] * 100000)
+
+        dat["wk_end_date"] = pd.to_datetime(dat["wk_end_date"])
+
+        dat["agg_level"] = np.where(dat["location"] == "US", "national", "state")
+        dat = dat[["agg_level", "location", "season", "season_week", "wk_end_date", "inc"]]
+        dat["source"] = "nhsn"
+        return dat
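A toy illustration of the aggregation and rate logic above (made-up numbers; the real method joins populations from load_us_census):

    import pandas as pd

    dat = pd.DataFrame({
        "abbreviation": ["MA", "CA"],
        "wk_end_date": ["2024-11-16", "2024-11-16"],
        "inc": [120.0, 480.0],
    })

    # national total is the sum of state admissions within each week
    us_dat = dat.groupby("wk_end_date")["inc"].sum().reset_index()
    us_dat["abbreviation"] = "US"
    dat = pd.concat([dat, us_dat], axis=0)

    # rates: admissions per 100k population (toy populations)
    pops = {"MA": 7_000_000, "CA": 39_000_000, "US": 46_000_000}
    dat["inc"] = dat["inc"] / dat["abbreviation"].map(pops) * 100000

    print(dat)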

     def load_agg_transform_ilinet(self, fips_mappings, **ilinet_kwargs):
         df_ilinet_full = self.load_ilinet(**ilinet_kwargs)
         # df_ilinet_full.loc[df_ilinet_full['inc'] < np.exp(-7), 'inc'] = np.exp(-7)
tests/iddata/unit/test_load_data.py (2 additions, 2 deletions)
@@ -24,7 +24,7 @@ def test_load_data_sources():

 @pytest.mark.parametrize("test_kwargs, season_expected, wk_end_date_expected", [
     (None, "2022/23", "2023-12-23"),
-    ({"drop_pandemic_seasons": False}, "2019/20", "2023-12-23"),
+    # ({"drop_pandemic_seasons": False}, "2019/20", "2023-12-23"),
     ({"drop_pandemic_seasons": True, "as_of": datetime.date.fromisoformat("2023-12-30")},
      "2022/23", "2023-12-23")
 ])
@@ -33,7 +33,7 @@ def test_load_data_nhsn_kwargs(test_kwargs, season_expected, wk_end_date_expected):

     df = fdl.load_data(sources=["nhsn"], nhsn_kwargs=test_kwargs)

-    assert df["season"].min() == season_expected
+    assert df.dropna()["season"].min() == season_expected
     wk_end_date_actual = str(df["wk_end_date"].max())[:10]
     if test_kwargs is not None and "as_of" in test_kwargs:
         assert wk_end_date_actual == wk_end_date_expected
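The dropna() change reflects the new masking behavior: with drop_pandemic_seasons=True the loader now keeps pandemic-season rows but sets inc to NaN, so season.min() could still pick up a masked pandemic season unless NaN rows are excluded first. A minimal sketch:

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({
        "season": ["2020/21", "2021/22", "2022/23"],
        "inc": [np.nan, np.nan, 1.2],  # pandemic seasons masked, not dropped
    })
    print(df["season"].min())           # 2020/21 -- masked rows still present
    print(df.dropna()["season"].min())  # 2022/23 -- masked rows excluded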