Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds ZipfileDecoder component #169

Merged
merged 40 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e68f36f
initial JsonParser component
pnilan Dec 10, 2024
a8a7bb3
update parser
pnilan Dec 11, 2024
254f877
add tests for json parser
pnilan Dec 11, 2024
8df239a
update parser and tests to yield empty dict if unparseable.
pnilan Dec 11, 2024
8c7d5f8
add zipfile_decoder
pnilan Dec 11, 2024
92574df
chore: format code
pnilan Dec 11, 2024
6e4b376
Merge branch 'pnilan/declarative/parsers' into pnilan/declarative/zip…
pnilan Dec 11, 2024
ab3f404
update zipfile_decoder and relevants tests
pnilan Dec 12, 2024
82a15c9
Merge branch 'main' into pnilan/declarative/parsers
pnilan Dec 12, 2024
49d0ec8
Merge branch 'pnilan/declarative/parsers' into pnilan/declarative/zip…
pnilan Dec 12, 2024
96ec874
remove errant comment
pnilan Dec 13, 2024
0b3b5e1
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 10, 2025
9fd93cb
conform tests
pnilan Jan 10, 2025
1892a03
initial test updates
pnilan Jan 10, 2025
51118f1
update JsonParser and relevant tests
pnilan Jan 10, 2025
34a710d
chore: format/type-check
pnilan Jan 10, 2025
060178a
remove orjson from composite_raw_decoder file
pnilan Jan 14, 2025
bf8dd26
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 14, 2025
d9b6df3
chore: format code
pnilan Jan 14, 2025
f20fffc
add additional test
pnilan Jan 14, 2025
9ce2c28
update to fallback to json library if orjson fails, update test to us…
pnilan Jan 14, 2025
7e7b2c4
add `JsonParser` to GzipDecoder and CompositeRawDecoder "anyOf" list
pnilan Jan 14, 2025
23cbfb7
update to simplify orjson/json parsing
pnilan Jan 14, 2025
1c2a832
chore: type-check
pnilan Jan 14, 2025
66aaae9
unlock `CompositeRawDecoder` w/ `JsonParser` support for pagination
pnilan Jan 14, 2025
00cf7b1
update conditional validations for decoders/parsers for pagination
pnilan Jan 15, 2025
b7aa78f
remove errant print
pnilan Jan 15, 2025
7b41732
chore: coderabbitai suggestions
pnilan Jan 15, 2025
3f550f2
update parservalidation method
pnilan Jan 15, 2025
27bf5a7
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 15, 2025
bd724bf
Merge branch 'pnilan/declarative/parsers' into pnilan/declarative/zip…
pnilan Jan 15, 2025
350fcdb
remove unnecessary parser
pnilan Jan 15, 2025
aae3e77
update ZipfileDecoder to hanlde underlying parsers
pnilan Jan 15, 2025
fe6859f
update types
pnilan Jan 15, 2025
907f628
Merge branch 'main' into pnilan/declarative/zipfiledecoder
pnilan Jan 16, 2025
8a1ccf0
add `ZipfileDecoder` to `anyOf` validator in declarative component sc…
pnilan Jan 16, 2025
fcb3184
update `anyOf` validations to include `ZipfileDecoder` in declarative…
pnilan Jan 16, 2025
1c8cd66
update zipfiledecoder and relevant tests
pnilan Jan 17, 2025
1777cac
adds JsonLineParser and CsvParser to available underlying parsers for…
pnilan Jan 17, 2025
c0b2130
close zipfile context, add exception logging at decoder level, and ad…
pnilan Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1897,6 +1897,56 @@ definitions:
$parameters:
type: object
additionalProperties: true
ZipfileDecoder:
title: Zipfile Decoder
description: Decoder for response data that is returned as zipfile(s).
type: object
additionalProperties: true
required:
- type
properties:
type:
type: string
enum: [ZipfileDecoder]
parser:
title: Parser
description: Parser to parse the decompressed data from the zipfile(s).
anyOf:
- "$ref": "#/definitions/JsonParser"
- "$ref": "#/definitions/CustomParser"
JsonParser:
title: JsonParser
description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format.
type: object
additionalProperties: true
required:
- type
properties:
type:
type: string
enum: [JsonParser]
CustomParser:
title: Custom Parser
description: Use this to implement custom parser logic.
type: object
additionalProperties: true
required:
- type
- class_name
properties:
type:
type: string
enum: [CustomParser]
class_name:
title: Class Name
description: Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Parser. The format is `source_<name>.<package>.<class_name>`.
type: string
additionalProperties: true
examples:
- "source_rivendell.components.ElvishParser"
$parameters:
type: object
additionalProperties: true
ListPartitionRouter:
title: List Partition Router
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
Expand Down
3 changes: 2 additions & 1 deletion airbyte_cdk/sources/declarative/decoders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
from airbyte_cdk.sources.declarative.decoders.zipfile_decoder import ZipfileDecoder

