Skip to content

Commit

Permalink
feat(Low-Code Concurrent CDK): Add ConcurrentPerPartitionCursor (#111)
Browse files Browse the repository at this point in the history
Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]>
  • Loading branch information
tolik0 and maxi297 authored Jan 23, 2025
1 parent 8963a3c commit 4459243
Show file tree
Hide file tree
Showing 14 changed files with 2,910 additions and 86 deletions.
96 changes: 80 additions & 16 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
Expand All @@ -32,7 +35,7 @@
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers import Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
DeclarativePartitionFactory,
StreamSlicerPartitionGenerator,
Expand Down Expand Up @@ -231,21 +234,7 @@ def _group_streams(
stream_state=stream_state,
)

retriever = declarative_stream.retriever

# This is an optimization so that we don't invoke any cursor or state management flows within the
# low-code framework because state management is handled through the ConcurrentCursor.
if declarative_stream and isinstance(retriever, SimpleRetriever):
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
# like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
# still rely on a DatetimeBasedCursor that is properly initialized with state.
if retriever.cursor:
retriever.cursor.set_initial_state(stream_state=stream_state)
# We zero it out here, but since this is a cursor reference, the state is still properly
# instantiated for the other components that reference it
retriever.cursor = None
retriever = self._get_retriever(declarative_stream, stream_state)

partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
Expand Down Expand Up @@ -305,6 +294,60 @@ def _group_streams(
cursor=final_state_cursor,
)
)
elif (
incremental_sync_component_definition
and incremental_sync_component_definition.get("type", "")
== DatetimeBasedCursorModel.__name__
and self._stream_supports_concurrent_partition_processing(
declarative_stream=declarative_stream
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
)
):
stream_state = state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)
partition_router = declarative_stream.retriever.stream_slicer._partition_router

perpartition_cursor = (
self._constructor.create_concurrent_cursor_from_perpartition_cursor(
state_manager=state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition,
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
stream_state=stream_state,
partition_router=partition_router,
)
)

retriever = self._get_retriever(declarative_stream, stream_state)

partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
retriever,
self.message_repository,
),
perpartition_cursor,
)

concurrent_streams.append(
DefaultStream(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
logger=self.logger,
cursor=perpartition_cursor,
)
)
else:
synchronous_streams.append(declarative_stream)
else:
Expand Down Expand Up @@ -395,6 +438,27 @@ def _stream_supports_concurrent_partition_processing(
return False
return True

def _get_retriever(
self, declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
) -> Retriever:
retriever = declarative_stream.retriever

# This is an optimization so that we don't invoke any cursor or state management flows within the
# low-code framework because state management is handled through the ConcurrentCursor.
if declarative_stream and isinstance(retriever, SimpleRetriever):
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
# like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
# still rely on a DatetimeBasedCursor that is properly initialized with state.
if retriever.cursor:
retriever.cursor.set_initial_state(stream_state=stream_state)
# We zero it out here, but since this is a cursor reference, the state is still properly
# instantiated for the other components that reference it
retriever.cursor = None

return retriever

@staticmethod
def _select_streams(
streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
Expand Down
8 changes: 3 additions & 5 deletions airbyte_cdk/sources/declarative/extractors/record_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,11 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):

def __init__(
self,
date_time_based_cursor: DatetimeBasedCursor,
substream_cursor: Optional[Union[PerPartitionWithGlobalCursor, GlobalSubstreamCursor]],
cursor: Union[DatetimeBasedCursor, PerPartitionWithGlobalCursor, GlobalSubstreamCursor],
**kwargs: Any,
):
super().__init__(**kwargs)
self._date_time_based_cursor = date_time_based_cursor
self._substream_cursor = substream_cursor
self._cursor = cursor

def filter_records(
self,
Expand All @@ -77,7 +75,7 @@ def filter_records(
records = (
record
for record in records
if (self._substream_cursor or self._date_time_based_cursor).should_be_synced(
if self._cursor.should_be_synced(
# Record is created on the fly to align with cursors interface; stream name is ignored as we don't need it here
# Record stream name is empty cause it is not used durig the filtering
Record(data=record, associated_slice=stream_slice, stream_name="")
Expand Down
6 changes: 6 additions & 0 deletions airbyte_cdk/sources/declarative/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
ConcurrentCursorFactory,
ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
Expand All @@ -21,6 +25,8 @@

__all__ = [
"CursorFactory",
"ConcurrentCursorFactory",
"ConcurrentPerPartitionCursor",
"DatetimeBasedCursor",
"DeclarativeCursor",
"GlobalSubstreamCursor",
Expand Down
Loading

0 comments on commit 4459243

Please sign in to comment.