From c1eea6a9f3a8234136c949190fad99e1c6ddd25f Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 24 Jan 2025 16:59:19 +0200 Subject: [PATCH 1/4] Fix declarative low-code state migration in SubstreamPartitionRouter --- .../substream_partition_router.py | 62 ++++++++++++++----- .../test_substream_partition_router.py | 10 +++ 2 files changed, 56 insertions(+), 16 deletions(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index ae558c63..c242215e 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -295,28 +295,58 @@ def set_initial_state(self, stream_state: StreamState) -> None: return if not parent_state and incremental_dependency: - # Attempt to retrieve child state - substream_state_values = list(stream_state.values()) - substream_state = substream_state_values[0] if substream_state_values else {} - # Filter out per partition state. 
def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState:
    """
    Build the initial parent-stream state from a legacy child (substream) state.

    Only a state mapping with exactly one entry whose value is a truthy scalar is
    migrated. That value is copied to every parent stream configured with
    `incremental_dependency`, keyed by the parent stream's own cursor field.

    Args:
        stream_state (StreamState): The child state to migrate, for example:
            - {"updated_at": "2023-05-27T00:00:00Z"}  -> migrated
            - {"states": [...]}                       -> ignored (value is a list)
            - {"state": {...}, "lookback_window": 1}  -> ignored (more than one key)

    Returns:
        StreamState: `{parent_stream_name: {parent_cursor_field: cursor_value}}` with one
        entry per parent stream that has an incremental dependency, or `{}` when there is
        nothing to migrate.

    Example:
        Input:  {"updated_at": "2023-05-27T00:00:00Z"}
        Output: {"parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}}
    """
    cursor_values = list(stream_state.values())

    # Zero keys or several keys: this is not the single-cursor legacy format.
    if len(cursor_values) != 1:
        return {}

    (cursor_value,) = cursor_values

    # A list/dict value is per-partition (or otherwise nested) state, and a falsy
    # scalar carries no cursor position; neither is migrated.
    if isinstance(cursor_value, (list, dict)) or not cursor_value:
        return {}

    return {
        parent.stream.name: {parent.stream.cursor_field: cursor_value}
        for parent in self.parent_stream_configs
        if parent.incremental_dependency
    }
diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index 52306d34..089d2bf0 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -464,6 +464,15 @@ def test_substream_partition_router_invalid_parent_record_type(): }, {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, ), + # Case 6: Declarative global cursor state, no migration expected + ( + { + "looback_window": 1, + "use_global_cursor": True, + "state": {"updated": "2023-05-27T00:00:00Z"}, + }, + {}, + ), ], ids=[ "empty_initial_state", @@ -471,6 +480,7 @@ def test_substream_partition_router_invalid_parent_record_type(): "initial_state_no_parent_global_state", "initial_state_no_parent_per_partition_state", "initial_state_with_parent_state", + "initial_state_no_parent_global_state_declarative", ], ) def test_set_initial_state(initial_state, expected_parent_state): From 2b0a99973b2340ff9ac754d196cbecba3cd68723 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 24 Jan 2025 18:24:49 +0200 Subject: [PATCH 2/4] Fix migration from python global state format --- .../declarative/incremental/concurrent_partition_cursor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index b71890cc..730f8763 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -264,7 +264,10 @@ def _set_initial_state(self, stream_state: StreamState) -> None: if not stream_state: return - if self._PERPARTITION_STATE_KEY not in stream_state: + if ( + self._PERPARTITION_STATE_KEY not in 
stream_state + and self._GLOBAL_STATE_KEY not in stream_state + ): # We assume that `stream_state` is in a global format that can be applied to all partitions. # Example: {"global_state_format_key": "global_state_format_value"} self._global_cursor = deepcopy(stream_state) @@ -273,7 +276,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None: else: self._lookback_window = int(stream_state.get("lookback_window", 0)) - for state in stream_state[self._PERPARTITION_STATE_KEY]: + for state in stream_state.get(self._PERPARTITION_STATE_KEY, []): self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( self._create_cursor(state["cursor"]) ) From d2b09176ea5eaae4251d7b6def893d7cecc737c3 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 24 Jan 2025 19:07:34 +0200 Subject: [PATCH 3/4] Add unit test for low code state migration --- .../test_concurrent_perpartitioncursor.py | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index d1502c21..3fc9e001 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -1171,7 +1171,7 @@ def test_incremental_parent_state( @pytest.mark.parametrize( - "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + "test_name, manifest, mock_requests, expected_records, expected_state", [ ( "test_incremental_parent_state", @@ -1326,8 +1326,6 @@ def test_incremental_parent_state( "id": 300, }, ], - # Initial state - {"created_at": PARTITION_SYNC_START_TIME}, # Expected state { "state": {"created_at": VOTE_100_CREATED_AT}, @@ -1384,6 +1382,25 @@ def test_incremental_parent_state( ), ], ) +@pytest.mark.parametrize( + "initial_state", + [ + {"created_at": 
PARTITION_SYNC_START_TIME}, + { + "state": {"created_at": PARTITION_SYNC_START_TIME}, + "lookback_window": 0, + "use_global_cursor": True, + "parent_state": { + "post_comments": { + "state": {"updated_at": PARTITION_SYNC_START_TIME}, + "parent_state": {"posts": {"updated_at": PARTITION_SYNC_START_TIME}}, + "lookback_window": 0, + } + }, + }, + ], + ids=["legacy_python_format", "low_code_global_format"], +) def test_incremental_parent_state_migration( test_name, manifest, mock_requests, expected_records, initial_state, expected_state ): From 8bbb828e8ffd6e8ee0797157589ff47fae06216c Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 24 Jan 2025 20:15:17 +0200 Subject: [PATCH 4/4] Add lock for _cursor_per_partition --- .../concurrent_partition_cursor.py | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 730f8763..74442b96 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -147,7 +147,7 @@ def close_partition(self, partition: Partition) -> None: < cursor.state[self.cursor_field.cursor_field_key] ): self._new_global_cursor = copy.deepcopy(cursor.state) - self._emit_state_message() + self._emit_state_message() def ensure_at_least_one_state_emitted(self) -> None: """ @@ -192,7 +192,8 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St self._global_cursor, self._lookback_window if self._global_cursor else 0, ) - self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor + with self._lock: + self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor self._semaphore_per_partition[self._to_partition_key(partition.partition)] = ( threading.Semaphore(0) ) @@ -210,16 +211,38 @@ def 
def _ensure_partition_limit(self) -> None:
    """
    Evict tracked partition cursors until there is room for one more partition.

    The loop runs while `_cursor_per_partition` holds more than
    `DEFAULT_MAX_PARTITIONS_NUMBER - 1` entries, and removes exactly one entry per pass:

    1. The first key, in insertion order, that is also present in `_finished_partitions`.
    2. If no such key exists, the oldest entry (front of the ordered mapping),
       unconditionally.

    Side effects:
        - `_over_limit` is incremented once per evicted partition.
        - A warning is logged per eviction, naming the evicted partition key, the
          running `_over_limit` total, and whether the partition was finished or
          simply the oldest.

    The whole loop executes while holding `self._lock`. `_cursor_per_partition` must
    support `popitem(last=False)` (e.g. `collections.OrderedDict`).
    """
    with self._lock:
        while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
            # Count every eviction so the "Over limit" figure in the warnings is current.
            self._over_limit += 1

            # Prefer a partition that is already finished; scan in insertion order so
            # the oldest finished one is picked.
            finished_key = None
            found_finished = False
            for partition_key in self._cursor_per_partition:
                if partition_key in self._finished_partitions:
                    finished_key = partition_key
                    found_finished = True
                    break

            if found_finished:
                del self._cursor_per_partition[finished_key]
                logger.warning(
                    f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {finished_key}. Over limit: {self._over_limit}."
                )
            else:
                # Nothing finished to evict: fall back to the oldest partition. Index 0
                # of the popped pair is the partition key (index 1 is its cursor).
                oldest_partition = self._cursor_per_partition.popitem(last=False)[0]
                logger.warning(
                    f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
                )