Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code concurrent): Allow async job low-code streams that are incremental to be run by the concurrent framework #228

Merged
merged 16 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 57 additions & 32 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.retrievers import Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
DeclarativePartitionFactory,
StreamSlicerPartitionGenerator,
Expand All @@ -48,7 +49,7 @@
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream

Expand All @@ -69,13 +70,18 @@ def __init__(
component_factory: Optional[ModelToComponentFactory] = None,
**kwargs: Any,
) -> None:
# todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
# no longer needs to store the original incoming state. But maybe there's an edge case?
self._connector_state_manager = ConnectorStateManager(state=state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

# To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
# cursors. We do this by no longer automatically instantiating RFR cursors when converting
# the declarative models into runtime components. Concurrent sources will continue to checkpoint
# incremental streams running in full refresh.
component_factory = component_factory or ModelToComponentFactory(
emit_connector_builder_messages=emit_connector_builder_messages,
disable_resumable_full_refresh=True,
connector_state_manager=self._connector_state_manager,
)

super().__init__(
Expand All @@ -86,10 +92,6 @@ def __init__(
component_factory=component_factory,
)

# todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
# no longer needs to store the original incoming state. But maybe there's an edge case?
self._state = state

concurrency_level_from_manifest = self._source_config.get("concurrency_level")
if concurrency_level_from_manifest:
concurrency_level_component = self._constructor.create_component(
Expand Down Expand Up @@ -179,8 +181,6 @@ def _group_streams(
concurrent_streams: List[AbstractStream] = []
synchronous_streams: List[Stream] = []

state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

# Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
# and this is validated during the initialization of the source.
streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
Expand Down Expand Up @@ -220,31 +220,52 @@ def _group_streams(
if self._is_datetime_incremental_without_partition_routing(
declarative_stream, incremental_sync_component_definition
):
stream_state = state_manager.get_stream_state(
stream_state = self._connector_state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)

cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
stream_state=stream_state,
)

retriever = self._get_retriever(declarative_stream, stream_state)

partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
retriever,
self.message_repository,
),
cursor,
)
if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
):
cursor = declarative_stream.retriever.stream_slicer.stream_slicer

if not isinstance(cursor, ConcurrentCursor):
# This should never happen since we instantiate ConcurrentCursor in
# model_to_component_factory.py
raise ValueError(
f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received{cursor.__class__}"
)
brianjlai marked this conversation as resolved.
Show resolved Hide resolved

partition_generator = StreamSlicerPartitionGenerator(
partition_factory=DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
retriever,
self.message_repository,
),
stream_slicer=declarative_stream.retriever.stream_slicer,
)
else:
cursor = (
self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
)
)
partition_generator = StreamSlicerPartitionGenerator(
partition_factory=DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
retriever,
self.message_repository,
),
stream_slicer=cursor,
)

