diff --git a/integrations/acquisition/covidcast/test_coverage_crossref_update.py b/integrations/acquisition/covidcast/test_coverage_crossref_update.py new file mode 100644 index 000000000..f52f17141 --- /dev/null +++ b/integrations/acquisition/covidcast/test_coverage_crossref_update.py @@ -0,0 +1,93 @@ +"""Integration tests for the covidcast `geo_coverage` endpoint.""" + +# standard library +import json +import unittest + +# third party +import mysql.connector +import requests + +# first party +from delphi_utils import Nans +from delphi.epidata.client.delphi_epidata import Epidata +import delphi.operations.secrets as secrets +import delphi.epidata.acquisition.covidcast.database as live +from delphi.epidata.maintenance.coverage_crossref_updater import main +from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow + +# use the local instance of the Epidata API +BASE_URL = 'http://delphi_web_epidata/epidata' # NOSONAR + + +class CoverageCrossrefTests(CovidcastBase): + """Tests coverage crossref updater.""" + + def localSetUp(self): + """Perform per-test setup.""" + self._db._cursor.execute('TRUNCATE TABLE `coverage_crossref`') + + @staticmethod + def _make_request(params): + response = requests.get(f"{Epidata.BASE_URL}/covidcast/geo_coverage", params=params, auth=Epidata.auth) + response.raise_for_status() + return response.json() + + def test_caching(self): + """Populate, query, cache, query, and verify the cache.""" + + # insert dummy data + self._insert_rows([ + CovidcastTestRow.make_default_row(geo_type="state", geo_value="pa"), + CovidcastTestRow.make_default_row(geo_type="state", geo_value="ny"), + CovidcastTestRow.make_default_row(geo_type="state", geo_value="ny", signal="sig2"), + ]) + + results = self._make_request(params = {'geo': 'state:*'}) + + # make sure the tables are empty + self.assertEqual(results, { + 'result': -2, + 'epidata': [], + 'message': 'no results', + }) + + # update the coverage crossref table + main() + 
+ results = self._make_request(params = {'geo': 'state:*'}) + + # make sure the data was actually served + self.assertEqual(results, { + 'result': 1, + 'epidata': [{'signal': 'sig', 'source': 'src'}, {'signal': 'sig2', 'source': 'src'}], + 'message': 'success', + }) + + results = self._make_request(params = {'geo': 'hrr:*'}) + + # make sure there's no coverage for the hrr geo type + self.assertEqual(results, { + 'result': -2, + 'epidata': [], + 'message': 'no results', + }) + + results = self._make_request(params = {'geo': 'state:pa'}) + + # make sure the data was actually served + self.assertEqual(results, { + 'result': 1, + 'epidata': [{'signal': 'sig', 'source': 'src'}], + 'message': 'success', + }) + + results = self._make_request(params = {'geo': 'state:ny'}) + + # make sure the data was actually served + self.assertEqual(results, { + 'result': 1, + 'epidata': [{'signal': 'sig', 'source': 'src'}, {'signal': 'sig2', 'source': 'src'}], + 'message': 'success', + }) + diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 5fd56923b..80590859d 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -561,3 +561,36 @@ def retrieve_covidcast_meta_cache(self): for entry in cache: cache_hash[(entry['data_source'], entry['signal'], entry['time_type'], entry['geo_type'])] = entry return cache_hash + + def compute_coverage_crossref(self): + """Compute coverage_crossref table, for looking up available signals per geo or vice versa.""" + + logger = get_structured_logger("compute_coverage_crossref") + + coverage_crossref_delete_sql = ''' + DELETE FROM coverage_crossref; + ''' + + coverage_crossref_update_sql = ''' + INSERT INTO coverage_crossref (signal_key_id, geo_key_id, min_time_value, max_time_value) + SELECT + signal_key_id, + geo_key_id, + MIN(time_value) AS min_time_value, + MAX(time_value) AS max_time_value + FROM covid.epimetric_latest + GROUP BY signal_key_id, geo_key_id; + ''' + + 
self._connection.start_transaction() + + self._cursor.execute(coverage_crossref_delete_sql) + logger.info("coverage_crossref_delete", rows=self._cursor.rowcount) + + self._cursor.execute(coverage_crossref_update_sql) + logger.info("coverage_crossref_update", rows=self._cursor.rowcount) + + self.commit() + logger.info("coverage_crossref committed") + + return self._cursor.rowcount \ No newline at end of file diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index 7551707f6..e151bb251 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -164,3 +164,24 @@ CREATE TABLE `covidcast_meta_cache` ( PRIMARY KEY (`timestamp`) ) ENGINE=InnoDB; INSERT INTO covidcast_meta_cache VALUES (0, '[]'); + +CREATE TABLE `coverage_crossref` ( + `signal_key_id` bigint NOT NULL, + `geo_key_id` bigint NOT NULL, + `min_time_value` int NOT NULL, + `max_time_value` int NOT NULL, + UNIQUE INDEX coverage_crossref_geo_sig (`geo_key_id`, `signal_key_id`), + INDEX coverage_crossref_sig_geo (`signal_key_id`, `geo_key_id`) +) ENGINE=InnoDB; + +CREATE OR REPLACE VIEW `coverage_crossref_v` AS +SELECT + `sd`.`source`, + `sd`.`signal`, + `gd`.`geo_type`, + `gd`.`geo_value`, + `cc`.`min_time_value`, + `cc`.`max_time_value` +FROM `coverage_crossref` `cc` +JOIN `signal_dim` `sd` USING (`signal_key_id`) +JOIN `geo_dim` `gd` USING (`geo_key_id`); diff --git a/src/ddl/v4_schema_aliases.sql b/src/ddl/v4_schema_aliases.sql index f5c6340e9..9584fcf3a 100644 --- a/src/ddl/v4_schema_aliases.sql +++ b/src/ddl/v4_schema_aliases.sql @@ -8,3 +8,4 @@ CREATE VIEW `epidata`.`epimetric_full_v` AS SELECT * FROM `covid`.`epimetric_full_v`; CREATE VIEW `epidata`.`epimetric_latest_v` AS SELECT * FROM `covid`.`epimetric_latest_v`; CREATE VIEW `epidata`.`covidcast_meta_cache` AS SELECT * FROM `covid`.`covidcast_meta_cache`; +CREATE VIEW `epidata`.`coverage_crossref_v` AS SELECT * FROM `covid`.`coverage_crossref_v`; diff --git a/src/maintenance/coverage_crossref_updater.py 
b/src/maintenance/coverage_crossref_updater.py new file mode 100644 index 000000000..002f5d023 --- /dev/null +++ b/src/maintenance/coverage_crossref_updater.py @@ -0,0 +1,33 @@ +"""Updates the table for the `coverage_crossref` endpoint.""" + +import time + +from delphi.epidata.acquisition.covidcast.database import Database +from delphi_utils import get_structured_logger + + +def main(): + """Updates the `coverage_crossref` table.""" + + logger = get_structured_logger("coverage_crossref_updater") + start_time = time.time() + database = Database() + database.connect() + + # compute and update coverage_crossref + try: + coverage = database.compute_coverage_crossref() + finally: + # clean up in success and in failure + database.disconnect(True) + + logger.info(f"coverage_crossref returned: {coverage}") + + logger.info( + "Generated and updated covidcast geo/signal coverage", + total_runtime_in_seconds=round(time.time() - start_time, 2)) + return True + + +if __name__ == '__main__': + main() diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py index 3d7d99e82..f4875fb6d 100644 --- a/src/server/endpoints/covidcast.py +++ b/src/server/endpoints/covidcast.py @@ -542,6 +542,24 @@ def transform_row(row, proxy): return execute_query(q.query, q.params, fields_string, fields_int, [], transform=transform_row) +@bp.route("/geo_coverage", methods=("GET", "POST")) +def handle_geo_coverage(): + """ + For a specific geo, returns the signals covering it (distinct source/signal pairs available for that geo) + """ + + geo_sets = parse_geo_sets() + + q = QueryBuilder("coverage_crossref_v", "c") + fields_string = ["source", "signal"] + + q.set_fields(fields_string) + + q.apply_geo_filters("geo_type", "geo_value", geo_sets) + q.set_sort_order("source", "signal") + q.group_by = ["c." 
+ field for field in fields_string] # this condenses duplicate results, similar to `SELECT DISTINCT` + + return execute_query(q.query, q.params, fields_string, [], []) @bp.route("/anomalies", methods=("GET", "POST")) def handle_anomalies(): diff --git a/tests/acquisition/covidcast/test_database.py b/tests/acquisition/covidcast/test_database.py index b676e7413..cc998f1ac 100644 --- a/tests/acquisition/covidcast/test_database.py +++ b/tests/acquisition/covidcast/test_database.py @@ -78,6 +78,22 @@ def test_update_covidcast_meta_cache_query(self): self.assertIn('timestamp', sql) self.assertIn('epidata', sql) + def test_compute_coverage_crossref_query(self): + """Query to update the coverage crossref table looks sensible. + + NOTE: Actual behavior is tested by integration test. + """ + + mock_connector = MagicMock() + database = Database() + database.connect(connector_impl=mock_connector) + + database.compute_coverage_crossref() + + connection = mock_connector.connect() + cursor = connection.cursor() + self.assertTrue(cursor.execute.called) + def test_insert_or_update_batch_exception_reraised(self): + """Test that an exception is reraised""" + mock_connector = MagicMock()