Skip to content

Commit

Permalink
feat: Adds ZipfileDecoder component (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
pnilan authored Jan 17, 2025
1 parent b5ed82c commit d2016c6
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 5 deletions.
26 changes: 26 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,7 @@ definitions:
anyOf:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -2071,6 +2072,26 @@ definitions:
$parameters:
type: object
additionalProperties: true
ZipfileDecoder:
title: Zipfile Decoder
description: Decoder for response data that is returned as zipfile(s).
type: object
additionalProperties: true
required:
- type
- parser
properties:
type:
type: string
enum: [ZipfileDecoder]
parser:
title: Parser
description: Parser to parse the decompressed data from the zipfile(s).
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonParser"
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
ListPartitionRouter:
title: List Partition Router
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
Expand Down Expand Up @@ -2899,6 +2920,7 @@ definitions:
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -3097,6 +3119,8 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
download_decoder:
title: Download Decoder
description: Component decoding the download response so records can be extracted.
Expand All @@ -3107,6 +3131,8 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down
10 changes: 9 additions & 1 deletion airbyte_cdk/sources/declarative/decoders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import CompositeRawDecoder
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
CompositeRawDecoder,
GzipParser,
JsonParser,
Parser,
)
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import (
GzipJsonDecoder,
Expand All @@ -15,15 +20,18 @@
PaginationDecoderDecorator,
)
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
from airbyte_cdk.sources.declarative.decoders.zipfile_decoder import ZipfileDecoder

__all__ = [
"Decoder",
"CompositeRawDecoder",
"JsonDecoder",
"JsonParser",
"JsonlDecoder",
"IterableDecoder",
"GzipJsonDecoder",
"NoopDecoder",
"PaginationDecoderDecorator",
"XmlDecoder",
"ZipfileDecoder",
]
59 changes: 59 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
import zipfile
from dataclasses import dataclass
from io import BytesIO
from typing import Any, Generator, MutableMapping

import orjson
import requests

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
Parser,
)
from airbyte_cdk.utils import AirbyteTracedException

logger = logging.getLogger("airbyte")


@dataclass
class ZipfileDecoder(Decoder):
parser: Parser

def is_stream_response(self) -> bool:
return False

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
try:
with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
for file_name in zip_file.namelist():
unzipped_content = zip_file.read(file_name)
buffered_content = BytesIO(unzipped_content)
try:
yield from self.parser.parse(buffered_content)
except Exception as e:
logger.error(
f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
)
raise AirbyteTracedException(
message=f"Failed to parse file: {file_name} from zip file.",
internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.",
failure_type=FailureType.system_error,
) from e
except zipfile.BadZipFile as e:
logger.error(
f"Received an invalid zip file in response to URL: {response.request.url}. "
f"The size of the response body is: {len(response.content)}"
)
raise AirbyteTracedException(
message="Received an invalid zip file in response.",
internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.",
failure_type=FailureType.system_error,
) from e
Original file line number Diff line number Diff line change
Expand Up @@ -1223,9 +1223,6 @@ class LegacySessionTokenAuthenticator(BaseModel):


class JsonParser(BaseModel):
class Config:
extra = Extra.allow

type: Literal["JsonParser"]
encoding: Optional[str] = "utf-8"

Expand Down Expand Up @@ -1661,6 +1658,18 @@ class CompositeErrorHandler(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ZipfileDecoder(BaseModel):
class Config:
extra = Extra.allow

type: Literal["ZipfileDecoder"]
parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] = Field(
...,
description="Parser to parse the decompressed data from the zipfile(s).",
title="Parser",
)


