-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(concurrency): support failed on http cache write #115
base: main
Are you sure you want to change the base?
Conversation
📝 Walkthrough📝 WalkthroughWalkthroughThe changes in this pull request enhance the Changes
Possibly related PRs
Suggested labels
Suggested reviewers
Wdyt about the changes? Do you think the new error handling will cover all edge cases? Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (5)
airbyte_cdk/sources/streams/http/http_client.py (3)
146-148
: Question about initializingCachedLimiterSession
with backendShould we consider passing only the
backend
parameter when initializingCachedLimiterSession
, instead of bothsqlite_path
andbackend
, to avoid potential redundancy or conflicts? Wdyt?
525-529
: Should we handle specific exceptions in_write
method?In the
_write
method ofSkipFailureSQLiteDict
, we're catching all exceptions and logging a warning. Would it be better to catch specific exceptions (e.g.,OperationalError
) that we expect during cache writes, to avoid suppressing unexpected errors? Wdyt?
529-529
: Consider adding more context to the logged warningWhen logging the exception in
_write
, should we include additional context like thekey
being written to aid in debugging? Wdyt?unit_tests/sources/streams/http/test_http_client.py (2)
749-751
: Use a more robust check for detectingINSERT
operationsIn
RaiseOnInsertConnection.execute
, we're checking if"INSERT"
is instr(args)
. Would it be more reliable to inspect the SQL query directly or use parameterized queries to determine if it's anINSERT
operation? Wdyt?
754-764
: Should we assert that a warning is logged during cache failureIn the test
test_given_cache_save_failure_then_do_not_break
, would it be beneficial to assert that a warning was logged when the cache save fails, to ensure that the failure is handled and logged as expected? Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
airbyte_cdk/sources/streams/http/http_client.py
(3 hunks)unit_tests/sources/streams/http/test_http_client.py
(2 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/streams/http/http_client.py (1)
57-57
: LGTM!
The logger is appropriately instantiated for module-level use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (5)
airbyte_cdk/sources/streams/http/http_client.py (3)
146-148
: Consider adding a comment explaining the cache backend choice?The switch to
SkipFailureSQLiteCache
is a good improvement for handling cache failures gracefully. Would you consider adding a brief comment explaining why this specific backend was chosen? Something like:+# Use SkipFailureSQLiteCache to prevent cache write failures from breaking the HTTP client backend = SkipFailureSQLiteCache(sqlite_path)
wdyt?
524-536
: Consider enhancing error handling and type hints?The implementation looks good! A few suggestions to make it even better:
- Would you consider adding more specific error messages? For example:
- logger.warning(f"Error while retrieving item from cache: {exception}") + logger.warning(f"Cache retrieval failed for key '{key}': {exception}")
- How about adding type hints to match the parent class?
- def __getitem__(self, key): + def __getitem__(self, key: str) -> Any:
539-557
: Add class documentation?The implementation looks solid! Would you consider adding a docstring to explain the purpose and behavior of this class? Something like:
class SkipFailureSQLiteCache(requests_cache.backends.sqlite.SQLiteCache): + """SQLite cache implementation that gracefully handles cache operation failures. + + This cache implementation logs warnings instead of raising exceptions when cache + operations fail, ensuring that the HTTP client continues to function even when + caching is not possible. + """unit_tests/sources/streams/http/test_http_client.py (2)
748-752
: Add test helper documentation?The test helper looks good! Would you consider adding a docstring to explain its purpose? Something like:
class RaiseOnInsertConnection: + """Test helper that simulates SQLite database lock errors during INSERT operations. + Used to verify that the HTTP client handles cache write failures gracefully. + """
754-769
: Enhance test coverage with additional assertions?The test looks good! Would you consider adding a few more assertions to make it even more thorough? For example:
assert response.json() +# Verify that the request was made despite cache failure +assert requests_mock.called +# Verify that the response matches expected data +assert response.json() == {"test": "response"}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
airbyte_cdk/sources/streams/http/http_client.py
(3 hunks)unit_tests/sources/streams/http/test_http_client.py
(2 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/streams/http/http_client.py (1)
57-57
: LGTM: Logger initialization looks good!
The logger namespace "airbyte" aligns well with the codebase structure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
airbyte_cdk/sources/streams/http/http_client.py (1)
542-560
: Add documentation and improve typingThe implementation looks solid! Would you consider adding:
- A docstring explaining the purpose and behavior of this cache implementation?
- Explicit return type annotation for the
__init__
method?Here's a suggestion:
+from typing import Optional, Any + class SkipFailureSQLiteCache(requests_cache.backends.sqlite.SQLiteCache): + """ + A SQLiteCache implementation that gracefully handles cache operation failures. + + This cache implementation logs warnings instead of raising exceptions when cache + operations fail, ensuring that the application continues to function even when + cache writes encounter issues. + """ + def __init__( # type: ignore # ignoring as lib is not typed self, - db_path="http_cache", - serializer=None, + db_path: str = "http_cache", + serializer: Optional[Any] = None, **kwargs, ) -> None:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
airbyte_cdk/sources/streams/http/http_client.py
(3 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/streams/http/http_client.py (1)
146-148
: Consider potential concurrent access issues
The sqlite_path
is used both for backend creation and session creation. Have you verified how this handles concurrent access scenarios? wdyt about adding a test case for concurrent access to ensure it behaves as expected?
✅ Verification successful
SQLite concurrent access appears to be handled by the underlying library
I see that requests_cache
already handles concurrent access through its implementation - there's a lock
parameter being used in the codebase. The SQLite backend from requests_cache
is designed to handle concurrent access scenarios. The existing tests also cover SQLite error cases, though they focus on write errors rather than concurrency.
Since the concurrent access is handled by the underlying library and we're using it as intended, there's no immediate concern here. However, wdyt about adding a note in the docstring to make it explicit that it's safe for concurrent use? This would help future maintainers understand the design choice better.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for existing concurrent access tests
rg -A 5 "concurrent|parallel|threading|multiprocess" "unit_tests/sources/streams/http/test_http_client.py"
Length of output: 106
Script:
#!/bin/bash
# Let's check if there are any tests related to SQLite caching
rg -A 5 "SQLite|sqlite" "unit_tests/sources/streams/http/test_http_client.py"
# Also check the implementation for any concurrent access handling
rg -A 5 "lock|Lock|concurrent|thread" "airbyte_cdk/sources/streams/http/http_client.py"
# Let's also check if SQLiteDict itself has any concurrent access handling
ast-grep --pattern 'class SkipFailureSQLiteDict($$$)'
Length of output: 1554
class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict): | ||
def __getitem__(self, key): # type: ignore # lib is not typed | ||
try: | ||
return super().__getitem__(key) # type: ignore # lib is not typed | ||
except Exception as exception: | ||
if not isinstance(exception, KeyError): | ||
logger.warning(f"Error while retrieving item from cache: {exception}") | ||
else: | ||
raise exception | ||
|
||
def _write(self, key: str, value: str) -> None: | ||
try: | ||
super()._write(key, value) # type: ignore # lib is not typed | ||
except Exception as exception: | ||
logger.warning(f"Error while saving item to cache: {exception}") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance error handling and type safety
A few suggestions to make this more robust:
- The broad exception handling could mask critical issues. What do you think about catching specific exceptions?
- The warning messages could be more descriptive by including the key. wdyt?
- Consider adding type hints for better maintainability?
Here's a potential improvement:
-class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
+class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
+ """A SQLiteDict that logs warnings instead of raising exceptions on cache operations."""
+
- def __getitem__(self, key): # type: ignore # lib is not typed
+ def __getitem__(self, key: str) -> Any: # type: ignore # return type from parent
try:
return super().__getitem__(key) # type: ignore # lib is not typed
- except Exception as exception:
+ except (sqlite3.Error, IOError) as exception:
if not isinstance(exception, KeyError):
- logger.warning(f"Error while retrieving item from cache: {exception}")
+ logger.warning(f"Error while retrieving key '{key}' from cache: {exception}")
else:
raise exception
def _write(self, key: str, value: str) -> None:
try:
super()._write(key, value) # type: ignore # lib is not typed
- except Exception as exception:
- logger.warning(f"Error while saving item to cache: {exception}")
+ except (sqlite3.Error, IOError) as exception:
+ logger.warning(f"Error while saving key '{key}' to cache: {exception}")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict): | |
def __getitem__(self, key): # type: ignore # lib is not typed | |
try: | |
return super().__getitem__(key) # type: ignore # lib is not typed | |
except Exception as exception: | |
if not isinstance(exception, KeyError): | |
logger.warning(f"Error while retrieving item from cache: {exception}") | |
else: | |
raise exception | |
def _write(self, key: str, value: str) -> None: | |
try: | |
super()._write(key, value) # type: ignore # lib is not typed | |
except Exception as exception: | |
logger.warning(f"Error while saving item to cache: {exception}") | |
from typing import Any | |
import sqlite3 | |
class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict): | |
"""A SQLiteDict that logs warnings instead of raising exceptions on cache operations.""" | |
def __getitem__(self, key: str) -> Any: # type: ignore # return type from parent | |
try: | |
return super().__getitem__(key) # type: ignore # lib is not typed | |
except (sqlite3.Error, IOError) as exception: | |
if not isinstance(exception, KeyError): | |
logger.warning(f"Error while retrieving key '{key}' from cache: {exception}") | |
else: | |
raise exception | |
def _write(self, key: str, value: str) -> None: | |
try: | |
super()._write(key, value) # type: ignore # lib is not typed | |
except (sqlite3.Error, IOError) as exception: | |
logger.warning(f"Error while saving key '{key}' to cache: {exception}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
airbyte_cdk/sources/declarative/interpolation/jinja.py (1)
112-112
: Typo in exception messageThere's a typo in the exception message: "Cound not interpolate" should be "Could not interpolate". Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/interpolation/jinja.py
(5 hunks)airbyte_cdk/sources/streams/http/http_client.py
(3 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/interpolation/jinja.py (1)
51-59
: Could making _ENVIRONMENT
a global variable introduce threading issues?
By changing _ENVIRONMENT
to a global variable, there might be potential threading concerns in multi-threaded contexts. Should we ensure that the environment is thread-safe or consider using instance variables instead? Wdyt?
airbyte_cdk/sources/streams/http/http_client.py (1)
524-539
: Could we catch specific exceptions and include the key in warning messages?
Catching broad exceptions like Exception
might mask critical issues. What do you think about catching specific exceptions like sqlite3.Error
or IOError
instead? Also, including the key in the warning messages could improve debuggability. Wdyt?
c7302d9
to
50d6f21
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
airbyte_cdk/sources/declarative/interpolation/jinja.py (1)
33-59
: LGTM! Great security improvements.The addition of restricted extensions and builtin functions is a solid security practice. The documentation clearly explains the security considerations. What do you think about adding a link to Jinja's security documentation in the comments for future reference? wdyt?
airbyte_cdk/sources/streams/http/http_client.py (1)
541-559
: Consider adding class documentation.The implementation looks good, especially sharing the lock between tables. What do you think about adding a docstring explaining the purpose of this class and its error-handling behavior? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/interpolation/jinja.py
(5 hunks)airbyte_cdk/sources/streams/http/http_client.py
(3 hunks)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/interpolation/jinja.py (3)
Line range hint 89-108
: LGTM! Robust alias handling and error management.
The validation of reserved keywords and clear error messages will help catch issues early. The default value handling is well implemented.
136-141
: LGTM! Good performance optimization.
The addition of caching and more specific type annotations improves both performance and type safety.
144-148
: LGTM! Good type safety improvement.
The removal of Optional[str] makes the method contract clearer and safer. The caching implementation is well documented.
airbyte_cdk/sources/streams/http/http_client.py (2)
57-57
: LGTM! Good logging practice.
Using a specific logger namespace helps with log filtering and organization.
524-539
: Consider the previous review suggestions for robustness.
The previous review raised valid points about exception handling and logging that are still applicable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
59-60
: Fix typos in comments for clarityThere are minor typos in the comments:
- Line 59: "A value lower than than could cause..." (duplicate "than")
- Line 60: "...but not partition reader is consuming them." (Perhaps you meant "but no partition reader is consuming them"?)
Would you consider correcting these for better readability? Wdyt?
airbyte_cdk/sources/streams/http/http_client.py (1)
57-57
: Reuse the existing logger instanceSince
self._logger
is already defined in the class, could we use it instead of creating a new module-level logger? This would keep logging consistent within the class. Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(2 hunks)airbyte_cdk/sources/streams/http/http_client.py
(4 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
125-125
: Confirm calculation of initial partitions
The updated calculation uses integer division:
initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL // 2
Given SINGLE_THREADED_CONCURRENCY_LEVEL
is 2, this results in initial_number_of_partitions_to_generate
being 1. Just to confirm, is this the intended behavior to prevent deadlocks by ensuring there is at least one partition? Wdyt?
airbyte_cdk/sources/streams/http/http_client.py (1)
526-541
: 🛠️ Refactor suggestion
Catch specific exceptions and enhance logging
Would it be better to catch specific exceptions, such as sqlite3.Error
or IOError
, instead of a broad Exception
? This can prevent masking other unexpected issues. Additionally, including the key
in the warning messages might provide more context for troubleshooting. What do you think about adding type hints for better clarity and maintainability? Wdyt?
Here's a potential improvement:
+import sqlite3
class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
- def __getitem__(self, key): # type: ignore # lib is not typed
+ def __getitem__(self, key: str) -> Any:
try:
return super().__getitem__(key) # type: ignore # lib is not typed
- except Exception as exception:
+ except (sqlite3.Error, IOError) as exception:
if not isinstance(exception, KeyError):
- logger.warning(f"Error while retrieving item from cache: {exception}")
+ logger.warning(f"Error while retrieving key '{key}' from cache: {exception}")
else:
raise
def _write(self, key: str, value: str) -> None:
try:
super()._write(key, value) # type: ignore # lib is not typed
- except Exception as exception:
- logger.warning(f"Error while saving item to cache: {exception}")
+ except (sqlite3.Error, IOError) as exception:
+ logger.warning(f"Error while saving key '{key}' to cache: {exception}")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (4)
airbyte_cdk/sources/streams/http/http_client.py (1)
145-146
: Consider removing the temporary logging comment.The TODO comment suggests this is temporary logging. Should we remove it or make it more permanent? wdyt?
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
123-135
: The method looks good, but the name could be more descriptive.The implementation is solid with good error handling and documentation. However, would you consider renaming
_actually_group
to something more descriptive like_initialize_stream_groups
or_setup_stream_categories
? The current name doesn't fully convey its purpose. wdyt?
145-147
: Consider extracting the initialization check.The null check and initialization pattern is duplicated in both
read()
anddiscover()
. Would you consider extracting this into a private method like_ensure_groups_initialized()
? This would reduce duplication and make the code more maintainable. wdyt?+ def _ensure_groups_initialized(self) -> None: + if self._concurrent_streams is None: + self._actually_group() + def read(self, ...): - if self._concurrent_streams is None: - self._actually_group() + self._ensure_groups_initialized() ... def discover(self, ...): - if self._concurrent_streams is None: - self._actually_group() + self._ensure_groups_initialized() ...Also applies to: 173-175
113-113
: Consider adding a comment explaining the partition calculation.The calculation
SINGLE_THREADED_CONCURRENCY_LEVEL // 2
looks correct, but would you consider adding a comment explaining why half the concurrency level is chosen? This would help future maintainers understand the reasoning. wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(6 hunks)airbyte_cdk/sources/streams/http/http_client.py
(4 hunks)
🔇 Additional comments (5)
airbyte_cdk/sources/streams/http/http_client.py (4)
57-57
: LGTM! Logger initialization looks good.
The logger instance is properly initialized for the "airbyte" namespace.
148-150
: LGTM! Cache configuration looks good.
The cache configuration properly initializes the new SkipFailureSQLiteCache
backend with appropriate settings.
525-540
: Consider enhancing error handling in SkipFailureSQLiteDict.
I notice this was mentioned in a previous review, but it's worth revisiting. What do you think about:
- Catching specific SQLite exceptions instead of generic Exception?
- Including the key in warning messages for better debugging?
- Adding type hints for better maintainability?
Here's a potential improvement:
class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
+ """A SQLiteDict that logs warnings instead of raising exceptions on cache operations."""
+
- def __getitem__(self, key): # type: ignore # lib is not typed
+ def __getitem__(self, key: str) -> Any: # type: ignore # return type from parent
try:
return super().__getitem__(key) # type: ignore # lib is not typed
- except Exception as exception:
+ except (sqlite3.Error, IOError) as exception:
if not isinstance(exception, KeyError):
- logger.warning(f"Error while retrieving item from cache: {exception}")
+ logger.warning(f"Error while retrieving key '{key}' from cache: {exception}")
else:
raise exception
def _write(self, key: str, value: str) -> None:
try:
super()._write(key, value) # type: ignore # lib is not typed
- except Exception as exception:
- logger.warning(f"Error while saving item to cache: {exception}")
+ except (sqlite3.Error, IOError) as exception:
+ logger.warning(f"Error while saving key '{key}' to cache: {exception}")
542-563
: LGTM! SkipFailureSQLiteCache implementation looks good.
The implementation properly initializes both responses and redirects tables with appropriate settings. The use of WAL mode and fast_save is a good choice for performance.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
59-61
: LGTM! The concurrency level change makes sense.
The increase to 2 should prevent the PartitionEnqueuer deadlock by ensuring there's always room for both producing and consuming futures. The new private attributes also improve state management.
Would you consider adding a test to verify that the PartitionEnqueuer doesn't deadlock with this new configuration? wdyt?
Let's check if there are any existing deadlock-related tests:
Also applies to: 82-84
✅ Verification successful
The concurrency level change is well-tested and safe!
Looking at the test files, I can see that:
- The PartitionEnqueuer has dedicated tests for handling futures limits and waiting behavior
- The ConcurrentDeclarativeSource tests verify proper handling of concurrent streams with different concurrency levels
- The code ensures a minimum concurrency level of 2 to prevent deadlocks
No additional tests are needed as the current test coverage already validates the deadlock prevention mechanism. The change from 1 to 2 is well-justified and properly tested.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for deadlock-related tests
rg -i "deadlock|partition.*enqueuer.*test" --type python
Length of output: 89
Script:
#!/bin/bash
# Search for test files related to partition or enqueuer
fd "test.*\.py$" | xargs rg -i "partition.*enqueuer|deadlock"
# Also search for any test files that might test the concurrent source
fd "test.*\.py$" | xargs rg -i "concurrent.*declarative.*source"
# Look for the actual test directory structure
fd "test" --type d
Length of output: 7244
Script:
#!/bin/bash
# Let's check the actual test content for PartitionEnqueuer to see if it covers concurrency levels
cat unit_tests/sources/streams/concurrent/test_partition_enqueuer.py
# Also check the concurrent declarative source tests for any concurrency level testing
cat unit_tests/sources/declarative/test_concurrent_declarative_source.py
Length of output: 70413
sqlite_path = "file::memory:?cache=shared" | ||
backend = SkipFailureSQLiteCache(self._name, sqlite_path) # TODO maybe add a busy timeout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Would you consider adding a busy timeout to prevent SQLite lock errors? Here's a suggestion 🤔
I noticed we have a test case simulating a "database table is locked" error, and the current implementation inherits from requests_cache.backends.sqlite.SQLiteCache
without any timeout configuration. What if we modified the __init__
to include a busy timeout? Something like:
def __init__(self, table_name="response", db_path="http_cache", serializer=None):
super().__init__(table_name, db_path, serializer)
self._db.execute("PRAGMA busy_timeout = 30000") # 30 seconds timeout, wdyt?
This would help prevent those pesky lock errors in concurrent scenarios while keeping the implementation simple. What do you think about this approach? 🤔
🔗 Analysis chain
Consider implementing busy timeout for SQLite.
The TODO comment suggests adding a busy timeout. This could help prevent "database is locked" errors in concurrent scenarios. Would you like me to provide an example implementation?
Let's check if there are any existing busy timeout implementations or issues:
Let me check the implementation of the SQLite cache to understand the context better.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for existing busy timeout implementations
rg -A 2 "busy_timeout|pragma busy_timeout"
# Search for database locked errors in tests
rg "database.*locked|sqlite.*locked"
Length of output: 198
Script:
#!/bin/bash
# Find the SQLite cache implementation
ast-grep --pattern 'class SkipFailureSQLiteCache'
# Also check for any test cases related to concurrent access
rg -B 2 -A 5 "SkipFailureSQLiteCache"
Length of output: 1631
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
59-60
: Typographical corrections in commentsI noticed some typos in the comments that might need correction. Specifically, "lower than than" could be "lower than that," and the sentence in line 60 could be rephrased for clarity. Wdyt?
Here's the suggested fix:
- # By default, we defer to a value of 2. A value lower than than could cause a PartitionEnqueuer to be stuck in a state of deadlock - # because it has hit the limit of futures but not partition reader is consuming them. + # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a deadlock state + # because it has hit the limit of futures but no partition reader is consuming them.
145-147
: Consider refactoring repeated code into a helper methodI noticed that the check
if self._concurrent_streams is None: self._actually_group()
is repeated in both theread
anddiscover
methods (lines 145-147 and 173-175). Perhaps we could extract this logic into a helper method to adhere to the DRY principle and improve maintainability. Wdyt?Proposed addition:
Add a helper method:
def _ensure_streams_grouped(self): if self._concurrent_streams is None: self._actually_group()Then, in the
read
method (lines 145-147):- if self._concurrent_streams is None: - self._actually_group() + self._ensure_streams_grouped()And in the
discover
method (lines 173-175):- if self._concurrent_streams is None: - self._actually_group() + self._ensure_streams_grouped()Also applies to: 173-175
@maxi297 are there any pieces in here taht we still want to merge? |
What
This PR has been split in different issues to scope the problems found:
How
Summary by CodeRabbit
New Features
ConcurrentDeclarativeSource
, enhancing stability during stream processing.Bug Fixes
Tests