Skip to content

Commit

Permalink
Update consumer test to use a mock
Browse files Browse the repository at this point in the history
  • Loading branch information
TheByronHimes committed Feb 19, 2025
1 parent ad2844c commit 25909fb
Showing 1 changed file with 12 additions and 20 deletions.
32 changes: 12 additions & 20 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@

"""Tests to verify functionality of kafka event consumer"""

from unittest.mock import AsyncMock

import pytest
from ghga_event_schemas import pydantic_ as event_schemas
from hexkit.providers.akafka.testutils import KafkaFixture

from mass.core import models
from mass.inject import prepare_event_subscriber
from tests.fixtures.config import get_config
from tests.fixtures.joint import JointFixture

pytestmark = pytest.mark.asyncio()
Expand Down Expand Up @@ -142,24 +146,18 @@ async def test_event_subscriber_dlq(joint_fixture: JointFixture):
assert event.payload == {"some_key": "some_value"}


async def test_consume_from_retry(joint_fixture: JointFixture, kafka: KafkaFixture):
async def test_consume_from_retry(kafka: KafkaFixture):
"""Verify that this service will correctly get events from the retry topic"""
config = joint_fixture.config
config = get_config(sources=[kafka.config], kafka_enable_dlq=True)
assert config.kafka_enable_dlq

results_all = await joint_fixture.handle_query(class_name=CLASS_NAME)
assert results_all.count > 0

# define content of resource
content: dict = {
"object": {"type": "added-resource-object", "id": "98u44-f4jo4"},
"city": "something",
"category": "test object",
}

# define a resource to be upserted
resource = models.Resource(id_="added-resource", content=content)

# put together event payload
payload = event_schemas.SearchableResource(
accession="added-resource",
Expand All @@ -177,16 +175,10 @@ async def test_consume_from_retry(joint_fixture: JointFixture, kafka: KafkaFixtu
)

# Consume the event
await joint_fixture.consume_event()

# verify that the resource was added
updated_resources = await joint_fixture.handle_query(class_name=CLASS_NAME)
assert updated_resources.count - results_all.count == 1
qh_mock = AsyncMock()
async with prepare_event_subscriber(
config=config, query_handler_override=qh_mock
) as consumer:
await consumer.run(forever=False)

# remove unselected fields
content = resource.content # type: ignore
del content["city"]
del content["category"]
del content["object"]["id"]

assert resource in updated_resources.hits
qh_mock.load_resource.assert_awaited_once()

0 comments on commit 25909fb

Please sign in to comment.