Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add JsonParser component to declarative framework #166

Merged
merged 27 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e68f36f
initial JsonParser component
pnilan Dec 10, 2024
a8a7bb3
update parser
pnilan Dec 11, 2024
254f877
add tests for json parser
pnilan Dec 11, 2024
8df239a
update parser and tests to yield empty dict if unparseable.
pnilan Dec 11, 2024
92574df
chore: format code
pnilan Dec 11, 2024
82a15c9
Merge branch 'main' into pnilan/declarative/parsers
pnilan Dec 12, 2024
0b3b5e1
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 10, 2025
9fd93cb
conform tests
pnilan Jan 10, 2025
1892a03
initial test updates
pnilan Jan 10, 2025
51118f1
update JsonParser and relevant tests
pnilan Jan 10, 2025
34a710d
chore: format/type-check
pnilan Jan 10, 2025
060178a
remove orjson from composite_raw_decoder file
pnilan Jan 14, 2025
bf8dd26
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 14, 2025
d9b6df3
chore: format code
pnilan Jan 14, 2025
f20fffc
add additional test
pnilan Jan 14, 2025
9ce2c28
update to fallback to json library if orjson fails, update test to us…
pnilan Jan 14, 2025
7e7b2c4
add `JsonParser` to GzipDecoder and CompositeRawDecoder "anyOf" list
pnilan Jan 14, 2025
23cbfb7
update to simplify orjson/json parsing
pnilan Jan 14, 2025
1c2a832
chore: type-check
pnilan Jan 14, 2025
66aaae9
unlock `CompositeRawDecoder` w/ `JsonParser` support for pagination
pnilan Jan 14, 2025
00cf7b1
update conditional validations for decoders/parsers for pagination
pnilan Jan 15, 2025
b7aa78f
remove errant print
pnilan Jan 15, 2025
7b41732
chore: coderabbitai suggestions
pnilan Jan 15, 2025
3f550f2
update parservalidation method
pnilan Jan 15, 2025
27bf5a7
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 15, 2025
e691f79
Merge branch 'main' into pnilan/declarative/parsers
natikgadzhi Jan 15, 2025
bb63934
Update airbyte_cdk/sources/declarative/declarative_component_schema.yaml
pnilan Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ definitions:
properties:
type:
type: string
enum: [ CustomSchemaNormalization ]
enum: [CustomSchemaNormalization]
class_name:
title: Class Name
description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.
Expand Down Expand Up @@ -2859,6 +2859,7 @@ definitions:
parser:
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonParser"
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
# PARSERS
Expand All @@ -2875,6 +2876,21 @@ definitions:
anyOf:
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
- "$ref": "#/definitions/JsonParser"
JsonParser:
title: JsonParser
description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format.
type: object
additionalProperties: true
pnilan marked this conversation as resolved.
Show resolved Hide resolved
required:
- type
properties:
type:
type: string
enum: [JsonParser]
encoding:
type: string
default: utf-8
JsonLineParser:
type: object
required:
Expand Down
43 changes: 43 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
from io import BufferedIOBase, TextIOWrapper
from typing import Any, Generator, MutableMapping, Optional

import orjson
import requests

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.utils import AirbyteTracedException

logger = logging.getLogger("airbyte")

Expand Down Expand Up @@ -42,6 +45,46 @@ def parse(
yield from self.inner_parser.parse(gzipobj)


@dataclass
class JsonParser(Parser):
encoding: str = "utf-8"

def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]:
"""
Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
"""
raw_data = data.read()
body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data)

if body_json is None:
raise AirbyteTracedException(
message="Response JSON data failed to be parsed. See logs for more information.",
internal_message=f"Response JSON data failed to be parsed.",
failure_type=FailureType.system_error,
)

if isinstance(body_json, list):
yield from body_json
else:
yield from [body_json]

def _parse_orjson(self, raw_data: bytes) -> Optional[Any]:
try:
return orjson.loads(raw_data.decode(self.encoding))
except Exception as exc:
logger.debug(
f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}"
)
return None

def _parse_json(self, raw_data: bytes) -> Optional[Any]:
try:
return json.loads(raw_data.decode(self.encoding))
except Exception as exc:
logger.error(f"Failed to parse JSON data using json library. {exc}")
return None


