From ec7e961ea80e64f933e978ab521eba6e8c0fd86c Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk <35109939+tolik0@users.noreply.github.com> Date: Thu, 23 Jan 2025 22:28:51 +0200 Subject: [PATCH] fix(airbyte-cdk): Fix RequestOptionsProvider for PerPartitionWithGlobalCursor (#254) --- .../incremental/per_partition_cursor.py | 53 +- .../parsers/model_to_component_factory.py | 2 +- .../test_concurrent_perpartitioncursor.py | 547 ++++++++++++++++++ 3 files changed, 583 insertions(+), 19 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py index 4c1eacce5..76a16e141 100644 --- a/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py @@ -222,6 +222,8 @@ def get_request_params( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: if stream_slice: + if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: + self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) return self._partition_router.get_request_params( # type: ignore # this always returns a mapping stream_state=stream_state, stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), @@ -244,6 +246,8 @@ def get_request_headers( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: if stream_slice: + if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: + self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping stream_state=stream_state, stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), @@ -266,6 +270,8 @@ def get_request_body_data( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Union[Mapping[str, Any], str]: if stream_slice: + if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: + self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping stream_state=stream_state, stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), @@ -288,6 +294,8 @@ def get_request_body_json( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: if stream_slice: + if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: + self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping stream_state=stream_state, stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), @@ -303,21 +311,6 @@ def get_request_body_json( raise ValueError("A partition needs to be provided in order to get request body json") def should_be_synced(self, record: Record) -> bool: - if ( - record.associated_slice - and self._to_partition_key(record.associated_slice.partition) - not in self._cursor_per_partition - ): - partition_state = ( - self._state_to_migrate_from - if self._state_to_migrate_from - else self._NO_CURSOR_STATE - ) - cursor = self._create_cursor(partition_state) - - self._cursor_per_partition[ - self._to_partition_key(record.associated_slice.partition) - ] = cursor return self._get_cursor(record).should_be_synced( self._convert_record_to_cursor_record(record) ) @@ -356,8 +349,32 @@ def _get_cursor(self, record: Record) -> DeclarativeCursor: ) partition_key = self._to_partition_key(record.associated_slice.partition) if partition_key not in self._cursor_per_partition: - raise ValueError( - "Invalid state as stream slices that are emitted should refer to an existing cursor" - ) + self._create_cursor_for_partition(partition_key) cursor = self._cursor_per_partition[partition_key] return cursor + + def _create_cursor_for_partition(self, partition_key: str) -> None: + """ + Dynamically creates and initializes a cursor for the specified partition. + + This method is required for `ConcurrentPerPartitionCursor`. For concurrent cursors, + stream_slices is executed only for the concurrent cursor, so cursors per partition + are not created for the declarative cursor. This method ensures that a cursor is available + to create requests for the specified partition. The cursor is initialized + with the per-partition state if present in the initial state, or with the global state + adjusted by the lookback window, or with the state to migrate from. + + Note: + This is a temporary workaround and should be removed once the declarative cursor + is decoupled from the concurrent cursor implementation. + + Args: + partition_key (str): The unique identifier for the partition for which the cursor + needs to be created. + """ + partition_state = ( + self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE + ) + cursor = self._create_cursor(partition_state) + + self._cursor_per_partition[partition_key] = cursor diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 905e31748..3fd301947 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2386,7 +2386,7 @@ def create_simple_retriever( if ( not isinstance(stream_slicer, DatetimeBasedCursor) or type(stream_slicer) is not DatetimeBasedCursor - ) and not isinstance(stream_slicer, PerPartitionWithGlobalCursor): + ): # Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods). # Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement # their own RequestOptionsProvider. However, right now the existing StreamSlicer/Cursor still can act as the SimpleRetriever's diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 8862600d5..d1502c218 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -2326,3 +2326,550 @@ def test_incremental_error( expected_records, expected_state, ) + + +SUBSTREAM_REQUEST_OPTIONS_MANIFEST: MutableMapping[str, Any] = { + "version": "0.51.42", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["post_comment_votes"]}, + "definitions": { + "basic_authenticator": { + "type": "BasicHttpAuthenticator", + "username": "{{ config['credentials']['email'] + '/token' }}", + "password": "{{ config['credentials']['api_token'] }}", + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": ["{{ parameters.get('data_path') or parameters['name'] }}"], + }, + "schema_normalization": "Default", + }, + "paginator": { + "type": "DefaultPaginator", + "page_size_option": { + "type": "RequestOption", + "field_name": "per_page", + "inject_into": "request_parameter", + }, + "pagination_strategy": { + "type": "CursorPagination", + "page_size": 100, + "cursor_value": "{{ response.get('next_page', {}) }}", + "stop_condition": "{{ not response.get('next_page', {}) }}", + }, + "page_token_option": {"type": "RequestPath"}, + }, + }, + "cursor_incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date')}}"}, + "start_time_option": { + "inject_into": "request_parameter", + "field_name": "start_time", + "type": "RequestOption", + }, + }, + "posts_stream": { + "type": "DeclarativeStream", + "name": "posts", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "title": {"type": "string"}, + "content": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": "#/definitions/retriever/record_selector", + "paginator": "#/definitions/retriever/paginator", + }, + "incremental_sync": "#/definitions/cursor_incremental_sync", + "$parameters": { + "name": "posts", + "path": "community/posts", + "data_path": "posts", + "cursor_field": "updated_at", + "primary_key": "id", + }, + }, + "post_comments_stream": { + "type": "DeclarativeStream", + "name": "post_comments", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "post_id": {"type": "integer"}, + "comment": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts_comments", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, + "record_filter": { + "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + }, + }, + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/posts_stream", + "parent_key": "id", + "partition_field": "id", + "incremental_dependency": True, + "request_option": { + "inject_into": "request_parameter", + "type": "RequestOption", + "field_name": "post_id", + }, + } + ], + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date') }}"}, + }, + "$parameters": { + "name": "post_comments", + "path": "community/posts_comments", + "data_path": "comments", + "cursor_field": "updated_at", + "primary_key": "id", + }, + }, + "post_comment_votes_stream": { + "type": "DeclarativeStream", + "name": "post_comment_votes", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "created_at": {"type": "string", "format": "date-time"}, + "comment_id": {"type": "integer"}, + "vote": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts_comments_votes", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": "#/definitions/retriever/record_selector", + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/post_comments_stream", + "parent_key": "id", + "partition_field": "id", + "incremental_dependency": True, + "extra_fields": [["updated_at"]], + "request_option": { + "inject_into": "request_parameter", + "type": "RequestOption", + "field_name": "comment_id", + }, + } + ], + }, + }, + "transformations": [ + { + "type": "AddFields", + "fields": [ + { + "path": ["comment_updated_at"], + "value_type": "string", + "value": "{{ stream_slice.extra_fields['updated_at'] }}", + }, + ], + }, + ], + "incremental_sync": "#/definitions/cursor_incremental_sync", + "$parameters": { + "name": "post_comment_votes", + "path": "community/posts_comments_votes", + "data_path": "votes", + "cursor_field": "created_at", + "primary_key": "id", + }, + }, + }, + "streams": [ + {"$ref": "#/definitions/posts_stream"}, + {"$ref": "#/definitions/post_comments_stream"}, + {"$ref": "#/definitions/post_comment_votes_stream"}, + ], + "concurrency_level": { + "type": "ConcurrencyLevel", + "default_concurrency": "{{ config['num_workers'] or 10 }}", + "max_concurrency": 25, + }, + "spec": { + "type": "Spec", + "documentation_url": "https://airbyte.com/#yaml-from-manifest", + "connection_specification": { + "title": "Test Spec", + "type": "object", + "required": ["credentials", "start_date"], + "additionalProperties": False, + "properties": { + "credentials": { + "type": "object", + "required": ["email", "api_token"], + "properties": { + "email": { + "type": "string", + "title": "Email", + "description": "The email for authentication.", + }, + "api_token": { + "type": "string", + "airbyte_secret": True, + "title": "API Token", + "description": "The API token for authentication.", + }, + }, + }, + "start_date": { + "type": "string", + "format": "date-time", + "title": "Start Date", + "description": "The date from which to start syncing data.", + }, + }, + }, + }, +} + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_REQUEST_OPTIONS_MANIFEST, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + {"id": 2, "updated_at": POST_2_UPDATED_AT}, + ], + "next_page": ( + f"https://api.example.com/community/posts" + f"?per_page=100&start_time={PARENT_POSTS_CURSOR}&page=2" + ), + }, + ), + # Fetch the second page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}&page=2", + {"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts_comments?per_page=100&post_id=1", + { + "comments": [ + { + "id": 9, + "post_id": 1, + "updated_at": COMMENT_9_OLDEST, + }, + { + "id": 10, + "post_id": 1, + "updated_at": COMMENT_10_UPDATED_AT, + }, + { + "id": 11, + "post_id": 1, + "updated_at": COMMENT_11_UPDATED_AT, + }, + ], + "next_page": "https://api.example.com/community/posts_comments?per_page=100&post_id=1&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts_comments?per_page=100&post_id=1&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": COMMENT_12_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts_comments_votes?per_page=100&comment_id=10&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + { + "votes": [ + { + "id": 100, + "comment_id": 10, + "created_at": VOTE_100_CREATED_AT, + } + ], + "next_page": ( + f"https://api.example.com/community/posts_comments_votes" + f"?per_page=100&page=2&comment_id=10&start_time={INITIAL_STATE_PARTITION_10_CURSOR}" + ), + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts_comments_votes" + f"?per_page=100&page=2&comment_id=10&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + {"votes": [{"id": 101, "comment_id": 10, "created_at": VOTE_101_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts_comments_votes" + f"?per_page=100&comment_id=11&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts_comments_votes?" + f"per_page=100&comment_id=12&start_time={LOOKBACK_DATE}", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts_comments?per_page=100&post_id=2", + { + "comments": [{"id": 20, "post_id": 2, "updated_at": COMMENT_20_UPDATED_AT}], + "next_page": "https://api.example.com/community/posts_comments?per_page=100&post_id=2&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts_comments?per_page=100&post_id=2&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": COMMENT_21_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + f"https://api.example.com/community/posts_comments_votes" + f"?per_page=100&comment_id=20&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 200, "comment_id": 20, "created_at": VOTE_200_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + f"https://api.example.com/community/posts_comments_votes?" + f"per_page=100&comment_id=21&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 210, "comment_id": 21, "created_at": VOTE_210_CREATED_AT}]}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts_comments?per_page=100&post_id=3", + {"comments": [{"id": 30, "post_id": 3, "updated_at": COMMENT_30_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + f"https://api.example.com/community/posts_comments_votes?" + f"per_page=100&comment_id=30&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]}, + ), + ], + # Expected records + [ + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_100_CREATED_AT, + "id": 100, + }, + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_101_CREATED_AT, + "id": 101, + }, + { + "comment_id": 11, + "comment_updated_at": COMMENT_11_UPDATED_AT, + "created_at": VOTE_111_CREATED_AT, + "id": 111, + }, + { + "comment_id": 20, + "comment_updated_at": COMMENT_20_UPDATED_AT, + "created_at": VOTE_200_CREATED_AT, + "id": 200, + }, + { + "comment_id": 21, + "comment_updated_at": COMMENT_21_UPDATED_AT, + "created_at": VOTE_210_CREATED_AT, + "id": 210, + }, + { + "comment_id": 30, + "comment_updated_at": COMMENT_30_UPDATED_AT, + "created_at": VOTE_300_CREATED_AT, + "id": 300, + }, + ], + # Initial state + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "state": {"created_at": INITIAL_GLOBAL_CURSOR}, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "lookback_window": 86400, + }, + # Expected state + { + "state": {"created_at": VOTE_100_CREATED_AT}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": COMMENT_10_UPDATED_AT}, # 10 is the "latest" + "parent_state": { + "posts": {"updated_at": POST_1_UPDATED_AT} + }, # post 1 is the latest + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_10_UPDATED_AT}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_20_UPDATED_AT}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_30_UPDATED_AT}, + }, + ], + } + }, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_100_CREATED_AT}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_111_CREATED_AT}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": LOOKBACK_DATE}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_200_CREATED_AT}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_210_CREATED_AT}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_300_CREATED_AT}, + }, + ], + }, + ), + ], +) +def test_incremental_substream_request_options_provider( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + Test incremental syncing for a stream that uses request options provider from parent stream config. + """ + run_mocked_test( + mock_requests, + manifest, + CONFIG, + "post_comment_votes", + initial_state, + expected_records, + expected_state, + )