Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Publish the response to process in the context of the record's transformation and filtering steps #193

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
df11e5c
Store a reference to the root response in each record extracted from it
rpopov Dec 28, 2024
83a03d7
Added methods to remove the service keys from records to support the …
rpopov Dec 29, 2024
fd17e5a
Change and test the JSON / JSONL decoders to unify their behavior
rpopov Dec 29, 2024
d274539
Renamed a method
rpopov Dec 30, 2024
99a371d
Extracted a convention and 2 global functions to remove/check for ser…
rpopov Dec 30, 2024
1bb3e05
Avoid reusing the maps/dictionaries produced, thus avoid building cyc…
rpopov Dec 30, 2024
c71fd67
Added in-place update of the Mappings of service keys for cases of
rpopov Dec 31, 2024
b5e3650
Added explicit cleanup of the service fields, thus making them:
rpopov Dec 31, 2024
11472ee
Removed the skipping of the service fields in tests of features after…
rpopov Dec 31, 2024
e693c94
Formatted
rpopov Dec 31, 2024
daaea38
Applied automated formatting per the requirements of GitHub
rpopov Dec 31, 2024
9a05c94
Suppressed the false positive lint findings.
rpopov Dec 31, 2024
6587873
Update RELEASES.md
rpopov Jan 1, 2025
a33f815
Update CONTRIBUTING.md
rpopov Jan 1, 2025
4bc243d
Update CONTRIBUTING.md
rpopov Jan 1, 2025
62cca5e
Update CONTRIBUTING.md
rpopov Jan 2, 2025
b71da46
Updating the development environment setup documentation
rpopov Jan 4, 2025
26367a6
50395 Updating the development environment setup documentation
rpopov Jan 4, 2025
e1c5a47
Update CONTRIBUTING.md
rpopov Jan 5, 2025
4af3c8a
Update CONTRIBUTING.md
rpopov Jan 5, 2025
62d2417
Update record_extractor.py
rpopov Jan 6, 2025
8c636f7
Used the service key name instead of its literal value.
rpopov Jan 8, 2025
a4f16c3
Added the .lock
rpopov Jan 8, 2025
1065a20
Update unit_tests/sources/declarative/extractors/test_dpath_extractor.py
rpopov Jan 8, 2025
b2dab7b
Merge branch 'main' into 50395-1-2
rpopov Jan 8, 2025
118d785
Merged, formatted
rpopov Jan 9, 2025
b49a98d
Update documentation.
rpopov Jan 9, 2025
f7f19be
Update CONTRIBUTING.md
rpopov Jan 9, 2025
e5b9e5c
Handled nitpick comments by rabbit
rpopov Jan 9, 2025
638ed41
Merge branch 'main' into 50395-1-2
rpopov Jan 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions airbyte_cdk/sources/declarative/decoders/json_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,19 @@ def decode(
try:
body_json = response.json()
yield from self.parse_body_json(body_json)
except requests.exceptions.JSONDecodeError:
logger.warning(
f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
)
yield {}
except requests.exceptions.JSONDecodeError as ex:
logger.warning("Response cannot be parsed into json: %s", ex)
logger.debug("Response to parse: %s", response.text, exc_info=True, stack_info=True)
yield from []

@staticmethod
def parse_body_json(
    body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]],
) -> Generator[MutableMapping[str, Any], None, None]:
    """Normalize a decoded JSON body into a stream of record mappings.

    A list body yields each of its elements; any other body is yielded
    as the single record. Note: an empty list therefore yields nothing
    (no empty ``{}`` placeholder record is emitted).
    """
    # Wrap a non-list body so both shapes flow through one yield path.
    records = body_json if isinstance(body_json, list) else [body_json]
    yield from records


@dataclass
Expand Down
23 changes: 21 additions & 2 deletions airbyte_cdk/sources/declarative/extractors/dpath_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,25 @@
import requests

from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import (
SERVICE_KEY_PREFIX,
RecordExtractor,
)
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config

# The name of the service field to bind the response (root) in each record
RECORD_ROOT_KEY = SERVICE_KEY_PREFIX + "root"


