Skip to content

Commit

Permalink
Fix error in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Dec 26, 2024
1 parent 5ee05f1 commit 24268e2
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#
import threading
from collections import OrderedDict
from copy import deepcopy
from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
Expand Down Expand Up @@ -57,7 +58,6 @@ class ConcurrentPerPartitionCursor(Cursor):
_NO_CURSOR_STATE: Mapping[str, Any] = {}
_KEY = 0
_VALUE = 1
_state_to_migrate_from: Mapping[str, Any] = {}

def __init__(
self,
Expand All @@ -70,6 +70,7 @@ def __init__(
connector_state_manager: ConnectorStateManager,
cursor_field: CursorField,
) -> None:
self._global_cursor: Mapping[str, Any] = {}
self._stream_name = stream_name
self._stream_namespace = stream_namespace
self._message_repository = message_repository
Expand All @@ -87,7 +88,6 @@ def __init__(
self._finished_partitions = set()
self._lock = threading.Lock()
self._timer = Timer()
self._global_cursor = None
self._new_global_cursor = None
self._lookback_window = 0
self._parent_state = None
Expand Down Expand Up @@ -116,12 +116,12 @@ def state(self) -> MutableMapping[str, Any]:
)
state: dict[str, Any] = {"states": states}

state["state"] = self._global_cursor
if self._global_cursor:
state["state"] = self._global_cursor
if self._lookback_window is not None:
state["lookback_window"] = self._lookback_window
if self._parent_state is not None:
state["parent_state"] = self._parent_state
print(state)
return state

def close_partition(self, partition: Partition) -> None:
Expand Down Expand Up @@ -192,8 +192,8 @@ def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[Str
cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
if not cursor:
partition_state = (
self._state_to_migrate_from
if self._state_to_migrate_from
self._global_cursor
if self._global_cursor
else self._NO_CURSOR_STATE
)
cursor = self._create_cursor(partition_state)
Expand Down Expand Up @@ -265,7 +265,8 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
if "states" not in stream_state:
# We assume that `stream_state` is in a global format that can be applied to all partitions.
# Example: {"global_state_format_key": "global_state_format_value"}
self._state_to_migrate_from = stream_state
self._global_cursor = deepcopy(stream_state)
self._new_global_cursor = deepcopy(stream_state)

else:
for state in stream_state["states"]:
Expand All @@ -278,13 +279,16 @@ def _set_initial_state(self, stream_state: StreamState) -> None:

# set default state for missing partitions if it is per partition with fallback to global
if "state" in stream_state:
self._state_to_migrate_from = stream_state["state"]
self._global_cursor = deepcopy(stream_state["state"])
self._new_global_cursor = deepcopy(stream_state["state"])

# Set parent state for partition routers based on parent streams
self._partition_router.set_initial_state(stream_state)

def observe(self, record: Record) -> None:
print(self._to_partition_key(record.associated_slice.partition), record)
print("Observing record in concirrent perpartition ", self._to_partition_key(record.associated_slice.partition), record, self._cursor_per_partition[
self._to_partition_key(record.associated_slice.partition)
].state)
self._cursor_per_partition[
self._to_partition_key(record.associated_slice.partition)
].observe(record)
Expand All @@ -296,7 +300,7 @@ def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
return self._partition_serializer.to_partition(partition_key)

def _create_cursor(self, cursor_state: Any) -> Cursor:
cursor = self._cursor_factory.create(stream_state=cursor_state)
cursor = self._cursor_factory.create(stream_state=deepcopy(cursor_state))
return cursor

def should_be_synced(self, record: Record) -> bool:
Expand Down
Loading

0 comments on commit 24268e2

Please sign in to comment.