From 68cd03be8fa515f84336a134e20817114cae5e85 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 17 Jan 2025 14:28:09 -0500 Subject: [PATCH 01/11] TMP development to show how things could work with concurrent cursor --- .../concurrent_declarative_source.py | 39 +++++++++---------- .../manifest_declarative_source.py | 3 -- .../parsers/model_to_component_factory.py | 19 +++++++-- .../declarative/retrievers/async_retriever.py | 2 +- airbyte_cdk/sources/types.py | 4 +- .../test_concurrent_declarative_source.py | 1 + 6 files changed, 38 insertions(+), 30 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 7de937782..d929df88e 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -31,6 +31,7 @@ from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) +from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter from airbyte_cdk.sources.declarative.requesters import HttpRequester from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( @@ -66,6 +67,10 @@ def __init__( component_factory: Optional[ModelToComponentFactory] = None, **kwargs: Any, ) -> None: + # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source + # no longer needs to store the original incoming state. But maybe there's an edge case? + self._state_manager = ConnectorStateManager(state=state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later + # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic # cursors. We do this by no longer automatically instantiating RFR cursors when converting # the declarative models into runtime components. Concurrent sources will continue to checkpoint @@ -73,6 +78,7 @@ def __init__( component_factory = component_factory or ModelToComponentFactory( emit_connector_builder_messages=emit_connector_builder_messages, disable_resumable_full_refresh=True, + state_manager=self._state_manager, ) super().__init__( @@ -82,10 +88,6 @@ def __init__( component_factory=component_factory, ) - # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source - # no longer needs to store the original incoming state. But maybe there's an edge case? - self._state = state - concurrency_level_from_manifest = self._source_config.get("concurrency_level") if concurrency_level_from_manifest: concurrency_level_component = self._constructor.create_component( @@ -175,8 +177,6 @@ def _group_streams( concurrent_streams: List[AbstractStream] = [] synchronous_streams: List[Stream] = [] - state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later - # Combine streams and dynamic_streams. Note: both cannot be empty at the same time, # and this is validated during the initialization of the source. streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs( @@ -216,19 +216,6 @@ def _group_streams( if self._is_datetime_incremental_without_partition_routing( declarative_stream, incremental_sync_component_definition ): - stream_state = state_manager.get_stream_state( - stream_name=declarative_stream.name, namespace=declarative_stream.namespace - ) - - cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor( - state_manager=state_manager, - model_type=DatetimeBasedCursorModel, - component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above - stream_name=declarative_stream.name, - stream_namespace=declarative_stream.namespace, - config=config or {}, - stream_state=stream_state, - ) retriever = declarative_stream.retriever @@ -241,11 +228,21 @@ def _group_streams( # like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator # still rely on a DatetimeBasedCursor that is properly initialized with state. if retriever.cursor: + stream_state = self._state_manager.get_stream_state( + stream_name=declarative_stream.name, namespace=declarative_stream.namespace + ) retriever.cursor.set_initial_state(stream_state=stream_state) # We zero it out here, but since this is a cursor reference, the state is still properly # instantiated for the other components that reference it retriever.cursor = None + cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor( + model_type=DatetimeBasedCursorModel, + component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above + stream_name=declarative_stream.name, + stream_namespace=declarative_stream.namespace, + config=config or {}, + ) if type(declarative_stream.retriever).__name__ != "AsyncRetriever" else declarative_stream.retriever.stream_slicer.stream_slicer # type: ignore # AsyncRetriever has stream_slicer partition_generator = StreamSlicerPartitionGenerator( DeclarativePartitionFactory( declarative_stream.name, @@ -253,7 +250,7 @@ def _group_streams( retriever, self.message_repository, ), - cursor, + cursor if type(declarative_stream.retriever).__name__ != "AsyncRetriever" else declarative_stream.retriever.stream_slicer, # type: ignore # AsyncRetriever has stream_slicer ) concurrent_streams.append( @@ -325,7 +322,7 @@ def _is_datetime_incremental_without_partition_routing( declarative_stream=declarative_stream ) and hasattr(declarative_stream.retriever, "stream_slicer") - and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) + and (isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)) ) def _stream_supports_concurrent_partition_processing( diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 4282f7fc7..e21c15fdf 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -25,9 +25,6 @@ from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - CheckStream as CheckStreamModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DeclarativeStream as DeclarativeStreamModel, ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 597be6386..a7facbf1f 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -476,6 +476,7 @@ def __init__( disable_cache: bool = False, disable_resumable_full_refresh: bool = False, message_repository: Optional[MessageRepository] = None, + state_manager: Optional[ConnectorStateManager] = None ): self._init_mappings() self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice @@ -487,6 +488,7 @@ def __init__( self._message_repository = message_repository or InMemoryMessageRepository( self._evaluate_log_level(emit_connector_builder_messages) ) + self._state_manager = state_manager def _init_mappings(self) -> None: self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = { @@ -880,13 +882,11 @@ def create_concurrency_level( def create_concurrent_cursor_from_datetime_based_cursor( self, - state_manager: ConnectorStateManager, model_type: Type[BaseModel], component_definition: ComponentDefinition, stream_name: str, stream_namespace: Optional[str], config: Config, - stream_state: MutableMapping[str, Any], **kwargs: Any, ) -> ConcurrentCursor: component_type = component_definition.get("type") @@ -1021,9 +1021,9 @@ def create_concurrent_cursor_from_datetime_based_cursor( return ConcurrentCursor( stream_name=stream_name, stream_namespace=stream_namespace, - stream_state=stream_state, + stream_state=self._state_manager.get_stream_state(stream_name, stream_namespace), message_repository=self._message_repository, - connector_state_manager=state_manager, + connector_state_manager=self._state_manager, connector_state_converter=connector_state_converter, cursor_field=cursor_field, slice_boundary_fields=slice_boundary_fields, @@ -1476,6 +1476,17 @@ def _merge_stream_slicers( stream_cursor=cursor_component, ) elif model.incremental_sync: + if model.retriever.type == "AsyncRetriever": + if model.incremental_sync.type != "DatetimeBasedCursor": + # TODO explain why it isn't supported + raise ValueError("AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet") + return self.create_concurrent_cursor_from_datetime_based_cursor( + model_type=DatetimeBasedCursorModel, + component_definition=model.incremental_sync.__dict__, + stream_name=model.name, + stream_namespace=None, + config=config or {}, + ) return ( self._create_component_from_model(model=model.incremental_sync, config=config) if model.incremental_sync diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py index 1b8860289..08253bab0 100644 --- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -75,7 +75,7 @@ def _validate_and_get_stream_slice_partition( """ if not isinstance(stream_slice, StreamSlice) or "partition" not in stream_slice.partition: raise AirbyteTracedException( - message="Invalid arguments to AsyncJobRetriever.read_records: stream_slice is no optional. Please contact Airbyte Support", + message="Invalid arguments to AsyncRetriever.read_records: stream_slice is no optional. Please contact Airbyte Support", failure_type=FailureType.system_error, ) return stream_slice["partition"] # type: ignore # stream_slice["partition"] has been added as an AsyncPartition as part of stream_slices diff --git a/airbyte_cdk/sources/types.py b/airbyte_cdk/sources/types.py index 3c466ccd8..426c578ad 100644 --- a/airbyte_cdk/sources/types.py +++ b/airbyte_cdk/sources/types.py @@ -8,6 +8,8 @@ import orjson +from airbyte_cdk.utils.slice_hasher import SliceHasher + # A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2": # "hello"}] returns "hello" FieldPointer = List[str] @@ -151,7 +153,7 @@ def __json_serializable__(self) -> Any: return self._stream_slice def __hash__(self) -> int: - return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS)) + return SliceHasher.hash("dummy_name", self._stream_slice) def __bool__(self) -> bool: return bool(self._stream_slice) or bool(self._extra_fields) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 3b5dd50c9..e0a7c4b6a 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -322,6 +322,7 @@ "http_method": "GET", }, }, + "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, "schema_loader": { "type": "InlineSchemaLoader", "schema": { From c3b9aed65031cbdc9919eeffb570529fc25bcb0e Mon Sep 17 00:00:00 2001 From: maxi297 Date: Wed, 22 Jan 2025 11:29:46 -0500 Subject: [PATCH 02/11] Improve comments on async retriever cursor initialization --- .../declarative/parsers/model_to_component_factory.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a7facbf1f..cbdddecb0 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1478,8 +1478,11 @@ def _merge_stream_slicers( elif model.incremental_sync: if model.retriever.type == "AsyncRetriever": if model.incremental_sync.type != "DatetimeBasedCursor": - # TODO explain why it isn't supported + # We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support or unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursor or have a new implementation available in the CDK, we can enable more cursors here. raise ValueError("AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet") + if model.retriever.partition_router: + # Note that this development is also done in parallel to the per partition development which once merged we could support here by calling `create_concurrent_cursor_from_perpartition_cursor` + raise ValueError("Per partition state is not supported yet for AsyncRetriever") return self.create_concurrent_cursor_from_datetime_based_cursor( model_type=DatetimeBasedCursorModel, component_definition=model.incremental_sync.__dict__, From 05af545caa638cd7e6c27bda4558836b3eae447d Mon Sep 17 00:00:00 2001 From: maxi297 Date: Wed, 22 Jan 2025 11:37:37 -0500 Subject: [PATCH 03/11] making stream name optional when hashing slices --- airbyte_cdk/sources/types.py | 5 ++++- airbyte_cdk/utils/slice_hasher.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/types.py b/airbyte_cdk/sources/types.py index 426c578ad..c170f00dd 100644 --- a/airbyte_cdk/sources/types.py +++ b/airbyte_cdk/sources/types.py @@ -9,6 +9,9 @@ import orjson from airbyte_cdk.utils.slice_hasher import SliceHasher +from unit_tests.sources.declarative.requesters.request_options.test_interpolated_request_options_provider import ( + stream_slice, +) # A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2": # "hello"}] returns "hello" @@ -153,7 +156,7 @@ def __json_serializable__(self) -> Any: return self._stream_slice def __hash__(self) -> int: - return SliceHasher.hash("dummy_name", self._stream_slice) + return SliceHasher.hash(stream_slice=self._stream_slice) # no need to provide stream_name here as this is used for slicing the cursor def __bool__(self) -> bool: return bool(self._stream_slice) or bool(self._extra_fields) diff --git a/airbyte_cdk/utils/slice_hasher.py b/airbyte_cdk/utils/slice_hasher.py index 7f46dd768..c6d73f93b 100644 --- a/airbyte_cdk/utils/slice_hasher.py +++ b/airbyte_cdk/utils/slice_hasher.py @@ -16,7 +16,10 @@ class SliceHasher: _ENCODING: Final = "utf-8" @classmethod - def hash(cls, stream_name: str, stream_slice: Optional[Mapping[str, Any]] = None) -> int: + def hash(cls, stream_name: str = "", stream_slice: Optional[Mapping[str, Any]] = None) -> int: + """ + Note that streams partition with the same slicing value but with different names might collapse if stream name is not provided + """ if stream_slice: try: s = json.dumps(stream_slice, sort_keys=True, cls=SliceEncoder) From 07493d31df4f25efe33ee1cc2c9a4010106664d0 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Wed, 22 Jan 2025 17:11:10 -0800 Subject: [PATCH 04/11] fix tests and clean up the code for readability and better typing, add some more tests to concurrent_declarative_source --- .../concurrent_declarative_source.py | 74 +++++++++++++------ .../parsers/model_to_component_factory.py | 19 +++-- .../declarative/retrievers/async_retriever.py | 2 +- airbyte_cdk/sources/types.py | 9 +-- airbyte_cdk/utils/slice_hasher.py | 6 +- .../test_model_to_component_factory.py | 30 ++++++-- .../test_concurrent_declarative_source.py | 47 ++++++++++++ 7 files changed, 141 insertions(+), 46 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index a6114ddbd..ef3e72daf 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -33,7 +33,7 @@ ) from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter from airbyte_cdk.sources.declarative.requesters import HttpRequester -from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever +from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( DeclarativePartitionFactory, StreamSlicerPartitionGenerator, @@ -46,7 +46,7 @@ from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( AlwaysAvailableAvailabilityStrategy, ) -from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor +from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream @@ -69,7 +69,7 @@ def __init__( ) -> None: # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source # no longer needs to store the original incoming state. But maybe there's an edge case? - self._state_manager = ConnectorStateManager(state=state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later + self._connector_state_manager = ConnectorStateManager(state=state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic # cursors. We do this by no longer automatically instantiating RFR cursors when converting @@ -78,7 +78,7 @@ def __init__( component_factory = component_factory or ModelToComponentFactory( emit_connector_builder_messages=emit_connector_builder_messages, disable_resumable_full_refresh=True, - state_manager=self._state_manager, + connector_state_manager=self._connector_state_manager, ) super().__init__( @@ -217,7 +217,6 @@ def _group_streams( if self._is_datetime_incremental_without_partition_routing( declarative_stream, incremental_sync_component_definition ): - retriever = declarative_stream.retriever # This is an optimization so that we don't invoke any cursor or state management flows within the @@ -229,30 +228,54 @@ def _group_streams( # like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator # still rely on a DatetimeBasedCursor that is properly initialized with state. if retriever.cursor: - stream_state = self._state_manager.get_stream_state( - stream_name=declarative_stream.name, namespace=declarative_stream.namespace + stream_state = self._connector_state_manager.get_stream_state( + stream_name=declarative_stream.name, + namespace=declarative_stream.namespace, ) retriever.cursor.set_initial_state(stream_state=stream_state) # We zero it out here, but since this is a cursor reference, the state is still properly # instantiated for the other components that reference it retriever.cursor = None - cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor( - model_type=DatetimeBasedCursorModel, - component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above - stream_name=declarative_stream.name, - stream_namespace=declarative_stream.namespace, - config=config or {}, - ) if type(declarative_stream.retriever).__name__ != "AsyncRetriever" else declarative_stream.retriever.stream_slicer.stream_slicer # type: ignore # AsyncRetriever has stream_slicer - partition_generator = StreamSlicerPartitionGenerator( - DeclarativePartitionFactory( - declarative_stream.name, - declarative_stream.get_json_schema(), - retriever, - self.message_repository, - ), - cursor if type(declarative_stream.retriever).__name__ != "AsyncRetriever" else declarative_stream.retriever.stream_slicer, # type: ignore # AsyncRetriever has stream_slicer - ) + # if type(declarative_stream.retriever).__name__ != "AsyncRetriever": + if isinstance(declarative_stream.retriever, AsyncRetriever): + cursor = declarative_stream.retriever.stream_slicer.stream_slicer + + if not isinstance(cursor, ConcurrentCursor): + # This should never happen since we instantiate ConcurrentCursor in + # model_to_component_factory.py + raise ValueError( + f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received{cursor.__class__}" + ) + + partition_generator = StreamSlicerPartitionGenerator( + partition_factory=DeclarativePartitionFactory( + declarative_stream.name, + declarative_stream.get_json_schema(), + retriever, + self.message_repository, + ), + stream_slicer=declarative_stream.retriever.stream_slicer, + ) + else: + cursor = ( + self._constructor.create_concurrent_cursor_from_datetime_based_cursor( + model_type=DatetimeBasedCursorModel, + component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above + stream_name=declarative_stream.name, + stream_namespace=declarative_stream.namespace, + config=config or {}, + ) + ) + partition_generator = StreamSlicerPartitionGenerator( + partition_factory=DeclarativePartitionFactory( + declarative_stream.name, + declarative_stream.get_json_schema(), + retriever, + self.message_repository, + ), + stream_slicer=cursor, + ) concurrent_streams.append( DefaultStream( @@ -323,7 +346,10 @@ def _is_datetime_incremental_without_partition_routing( declarative_stream=declarative_stream ) and hasattr(declarative_stream.retriever, "stream_slicer") - and (isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)) + and ( + isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) + or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter) + ) ) def _stream_supports_concurrent_partition_processing( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index f72b4c73f..97462781e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -15,7 +15,6 @@ Dict, List, Mapping, - MutableMapping, Optional, Type, Union, @@ -500,7 +499,7 @@ def __init__( disable_cache: bool = False, disable_resumable_full_refresh: bool = False, message_repository: Optional[MessageRepository] = None, - state_manager: Optional[ConnectorStateManager] = None + connector_state_manager: Optional[ConnectorStateManager] = None, ): self._init_mappings() self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice @@ -512,7 +511,7 @@ def __init__( self._message_repository = message_repository or InMemoryMessageRepository( self._evaluate_log_level(emit_connector_builder_messages) ) - self._state_manager = state_manager + self._connector_state_manager = connector_state_manager or ConnectorStateManager() def _init_mappings(self) -> None: self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = { @@ -1107,9 +1106,11 @@ def create_concurrent_cursor_from_datetime_based_cursor( return ConcurrentCursor( stream_name=stream_name, stream_namespace=stream_namespace, - stream_state=self._state_manager.get_stream_state(stream_name, stream_namespace), + stream_state=self._connector_state_manager.get_stream_state( + stream_name, stream_namespace + ), message_repository=self._message_repository, - connector_state_manager=self._state_manager, + connector_state_manager=self._connector_state_manager, connector_state_converter=connector_state_converter, cursor_field=cursor_field, slice_boundary_fields=slice_boundary_fields, @@ -1608,14 +1609,16 @@ def _merge_stream_slicers( if model.retriever.type == "AsyncRetriever": if model.incremental_sync.type != "DatetimeBasedCursor": # We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support or unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursor or have a new implementation available in the CDK, we can enable more cursors here. - raise ValueError("AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet") + raise ValueError( + "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet" + ) if model.retriever.partition_router: # Note that this development is also done in parallel to the per partition development which once merged we could support here by calling `create_concurrent_cursor_from_perpartition_cursor` raise ValueError("Per partition state is not supported yet for AsyncRetriever") - return self.create_concurrent_cursor_from_datetime_based_cursor( + return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing model_type=DatetimeBasedCursorModel, component_definition=model.incremental_sync.__dict__, - stream_name=model.name, + stream_name=model.name or "", stream_namespace=None, config=config or {}, ) diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py index 08253bab0..bd28e0e2d 100644 --- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -75,7 +75,7 @@ def _validate_and_get_stream_slice_partition( """ if not isinstance(stream_slice, StreamSlice) or "partition" not in stream_slice.partition: raise AirbyteTracedException( - message="Invalid arguments to AsyncRetriever.read_records: stream_slice is no optional. Please contact Airbyte Support", + message="Invalid arguments to AsyncRetriever.read_records: stream_slice is not optional. Please contact Airbyte Support", failure_type=FailureType.system_error, ) return stream_slice["partition"] # type: ignore # stream_slice["partition"] has been added as an AsyncPartition as part of stream_slices diff --git a/airbyte_cdk/sources/types.py b/airbyte_cdk/sources/types.py index c170f00dd..d4db76f87 100644 --- a/airbyte_cdk/sources/types.py +++ b/airbyte_cdk/sources/types.py @@ -6,12 +6,7 @@ from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView -import orjson - from airbyte_cdk.utils.slice_hasher import SliceHasher -from unit_tests.sources.declarative.requesters.request_options.test_interpolated_request_options_provider import ( - stream_slice, -) # A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2": # "hello"}] returns "hello" @@ -156,7 +151,9 @@ def __json_serializable__(self) -> Any: return self._stream_slice def __hash__(self) -> int: - return SliceHasher.hash(stream_slice=self._stream_slice) # no need to provide stream_name here as this is used for slicing the cursor + return SliceHasher.hash( + stream_slice=self._stream_slice + ) # no need to provide stream_name here as this is used for slicing the cursor def __bool__(self) -> bool: return bool(self._stream_slice) or bool(self._extra_fields) diff --git a/airbyte_cdk/utils/slice_hasher.py b/airbyte_cdk/utils/slice_hasher.py index c6d73f93b..25950a934 100644 --- a/airbyte_cdk/utils/slice_hasher.py +++ b/airbyte_cdk/utils/slice_hasher.py @@ -16,7 +16,11 @@ class SliceHasher: _ENCODING: Final = "utf-8" @classmethod - def hash(cls, stream_name: str = "", stream_slice: Optional[Mapping[str, Any]] = None) -> int: + def hash( + cls, + stream_name: str = "", + stream_slice: Optional[Mapping[str, Any]] = None, + ) -> int: """ Note that streams partition with the same slicing value but with different names might collapse if stream name is not provided """ diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 7bfdc0379..845e41fe8 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -12,7 +12,15 @@ import requests from airbyte_cdk import AirbyteTracedException -from airbyte_cdk.models import FailureType, Level +from airbyte_cdk.models import ( + AirbyteStateBlob, + AirbyteStateMessage, + AirbyteStateType, + AirbyteStreamState, + FailureType, + Level, + StreamDescriptor, +) from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator, JwtAuthenticator @@ -3093,11 +3101,23 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields( "legacy": {}, } - connector_state_manager = ConnectorStateManager() + stream_name = "test" - connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + connector_state_manager = ConnectorStateManager( + state=[ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name=stream_name), + stream_state=AirbyteStateBlob(stream_state), + ), + ) + ] + ) - stream_name = "test" + connector_builder_factory = ModelToComponentFactory( + emit_connector_builder_messages=True, connector_state_manager=connector_state_manager + ) cursor_component_definition = { "type": "DatetimeBasedCursor", @@ -3114,13 +3134,11 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields( concurrent_cursor = ( connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor( - state_manager=connector_state_manager, model_type=DatetimeBasedCursorModel, component_definition=cursor_component_definition, stream_name=stream_name, stream_namespace=None, config=config, - stream_state=stream_state, ) ) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index e0a7c4b6a..eccedda44 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -33,6 +33,10 @@ ConcurrentDeclarativeSource, ) from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream +from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter +from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( + StreamSlicerPartitionGenerator +) from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.checkpoint import Cursor from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor @@ -1604,6 +1608,49 @@ def test_given_partition_routing_and_incremental_sync_then_stream_is_not_concurr assert len(synchronous_streams) == 1 +def test_async_incremental_stream_uses_concurrent_cursor_with_state(): + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="async_job_stream", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-08-06"), + ), + ) + ] + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state + ) + + expected_state = { + "legacy": { + "updated_at": "2024-08-06" + }, + "slices": [ + { + "end": datetime(2024, 8, 6, 0, 0, tzinfo=timezone.utc), + "most_recent_cursor_value": datetime(2024, 8, 6, 0, 0, tzinfo=timezone.utc), + "start": datetime(2024, 7, 1, 0, 0, tzinfo=timezone.utc) + } + ], + "state_type": "date-range" + } + + concurrent_streams, _ = source._group_streams(config=_CONFIG) + async_job_stream = concurrent_streams[5] + assert isinstance(async_job_stream, DefaultStream) + cursor = async_job_stream._cursor + assert isinstance(cursor, ConcurrentCursor) + assert cursor._concurrent_state == expected_state + stream_partition_generator = async_job_stream._stream_partition_generator + assert isinstance(stream_partition_generator, StreamSlicerPartitionGenerator) + async_job_partition_router = stream_partition_generator._stream_slicer + assert isinstance(async_job_partition_router, AsyncJobPartitionRouter) + assert isinstance(async_job_partition_router.stream_slicer, ConcurrentCursor) + assert async_job_partition_router.stream_slicer._concurrent_state == expected_state + + def create_wrapped_stream(stream: DeclarativeStream) -> Stream: slice_to_records_mapping = get_mocked_read_records_output(stream_name=stream.name) From 8a55d2c62dd670ffca0d57d663536fcf093f5089 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Wed, 22 Jan 2025 17:17:03 -0800 Subject: [PATCH 05/11] formatting --- .../declarative/test_concurrent_declarative_source.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index eccedda44..718f913ba 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -35,7 +35,7 @@ from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( - StreamSlicerPartitionGenerator + StreamSlicerPartitionGenerator, ) from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.checkpoint import Cursor @@ -1624,17 +1624,15 @@ def test_async_incremental_stream_uses_concurrent_cursor_with_state(): ) expected_state = { - "legacy": { - "updated_at": "2024-08-06" - }, + "legacy": {"updated_at": "2024-08-06"}, "slices": [ { "end": datetime(2024, 8, 6, 0, 0, tzinfo=timezone.utc), "most_recent_cursor_value": datetime(2024, 8, 6, 0, 0, tzinfo=timezone.utc), - "start": datetime(2024, 7, 1, 0, 0, tzinfo=timezone.utc) + "start": datetime(2024, 7, 1, 0, 0, tzinfo=timezone.utc), } ], - "state_type": "date-range" + "state_type": "date-range", } concurrent_streams, _ = source._group_streams(config=_CONFIG) From 05ead347b591fa71bfd540d1a35adcc4f47824c0 Mon Sep 17 00:00:00 2001 From: Brian Lai <51336873+brianjlai@users.noreply.github.com> Date: Wed, 22 Jan 2025 20:24:14 -0500 Subject: [PATCH 06/11] Update airbyte_cdk/sources/declarative/concurrent_declarative_source.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../sources/declarative/concurrent_declarative_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index ef3e72daf..bb3962bd4 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -245,7 +245,7 @@ def _group_streams( # This should never happen since we instantiate ConcurrentCursor in # model_to_component_factory.py raise ValueError( - f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received{cursor.__class__}" + f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}" ) partition_generator = StreamSlicerPartitionGenerator( From 92f09408bc3280e1165647787be12ab60f8e97b5 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Thu, 23 Jan 2025 13:59:45 -0800 Subject: [PATCH 07/11] fix test --- .../sources/declarative/test_concurrent_declarative_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 505c0f2a2..a8c9f77ba 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -1634,7 +1634,7 @@ def test_async_incremental_stream_uses_concurrent_cursor_with_state(): } concurrent_streams, _ = source._group_streams(config=_CONFIG) - async_job_stream = concurrent_streams[5] + async_job_stream = concurrent_streams[6] assert isinstance(async_job_stream, DefaultStream) cursor = async_job_stream._cursor assert isinstance(cursor, ConcurrentCursor) From 03d19583df8a1708e0245546ef71e0cf3a6c3b60 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Thu, 23 Jan 2025 14:37:21 -0800 Subject: [PATCH 08/11] refactor back to old way --- .../concurrent_declarative_source.py | 54 +++++++++++-------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 532d9972a..3293731fd 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -36,7 +36,7 @@ ) from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter from airbyte_cdk.sources.declarative.requesters import HttpRequester -from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever +from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( DeclarativePartitionFactory, StreamSlicerPartitionGenerator, @@ -220,27 +220,15 @@ def _group_streams( if self._is_datetime_incremental_without_partition_routing( declarative_stream, incremental_sync_component_definition ): - retriever = declarative_stream.retriever - - # This is an optimization so that we don't invoke any cursor or state management flows within the - # low-code framework because state management is handled through the ConcurrentCursor. - if declarative_stream and isinstance(retriever, SimpleRetriever): - # Also a temporary hack. In the legacy Stream implementation, as part of the read, - # set_initial_state() is called to instantiate incoming state on the cursor. Although we no - # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components - # like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator - # still rely on a DatetimeBasedCursor that is properly initialized with state. - if retriever.cursor: - stream_state = self._connector_state_manager.get_stream_state( - stream_name=declarative_stream.name, - namespace=declarative_stream.namespace, - ) - retriever.cursor.set_initial_state(stream_state=stream_state) - # We zero it out here, but since this is a cursor reference, the state is still properly - # instantiated for the other components that reference it - retriever.cursor = None + stream_state = self._connector_state_manager.get_stream_state( + stream_name=declarative_stream.name, namespace=declarative_stream.namespace + ) + + retriever = self._get_retriever(declarative_stream, stream_state) - if isinstance(declarative_stream.retriever, AsyncRetriever): + if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance( + declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter + ): cursor = declarative_stream.retriever.stream_slicer.stream_slicer if not isinstance(cursor, ConcurrentCursor): @@ -357,7 +345,7 @@ def _group_streams( ) ) - retriever = declarative_stream.retriever + retriever = self._get_retriever(declarative_stream, stream_state) partition_generator = StreamSlicerPartitionGenerator( DeclarativePartitionFactory( @@ -474,6 +462,28 @@ def _stream_supports_concurrent_partition_processing( return False return True + @staticmethod + def _get_retriever( + declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any] + ) -> Retriever: + retriever = declarative_stream.retriever + + # This is an optimization so that we don't invoke any cursor or state management flows within the + # low-code framework because state management is handled through the ConcurrentCursor. + if declarative_stream and isinstance(retriever, SimpleRetriever): + # Also a temporary hack. In the legacy Stream implementation, as part of the read, + # set_initial_state() is called to instantiate incoming state on the cursor. Although we no + # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components + # like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator + # still rely on a DatetimeBasedCursor that is properly initialized with state. + if retriever.cursor: + retriever.cursor.set_initial_state(stream_state=stream_state) + # We zero it out here, but since this is a cursor reference, the state is still properly + # instantiated for the other components that reference it + retriever.cursor = None + + return retriever + @staticmethod def _select_streams( streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog From 516f844b32308d79e7194c88ee2aec9597ae4ae5 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 24 Jan 2025 10:07:27 -0500 Subject: [PATCH 09/11] Attempt to fix per partition concurrent tests --- .../sources/declarative/parsers/model_to_component_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4517323b6..dd6c95120 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -933,7 +933,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( runtime_lookback_window: Optional[datetime.timedelta] = None, **kwargs: Any, ) -> ConcurrentCursor: - stream_state = self._connector_state_manager.get_stream_state(stream_name, stream_namespace) + stream_state = self._connector_state_manager.get_stream_state(stream_name, stream_namespace) if "stream_state" not in kwargs else kwargs["stream_state"] component_type = component_definition.get("type") if component_definition.get("type") != model_type.__name__: From f5c61c06db97756ff73ac81cc25f798f9f543e2e Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 24 Jan 2025 15:25:11 +0000 Subject: [PATCH 10/11] Auto-fix lint and format issues --- .../declarative/parsers/model_to_component_factory.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index dd6c95120..78db362d1 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -933,7 +933,11 @@ def create_concurrent_cursor_from_datetime_based_cursor( runtime_lookback_window: Optional[datetime.timedelta] = None, **kwargs: Any, ) -> ConcurrentCursor: - stream_state = self._connector_state_manager.get_stream_state(stream_name, stream_namespace) if "stream_state" not in kwargs else kwargs["stream_state"] + stream_state = ( + self._connector_state_manager.get_stream_state(stream_name, stream_namespace) + if "stream_state" not in kwargs + else kwargs["stream_state"] + ) component_type = component_definition.get("type") if component_definition.get("type") != model_type.__name__: From 68fd8cbb423a5d24d3054f58658269b3722cd853 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Fri, 24 Jan 2025 11:58:07 -0800 Subject: [PATCH 11/11] add clarifying comments --- .../sources/declarative/parsers/model_to_component_factory.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 78db362d1..f47dd96a3 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -933,6 +933,9 @@ def create_concurrent_cursor_from_datetime_based_cursor( runtime_lookback_window: Optional[datetime.timedelta] = None, **kwargs: Any, ) -> ConcurrentCursor: + # Per-partition incremental streams can dynamically create child cursors which will pass their current + # state via the stream_state keyword argument. Incremental syncs without parent streams use the + # incoming state and connector_state_manager that is initialized when the component factory is created stream_state = ( self._connector_state_manager.get_stream_state(stream_name, stream_namespace) if "stream_state" not in kwargs