diff --git a/src/iddata/loader.py b/src/iddata/loader.py index 1fe573e..a1f4a1e 100644 --- a/src/iddata/loader.py +++ b/src/iddata/loader.py @@ -1,5 +1,3 @@ -import glob - from itertools import product from urllib.parse import urljoin @@ -10,71 +8,72 @@ from iddata import utils + class FluDataLoader(): def __init__(self) -> None: - self.data_raw = 'https://infectious-disease-data.s3.amazonaws.com/data-raw/' + self.data_raw = "https://infectious-disease-data.s3.amazonaws.com/data-raw/" def _construct_data_raw_url(self, relative_path): return urljoin(self.data_raw, relative_path) def load_fips_mappings(self): - return pd.read_csv(self._construct_data_raw_url('fips-mappings/fips_mappings.csv')) + return pd.read_csv(self._construct_data_raw_url("fips-mappings/fips_mappings.csv")) def load_flusurv_rates_2022_23(self): - dat = pd.read_csv(self._construct_data_raw_url('influenza-flusurv/flusurv-rates/flusurv-rates-2022-23.csv'), - encoding='ISO-8859-1', - engine='python') + dat = pd.read_csv(self._construct_data_raw_url("influenza-flusurv/flusurv-rates/flusurv-rates-2022-23.csv"), + encoding="ISO-8859-1", + engine="python") dat.columns = dat.columns.str.lower() - dat = dat.loc[(dat['age category'] == 'Overall') & - (dat['sex category'] == 'Overall') & - (dat['race category'] == 'Overall')] + dat = dat.loc[(dat["age category"] == "Overall") & + (dat["sex category"] == "Overall") & + (dat["race category"] == "Overall")] - dat = dat.loc[~((dat.catchment == 'Entire Network') & + dat = dat.loc[~((dat.catchment == "Entire Network") & (dat.network != "FluSurv-NET"))] - dat['location'] = dat['catchment'] - dat['agg_level'] = np.where(dat['location'] == 'Entire Network', 'national', 'site') - dat['season'] = dat['year'].str.replace('-', '/') - epiweek = dat['mmwr-year'].astype(str) + dat['mmwr-week'].astype(str) - dat['season_week'] = utils.convert_epiweek_to_season_week(epiweek) - dat['wk_end_date'] = dat.apply( - lambda x: pymmwr.epiweek_to_date(pymmwr.Epiweek(year=x['mmwr-year'], - week=x['mmwr-week'], + dat["location"] = dat["catchment"] + dat["agg_level"] = np.where(dat["location"] == "Entire Network", "national", "site") + dat["season"] = dat["year"].str.replace("-", "/") + epiweek = dat["mmwr-year"].astype(str) + dat["mmwr-week"].astype(str) + dat["season_week"] = utils.convert_epiweek_to_season_week(epiweek) + dat["wk_end_date"] = dat.apply( + lambda x: pymmwr.epiweek_to_date(pymmwr.Epiweek(year=x["mmwr-year"], + week=x["mmwr-week"], day=7)) .strftime("%Y-%m-%d"), axis=1) - dat['wk_end_date'] = pd.to_datetime(dat['wk_end_date']) - dat['inc'] = dat['weekly rate '] - dat = dat[['agg_level', 'location', 'season', 'season_week', 'wk_end_date', 'inc']] + dat["wk_end_date"] = pd.to_datetime(dat["wk_end_date"]) + dat["inc"] = dat["weekly rate "] + dat = dat[["agg_level", "location", "season", "season_week", "wk_end_date", "inc"]] return dat def load_flusurv_rates_base(self, seasons=None, - locations=['California', 'Colorado', 'Connecticut', 'Entire Network', - 'Georgia', 'Maryland', 'Michigan', 'Minnesota', 'New Mexico', - 'New York - Albany', 'New York - Rochester', 'Ohio', 'Oregon', - 'Tennessee', 'Utah'], - age_labels=['0-4 yr', '5-17 yr', '18-49 yr', '50-64 yr', '65+ yr', 'Overall'] + locations=["California", "Colorado", "Connecticut", "Entire Network", + "Georgia", "Maryland", "Michigan", "Minnesota", "New Mexico", + "New York - Albany", "New York - Rochester", "Ohio", "Oregon", + "Tennessee", "Utah"], + age_labels=["0-4 yr", "5-17 yr", "18-49 yr", "50-64 yr", "65+ yr", "Overall"] ): # read flusurv 
data and do some minimal preprocessing - dat = pd.read_csv(self._construct_data_raw_url('influenza-flusurv/flusurv-rates/old-flusurv-rates.csv'), - encoding='ISO-8859-1', - engine='python') + dat = pd.read_csv(self._construct_data_raw_url("influenza-flusurv/flusurv-rates/old-flusurv-rates.csv"), + encoding="ISO-8859-1", + engine="python") dat.columns = dat.columns.str.lower() - dat['season'] = dat.sea_label.str.replace('-', '/') - dat['inc'] = dat.weeklyrate - dat['location'] = dat['region'] - dat['agg_level'] = np.where(dat['location'] == 'Entire Network', 'national', 'site') + dat["season"] = dat.sea_label.str.replace("-", "/") + dat["inc"] = dat.weeklyrate + dat["location"] = dat["region"] + dat["agg_level"] = np.where(dat["location"] == "Entire Network", "national", "site") dat = dat[dat.age_label.isin(age_labels)] - dat = dat.sort_values(by=['wk_end']) + dat = dat.sort_values(by=["wk_end"]) - dat['wk_end_date'] = pd.to_datetime(dat['wk_end']) - dat = dat[['agg_level', 'location', 'season', 'season_week', 'wk_end_date', 'inc']] + dat["wk_end_date"] = pd.to_datetime(dat["wk_end"]) + dat = dat[["agg_level", "location", "season", "season_week", "wk_end_date", "inc"]] # add in data from 2022/23 season dat = pd.concat( @@ -86,57 +85,57 @@ def load_flusurv_rates_base(self, if seasons is not None: dat = dat[dat.season.isin(seasons)] - dat['source'] = 'flusurvnet' + dat["source"] = "flusurvnet" return dat def load_one_us_census_file(self, f): - dat = pd.read_csv(f, engine='python', dtype={'STATE': str}) - dat = dat.loc[(dat['NAME'] == 'United States') | (dat['STATE'] != '00'), - (dat.columns == 'STATE') | (dat.columns.str.startswith('POPESTIMATE'))] - dat = dat.melt(id_vars = 'STATE', var_name='season', value_name='pop') - dat.rename(columns={'STATE': 'location'}, inplace=True) - dat.loc[dat['location'] == '00', 'location'] = 'US' - dat['season'] = dat['season'].str[-4:] - dat['season'] = dat['season'] + '/' + (dat['season'].str[-2:].astype(int) + 1).astype(str) + dat = pd.read_csv(f, engine="python", dtype={"STATE": str}) + dat = dat.loc[(dat["NAME"] == "United States") | (dat["STATE"] != "00"), + (dat.columns == "STATE") | (dat.columns.str.startswith("POPESTIMATE"))] + dat = dat.melt(id_vars = "STATE", var_name="season", value_name="pop") + dat.rename(columns={"STATE": "location"}, inplace=True) + dat.loc[dat["location"] == "00", "location"] = "US" + dat["season"] = dat["season"].str[-4:] + dat["season"] = dat["season"] + "/" + (dat["season"].str[-2:].astype(int) + 1).astype(str) return dat def load_us_census(self, fillna = True): files = [ - self._construct_data_raw_url('us-census/nst-est2019-alldata.csv'), - self._construct_data_raw_url('us-census/NST-EST2022-ALLDATA.csv')] + self._construct_data_raw_url("us-census/nst-est2019-alldata.csv"), + self._construct_data_raw_url("us-census/NST-EST2022-ALLDATA.csv")] us_pops = pd.concat([self.load_one_us_census_file(f) for f in files], axis=0) - fips_mappings = pd.read_csv(self._construct_data_raw_url('fips-mappings/fips_mappings.csv')) + fips_mappings = pd.read_csv(self._construct_data_raw_url("fips-mappings/fips_mappings.csv")) hhs_pops = us_pops.query("location != 'US'") \ .merge( fips_mappings.query("location != 'US'") \ - .assign(hhs_region=lambda x: 'Region ' + x['hhs_region'].astype(int).astype(str)), - on='location', - how = 'left' + .assign(hhs_region=lambda x: "Region " + x["hhs_region"].astype(int).astype(str)), + on="location", + how = "left" ) \ - .groupby(['hhs_region', 'season']) \ - ['pop'] \ + .groupby(["hhs_region", "season"]) \ 
+ ["pop"] \ .sum() \ .reset_index() \ - .rename(columns={'hhs_region': 'location'}) + .rename(columns={"hhs_region": "location"}) dat = pd.concat([us_pops, hhs_pops], axis=0) if fillna: - all_locations = dat['location'].unique() - all_seasons = [str(y) + '/' + str(y+1)[-2:] for y in range(1997, 2024)] + all_locations = dat["location"].unique() + all_seasons = [str(y) + "/" + str(y+1)[-2:] for y in range(1997, 2024)] full_result = pd.DataFrame.from_records(product(all_locations, all_seasons)) - full_result.columns = ['location', 'season'] - dat = full_result.merge(dat, how='left', on=['location', 'season']) \ - .set_index('location') \ - .groupby(['location']) \ + full_result.columns = ["location", "season"] + dat = full_result.merge(dat, how="left", on=["location", "season"]) \ + .set_index("location") \ + .groupby(["location"]) \ .bfill() \ - .groupby(['location']) \ + .groupby(["location"]) \ .ffill() \ .reset_index() @@ -145,130 +144,130 @@ def load_us_census(self, fillna = True): def load_hosp_burden(self): burden_estimates = pd.read_csv( - self._construct_data_raw_url('burden-estimates/burden-estimates.csv'), - engine='python') + self._construct_data_raw_url("burden-estimates/burden-estimates.csv"), + engine="python") - burden_estimates.columns = ['season', 'hosp_burden'] + burden_estimates.columns = ["season", "hosp_burden"] - burden_estimates['season'] = burden_estimates['season'].str[:4] + '/' + burden_estimates['season'].str[7:9] + burden_estimates["season"] = burden_estimates["season"].str[:4] + "/" + burden_estimates["season"].str[7:9] return burden_estimates def calc_hosp_burden_adj(self): dat = self.load_flusurv_rates_base( - seasons = ['20' + str(yy) + '/' + str(yy+1) for yy in range(10, 23)], - locations= ['Entire Network'], - age_labels = ['Overall'] + seasons = ["20" + str(yy) + "/" + str(yy+1) for yy in range(10, 23)], + locations= ["Entire Network"], + age_labels = ["Overall"] ) - burden_adj = dat[dat.location == 'Entire Network'] \ - .groupby('season')['inc'] \ + burden_adj = dat[dat.location == "Entire Network"] \ + .groupby("season")["inc"] \ .sum() burden_adj = burden_adj.reset_index() - burden_adj.columns = ['season', 'cum_rate'] + burden_adj.columns = ["season", "cum_rate"] - us_census = self.load_us_census().query("location == 'US'").drop('location', axis=1) - burden_adj = pd.merge(burden_adj, us_census, on='season') + us_census = self.load_us_census().query("location == 'US'").drop("location", axis=1) + burden_adj = pd.merge(burden_adj, us_census, on="season") burden_estimates = self.load_hosp_burden() - burden_adj = pd.merge(burden_adj, burden_estimates, on='season') + burden_adj = pd.merge(burden_adj, burden_estimates, on="season") - burden_adj['reported_burden_est'] = burden_adj['cum_rate'] * burden_adj['pop'] / 100000 - burden_adj['adj_factor'] = burden_adj['hosp_burden'] / burden_adj['reported_burden_est'] + burden_adj["reported_burden_est"] = burden_adj["cum_rate"] * burden_adj["pop"] / 100000 + burden_adj["adj_factor"] = burden_adj["hosp_burden"] / burden_adj["reported_burden_est"] return burden_adj def fill_missing_flusurv_dates_one_location(self, location_df): - df = location_df.set_index('wk_end_date') \ - .asfreq('W-sat') \ + df = location_df.set_index("wk_end_date") \ + .asfreq("W-sat") \ .reset_index() - fill_cols = ['agg_level', 'location', 'season', 'pop', 'source'] + fill_cols = ["agg_level", "location", "season", "pop", "source"] fill_cols = [c for c in fill_cols if c in df.columns] - df[fill_cols] = df[fill_cols].fillna(axis=0, method='ffill') + 
df[fill_cols] = df[fill_cols].fillna(axis=0, method="ffill") return df def load_flusurv_rates(self, burden_adj=True, - locations=['California', 'Colorado', 'Connecticut', 'Entire Network', - 'Georgia', 'Maryland', 'Michigan', 'Minnesota', 'New Mexico', - 'New York - Albany', 'New York - Rochester', 'Ohio', 'Oregon', - 'Tennessee', 'Utah'] + locations=["California", "Colorado", "Connecticut", "Entire Network", + "Georgia", "Maryland", "Michigan", "Minnesota", "New Mexico", + "New York - Albany", "New York - Rochester", "Ohio", "Oregon", + "Tennessee", "Utah"] ): # read flusurv data and do some minimal preprocessing dat = self.load_flusurv_rates_base( - seasons = ['20' + str(yy) + '/' + str(yy+1) for yy in range(10, 23)], + seasons = ["20" + str(yy) + "/" + str(yy+1) for yy in range(10, 23)], locations = locations, - age_labels = ['Overall'] + age_labels = ["Overall"] ) # if requested, make adjustments for overall season burden if burden_adj: hosp_burden_adj = self.calc_hosp_burden_adj() - dat = pd.merge(dat, hosp_burden_adj, on='season') - dat['inc'] = dat['inc'] * dat['adj_factor'] + dat = pd.merge(dat, hosp_burden_adj, on="season") + dat["inc"] = dat["inc"] * dat["adj_factor"] # fill in missing dates - gd = dat.groupby('location') + gd = dat.groupby("location") dat = pd.concat( [self.fill_missing_flusurv_dates_one_location(df) for _, df in gd], axis = 0) - dat = dat[['agg_level', 'location', 'season', 'season_week', 'wk_end_date', 'inc', 'source']] + dat = dat[["agg_level", "location", "season", "season_week", "wk_end_date", "inc", "source"]] return dat def load_who_nrevss_positive(self): - dat = pd.read_csv(self._construct_data_raw_url('influenza-who-nrevss/who-nrevss.csv'), - encoding='ISO-8859-1', - engine='python') - dat = dat[['region_type', 'region', 'year', 'week', 'season', 'season_week', 'percent_positive']] + dat = pd.read_csv(self._construct_data_raw_url("influenza-who-nrevss/who-nrevss.csv"), + encoding="ISO-8859-1", + engine="python") + dat = dat[["region_type", "region", "year", "week", "season", "season_week", "percent_positive"]] - dat.rename(columns={'region_type': 'agg_level', 'region': 'location'}, + dat.rename(columns={"region_type": "agg_level", "region": "location"}, inplace=True) - dat['agg_level'] = np.where(dat['agg_level'] == 'National', - 'national', - dat['agg_level'].str[:-1].str.lower()) + dat["agg_level"] = np.where(dat["agg_level"] == "National", + "national", + dat["agg_level"].str[:-1].str.lower()) return dat def load_ilinet(self, - response_type='rate', + response_type="rate", scale_to_positive=True, drop_pandemic_seasons=True, burden_adj=False): # read ilinet data and do some minimal preprocessing - files = [self._construct_data_raw_url('influenza-ilinet/ilinet.csv'), - self._construct_data_raw_url('influenza-ilinet/ilinet_hhs.csv'), - self._construct_data_raw_url('influenza-ilinet/ilinet_state.csv')] + files = [self._construct_data_raw_url("influenza-ilinet/ilinet.csv"), + self._construct_data_raw_url("influenza-ilinet/ilinet_hhs.csv"), + self._construct_data_raw_url("influenza-ilinet/ilinet_state.csv")] dat = pd.concat( - [ pd.read_csv(f, encoding='ISO-8859-1', engine='python') for f in files ], + [ pd.read_csv(f, encoding="ISO-8859-1", engine="python") for f in files ], axis = 0) - if response_type == 'rate': - dat['inc'] = np.where(dat['region_type'] == 'States', - dat['unweighted_ili'], - dat['weighted_ili']) + if response_type == "rate": + dat["inc"] = np.where(dat["region_type"] == "States", + dat["unweighted_ili"], + dat["weighted_ili"]) else: - 
dat['inc'] = dat.ilitotal + dat["inc"] = dat.ilitotal - dat['wk_end_date'] = pd.to_datetime(dat['week_start']) + pd.Timedelta(6, 'days') - dat = dat[['region_type', 'region', 'year', 'week', 'season', 'season_week', 'wk_end_date', 'inc']] + dat["wk_end_date"] = pd.to_datetime(dat["week_start"]) + pd.Timedelta(6, "days") + dat = dat[["region_type", "region", "year", "week", "season", "season_week", "wk_end_date", "inc"]] - dat.rename(columns={'region_type': 'agg_level', 'region': 'location'}, + dat.rename(columns={"region_type": "agg_level", "region": "location"}, inplace=True) - dat['agg_level'] = np.where(dat['agg_level'] == 'National', - 'national', - dat['agg_level'].str[:-1].str.lower()) - dat = dat.sort_values(by=['season', 'season_week']) + dat["agg_level"] = np.where(dat["agg_level"] == "National", + "national", + dat["agg_level"].str[:-1].str.lower()) + dat = dat.sort_values(by=["season", "season_week"]) # for early seasons, drop out-of-season weeks with no reporting - early_seasons = [str(yyyy) + '/' + str(yyyy + 1)[2:] for yyyy in range(1997, 2002)] + early_seasons = [str(yyyy) + "/" + str(yyyy + 1)[2:] for yyyy in range(1997, 2002)] early_in_season_weeks = [w for w in range(10, 43)] - first_report_season = ['2002/03'] + first_report_season = ["2002/03"] first_report_in_season_weeks = [w for w in range(10, 53)] dat = dat[ (dat.season.isin(early_seasons) & dat.season_week.isin(early_in_season_weeks)) | @@ -277,20 +276,20 @@ def load_ilinet, # region 10 data prior to 2010/11 is bad, drop it dat = dat[ - ~((dat['location'] == 'Region 10') & (dat['season'] < '2010/11')) + ~((dat["location"] == "Region 10") & (dat["season"] < "2010/11")) ] if scale_to_positive: dat = pd.merge( left=dat, right=self.load_who_nrevss_positive(), - how='left', - on=['agg_level', 'location', 'season', 'season_week']) - dat['inc'] = dat['inc'] * dat['percent_positive'] / 100.0 - dat.drop('percent_positive', axis=1) + how="left", + on=["agg_level", "location", "season", "season_week"]) + dat["inc"] = dat["inc"] * dat["percent_positive"] / 100.0 + dat = dat.drop("percent_positive", axis=1) if drop_pandemic_seasons: - dat.loc[dat['season'].isin(['2008/09', '2009/10', '2020/21', '2021/22']), 'inc'] = np.nan + dat.loc[dat["season"].isin(["2008/09", "2009/10", "2020/21", "2021/22"]), "inc"] = np.nan # if requested, make adjustments for overall season burden # if burden_adj: @@ -298,83 +297,83 @@ def load_ilinet, # dat = pd.merge(dat, hosp_burden_adj, on='season') # dat['inc'] = dat['inc'] * dat['adj_factor'] - dat = dat[['agg_level', 'location', 'season', 'season_week', 'wk_end_date', 'inc']] - dat['source'] = 'ilinet' + dat = dat[["agg_level", "location", "season", "season_week", "wk_end_date", "inc"]] + dat["source"] = "ilinet" return dat def load_hhs(self, rates=True, drop_pandemic_seasons=True, as_of=None): if drop_pandemic_seasons: if as_of is None: - file_path = 'influenza-hhs/hhs.csv' + file_path = "influenza-hhs/hhs.csv" else: # find the largest stored file dated on or before the as_of date - as_of_file_path = f'influenza-hhs/hhs-{str(as_of)}.csv' + as_of_file_path = f"influenza-hhs/hhs-{str(as_of)}.csv" glob_results = s3fs.S3FileSystem(anon=False) \ - .glob('infectious-disease-data/data-raw/influenza-hhs/hhs-????-??-??.csv') - all_file_paths = sorted([f[len('infectious-disease-data/data-raw/'):] for f in glob_results]) + .glob("infectious-disease-data/data-raw/influenza-hhs/hhs-????-??-??.csv") + all_file_paths = sorted([f[len("infectious-disease-data/data-raw/"):] for f in glob_results])
all_file_paths = [f for f in all_file_paths if f <= as_of_file_path] file_path = all_file_paths[-1] else: if as_of is not None: - raise NotImplementedError('Functionality for loading all seasons of HHS data with specified as_of date is not implemented.') - file_path = 'influenza-hhs/hhs_complete.csv' + raise NotImplementedError("Functionality for loading all seasons of HHS data with specified as_of date is not implemented.") + file_path = "influenza-hhs/hhs_complete.csv" dat = pd.read_csv(self._construct_data_raw_url(file_path)) - dat.rename(columns={'date': 'wk_end_date'}, inplace=True) + dat.rename(columns={"date": "wk_end_date"}, inplace=True) ew_str = dat.apply(utils.date_to_ew_str, axis=1) - dat['season'] = utils.convert_epiweek_to_season(ew_str) - dat['season_week'] = utils.convert_epiweek_to_season_week(ew_str) - dat = dat.sort_values(by=['season', 'season_week']) + dat["season"] = utils.convert_epiweek_to_season(ew_str) + dat["season_week"] = utils.convert_epiweek_to_season_week(ew_str) + dat = dat.sort_values(by=["season", "season_week"]) if rates: pops = self.load_us_census() - dat = dat.merge(pops, on = ['location', 'season'], how='left') \ - .assign(inc=lambda x: x['inc'] / x['pop'] * 100000) + dat = dat.merge(pops, on = ["location", "season"], how="left") \ + .assign(inc=lambda x: x["inc"] / x["pop"] * 100000) - dat['wk_end_date'] = pd.to_datetime(dat['wk_end_date']) + dat["wk_end_date"] = pd.to_datetime(dat["wk_end_date"]) - dat['agg_level'] = np.where(dat['location'] == 'US', 'national', 'state') - dat = dat[['agg_level', 'location', 'season', 'season_week', 'wk_end_date', 'inc']] - dat['source'] = 'hhs' + dat["agg_level"] = np.where(dat["location"] == "US", "national", "state") + dat = dat[["agg_level", "location", "season", "season_week", "wk_end_date", "inc"]] + dat["source"] = "hhs" return dat def load_agg_transform_ilinet(self, fips_mappings, **ilinet_kwargs): df_ilinet_full = self.load_ilinet(**ilinet_kwargs) # df_ilinet_full.loc[df_ilinet_full['inc'] < np.exp(-7), 'inc'] = np.exp(-7) - df_ilinet_full['inc'] = (df_ilinet_full['inc'] + np.exp(-7)) * 4 + df_ilinet_full["inc"] = (df_ilinet_full["inc"] + np.exp(-7)) * 4 # aggregate ilinet sites in New York to state level, # mainly to facilitate adding populations - ilinet_nonstates = ['National', 'Region 1', 'Region 2', 'Region 3', - 'Region 4', 'Region 5', 'Region 6', 'Region 7', - 'Region 8', 'Region 9', 'Region 10'] + ilinet_nonstates = ["National", "Region 1", "Region 2", "Region 3", + "Region 4", "Region 5", "Region 6", "Region 7", + "Region 8", "Region 9", "Region 10"] df_ilinet_by_state = df_ilinet_full \ - .loc[(~df_ilinet_full['location'].isin(ilinet_nonstates)) & - (df_ilinet_full['location'] != '78')] \ - .assign(state = lambda x: np.where(x['location'].isin(['New York', 'New York City']), - 'New York', - x['location'])) \ - .assign(state = lambda x: np.where(x['state'] == 'Commonwealth of the Northern Mariana Islands', - 'Northern Mariana Islands', - x['state'])) \ + .loc[(~df_ilinet_full["location"].isin(ilinet_nonstates)) & + (df_ilinet_full["location"] != "78")] \ + .assign(state = lambda x: np.where(x["location"].isin(["New York", "New York City"]), + "New York", + x["location"])) \ + .assign(state = lambda x: np.where(x["state"] == "Commonwealth of the Northern Mariana Islands", + "Northern Mariana Islands", + x["state"])) \ .merge( - fips_mappings.rename(columns={'location': 'fips'}), - left_on='state', - right_on='location_name') \ - .groupby(['state', 'fips', 'season', 'season_week', 'wk_end_date', 
'source']) \ - .apply(lambda x: pd.DataFrame({'inc': [np.mean(x['inc'])]})) \ + fips_mappings.rename(columns={"location": "fips"}), + left_on="state", + right_on="location_name") \ + .groupby(["state", "fips", "season", "season_week", "wk_end_date", "source"]) \ + .apply(lambda x: pd.DataFrame({"inc": [np.mean(x["inc"])]})) \ .reset_index() \ - .drop(columns = ['state', 'level_6']) \ - .rename(columns = {'fips': 'location'}) \ - .assign(agg_level = 'state') - - df_ilinet_nonstates = df_ilinet_full.loc[df_ilinet_full['location'].isin(ilinet_nonstates)].copy() - df_ilinet_nonstates['location'] = np.where(df_ilinet_nonstates['location'] == 'National', - 'US', - df_ilinet_nonstates['location']) + .drop(columns = ["state", "level_6"]) \ + .rename(columns = {"fips": "location"}) \ + .assign(agg_level = "state") + + df_ilinet_nonstates = df_ilinet_full.loc[df_ilinet_full["location"].isin(ilinet_nonstates)].copy() + df_ilinet_nonstates["location"] = np.where(df_ilinet_nonstates["location"] == "National", + "US", + df_ilinet_nonstates["location"]) df_ilinet = pd.concat( [df_ilinet_nonstates, df_ilinet_by_state], axis = 0) @@ -385,28 +384,28 @@ def load_agg_transform_ilinet(self, fips_mappings, **ilinet_kwargs): def load_agg_transform_flusurv(self, fips_mappings, **flusurvnet_kwargs): df_flusurv_by_site = self.load_flusurv_rates(**flusurvnet_kwargs) # df_flusurv_by_site.loc[df_flusurv_by_site['inc'] < np.exp(-3), 'inc'] = np.exp(-3) - df_flusurv_by_site['inc'] = (df_flusurv_by_site['inc'] + np.exp(-3)) / 2.5 + df_flusurv_by_site["inc"] = (df_flusurv_by_site["inc"] + np.exp(-3)) / 2.5 # aggregate flusurv sites in New York to state level, # mainly to facilitate adding populations df_flusurv_by_state = df_flusurv_by_site \ - .loc[df_flusurv_by_site['location'] != 'Entire Network'] \ - .assign(state = lambda x: np.where(x['location'].isin(['New York - Albany', 'New York - Rochester']), - 'New York', - x['location'])) \ + .loc[df_flusurv_by_site["location"] != "Entire Network"] \ + .assign(state = lambda x: np.where(x["location"].isin(["New York - Albany", "New York - Rochester"]), + "New York", + x["location"])) \ .merge( - fips_mappings.rename(columns={'location': 'fips'}), - left_on='state', - right_on='location_name') \ - .groupby(['fips', 'season', 'season_week', 'wk_end_date', 'source']) \ - .apply(lambda x: pd.DataFrame({'inc': [np.mean(x['inc'])]})) \ + fips_mappings.rename(columns={"location": "fips"}), + left_on="state", + right_on="location_name") \ + .groupby(["fips", "season", "season_week", "wk_end_date", "source"]) \ + .apply(lambda x: pd.DataFrame({"inc": [np.mean(x["inc"])]})) \ .reset_index() \ - .drop(columns = ['level_5']) \ - .rename(columns = {'fips': 'location'}) \ - .assign(agg_level = 'state') + .drop(columns = ["level_5"]) \ + .rename(columns = {"fips": "location"}) \ + .assign(agg_level = "state") - df_flusurv_us = df_flusurv_by_site.loc[df_flusurv_by_site['location'] == 'Entire Network'].copy() - df_flusurv_us['location'] = 'US' + df_flusurv_us = df_flusurv_by_site.loc[df_flusurv_by_site["location"] == "Entire Network"].copy() + df_flusurv_us["location"] = "US" df_flusurv = pd.concat( [df_flusurv_us, df_flusurv_by_state], axis = 0) @@ -415,8 +414,8 @@ def load_agg_transform_flusurv(self, fips_mappings, **flusurvnet_kwargs): def load_data(self, sources=None, flusurvnet_kwargs=None, hhs_kwargs=None, ilinet_kwargs=None, - power_transform='4rt'): - ''' + power_transform="4rt"): + """ Load influenza data and transform to a scale suitable for input to models. 
Parameters @@ -432,9 +431,9 @@ def load_data(self, sources=None, flusurvnet_kwargs=None, hhs_kwargs=None, iline Returns ------- Pandas DataFrame - ''' + """ if sources is None: - sources = ['flusurvnet', 'hhs', 'ilinet'] + sources = ["flusurvnet", "hhs", "ilinet"] if flusurvnet_kwargs is None: flusurvnet_kwargs = {} @@ -445,35 +444,35 @@ def load_data(self, sources=None, flusurvnet_kwargs=None, hhs_kwargs=None, iline if ilinet_kwargs is None: ilinet_kwargs = {} - if power_transform not in ['4rt', None]: + if power_transform not in ["4rt", None]: raise ValueError('Only None and "4rt" are supported for the power_transform argument.') us_census = self.load_us_census() - fips_mappings = pd.read_csv(self._construct_data_raw_url('fips-mappings/fips_mappings.csv')) + fips_mappings = pd.read_csv(self._construct_data_raw_url("fips-mappings/fips_mappings.csv")) - if 'hhs' in sources: + if "hhs" in sources: df_hhs = self.load_hhs(**hhs_kwargs) - df_hhs['inc'] = df_hhs['inc'] + 0.75**4 + df_hhs["inc"] = df_hhs["inc"] + 0.75**4 else: df_hhs = None - if 'ilinet' in sources: + if "ilinet" in sources: df_ilinet = self.load_agg_transform_ilinet(fips_mappings=fips_mappings, **ilinet_kwargs) else: df_ilinet = None - if 'flusurvnet' in sources: + if "flusurvnet" in sources: df_flusurv = self.load_agg_transform_flusurv(fips_mappings=fips_mappings, **flusurvnet_kwargs) else: df_flusurv = None df = pd.concat( [df_hhs, df_ilinet, df_flusurv], - axis=0).sort_values(['source', 'location', 'wk_end_date']) + axis=0).sort_values(["source", "location", "wk_end_date"]) # log population - df = df.merge(us_census, how='left', on=['location', 'season']) - df['log_pop'] = np.log(df['pop']) + df = df.merge(us_census, how="left", on=["location", "season"]) + df["log_pop"] = np.log(df["pop"]) # process response variable: # - fourth root transform to stabilize variability @@ -481,26 +480,26 @@ def load_data(self, sources=None, flusurvnet_kwargs=None, hhs_kwargs=None, iline # - center relative to location- and source- specific mean # (note non-standard order of center/scale) if power_transform is None: - df['inc_trans'] = df['inc'] + 0.01 - elif power_transform == '4rt': - df['inc_trans'] = (df['inc'] + 0.01)**0.25 + df["inc_trans"] = df["inc"] + 0.01 + elif power_transform == "4rt": + df["inc_trans"] = (df["inc"] + 0.01)**0.25 - df['inc_trans_scale_factor'] = df \ + df["inc_trans_scale_factor"] = df \ .assign( - inc_trans_in_season = lambda x: np.where((x['season_week'] < 10) | (x['season_week'] > 45), + inc_trans_in_season = lambda x: np.where((x["season_week"] < 10) | (x["season_week"] > 45), np.nan, - x['inc_trans'])) \ - .groupby(['source', 'location'])['inc_trans_in_season'] \ + x["inc_trans"])) \ + .groupby(["source", "location"])["inc_trans_in_season"] \ .transform(lambda x: x.quantile(0.95)) - df['inc_trans_cs'] = df['inc_trans'] / (df['inc_trans_scale_factor'] + 0.01) - df['inc_trans_center_factor'] = df \ + df["inc_trans_cs"] = df["inc_trans"] / (df["inc_trans_scale_factor"] + 0.01) + df["inc_trans_center_factor"] = df \ .assign( - inc_trans_cs_in_season = lambda x: np.where((x['season_week'] < 10) | (x['season_week'] > 45), + inc_trans_cs_in_season = lambda x: np.where((x["season_week"] < 10) | (x["season_week"] > 45), np.nan, - x['inc_trans_cs'])) \ - .groupby(['source', 'location'])['inc_trans_cs_in_season'] \ + x["inc_trans_cs"])) \ + .groupby(["source", "location"])["inc_trans_cs_in_season"] \ .transform(lambda x: x.mean()) - df['inc_trans_cs'] = df['inc_trans_cs'] - df['inc_trans_center_factor'] + 
df["inc_trans_cs"] = df["inc_trans_cs"] - df["inc_trans_center_factor"] return(df) \ No newline at end of file diff --git a/src/iddata/utils.py b/src/iddata/utils.py index 6589320..f8f1f25 100644 --- a/src/iddata/utils.py +++ b/src/iddata/utils.py @@ -2,12 +2,11 @@ import numpy as np import pandas as pd -from pandas.tseries.holiday import USFederalHolidayCalendar - import pymmwr +from pandas.tseries.holiday import USFederalHolidayCalendar -def date_to_ew_str(row, date_col_name='wk_end_date'): +def date_to_ew_str(row, date_col_name="wk_end_date"): ew = pymmwr.date_to_epiweek(datetime.date.fromisoformat(row[date_col_name])) # ew_str = pd.Series(str(ew.year) + str(ew.week)) ew_str = str(ew.year) + str(ew.week) @@ -47,7 +46,7 @@ def convert_epiweek_to_season(epiweek): update_inds = (epiweek_week <= 30) epiweek_year = epiweek_year - update_inds season = epiweek_year.astype(str) - season = season + '/' + (season.str[-2:].astype(int) + 1).astype(str) + season = season + "/" + (season.str[-2:].astype(int) + 1).astype(str) return season @@ -66,10 +65,10 @@ def get_season_hol(start_year): return_name=True) hol = hol.reset_index() - hol.columns = ['date', 'holiday'] - hol = hol.loc[hol['holiday'].isin(['Thanksgiving Day', 'Christmas Day'])] + hol.columns = ["date", "holiday"] + hol = hol.loc[hol["holiday"].isin(["Thanksgiving Day", "Christmas Day"])] - hol['season'] = str(start_year) + '/' + str(start_year + 1)[-2:] + hol["season"] = str(start_year) + "/" + str(start_year + 1)[-2:] return hol @@ -77,6 +76,6 @@ def get_season_hol(start_year): def get_holidays(): hol = pd.concat([get_season_hol(sy) for sy in range(1997, 2024)], ignore_index=True) - hol['season_week'] = hol.apply(convert_datetime_to_season_week, axis=1, date_col_name='date') + hol["season_week"] = hol.apply(convert_datetime_to_season_week, axis=1, date_col_name="date") - return hol[['season', 'holiday', 'date', 'season_week']] + return hol[["season", "holiday", "date", "season_week"]] diff --git a/tests/iddata/unit/test_load_data.py b/tests/iddata/unit/test_load_data.py index 45eb799..2bacb49 100644 --- a/tests/iddata/unit/test_load_data.py +++ b/tests/iddata/unit/test_load_data.py @@ -1,39 +1,41 @@ +import datetime + +import numpy as np import pytest from iddata.loader import FluDataLoader -import numpy as np -import datetime + def test_load_data_sources(): fdl = FluDataLoader() sources_options = [ - ['hhs'], - ['hhs', 'ilinet'], - ['flusurvnet'], - ['flusurvnet', 'hhs', 'ilinet'] + ["hhs"], + ["hhs", "ilinet"], + ["flusurvnet"], + ["flusurvnet", "hhs", "ilinet"] ] for sources in sources_options: df = fdl.load_data(sources=sources) - assert set(df['source'].unique()) == set(sources) + assert set(df["source"].unique()) == set(sources) df = fdl.load_data() - assert set(df['source'].unique()) == {'flusurvnet', 'hhs', 'ilinet'} + assert set(df["source"].unique()) == {"flusurvnet", "hhs", "ilinet"} @pytest.mark.parametrize("test_kwargs, season_expected, wk_end_date_expected", [ - (None, '2022/23', '2023-12-23'), - ({'drop_pandemic_seasons': False}, '2019/20', '2023-12-23'), - ({'drop_pandemic_seasons': True, 'as_of': datetime.date.fromisoformat('2023-12-30')}, - '2022/23', '2023-12-23') + (None, "2022/23", "2023-12-23"), + ({"drop_pandemic_seasons": False}, "2019/20", "2023-12-23"), + ({"drop_pandemic_seasons": True, "as_of": datetime.date.fromisoformat("2023-12-30")}, + "2022/23", "2023-12-23") ]) def test_load_data_hhs_kwargs(test_kwargs, season_expected, wk_end_date_expected): fdl = FluDataLoader() - df = fdl.load_data(sources=['hhs'], 
hhs_kwargs=test_kwargs) + df = fdl.load_data(sources=["hhs"], hhs_kwargs=test_kwargs) - assert df['season'].min() == season_expected - wk_end_date_actual = str(df['wk_end_date'].max())[:10] - if test_kwargs is not None and 'as_of' in test_kwargs: + assert df["season"].min() == season_expected + wk_end_date_actual = str(df["wk_end_date"].max())[:10] + if test_kwargs is not None and "as_of" in test_kwargs: assert wk_end_date_actual == wk_end_date_expected else: assert wk_end_date_actual > wk_end_date_expected @@ -41,32 +43,32 @@ def test_load_data_hhs_kwargs(test_kwargs, season_expected, wk_end_date_expected @pytest.mark.parametrize("test_kwargs, expect_all_na", [ (None, True), - ({'drop_pandemic_seasons': False}, False), - ({'drop_pandemic_seasons': True}, True) + ({"drop_pandemic_seasons": False}, False), + ({"drop_pandemic_seasons": True}, True) ]) def test_load_data_ilinet_kwargs(test_kwargs, expect_all_na): fdl = FluDataLoader() - df = fdl.load_data(sources=['ilinet'], ilinet_kwargs=test_kwargs) + df = fdl.load_data(sources=["ilinet"], ilinet_kwargs=test_kwargs) if expect_all_na: - assert np.all(df.loc[df['season'].isin(['2008/09', '2009/10', '2020/21', '2021/22']), 'inc'].isna()) + assert np.all(df.loc[df["season"].isin(["2008/09", "2009/10", "2020/21", "2021/22"]), "inc"].isna()) else: # expect some non-NA values in pandemic seasons - assert np.any(~df.loc[df['season'].isin(['2008/09', '2009/10', '2020/21', '2021/22']), 'inc'].isna()) + assert np.any(~df.loc[df["season"].isin(["2008/09", "2009/10", "2020/21", "2021/22"]), "inc"].isna()) @pytest.mark.parametrize("test_kwargs", [ (None), - ({'locations': ['California', 'Colorado', 'Connecticut']}) + ({"locations": ["California", "Colorado", "Connecticut"]}) ]) def test_load_data_flusurvnet_kwargs(test_kwargs): fdl = FluDataLoader() #flusurv_kwargs - df = fdl.load_data(sources=['flusurvnet'], flusurvnet_kwargs=test_kwargs) + df = fdl.load_data(sources=["flusurvnet"], flusurvnet_kwargs=test_kwargs) if test_kwargs is None: - assert len(df['location'].unique()) > 3 + assert len(df["location"].unique()) > 3 else: - assert len(df['location'].unique()) == len(test_kwargs['locations']) + assert len(df["location"].unique()) == len(test_kwargs["locations"])
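
A minimal usage sketch of the refactored loader, assuming the iddata package is installed and the public S3 bucket is reachable. The invert_4rt helper below is hypothetical and not part of this patch; it simply mirrors the centering, scaling, and fourth-root constants that load_data applies, so the round trip should recover the stored inc column.

import numpy as np

from iddata.loader import FluDataLoader

fdl = FluDataLoader()

# load HHS and ILINet signals on the transformed scale used for modeling
df = fdl.load_data(sources=["hhs", "ilinet"], power_transform="4rt")

def invert_4rt(df):
    # hypothetical helper: undo the centering, scaling, and fourth-root
    # steps from load_data (constants copied from that method)
    inc_trans = (df["inc_trans_cs"] + df["inc_trans_center_factor"]) * \
        (df["inc_trans_scale_factor"] + 0.01)
    return inc_trans**4 - 0.01

# the round trip should recover the stored incidence column, with NaN
# preserved for the pandemic seasons that load_ilinet masks out
assert np.allclose(invert_4rt(df), df["inc"], equal_nan=True)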