diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 15f1427a4..4e6f6de19 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1513,6 +1513,7 @@ definitions: anyOf: - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/XmlDecoder" + - "$ref": "#/definitions/CompositeRawDecoder" $parameters: type: object additionalProperties: true @@ -2067,6 +2068,26 @@ definitions: $parameters: type: object additionalProperties: true + ZipfileDecoder: + title: Zipfile Decoder + description: Decoder for response data that is returned as zipfile(s). + type: object + additionalProperties: true + required: + - type + - parser + properties: + type: + type: string + enum: [ZipfileDecoder] + parser: + title: Parser + description: Parser to parse the decompressed data from the zipfile(s). + anyOf: + - "$ref": "#/definitions/GzipParser" + - "$ref": "#/definitions/JsonParser" + - "$ref": "#/definitions/JsonLineParser" + - "$ref": "#/definitions/CsvParser" ListPartitionRouter: title: List Partition Router description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests. @@ -2895,6 +2916,7 @@ definitions: - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/GzipJsonDecoder" - "$ref": "#/definitions/CompositeRawDecoder" + - "$ref": "#/definitions/ZipfileDecoder" $parameters: type: object additionalProperties: true @@ -3093,6 +3115,8 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/GzipJsonDecoder" + - "$ref": "#/definitions/CompositeRawDecoder" + - "$ref": "#/definitions/ZipfileDecoder" download_decoder: title: Download Decoder description: Component decoding the download response so records can be extracted. @@ -3103,6 +3127,8 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/GzipJsonDecoder" + - "$ref": "#/definitions/CompositeRawDecoder" + - "$ref": "#/definitions/ZipfileDecoder" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte_cdk/sources/declarative/decoders/__init__.py index bec52137d..45eaf5599 100644 --- a/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -2,7 +2,12 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import CompositeRawDecoder +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import ( + CompositeRawDecoder, + GzipParser, + JsonParser, + Parser, +) from airbyte_cdk.sources.declarative.decoders.decoder import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import ( GzipJsonDecoder, @@ -15,15 +20,18 @@ PaginationDecoderDecorator, ) from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder +from airbyte_cdk.sources.declarative.decoders.zipfile_decoder import ZipfileDecoder __all__ = [ "Decoder", "CompositeRawDecoder", "JsonDecoder", + "JsonParser", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder", + "ZipfileDecoder", ] diff --git a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py new file mode 100644 index 000000000..a937a1e4d --- /dev/null +++ b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py @@ -0,0 +1,59 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import logging +import zipfile +from dataclasses import dataclass +from io import BytesIO +from typing import Any, Generator, MutableMapping + +import orjson +import requests + +from airbyte_cdk.models import FailureType +from airbyte_cdk.sources.declarative.decoders import Decoder +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import ( + Parser, +) +from airbyte_cdk.utils import AirbyteTracedException + +logger = logging.getLogger("airbyte") + + +@dataclass +class ZipfileDecoder(Decoder): + parser: Parser + + def is_stream_response(self) -> bool: + return False + + def decode( + self, response: requests.Response + ) -> Generator[MutableMapping[str, Any], None, None]: + try: + with zipfile.ZipFile(BytesIO(response.content)) as zip_file: + for file_name in zip_file.namelist(): + unzipped_content = zip_file.read(file_name) + buffered_content = BytesIO(unzipped_content) + try: + yield from self.parser.parse(buffered_content) + except Exception as e: + logger.error( + f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}." + ) + raise AirbyteTracedException( + message=f"Failed to parse file: {file_name} from zip file.", + internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.", + failure_type=FailureType.system_error, + ) from e + except zipfile.BadZipFile as e: + logger.error( + f"Received an invalid zip file in response to URL: {response.request.url}. " + f"The size of the response body is: {len(response.content)}" + ) + raise AirbyteTracedException( + message="Received an invalid zip file in response.", + internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.", + failure_type=FailureType.system_error, + ) from e diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 17226fc7b..dbd958410 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1222,9 +1222,6 @@ class LegacySessionTokenAuthenticator(BaseModel): class JsonParser(BaseModel): - class Config: - extra = Extra.allow - type: Literal["JsonParser"] encoding: Optional[str] = "utf-8" @@ -1660,6 +1657,18 @@ class CompositeErrorHandler(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class ZipfileDecoder(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["ZipfileDecoder"] + parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] = Field( + ..., + description="Parser to parse the decompressed data from the zipfile(s).", + title="Parser", + ) + + class CompositeRawDecoder(BaseModel): type: Literal["CompositeRawDecoder"] parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] @@ -1865,7 +1874,7 @@ class SessionTokenAuthenticator(BaseModel): description="Authentication method to use for requests sent to the API, specifying how to inject the session token.", title="Data Request Authentication", ) - decoder: Optional[Union[JsonDecoder, XmlDecoder]] = Field( + decoder: Optional[Union[JsonDecoder, XmlDecoder, CompositeRawDecoder]] = Field( None, description="Component used to decode the response.", title="Decoder" ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2070,6 +2079,7 @@ class SimpleRetriever(BaseModel): XmlDecoder, GzipJsonDecoder, CompositeRawDecoder, + ZipfileDecoder, ] ] = Field( None, @@ -2146,6 +2156,8 @@ class AsyncRetriever(BaseModel): IterableDecoder, XmlDecoder, GzipJsonDecoder, + CompositeRawDecoder, + ZipfileDecoder, ] ] = Field( None, @@ -2160,6 +2172,8 @@ class AsyncRetriever(BaseModel): IterableDecoder, XmlDecoder, GzipJsonDecoder, + CompositeRawDecoder, + ZipfileDecoder, ] ] = Field( None, diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 5ca79afe6..ed966ee45 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -66,6 +66,7 @@ JsonlDecoder, PaginationDecoderDecorator, XmlDecoder, + ZipfileDecoder, ) from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import ( CompositeRawDecoder, @@ -356,6 +357,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( XmlDecoder as XmlDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ZipfileDecoder as ZipfileDecoderModel, +) from airbyte_cdk.sources.declarative.partition_routers import ( CartesianProductStreamSlicer, ListPartitionRouter, @@ -571,6 +575,7 @@ def _init_mappings(self) -> None: ConfigComponentsResolverModel: self.create_config_components_resolver, StreamConfigModel: self.create_stream_config, ComponentMappingDefinitionModel: self.create_components_mapping_definition, + ZipfileDecoderModel: self.create_zipfile_decoder, } # Needed for the case where we need to perform a second parse on the fields of a custom component @@ -1796,6 +1801,12 @@ def create_gzipjson_decoder( ) -> GzipJsonDecoder: return GzipJsonDecoder(parameters={}, encoding=model.encoding) + def create_zipfile_decoder( + self, model: ZipfileDecoderModel, config: Config, **kwargs: Any + ) -> ZipfileDecoder: + parser = self._create_component_from_model(model=model.parser, config=config) + return ZipfileDecoder(parser=parser) + def create_gzip_parser( self, model: GzipParserModel, config: Config, **kwargs: Any ) -> GzipParser: diff --git a/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py b/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py new file mode 100644 index 000000000..731895e2e --- /dev/null +++ b/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py @@ -0,0 +1,68 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# +import gzip +import json +import zipfile +from io import BytesIO +from typing import Union + +import pytest +import requests + +from airbyte_cdk.sources.declarative.decoders import GzipParser, JsonParser, ZipfileDecoder + + +def create_zip_from_dict(data: Union[dict, list]) -> bytes: + zip_buffer = BytesIO() + with zipfile.ZipFile(zip_buffer, mode="w") as zip_file: + zip_file.writestr("data.json", data) + return zip_buffer.getvalue() + + +def create_multi_zip_from_dict(data: list) -> bytes: + zip_buffer = BytesIO() + + with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: + for i, content in enumerate(data): + file_content = json.dumps(content).encode("utf-8") + zip_file.writestr(f"file_{i}.json", file_content) + return zip_buffer.getvalue() + + +@pytest.mark.parametrize( + "json_data", + [ + {"test": "test"}, + {"responses": [{"id": 1}, {"id": 2}]}, + [{"id": 1}, {"id": 2}], + {}, + ], +) +def test_zipfile_decoder_with_single_file_response(requests_mock, json_data): + zipfile_decoder = ZipfileDecoder(parser=GzipParser(inner_parser=JsonParser())) + compressed_data = gzip.compress(json.dumps(json_data).encode()) + zipped_data = create_zip_from_dict(compressed_data) + requests_mock.register_uri("GET", "https://airbyte.io/", content=zipped_data) + response = requests.get("https://airbyte.io/") + + if isinstance(json_data, list): + for i, actual in enumerate(zipfile_decoder.decode(response=response)): + assert actual == json_data[i] + else: + assert next(zipfile_decoder.decode(response=response)) == json_data + + +def test_zipfile_decoder_with_multi_file_response(requests_mock): + data_to_zip = [{"key1": "value1"}, {"key2": "value2"}, {"key3": "value3"}] + + mocked_response = create_multi_zip_from_dict(data_to_zip) + + decoder = ZipfileDecoder(parser=JsonParser()) + requests_mock.register_uri("GET", "https://airbyte.io/", content=mocked_response) + response = requests.get("https://airbyte.io/") + results = list(decoder.decode(response)) + + assert len(results) == 3 + for i, actual in enumerate(results): + assert actual == data_to_zip[i]