diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 5a589e53a..a3af113e1 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.2.25 +current_version = 0.3.0 commit = True message = chore: bump covidcast-indicators to {new_version} tag = False diff --git a/_delphi_utils_python/.bumpversion.cfg b/_delphi_utils_python/.bumpversion.cfg index 2d63919ca..8ce49f20b 100644 --- a/_delphi_utils_python/.bumpversion.cfg +++ b/_delphi_utils_python/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.2.11 +current_version = 0.3.0 commit = True message = chore: bump delphi_utils to {new_version} tag = False diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py index 2475e9ac4..898e40e4b 100644 --- a/_delphi_utils_python/delphi_utils/__init__.py +++ b/_delphi_utils_python/delphi_utils/__init__.py @@ -15,4 +15,4 @@ from .nancodes import Nans from .weekday import Weekday -__version__ = "0.2.11" +__version__ = "0.3.0" diff --git a/_delphi_utils_python/delphi_utils/validator/dynamic.py b/_delphi_utils_python/delphi_utils/validator/dynamic.py index 07e0a2599..92bd790db 100644 --- a/_delphi_utils_python/delphi_utils/validator/dynamic.py +++ b/_delphi_utils_python/delphi_utils/validator/dynamic.py @@ -110,6 +110,8 @@ def validate(self, all_frames, report): self.check_max_allowed_max_date( max_date, geo_type, signal_type, report) + self.check_na_vals(geo_sig_df, geo_type, signal_type, report) + # Get relevant reference data from API dictionary. api_df_or_error = all_api_df[(geo_type, signal_type)] @@ -168,6 +170,43 @@ def validate(self, all_frames, report): if self.test_mode and kroc == 2: break + def check_na_vals(self, geo_sig_df, geo_type, signal_type, report): + """Check if there are any NA values. + + In particular, make sure that error doesn't occur for new Geo IDs introduced. + + Arguments: + - geo_type: str; geo type name (county, msa, hrr, state) as in the CSV name + - signal_type: str; signal name as in the CSV name + - report: ValidationReport; report where results are added + + Returns: + - None + """ + def replace_first_six(df, start_date): + x = df.val.isnull() + # First 6 days have to be null + x.iloc[:6] = False + df = df[x] + return df.time_value[df.time_value >= start_date] + + grouped_df = geo_sig_df.groupby('geo_id') + error_df = grouped_df.apply(replace_first_six, + start_date = self.params.time_window.start_date) + + if not error_df.empty: + for index, value in error_df.iteritems(): + report.add_raised_error( + ValidationFailure("check_val_missing", + geo_type=geo_type, + signal=signal_type, + date=value, + message=f"geo_id {index[0]}" + ) + ) + + report.increment_total_checks() + def check_min_allowed_max_date(self, max_date, geo_type, signal_type, report): """Check if time since data was generated is reasonable or too long ago. diff --git a/_delphi_utils_python/delphi_utils/validator/static.py b/_delphi_utils_python/delphi_utils/validator/static.py index 767b5761c..48b17b888 100644 --- a/_delphi_utils_python/delphi_utils/validator/static.py +++ b/_delphi_utils_python/delphi_utils/validator/static.py @@ -295,14 +295,6 @@ def check_bad_val(self, df_to_test, nameformat, signal_type, report): report.increment_total_checks() - if df_to_test['val'].isnull().values.any(): - report.add_raised_error( - ValidationFailure("check_val_missing", - filename=nameformat, - message="val column can't have any cell that is NA")) - - report.increment_total_checks() - if not df_to_test[(df_to_test['val'] < 0)].empty: report.add_raised_error( ValidationFailure("check_val_lt_0", diff --git a/_delphi_utils_python/setup.py b/_delphi_utils_python/setup.py index 5cde1e57a..779ef0fd4 100644 --- a/_delphi_utils_python/setup.py +++ b/_delphi_utils_python/setup.py @@ -26,7 +26,7 @@ setup( name="delphi_utils", - version="0.2.11", + version="0.3.0", description="Shared Utility Functions for Indicators", long_description=long_description, long_description_content_type="text/markdown", diff --git a/_delphi_utils_python/tests/validator/test_dynamic.py b/_delphi_utils_python/tests/validator/test_dynamic.py index 321ce63fb..1f0348315 100644 --- a/_delphi_utils_python/tests/validator/test_dynamic.py +++ b/_delphi_utils_python/tests/validator/test_dynamic.py @@ -106,6 +106,28 @@ def test_0_vs_many(self): assert len(report.raised_errors) == 1 assert report.raised_errors[0].check_name == "check_rapid_change_num_rows" +class TestCheckNaVals: + params = { + "common": { + "data_source": "", + "span_length": 14, + "end_date": "2020-09-02" + } + } + def test_missing(self): + validator = DynamicValidator(self.params) + report = ValidationReport([]) + data = {"val": [np.nan] * 15, "geo_id": [0,1] * 7 + [2], + "time_value": ["2021-08-30"] * 14 + ["2021-05-01"]} + df = pd.DataFrame(data) + df.time_value = (pd.to_datetime(df.time_value)).dt.date + validator.check_na_vals(df, "geo", "signal", report) + + assert len(report.raised_errors) == 2 + assert report.raised_errors[0].check_name == "check_val_missing" + assert report.raised_errors[0].message == "geo_id 0" + assert report.raised_errors[1].check_name == "check_val_missing" + assert report.raised_errors[1].message == "geo_id 1" class TestCheckAvgValDiffs: params = { diff --git a/_delphi_utils_python/tests/validator/test_static.py b/_delphi_utils_python/tests/validator/test_static.py index 09286ba9c..bf270b4fd 100644 --- a/_delphi_utils_python/tests/validator/test_static.py +++ b/_delphi_utils_python/tests/validator/test_static.py @@ -362,15 +362,6 @@ def test_empty_df(self): assert len(report.raised_errors) == 0 - def test_missing(self): - validator = StaticValidator(self.params) - report = ValidationReport([]) - df = pd.DataFrame([np.nan], columns=["val"]) - validator.check_bad_val(df, FILENAME, "signal", report) - - assert len(report.raised_errors) == 1 - assert report.raised_errors[0].check_name == "check_val_missing" - def test_lt_0(self): validator = StaticValidator(self.params) report = ValidationReport([]) diff --git a/ansible/templates/dsew_community_profile-params-prod.json.j2 b/ansible/templates/dsew_community_profile-params-prod.json.j2 index fd377d758..ec3e254c3 100644 --- a/ansible/templates/dsew_community_profile-params-prod.json.j2 +++ b/ansible/templates/dsew_community_profile-params-prod.json.j2 @@ -26,7 +26,9 @@ "ref_window_size": 7, "smoothed_signals": [ "naats_total_7dav", - "naats_positivity_7dav" + "naats_positivity_7dav", + "confirmed_admissions_covid_1d_prop_7dav", + "confirmed_admissions_covid_1d_7dav" ] } } diff --git a/dsew_community_profile/delphi_dsew_community_profile/constants.py b/dsew_community_profile/delphi_dsew_community_profile/constants.py index 51c62b5ea..1404e52f4 100644 --- a/dsew_community_profile/delphi_dsew_community_profile/constants.py +++ b/dsew_community_profile/delphi_dsew_community_profile/constants.py @@ -50,22 +50,34 @@ class Transform: SIGNALS = { "total": { "is_rate" : False, - "api_name": "naats_total_7dav" + "api_name": "naats_total_7dav", + "make_prop": False }, "positivity": { "is_rate" : True, - "api_name": "naats_positivity_7dav" + "api_name": "naats_positivity_7dav", + "make_prop": False }, "confirmed covid-19 admissions": { "is_rate" : False, - "api_name": "confirmed_admissions_covid_1d_7dav" + "api_name": "confirmed_admissions_covid_1d_7dav", + "make_prop": True, + "api_prop_name": "confirmed_admissions_covid_1d_prop_7dav" } } COUNTS_7D_SIGNALS = {key for key, value in SIGNALS.items() if not value["is_rate"]} -def make_signal_name(key): - """Convert a signal key to the corresponding signal name for the API.""" +def make_signal_name(key, is_prop=False): + """Convert a signal key to the corresponding signal name for the API. + + Note, this function gets called twice with the same `key` for signals that support + population-proportion ("prop") variants. + """ + if is_prop: + return SIGNALS[key]["api_prop_name"] return SIGNALS[key]["api_name"] -NEWLINE="\n" +NEWLINE = "\n" +IS_PROP = True +NOT_PROP = False diff --git a/dsew_community_profile/delphi_dsew_community_profile/pull.py b/dsew_community_profile/delphi_dsew_community_profile/pull.py index a65b26a07..f2e88217b 100644 --- a/dsew_community_profile/delphi_dsew_community_profile/pull.py +++ b/dsew_community_profile/delphi_dsew_community_profile/pull.py @@ -11,8 +11,9 @@ from delphi_utils.geomap import GeoMapper -from .constants import TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE -from .constants import DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING +from .constants import (TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE, + IS_PROP, NOT_PROP, + DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING) # YYYYMMDD # example: "Community Profile Report 20211104.xlsx" @@ -248,7 +249,7 @@ def _parse_sheet(self, sheet): if (sheet.level == "msa" or sheet.level == "county") \ and self.publish_date < datetime.date(2021, 1, 8) \ and sig == "confirmed covid-19 admissions": - self.dfs[(sheet.level, sig)] = pd.DataFrame( + self.dfs[(sheet.level, sig, NOT_PROP)] = pd.DataFrame( columns = ["geo_id", "timestamp", "val", \ "se", "sample_size", "publish_date"] ) @@ -258,7 +259,7 @@ def _parse_sheet(self, sheet): assert len(sig_select) > 0, \ f"No {sig} in any of {select}\n\nAll headers:\n{NEWLINE.join(list(df.columns))}" - self.dfs[(sheet.level, sig)] = pd.concat([ + self.dfs[(sheet.level, sig, NOT_PROP)] = pd.concat([ pd.DataFrame({ "geo_id": sheet.geo_id_select(df).apply(sheet.geo_id_apply), "timestamp": pd.to_datetime(self.times[si[0]][sig]), @@ -271,14 +272,18 @@ def _parse_sheet(self, sheet): ]) for sig in COUNTS_7D_SIGNALS: - self.dfs[(sheet.level, sig)]["val"] /= 7 # 7-day total -> 7-day average + self.dfs[(sheet.level, sig, NOT_PROP)]["val"] /= 7 # 7-day total -> 7-day average def as_cached_filename(params, config): """Formulate a filename to uniquely identify this report in the input cache.""" + # eg "Community Profile Report 20220128.xlsx" + # but delimiters vary; don't get tripped up if they do something wacky like + # Community.Profile.Report.20220128.xlsx + name, _, ext = config['filename'].rpartition(".") return os.path.join( params['indicator']['input_cache'], - f"{config['assetId']}--{config['filename']}" + f"{name}--{config['assetId']}.{ext}" ) def fetch_listing(params): @@ -390,13 +395,46 @@ def fetch_new_reports(params, logger=None): # add nation from state geomapper = GeoMapper() for sig in SIGNALS: - state_key = ("state", sig) + state_key = ("state", sig, NOT_PROP) if state_key not in ret: continue - ret[("nation", sig)] = nation_from_state( + ret[("nation", sig, NOT_PROP)] = nation_from_state( ret[state_key].rename(columns={"geo_id": "state_id"}), sig, geomapper ) + for key, df in ret.copy().items(): + (geo, sig, _) = key + if SIGNALS[sig]["make_prop"]: + ret[(geo, sig, IS_PROP)] = generate_prop_signal(df, geo, geomapper) + return ret + +def generate_prop_signal(df, geo, geo_mapper): + """Transform base df into a proportion (per 100k population).""" + if geo == "state": + geo = "state_id" + if geo == "county": + geo = "fips" + + # Add population data + if geo == "msa": + map_df = geo_mapper.get_crosswalk("fips", geo) + map_df = geo_mapper.add_population_column( + map_df, "fips" + ).drop( + "fips", axis=1 + ).groupby( + geo + ).sum( + ).reset_index( + ) + df = pd.merge(df, map_df, left_on="geo_id", right_on=geo, how="inner") + else: + df = geo_mapper.add_population_column(df, geo, geocode_col="geo_id") + + df["val"] = df["val"] / df["population"] * 100000 + df.drop(["population", geo], axis=1, inplace=True) + + return df diff --git a/dsew_community_profile/delphi_dsew_community_profile/run.py b/dsew_community_profile/delphi_dsew_community_profile/run.py index d27c96216..3ce69b325 100644 --- a/dsew_community_profile/delphi_dsew_community_profile/run.py +++ b/dsew_community_profile/delphi_dsew_community_profile/run.py @@ -58,14 +58,14 @@ def replace_date_param(p): run_stats = [] dfs = fetch_new_reports(params, logger) for key, df in dfs.items(): - (geo, sig) = key + (geo, sig, is_prop) = key if sig not in params["indicator"]["export_signals"]: continue dates = create_export_csv( df, params['common']['export_dir'], geo, - make_signal_name(sig), + make_signal_name(sig, is_prop), **export_params ) if len(dates)>0: diff --git a/dsew_community_profile/params.json.template b/dsew_community_profile/params.json.template index 3a64d71ab..42fc7faad 100644 --- a/dsew_community_profile/params.json.template +++ b/dsew_community_profile/params.json.template @@ -32,7 +32,9 @@ "ref_window_size": 7, "smoothed_signals": [ "naats_total_7dav", - "naats_positivity_7dav" + "naats_positivity_7dav", + "confirmed_admissions_covid_1d_prop_7dav", + "confirmed_admissions_covid_1d_7dav" ] } } diff --git a/dsew_community_profile/tests/params.json.template b/dsew_community_profile/tests/params.json.template index 89cee4bf0..645bd253f 100644 --- a/dsew_community_profile/tests/params.json.template +++ b/dsew_community_profile/tests/params.json.template @@ -25,7 +25,9 @@ "ref_window_size": 7, "smoothed_signals": [ "naats_total_7dav", - "naats_positivity_7dav" + "naats_positivity_7dav", + "confirmed_admissions_covid_1d_prop_7dav", + "confirmed_admissions_covid_1d_7dav" ] } } diff --git a/dsew_community_profile/tests/test_pull.py b/dsew_community_profile/tests/test_pull.py index 60f0fa5dd..b898e21b6 100644 --- a/dsew_community_profile/tests/test_pull.py +++ b/dsew_community_profile/tests/test_pull.py @@ -9,7 +9,7 @@ from delphi_dsew_community_profile.pull import DatasetTimes from delphi_dsew_community_profile.pull import Dataset -from delphi_dsew_community_profile.pull import fetch_listing, nation_from_state +from delphi_dsew_community_profile.pull import fetch_listing, nation_from_state, generate_prop_signal example = namedtuple("example", "given expected") @@ -213,3 +213,84 @@ def test_nation_from_state(self): 'sample_size': [None],}), check_like=True ) + + def test_generate_prop_signal_msa(self): + geomapper = GeoMapper() + county_pop = geomapper.get_crosswalk("fips", "pop") + county_msa = geomapper.get_crosswalk("fips", "msa") + msa_pop = county_pop.merge(county_msa, on="fips", how="inner").groupby("msa").sum().reset_index() + + test_df = pd.DataFrame({ + 'geo_id': ['35620', '31080'], + 'timestamp': [datetime(year=2020, month=1, day=1)]*2, + 'val': [15., 150.], + 'se': [None, None], + 'sample_size': [None, None],}) + + nyc_pop = int(msa_pop.loc[msa_pop.msa == "35620", "pop"]) + la_pop = int(msa_pop.loc[msa_pop.msa == "31080", "pop"]) + + expected_df = pd.DataFrame({ + 'geo_id': ['35620', '31080'], + 'timestamp': [datetime(year=2020, month=1, day=1)]*2, + 'val': [15. / nyc_pop * 100000, 150. / la_pop * 100000], + 'se': [None, None], + 'sample_size': [None, None],}) + + pd.testing.assert_frame_equal( + generate_prop_signal( + test_df.copy(), + "msa", + geomapper + ), + expected_df, + check_like=True + ) + def test_generate_prop_signal_non_msa(self): + geomapper = GeoMapper() + + geos = { + "state": { + "code_name": "state_id", + "geo_names": ['pa', 'wv'] + }, + "county": { + "code_name": "fips", + "geo_names": ['36061', '06037'] + }, + # nation uses the same logic path so no need to test separately + "hhs": { + "code_name": "hhs", + "geo_names": ["1", "4"] + } + } + + for geo, settings in geos.items(): + geo_pop = geomapper.get_crosswalk(settings["code_name"], "pop") + + test_df = pd.DataFrame({ + 'geo_id': settings["geo_names"], + 'timestamp': [datetime(year=2020, month=1, day=1)]*2, + 'val': [15., 150.], + 'se': [None, None], + 'sample_size': [None, None],}) + + pop1 = int(geo_pop.loc[geo_pop[settings["code_name"]] == settings["geo_names"][0], "pop"]) + pop2 = int(geo_pop.loc[geo_pop[settings["code_name"]] == settings["geo_names"][1], "pop"]) + + expected_df = pd.DataFrame({ + 'geo_id': settings["geo_names"], + 'timestamp': [datetime(year=2020, month=1, day=1)]*2, + 'val': [15. / pop1 * 100000, 150. / pop2 * 100000], + 'se': [None, None], + 'sample_size': [None, None],}) + + pd.testing.assert_frame_equal( + generate_prop_signal( + test_df.copy(), + geo, + geomapper + ), + expected_df, + check_like=True + )