From 7e313888c29c779ff3b235ec7e1368ec90b4fdde Mon Sep 17 00:00:00 2001 From: Joseph Sawaya Date: Tue, 14 Jan 2025 16:30:26 -0500 Subject: [PATCH] feat: improve ta_storage.bigquery - generalize the way bigquery accepts query parameters - change type of params arg in bigquery_service to sequence - feat: add upload_id field to ta_testrun protobuf - add flags_hash field to ta_testrun protobuf - create new testid generation function - add test_id to ta_testrun proto - add flaky_failure to testrun protobuf - handle flaky failures in ta_storage.bq - create sql queries for reading from bq - write tests for ta_storage.bq aggregate queries --- database/models/reports.py | 2 +- generated_proto/testrun/ta_testrun_pb2.py | 27 +- protobuf/ta_testrun.proto | 5 + requirements.in | 1 + requirements.txt | 6 +- services/bigquery.py | 18 +- services/tests/test_bigquery.py | 28 ++ ta_storage/base.py | 1 + ta_storage/bq.py | 264 ++++++++++++- ta_storage/pg.py | 2 +- ta_storage/tests/test_bq.py | 374 +++++++++++++++++- ta_storage/utils.py | 28 ++ ...cessorTask__ta_processor_task_call__2.json | 20 +- 13 files changed, 736 insertions(+), 40 deletions(-) create mode 100644 ta_storage/utils.py diff --git a/database/models/reports.py b/database/models/reports.py index 922fceed3..d727c494a 100644 --- a/database/models/reports.py +++ b/database/models/reports.py @@ -117,7 +117,7 @@ class Upload(CodecovBaseModel, MixinBaseClass): upload_type_id = Column(types.Integer) @cached_property - def flag_names(self): + def flag_names(self) -> list[str]: return [f.flag_name for f in self.flags] diff --git a/generated_proto/testrun/ta_testrun_pb2.py b/generated_proto/testrun/ta_testrun_pb2.py index ba133248f..b9da9d04e 100644 --- a/generated_proto/testrun/ta_testrun_pb2.py +++ b/generated_proto/testrun/ta_testrun_pb2.py @@ -4,32 +4,35 @@ # source: ta_testrun.proto # Protobuf Python Version: 5.29.2 """Generated protocol buffer code.""" - from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import runtime_version as _runtime_version from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder - _runtime_version.ValidateProtobufRuntimeVersion( - _runtime_version.Domain.PUBLIC, 5, 29, 2, "", "ta_testrun.proto" + _runtime_version.Domain.PUBLIC, + 5, + 29, + 2, + '', + 'ta_testrun.proto' ) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x10ta_testrun.proto"\xda\x02\n\x07TestRun\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x11\n\tclassname\x18\x03 \x01(\t\x12\x11\n\ttestsuite\x18\x04 \x01(\t\x12\x15\n\rcomputed_name\x18\x05 \x01(\t\x12!\n\x07outcome\x18\x06 \x01(\x0e\x32\x10.TestRun.Outcome\x12\x17\n\x0f\x66\x61ilure_message\x18\x07 \x01(\t\x12\x18\n\x10\x64uration_seconds\x18\x08 \x01(\x02\x12\x0e\n\x06repoid\x18\n \x01(\x03\x12\x12\n\ncommit_sha\x18\x0b \x01(\t\x12\x13\n\x0b\x62ranch_name\x18\x0c \x01(\t\x12\r\n\x05\x66lags\x18\r \x03(\t\x12\x10\n\x08\x66ilename\x18\x0e \x01(\t\x12\x11\n\tframework\x18\x0f \x01(\t".\n\x07Outcome\x12\n\n\x06PASSED\x10\x00\x12\n\n\x06\x46\x41ILED\x10\x01\x12\x0b\n\x07SKIPPED\x10\x02' -) + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10ta_testrun.proto\"\xa4\x03\n\x07TestRun\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x11\n\tclassname\x18\x03 \x01(\t\x12\x11\n\ttestsuite\x18\x04 \x01(\t\x12\x15\n\rcomputed_name\x18\x05 \x01(\t\x12!\n\x07outcome\x18\x06 \x01(\x0e\x32\x10.TestRun.Outcome\x12\x17\n\x0f\x66\x61ilure_message\x18\x07 \x01(\t\x12\x18\n\x10\x64uration_seconds\x18\x08 \x01(\x02\x12\x0e\n\x06repoid\x18\n \x01(\x03\x12\x12\n\ncommit_sha\x18\x0b \x01(\t\x12\x13\n\x0b\x62ranch_name\x18\x0c \x01(\t\x12\r\n\x05\x66lags\x18\r \x03(\t\x12\x10\n\x08\x66ilename\x18\x0e \x01(\t\x12\x11\n\tframework\x18\x0f \x01(\t\x12\x11\n\tupload_id\x18\x10 \x01(\x03\x12\x12\n\nflags_hash\x18\x11 \x01(\x0c\x12\x0f\n\x07test_id\x18\x12 \x01(\x0c\"@\n\x07Outcome\x12\n\n\x06PASSED\x10\x00\x12\n\n\x06\x46\x41ILED\x10\x01\x12\x0b\n\x07SKIPPED\x10\x02\x12\x10\n\x0c\x46LAKY_FAILED\x10\x03') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "ta_testrun_pb2", _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'ta_testrun_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: - DESCRIPTOR._loaded_options = None - _globals["_TESTRUN"]._serialized_start = 21 - _globals["_TESTRUN"]._serialized_end = 367 - _globals["_TESTRUN_OUTCOME"]._serialized_start = 321 - _globals["_TESTRUN_OUTCOME"]._serialized_end = 367 + DESCRIPTOR._loaded_options = None + _globals['_TESTRUN']._serialized_start=21 + _globals['_TESTRUN']._serialized_end=441 + _globals['_TESTRUN_OUTCOME']._serialized_start=377 + _globals['_TESTRUN_OUTCOME']._serialized_end=441 # @@protoc_insertion_point(module_scope) diff --git a/protobuf/ta_testrun.proto b/protobuf/ta_testrun.proto index c0530b875..7b460c43a 100644 --- a/protobuf/ta_testrun.proto +++ b/protobuf/ta_testrun.proto @@ -11,6 +11,7 @@ message TestRun { PASSED = 0; FAILED = 1; SKIPPED = 2; + FLAKY_FAILED = 3; } optional Outcome outcome = 6; @@ -27,4 +28,8 @@ message TestRun { optional string filename = 14; optional string framework = 15; + + optional int64 upload_id = 16; + optional bytes flags_hash = 17; + optional bytes test_id = 18; } diff --git a/requirements.in b/requirements.in index 9f737e5fd..0a8f53217 100644 --- a/requirements.in +++ b/requirements.in @@ -20,6 +20,7 @@ grpcio>=1.66.2 httpx jinja2>=3.1.3 lxml>=5.3.0 +mmh3>=5.0.1 mock multidict>=6.1.0 openai diff --git a/requirements.txt b/requirements.txt index 34ecf93ad..bc504adbe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -208,8 +208,10 @@ markupsafe==2.1.3 # via jinja2 minio==7.1.13 # via shared -mmh3==4.0.1 - # via shared +mmh3==5.0.1 + # via + # -r requirements.in + # shared mock==4.0.3 # via -r requirements.in monotonic==1.5 diff --git a/services/bigquery.py b/services/bigquery.py index 4da0de60a..1fda004f8 100644 --- a/services/bigquery.py +++ b/services/bigquery.py @@ -1,5 +1,5 @@ from types import ModuleType -from typing import Dict, List, cast +from typing import Dict, List, Sequence, cast import polars as pl from google.api_core import retry @@ -108,7 +108,17 @@ def write( self.write_client.batch_commit_write_streams(batch_commit_write_streams_request) - def query(self, query: str, params: dict | None = None) -> List[Dict]: + def query( + self, + query: str, + params: Sequence[ + bigquery.ScalarQueryParameter + | bigquery.RangeQueryParameter + | bigquery.ArrayQueryParameter + | bigquery.StructQueryParameter + ] + | None = None, + ) -> List[Dict]: """Execute a BigQuery SQL query and return results. Try not to write INSERT statements and use the write method instead. @@ -125,9 +135,7 @@ def query(self, query: str, params: dict | None = None) -> List[Dict]: job_config = bigquery.QueryJobConfig() if params: - job_config.query_parameters = [ - bigquery.ScalarQueryParameter(k, "STRING", v) for k, v in params.items() - ] + job_config.query_parameters = params row_iterator = self.client.query_and_wait( query, job_config=job_config, retry=retry.Retry(deadline=30) diff --git a/services/tests/test_bigquery.py b/services/tests/test_bigquery.py index 9a9198d69..f2c861d19 100644 --- a/services/tests/test_bigquery.py +++ b/services/tests/test_bigquery.py @@ -3,6 +3,7 @@ import polars as pl import pytest +from google.cloud.bigquery import ScalarQueryParameter import generated_proto.testrun.ta_testrun_pb2 as ta_testrun_pb2 from services.bigquery import BigQueryService @@ -63,6 +64,33 @@ def test_bigquery_service(): assert {row["id"] for row in results} == {1, 2} +sql = """ +WITH sample_data AS ( + SELECT * FROM UNNEST([ + STRUCT(TIMESTAMP '2025-01-01T00:00:00Z' AS timestamp, 1 AS id, 'name' AS name), + STRUCT(TIMESTAMP '2024-12-30T00:00:00Z' AS timestamp, 2 AS id, 'name2' AS name) + ]) +) +SELECT * FROM sample_data where id = @id +""" + + +@pytest.mark.skip(reason="This test requires being run using actual working creds") +def test_bigquery_service_params(): + bigquery_service = BigQueryService(gcp_config) + + results = bigquery_service.query( + sql, params=[ScalarQueryParameter("id", "INT64", 2)] + ) + + assert len(results) == 1 + assert {row["timestamp"] for row in results} == { + datetime.fromisoformat("2024-12-30T00:00:00Z"), + } + assert {row["name"] for row in results} == {"name2"} + assert {row["id"] for row in results} == {2} + + @pytest.mark.skip(reason="This test requires being run using actual working creds") def test_bigquery_service_polars(): bigquery_service = BigQueryService(gcp_config) diff --git a/ta_storage/base.py b/ta_storage/base.py index ddeb5558b..690e3fefa 100644 --- a/ta_storage/base.py +++ b/ta_storage/base.py @@ -18,5 +18,6 @@ def write_testruns( upload: Upload, framework: str | None, testruns: list[test_results_parser.Testrun], + flaky_test_set: set[str], ): pass diff --git a/ta_storage/bq.py b/ta_storage/bq.py index 22828c584..7d9658c75 100644 --- a/ta_storage/bq.py +++ b/ta_storage/bq.py @@ -4,23 +4,138 @@ from typing import Literal, cast import test_results_parser +from google.cloud.bigquery import ArrayQueryParameter, ScalarQueryParameter from shared.config import get_config import generated_proto.testrun.ta_testrun_pb2 as ta_testrun_pb2 from database.models.reports import Upload from services.bigquery import get_bigquery_service from ta_storage.base import TADriver +from ta_storage.utils import calc_flags_hash, calc_test_id -DATASET_NAME: str = cast( - str, get_config("services", "bigquery", "dataset_name", default="codecov_prod") +RANKED_DATA = """ +ranked_data AS ( + SELECT + *, + ROW_NUMBER() OVER ( + PARTITION BY + name, + classname, + testsuite, + flags_hash + ORDER BY timestamp DESC + ) AS row_num + FROM + `{PROJECT_ID}.{DATASET_NAME}.{TESTRUN_TABLE_NAME}` + WHERE + repoid = @repoid + AND commit_sha = @commit_sha ) +""" -TESTRUN_TABLE_NAME: str = cast( - str, get_config("services", "bigquery", "testrun_table_name", default="testruns") +LATEST_INSTANCES = """ +latest_instances AS ( + SELECT + * + FROM + ranked_data + WHERE + row_num = 1 ) +""" +PR_COMMENT_AGG = """ +SELECT + * +FROM ( + SELECT + commit_sha, + outcome + FROM + latest_instances +) PIVOT ( + COUNT(*) AS ct + FOR outcome IN ( + 0 as passed, + 1 as failed, + 2 as skipped, + 3 as flaky_failed + ) +) +""" + +PR_COMMENT_FAIL = """ +SELECT + computed_name, + failure_message, + flags +FROM + latest_instances +WHERE + outcome = 1 +""" + +TESTRUNS_FOR_UPLOAD = """ +SELECT + DATE_BUCKET(timestamp, INTERVAL 1 DAY) AS date, + test_id, + outcome, + branch_name, +FROM + `{PROJECT_ID}.{DATASET_NAME}.{TESTRUN_TABLE_NAME}` +WHERE + upload_id = @upload_id + AND ( + outcome = 1 + OR outcome = 3 + OR test_id IN UNNEST(@test_ids) + ) +""" + +ANALYTICS_BASE = """ +analytics_base AS ( + SELECT * + FROM `{PROJECT_ID}.{DATASET_NAME}.{TESTRUN_TABLE_NAME}` + WHERE repoid = @repoid + AND timestamp BETWEEN + (CURRENT_DATE - INTERVAL @interval_start) AND + (CURRENT_DATE - INTERVAL @interval_end) +) +""" + +ANALYTICS_BRANCH = """ +analytics_base AS ( + SELECT * + FROM `{PROJECT_ID}.{DATASET_NAME}.{TESTRUN_TABLE_NAME}` + WHERE repoid = @repoid + AND branch_name = @branch + AND timestamp BETWEEN + (CURRENT_TIMESTAMP() - INTERVAL @interval_start DAY) AND + (CURRENT_TIMESTAMP() - INTERVAL @interval_end DAY) +) +""" -def outcome_to_int( +ANALYTICS = """ +SELECT + name, + classname, + testsuite, + ANY_VALUE(computed_name) AS computed_name, + COUNT(DISTINCT IF(outcome = 1 OR outcome = 3, commit_sha, NULL)) AS cwf, + AVG(duration_seconds) AS avg_duration, + MAX_BY(duration_seconds, timestamp) AS last_duration, + SUM(IF(outcome = 0, 1, 0)) AS pass_count, + SUM(IF(outcome = 1, 1, 0)) AS fail_count, + SUM(IF(outcome = 2, 1, 0)) AS skip_count, + SUM(IF(outcome = 3, 1, 0)) AS flaky_fail_count, + MAX(timestamp) AS updated_at, + ARRAY_AGG(DISTINCT unnested_flags) AS flags +FROM analytics_base, UNNEST(flags) AS unnested_flags +GROUP BY name, classname, testsuite +""" + + +def outcome_to_enum( outcome: Literal["pass", "skip", "failure", "error"], ) -> ta_testrun_pb2.TestRun.Outcome: match outcome: @@ -35,6 +150,26 @@ def outcome_to_int( class BQDriver(TADriver): + def __init__(self, flaky_test_set: set[bytes] | None = None): + self.flaky_test_set = flaky_test_set or {} + self.bq_service = get_bigquery_service() + + self.project_id: str = cast( + str, get_config("services", "gcp", "project_id", default="codecov-prod") + ) + + self.dataset_name: str = cast( + str, + get_config("services", "bigquery", "dataset_name", default="codecov_prod"), + ) + + self.testrun_table_name: str = cast( + str, + get_config( + "services", "bigquery", "testrun_table_name", default="testruns" + ), + ) + def write_testruns( self, timestamp: int | None, @@ -45,15 +180,21 @@ def write_testruns( framework: str | None, testruns: list[test_results_parser.Testrun], ): - bq_service = get_bigquery_service() - if timestamp is None: timestamp = int(datetime.now().timestamp() * 1000000) - flag_names = upload.flag_names + flag_names: list[str] = upload.flag_names testruns_pb: list[bytes] = [] + flags_hash = calc_flags_hash(flag_names) + for t in testruns: + test_id = calc_test_id(t["name"], t["classname"], t["testsuite"]) + if test_id in self.flaky_test_set and t["outcome"] == "failure": + outcome = ta_testrun_pb2.TestRun.Outcome.FLAKY_FAILED + else: + outcome = outcome_to_enum(t["outcome"]) + test_run = ta_testrun_pb2.TestRun( timestamp=timestamp, repoid=repo_id, @@ -65,12 +206,113 @@ def write_testruns( name=t["name"], testsuite=t["testsuite"], computed_name=t["computed_name"], - outcome=outcome_to_int(t["outcome"]), + outcome=outcome, failure_message=t["failure_message"], duration_seconds=t["duration"], filename=t["filename"], + upload_id=upload.id_, + flags_hash=flags_hash, + test_id=test_id, ) testruns_pb.append(test_run.SerializeToString()) - flag_names = upload.flag_names - bq_service.write(DATASET_NAME, TESTRUN_TABLE_NAME, ta_testrun_pb2, testruns_pb) + self.bq_service.write( + self.dataset_name, self.testrun_table_name, ta_testrun_pb2, testruns_pb + ) + + def pr_comment_agg( + self, + repoid: int, + commit_sha: str, + ): + query = f""" + WITH + {RANKED_DATA.format( + PROJECT_ID=self.project_id, + DATASET_NAME=self.dataset_name, + TESTRUN_TABLE_NAME=self.testrun_table_name, + )}, + {LATEST_INSTANCES} + {PR_COMMENT_AGG} + """ + return self.bq_service.query( + query, + [ + ScalarQueryParameter("repoid", "INT64", repoid), + ScalarQueryParameter("commit_sha", "STRING", commit_sha), + ], + ) + + def pr_comment_fail(self, repoid: int, commit_sha: str): + query = f""" + WITH + {RANKED_DATA.format( + PROJECT_ID=self.project_id, + DATASET_NAME=self.dataset_name, + TESTRUN_TABLE_NAME=self.testrun_table_name, + )}, + {LATEST_INSTANCES} + {PR_COMMENT_FAIL} + """ + return self.bq_service.query( + query, + [ + ScalarQueryParameter("repoid", "INT64", repoid), + ScalarQueryParameter("commit_sha", "STRING", commit_sha), + ], + ) + + def testruns_for_upload(self, upload_id: int, test_ids: list[bytes]): + query = f""" + {TESTRUNS_FOR_UPLOAD.format( + PROJECT_ID=self.project_id, + DATASET_NAME=self.dataset_name, + TESTRUN_TABLE_NAME=self.testrun_table_name, + )} + """ + return self.bq_service.query( + query, + [ + ScalarQueryParameter("upload_id", "INT64", upload_id), + ArrayQueryParameter("test_ids", "BYTES", test_ids), + ], + ) + + def analytics( + self, + repoid: int, + interval_start: int = 30, # for convention we want the start to be the larger number of days + interval_end: int = 0, + branch: str | None = None, + ): + if branch: + query = f""" + WITH + {ANALYTICS_BRANCH.format( + PROJECT_ID=self.project_id, + DATASET_NAME=self.dataset_name, + TESTRUN_TABLE_NAME=self.testrun_table_name, + )} + {ANALYTICS} + """ + else: + query = f""" + WITH + {ANALYTICS_BASE.format( + PROJECT_ID=self.project_id, + DATASET_NAME=self.dataset_name, + TESTRUN_TABLE_NAME=self.testrun_table_name, + )} + {ANALYTICS} + """ + + params = [ + ScalarQueryParameter("repoid", "INT64", repoid), + ScalarQueryParameter("interval_start", "INT64", interval_start), + ScalarQueryParameter("interval_end", "INT64", interval_end), + ] + + if branch: + params.append(ScalarQueryParameter("branch", "STRING", branch)) + + return self.bq_service.query(query, params) diff --git a/ta_storage/pg.py b/ta_storage/pg.py index c829a3adf..762efd8bd 100644 --- a/ta_storage/pg.py +++ b/ta_storage/pg.py @@ -283,7 +283,7 @@ def save_test_instances(db_session: Session, test_instance_data: list[dict]): class PGDriver(TADriver): - def __init__(self, db_session: Session, flaky_test_set: set): + def __init__(self, db_session: Session, flaky_test_set: set[str]): self.db_session = db_session self.flaky_test_set = flaky_test_set diff --git a/ta_storage/tests/test_bq.py b/ta_storage/tests/test_bq.py index d78840c25..208190098 100644 --- a/ta_storage/tests/test_bq.py +++ b/ta_storage/tests/test_bq.py @@ -1,14 +1,16 @@ from __future__ import annotations -from datetime import datetime +from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock, patch import pytest import test_results_parser +import time_machine import generated_proto.testrun.ta_testrun_pb2 as ta_testrun_pb2 from database.tests.factories import RepositoryFlagFactory, UploadFactory -from ta_storage.bq import DATASET_NAME, TESTRUN_TABLE_NAME, BQDriver +from ta_storage.bq import BQDriver +from ta_storage.utils import calc_flags_hash, calc_test_id @pytest.fixture @@ -77,10 +79,12 @@ def test_bigquery_driver(dbsession, mock_bigquery_service): test_data, ) + flags_hash = calc_flags_hash(upload.flag_names) + # Verify the BigQuery service was called correctly mock_bigquery_service.write.assert_called_once_with( - DATASET_NAME, - TESTRUN_TABLE_NAME, + bq.dataset_name, + bq.testrun_table_name, ta_testrun_pb2, [ ta_testrun_pb2.TestRun( @@ -98,6 +102,9 @@ def test_bigquery_driver(dbsession, mock_bigquery_service): framework="pytest", branch_name=upload.report.commit.branch, flags=["flag1", "flag2"], + upload_id=upload.id_, + flags_hash=flags_hash, + test_id=calc_test_id("test_name", "test_class", "test_suite"), ).SerializeToString(), ta_testrun_pb2.TestRun( timestamp=timestamp, @@ -114,6 +121,365 @@ def test_bigquery_driver(dbsession, mock_bigquery_service): framework="pytest", branch_name=upload.report.commit.branch, flags=["flag1", "flag2"], + upload_id=upload.id_, + flags_hash=flags_hash, + test_id=calc_test_id("test_name2", "test_class2", "test_suite2"), ).SerializeToString(), ], ) + + +def populate_pr_comment_testruns(bq: BQDriver): + testruns = [] + + for i in range(3): + upload = UploadFactory() + upload.report.commit.commitid = "abcde" + upload.report.commit.branch = "feature_branch" + upload.report.commit.repoid = 2 + upload.flags.append(RepositoryFlagFactory(flag_name=f"flag_{i}")) + + for j in range(3): + name = f"test_{j}" + classname = f"class_{j}" + testsuite = "suite_feature" + + testrun: test_results_parser.Testrun = { + "name": name, + "classname": classname, + "testsuite": testsuite, + "duration": float(j % 5), + "outcome": "pass" if j % 2 == 0 else "failure", + "filename": None, + "computed_name": f"pr_computed_name_{j}", + "failure_message": None if j % 2 == 0 else "hi", + "build_url": None, + } + + testruns.append(testrun) + + bq.write_testruns( + None, 2, "abcde", "feature_branch", upload, "pytest", testruns + ) + + +@pytest.mark.skip(reason="need creds") +def test_bq_pr_comment(): + bq = BQDriver() + + if ( + bq.bq_service.query( + "select * from `test_dataset.testruns` where repoid = 2 limit 1" + ) + == [] + ): + populate_pr_comment_testruns(bq) + + pr_agg = bq.pr_comment_agg(repoid=2, commit_sha="abcde") + assert pr_agg == [ + { + "commit_sha": "abcde", + "ct_passed": 6, + "ct_failed": 3, + "ct_skipped": 0, + "ct_flaky_failed": 0, + } + ] + + pr_fail = bq.pr_comment_fail(repoid=2, commit_sha="abcde") + assert len(pr_fail) == 3 + assert {t["computed_name"] for t in pr_fail} == { + "pr_computed_name_1", + } + assert {t["failure_message"] for t in pr_fail} == {"hi"} + assert {tuple(t["flags"]) for t in pr_fail} == { + ("flag_1",), + ("flag_2",), + ("flag_0",), + } + + +def populate_testruns_for_upload_testruns(dbsession, bq: BQDriver): + testruns = [] + + upload = UploadFactory() + upload.id_ = 1 + dbsession.add(upload) + dbsession.flush() + + testruns: list[test_results_parser.Testrun] = [ + { # this test is flaky failure + "name": "test_0", + "classname": "class_0", + "testsuite": "suite_upload", + "duration": 0.0, + "outcome": "failure", + "filename": None, + "computed_name": "upload_computed_name_0", + "failure_message": None, + "build_url": None, + }, + { # this test is just a failure + "name": "test_1", + "classname": "class_1", + "testsuite": "suite_upload", + "duration": 0.0, + "outcome": "failure", + "filename": None, + "computed_name": "upload_computed_name_1", + "failure_message": None, + "build_url": None, + }, + { # this test is a pass but also flaky + "name": "test_2", + "classname": "class_2", + "testsuite": "suite_upload", + "duration": 0.0, + "outcome": "pass", + "filename": None, + "computed_name": "upload_computed_name_2", + "failure_message": None, + "build_url": None, + }, + { # this test should be ignored + "name": "test_3", + "classname": "class_3", + "testsuite": "suite_upload", + "duration": 0.0, + "outcome": "pass", + "filename": None, + "computed_name": "upload_computed_name_3", + "failure_message": None, + "build_url": None, + }, + ] + + bq.write_testruns(None, 3, "abcde", "feature_branch", upload, "pytest", testruns) + + +@pytest.mark.skip(reason="need creds") +def test_bq_testruns_for_upload(dbsession): + bq = BQDriver( + { + calc_test_id("test_0", "class_0", "suite_upload"), + calc_test_id("test_2", "class_2", "suite_upload"), + } + ) + + if ( + bq.bq_service.query( + "select * from `test_dataset.testruns` where repoid = 3 limit 1" + ) + == [] + ): + populate_testruns_for_upload_testruns(dbsession, bq) + + testruns_for_upload = bq.testruns_for_upload( + upload_id=1, + test_ids=[ + calc_test_id("test_0", "class_0", "suite_upload"), + calc_test_id("test_2", "class_2", "suite_upload"), + ], + ) + + assert {t["test_id"] for t in testruns_for_upload} == { + calc_test_id("test_0", "class_0", "suite_upload"), + calc_test_id("test_2", "class_2", "suite_upload"), + calc_test_id("test_1", "class_1", "suite_upload"), + } + + assert {t["outcome"] for t in testruns_for_upload} == {3, 1, 0} + + +def populate_analytics_testruns(bq: BQDriver): + upload_0 = UploadFactory() + upload_0.report.commit.commitid = "abcde" + upload_0.report.commit.branch = "feature_branch" + upload_0.report.commit.repoid = 1 + upload_0.flags.append(RepositoryFlagFactory(flag_name="flag_0")) + + upload_1 = UploadFactory() + upload_1.report.commit.commitid = "abcde" + upload_1.report.commit.branch = "feature_branch" + upload_1.report.commit.repoid = 1 + upload_1.flags.append(RepositoryFlagFactory(flag_name="flag_1")) + + testruns: list[test_results_parser.Testrun] = [ + { + "name": "interval_start", + "classname": "class_0", + "testsuite": "suite_upload", + "duration": 20000.0, + "outcome": "failure", + "filename": None, + "computed_name": "upload_computed_name_0", + "failure_message": None, + "build_url": None, + }, + ] + + timestamp = int((datetime.now() - timedelta(days=50)).timestamp() * 1000000) + + bq.write_testruns( + timestamp, 1, "interval_start", "feature_branch", upload_0, "pytest", testruns + ) + + testruns: list[test_results_parser.Testrun] = [ + { + "name": "interval_end", + "classname": "class_0", + "testsuite": "suite_upload", + "duration": 20000.0, + "outcome": "failure", + "filename": None, + "computed_name": "upload_computed_name_0", + "failure_message": None, + "build_url": None, + }, + ] + + timestamp = int((datetime.now() - timedelta(days=1)).timestamp() * 1000000) + + bq.write_testruns( + timestamp, 1, "interval_end", "feature_branch", upload_0, "pytest", testruns + ) + + testruns: list[test_results_parser.Testrun] = [ + { + "name": "test_0", + "classname": "class_0", + "testsuite": "suite_upload", + "duration": 10.0, + "outcome": "failure", + "filename": None, + "computed_name": "upload_computed_name_0", + "failure_message": None, + "build_url": None, + }, + { + "name": "test_1", + "classname": "class_1", + "testsuite": "suite_upload", + "duration": 10.0, + "outcome": "pass", + "filename": None, + "computed_name": "upload_computed_name_1", + "failure_message": None, + "build_url": None, + }, + ] + + timestamp = int((datetime.now() - timedelta(days=20)).timestamp() * 1000000) + + bq.write_testruns( + timestamp, 1, "commit_1", "feature_branch", upload_0, "pytest", testruns + ) + + testruns: list[test_results_parser.Testrun] = [ + { + "name": "test_1", + "classname": "class_1", + "testsuite": "suite_upload", + "duration": 10.0, + "outcome": "failure", + "filename": None, + "computed_name": "upload_computed_name_1", + "failure_message": None, + "build_url": None, + }, + ] + + timestamp = int((datetime.now() - timedelta(days=20)).timestamp() * 1000000) + + bq.write_testruns( + timestamp, 1, "commit_1", "feature_branch", upload_1, "pytest", testruns + ) + + bq = BQDriver( + { + calc_test_id("test_1", "class_1", "suite_upload"), + } + ) + + testruns: list[test_results_parser.Testrun] = [ + { + "name": "test_0", + "classname": "class_0", + "testsuite": "suite_upload", + "duration": 20.0, + "outcome": "pass", + "filename": None, + "computed_name": "upload_computed_name_0", + "failure_message": None, + "build_url": None, + }, + { + "name": "test_1", + "classname": "class_1", + "testsuite": "suite_upload", + "duration": 10.0, + "outcome": "failure", + "filename": None, + "computed_name": "upload_computed_name_1", + "failure_message": None, + "build_url": None, + }, + ] + + timestamp = int((datetime.now() - timedelta(days=10)).timestamp() * 1000000) + + bq.write_testruns( + timestamp, 1, "commit_2", "feature_branch", upload_1, "pytest", testruns + ) + + +@pytest.mark.skip(reason="need creds") +@time_machine.travel(datetime.now(tz=timezone.utc), tick=False) +def test_bq_analytics(): + bq = BQDriver() + + if ( + bq.bq_service.query( + "select * from `test_dataset.testruns` where repoid = 1 limit 1" + ) + == [] + ): + populate_analytics_testruns(bq) + + testruns_for_upload = bq.analytics(1, 30, 7, "feature_branch") + + assert sorted( + [(x | {"flags": sorted(x["flags"])}) for x in testruns_for_upload], + key=lambda x: x["name"], + ) == [ + { + "name": "test_0", + "classname": "class_0", + "testsuite": "suite_upload", + "computed_name": "upload_computed_name_0", + "cwf": 1, + "avg_duration": 15.0, + "last_duration": 20.0, + "pass_count": 1, + "fail_count": 1, + "skip_count": 0, + "flaky_fail_count": 0, + "updated_at": datetime.now(tz=timezone.utc) - timedelta(days=10), + "flags": ["flag_0", "flag_1"], + }, + { + "name": "test_1", + "classname": "class_1", + "testsuite": "suite_upload", + "computed_name": "upload_computed_name_1", + "cwf": 2, + "avg_duration": 10.0, + "last_duration": 10.0, + "pass_count": 1, + "fail_count": 1, + "skip_count": 0, + "flaky_fail_count": 1, + "updated_at": datetime.now(tz=timezone.utc) - timedelta(days=10), + "flags": ["flag_0", "flag_1"], + }, + ] diff --git a/ta_storage/utils.py b/ta_storage/utils.py new file mode 100644 index 000000000..83ad866ca --- /dev/null +++ b/ta_storage/utils.py @@ -0,0 +1,28 @@ +from sys import byteorder + +import mmh3 +import sentry_sdk + + +def calc_test_id(name: str, classname: str, testsuite: str) -> bytes: + h = mmh3.mmh3_x64_128() # assumes we're running on x64 machines + h.update(testsuite.encode("utf-8")) + h.update(classname.encode("utf-8")) + h.update(name.encode("utf-8")) + test_id_hash = h.digest() + + return test_id_hash + + +def calc_flags_hash(flags: list[str]) -> bytes | None: + flags_str = " ".join(flags) # we know that flags cannot contain spaces + + # returns a tuple of two int64 values + # we only need the first one + flags_hash, _ = mmh3.hash64(flags_str) + try: + flags_hash_bytes = flags_hash.to_bytes(8, byteorder) + return flags_hash_bytes + except OverflowError as e: # this should never happen because hash64 should always return 2 64 bit ints + sentry_sdk.capture_exception(e) + return None diff --git a/tasks/tests/unit/snapshots/ta_processor_task__TestUploadTestProcessorTask__ta_processor_task_call__2.json b/tasks/tests/unit/snapshots/ta_processor_task__TestUploadTestProcessorTask__ta_processor_task_call__2.json index 73abd5112..936abe615 100644 --- a/tasks/tests/unit/snapshots/ta_processor_task__TestUploadTestProcessorTask__ta_processor_task_call__2.json +++ b/tasks/tests/unit/snapshots/ta_processor_task__TestUploadTestProcessorTask__ta_processor_task_call__2.json @@ -9,7 +9,10 @@ "duration_seconds": 0.001, "repoid": "1", "commit_sha": "cd76b0821854a780b60012aed85af0a8263004ad", - "framework": "Pytest" + "framework": "Pytest", + "upload_id": "1", + "flags_hash": "AAAAAAAAAAA=", + "test_id": "S/K2VdzrrehI4hnoZNsPVg==" }, { "timestamp": "1735689600000000", @@ -22,7 +25,10 @@ "duration_seconds": 0.001, "repoid": "1", "commit_sha": "cd76b0821854a780b60012aed85af0a8263004ad", - "framework": "Pytest" + "framework": "Pytest", + "upload_id": "1", + "flags_hash": "AAAAAAAAAAA=", + "test_id": "CVU2jNUNOkOrl6/lJdK0nw==" }, { "timestamp": "1735689600000000", @@ -34,7 +40,10 @@ "duration_seconds": 0.0, "repoid": "1", "commit_sha": "cd76b0821854a780b60012aed85af0a8263004ad", - "framework": "Pytest" + "framework": "Pytest", + "upload_id": "1", + "flags_hash": "AAAAAAAAAAA=", + "test_id": "UDBibp0NWEToP72TpCn1xg==" }, { "timestamp": "1735689600000000", @@ -46,6 +55,9 @@ "duration_seconds": 0.001, "repoid": "1", "commit_sha": "cd76b0821854a780b60012aed85af0a8263004ad", - "framework": "Pytest" + "framework": "Pytest", + "upload_id": "1", + "flags_hash": "AAAAAAAAAAA=", + "test_id": "VE2yD2IYxdSbTvGB6XCJPA==" } ]