feat(low-code concurrent): Allow async job low-code streams that are incremental to be run by the concurrent framework #228

Merged: 16 commits, merged Jan 24, 2025
Changes from 3 commits
39 changes: 18 additions & 21 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -31,6 +31,7 @@
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
@@ -66,13 +67,18 @@ def __init__(
component_factory: Optional[ModelToComponentFactory] = None,
**kwargs: Any,
) -> None:
# todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
# no longer needs to store the original incoming state. But maybe there's an edge case?
self._state_manager = ConnectorStateManager(state=state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

# To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
# cursors. We do this by no longer automatically instantiating RFR cursors when converting
# the declarative models into runtime components. Concurrent sources will continue to checkpoint
# incremental streams running in full refresh.
component_factory = component_factory or ModelToComponentFactory(
emit_connector_builder_messages=emit_connector_builder_messages,
disable_resumable_full_refresh=True,
state_manager=self._state_manager,
)

super().__init__(
@@ -82,10 +88,6 @@
component_factory=component_factory,
)

# todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
# no longer needs to store the original incoming state. But maybe there's an edge case?
self._state = state

concurrency_level_from_manifest = self._source_config.get("concurrency_level")
if concurrency_level_from_manifest:
concurrency_level_component = self._constructor.create_component(
@@ -175,8 +177,6 @@ def _group_streams(
concurrent_streams: List[AbstractStream] = []
synchronous_streams: List[Stream] = []

state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

# Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
# and this is validated during the initialization of the source.
streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
@@ -216,19 +216,6 @@
if self._is_datetime_incremental_without_partition_routing(
declarative_stream, incremental_sync_component_definition
):
stream_state = state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)

cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
stream_state=stream_state,
)

retriever = declarative_stream.retriever

@@ -241,19 +228,29 @@
# like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
# still rely on a DatetimeBasedCursor that is properly initialized with state.
if retriever.cursor:
stream_state = self._state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)
retriever.cursor.set_initial_state(stream_state=stream_state)
# We zero it out here, but since this is a cursor reference, the state is still properly
# instantiated for the other components that reference it
retriever.cursor = None

cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
) if type(declarative_stream.retriever).__name__ != "AsyncRetriever" else declarative_stream.retriever.stream_slicer.stream_slicer # type: ignore # AsyncRetriever has stream_slicer
partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
retriever,
self.message_repository,
),
cursor,
cursor if type(declarative_stream.retriever).__name__ != "AsyncRetriever" else declarative_stream.retriever.stream_slicer, # type: ignore # AsyncRetriever has stream_slicer
)

concurrent_streams.append(
@@ -325,7 +322,7 @@ def _is_datetime_incremental_without_partition_routing(
declarative_stream=declarative_stream
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
and (isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter))
)

def _stream_supports_concurrent_partition_processing(
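As an aside, the broadened condition above can be read as a single membership check. The snippet below is a small illustrative sketch, not part of this diff, assuming the CDK import paths shown in this change:

from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter


# Illustrative helper mirroring the updated check: both a synchronous datetime-based
# cursor and an async job partition router now qualify a stream for the concurrent path.
def is_concurrent_capable_slicer(stream_slicer: object) -> bool:
    return isinstance(stream_slicer, (DatetimeBasedCursor, AsyncJobPartitionRouter))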
@@ -25,9 +25,6 @@
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CheckStream as CheckStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DeclarativeStream as DeclarativeStreamModel,
)
@@ -476,6 +476,7 @@ def __init__(
disable_cache: bool = False,
disable_resumable_full_refresh: bool = False,
message_repository: Optional[MessageRepository] = None,
state_manager: Optional[ConnectorStateManager] = None
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -487,6 +488,7 @@
self._message_repository = message_repository or InMemoryMessageRepository(
self._evaluate_log_level(emit_connector_builder_messages)
)
self._state_manager = state_manager

def _init_mappings(self) -> None:
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
@@ -880,13 +882,11 @@ def create_concurrency_level(

def create_concurrent_cursor_from_datetime_based_cursor(
self,
state_manager: ConnectorStateManager,
model_type: Type[BaseModel],
component_definition: ComponentDefinition,
stream_name: str,
stream_namespace: Optional[str],
config: Config,
stream_state: MutableMapping[str, Any],
**kwargs: Any,
) -> ConcurrentCursor:
component_type = component_definition.get("type")
@@ -1021,9 +1021,9 @@ def create_concurrent_cursor_from_datetime_based_cursor(
return ConcurrentCursor(
stream_name=stream_name,
stream_namespace=stream_namespace,
stream_state=stream_state,
stream_state=self._state_manager.get_stream_state(stream_name, stream_namespace),
message_repository=self._message_repository,
connector_state_manager=state_manager,
connector_state_manager=self._state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
slice_boundary_fields=slice_boundary_fields,
@@ -1476,6 +1476,20 @@ def _merge_stream_slicers(
stream_cursor=cursor_component,
)
elif model.incremental_sync:
if model.retriever.type == "AsyncRetriever":
if model.incremental_sync.type != "DatetimeBasedCursor":
# We are currently in a transition to the Concurrent CDK, and AsyncRetriever can only work with cursors that support unordered slices (for example, when we trigger reports for January and February, the February report can be completed first). Once we have support for custom concurrent cursors or a new implementation available in the CDK, we can enable more cursors here.
raise ValueError("AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet")
if model.retriever.partition_router:
# Note that this development is also done in parallel to the per partition development which once merged we could support here by calling `create_concurrent_cursor_from_perpartition_cursor`
raise ValueError("Per partition state is not supported yet for AsyncRetriever")
return self.create_concurrent_cursor_from_datetime_based_cursor(
model_type=DatetimeBasedCursorModel,
component_definition=model.incremental_sync.__dict__,
stream_name=model.name,
stream_namespace=None,
config=config or {},
)
return (
self._create_component_from_model(model=model.incremental_sync, config=config)
if model.incremental_sync
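For illustration, the factory method can now be called without threading a state manager or stream state through each call, since both are resolved from the factory's own state manager. The following is a minimal sketch assuming the constructor and method signatures shown in this diff; the stream name, config, and cursor definition values are hypothetical:

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)

# The factory now receives the state manager at construction time instead of per call.
factory = ModelToComponentFactory(state_manager=ConnectorStateManager(state=[]))

# Hypothetical cursor definition and config; values are illustrative only.
config = {"start_date": "2024-01-01T00:00:00Z"}
incremental_sync_definition = {
    "type": "DatetimeBasedCursor",
    "cursor_field": "updated_at",
    "datetime_format": "%Y-%m-%dT%H:%M:%SZ",
    "start_datetime": "{{ config['start_date'] }}",
}

cursor = factory.create_concurrent_cursor_from_datetime_based_cursor(
    model_type=DatetimeBasedCursorModel,
    component_definition=incremental_sync_definition,
    stream_name="reports",
    stream_namespace=None,
    config=config,
)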
@@ -75,7 +75,7 @@ def _validate_and_get_stream_slice_partition(
"""
if not isinstance(stream_slice, StreamSlice) or "partition" not in stream_slice.partition:
raise AirbyteTracedException(
message="Invalid arguments to AsyncJobRetriever.read_records: stream_slice is no optional. Please contact Airbyte Support",
message="Invalid arguments to AsyncRetriever.read_records: stream_slice is no optional. Please contact Airbyte Support",
failure_type=FailureType.system_error,
)
return stream_slice["partition"] # type: ignore # stream_slice["partition"] has been added as an AsyncPartition as part of stream_slices
7 changes: 6 additions & 1 deletion airbyte_cdk/sources/types.py
@@ -8,6 +8,11 @@

import orjson

from airbyte_cdk.utils.slice_hasher import SliceHasher

# A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2":
# "hello"}] returns "hello"
FieldPointer = List[str]
@@ -151,7 +156,7 @@ def __json_serializable__(self) -> Any:
return self._stream_slice

def __hash__(self) -> int:
return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS))
return SliceHasher.hash(stream_slice=self._stream_slice) # no need to provide stream_name here as this is used for slicing the cursor

def __bool__(self) -> bool:
return bool(self._stream_slice) or bool(self._extra_fields)
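Below is a small usage sketch of the new hashing behavior, assuming the StreamSlice keyword-only constructor arguments (partition, cursor_slice) from the CDK; the slice contents are illustrative:

from airbyte_cdk.sources.types import StreamSlice

# Hashing now delegates to SliceHasher, which serializes the slice with sorted keys,
# so equal slice content hashes identically regardless of key order.
a = StreamSlice(partition={"report_id": "123"}, cursor_slice={"start": "2024-01-01"})
b = StreamSlice(cursor_slice={"start": "2024-01-01"}, partition={"report_id": "123"})
assert hash(a) == hash(b)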
5 changes: 4 additions & 1 deletion airbyte_cdk/utils/slice_hasher.py
@@ -16,7 +16,10 @@ class SliceHasher:
_ENCODING: Final = "utf-8"

@classmethod
def hash(cls, stream_name: str, stream_slice: Optional[Mapping[str, Any]] = None) -> int:
def hash(cls, stream_name: str = "<stream name not provided>", stream_slice: Optional[Mapping[str, Any]] = None) -> int:
"""
Note that partitions from different streams with the same slice values may hash to the same value (collide) if the stream name is not provided
"""
if stream_slice:
try:
s = json.dumps(stream_slice, sort_keys=True, cls=SliceEncoder)
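For context, a brief sketch of how the new default interacts with the collision caveat in the docstring; stream names and slice values are illustrative:

from airbyte_cdk.utils.slice_hasher import SliceHasher

# Passing the stream name keeps identical slices from different streams distinct.
h_reports = SliceHasher.hash(stream_name="reports", stream_slice={"start": "2024-01-01"})
h_orders = SliceHasher.hash(stream_name="orders", stream_slice={"start": "2024-01-01"})
assert h_reports != h_orders

# Omitting the stream name falls back to the shared default, so the same slice hashes
# identically no matter which stream produced it.
assert SliceHasher.hash(stream_slice={"start": "2024-01-01"}) == SliceHasher.hash(
    stream_slice={"start": "2024-01-01"}
)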
@@ -322,6 +322,7 @@
"http_method": "GET",
},
},
"incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {