From f723bebb28fd7fa4e61d4257735ed1eaef4bda69 Mon Sep 17 00:00:00 2001 From: Samuel Gratzl Date: Wed, 7 Oct 2020 13:04:42 +0200 Subject: [PATCH 1/5] feat: add min_issue to covidcast meta data closes #232 --- docs/api/covidcast_meta.md | 1 + .../covidcast/test_covidcast_meta_caching.py | 1 + integrations/client/test_delphi_epidata.py | 5 +++-- integrations/server/test_covidcast_meta.py | 3 +++ src/acquisition/covidcast/database.py | 11 ++++++----- 5 files changed, 14 insertions(+), 7 deletions(-) diff --git a/docs/api/covidcast_meta.md b/docs/api/covidcast_meta.md index d8121b10b..f7339808c 100644 --- a/docs/api/covidcast_meta.md +++ b/docs/api/covidcast_meta.md @@ -38,6 +38,7 @@ None required. | `epidata[].max_value` | maximum value | float | | `epidata[].mean_value` | mean of value | float | | `epidata[].stdev_value` | standard deviation of value | float | +| `epidata[].min_issue` | earliest date data was issued (e.g., 20200710) | integer | | `epidata[].max_issue` | most recent date data was issued (e.g., 20200710) | integer | | `epidata[].min_lag` | smallest lag from observation to issue, in `time_type` units | integer | | `epidata[].max_lag` | largest lag from observation to issue, in `time_type` units | integer | diff --git a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py index 18dec6577..29f55672b 100644 --- a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py +++ b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py @@ -99,6 +99,7 @@ def test_caching(self): 'max_value': 1, 'mean_value': 1, 'stdev_value': 0, + 'min_issue': 20200423, 'max_issue': 20200423, 'min_lag': 0, 'max_lag': 1, diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py index 9e51ec99f..7a7e0477d 100644 --- a/integrations/client/test_delphi_epidata.py +++ b/integrations/client/test_delphi_epidata.py @@ -46,7 +46,7 @@ def tearDown(self): def test_covidcast(self): """Test that the covidcast endpoint returns expected data.""" self.maxDiff=None - + # insert dummy data self.cur.execute(''' insert into covidcast values @@ -123,7 +123,7 @@ def test_covidcast(self): }], 'message': 'success', }) - + # fetch data, without specifying issue or lag response_1 = Epidata.covidcast( 'src', 'sig', 'day', 'county', 20200414, '01234') @@ -258,6 +258,7 @@ def test_covidcast_meta(self): 'mean_value': 6.5, 'stdev_value': 0.5, 'last_update': 345, + 'min_issue': 20200416, 'max_issue': 20200416, 'min_lag': 1, 'max_lag': 2, diff --git a/integrations/server/test_covidcast_meta.py b/integrations/server/test_covidcast_meta.py index a45981e15..5d73f73e2 100644 --- a/integrations/server/test_covidcast_meta.py +++ b/integrations/server/test_covidcast_meta.py @@ -67,6 +67,7 @@ def test_round_trip(self): 'mean_value': 15, 'stdev_value': 5, 'last_update': 123, + 'min_issue': 2, 'max_issue': 2, 'min_lag': 0, 'max_lag': 0, @@ -116,6 +117,7 @@ def test_filter(self): 'mean_value': 15, 'stdev_value': 5, 'last_update': 123, + 'min_issue': 2, 'max_issue': 2, 'min_lag': 0, 'max_lag': 0, @@ -235,6 +237,7 @@ def test_suppress_work_in_progress(self): 'mean_value': 15, 'stdev_value': 5, 'last_update': 123, + 'min_issue': 2, 'max_issue': 2, 'min_lag': 0, 'max_lag': 0, diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 27176f072..77a7dc23d 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -162,12 +162,12 @@ def 
insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False SET `is_latest_issue`=0 ''' set_is_latest_issue_sql = f''' - UPDATE + UPDATE ( SELECT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, MAX(`issue`) AS `issue` FROM ( - SELECT DISTINCT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value` + SELECT DISTINCT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value` FROM `{tmp_table_name}` ) AS TMP LEFT JOIN `covidcast` @@ -176,7 +176,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False ) AS TMP LEFT JOIN `covidcast` USING (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`) - SET `is_latest_issue`=1 + SET `is_latest_issue`=1 ''' # TODO: ^ do we want to reset `direction_updated_timestamp` and `direction` in the duplicate key case? @@ -333,7 +333,7 @@ def get_all_record_values_of_timeseries_with_potentially_stale_direction(self, t # A query that selects all rows from `covidcast` that have latest issue-date # for any (time-series-key, time_value) with time_type='day'. latest_issues_sql = f''' - SELECT + SELECT `id`, `source`, `signal`, @@ -576,6 +576,7 @@ def get_covidcast_meta(self): ROUND(AVG(`value`),7) AS `mean_value`, ROUND(STD(`value`),7) AS `stdev_value`, MAX(`value_updated_timestamp`) AS `last_update`, + MIN(`issue`) as `min_issue`, MAX(`issue`) as `max_issue`, MIN(`lag`) as `min_lag`, MAX(`lag`) as `max_lag` @@ -621,7 +622,7 @@ def get_covidcast_meta(self): meta.extend(list(dict(zip(self._cursor.column_names,x)) for x in self._cursor)) return meta - + def update_covidcast_meta_cache(self, metadata): """Updates the `covidcast_meta_cache` table.""" From 6f01aa58a63948e47673f2f17a68e8e43b706a84 Mon Sep 17 00:00:00 2001 From: Samuel Gratzl Date: Mon, 12 Oct 2020 16:26:51 +0200 Subject: [PATCH 2/5] fix: update min issue computation --- integrations/server/test_covidcast_meta.py | 61 ++++++++++++++- src/acquisition/covidcast/database.py | 86 +++++++++++----------- 2 files changed, 100 insertions(+), 47 deletions(-) diff --git a/integrations/server/test_covidcast_meta.py b/integrations/server/test_covidcast_meta.py index 5d73f73e2..e06499ec0 100644 --- a/integrations/server/test_covidcast_meta.py +++ b/integrations/server/test_covidcast_meta.py @@ -47,7 +47,11 @@ def test_round_trip(self): # insert dummy data and accumulate expected results (in sort order) template = ''' insert into covidcast values - (0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0, %d, 0, 1, %d) + (0, "%s", "%s", "%s", "%s", %d, "%s", + 123, + %d, 0, 0, + 456, 0, + %d, 0, 1, %d) ''' expected = [] for src in ('src1', 'src2'): @@ -67,7 +71,7 @@ def test_round_trip(self): 'mean_value': 15, 'stdev_value': 5, 'last_update': 123, - 'min_issue': 2, + 'min_issue': 1, 'max_issue': 2, 'min_lag': 0, 'max_lag': 0, @@ -117,7 +121,7 @@ def test_filter(self): 'mean_value': 15, 'stdev_value': 5, 'last_update': 123, - 'min_issue': 2, + 'min_issue': 1, 'max_issue': 2, 'min_lag': 0, 'max_lag': 0, @@ -237,7 +241,7 @@ def test_suppress_work_in_progress(self): 'mean_value': 15, 'stdev_value': 5, 'last_update': 123, - 'min_issue': 2, + 'min_issue': 1, 'max_issue': 2, 'min_lag': 0, 'max_lag': 0, @@ -259,3 +263,52 @@ def test_suppress_work_in_progress(self): 'epidata': expected, 'message': 'success', }) + + + def test_min_issue(self): + """Test proper computation of min issue in a complex setup.""" + + def add_row(src, sig, time_type, geo_type, time_value, geo_value, value, issue, is_latest = True): 
+ + template = ''' + insert into covidcast( +`id`, `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, +`value_updated_timestamp`, `value`, `stderr`, `sample_size`, `direction_updated_timestamp`, `direction`, +`issue`, `lag`, `is_latest_issue`, `is_wip`) + values + (0, "%s", "%s", "%s", "%s", %d, "%s", + 19700101, %d, 0, 0, 19700101, 0, + %d, 0, %d, 0) + ''' + self.cur.execute(template % (src, sig, time_type, geo_type, time_value, geo_value, value, issue, is_latest)) + + expected = [] + for src in ('src1', 'src2'): + for sig in ('sig1', 'sig2'): + expected.append(dict(data_source=src, signal=sig, min_issue=20200502, max_issue=20200505, mean_value=15)) + + for time_value in [20200101, 20200102]: + for geo_value, value in zip(('geo1', 'geo2'), (1, 2)): + # add some old issue rows which won't influence the mean + add_row(src, sig, 'day', 'county', time_value, geo_value, value, 20200502, False) + + for time_value in range(20200101, 20200105): + for geo_value, value in zip(('geo1', 'geo2'), (10, 20)): + # add a latest issue row + add_row(src, sig, 'day', 'county', time_value, geo_value, value, 20200505, True) + + self.cnx.commit() + update_cache(args=None) + + # make the request + response = requests.get(BASE_URL, params=dict(source='covidcast_meta', fields="data_source,signal,min_issue,max_issue,mean_value")) + response.raise_for_status() + response = response.json() + + # assert that the right data came back + self.maxDiff = None + self.assertEqual(response, { + 'result': 1, + 'epidata': expected, + 'message': 'success', + }) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 77a7dc23d..382bb690a 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -560,17 +560,37 @@ def get_covidcast_meta(self): sql = 'SELECT `source`, `signal` FROM covidcast WHERE NOT `is_wip` GROUP BY `source`, `signal` ORDER BY `source` ASC, `signal` ASC;' self._cursor.execute(sql) - for source, signal in [ss for ss in self._cursor]: #NOTE: this obfuscation protects the integrity of the cursor; using the cursor as a generator will cause contention w/ subsequent queries + signals = [ss for ss in self._cursor] #NOTE: this obfuscation protects the integrity of the cursor; using the cursor as a generator will cause contention w/ subsequent queries + for source, signal in signals: + # calculate the min issues per combination sql = ''' SELECT - t.`source` AS `data_source`, - t.`signal`, - t.`time_type`, - t.`geo_type`, - MIN(t.`time_value`) AS `min_time`, - MAX(t.`time_value`) AS `max_time`, - COUNT(DISTINCT t.`geo_value`) AS `num_locations`, + `time_type`, + `geo_type`, + MIN(`issue`) as `min_issue` + FROM + `covidcast` + WHERE + `source` = %s AND + `signal` = %s + GROUP BY + `time_type`, + `geo_type` + ''' + self._cursor.execute(sql, (source, signal)) + min_issue_lookup = {f'{x[0]}:{x[1]}': x[2] for x in self._cursor} + + # calculate statistics for the latest issue entries + sql = ''' + SELECT + `source` AS `data_source`, + `signal`, + `time_type`, + `geo_type`, + MIN(`time_value`) AS `min_time`, + MAX(`time_value`) AS `max_time`, + COUNT(DISTINCT `geo_value`) AS `num_locations`, MIN(`value`) AS `min_value`, MAX(`value`) AS `max_value`, ROUND(AVG(`value`),7) AS `mean_value`, @@ -581,46 +601,26 @@ def get_covidcast_meta(self): MIN(`lag`) as `min_lag`, MAX(`lag`) as `max_lag` FROM - `covidcast` t - JOIN - ( - SELECT - max(`issue`) `max_issue`, - `time_type`, - `time_value`, - `source`, - `signal`, - `geo_type`, - 
`geo_value` - FROM - `covidcast` - WHERE - `source` = %s AND - `signal` = %s - GROUP BY - `time_value`, - `time_type`, - `geo_type`, - `geo_value` - ) x - ON - x.`max_issue` = t.`issue` AND - x.`time_type` = t.`time_type` AND - x.`time_value` = t.`time_value` AND - x.`source` = t.`source` AND - x.`signal` = t.`signal` AND - x.`geo_type` = t.`geo_type` AND - x.`geo_value` = t.`geo_value` + `covidcast` + WHERE + `source` = %s AND + `signal` = %s AND + `is_latest_issue` is TRUE GROUP BY - t.`time_type`, - t.`geo_type` + `time_type`, + `geo_type` ORDER BY - t.`time_type` ASC, - t.`geo_type` ASC + `time_type` ASC, + `geo_type` ASC ''' self._cursor.execute(sql, (source, signal)) - meta.extend(list(dict(zip(self._cursor.column_names,x)) for x in self._cursor)) + for x in self._cursor: + entry = dict(zip(self._cursor.column_names, x)) + # merge in the min issue + key = f"{entry['time_type']}:{entry['geo_type']}" + entry['min_issue'] = min_issue_lookup.get(key, entry['max_issue']) + meta.append(entry) return meta def update_covidcast_meta_cache(self, metadata): From b2fce76257937d520d1be57f8650e85ed1cad473 Mon Sep 17 00:00:00 2001 From: Samuel Gratzl Date: Mon, 12 Oct 2020 16:29:34 +0200 Subject: [PATCH 3/5] fix: python client test --- integrations/client/test_delphi_epidata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py index 7a7e0477d..63f44ebfe 100644 --- a/integrations/client/test_delphi_epidata.py +++ b/integrations/client/test_delphi_epidata.py @@ -258,7 +258,7 @@ def test_covidcast_meta(self): 'mean_value': 6.5, 'stdev_value': 0.5, 'last_update': 345, - 'min_issue': 20200416, + 'min_issue': 20200414, 'max_issue': 20200416, 'min_lag': 1, 'max_lag': 2, From a581f0b7c4cbbaeff2ec4ae24bebfb2f83b21b67 Mon Sep 17 00:00:00 2001 From: Samuel Gratzl Date: Thu, 15 Oct 2020 10:44:24 +0200 Subject: [PATCH 4/5] refactor: simpliy lookup --- src/acquisition/covidcast/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 382bb690a..d706ff815 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -579,7 +579,7 @@ def get_covidcast_meta(self): `geo_type` ''' self._cursor.execute(sql, (source, signal)) - min_issue_lookup = {f'{x[0]}:{x[1]}': x[2] for x in self._cursor} + min_issue_lookup = {(x[0], x[1]): x[2] for x in self._cursor} # calculate statistics for the latest issue entries sql = ''' @@ -618,7 +618,7 @@ def get_covidcast_meta(self): for x in self._cursor: entry = dict(zip(self._cursor.column_names, x)) # merge in the min issue - key = f"{entry['time_type']}:{entry['geo_type']}" + key = (entry['time_type'], entry['geo_type']) entry['min_issue'] = min_issue_lookup.get(key, entry['max_issue']) meta.append(entry) return meta From 06aab57d3aed2f21f68ea65fb20d15811b5860d7 Mon Sep 17 00:00:00 2001 From: Samuel Gratzl Date: Thu, 15 Oct 2020 11:00:14 +0200 Subject: [PATCH 5/5] fix: test --- .../acquisition/covidcast/test_covidcast_meta_caching.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py index 29f55672b..81d3be001 100644 --- a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py +++ b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py @@ -97,9 +97,9 @@ def 
test_caching(self): 'last_update': 789, 'min_value': 1, 'max_value': 1, - 'mean_value': 1, - 'stdev_value': 0, - 'min_issue': 20200423, + 'mean_value': 1.0, + 'stdev_value': 0.0, + 'min_issue': 20200422, 'max_issue': 20200423, 'min_lag': 0, 'max_lag': 1,
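
For reference, a self-contained sketch of the metadata merge introduced in PATCH 2 and simplified in PATCH 4: the per-(time_type, geo_type) MIN(`issue`) values produced by the first query are folded into the latest-issue statistics rows produced by the second query, falling back to `max_issue` for any combination missing from the lookup. The literal rows below are invented for illustration; only the merge logic follows the patch.

    # Stand-ins for the two query results inside get_covidcast_meta();
    # the values are made up, the merge mirrors the patched code.
    min_issue_lookup = {
        ('day', 'county'): 20200502,
        ('week', 'state'): 202018,
    }
    latest_issue_rows = [
        {'time_type': 'day', 'geo_type': 'county', 'max_issue': 20200505, 'mean_value': 15.0},
        {'time_type': 'day', 'geo_type': 'msa', 'max_issue': 20200505, 'mean_value': 12.0},
    ]

    meta = []
    for entry in latest_issue_rows:
        key = (entry['time_type'], entry['geo_type'])
        # fall back to max_issue when no min_issue was recorded for this combination
        entry['min_issue'] = min_issue_lookup.get(key, entry['max_issue'])
        meta.append(entry)

    # ('day', 'county') picks up 20200502 from the lookup;
    # ('day', 'msa') falls back to its max_issue of 20200505.
    print(meta)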
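
A usage sketch of the new `min_issue` field, following the request pattern used in the integration tests above. The base URL is a placeholder for whichever api.php deployment is being queried, and the selected fields are chosen only for illustration.

    import requests

    # Placeholder endpoint; point this at the api.php instance being queried.
    BASE_URL = 'http://localhost/epidata/api.php'

    params = dict(
        source='covidcast_meta',
        fields='data_source,signal,time_type,geo_type,min_issue,max_issue')
    response = requests.get(BASE_URL, params=params)
    response.raise_for_status()
    payload = response.json()

    # The payload keeps the usual {'result', 'epidata', 'message'} shape; each
    # epidata row now carries min_issue (earliest issue acquired) alongside max_issue.
    for row in payload['epidata']:
        print(row['data_source'], row['signal'], row['min_issue'], row['max_issue'])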