Skip to content

Commit

Permalink
Enable DLQ (GSI-1395) (#35)
Browse files Browse the repository at this point in the history
* Enable DLQ

* Update test

* Update consumer test to use a mock
  • Loading branch information
TheByronHimes authored Feb 19, 2025
1 parent b2557d2 commit 6ac988e
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 20 deletions.
1 change: 1 addition & 0 deletions .devcontainer/.dev_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ resource_deletion_event_type: searchable_resource_deleted
resource_upsertion_event_type: searchable_resource_upserted
service_instance_id: "001"
kafka_servers: ["kafka:9092"]
kafka_enable_dlq: True
2 changes: 1 addition & 1 deletion example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ docs_url: /docs
generate_correlation_id: true
host: 127.0.0.1
kafka_dlq_topic: dlq
kafka_enable_dlq: false
kafka_enable_dlq: true
kafka_max_message_size: 1048576
kafka_max_retries: 0
kafka_retry_backoff: 0
Expand Down
27 changes: 13 additions & 14 deletions src/mass/adapters/inbound/event_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,22 @@ async def _handle_deletion(self, *, payload: JsonObject):
validated_payload = get_validated_payload(
payload=payload, schema=event_schemas.SearchableResourceInfo
)
await self._query_handler.delete_resource(
resource_id=validated_payload.accession,
class_name=validated_payload.class_name,
)
except EventSchemaValidationError:
log.error(
SCHEMA_VALIDATION_ERROR_LOG_MSG,
event_schemas.SearchableResourceInfo.__name__,
)
return

try:
await self._query_handler.delete_resource(
resource_id=validated_payload.accession,
class_name=validated_payload.class_name,
)
raise
except self._query_handler.ResourceNotFoundError:
log.warning(DELETION_FAILED_LOG_MSG, validated_payload.accession)
raise
except self._query_handler.ClassNotConfiguredError:
log.error(CLASS_NOT_CONFIGURED_LOG_MSG, validated_payload.class_name)
raise

async def _handle_upsertion(self, *, payload: JsonObject):
"""Load the specified resource.
Expand All @@ -122,20 +122,19 @@ async def _handle_upsertion(self, *, payload: JsonObject):
id_=validated_payload.accession,
content=validated_payload.content,
)
await self._query_handler.load_resource(
resource=resource,
class_name=validated_payload.class_name,
)
except EventSchemaValidationError:
log.error(
SCHEMA_VALIDATION_ERROR_LOG_MSG,
event_schemas.SearchableResource.__name__,
)
return

try:
await self._query_handler.load_resource(
resource=resource,
class_name=validated_payload.class_name,
)
raise
except self._query_handler.ClassNotConfiguredError:
log.error(CLASS_NOT_CONFIGURED_LOG_MSG, validated_payload.class_name)
raise

async def _consume_validated(
self,
Expand Down
13 changes: 9 additions & 4 deletions src/mass/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from fastapi import FastAPI
from ghga_service_commons.utils.context import asyncnullcontext
from hexkit.providers.akafka.provider import KafkaEventSubscriber
from hexkit.providers.akafka.provider import KafkaEventPublisher, KafkaEventSubscriber
from hexkit.providers.mongodb.provider import MongoDbDaoFactory

from mass.adapters.inbound.event_sub import EventSubTranslator
Expand Down Expand Up @@ -101,7 +101,12 @@ async def prepare_event_subscriber(
config=config,
)

async with KafkaEventSubscriber.construct(
config=config, translator=event_sub_translator
) as event_subscriber:
async with (
KafkaEventPublisher.construct(config=config) as dlq_publisher,
KafkaEventSubscriber.construct(
config=config,
translator=event_sub_translator,
dlq_publisher=dlq_publisher,
) as event_subscriber,
):
yield event_subscriber
2 changes: 2 additions & 0 deletions tests/fixtures/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
def get_config(
sources: list[BaseSettings] | None = None,
default_config_yaml: Path = TEST_CONFIG_YAML,
**kwargs,
) -> Config:
"""Merges parameters from the default TEST_CONFIG_YAML with params inferred
from testcontainers.
Expand All @@ -37,5 +38,6 @@ def get_config(
if sources is not None:
for source in sources:
sources_dict.update(**source.model_dump())
sources_dict.update(**kwargs)

return Config(config_yaml=default_config_yaml, **sources_dict) # type: ignore
2 changes: 1 addition & 1 deletion tests/fixtures/joint.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ async def joint_fixture(
) -> AsyncGenerator[JointFixture, None]:
"""Function scoped joint fixture for API-level integration testing."""
# merge configs from different sources with the default one:
config = get_config(sources=[mongodb.config, kafka.config])
config = get_config(sources=[mongodb.config, kafka.config], kafka_enable_dlq=True)

async with (
prepare_core(config=config) as query_handler,
Expand Down
1 change: 1 addition & 0 deletions tests/fixtures/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ resource_deletion_event_type: searchable_resource_deleted
resource_upsertion_event_type: searchable_resource_upserted
service_instance_id: "001"
kafka_servers: ["kafka:9092"]
kafka_enable_dlq: True
67 changes: 67 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@

"""Tests to verify functionality of kafka event consumer"""

from unittest.mock import AsyncMock

import pytest
from ghga_event_schemas import pydantic_ as event_schemas
from hexkit.providers.akafka.testutils import KafkaFixture

from mass.core import models
from mass.inject import prepare_event_subscriber
from tests.fixtures.config import get_config
from tests.fixtures.joint import JointFixture

pytestmark = pytest.mark.asyncio()
Expand Down Expand Up @@ -115,3 +120,65 @@ async def test_resource_delete(joint_fixture: JointFixture):
)

assert results_post_delete.count == 0


async def test_event_subscriber_dlq(joint_fixture: JointFixture):
"""Verify that if we get an error when consuming an event, it gets published to the DLQ."""
config = joint_fixture.config
assert config.kafka_enable_dlq

# Publish an event with a bogus payload to a topic/type this service expects
await joint_fixture.publish_event(
payload={"some_key": "some_value"},
type_=config.resource_upsertion_event_type,
topic=config.resource_change_event_topic,
key="test",
)
async with joint_fixture._kafka.record_events(
in_topic=config.kafka_dlq_topic
) as recorder:
# Consume the event, which should error and get sent to the DLQ
await joint_fixture.consume_event()
assert recorder.recorded_events
assert len(recorder.recorded_events) == 1
event = recorder.recorded_events[0]
assert event.key == "test"
assert event.payload == {"some_key": "some_value"}


async def test_consume_from_retry(kafka: KafkaFixture):
"""Verify that this service will correctly get events from the retry topic"""
config = get_config(sources=[kafka.config], kafka_enable_dlq=True)
assert config.kafka_enable_dlq

# define content of resource
content: dict = {
"object": {"type": "added-resource-object", "id": "98u44-f4jo4"},
"city": "something",
"category": "test object",
}

# put together event payload
payload = event_schemas.SearchableResource(
accession="added-resource",
class_name=CLASS_NAME,
content=content,
).model_dump()

# Publish an event with a proper payload to a topic/type this service expects
await kafka.publish_event(
payload=payload,
type_=config.resource_upsertion_event_type,
topic=config.service_name + "-retry",
key="test",
headers={"original_topic": config.resource_change_event_topic},
)

# Consume the event
qh_mock = AsyncMock()
async with prepare_event_subscriber(
config=config, query_handler_override=qh_mock
) as consumer:
await consumer.run(forever=False)

qh_mock.load_resource.assert_awaited_once()

0 comments on commit 6ac988e

Please sign in to comment.