@dataclass
class JsonLineParser(Parser):
encoding: Optional[str] = "utf-8"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,33 +737,43 @@ class KeysToSnakeCase(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
flatten_lists: Optional[bool] = Field(
True,
description="Whether to flatten lists or leave it as is. Default is True.",
title="Flatten Lists",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysReplace(BaseModel):
type: Literal["KeysReplace"]
old: str = Field(
...,
description="Old value to replace.",
examples=[" ", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
examples=[
" ",
"{{ record.id }}",
"{{ config['id'] }}",
"{{ stream_slice['id'] }}",
],
title="Old value",
)
new: str = Field(
...,
description="New value to set.",
examples=["_", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
examples=[
"_",
"{{ record.id }}",
"{{ config['id'] }}",
"{{ stream_slice['id'] }}",
],
title="New value",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
flatten_lists: Optional[bool] = Field(
True,
description="Whether to flatten lists or leave it as is. Default is True.",
title="Flatten Lists",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class IterableDecoder(BaseModel):
type: Literal["IterableDecoder"]

Expand Down Expand Up @@ -1163,6 +1173,14 @@ class LegacySessionTokenAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class JsonParser(BaseModel):
class Config:
extra = Extra.allow

type: Literal["JsonParser"]
encoding: Optional[str] = "utf-8"


class JsonLineParser(BaseModel):
type: Literal["JsonLineParser"]
encoding: Optional[str] = "utf-8"
Expand Down Expand Up @@ -1561,7 +1579,7 @@ class RecordSelector(BaseModel):

class GzipParser(BaseModel):
type: Literal["GzipParser"]
inner_parser: Union[JsonLineParser, CsvParser]
inner_parser: Union[JsonLineParser, CsvParser, JsonParser]


class Spec(BaseModel):
Expand Down Expand Up @@ -1596,7 +1614,7 @@ class CompositeErrorHandler(BaseModel):

class CompositeRawDecoder(BaseModel):
type: Literal["CompositeRawDecoder"]
parser: Union[GzipParser, JsonLineParser, CsvParser]
parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser]


class DeclarativeSource1(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
CsvParser,
GzipParser,
JsonLineParser,
JsonParser,
Parser,
pnilan marked this conversation as resolved.
Show resolved Hide resolved
)
from airbyte_cdk.sources.declarative.extractors import (
DpathExtractor,
Expand Down Expand Up @@ -247,6 +249,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonLineParser as JsonLineParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonParser as JsonParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtAuthenticator as JwtAuthenticatorModel,
)
Expand Down Expand Up @@ -522,6 +527,7 @@ def _init_mappings(self) -> None:
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonLineParserModel: self.create_json_line_parser,
JsonParserModel: self.create_json_parser,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
GzipParserModel: self.create_gzip_parser,
KeysToLowerModel: self.create_keys_to_lower_transformation,
Expand Down Expand Up @@ -1032,17 +1038,17 @@ def create_cursor_pagination(
self, model: CursorPaginationModel, config: Config, decoder: Decoder, **kwargs: Any
) -> CursorPaginationStrategy:
if isinstance(decoder, PaginationDecoderDecorator):
if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)):
raise ValueError(
f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
)
inner_decoder = decoder.decoder
else:
inner_decoder = decoder
decoder = PaginationDecoderDecorator(decoder=decoder)

if self._is_supported_decoder_for_pagination(inner_decoder):
decoder_to_use = decoder
else:
if not isinstance(decoder, (JsonDecoder, XmlDecoder)):
raise ValueError(
f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
)
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
raise ValueError(
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
)

return CursorPaginationStrategy(
cursor_value=model.cursor_value,
Expand Down Expand Up @@ -1515,11 +1521,10 @@ def create_default_paginator(
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
if decoder:
if not isinstance(decoder, (JsonDecoder, XmlDecoder)):
raise ValueError(
f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
)
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
if self._is_supported_decoder_for_pagination(decoder):
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
else:
raise ValueError(self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(decoder)))
else:
decoder_to_use = PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
page_size_option = (
Expand Down Expand Up @@ -1748,6 +1753,11 @@ def create_dynamic_schema_loader(
def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder:
return JsonDecoder(parameters={})

@staticmethod
def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser:
encoding = model.encoding if model.encoding else "utf-8"
return JsonParser(encoding=encoding)

@staticmethod
def create_jsonl_decoder(
model: JsonlDecoderModel, config: Config, **kwargs: Any
Expand Down Expand Up @@ -1927,22 +1937,22 @@ def create_oauth_authenticator(
message_repository=self._message_repository,
)

@staticmethod
def create_offset_increment(
model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any
self, model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any
) -> OffsetIncrement:
if isinstance(decoder, PaginationDecoderDecorator):
if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)):
raise ValueError(
f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
)
inner_decoder = decoder.decoder
else:
inner_decoder = decoder
decoder = PaginationDecoderDecorator(decoder=decoder)

if self._is_supported_decoder_for_pagination(inner_decoder):
decoder_to_use = decoder
else:
if not isinstance(decoder, (JsonDecoder, XmlDecoder)):
raise ValueError(
f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead."
)
decoder_to_use = PaginationDecoderDecorator(decoder=decoder)
raise ValueError(
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
)

return OffsetIncrement(
page_size=model.page_size,
config=config,
Expand Down Expand Up @@ -2531,3 +2541,25 @@ def create_config_components_resolver(
components_mapping=components_mapping,
parameters=model.parameters or {},
)

_UNSUPPORTED_DECODER_ERROR = (
pnilan marked this conversation as resolved.
Show resolved Hide resolved
"Specified decoder of {decoder_type} is not supported for pagination."
"Please set as `JsonDecoder`, `XmlDecoder`, or a `CompositeRawDecoder` with an inner_parser of `JsonParser` or `GzipParser` instead."
"If using `GzipParser`, please ensure that the lowest level inner_parser is a `JsonParser`."
)

def _is_supported_decoder_for_pagination(self, decoder: Decoder) -> bool:
if isinstance(decoder, (JsonDecoder, XmlDecoder)):
return True
elif isinstance(decoder, CompositeRawDecoder):
return self._is_supported_parser_for_pagination(decoder.parser)
else:
return False

def _is_supported_parser_for_pagination(self, parser: Parser) -> bool:
if isinstance(parser, JsonParser):
return True
elif isinstance(parser, GzipParser):
return self._is_supported_parser_for_pagination(parser.inner_parser)
else:
return False
12 changes: 6 additions & 6 deletions docs/RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ A few seconds after any PR is merged to `main` , a release draft will be created
3. Optionally tweak the text in the release notes - for instance to call out contributors, to make a specific change more intuitive for readers to understand, or to move updates into a different category than they were assigned by default. (Note: You can also do this retroactively after publishing the release.)
4. Publish the release by pressing the “Publish release” button.

*Note:*
pnilan marked this conversation as resolved.
Show resolved Hide resolved
_Note:_

- *Only maintainers can see release drafts. Non-maintainers will only see published releases.*
- _Only maintainers can see release drafts. Non-maintainers will only see published releases._
- If you create a tag on accident that you need to remove, contact a maintainer to delete the tag and the release.
- You can monitor the PyPI release process here in the GitHub Actions view: https://github.com/airbytehq/airbyte-python-cdk/actions/workflows/pypi_publish.yml

Expand Down Expand Up @@ -49,23 +49,23 @@ The first option is to look in the `declarative_manifest_image_version` database

If that is not available as an option, you can run an Builder-created connector in Cloud and note the version number printed in the logs. Warning: this may not be indicative if that connector instance has been manually pinned to a specific version.

TODO: Would be great to find a way to inspect directly without requiring direct prod DB access.
TODO: Would be great to find a way to inspect directly without requiring direct prod DB access.

### How to pretest changes to SDM images manually

To manually test changes against a dev image of SDM before committing to a release, first use the Publishing & Packaging workflow to publish a pre-release version of the CDK/SDM. Be sure to uncheck the option to create a connector builder PR.

#### Pretesting Manifest-Only connectors

Once the publish pipeline has completed, choose a connector to test. Set the base_image in the connector's metadata to your pre-release version in Dockerhub (make sure to update the SHA as well).
Next, build the pre-release image locally using `airbyte-ci connectors —name=<source> build`.
Once the publish pipeline has completed, choose a connector to test. Set the base_image in the connector's metadata to your pre-release version in Dockerhub (make sure to update the SHA as well).
Next, build the pre-release image locally using `airbyte-ci connectors —name=<source> build`.
You can now run connector interfaces against the built image using the pattern
`docker run airbyte/<source-name>:dev <spec/check/discover/read>`.
The connector's README should include a list of these commands, which can be copy/pasted and run from the connector's directory for quick testing against a local config.
You can also run `airbyte-ci connectors —name=<source> test` to run the CI test suite against the dev image.

#### Pretesting Low-Code Python connectors

Once the publish pipeline has completed, set the version of `airbyte-cdk` in the connector's pyproject.toml file to the pre-release version in PyPI.
Once the publish pipeline has completed, set the version of `airbyte-cdk` in the connector's pyproject.toml file to the pre-release version in PyPI.
Update the lockfile and run connector interfaces via poetry:
`poetry run source-<name> spec/check/discover/read`.
You can also run `airbyte-ci connectors —name=<source> test` to run the CI test suite against the dev image.



Expand Down
Loading
Loading