__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]
__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder", "ZipfileDecoder"]
7 changes: 7 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/parsers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.decoders.parsers.parsers import Parser, JsonParser

__all__ = ["Parser", "JsonParser"]
49 changes: 49 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/parsers/parsers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import json
import logging
from abc import abstractmethod
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping, MutableMapping, Union

logger = logging.getLogger("airbyte")


@dataclass
class Parser:
"""
Parser strategy to convert str, bytes, or bytearray data into MutableMapping[str, Any].
"""

@abstractmethod
def parse(
self, data: Union[str, bytes, bytearray]
) -> Generator[MutableMapping[str, Any], None, None]:
pass


@dataclass
class JsonParser(Parser):
"""
Parser strategy for converting JSON-structure str, bytes, or bytearray data into MutableMapping[str, Any].
"""

parameters: InitVar[Mapping[str, Any]]

def parse(
self, data: Union[str, bytes, bytearray]
) -> Generator[MutableMapping[str, Any], None, None]:
try:
body_json = json.loads(data)
except json.JSONDecodeError:
logger.warning(f"Data cannot be parsed into json: {data=}")
yield {}
pnilan marked this conversation as resolved.
Show resolved Hide resolved

if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
yield {}
else:
yield from body_json
54 changes: 54 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import gzip
import io
import logging
import zipfile
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping, MutableMapping, Optional

import requests

from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser, Parser

logger = logging.getLogger("airbyte")


@dataclass
class ZipfileDecoder(Decoder):
parameters: InitVar[Mapping[str, Any]]
parser: Optional[Parser] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parser = (
self.parser(parameters=parameters) if self.parser else JsonParser(parameters=parameters)
)
pnilan marked this conversation as resolved.
Show resolved Hide resolved

def is_stream_response(self) -> bool:
return False

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
try:
zip_file = zipfile.ZipFile(io.BytesIO(response.content))
except zipfile.BadZipFile as e:
logger.exception(e)
logger.error(
f"Received an invalid zip file in response to URL: {response.request.url}. "
f"The size of the response body is: {len(response.content)}"
)
yield {}

pnilan marked this conversation as resolved.
Show resolved Hide resolved
for gzip_filename in zip_file.namelist():
with zip_file.open(gzip_filename) as file:
try:
for data in gzip.open(file):
yield from self._parser.parse(data)
except gzip.BadGzipFile as e:
logger.exception(e)
logger.error(f"Fail to read contents of zipped response: {e}")
yield {}
pnilan marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,27 @@ class Config:
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class JsonParser(BaseModel):
class Config:
extra = Extra.allow

type: Literal["JsonParser"]


class CustomParser(BaseModel):
class Config:
extra = Extra.allow

type: Literal["CustomParser"]
class_name: str = Field(
...,
description="Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Parser. The format is `source_<name>.<package>.<class_name>`.",
examples=["source_rivendell.components.ElvishParser"],
title="Class Name",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class MinMaxDatetime(BaseModel):
type: Literal["MinMaxDatetime"]
datetime: str = Field(
Expand Down Expand Up @@ -1468,6 +1489,18 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel):
)


class ZipfileDecoder(BaseModel):
class Config:
extra = Extra.allow

type: Literal["ZipfileDecoder"]
parser: Optional[Union[JsonParser, CustomParser]] = Field(
None,
description="Parser to parse the decompressed data from the zipfile(s).",
title="Parser",
)