concurrent_streams.append(
DefaultStream(
Expand Down Expand Up @@ -306,14 +327,14 @@ def _group_streams(
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
)
):
stream_state = state_manager.get_stream_state(
stream_state = self._connector_state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)
partition_router = declarative_stream.retriever.stream_slicer._partition_router

perpartition_cursor = (
self._constructor.create_concurrent_cursor_from_perpartition_cursor(
state_manager=state_manager,
state_manager=self._connector_state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition,
stream_name=declarative_stream.name,
Expand Down Expand Up @@ -369,7 +390,10 @@ def _is_datetime_incremental_without_partition_routing(
declarative_stream=declarative_stream
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
and (
isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
)
)

def _stream_supports_concurrent_partition_processing(
Expand Down Expand Up @@ -438,8 +462,9 @@ def _stream_supports_concurrent_partition_processing(
return False
return True

@staticmethod
def _get_retriever(
self, declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
) -> Retriever:
retriever = declarative_stream.retriever

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CheckStream as CheckStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DeclarativeStream as DeclarativeStreamModel,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ def __init__(
disable_cache: bool = False,
disable_resumable_full_refresh: bool = False,
message_repository: Optional[MessageRepository] = None,
connector_state_manager: Optional[ConnectorStateManager] = None,
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
Expand All @@ -518,6 +519,7 @@ def __init__(
self._message_repository = message_repository or InMemoryMessageRepository(
self._evaluate_log_level(emit_connector_builder_messages)
)
self._connector_state_manager = connector_state_manager or ConnectorStateManager()

def _init_mappings(self) -> None:
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
Expand Down Expand Up @@ -927,17 +929,24 @@ def create_concurrency_level(

def create_concurrent_cursor_from_datetime_based_cursor(
self,
state_manager: ConnectorStateManager,
model_type: Type[BaseModel],
component_definition: ComponentDefinition,
stream_name: str,
stream_namespace: Optional[str],
config: Config,
stream_state: MutableMapping[str, Any],
message_repository: Optional[MessageRepository] = None,
runtime_lookback_window: Optional[datetime.timedelta] = None,
**kwargs: Any,
) -> ConcurrentCursor:
# Per-partition incremental streams can dynamically create child cursors which will pass their current
# state via the stream_state keyword argument. Incremental syncs without parent streams use the
# incoming state and connector_state_manager that is initialized when the component factory is created
stream_state = (
self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
if "stream_state" not in kwargs
else kwargs["stream_state"]
)

component_type = component_definition.get("type")
if component_definition.get("type") != model_type.__name__:
raise ValueError(
Expand Down Expand Up @@ -1131,7 +1140,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
stream_namespace=stream_namespace,
stream_state=stream_state,
message_repository=message_repository or self._message_repository,
connector_state_manager=state_manager,
connector_state_manager=self._connector_state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
slice_boundary_fields=slice_boundary_fields,
Expand Down Expand Up @@ -1681,6 +1690,22 @@ def _merge_stream_slicers(
stream_cursor=cursor_component,
)
elif model.incremental_sync:
if model.retriever.type == "AsyncRetriever":
if model.incremental_sync.type != "DatetimeBasedCursor":
# We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support or unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursor or have a new implementation available in the CDK, we can enable more cursors here.
raise ValueError(
"AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
)
if model.retriever.partition_router:
# Note that this development is also done in parallel to the per partition development which once merged we could support here by calling `create_concurrent_cursor_from_perpartition_cursor`
raise ValueError("Per partition state is not supported yet for AsyncRetriever")
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
model_type=DatetimeBasedCursorModel,
component_definition=model.incremental_sync.__dict__,
stream_name=model.name or "",
stream_namespace=None,
config=config or {},
)
return (
self._create_component_from_model(model=model.incremental_sync, config=config)
if model.incremental_sync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _validate_and_get_stream_slice_partition(
"""
if not isinstance(stream_slice, StreamSlice) or "partition" not in stream_slice.partition:
raise AirbyteTracedException(
message="Invalid arguments to AsyncJobRetriever.read_records: stream_slice is no optional. Please contact Airbyte Support",
message="Invalid arguments to AsyncRetriever.read_records: stream_slice is not optional. Please contact Airbyte Support",
failure_type=FailureType.system_error,
)
return stream_slice["partition"] # type: ignore # stream_slice["partition"] has been added as an AsyncPartition as part of stream_slices
Expand Down
6 changes: 4 additions & 2 deletions airbyte_cdk/sources/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView

import orjson
from airbyte_cdk.utils.slice_hasher import SliceHasher

# A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2":
# "hello"}] returns "hello"
Expand Down Expand Up @@ -151,7 +151,9 @@ def __json_serializable__(self) -> Any:
return self._stream_slice

def __hash__(self) -> int:
return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS))
return SliceHasher.hash(
stream_slice=self._stream_slice
) # no need to provide stream_name here as this is used for slicing the cursor

def __bool__(self) -> bool:
return bool(self._stream_slice) or bool(self._extra_fields)
9 changes: 8 additions & 1 deletion airbyte_cdk/utils/slice_hasher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@ class SliceHasher:
_ENCODING: Final = "utf-8"

@classmethod
def hash(cls, stream_name: str, stream_slice: Optional[Mapping[str, Any]] = None) -> int:
def hash(
cls,
stream_name: str = "<stream name not provided>",
stream_slice: Optional[Mapping[str, Any]] = None,
) -> int:
"""
Note that streams partition with the same slicing value but with different names might collapse if stream name is not provided
"""
if stream_slice:
try:
s = json.dumps(stream_slice, sort_keys=True, cls=SliceEncoder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@
import requests

from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.models import FailureType, Level
from airbyte_cdk.models import (
AirbyteStateBlob,
AirbyteStateMessage,
AirbyteStateType,
AirbyteStreamState,
FailureType,
Level,
StreamDescriptor,
)
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator, JwtAuthenticator
Expand Down Expand Up @@ -3093,11 +3101,23 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
"legacy": {},
}

connector_state_manager = ConnectorStateManager()
stream_name = "test"

connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True)
connector_state_manager = ConnectorStateManager(
state=[
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name=stream_name),
stream_state=AirbyteStateBlob(stream_state),
),
)
]
)

stream_name = "test"
connector_builder_factory = ModelToComponentFactory(
emit_connector_builder_messages=True, connector_state_manager=connector_state_manager
)

cursor_component_definition = {
"type": "DatetimeBasedCursor",
Expand All @@ -3114,13 +3134,11 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(

concurrent_cursor = (
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=connector_state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=cursor_component_definition,
stream_name=stream_name,
stream_namespace=None,
config=config,
stream_state=stream_state,
)
)

Expand Down
Loading
Loading