class CompositeRawDecoder(BaseModel):
type: Literal["CompositeRawDecoder"]
parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser]
Expand Down Expand Up @@ -1866,7 +1875,7 @@ class SessionTokenAuthenticator(BaseModel):
description="Authentication method to use for requests sent to the API, specifying how to inject the session token.",
title="Data Request Authentication",
)
decoder: Optional[Union[JsonDecoder, XmlDecoder]] = Field(
decoder: Optional[Union[JsonDecoder, XmlDecoder, CompositeRawDecoder]] = Field(
None, description="Component used to decode the response.", title="Decoder"
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
Expand Down Expand Up @@ -2071,6 +2080,7 @@ class SimpleRetriever(BaseModel):
XmlDecoder,
GzipJsonDecoder,
CompositeRawDecoder,
ZipfileDecoder,
]
] = Field(
None,
Expand Down Expand Up @@ -2147,6 +2157,8 @@ class AsyncRetriever(BaseModel):
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
CompositeRawDecoder,
ZipfileDecoder,
]
] = Field(
None,
Expand All @@ -2161,6 +2173,8 @@ class AsyncRetriever(BaseModel):
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
CompositeRawDecoder,
ZipfileDecoder,
]
] = Field(
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
JsonlDecoder,
PaginationDecoderDecorator,
XmlDecoder,
ZipfileDecoder,
)
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
CompositeRawDecoder,
Expand Down Expand Up @@ -356,6 +357,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
XmlDecoder as XmlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ZipfileDecoder as ZipfileDecoderModel,
)
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
ListPartitionRouter,
Expand Down Expand Up @@ -571,6 +575,7 @@ def _init_mappings(self) -> None:
ConfigComponentsResolverModel: self.create_config_components_resolver,
StreamConfigModel: self.create_stream_config,
ComponentMappingDefinitionModel: self.create_components_mapping_definition,
ZipfileDecoderModel: self.create_zipfile_decoder,
}

# Needed for the case where we need to perform a second parse on the fields of a custom component
Expand Down Expand Up @@ -1800,6 +1805,12 @@ def create_gzipjson_decoder(
) -> GzipJsonDecoder:
return GzipJsonDecoder(parameters={}, encoding=model.encoding)

def create_zipfile_decoder(
self, model: ZipfileDecoderModel, config: Config, **kwargs: Any
) -> ZipfileDecoder:
parser = self._create_component_from_model(model=model.parser, config=config)
return ZipfileDecoder(parser=parser)

def create_gzip_parser(
self, model: GzipParserModel, config: Config, **kwargs: Any
) -> GzipParser:
Expand Down
68 changes: 68 additions & 0 deletions unit_tests/sources/declarative/decoders/test_zipfile_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import gzip
import json
import zipfile
from io import BytesIO
from typing import Union

import pytest
import requests

from airbyte_cdk.sources.declarative.decoders import GzipParser, JsonParser, ZipfileDecoder


def create_zip_from_dict(data: Union[dict, list]) -> bytes:
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, mode="w") as zip_file:
zip_file.writestr("data.json", data)
return zip_buffer.getvalue()


def create_multi_zip_from_dict(data: list) -> bytes:
zip_buffer = BytesIO()

with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
for i, content in enumerate(data):
file_content = json.dumps(content).encode("utf-8")
zip_file.writestr(f"file_{i}.json", file_content)
return zip_buffer.getvalue()


@pytest.mark.parametrize(
"json_data",
[
{"test": "test"},
{"responses": [{"id": 1}, {"id": 2}]},
[{"id": 1}, {"id": 2}],
{},
],
)
def test_zipfile_decoder_with_single_file_response(requests_mock, json_data):
zipfile_decoder = ZipfileDecoder(parser=GzipParser(inner_parser=JsonParser()))
compressed_data = gzip.compress(json.dumps(json_data).encode())
zipped_data = create_zip_from_dict(compressed_data)
requests_mock.register_uri("GET", "https://airbyte.io/", content=zipped_data)
response = requests.get("https://airbyte.io/")

if isinstance(json_data, list):
for i, actual in enumerate(zipfile_decoder.decode(response=response)):
assert actual == json_data[i]
else:
assert next(zipfile_decoder.decode(response=response)) == json_data


def test_zipfile_decoder_with_multi_file_response(requests_mock):
data_to_zip = [{"key1": "value1"}, {"key2": "value2"}, {"key3": "value3"}]

mocked_response = create_multi_zip_from_dict(data_to_zip)

decoder = ZipfileDecoder(parser=JsonParser())
requests_mock.register_uri("GET", "https://airbyte.io/", content=mocked_response)
response = requests.get("https://airbyte.io/")
results = list(decoder.decode(response))

assert len(results) == 3
for i, actual in enumerate(results):
assert actual == data_to_zip[i]

0 comments on commit d2016c6

Please sign in to comment.