class ListPartitionRouter(BaseModel):
type: Literal["ListPartitionRouter"]
cursor_field: str = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@
JsonlDecoder,
PaginationDecoderDecorator,
XmlDecoder,
ZipfileDecoder,
)
from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser
from airbyte_cdk.sources.declarative.extractors import (
DpathExtractor,
RecordFilter,
Expand Down Expand Up @@ -224,6 +226,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonlDecoder as JsonlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonParser as JsonParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtAuthenticator as JwtAuthenticatorModel,
)
Expand Down Expand Up @@ -316,6 +321,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
XmlDecoder as XmlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ZipfileDecoder as ZipfileDecoderModel,
)
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
ListPartitionRouter,
Expand Down Expand Up @@ -470,6 +478,7 @@ def _init_mappings(self) -> None:
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonParserModel: self.create_json_parser,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
IterableDecoderModel: self.create_iterable_decoder,
Expand Down Expand Up @@ -505,6 +514,7 @@ def _init_mappings(self) -> None:
ConfigComponentsResolverModel: self.create_config_components_resolver,
StreamConfigModel: self.create_stream_config,
ComponentMappingDefinitionModel: self.create_components_mapping_definition,
ZipfileDecoderModel: self.create_zipfile_decoder,
}

# Needed for the case where we need to perform a second parse on the fields of a custom component
Expand Down Expand Up @@ -1682,6 +1692,20 @@ def create_gzipjson_decoder(
) -> GzipJsonDecoder:
return GzipJsonDecoder(parameters={}, encoding=model.encoding)

def create_zipfile_decoder(
self, model: ZipfileDecoderModel, config: Config, **kwargs: Any
) -> ZipfileDecoder:
parser = (
self._create_component_from_model(model=model.parser, config=config)
if model.parser
else None
)
return ZipfileDecoder(parameters={}, parser=parser)

@staticmethod
def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser:
return JsonParser(parameters={})

@staticmethod
def create_json_file_schema_loader(
model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any
Expand Down
3 changes: 3 additions & 0 deletions unit_tests/sources/declarative/decoders/parsers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
35 changes: 35 additions & 0 deletions unit_tests/sources/declarative/decoders/parsers/test_parsers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import json

import pytest

from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser


@pytest.mark.parametrize(
"raw_data, expected",
[
(json.dumps({"data-type": "string"}), {"data-type": "string"}),
(json.dumps({"data-type": "bytes"}).encode("utf-8"), {"data-type": "bytes"}),
(
bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")),
{"data-type": "bytearray"},
),
(json.dumps([{"id": 1}, {"id": 2}]), [{"id": 1}, {"id": 2}]),
],
ids=[
"test_with_str",
"test_with_bytes",
"test_with_bytearray",
"test_with_string_data_containing_list",
],
)
def test_json_parser_with_valid_data(raw_data, expected):
for i, actual in enumerate(JsonParser().parse(raw_data)):
if isinstance(expected, list):
assert actual == expected[i]
else:
assert actual == expected
44 changes: 44 additions & 0 deletions unit_tests/sources/declarative/decoders/test_zipfile_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import gzip
import json
import zipfile
from io import BytesIO
from typing import Union

import pytest
import requests

from airbyte_cdk.sources.declarative.decoders import ZipfileDecoder
from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser


def create_zip_from_dict(data: Union[dict, list]):
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, mode="w") as zip_file:
zip_file.writestr("data.json", data)
zip_buffer.seek(0)
return zip_buffer.getvalue()


@pytest.mark.parametrize(
"json_data",
[
{"test": "test"},
[{"id": 1}, {"id": 2}],
],
)
def test_zipfile_decoder_with_valid_response(requests_mock, json_data):
pnilan marked this conversation as resolved.
Show resolved Hide resolved
zipfile_decoder = ZipfileDecoder(parameters={}, parser=JsonParser)
compressed_data = gzip.compress(json.dumps(json_data).encode())
# zipped_data = create_zip_from_dict(json_data)
zipped_data = create_zip_from_dict(compressed_data)
requests_mock.register_uri("GET", "https://airbyte.io/", content=zipped_data)
response = requests.get("https://airbyte.io/")

if isinstance(json_data, list):
for i, actual in enumerate(zipfile_decoder.decode(response=response)):
assert actual == json_data[i]
else:
assert next(zipfile_decoder.decode(response=response)) == json_data
Loading