From b0934b59db8eead264ddc4f124310a976143918f Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 27 Jan 2025 09:06:08 -0500 Subject: [PATCH 01/27] first pass at coverage endpoint --- src/server/endpoints/covidcast.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py index 3d7d99e82..f068bf2da 100644 --- a/src/server/endpoints/covidcast.py +++ b/src/server/endpoints/covidcast.py @@ -542,6 +542,23 @@ def transform_row(row, proxy): return execute_query(q.query, q.params, fields_string, fields_int, [], transform=transform_row) +@bp.route("/geo_coverage", methods=("GET", "POST")) +def handle_geo_coverage(): + """ + For a specific geo returns the signal coverage (number of signals for a given geo_type) + """ + + geo_type, geo_value = request.values.get("geo").split(":", 1) + + q = QueryBuilder("covid.coverage_crossref_v", "c") + fields_string = ["source", "signal"] + + q.set_fields(fields_string) + + q.where(geo_type=geo_type, geo_value=geo_value) + q.set_sort_order("source", "signal") + + return execute_query(q.query, q.params, fields_string, [], []) @bp.route("/anomalies", methods=("GET", "POST")) def handle_anomalies(): From 1a06e2e4c183700bdc33fd04409a57fe65797a8f Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 27 Jan 2025 09:17:08 -0500 Subject: [PATCH 02/27] added SQL to schemas --- src/ddl/v4_schema.sql | 21 +++++++++++++++++++++ src/ddl/v4_schema_aliases.sql | 1 + 2 files changed, 22 insertions(+) diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index 7551707f6..de013c444 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -164,3 +164,24 @@ CREATE TABLE `covidcast_meta_cache` ( PRIMARY KEY (`timestamp`) ) ENGINE=InnoDB; INSERT INTO covidcast_meta_cache VALUES (0, '[]'); + +CREATE TABLE `coverage_crossref` ( + `signal_key_id` bigint NOT NULL, + `geo_key_id` bigint NOT NULL, + `min_time_value` int NOT NULL, + `max_time_value` int NOT NULL, + UNIQUE INDEX coverage_crossref_signal_key_id ON coverage_crossref (signal_key_id), + UNIQUE INDEX coverage_crossref_geo_key_id ON coverage_crossref (geo_key_id) +) ENGINE=InnoDB; + +CREATE OR REPLACE VIEW `coverage_crossref_v` AS +SELECT + `sd`.`source`, + `sd`.`signal`, + `gd`.`geo_type`, + `gd`.`geo_value`, + `cc`.`min_time_value`, + `cc`.`max_time_value` +FROM `coverage_crossref` `cc` +JOIN `signal_dim` `sd` USING (`signal_key_id`) +JOIN `geo_dim` `gd` USING (`geo_key_id`); diff --git a/src/ddl/v4_schema_aliases.sql b/src/ddl/v4_schema_aliases.sql index f5c6340e9..6b5a2d58f 100644 --- a/src/ddl/v4_schema_aliases.sql +++ b/src/ddl/v4_schema_aliases.sql @@ -8,3 +8,4 @@ CREATE VIEW `epidata`.`epimetric_full_v` AS SELECT * FROM `covid`.`epimetric_full_v`; CREATE VIEW `epidata`.`epimetric_latest_v` AS SELECT * FROM `covid`.`epimetric_latest_v`; CREATE VIEW `epidata`.`covidcast_meta_cache` AS SELECT * FROM `covid`.`covidcast_meta_cache`; +CREATE VIEW `epidata`.`coverage_crossref_v` AS SELECT * FROM `covid`.`coverage_crossref_v`; From 55d13cb5a9f1d1ecdf3e4aebd6b8f05fce668749 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 27 Jan 2025 09:38:06 -0500 Subject: [PATCH 03/27] added load table and function to recompute data --- src/acquisition/covidcast/database.py | 42 +++++++++++++++++++++++++++ src/ddl/v4_schema.sql | 9 ++++++ src/ddl/v4_schema_aliases.sql | 2 +- 3 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 5fd56923b..e818ee5e6 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -561,3 +561,45 @@ def retrieve_covidcast_meta_cache(self): for entry in cache: cache_hash[(entry['data_source'], entry['signal'], entry['time_type'], entry['geo_type'])] = entry return cache_hash + + def compute_coverage_crossref(self): + """Compute coverage_crossref table.""" + logger = get_structured_logger("compute_coverage_crossref") + + coverage_crossref_load_delete_sql = f''' + DELETE FROM coverage_crossref_load WHERE 1; + ''' + + coverage_crossref_compute_sql = f''' + INSERT INTO coverage_crossref_load + SELECT + el.signal_key_id, + el.geo_key_id, + MIN(el.time_value) as min_time_value, + MAX(el.time_value) as max_time_value + FROM epimetric_latest el + GROUP BY el.signal_key_id, el.geo_key_id; + ''' + + coverage_crossref_delete_sql = f''' + DELETE FROM coverage_crossref WHERE 1; + ''' + + coverage_crossref_update_sql = f''' + INSERT INTO coverage_crossref + SELECT * FROM coverage_crossref_load; + ''' + + self._cursor.execute(coverage_crossref_load_delete_sql) + logger.info(f"coverage_crossref_load_delete_sql:{self._cursor.rowcount}") + + self._cursor.execute(coverage_crossref_compute_sql) + logger.info(f"coverage_crossref_compute_sql:{self._cursor.rowcount}") + + self._cursor.execute(coverage_crossref_delete_sql) + logger.info(f"coverage_crossref_delete_sql:{self._cursor.rowcount}") + + self._cursor.execute(coverage_crossref_update_sql) + logger.info(f"coverage_crossref_update_sql:{self._cursor.rowcount}") + + return self._cursor.rowcount \ No newline at end of file diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index de013c444..e0fadeddc 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -165,6 +165,15 @@ CREATE TABLE `covidcast_meta_cache` ( ) ENGINE=InnoDB; INSERT INTO covidcast_meta_cache VALUES (0, '[]'); +CREATE TABLE `coverage_crossref_load` ( + `signal_key_id` bigint NOT NULL, + `geo_key_id` bigint NOT NULL, + `min_time_value` int NOT NULL, + `max_time_value` int NOT NULL, + UNIQUE INDEX coverage_crossref_signal_key_id ON coverage_crossref (signal_key_id), + UNIQUE INDEX coverage_crossref_geo_key_id ON coverage_crossref (geo_key_id) +) ENGINE=InnoDB; + CREATE TABLE `coverage_crossref` ( `signal_key_id` bigint NOT NULL, `geo_key_id` bigint NOT NULL, diff --git a/src/ddl/v4_schema_aliases.sql b/src/ddl/v4_schema_aliases.sql index 6b5a2d58f..9584fcf3a 100644 --- a/src/ddl/v4_schema_aliases.sql +++ b/src/ddl/v4_schema_aliases.sql @@ -8,4 +8,4 @@ CREATE VIEW `epidata`.`epimetric_full_v` AS SELECT * FROM `covid`.`epimetric_full_v`; CREATE VIEW `epidata`.`epimetric_latest_v` AS SELECT * FROM `covid`.`epimetric_latest_v`; CREATE VIEW `epidata`.`covidcast_meta_cache` AS SELECT * FROM `covid`.`covidcast_meta_cache`; -CREATE VIEW `epidata`.`coverage_crossref_v` AS SELECT * FROM `covid`.`coverage_crossref_v`; +CREATE VIEW `epidata`.`coverage_crossref_v` AS SELECT * FROM `covid`.`coverage_crossref_v`; From 16fe7b70893a203ee080740daa9b8f5f7b443727 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 27 Jan 2025 13:00:27 -0500 Subject: [PATCH 04/27] modified endpoint to use geo_sets, allowing multiple geos in filter --- src/server/endpoints/covidcast.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py index f068bf2da..de6f98e5a 100644 --- a/src/server/endpoints/covidcast.py +++ b/src/server/endpoints/covidcast.py @@ -548,14 +548,14 @@ def handle_geo_coverage(): For a specific geo returns the signal coverage (number of signals for a given geo_type) """ - geo_type, geo_value = request.values.get("geo").split(":", 1) + geo_sets = parse_geo_sets() - q = QueryBuilder("covid.coverage_crossref_v", "c") + q = QueryBuilder("coverage_crossref_v", "c") fields_string = ["source", "signal"] q.set_fields(fields_string) - q.where(geo_type=geo_type, geo_value=geo_value) + q.apply_geo_filters("geo_type", "geo_value", geo_sets) q.set_sort_order("source", "signal") return execute_query(q.query, q.params, fields_string, [], []) From 85a972e35ee809ded40952527666d204b0957313 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 27 Jan 2025 13:04:28 -0500 Subject: [PATCH 05/27] removed indexes from load table, fixed index declaration on main table --- src/ddl/v4_schema.sql | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index e0fadeddc..aaa560e3a 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -169,9 +169,7 @@ CREATE TABLE `coverage_crossref_load` ( `signal_key_id` bigint NOT NULL, `geo_key_id` bigint NOT NULL, `min_time_value` int NOT NULL, - `max_time_value` int NOT NULL, - UNIQUE INDEX coverage_crossref_signal_key_id ON coverage_crossref (signal_key_id), - UNIQUE INDEX coverage_crossref_geo_key_id ON coverage_crossref (geo_key_id) + `max_time_value` int NOT NULL ) ENGINE=InnoDB; CREATE TABLE `coverage_crossref` ( @@ -179,8 +177,8 @@ CREATE TABLE `coverage_crossref` ( `geo_key_id` bigint NOT NULL, `min_time_value` int NOT NULL, `max_time_value` int NOT NULL, - UNIQUE INDEX coverage_crossref_signal_key_id ON coverage_crossref (signal_key_id), - UNIQUE INDEX coverage_crossref_geo_key_id ON coverage_crossref (geo_key_id) + UNIQUE INDEX coverage_crossref_signal_key_id (`signal_key_id`), + UNIQUE INDEX coverage_crossref_geo_key_id (`geo_key_id`) ) ENGINE=InnoDB; CREATE OR REPLACE VIEW `coverage_crossref_v` AS From c2baf0264c004d35e260ed1b7fe4fbed7dcd0ca9 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Wed, 29 Jan 2025 15:24:32 -0500 Subject: [PATCH 06/27] added CLI and tests --- .../test_coverage_crossref_update.py | 141 ++++++++++++++++++ src/acquisition/covidcast/database.py | 36 ++++- src/maintenance/coverage_crossref_updater.py | 58 +++++++ tests/acquisition/covidcast/test_database.py | 16 ++ 4 files changed, 248 insertions(+), 3 deletions(-) create mode 100644 integrations/acquisition/covidcast/test_coverage_crossref_update.py create mode 100644 src/maintenance/coverage_crossref_updater.py diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py new file mode 100644 index 000000000..a99b0bc0f --- /dev/null +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -0,0 +1,141 @@ +"""Integration tests for covidcast's metadata caching.""" + +# standard library +import json +import unittest + +# third party +import mysql.connector +import requests + +# first party +from delphi_utils import Nans +from delphi.epidata.client.delphi_epidata import Epidata +import delphi.operations.secrets as secrets +import delphi.epidata.acquisition.covidcast.database as live +from delphi.epidata.maintenance.coverage_crossref_updater import main + +# py3tester coverage target (equivalent to `import *`) +__test_target__ = ( + 'delphi.epidata.acquisition.covidcast.' + 'coverage_crossref_updater' +) + +# use the local instance of the Epidata API +BASE_URL = 'http://delphi_web_epidata/epidata' + + +class CoverageCrossrefTests(unittest.TestCase): + """Tests coverage crossref updater.""" + + def setUp(self): + """Perform per-test setup.""" + + # connect to the `epidata` database + cnx = mysql.connector.connect( + user='user', + password='pass', + host='delphi_database_epidata', + database='covid') + cur = cnx.cursor() + + # clear all tables + cur.execute("truncate table epimetric_load") + cur.execute("truncate table epimetric_full") + cur.execute("truncate table epimetric_latest") + cur.execute("truncate table geo_dim") + cur.execute("truncate table signal_dim") + cur.execute("truncate table coverage_crossref") + cur.execute("truncate table coverage_crossref_load") + cnx.commit() + cur.close() + + # make connection and cursor available to test cases + self.cnx = cnx + self.cur = cnx.cursor() + + # use the local instance of the epidata database + secrets.db.host = 'delphi_database_epidata' + secrets.db.epi = ('user', 'pass') + + epidata_cnx = mysql.connector.connect( + user='user', + password='pass', + host='delphi_database_epidata', + database='epidata') + epidata_cur = epidata_cnx.cursor() + + epidata_cur.execute("DELETE FROM `api_user`") + epidata_cur.execute('INSERT INTO `api_user`(`api_key`, `email`) VALUES("key", "email")') + epidata_cnx.commit() + epidata_cur.close() + epidata_cnx.close() + + # use the local instance of the Epidata API + Epidata.BASE_URL = BASE_URL + Epidata.auth = ('epidata', 'key') + + def tearDown(self): + """Perform per-test teardown.""" + self.cur.close() + self.cnx.close() + + @staticmethod + def _make_request(): + params = {'geo': 'state:*'} + response = requests.get(f"{Epidata.BASE_URL}/covidcast/geo_coverage", params=params, auth=Epidata.auth) + response.raise_for_status() + return response.json() + + def test_caching(self): + """Populate, query, cache, query, and verify the cache.""" + + # insert dummy data + self.cur.execute(f''' + INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) + VALUES + (42, 'src', 'sig'); + ''') + self.cur.execute(f''' + INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) + VALUES + (96, 'state', 'pa'), + (97, 'state', 'wa'); + ''') + self.cur.execute(f''' + INSERT INTO + `epimetric_latest` (`epimetric_id`, `signal_key_id`, `geo_key_id`, `time_type`, + `time_value`, `value_updated_timestamp`, + `value`, `stderr`, `sample_size`, + `issue`, `lag`, `missing_value`, + `missing_stderr`,`missing_sample_size`) + VALUES + (15, 42, 96, 'day', 20200422, + 123, 1, 2, 3, 20200422, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), + (16, 42, 97, 'day', 20200422, + 789, 1, 2, 3, 20200423, 1, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}) + ''') + self.cnx.commit() + + results = self._make_request() + + # make sure the tables are empty + self.assertEqual(results, { + 'result': -2, + 'epidata': [], + 'message': 'no results', + }) + + # update the cache + args = [] + main(args) + + results = self._make_request() + + # make sure the cache was actually served + self.assertEqual(results, { + 'result': 1, + 'epidata': [{'signal': 'sig', 'source': 'src'}, + {'signal': 'sig', 'source': 'src'}], + 'message': 'success', + }) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index e818ee5e6..ab523a176 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -567,7 +567,7 @@ def compute_coverage_crossref(self): logger = get_structured_logger("compute_coverage_crossref") coverage_crossref_load_delete_sql = f''' - DELETE FROM coverage_crossref_load WHERE 1; + TRUNCATE coverage_crossref_load; ''' coverage_crossref_compute_sql = f''' @@ -582,7 +582,23 @@ def compute_coverage_crossref(self): ''' coverage_crossref_delete_sql = f''' - DELETE FROM coverage_crossref WHERE 1; + TRUNCATE coverage_crossref; + ''' + + coverage_crossref_drop_signal_index = f''' + DROP INDEX coverage_crossref_signal_key_id on coverage_crossref; + ''' + + coverage_crossref_drop_geo_index = f''' + DROP INDEX coverage_crossref_geo_key_id on coverage_crossref; + ''' + + coverage_crossref_create_signal_index = f''' + CREATE INDEX coverage_crossref_signal_key_id ON coverage_crossref (signal_key_id); + ''' + + coverage_crossref_create_geo_index = f''' + CREATE INDEX coverage_crossref_geo_key_id ON coverage_crossref (geo_key_id); ''' coverage_crossref_update_sql = f''' @@ -599,7 +615,21 @@ def compute_coverage_crossref(self): self._cursor.execute(coverage_crossref_delete_sql) logger.info(f"coverage_crossref_delete_sql:{self._cursor.rowcount}") + self._cursor.execute(coverage_crossref_drop_signal_index) + logger.info(f"coverage_crossref_drop_signal_index:{self._cursor.rowcount}") + + self._cursor.execute(coverage_crossref_drop_geo_index) + logger.info(f"coverage_crossref_drop_geo_index:{self._cursor.rowcount}") + self._cursor.execute(coverage_crossref_update_sql) logger.info(f"coverage_crossref_update_sql:{self._cursor.rowcount}") + main_rowcount = self._cursor.rowcount + + self._cursor.execute(coverage_crossref_create_signal_index) + logger.info(f"coverage_crossref_create_signal_index:{self._cursor.rowcount}") + + self._cursor.execute(coverage_crossref_create_geo_index) + logger.info(f"coverage_crossref_create_geo_index:{self._cursor.rowcount}") + self.commit() - return self._cursor.rowcount \ No newline at end of file + return main_rowcount \ No newline at end of file diff --git a/src/maintenance/coverage_crossref_updater.py b/src/maintenance/coverage_crossref_updater.py new file mode 100644 index 000000000..9013087c6 --- /dev/null +++ b/src/maintenance/coverage_crossref_updater.py @@ -0,0 +1,58 @@ +"""Updates the table for the `coverage_crossref` endpoint.""" + +# standard library +import argparse +import sys +import time + +# first party +from delphi.epidata.acquisition.covidcast.database import Database +from delphi_utils import get_structured_logger +from delphi.epidata.client.delphi_epidata import Epidata + +def get_argument_parser(): + """Define command line arguments.""" + + parser = argparse.ArgumentParser() + parser.add_argument("--log_file", help="filename for log output") + return parser + + +def main(args, epidata_impl=Epidata, database_impl=Database): + """Updates the table for the `coverage_crossref`. + + `args`: parsed command-line arguments + """ + log_file = None + + logger = get_structured_logger( + "coverage_crossref_updater", + filename=log_file) + start_time = time.time() + database = database_impl() + database.connect() + + # compute and update coverage_crossref + try: + coverage = database.compute_coverage_crossref() + except: + # clean up before failing + database.disconnect(True) + raise + + args = ("success",1) + if coverage==0: + args = ("no results",-2) + + logger.info('coverage_crossref result: %s (code %d)' % args) + + + logger.info( + "Generated and updated covidcast metadata", + total_runtime_in_seconds=round(time.time() - start_time, 2)) + return True + + +if __name__ == '__main__': + if not main(get_argument_parser().parse_args()): + sys.exit(1) diff --git a/tests/acquisition/covidcast/test_database.py b/tests/acquisition/covidcast/test_database.py index b676e7413..cc998f1ac 100644 --- a/tests/acquisition/covidcast/test_database.py +++ b/tests/acquisition/covidcast/test_database.py @@ -78,6 +78,22 @@ def test_update_covidcast_meta_cache_query(self): self.assertIn('timestamp', sql) self.assertIn('epidata', sql) + def test_compute_coverage_crossref_query(self): + """Query to update the compute crossref looks sensible. + + NOTE: Actual behavior is tested by integration test. + """ + + mock_connector = MagicMock() + database = Database() + database.connect(connector_impl=mock_connector) + + database.compute_coverage_crossref() + + connection = mock_connector.connect() + cursor = connection.cursor() + self.assertTrue(cursor.execute.called) + def test_insert_or_update_batch_exception_reraised(self): """Test that an exception is reraised""" mock_connector = MagicMock() From 233ccd26c2d3ef87e78a3066729a4d134fdfd34f Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Wed, 29 Jan 2025 16:04:29 -0500 Subject: [PATCH 07/27] fixed sonar suggestions --- src/acquisition/covidcast/database.py | 31 ++++++++++++-------- src/maintenance/coverage_crossref_updater.py | 8 ++--- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index ab523a176..9d1418281 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -566,11 +566,11 @@ def compute_coverage_crossref(self): """Compute coverage_crossref table.""" logger = get_structured_logger("compute_coverage_crossref") - coverage_crossref_load_delete_sql = f''' + coverage_crossref_load_delete_sql = ''' TRUNCATE coverage_crossref_load; ''' - coverage_crossref_compute_sql = f''' + coverage_crossref_compute_sql = ''' INSERT INTO coverage_crossref_load SELECT el.signal_key_id, @@ -581,27 +581,27 @@ def compute_coverage_crossref(self): GROUP BY el.signal_key_id, el.geo_key_id; ''' - coverage_crossref_delete_sql = f''' + coverage_crossref_delete_sql = ''' TRUNCATE coverage_crossref; ''' - coverage_crossref_drop_signal_index = f''' + coverage_crossref_drop_signal_index = ''' DROP INDEX coverage_crossref_signal_key_id on coverage_crossref; ''' - coverage_crossref_drop_geo_index = f''' + coverage_crossref_drop_geo_index = ''' DROP INDEX coverage_crossref_geo_key_id on coverage_crossref; ''' - coverage_crossref_create_signal_index = f''' + coverage_crossref_create_signal_index = ''' CREATE INDEX coverage_crossref_signal_key_id ON coverage_crossref (signal_key_id); ''' - coverage_crossref_create_geo_index = f''' + coverage_crossref_create_geo_index = ''' CREATE INDEX coverage_crossref_geo_key_id ON coverage_crossref (geo_key_id); ''' - coverage_crossref_update_sql = f''' + coverage_crossref_update_sql = ''' INSERT INTO coverage_crossref SELECT * FROM coverage_crossref_load; ''' @@ -615,11 +615,18 @@ def compute_coverage_crossref(self): self._cursor.execute(coverage_crossref_delete_sql) logger.info(f"coverage_crossref_delete_sql:{self._cursor.rowcount}") - self._cursor.execute(coverage_crossref_drop_signal_index) - logger.info(f"coverage_crossref_drop_signal_index:{self._cursor.rowcount}") + # These will fail if the index does not exist, which is fine + try: + self._cursor.execute(coverage_crossref_drop_signal_index) + logger.info(f"coverage_crossref_drop_signal_index:{self._cursor.rowcount}") + except Exception as e: + logger.info(f"coverage_crossref_drop_signal_index:{e}") - self._cursor.execute(coverage_crossref_drop_geo_index) - logger.info(f"coverage_crossref_drop_geo_index:{self._cursor.rowcount}") + try: + self._cursor.execute(coverage_crossref_drop_geo_index) + logger.info(f"coverage_crossref_drop_geo_index:{self._cursor.rowcount}") + except Exception as e: + logger.info(f"coverage_crossref_drop_geo_index:{e}") self._cursor.execute(coverage_crossref_update_sql) logger.info(f"coverage_crossref_update_sql:{self._cursor.rowcount}") diff --git a/src/maintenance/coverage_crossref_updater.py b/src/maintenance/coverage_crossref_updater.py index 9013087c6..c67e3d01c 100644 --- a/src/maintenance/coverage_crossref_updater.py +++ b/src/maintenance/coverage_crossref_updater.py @@ -18,7 +18,7 @@ def get_argument_parser(): return parser -def main(args, epidata_impl=Epidata, database_impl=Database): +def main(args, database_impl=Database): """Updates the table for the `coverage_crossref`. `args`: parsed command-line arguments @@ -40,11 +40,11 @@ def main(args, epidata_impl=Epidata, database_impl=Database): database.disconnect(True) raise - args = ("success",1) + result = ("success",1) if coverage==0: - args = ("no results",-2) + result = ("no results",-2) - logger.info('coverage_crossref result: %s (code %d)' % args) + logger.info('coverage_crossref result: %s (code %d)' % result) logger.info( From 65c040a5e34a8d8efee207b8d7381b49db376ce0 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Wed, 29 Jan 2025 16:16:12 -0500 Subject: [PATCH 08/27] more sonar issues --- .../test_coverage_crossref_update.py | 9 ++++---- src/maintenance/coverage_crossref_updater.py | 22 ++++--------------- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py index a99b0bc0f..fdbcf45f8 100644 --- a/integrations/acquisition/covidcast/test_coverage_crossref_update.py +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -91,12 +91,12 @@ def test_caching(self): """Populate, query, cache, query, and verify the cache.""" # insert dummy data - self.cur.execute(f''' + self.cur.execute(''' INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig'); ''') - self.cur.execute(f''' + self.cur.execute(''' INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'state', 'pa'), @@ -126,9 +126,8 @@ def test_caching(self): 'message': 'no results', }) - # update the cache - args = [] - main(args) + # update the coverage crossref table + main() results = self._make_request() diff --git a/src/maintenance/coverage_crossref_updater.py b/src/maintenance/coverage_crossref_updater.py index c67e3d01c..e6560895a 100644 --- a/src/maintenance/coverage_crossref_updater.py +++ b/src/maintenance/coverage_crossref_updater.py @@ -10,24 +10,11 @@ from delphi_utils import get_structured_logger from delphi.epidata.client.delphi_epidata import Epidata -def get_argument_parser(): - """Define command line arguments.""" - parser = argparse.ArgumentParser() - parser.add_argument("--log_file", help="filename for log output") - return parser +def main(database_impl=Database): + """Updates the table for the `coverage_crossref`.""" - -def main(args, database_impl=Database): - """Updates the table for the `coverage_crossref`. - - `args`: parsed command-line arguments - """ - log_file = None - - logger = get_structured_logger( - "coverage_crossref_updater", - filename=log_file) + logger = get_structured_logger("coverage_crossref_updater") start_time = time.time() database = database_impl() database.connect() @@ -54,5 +41,4 @@ def main(args, database_impl=Database): if __name__ == '__main__': - if not main(get_argument_parser().parse_args()): - sys.exit(1) + main() From 3c404b2c58c99ee530ec7a7861e78c25e2d72cfb Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 30 Jan 2025 11:27:21 -0500 Subject: [PATCH 09/27] ignore sonar test --- .../acquisition/covidcast/test_coverage_crossref_update.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py index fdbcf45f8..a06cc3b89 100644 --- a/integrations/acquisition/covidcast/test_coverage_crossref_update.py +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -22,7 +22,7 @@ ) # use the local instance of the Epidata API -BASE_URL = 'http://delphi_web_epidata/epidata' +BASE_URL = 'http://delphi_web_epidata/epidata' # NOSONAR class CoverageCrossrefTests(unittest.TestCase): @@ -99,7 +99,7 @@ def test_caching(self): self.cur.execute(''' INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES - (96, 'state', 'pa'), + (96, 'state', 'pa'), (97, 'state', 'wa'); ''') self.cur.execute(f''' @@ -118,7 +118,7 @@ def test_caching(self): self.cnx.commit() results = self._make_request() - + # make sure the tables are empty self.assertEqual(results, { 'result': -2, From a5555a531837534145034438966293f3e728d436 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Thu, 30 Jan 2025 15:04:27 -0500 Subject: [PATCH 10/27] removed unessesary index rebuild, fixed indexes in ddl sql file --- src/acquisition/covidcast/database.py | 36 ++------------------------- src/ddl/v4_schema.sql | 4 +-- 2 files changed, 4 insertions(+), 36 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 9d1418281..6e2f5343e 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -585,22 +585,6 @@ def compute_coverage_crossref(self): TRUNCATE coverage_crossref; ''' - coverage_crossref_drop_signal_index = ''' - DROP INDEX coverage_crossref_signal_key_id on coverage_crossref; - ''' - - coverage_crossref_drop_geo_index = ''' - DROP INDEX coverage_crossref_geo_key_id on coverage_crossref; - ''' - - coverage_crossref_create_signal_index = ''' - CREATE INDEX coverage_crossref_signal_key_id ON coverage_crossref (signal_key_id); - ''' - - coverage_crossref_create_geo_index = ''' - CREATE INDEX coverage_crossref_geo_key_id ON coverage_crossref (geo_key_id); - ''' - coverage_crossref_update_sql = ''' INSERT INTO coverage_crossref SELECT * FROM coverage_crossref_load; @@ -615,28 +599,12 @@ def compute_coverage_crossref(self): self._cursor.execute(coverage_crossref_delete_sql) logger.info(f"coverage_crossref_delete_sql:{self._cursor.rowcount}") - # These will fail if the index does not exist, which is fine - try: - self._cursor.execute(coverage_crossref_drop_signal_index) - logger.info(f"coverage_crossref_drop_signal_index:{self._cursor.rowcount}") - except Exception as e: - logger.info(f"coverage_crossref_drop_signal_index:{e}") - - try: - self._cursor.execute(coverage_crossref_drop_geo_index) - logger.info(f"coverage_crossref_drop_geo_index:{self._cursor.rowcount}") - except Exception as e: - logger.info(f"coverage_crossref_drop_geo_index:{e}") - self._cursor.execute(coverage_crossref_update_sql) logger.info(f"coverage_crossref_update_sql:{self._cursor.rowcount}") main_rowcount = self._cursor.rowcount - self._cursor.execute(coverage_crossref_create_signal_index) - logger.info(f"coverage_crossref_create_signal_index:{self._cursor.rowcount}") - - self._cursor.execute(coverage_crossref_create_geo_index) - logger.info(f"coverage_crossref_create_geo_index:{self._cursor.rowcount}") + self._cursor.execute(coverage_crossref_load_delete_sql) + logger.info(f"coverage_crossref_load_delete_sql:{self._cursor.rowcount}") self.commit() return main_rowcount \ No newline at end of file diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index aaa560e3a..2c6b7d264 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -177,8 +177,8 @@ CREATE TABLE `coverage_crossref` ( `geo_key_id` bigint NOT NULL, `min_time_value` int NOT NULL, `max_time_value` int NOT NULL, - UNIQUE INDEX coverage_crossref_signal_key_id (`signal_key_id`), - UNIQUE INDEX coverage_crossref_geo_key_id (`geo_key_id`) + UNIQUE INDEX coverage_crossref_geo_sig (`geo_key_id`, `signal_key_id`), + INDEX coverage_crossref_sig_geo (`signal_key_id`, `geo_key_id`) ) ENGINE=InnoDB; CREATE OR REPLACE VIEW `coverage_crossref_v` AS From 26c645037480cbfdc28e57b20a4a69bacc9ef5a3 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Fri, 31 Jan 2025 11:48:59 -0500 Subject: [PATCH 11/27] updated SQL to use transaction instead of load table --- .../test_coverage_crossref_update.py | 1 - src/acquisition/covidcast/database.py | 34 +++++-------------- src/ddl/v4_schema.sql | 7 ---- 3 files changed, 8 insertions(+), 34 deletions(-) diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py index a06cc3b89..f1e8738d8 100644 --- a/integrations/acquisition/covidcast/test_coverage_crossref_update.py +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -46,7 +46,6 @@ def setUp(self): cur.execute("truncate table geo_dim") cur.execute("truncate table signal_dim") cur.execute("truncate table coverage_crossref") - cur.execute("truncate table coverage_crossref_load") cnx.commit() cur.close() diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 6e2f5343e..e647ff599 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -566,45 +566,27 @@ def compute_coverage_crossref(self): """Compute coverage_crossref table.""" logger = get_structured_logger("compute_coverage_crossref") - coverage_crossref_load_delete_sql = ''' - TRUNCATE coverage_crossref_load; - ''' + coverage_crossref_delete_sql = ''' + DELETE FROM coverage_crossref; + ''' - coverage_crossref_compute_sql = ''' - INSERT INTO coverage_crossref_load + coverage_crossref_update_sql = ''' + INSERT INTO coverage_crossref (signal_key_id, geo_key_id, min_time_value, max_time_value) SELECT el.signal_key_id, el.geo_key_id, MIN(el.time_value) as min_time_value, MAX(el.time_value) as max_time_value - FROM epimetric_latest el + FROM covid.epimetric_latest el GROUP BY el.signal_key_id, el.geo_key_id; - ''' - - coverage_crossref_delete_sql = ''' - TRUNCATE coverage_crossref; ''' - coverage_crossref_update_sql = ''' - INSERT INTO coverage_crossref - SELECT * FROM coverage_crossref_load; - ''' - - self._cursor.execute(coverage_crossref_load_delete_sql) - logger.info(f"coverage_crossref_load_delete_sql:{self._cursor.rowcount}") - - self._cursor.execute(coverage_crossref_compute_sql) - logger.info(f"coverage_crossref_compute_sql:{self._cursor.rowcount}") - + self._connection.start_transaction() self._cursor.execute(coverage_crossref_delete_sql) logger.info(f"coverage_crossref_delete_sql:{self._cursor.rowcount}") self._cursor.execute(coverage_crossref_update_sql) logger.info(f"coverage_crossref_update_sql:{self._cursor.rowcount}") - main_rowcount = self._cursor.rowcount - - self._cursor.execute(coverage_crossref_load_delete_sql) - logger.info(f"coverage_crossref_load_delete_sql:{self._cursor.rowcount}") self.commit() - return main_rowcount \ No newline at end of file + return self._cursor.rowcount \ No newline at end of file diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index 2c6b7d264..e151bb251 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -165,13 +165,6 @@ CREATE TABLE `covidcast_meta_cache` ( ) ENGINE=InnoDB; INSERT INTO covidcast_meta_cache VALUES (0, '[]'); -CREATE TABLE `coverage_crossref_load` ( - `signal_key_id` bigint NOT NULL, - `geo_key_id` bigint NOT NULL, - `min_time_value` int NOT NULL, - `max_time_value` int NOT NULL -) ENGINE=InnoDB; - CREATE TABLE `coverage_crossref` ( `signal_key_id` bigint NOT NULL, `geo_key_id` bigint NOT NULL, From 1f260612aca78c6e534243ef75af3cd730b38c98 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Fri, 31 Jan 2025 12:10:39 -0500 Subject: [PATCH 12/27] updated integration test, removed SQL alias --- .../test_coverage_crossref_update.py | 68 +++---------------- src/ddl/v4_schema_aliases.sql | 1 - 2 files changed, 8 insertions(+), 61 deletions(-) diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py index f1e8738d8..37dfb3451 100644 --- a/integrations/acquisition/covidcast/test_coverage_crossref_update.py +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -14,70 +14,18 @@ import delphi.operations.secrets as secrets import delphi.epidata.acquisition.covidcast.database as live from delphi.epidata.maintenance.coverage_crossref_updater import main - -# py3tester coverage target (equivalent to `import *`) -__test_target__ = ( - 'delphi.epidata.acquisition.covidcast.' - 'coverage_crossref_updater' -) +from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase # use the local instance of the Epidata API BASE_URL = 'http://delphi_web_epidata/epidata' # NOSONAR -class CoverageCrossrefTests(unittest.TestCase): +class CoverageCrossrefTests(CovidcastBase): """Tests coverage crossref updater.""" - def setUp(self): + def localSetUp(self): """Perform per-test setup.""" - - # connect to the `epidata` database - cnx = mysql.connector.connect( - user='user', - password='pass', - host='delphi_database_epidata', - database='covid') - cur = cnx.cursor() - - # clear all tables - cur.execute("truncate table epimetric_load") - cur.execute("truncate table epimetric_full") - cur.execute("truncate table epimetric_latest") - cur.execute("truncate table geo_dim") - cur.execute("truncate table signal_dim") - cur.execute("truncate table coverage_crossref") - cnx.commit() - cur.close() - - # make connection and cursor available to test cases - self.cnx = cnx - self.cur = cnx.cursor() - - # use the local instance of the epidata database - secrets.db.host = 'delphi_database_epidata' - secrets.db.epi = ('user', 'pass') - - epidata_cnx = mysql.connector.connect( - user='user', - password='pass', - host='delphi_database_epidata', - database='epidata') - epidata_cur = epidata_cnx.cursor() - - epidata_cur.execute("DELETE FROM `api_user`") - epidata_cur.execute('INSERT INTO `api_user`(`api_key`, `email`) VALUES("key", "email")') - epidata_cnx.commit() - epidata_cur.close() - epidata_cnx.close() - - # use the local instance of the Epidata API - Epidata.BASE_URL = BASE_URL - Epidata.auth = ('epidata', 'key') - - def tearDown(self): - """Perform per-test teardown.""" - self.cur.close() - self.cnx.close() + self._db._cursor.execute('TRUNCATE TABLE `coverage_crossref`') @staticmethod def _make_request(): @@ -90,18 +38,18 @@ def test_caching(self): """Populate, query, cache, query, and verify the cache.""" # insert dummy data - self.cur.execute(''' + self._db._cursor.execute(''' INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig'); ''') - self.cur.execute(''' + self._db._cursor.execute(''' INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'state', 'pa'), (97, 'state', 'wa'); ''') - self.cur.execute(f''' + self._db._cursor.execute(f''' INSERT INTO `epimetric_latest` (`epimetric_id`, `signal_key_id`, `geo_key_id`, `time_type`, `time_value`, `value_updated_timestamp`, @@ -114,7 +62,7 @@ def test_caching(self): (16, 42, 97, 'day', 20200422, 789, 1, 2, 3, 20200423, 1, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}) ''') - self.cnx.commit() + self._db.commit() results = self._make_request() diff --git a/src/ddl/v4_schema_aliases.sql b/src/ddl/v4_schema_aliases.sql index 9584fcf3a..f5c6340e9 100644 --- a/src/ddl/v4_schema_aliases.sql +++ b/src/ddl/v4_schema_aliases.sql @@ -8,4 +8,3 @@ CREATE VIEW `epidata`.`epimetric_full_v` AS SELECT * FROM `covid`.`epimetric_full_v`; CREATE VIEW `epidata`.`epimetric_latest_v` AS SELECT * FROM `covid`.`epimetric_latest_v`; CREATE VIEW `epidata`.`covidcast_meta_cache` AS SELECT * FROM `covid`.`covidcast_meta_cache`; -CREATE VIEW `epidata`.`coverage_crossref_v` AS SELECT * FROM `covid`.`coverage_crossref_v`; From a0f631f84bbe54e106c748461b47496b5e74c7fe Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Sat, 1 Feb 2025 09:18:38 -0500 Subject: [PATCH 13/27] adding alias back in --- src/ddl/v4_schema_aliases.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ddl/v4_schema_aliases.sql b/src/ddl/v4_schema_aliases.sql index f5c6340e9..9584fcf3a 100644 --- a/src/ddl/v4_schema_aliases.sql +++ b/src/ddl/v4_schema_aliases.sql @@ -8,3 +8,4 @@ CREATE VIEW `epidata`.`epimetric_full_v` AS SELECT * FROM `covid`.`epimetric_full_v`; CREATE VIEW `epidata`.`epimetric_latest_v` AS SELECT * FROM `covid`.`epimetric_latest_v`; CREATE VIEW `epidata`.`covidcast_meta_cache` AS SELECT * FROM `covid`.`covidcast_meta_cache`; +CREATE VIEW `epidata`.`coverage_crossref_v` AS SELECT * FROM `covid`.`coverage_crossref_v`; From 8da032657ca7c7a2bfe5035cff9e16ad1f2d4def Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Sat, 1 Feb 2025 09:19:39 -0500 Subject: [PATCH 14/27] Update src/maintenance/coverage_crossref_updater.py Co-authored-by: george --- src/maintenance/coverage_crossref_updater.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/maintenance/coverage_crossref_updater.py b/src/maintenance/coverage_crossref_updater.py index e6560895a..bf3fae019 100644 --- a/src/maintenance/coverage_crossref_updater.py +++ b/src/maintenance/coverage_crossref_updater.py @@ -35,7 +35,7 @@ def main(database_impl=Database): logger.info( - "Generated and updated covidcast metadata", + "Generated and updated covidcast geo/signal coverage", total_runtime_in_seconds=round(time.time() - start_time, 2)) return True From 8a46b846a74937dba8693055122fb2fd69a0cb67 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Sat, 1 Feb 2025 09:19:58 -0500 Subject: [PATCH 15/27] Update src/acquisition/covidcast/database.py Co-authored-by: george --- src/acquisition/covidcast/database.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index e647ff599..5554d7b5c 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -573,12 +573,12 @@ def compute_coverage_crossref(self): coverage_crossref_update_sql = ''' INSERT INTO coverage_crossref (signal_key_id, geo_key_id, min_time_value, max_time_value) SELECT - el.signal_key_id, - el.geo_key_id, - MIN(el.time_value) as min_time_value, - MAX(el.time_value) as max_time_value - FROM covid.epimetric_latest el - GROUP BY el.signal_key_id, el.geo_key_id; + signal_key_id, + geo_key_id, + MIN(time_value) AS min_time_value, + MAX(time_value) AS max_time_value + FROM covid.epimetric_latest + GROUP BY signal_key_id, geo_key_id; ''' self._connection.start_transaction() From f0e60f0854b7e74d05019f1cbfa9f280aafb7b1f Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Sat, 1 Feb 2025 09:20:10 -0500 Subject: [PATCH 16/27] Update src/acquisition/covidcast/database.py Co-authored-by: george --- src/acquisition/covidcast/database.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 5554d7b5c..e5dbf9cba 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -563,7 +563,8 @@ def retrieve_covidcast_meta_cache(self): return cache_hash def compute_coverage_crossref(self): - """Compute coverage_crossref table.""" + """Compute coverage_crossref table, for looking up available signals per geo or vice versa.""" + logger = get_structured_logger("compute_coverage_crossref") coverage_crossref_delete_sql = ''' From d9f9260012d76e1f4cb8d5de0f71fa08bc207cfa Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Sat, 1 Feb 2025 09:20:42 -0500 Subject: [PATCH 17/27] Update src/acquisition/covidcast/database.py Co-authored-by: george --- src/acquisition/covidcast/database.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index e5dbf9cba..80590859d 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -583,11 +583,14 @@ def compute_coverage_crossref(self): ''' self._connection.start_transaction() + self._cursor.execute(coverage_crossref_delete_sql) - logger.info(f"coverage_crossref_delete_sql:{self._cursor.rowcount}") + logger.info("coverage_crossref_delete", rows=self._cursor.rowcount) self._cursor.execute(coverage_crossref_update_sql) - logger.info(f"coverage_crossref_update_sql:{self._cursor.rowcount}") + logger.info("coverage_crossref_update", rows=self._cursor.rowcount) + self.commit() + logger.info("coverage_crossref committed") return self._cursor.rowcount \ No newline at end of file From 38b9e7b2606b75dbf6a4def5486ab4401067896d Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Sat, 1 Feb 2025 09:21:07 -0500 Subject: [PATCH 18/27] Update src/maintenance/coverage_crossref_updater.py Co-authored-by: george --- src/maintenance/coverage_crossref_updater.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/maintenance/coverage_crossref_updater.py b/src/maintenance/coverage_crossref_updater.py index bf3fae019..4266d4b6f 100644 --- a/src/maintenance/coverage_crossref_updater.py +++ b/src/maintenance/coverage_crossref_updater.py @@ -11,12 +11,12 @@ from delphi.epidata.client.delphi_epidata import Epidata -def main(database_impl=Database): +def main(): """Updates the table for the `coverage_crossref`.""" logger = get_structured_logger("coverage_crossref_updater") start_time = time.time() - database = database_impl() + database = Database() database.connect() # compute and update coverage_crossref From 59d0ec84e5b0bdfec01a78843e6736e40b6173aa Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Sat, 1 Feb 2025 09:22:12 -0500 Subject: [PATCH 19/27] Update integrations/acquisition/covidcast/test_coverage_crossref_update.py Co-authored-by: george --- .../acquisition/covidcast/test_coverage_crossref_update.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py index 37dfb3451..ac7bf4abc 100644 --- a/integrations/acquisition/covidcast/test_coverage_crossref_update.py +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -1,4 +1,4 @@ -"""Integration tests for covidcast's metadata caching.""" +"""Integration tests for the covidcast `geo_coverage` endpoint.""" # standard library import json From 0bf61b722bd3d11d7a53c97d344974113e4792db Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 3 Feb 2025 11:24:22 -0500 Subject: [PATCH 20/27] Update src/server/endpoints/covidcast.py Co-authored-by: george --- src/server/endpoints/covidcast.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py index de6f98e5a..42c4a300c 100644 --- a/src/server/endpoints/covidcast.py +++ b/src/server/endpoints/covidcast.py @@ -557,6 +557,7 @@ def handle_geo_coverage(): q.apply_geo_filters("geo_type", "geo_value", geo_sets) q.set_sort_order("source", "signal") + q.group_by = fields_string # this condenses duplicate results, similar to `SELECT DISTINCT` return execute_query(q.query, q.params, fields_string, [], []) From 8a590223ade2cdb2b2647ccbc2082244465222b6 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 3 Feb 2025 11:24:37 -0500 Subject: [PATCH 21/27] Update integrations/acquisition/covidcast/test_coverage_crossref_update.py Co-authored-by: george --- .../acquisition/covidcast/test_coverage_crossref_update.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py index ac7bf4abc..21e090cac 100644 --- a/integrations/acquisition/covidcast/test_coverage_crossref_update.py +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -81,7 +81,6 @@ def test_caching(self): # make sure the cache was actually served self.assertEqual(results, { 'result': 1, - 'epidata': [{'signal': 'sig', 'source': 'src'}, - {'signal': 'sig', 'source': 'src'}], + 'epidata': [{'signal': 'sig', 'source': 'src'}], 'message': 'success', }) From a458730aaa8c801874eb7153f9a3c9731ef455eb Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 3 Feb 2025 11:51:37 -0500 Subject: [PATCH 22/27] added test coverage and fixed groupby --- .../test_coverage_crossref_update.py | 25 ++++++++++++++++--- src/server/endpoints/covidcast.py | 2 +- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py index 21e090cac..18186f4c6 100644 --- a/integrations/acquisition/covidcast/test_coverage_crossref_update.py +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -28,8 +28,9 @@ def localSetUp(self): self._db._cursor.execute('TRUNCATE TABLE `coverage_crossref`') @staticmethod - def _make_request(): - params = {'geo': 'state:*'} + def _make_request(params=None): + if params is None: + params = {'geo': 'state:*'} response = requests.get(f"{Epidata.BASE_URL}/covidcast/geo_coverage", params=params, auth=Epidata.auth) response.raise_for_status() return response.json() @@ -78,7 +79,25 @@ def test_caching(self): results = self._make_request() - # make sure the cache was actually served + # make sure the data was actually served + self.assertEqual(results, { + 'result': 1, + 'epidata': [{'signal': 'sig', 'source': 'src'}], + 'message': 'success', + }) + + results = self._make_request(params = {'geo': 'hrr:*'}) + + # make sure the tables are empty + self.assertEqual(results, { + 'result': -2, + 'epidata': [], + 'message': 'no results', + }) + + results = self._make_request(params = {'geo': 'state:pa'}) + + # make sure the data was actually served self.assertEqual(results, { 'result': 1, 'epidata': [{'signal': 'sig', 'source': 'src'}], diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py index 42c4a300c..f4875fb6d 100644 --- a/src/server/endpoints/covidcast.py +++ b/src/server/endpoints/covidcast.py @@ -557,7 +557,7 @@ def handle_geo_coverage(): q.apply_geo_filters("geo_type", "geo_value", geo_sets) q.set_sort_order("source", "signal") - q.group_by = fields_string # this condenses duplicate results, similar to `SELECT DISTINCT` + q.group_by = ["c." + field for field in fields_string] # this condenses duplicate results, similar to `SELECT DISTINCT` return execute_query(q.query, q.params, fields_string, [], []) From 62b7475ceee9b4c75647200a54a3de3f7ae21c5f Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 3 Feb 2025 13:47:00 -0500 Subject: [PATCH 23/27] Update src/maintenance/coverage_crossref_updater.py Co-authored-by: george --- src/maintenance/coverage_crossref_updater.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/maintenance/coverage_crossref_updater.py b/src/maintenance/coverage_crossref_updater.py index 4266d4b6f..a31c1a38a 100644 --- a/src/maintenance/coverage_crossref_updater.py +++ b/src/maintenance/coverage_crossref_updater.py @@ -27,12 +27,7 @@ def main(): database.disconnect(True) raise - result = ("success",1) - if coverage==0: - result = ("no results",-2) - - logger.info('coverage_crossref result: %s (code %d)' % result) - + logger.info(f"coverage_crossref returned: {coverage}") logger.info( "Generated and updated covidcast geo/signal coverage", From 269c1084870bf5100ab4b136f154a49f61a8bb98 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 3 Feb 2025 13:47:15 -0500 Subject: [PATCH 24/27] Update src/maintenance/coverage_crossref_updater.py Co-authored-by: george --- src/maintenance/coverage_crossref_updater.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/maintenance/coverage_crossref_updater.py b/src/maintenance/coverage_crossref_updater.py index a31c1a38a..ee80a190f 100644 --- a/src/maintenance/coverage_crossref_updater.py +++ b/src/maintenance/coverage_crossref_updater.py @@ -22,10 +22,9 @@ def main(): # compute and update coverage_crossref try: coverage = database.compute_coverage_crossref() - except: - # clean up before failing + finally: + # clean up in success and in failure database.disconnect(True) - raise logger.info(f"coverage_crossref returned: {coverage}") From b0be91fb33a78f2e958a3872b28d24c17f88c753 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 3 Feb 2025 13:47:44 -0500 Subject: [PATCH 25/27] Update src/maintenance/coverage_crossref_updater.py Co-authored-by: george --- src/maintenance/coverage_crossref_updater.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/maintenance/coverage_crossref_updater.py b/src/maintenance/coverage_crossref_updater.py index ee80a190f..002f5d023 100644 --- a/src/maintenance/coverage_crossref_updater.py +++ b/src/maintenance/coverage_crossref_updater.py @@ -1,14 +1,9 @@ """Updates the table for the `coverage_crossref` endpoint.""" -# standard library -import argparse -import sys import time -# first party from delphi.epidata.acquisition.covidcast.database import Database from delphi_utils import get_structured_logger -from delphi.epidata.client.delphi_epidata import Epidata def main(): From 3b137d8cdf53e612bb03c516e496eae33be1d00e Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 3 Feb 2025 13:48:26 -0500 Subject: [PATCH 26/27] Update integrations/acquisition/covidcast/test_coverage_crossref_update.py Co-authored-by: george --- .../test_coverage_crossref_update.py | 30 ++++--------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py index 18186f4c6..629ac3f8e 100644 --- a/integrations/acquisition/covidcast/test_coverage_crossref_update.py +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -39,31 +39,11 @@ def test_caching(self): """Populate, query, cache, query, and verify the cache.""" # insert dummy data - self._db._cursor.execute(''' - INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) - VALUES - (42, 'src', 'sig'); - ''') - self._db._cursor.execute(''' - INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) - VALUES - (96, 'state', 'pa'), - (97, 'state', 'wa'); - ''') - self._db._cursor.execute(f''' - INSERT INTO - `epimetric_latest` (`epimetric_id`, `signal_key_id`, `geo_key_id`, `time_type`, - `time_value`, `value_updated_timestamp`, - `value`, `stderr`, `sample_size`, - `issue`, `lag`, `missing_value`, - `missing_stderr`,`missing_sample_size`) - VALUES - (15, 42, 96, 'day', 20200422, - 123, 1, 2, 3, 20200422, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}), - (16, 42, 97, 'day', 20200422, - 789, 1, 2, 3, 20200423, 1, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}) - ''') - self._db.commit() + self._insert_rows([ + CovidcastTestRow.make_default_row(geo_type="state", geo_value="pa"), + CovidcastTestRow.make_default_row(geo_type="state", geo_value="ny"), + CovidcastTestRow.make_default_row(geo_type="state", geo_value="ny", signal="sig2"), + ]) results = self._make_request() From 16048457c95f67c6e0a5ab179e551c7efbbb3346 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Mon, 3 Feb 2025 14:02:19 -0500 Subject: [PATCH 27/27] added some test cases --- .../test_coverage_crossref_update.py | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py index 629ac3f8e..f52f17141 100644 --- a/integrations/acquisition/covidcast/test_coverage_crossref_update.py +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -14,7 +14,7 @@ import delphi.operations.secrets as secrets import delphi.epidata.acquisition.covidcast.database as live from delphi.epidata.maintenance.coverage_crossref_updater import main -from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase +from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow # use the local instance of the Epidata API BASE_URL = 'http://delphi_web_epidata/epidata' # NOSONAR @@ -28,9 +28,7 @@ def localSetUp(self): self._db._cursor.execute('TRUNCATE TABLE `coverage_crossref`') @staticmethod - def _make_request(params=None): - if params is None: - params = {'geo': 'state:*'} + def _make_request(params): response = requests.get(f"{Epidata.BASE_URL}/covidcast/geo_coverage", params=params, auth=Epidata.auth) response.raise_for_status() return response.json() @@ -45,7 +43,7 @@ def test_caching(self): CovidcastTestRow.make_default_row(geo_type="state", geo_value="ny", signal="sig2"), ]) - results = self._make_request() + results = self._make_request(params = {'geo': 'state:*'}) # make sure the tables are empty self.assertEqual(results, { @@ -57,12 +55,12 @@ def test_caching(self): # update the coverage crossref table main() - results = self._make_request() + results = self._make_request(params = {'geo': 'state:*'}) # make sure the data was actually served self.assertEqual(results, { 'result': 1, - 'epidata': [{'signal': 'sig', 'source': 'src'}], + 'epidata': [{'signal': 'sig', 'source': 'src'}, {'signal': 'sig2', 'source': 'src'}], 'message': 'success', }) @@ -83,3 +81,13 @@ def test_caching(self): 'epidata': [{'signal': 'sig', 'source': 'src'}], 'message': 'success', }) + + results = self._make_request(params = {'geo': 'state:ny'}) + + # make sure the data was actually served + self.assertEqual(results, { + 'result': 1, + 'epidata': [{'signal': 'sig', 'source': 'src'}, {'signal': 'sig2', 'source': 'src'}], + 'message': 'success', + }) +