def update_record(record: Any, root: Any) -> Any:
    """Return ``record`` with the response root bound under ``RECORD_ROOT_KEY``.

    :param record: an extracted record; only ``dict`` records can carry the
        service field, any other type is returned unchanged.
    :param root: the root response body the record was extracted from.
    :return: a shallow copy of ``record`` with the root service key set, or
        the original ``record`` when it is not a ``dict``.
    """
    if isinstance(record, dict):
        # Shallow-copy via unpacking so the caller's record is not mutated.
        return {**record, RECORD_ROOT_KEY: root}
    return record


@dataclass
class DpathExtractor(RecordExtractor):
Expand Down Expand Up @@ -70,6 +85,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
for body in self.decoder.decode(response):
root_response = body
if len(self._field_path) == 0:
extracted = body
else:
Expand All @@ -79,7 +95,10 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
yield from extracted
for record in extracted:
yield update_record(record, root_response)
elif isinstance(extracted, dict):
yield update_record(extracted, root_response)
elif extracted:
yield extracted
else:
Expand Down
24 changes: 24 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/record_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@

import requests

# Convention:
# - The record extractors may leave service fields in the extracted records (mappings)
# - The names (keys) of the service fields have the value of SERVICE_KEY_PREFIX as their prefix
# - The service fields may be skipped only to ease the testing
SERVICE_KEY_PREFIX = "$"


def exclude_service_keys(mapping: Mapping[str, Any]) -> Mapping[str, Any]:
    """Return a copy of ``mapping`` without any service fields."""
    return {k: v for k, v in mapping.items() if not is_service_key(k)}


def remove_service_keys(mapping: dict[str, Any]) -> None:
    """Delete every service field from ``mapping`` in place."""
    # Iterate over a snapshot of the keys: the mapping is mutated in the loop.
    for key in list(mapping.keys()):
        if is_service_key(key):
            mapping.pop(key)


def is_service_key(k: str) -> bool:
    """Return True when ``k`` names a service field, i.e. starts with SERVICE_KEY_PREFIX."""
    return k.startswith(SERVICE_KEY_PREFIX)


def verify_service_keys_exist(mapping: Mapping[str, Any]) -> None:
    """Assert that ``mapping`` contains at least one service field (test helper)."""
    assert mapping != exclude_service_keys(mapping), "Expected service keys to be present"


@dataclass
class RecordExtractor:
Expand Down
15 changes: 13 additions & 2 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import requests

from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import (
RecordExtractor,
exclude_service_keys,
)
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import SchemaNormalization
Expand Down Expand Up @@ -108,7 +111,8 @@ def filter_and_transform(
"""
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
no_service_fields_data = self._remove_service_keys(transformed_data)
normalized_data = self._normalize_by_schema(no_service_fields_data, schema=records_schema)
for data in normalized_data:
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)

Expand Down Expand Up @@ -156,3 +160,10 @@ def _transform(
stream_slice=stream_slice,
)
yield record

def _remove_service_keys(
    self, records: Iterable[Mapping[str, Any]]
) -> Iterable[Mapping[str, Any]]:
    """Yield each record with its service fields stripped out."""
    yield from (exclude_service_keys(record) for record in records)
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def initial_token(self) -> Optional[Any]:
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
) -> Optional[Any]:
decoded_response = next(self.decoder.decode(response))
decoded_response = next(self.decoder.decode(response), {})

# The default way that link is presented in requests.Response is a string of various links (last, next, etc). This
# is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links
Expand Down
16 changes: 10 additions & 6 deletions airbyte_cdk/sources/declarative/transformations/flatten_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

from airbyte_cdk.sources.declarative.extractors.record_extractor import is_service_key
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

Expand Down Expand Up @@ -32,12 +33,15 @@ def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:

if isinstance(current_record, dict):
for current_key, value in current_record.items():
new_key = (
f"{parent_key}.{current_key}"
if (current_key in transformed_record or force_with_parent_name)
else current_key
)
stack.append((value, new_key))
if not is_service_key(current_key):
new_key = (
f"{parent_key}.{current_key}"
if (current_key in transformed_record or force_with_parent_name)
else current_key
)
stack.append((value, new_key))
else: # transfer the service fields without change
transformed_record[current_key] = value

elif isinstance(current_record, list):
for i, item in enumerate(current_record):
Expand Down
Loading
Loading