-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(low-code concurrent): Allow async job low-code streams that are incremental to be run by the concurrent framework #228
feat(low-code concurrent): Allow async job low-code streams that are incremental to be run by the concurrent framework #228
Conversation
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
…d some more tests to concurrent_declarative_source
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
📝 WalkthroughWalkthroughThe pull request introduces modifications to the Airbyte CDK's declarative source components, focusing on enhancing concurrent stream processing and state management. The changes primarily affect the Changes
Possibly related issues
Possibly related PRs
Suggested reviewersWdyt? Would you like to dive deeper into any specific aspect of these changes? 😊 ✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
70-73
: Consider addressing the TODO comment on state initializationThere's a
TODO
comment suggesting that state may no longer need to be stored during initialization. Should we explore removing the state parameter now, or is there an edge case that still requires it? WDYT?airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
502-502
: Update typing and documentation forconnector_state_manager
We've added
connector_state_manager
as an optional parameter. Should we update the type hints and documentation to reflect its usage within the class, ensuring clarity for future maintenance? WDYT?airbyte_cdk/sources/declarative/retrievers/async_retriever.py (1)
78-78
: Clarify the error message in_validate_and_get_stream_slice_partition
The error message could be more precise. Instead of stating "
stream_slice
is not optional," perhaps we can say "stream_slice
must be provided and contain a 'partition' key." Would this make the error clearer? WDYT?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(5 hunks)airbyte_cdk/sources/declarative/manifest_declarative_source.py
(0 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(4 hunks)airbyte_cdk/sources/declarative/retrievers/async_retriever.py
(1 hunks)airbyte_cdk/sources/types.py
(2 hunks)airbyte_cdk/utils/slice_hasher.py
(1 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(2 hunks)unit_tests/sources/declarative/test_concurrent_declarative_source.py
(3 hunks)
💤 Files with no reviewable changes (1)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py
👮 Files not reviewed due to content moderation or server errors (4)
- airbyte_cdk/sources/types.py
- unit_tests/sources/declarative/test_concurrent_declarative_source.py
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
- airbyte_cdk/utils/slice_hasher.py
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
81-81
: Verify the impact of passingconnector_state_manager
By passing
self._connector_state_manager
to theModelToComponentFactory
, we're centralizing state management. Can we ensure that all dependent components are updated accordingly to handle this change? WDYT?airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1609-1624
: Reconsider returningConcurrentCursor
fromcreate_concurrent_cursor_from_datetime_based_cursor
Returning a
ConcurrentCursor
may introduce typing inconsistencies since it doesn't implement the same interface as the low-codeStreamSlicer
. Should we address this discrepancy to ensure type safety and maintainability? WDYT?
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
70-72
: Consider improving type annotation for state management.The TODO comment provides good context about potential future improvements. However, the type ignore comment suggests we could improve type safety. WDYT about creating a proper type for the state parameter to avoid the type ignore?
- self._connector_state_manager = ConnectorStateManager(state=state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later + self._connector_state_manager = ConnectorStateManager[TState](state=state) # The generic type ensures type safety at compile time
231-234
: Consider adding defensive programming for state retrieval.The state retrieval looks good, but what if the stream state is None? WDYT about adding a defensive check?
stream_state = self._connector_state_manager.get_stream_state( stream_name=declarative_stream.name, namespace=declarative_stream.namespace, ) + if stream_state is not None: retriever.cursor.set_initial_state(stream_state=stream_state) + else: + self.logger.debug(f"No state found for stream {declarative_stream.name}")
251-278
: Consider extracting partition generator creation logic.The partition generator setup is quite complex and repeated. What do you think about extracting this into a separate method to improve maintainability and reduce code duplication? Something like:
def _create_partition_generator( self, stream_name: str, json_schema: dict, retriever: Any, stream_slicer: Any ) -> StreamSlicerPartitionGenerator: return StreamSlicerPartitionGenerator( partition_factory=DeclarativePartitionFactory( stream_name, json_schema, retriever, self.message_repository, ), stream_slicer=stream_slicer, )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
34-34
: LGTM! Clean import additions.The new imports for
AsyncJobPartitionRouter
andAsyncRetriever
are well-organized and necessary for the enhanced concurrent processing functionality.Also applies to: 36-36
73-81
: LGTM! Good component factory configuration.The component factory initialization is well-structured with clear comments explaining the rationale behind disabling resumable full refresh for concurrent sources.
240-249
: Fix formatting in the error message.There's a missing space in the error message, causing
received{cursor.__class__}
to be concatenated without a space.Apply this diff to fix the formatting:
- f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received{cursor.__class__}" + f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
349-352
: LGTM! Good type validation extension.The addition of
AsyncJobPartitionRouter
to the type validation is well-integrated and maintains consistency with the new async support.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i added the various fixes noted which should have this in a good state to merge. Approving since the fixes I pushed were what I would have commented to address
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
250-251
:⚠️ Potential issueFix formatting in the error message
There's a missing space in the error message, causing
received{cursor.__class__}
to be concatenated without a space.Apply this diff to fix the formatting:
- f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received{cursor.__class__}" + f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
🧹 Nitpick comments (6)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
3104-3116
: Consider using a helper method for creating the state message.The state message construction could be extracted into a helper method to improve readability and reusability across tests, wdyt?
def create_state_message(stream_name: str, stream_state: dict) -> AirbyteStateMessage: return AirbyteStateMessage( type=AirbyteStateType.STREAM, stream=AirbyteStreamState( stream_descriptor=StreamDescriptor(name=stream_name), stream_state=AirbyteStateBlob(stream_state), ), )airbyte_cdk/sources/declarative/concurrent_declarative_source.py (5)
73-75
: Consider improving the TODO comment.The TODO comment suggests that state could be removed from initialization, but it's not clear about the potential edge cases. Would it be helpful to document these edge cases in the comment to help future maintainers understand why we might need to keep it? WDYT?
Line range hint
1681-1689
: Consider adding more context to the comments.The comments explain the current limitations well, but could we add more context about when these limitations might be lifted? For example, what specific concurrent cursor support or CDK implementation would enable these features? WDYT?
73-75
: Consider removing the TODO commentThe comment suggests that state could be removed from initialization, but notes there might be edge cases. Since this is a public-facing change, should we either address the TODO or document the edge cases that prevent its removal? WDYT?
223-242
: Verify the cursor nullification approachThe code nullifies the cursor after state initialization to avoid duplicate state management. While this works, it feels a bit hacky. Would it make more sense to refactor this to avoid the need for nullification? For example, could we introduce a flag to disable state management in the low-code framework instead? WDYT?
Line range hint
1681-1690
: Consider adding more detailed error messagesThe error messages for unsupported configurations could be more helpful. Would it be useful to include:
- What cursor types are currently supported
- When per partition support is expected
- Links to relevant documentation
WDYT?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(7 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(5 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(2 hunks)unit_tests/sources/declarative/test_concurrent_declarative_source.py
(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py
👮 Files not reviewed due to content moderation or server errors (1)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (10)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
3118-3120
: LGTM! Clear factory initialization with state management.The factory initialization with state management is well-structured and the parameters are clear.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (7)
37-37
: LGTM! New imports look good.The addition of
AsyncJobPartitionRouter
andAsyncRetriever
imports aligns with the PR objectives to support async job low-code streams.Also applies to: 39-39
223-242
: LGTM! Smart optimization for cursor and state management.The code optimizes state management by bypassing low-code cursor flows when using ConcurrentCursor. The comments effectively explain the rationale behind this optimization.
250-251
: Fix formatting in the error message.There's a missing space in the error message, causing
received{cursor.__class__}
to be concatenated without a space.Apply this diff to fix the formatting:
- f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received{cursor.__class__}" + f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
37-37
: LGTM: New imports for async job supportThe addition of
AsyncJobPartitionRouter
andAsyncRetriever
imports aligns with the PR's objective of enabling async job low-code streams.Also applies to: 39-39
405-408
: LGTM: Extended stream slicer type checkThe addition of
AsyncJobPartitionRouter
to the type check in_is_datetime_incremental_without_partition_routing
is correct and aligns with the new functionality.
Line range hint
506-506
: LGTM: Added connector state manager parameterThe addition of the optional
connector_state_manager
parameter with a default fallback to a new instance is a clean approach.Also applies to: 518-518
Line range hint
936-937
: LGTM: Simplified state retrievalUsing
_connector_state_manager.get_stream_state
directly is cleaner than passing state through parameters.airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
506-506
: LGTM! Good addition of connector_state_manager.The addition of the optional connector_state_manager parameter with a sensible default improves the flexibility of the factory class.
Also applies to: 518-518
936-937
: LGTM! Proper state retrieval.The code correctly retrieves stream state using the connector_state_manager, which aligns with the state management improvements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
73-75
: Consider adding more detailed documentation for the state management change.The TODO comment suggests that state initialization could be removed, but it might be worth documenting why we're keeping it for now and what edge cases we're concerned about, wdyt?
- # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source - # no longer needs to store the original incoming state. But maybe there's an edge case? + # TODO: State initialization could potentially be removed since streams are grouped during read(). + # However, we're keeping it for now to: + # 1. Handle potential edge cases with state management + # 2. Maintain backward compatibility + # 3. Support legacy Python streams that might depend on this behavior
465-467
: Consider adding type hints for better code maintainability.The
_get_retriever
method could benefit from more specific type hints for better code maintainability and IDE support, wdyt?@staticmethod def _get_retriever( - declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any] + declarative_stream: DeclarativeStream, + stream_state: Mapping[str, Any], ) -> Retriever:
393-396
: Consider extracting complex condition into a helper method.The condition in
_is_datetime_incremental_without_partition_routing
is getting more complex with the addition ofAsyncJobPartitionRouter
. Would it be clearer to extract this into a helper method, wdyt?+ def _is_valid_stream_slicer(self, stream_slicer: Any) -> bool: + return ( + isinstance(stream_slicer, DatetimeBasedCursor) + or isinstance(stream_slicer, AsyncJobPartitionRouter) + ) + def _is_datetime_incremental_without_partition_routing( self, declarative_stream: DeclarativeStream, incremental_sync_component_definition: Mapping[str, Any] | None, ) -> bool: return ( incremental_sync_component_definition is not None and bool(incremental_sync_component_definition) and incremental_sync_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__ and self._stream_supports_concurrent_partition_processing(declarative_stream=declarative_stream) and hasattr(declarative_stream.retriever, "stream_slicer") - and ( - isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) - or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter) - ) + and self._is_valid_stream_slicer(declarative_stream.retriever.stream_slicer) )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
229-239
: Fix the error message formatting.There's a missing space in the error message, causing
received{cursor.__class__}
to be concatenated without a space.- f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received{cursor.__class__}" + f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
223-225
: Consider adding error handling for stream state retrieval.The
get_stream_state
calls might benefit from error handling to gracefully handle cases where the stream state is not found or malformed, wdyt?Also applies to: 330-332
@maxi297 - FYI, I'm updating from |
/autofix
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
Line range hint
510-522
: Consider adding type hints and docstring updates?The new
connector_state_manager
parameter could benefit from:
- A docstring explaining its purpose and default behavior
- Moving the type hint from the parameter to the instance variable
What do you think about adding something like this?
def __init__( self, limit_pages_fetched_per_slice: Optional[int] = None, limit_slices_fetched: Optional[int] = None, emit_connector_builder_messages: bool = False, disable_retries: bool = False, disable_cache: bool = False, disable_resumable_full_refresh: bool = False, message_repository: Optional[MessageRepository] = None, connector_state_manager: Optional[ConnectorStateManager] = None, ): + """Initialize the ModelToComponentFactory. + + Args: + connector_state_manager: Optional manager for handling connector state. + If not provided, an empty state manager will be used. + """ self._init_mappings() self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice self._limit_slices_fetched = limit_slices_fetched self._emit_connector_builder_messages = emit_connector_builder_messages self._disable_retries = disable_retries self._disable_cache = disable_cache self._disable_resumable_full_refresh = disable_resumable_full_refresh self._message_repository = message_repository or InMemoryMessageRepository( self._evaluate_log_level(emit_connector_builder_messages) ) - self._connector_state_manager = connector_state_manager or ConnectorStateManager() + self._connector_state_manager: ConnectorStateManager = connector_state_manager or ConnectorStateManager()
1693-1708
: Consider making the error messages more actionable?The validation logic is good, but the error messages could be more helpful by suggesting what the user should do instead. What do you think about something like this?
- raise ValueError( - "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet" - ) + raise ValueError( + "AsyncRetriever currently only supports DatetimeBasedCursor. " + f"Found cursor of type {type(model.incremental_sync).__name__}. " + "Please use DatetimeBasedCursor or wait for support of additional cursor types." + ) - raise ValueError("Per partition state is not supported yet for AsyncRetriever") + raise ValueError( + "AsyncRetriever does not support partition routers yet. " + "Please remove the partition_router configuration or wait for the feature to be implemented." + )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
941-948
: LGTM! Clear comments and good state handling.The logic for handling stream state is well-documented and correctly implemented. The comments explain the purpose of the state retrieval logic clearly.
What
Updates the concurrent declarative source so that async retrievers w/ incremental components will be properly run within the concurrent framework and that it still checkpoints correctly
Note that this change might impact @tolik0 work here
How
Modifies the
model_to_component_factory.py
so that when creating the runtime components, the right cursors are instantiated. Theconcurrent_declarative_source.py
also needs to be changed so that the correct cursor and partition routers are assigned.Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests