diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index d1c594aad..83b774fe5 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -6,6 +6,7 @@ # import threading from collections import OrderedDict +from copy import deepcopy from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager @@ -57,7 +58,6 @@ class ConcurrentPerPartitionCursor(Cursor): _NO_CURSOR_STATE: Mapping[str, Any] = {} _KEY = 0 _VALUE = 1 - _state_to_migrate_from: Mapping[str, Any] = {} def __init__( self, @@ -70,6 +70,7 @@ def __init__( connector_state_manager: ConnectorStateManager, cursor_field: CursorField, ) -> None: + self._global_cursor: Mapping[str, Any] = {} self._stream_name = stream_name self._stream_namespace = stream_namespace self._message_repository = message_repository @@ -87,7 +88,6 @@ def __init__( self._finished_partitions = set() self._lock = threading.Lock() self._timer = Timer() - self._global_cursor = None self._new_global_cursor = None self._lookback_window = 0 self._parent_state = None @@ -116,12 +116,12 @@ def state(self) -> MutableMapping[str, Any]: ) state: dict[str, Any] = {"states": states} - state["state"] = self._global_cursor + if self._global_cursor: + state["state"] = self._global_cursor if self._lookback_window is not None: state["lookback_window"] = self._lookback_window if self._parent_state is not None: state["parent_state"] = self._parent_state - print(state) return state def close_partition(self, partition: Partition) -> None: @@ -192,8 +192,8 @@ def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[Str cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) if not cursor: partition_state = ( - self._state_to_migrate_from - if self._state_to_migrate_from + self._global_cursor + if self._global_cursor else self._NO_CURSOR_STATE ) cursor = self._create_cursor(partition_state) @@ -265,7 +265,8 @@ def _set_initial_state(self, stream_state: StreamState) -> None: if "states" not in stream_state: # We assume that `stream_state` is in a global format that can be applied to all partitions. 
# Example: {"global_state_format_key": "global_state_format_value"} - self._state_to_migrate_from = stream_state + self._global_cursor = deepcopy(stream_state) + self._new_global_cursor = deepcopy(stream_state) else: for state in stream_state["states"]: @@ -278,13 +279,13 @@ def _set_initial_state(self, stream_state: StreamState) -> None: # set default state for missing partitions if it is per partition with fallback to global if "state" in stream_state: - self._state_to_migrate_from = stream_state["state"] + self._global_cursor = deepcopy(stream_state["state"]) + self._new_global_cursor = deepcopy(stream_state["state"]) # Set parent state for partition routers based on parent streams self._partition_router.set_initial_state(stream_state) def observe(self, record: Record) -> None: - print(self._to_partition_key(record.associated_slice.partition), record) self._cursor_per_partition[ self._to_partition_key(record.associated_slice.partition) ].observe(record) @@ -296,7 +297,7 @@ def _to_dict(self, partition_key: str) -> Mapping[str, Any]: return self._partition_serializer.to_partition(partition_key) def _create_cursor(self, cursor_state: Any) -> Cursor: - cursor = self._cursor_factory.create(stream_state=cursor_state) + cursor = self._cursor_factory.create(stream_state=deepcopy(cursor_state)) return cursor def should_be_synced(self, record: Record) -> bool: diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 57948d79c..cdeb70d42 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -1,6 +1,7 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved.
import copy +from copy import deepcopy from typing import Any, List, Mapping, MutableMapping, Optional, Union from unittest.mock import MagicMock @@ -240,6 +241,16 @@ }, } +SUBSTREAM_MANIFEST_NO_DEPENDENCY = deepcopy(SUBSTREAM_MANIFEST) +# Disable incremental_dependency +SUBSTREAM_MANIFEST_NO_DEPENDENCY["definitions"]["post_comments_stream"]["retriever"]["partition_router"][ + "parent_stream_configs" +][0]["incremental_dependency"] = False +SUBSTREAM_MANIFEST_NO_DEPENDENCY["definitions"]["post_comment_votes_stream"]["retriever"]["partition_router"][ + "parent_stream_configs" +][0]["incremental_dependency"] = False + + def _run_read( manifest: Mapping[str, Any], @@ -273,7 +284,7 @@ def _run_read( [ ( "test_incremental_parent_state", - SUBSTREAM_MANIFEST, + SUBSTREAM_MANIFEST_NO_DEPENDENCY, [ # Fetch the first page of posts ( @@ -501,14 +512,6 @@ def test_incremental_parent_state_no_incremental_dependency( "credentials": {"email": "email", "api_token": "api_token"}, } - # Disable incremental_dependency - manifest["definitions"]["post_comments_stream"]["retriever"]["partition_router"][ - "parent_stream_configs" - ][0]["incremental_dependency"] = False - manifest["definitions"]["post_comment_votes_stream"]["retriever"]["partition_router"][ - "parent_stream_configs" - ][0]["incremental_dependency"] = False - with requests_mock.Mocker() as m: for url, response in mock_requests: m.get(url, json=response) @@ -1066,7 +1070,6 @@ def test_incremental_parent_state( ], # Expected state { - "use_global_cursor": False, "state": {"created_at": "2024-01-15T00:00:00Z"}, "parent_state": { "post_comments": { @@ -1292,7 +1295,6 @@ def test_incremental_parent_state_migration( # Expected state { "lookback_window": 1, - "use_global_cursor": False, "state": {"created_at": "2024-01-03T00:00:00Z"}, "parent_state": { "post_comments": { @@ -1391,7 +1393,7 @@ def test_incremental_parent_state_no_slices( ), # Fetch the first page of votes for comment 10 of post 1 ( - "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-02T00:00:00Z", { "votes": [], "next_page": "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", @@ -1500,8 +1502,35 @@ def test_incremental_parent_state_no_slices( # Expected state { "lookback_window": 1, - "use_global_cursor": True, "state": {"created_at": "2024-01-03T00:00:00Z"}, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + { + "partition": { + "id": 12, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + { + "partition": { + "id": 20, + "parent_slice": {"id": 2, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + { + "partition": { + "id": 21, + "parent_slice": {"id": 2, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + {'cursor': {'created_at': '2024-01-03T00:00:00Z'}, + 'partition': {'id': 30,
'parent_slice': {'id': 3, 'parent_slice': {}}}} + ], "parent_state": { "post_comments": { "use_global_cursor": False, @@ -1554,3 +1583,258 @@ def test_incremental_parent_state_no_records( if message.state ] assert final_state[-1] == expected_state + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z", + { + "posts": [ + {"id": 1, "updated_at": "2024-01-30T00:00:00Z"}, + {"id": 2, "updated_at": "2024-01-29T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + }, + ), + # Fetch the second page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + {"posts": [{"id": 3, "updated_at": "2024-01-28T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": "2023-01-01T00:00:00Z"}, + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": "2024-01-23T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + { + "votes": [ + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + { + "votes": [ + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + "https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [ + {"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": "2024-01-21T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + { + "status_code": 500, + 
"json": {"error": "Internal Server Error"}, + }, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + { + "votes": [ + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"} + ] + }, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + "https://api.example.com/community/posts/3/comments/30/votes?per_page=100", + { + "votes": [ + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"} + ] + }, + ), + ], + # Expected records + [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"}, + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"}, + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"}, + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"}, + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"}, + ], + # Initial state + [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor( + name="post_comment_votes", namespace=None + ), + stream_state=AirbyteStateBlob( + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": "2023-01-04T00:00:00Z"}, + } + ], + "parent_state": { + "posts": {"updated_at": "2024-01-05T00:00:00Z"} + }, + } + }, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + ], + } + ), + ), + ) + ], + # Expected state + { + "state": {"created_at": "2024-01-15T00:00:00Z"}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": "2024-01-25T00:00:00Z"}, + "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-25T00:00:00Z"}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-22T00:00:00Z"}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-09T00:00:00Z"}, + }, + ], + } + }, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-15T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-13T00:00:00Z"}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-01T00:00:01Z"}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-01T00:00:01Z"}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-12T00:00:15Z"}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-10T00:00:00Z"}, + }, + ], + }, + ), + ], +) +def test_incremental_error__parent_state( + test_name, manifest, 
mock_requests, expected_records, initial_state, expected_state +): + _stream_name = "post_comment_votes" + config = { + "start_date": "2024-01-01T00:00:01Z", + "credentials": {"email": "email", "api_token": "api_token"}, + } + + with requests_mock.Mocker() as m: + for url, response in mock_requests: + m.get(url, json=response) + + output = _run_read(manifest, config, _stream_name, initial_state) + output_data = [message.record.data for message in output if message.record] + + assert set(tuple(sorted(d.items())) for d in output_data) == set( + tuple(sorted(d.items())) for d in expected_records + ) + final_state = [ + orjson.loads(orjson.dumps(message.state.stream.stream_state)) + for message in output + if message.state + ] + assert final_state[-1] == expected_state
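
Not part of the patch above: a minimal sketch of the aliasing problem that the deepcopy calls added in _set_initial_state and _create_cursor guard against when every per-partition cursor is seeded from the same global cursor dict. FakeCursor and its fields are illustrative stand-ins, not the CDK's actual Cursor implementation.

from copy import deepcopy

# Illustrative stand-in for a per-partition cursor; not the CDK's Cursor class.
class FakeCursor:
    def __init__(self, stream_state):
        self._state = stream_state  # keeps a reference to the dict it was given

    def observe(self, record_cursor_value):
        self._state["created_at"] = record_cursor_value  # advances the cursor in place


global_cursor = {"created_at": "2024-01-02T00:00:00Z"}

# Without a copy, both partition cursors share (and mutate) the same dict, so observing
# a record for one partition silently advances the other partition and the fallback state.
shared_a = FakeCursor(global_cursor)
shared_b = FakeCursor(global_cursor)
shared_a.observe("2024-01-15T00:00:00Z")
print(shared_b._state)  # {'created_at': '2024-01-15T00:00:00Z'} - partition B moved as well
print(global_cursor)    # the shared fallback state was mutated too

# Seeding each cursor with deepcopy(...) keeps per-partition states independent, which is
# the intent of _create_cursor(stream_state=deepcopy(cursor_state)) in the change above.
isolated_a = FakeCursor(deepcopy(global_cursor))
isolated_b = FakeCursor(deepcopy(global_cursor))
isolated_a.observe("2024-01-20T00:00:00Z")
print(isolated_b._state)  # unchanged: {'created_at': '2024-01-15T00:00:00Z'}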
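
One more sketch, also outside the patch: the new test's fixture for comment 20 describes an HTTP 500, but the test registers every fixture with m.get(url, json=response), which serves that dict as the JSON body of a 200 response. If the intent is for requests_mock to answer with an actual 500, the registration could look roughly like this; the two fixture entries are copied from the test data above, while the branching loop is a hypothetical helper, not what the test currently does.

import requests
import requests_mock

mock_requests = [
    # Regular fixture: served as a 200 response with this JSON body.
    (
        "https://api.example.com/community/posts/3/comments/30/votes?per_page=100",
        {"votes": [{"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"}]},
    ),
    # Error fixture: meant to be an HTTP 500.
    (
        "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-01T00:00:01Z",
        {"status_code": 500, "json": {"error": "Internal Server Error"}},
    ),
]

with requests_mock.Mocker() as m:
    for url, response in mock_requests:
        if isinstance(response, dict) and "status_code" in response:
            # Register a real error response instead of a 200 body that merely contains "status_code".
            m.get(url, status_code=response["status_code"], json=response.get("json"))
        else:
            m.get(url, json=response)
    print(requests.get(mock_requests[1][0]).status_code)  # 500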