Signal Documentation Coverage Endpoint (#1584)
new `geo_coverage` endpoint with backing table and views, plus a utility to compute the table contents, as well as tests

---------

Co-authored-by: Amaris Sim <[email protected]>
Co-authored-by: george <[email protected]>
3 people authored Feb 3, 2025
1 parent 828e72a commit 5863680
Showing 7 changed files with 215 additions and 0 deletions.
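
For orientation, here is a minimal sketch of calling the new endpoint, not an official client. The path, `geo` parameter syntax, and response shape follow the integration tests below; the base URL and any API-key auth are deployment-specific assumptions.

import requests

BASE_URL = "https://api.delphi.cmu.edu/epidata"  # assumed production URL; adjust for your deployment

response = requests.get(
    f"{BASE_URL}/covidcast/geo_coverage",
    params={"geo": "state:*"},  # geo-set syntax: '<geo_type>:<geo_value>' or '<geo_type>:*'
)
response.raise_for_status()
payload = response.json()

# Per the integration tests, a successful response looks like:
# {'result': 1, 'epidata': [{'source': 'src', 'signal': 'sig'}, ...], 'message': 'success'}
for row in payload.get("epidata", []):
    print(row["source"], row["signal"])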
@@ -0,0 +1,93 @@
"""Integration tests for the covidcast `geo_coverage` endpoint."""

# standard library
import json
import unittest

# third party
import mysql.connector
import requests

# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
import delphi.operations.secrets as secrets
import delphi.epidata.acquisition.covidcast.database as live
from delphi.epidata.maintenance.coverage_crossref_updater import main
from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow

# use the local instance of the Epidata API
BASE_URL = 'http://delphi_web_epidata/epidata' # NOSONAR


class CoverageCrossrefTests(CovidcastBase):
    """Tests coverage crossref updater."""

    def localSetUp(self):
        """Perform per-test setup."""
        self._db._cursor.execute('TRUNCATE TABLE `coverage_crossref`')

    @staticmethod
    def _make_request(params):
        response = requests.get(f"{Epidata.BASE_URL}/covidcast/geo_coverage", params=params, auth=Epidata.auth)
        response.raise_for_status()
        return response.json()

    def test_caching(self):
        """Populate data, query before and after the crossref is computed, and verify the results."""

        # insert dummy data
        self._insert_rows([
            CovidcastTestRow.make_default_row(geo_type="state", geo_value="pa"),
            CovidcastTestRow.make_default_row(geo_type="state", geo_value="ny"),
            CovidcastTestRow.make_default_row(geo_type="state", geo_value="ny", signal="sig2"),
        ])

        results = self._make_request(params={'geo': 'state:*'})

        # the crossref table hasn't been computed yet, so expect no results
        self.assertEqual(results, {
            'result': -2,
            'epidata': [],
            'message': 'no results',
        })

        # update the coverage crossref table
        main()

        results = self._make_request(params={'geo': 'state:*'})

        # make sure the data was actually served
        self.assertEqual(results, {
            'result': 1,
            'epidata': [{'signal': 'sig', 'source': 'src'}, {'signal': 'sig2', 'source': 'src'}],
            'message': 'success',
        })

        results = self._make_request(params={'geo': 'hrr:*'})

        # no hrr rows were inserted, so expect no results
        self.assertEqual(results, {
            'result': -2,
            'epidata': [],
            'message': 'no results',
        })

        results = self._make_request(params={'geo': 'state:pa'})

        # only 'sig' covers pa
        self.assertEqual(results, {
            'result': 1,
            'epidata': [{'signal': 'sig', 'source': 'src'}],
            'message': 'success',
        })

        results = self._make_request(params={'geo': 'state:ny'})

        # both signals cover ny
        self.assertEqual(results, {
            'result': 1,
            'epidata': [{'signal': 'sig', 'source': 'src'}, {'signal': 'sig2', 'source': 'src'}],
            'message': 'success',
        })

33 changes: 33 additions & 0 deletions src/acquisition/covidcast/database.py
@@ -561,3 +561,36 @@ def retrieve_covidcast_meta_cache(self):
        for entry in cache:
            cache_hash[(entry['data_source'], entry['signal'], entry['time_type'], entry['geo_type'])] = entry
        return cache_hash

    def compute_coverage_crossref(self):
        """Compute coverage_crossref table, for looking up available signals per geo or vice versa."""

        logger = get_structured_logger("compute_coverage_crossref")

        coverage_crossref_delete_sql = '''
            DELETE FROM coverage_crossref;
        '''

        coverage_crossref_update_sql = '''
            INSERT INTO coverage_crossref (signal_key_id, geo_key_id, min_time_value, max_time_value)
                SELECT
                    signal_key_id,
                    geo_key_id,
                    MIN(time_value) AS min_time_value,
                    MAX(time_value) AS max_time_value
                FROM covid.epimetric_latest
                GROUP BY signal_key_id, geo_key_id;
        '''

        self._connection.start_transaction()

        self._cursor.execute(coverage_crossref_delete_sql)
        logger.info("coverage_crossref_delete", rows=self._cursor.rowcount)

        self._cursor.execute(coverage_crossref_update_sql)
        logger.info("coverage_crossref_update", rows=self._cursor.rowcount)

        self.commit()
        logger.info("coverage_crossref committed")

        return self._cursor.rowcount
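
To make the aggregation concrete, here is a toy re-statement in plain Python of what the INSERT...SELECT above computes (illustrative only; the real work happens in SQL, and the key/time values below are made up):

# toy rows: (signal_key_id, geo_key_id, time_value), mimicking epimetric_latest
rows = [(1, 10, 20200401), (1, 10, 20200415), (2, 10, 20200401)]

# (signal_key_id, geo_key_id) -> (min_time_value, max_time_value)
crossref = {}
for sig, geo, t in rows:
    lo, hi = crossref.get((sig, geo), (t, t))
    crossref[(sig, geo)] = (min(lo, t), max(hi, t))

print(crossref)
# {(1, 10): (20200401, 20200415), (2, 10): (20200401, 20200401)}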
21 changes: 21 additions & 0 deletions src/ddl/v4_schema.sql
@@ -164,3 +164,24 @@ CREATE TABLE `covidcast_meta_cache` (
    PRIMARY KEY (`timestamp`)
) ENGINE=InnoDB;
INSERT INTO covidcast_meta_cache VALUES (0, '[]');

CREATE TABLE `coverage_crossref` (
    `signal_key_id` bigint NOT NULL,
    `geo_key_id` bigint NOT NULL,
    `min_time_value` int NOT NULL,
    `max_time_value` int NOT NULL,
    UNIQUE INDEX coverage_crossref_geo_sig (`geo_key_id`, `signal_key_id`),
    INDEX coverage_crossref_sig_geo (`signal_key_id`, `geo_key_id`)
) ENGINE=InnoDB;

CREATE OR REPLACE VIEW `coverage_crossref_v` AS
    SELECT
        `sd`.`source`,
        `sd`.`signal`,
        `gd`.`geo_type`,
        `gd`.`geo_value`,
        `cc`.`min_time_value`,
        `cc`.`max_time_value`
    FROM `coverage_crossref` `cc`
        JOIN `signal_dim` `sd` USING (`signal_key_id`)
        JOIN `geo_dim` `gd` USING (`geo_key_id`);
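
As a sanity check, one might query the new view directly; a hedged sketch using `mysql.connector` (already imported by the integration tests), with the connection parameters as placeholders rather than the project's real credentials:

import mysql.connector

cnx = mysql.connector.connect(
    host="localhost", user="user", password="pass", database="covid")  # placeholders
cursor = cnx.cursor()

# The view joins the crossref table to signal_dim and geo_dim, so each row
# pairs a (source, signal) with a (geo_type, geo_value) and its time span.
cursor.execute("""
    SELECT `source`, `signal`, `min_time_value`, `max_time_value`
    FROM `coverage_crossref_v`
    WHERE `geo_type` = %s AND `geo_value` = %s
""", ("state", "pa"))
for source, signal, min_t, max_t in cursor:
    print(f"{source}/{signal}: {min_t}..{max_t}")

cursor.close()
cnx.close()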
1 change: 1 addition & 0 deletions src/ddl/v4_schema_aliases.sql
@@ -8,3 +8,4 @@
CREATE VIEW `epidata`.`epimetric_full_v` AS SELECT * FROM `covid`.`epimetric_full_v`;
CREATE VIEW `epidata`.`epimetric_latest_v` AS SELECT * FROM `covid`.`epimetric_latest_v`;
CREATE VIEW `epidata`.`covidcast_meta_cache` AS SELECT * FROM `covid`.`covidcast_meta_cache`;
CREATE VIEW `epidata`.`coverage_crossref_v` AS SELECT * FROM `covid`.`coverage_crossref_v`;
33 changes: 33 additions & 0 deletions src/maintenance/coverage_crossref_updater.py
@@ -0,0 +1,33 @@
"""Updates the table for the `coverage_crossref` endpoint."""

import time

from delphi.epidata.acquisition.covidcast.database import Database
from delphi_utils import get_structured_logger


def main():
"""Updates the table for the `coverage_crossref`."""

logger = get_structured_logger("coverage_crossref_updater")
start_time = time.time()
database = Database()
database.connect()

# compute and update coverage_crossref
try:
coverage = database.compute_coverage_crossref()
finally:
# clean up in success and in failure
database.disconnect(True)

logger.info(f"coverage_crossref returned: {coverage}")

logger.info(
"Generated and updated covidcast geo/signal coverage",
total_runtime_in_seconds=round(time.time() - start_time, 2))
return True


if __name__ == '__main__':
main()
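
In deployment this presumably runs on a schedule; a minimal sketch of invoking it periodically from Python (the module path mirrors the import in the integration tests above; the daily cadence is an assumption, not part of this commit):

import time

from delphi.epidata.maintenance.coverage_crossref_updater import main

while True:
    main()  # deletes and recomputes coverage_crossref, then logs runtime
    time.sleep(24 * 60 * 60)  # assumed daily schedule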
18 changes: 18 additions & 0 deletions src/server/endpoints/covidcast.py
@@ -542,6 +542,24 @@ def transform_row(row, proxy):

    return execute_query(q.query, q.params, fields_string, fields_int, [], transform=transform_row)

@bp.route("/geo_coverage", methods=("GET", "POST"))
def handle_geo_coverage():
"""
For a specific geo returns the signal coverage (number of signals for a given geo_type)
"""

geo_sets = parse_geo_sets()

q = QueryBuilder("coverage_crossref_v", "c")
fields_string = ["source", "signal"]

q.set_fields(fields_string)

q.apply_geo_filters("geo_type", "geo_value", geo_sets)
q.set_sort_order("source", "signal")
q.group_by = ["c." + field for field in fields_string] # this condenses duplicate results, similar to `SELECT DISTINCT`

return execute_query(q.query, q.params, fields_string, [], [])
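
For reference, roughly the SQL the builder above should emit for a single `state:pa` geo set, as a Python string in the style of database.py (an approximation inferred from the `set_fields`, `apply_geo_filters`, `set_sort_order`, and `group_by` calls; QueryBuilder's exact output and parameter binding may differ):

# An approximation of the generated query, for illustration only.
approx_sql = """
    SELECT c.`source`, c.`signal`
    FROM `coverage_crossref_v` c
    WHERE c.`geo_type` = %s AND c.`geo_value` = %s
    GROUP BY c.`source`, c.`signal`  -- condenses duplicates, like SELECT DISTINCT
    ORDER BY c.`source` ASC, c.`signal` ASC
"""
approx_params = ("state", "pa")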

@bp.route("/anomalies", methods=("GET", "POST"))
def handle_anomalies():
16 changes: 16 additions & 0 deletions tests/acquisition/covidcast/test_database.py
@@ -78,6 +78,22 @@ def test_update_covidcast_meta_cache_query(self):
        self.assertIn('timestamp', sql)
        self.assertIn('epidata', sql)

    def test_compute_coverage_crossref_query(self):
        """Query to update the coverage crossref table looks sensible.

        NOTE: Actual behavior is tested by integration test.
        """

        mock_connector = MagicMock()
        database = Database()
        database.connect(connector_impl=mock_connector)

        database.compute_coverage_crossref()

        connection = mock_connector.connect()
        cursor = connection.cursor()
        self.assertTrue(cursor.execute.called)

    def test_insert_or_update_batch_exception_reraised(self):
        """Test that an exception is reraised"""
        mock_connector = MagicMock()