Skip to content

Commit

Permalink
fix(airbyte-cdk): Fix RequestOptionsProvider for PerPartitionWithGlob…
Browse files Browse the repository at this point in the history
…alCursor (#254)
  • Loading branch information
tolik0 authored Jan 23, 2025
1 parent 5cc9840 commit ec7e961
Show file tree
Hide file tree
Showing 3 changed files with 583 additions and 19 deletions.
53 changes: 35 additions & 18 deletions airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ def get_request_params(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
if stream_slice:
if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
return self._partition_router.get_request_params( # type: ignore # this always returns a mapping
stream_state=stream_state,
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
Expand All @@ -244,6 +246,8 @@ def get_request_headers(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
if stream_slice:
if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping
stream_state=stream_state,
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
Expand All @@ -266,6 +270,8 @@ def get_request_body_data(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
if stream_slice:
if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping
stream_state=stream_state,
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
Expand All @@ -288,6 +294,8 @@ def get_request_body_json(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
if stream_slice:
if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping
stream_state=stream_state,
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
Expand All @@ -303,21 +311,6 @@ def get_request_body_json(
raise ValueError("A partition needs to be provided in order to get request body json")

def should_be_synced(self, record: Record) -> bool:
if (
record.associated_slice
and self._to_partition_key(record.associated_slice.partition)
not in self._cursor_per_partition
):
partition_state = (
self._state_to_migrate_from
if self._state_to_migrate_from
else self._NO_CURSOR_STATE
)
cursor = self._create_cursor(partition_state)

self._cursor_per_partition[
self._to_partition_key(record.associated_slice.partition)
] = cursor
return self._get_cursor(record).should_be_synced(
self._convert_record_to_cursor_record(record)
)
Expand Down Expand Up @@ -356,8 +349,32 @@ def _get_cursor(self, record: Record) -> DeclarativeCursor:
)
partition_key = self._to_partition_key(record.associated_slice.partition)
if partition_key not in self._cursor_per_partition:
raise ValueError(
"Invalid state as stream slices that are emitted should refer to an existing cursor"
)
self._create_cursor_for_partition(partition_key)
cursor = self._cursor_per_partition[partition_key]
return cursor

def _create_cursor_for_partition(self, partition_key: str) -> None:
"""
Dynamically creates and initializes a cursor for the specified partition.
This method is required for `ConcurrentPerPartitionCursor`. For concurrent cursors,
stream_slices is executed only for the concurrent cursor, so cursors per partition
are not created for the declarative cursor. This method ensures that a cursor is available
to create requests for the specified partition. The cursor is initialized
with the per-partition state if present in the initial state, or with the global state
adjusted by the lookback window, or with the state to migrate from.
Note:
This is a temporary workaround and should be removed once the declarative cursor
is decoupled from the concurrent cursor implementation.
Args:
partition_key (str): The unique identifier for the partition for which the cursor
needs to be created.
"""
partition_state = (
self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
)
cursor = self._create_cursor(partition_state)

self._cursor_per_partition[partition_key] = cursor
Original file line number Diff line number Diff line change
Expand Up @@ -2386,7 +2386,7 @@ def create_simple_retriever(
if (
not isinstance(stream_slicer, DatetimeBasedCursor)
or type(stream_slicer) is not DatetimeBasedCursor
) and not isinstance(stream_slicer, PerPartitionWithGlobalCursor):
):
# Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods).
# Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement
# their own RequestOptionsProvider. However, right now the existing StreamSlicer/Cursor still can act as the SimpleRetriever's
Expand Down
Loading

0 comments on commit ec7e961

Please sign in to comment.