feat: Adds ZipfileDecoder component #169

Merged
merged 40 commits into from
Jan 17, 2025
e68f36f
initial JsonParser component
pnilan Dec 10, 2024
a8a7bb3
update parser
pnilan Dec 11, 2024
254f877
add tests for json parser
pnilan Dec 11, 2024
8df239a
update parser and tests to yield empty dict if unparseable.
pnilan Dec 11, 2024
8c7d5f8
add zipfile_decoder
pnilan Dec 11, 2024
92574df
chore: format code
pnilan Dec 11, 2024
6e4b376
Merge branch 'pnilan/declarative/parsers' into pnilan/declarative/zip…
pnilan Dec 11, 2024
ab3f404
update zipfile_decoder and relevants tests
pnilan Dec 12, 2024
82a15c9
Merge branch 'main' into pnilan/declarative/parsers
pnilan Dec 12, 2024
49d0ec8
Merge branch 'pnilan/declarative/parsers' into pnilan/declarative/zip…
pnilan Dec 12, 2024
96ec874
remove errant comment
pnilan Dec 13, 2024
0b3b5e1
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 10, 2025
9fd93cb
conform tests
pnilan Jan 10, 2025
1892a03
initial test updates
pnilan Jan 10, 2025
51118f1
update JsonParser and relevant tests
pnilan Jan 10, 2025
34a710d
chore: format/type-check
pnilan Jan 10, 2025
060178a
remove orjson from composite_raw_decoder file
pnilan Jan 14, 2025
bf8dd26
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 14, 2025
d9b6df3
chore: format code
pnilan Jan 14, 2025
f20fffc
add additional test
pnilan Jan 14, 2025
9ce2c28
update to fallback to json library if orjson fails, update test to us…
pnilan Jan 14, 2025
7e7b2c4
add `JsonParser` to GzipDecoder and CompositeRawDecoder "anyOf" list
pnilan Jan 14, 2025
23cbfb7
update to simplify orjson/json parsing
pnilan Jan 14, 2025
1c2a832
chore: type-check
pnilan Jan 14, 2025
66aaae9
unlock `CompositeRawDecoder` w/ `JsonParser` support for pagination
pnilan Jan 14, 2025
00cf7b1
update conditional validations for decoders/parsers for pagination
pnilan Jan 15, 2025
b7aa78f
remove errant print
pnilan Jan 15, 2025
7b41732
chore: coderabbitai suggestions
pnilan Jan 15, 2025
3f550f2
update parservalidation method
pnilan Jan 15, 2025
27bf5a7
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 15, 2025
bd724bf
Merge branch 'pnilan/declarative/parsers' into pnilan/declarative/zip…
pnilan Jan 15, 2025
350fcdb
remove unnecessary parser
pnilan Jan 15, 2025
aae3e77
update ZipfileDecoder to hanlde underlying parsers
pnilan Jan 15, 2025
fe6859f
update types
pnilan Jan 15, 2025
907f628
Merge branch 'main' into pnilan/declarative/zipfiledecoder
pnilan Jan 16, 2025
8a1ccf0
add `ZipfileDecoder` to `anyOf` validator in declarative component sc…
pnilan Jan 16, 2025
fcb3184
update `anyOf` validations to include `ZipfileDecoder` in declarative…
pnilan Jan 16, 2025
1c8cd66
update zipfiledecoder and relevant tests
pnilan Jan 17, 2025
1777cac
adds JsonLineParser and CsvParser to available underlying parsers for…
pnilan Jan 17, 2025
c0b2130
close zipfile context, add exception logging at decoder level, and ad…
pnilan Jan 17, 2025
26 changes: 26 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1513,6 +1513,7 @@ definitions:
anyOf:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
$parameters:
type: object
additionalProperties: true
@@ -2067,6 +2068,26 @@ definitions:
$parameters:
type: object
additionalProperties: true
  ZipfileDecoder:
    title: Zipfile Decoder
    description: Decoder for response data that is returned as zipfile(s).
    type: object
    additionalProperties: true
    required:
      - type
      - parser
    properties:
      type:
        type: string
        enum: [ZipfileDecoder]
      parser:
        title: Parser
        description: Parser to parse the decompressed data from the zipfile(s).
        anyOf:
          - "$ref": "#/definitions/GzipParser"
          - "$ref": "#/definitions/JsonParser"
          - "$ref": "#/definitions/JsonLineParser"
          - "$ref": "#/definitions/CsvParser"
  ListPartitionRouter:
    title: List Partition Router
    description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
@@ -2895,6 +2916,7 @@ definitions:
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
$parameters:
type: object
additionalProperties: true
@@ -3093,6 +3115,8 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
download_decoder:
title: Download Decoder
description: Component decoding the download response so records can be extracted.
@@ -3103,6 +3127,8 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
- "$ref": "#/definitions/ZipfileDecoder"
$parameters:
type: object
additionalProperties: true
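
For orientation, a component that satisfies the `ZipfileDecoder` definition in this schema might look like the dict below, written in Python rather than YAML. The `check_zipfile_decoder` helper is hypothetical and mirrors only the `required`, `enum`, and `parser.anyOf` constraints visible in the hunk; it is not part of the CDK, which validates manifests against the full schema.

```python
from typing import Any, Mapping

# Parser types listed under `parser.anyOf` in the ZipfileDecoder definition.
ALLOWED_PARSERS = {"GzipParser", "JsonParser", "JsonLineParser", "CsvParser"}


def check_zipfile_decoder(component: Mapping[str, Any]) -> None:
    """Hypothetical helper: checks only the rules visible in the schema hunk."""
    for key in ("type", "parser"):
        if key not in component:
            raise ValueError(f"missing required property: {key}")
    if component["type"] != "ZipfileDecoder":
        raise ValueError("type must be 'ZipfileDecoder'")
    parser_type = component["parser"].get("type")
    if parser_type not in ALLOWED_PARSERS:
        raise ValueError(f"unsupported parser: {parser_type}")


# A zip archive whose members are plain JSON documents.
decoder = {"type": "ZipfileDecoder", "parser": {"type": "JsonParser"}}
check_zipfile_decoder(decoder)  # raises ValueError if a rule is violated
```

Because both `parser` and `type` are required, a bare `{"type": "ZipfileDecoder"}` would be rejected.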
10 changes: 9 additions & 1 deletion airbyte_cdk/sources/declarative/decoders/__init__.py
@@ -2,7 +2,12 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

-from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import CompositeRawDecoder
+from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
+    CompositeRawDecoder,
+    GzipParser,
+    JsonParser,
+    Parser,
+)
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import (
GzipJsonDecoder,
@@ -15,15 +20,18 @@
PaginationDecoderDecorator,
)
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
from airbyte_cdk.sources.declarative.decoders.zipfile_decoder import ZipfileDecoder

__all__ = [
"Decoder",
"CompositeRawDecoder",
"JsonDecoder",
"JsonParser",
"JsonlDecoder",
"IterableDecoder",
"GzipJsonDecoder",
"NoopDecoder",
"PaginationDecoderDecorator",
"XmlDecoder",
"ZipfileDecoder",
]
59 changes: 59 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py
@@ -0,0 +1,59 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
import zipfile
from dataclasses import dataclass
from io import BytesIO
from typing import Any, Generator, MutableMapping

import orjson
import requests

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
    Parser,
)
from airbyte_cdk.utils import AirbyteTracedException

logger = logging.getLogger("airbyte")


@dataclass
class ZipfileDecoder(Decoder):
    parser: Parser

    def is_stream_response(self) -> bool:
        return False

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        try:
            with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
                for file_name in zip_file.namelist():
                    unzipped_content = zip_file.read(file_name)
                    buffered_content = BytesIO(unzipped_content)
                    try:
                        yield from self.parser.parse(buffered_content)
                    except Exception as e:
                        logger.error(
                            f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
                        )
                        raise AirbyteTracedException(
                            message=f"Failed to parse file: {file_name} from zip file.",
                            internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.",
                            failure_type=FailureType.system_error,
                        ) from e
        except zipfile.BadZipFile as e:
            logger.error(
                f"Received an invalid zip file in response to URL: {response.request.url}. "
                f"The size of the response body is: {len(response.content)}"
            )
            raise AirbyteTracedException(
                message="Received an invalid zip file in response.",
                internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.",
                failure_type=FailureType.system_error,
            ) from e
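
The core loop of `decode` can be tried without the CDK. The sketch below uses only the standard library and assumes every archive member is a single JSON document; `build_zip` and `iter_zip_records` are hypothetical names, and `json.loads` stands in for the configured `parser`. Like the decoder, it holds the whole payload in memory, which matches `is_stream_response` returning `False`.

```python
import json
import zipfile
from io import BytesIO
from typing import Any, Iterator, List, Mapping


def build_zip(documents: List[Mapping[str, Any]]) -> bytes:
    """Write each document to its own JSON member of an in-memory zip archive."""
    buffer = BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for i, document in enumerate(documents):
            archive.writestr(f"file_{i}.json", json.dumps(document))
    return buffer.getvalue()


def iter_zip_records(payload: bytes) -> Iterator[Mapping[str, Any]]:
    """Yield one parsed JSON document per archive member, in namelist() order."""
    with zipfile.ZipFile(BytesIO(payload)) as archive:
        for name in archive.namelist():
            # json.loads stands in for the parser configured on the decoder.
            yield json.loads(archive.read(name))


records = list(iter_zip_records(build_zip([{"id": 1}, {"id": 2}])))
print(records)
```

Since the function is a generator, the archive stays open until the caller has consumed every record or closes the generator.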

@@ -1222,9 +1222,6 @@ class LegacySessionTokenAuthenticator(BaseModel):


 class JsonParser(BaseModel):
-    class Config:
-        extra = Extra.allow
-
     type: Literal["JsonParser"]
     encoding: Optional[str] = "utf-8"

@@ -1660,6 +1657,18 @@ class CompositeErrorHandler(BaseModel):
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ZipfileDecoder(BaseModel):
    class Config:
        extra = Extra.allow

    type: Literal["ZipfileDecoder"]
    parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] = Field(
        ...,
        description="Parser to parse the decompressed data from the zipfile(s).",
        title="Parser",
    )


class CompositeRawDecoder(BaseModel):
    type: Literal["CompositeRawDecoder"]
    parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser]
@@ -1865,7 +1874,7 @@ class SessionTokenAuthenticator(BaseModel):
         description="Authentication method to use for requests sent to the API, specifying how to inject the session token.",
         title="Data Request Authentication",
     )
-    decoder: Optional[Union[JsonDecoder, XmlDecoder]] = Field(
+    decoder: Optional[Union[JsonDecoder, XmlDecoder, CompositeRawDecoder]] = Field(
         None, description="Component used to decode the response.", title="Decoder"
     )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
@@ -2070,6 +2079,7 @@ class SimpleRetriever(BaseModel):
XmlDecoder,
GzipJsonDecoder,
CompositeRawDecoder,
ZipfileDecoder,
]
] = Field(
None,
@@ -2146,6 +2156,8 @@ class AsyncRetriever(BaseModel):
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
CompositeRawDecoder,
ZipfileDecoder,
]
] = Field(
None,
@@ -2160,6 +2172,8 @@ class AsyncRetriever(BaseModel):
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
CompositeRawDecoder,
ZipfileDecoder,
]
] = Field(
None,

@@ -66,6 +66,7 @@
JsonlDecoder,
PaginationDecoderDecorator,
XmlDecoder,
ZipfileDecoder,
)
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
CompositeRawDecoder,
@@ -356,6 +357,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
XmlDecoder as XmlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ZipfileDecoder as ZipfileDecoderModel,
)
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
ListPartitionRouter,
@@ -571,6 +575,7 @@ def _init_mappings(self) -> None:
ConfigComponentsResolverModel: self.create_config_components_resolver,
StreamConfigModel: self.create_stream_config,
ComponentMappingDefinitionModel: self.create_components_mapping_definition,
ZipfileDecoderModel: self.create_zipfile_decoder,
}

# Needed for the case where we need to perform a second parse on the fields of a custom component
@@ -1796,6 +1801,12 @@ def create_gzipjson_decoder(
    ) -> GzipJsonDecoder:
        return GzipJsonDecoder(parameters={}, encoding=model.encoding)

    def create_zipfile_decoder(
        self, model: ZipfileDecoderModel, config: Config, **kwargs: Any
    ) -> ZipfileDecoder:
        parser = self._create_component_from_model(model=model.parser, config=config)
        return ZipfileDecoder(parser=parser)

    def create_gzip_parser(
        self, model: GzipParserModel, config: Config, **kwargs: Any
    ) -> GzipParser:
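
The pattern in these factory hunks is a dispatch table from model class to constructor method, with `create_zipfile_decoder` recursing into the table to build its nested parser. A stripped-down sketch follows; all classes here are hypothetical stand-ins built with dataclasses, not the CDK's Pydantic models or runtime components, and `create` plays the role of `_create_component_from_model`.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Type


# Hypothetical stand-ins for the models parsed from a manifest.
@dataclass
class JsonParserModel:
    encoding: str = "utf-8"


@dataclass
class ZipfileDecoderModel:
    parser: Any


# Hypothetical stand-ins for the runtime components.
@dataclass
class JsonParser:
    encoding: str


@dataclass
class ZipfileDecoder:
    parser: Any


class Factory:
    def __init__(self) -> None:
        # Model class -> constructor method, analogous to `_init_mappings`.
        self._constructors: Dict[Type[Any], Callable[[Any], Any]] = {
            JsonParserModel: self.create_json_parser,
            ZipfileDecoderModel: self.create_zipfile_decoder,
        }

    def create(self, model: Any) -> Any:
        # Look up the constructor registered for this model's class.
        return self._constructors[type(model)](model)

    def create_json_parser(self, model: JsonParserModel) -> JsonParser:
        return JsonParser(encoding=model.encoding)

    def create_zipfile_decoder(self, model: ZipfileDecoderModel) -> ZipfileDecoder:
        # Build the nested parser through the same table, then wrap it.
        return ZipfileDecoder(parser=self.create(model.parser))


decoder = Factory().create(ZipfileDecoderModel(parser=JsonParserModel()))
```

Routing the nested parser through the table means any parser type the table knows about can be nested without changes to `create_zipfile_decoder`.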
68 changes: 68 additions & 0 deletions unit_tests/sources/declarative/decoders/test_zipfile_decoder.py
@@ -0,0 +1,68 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import gzip
import json
import zipfile
from io import BytesIO
from typing import Union

import pytest
import requests

from airbyte_cdk.sources.declarative.decoders import GzipParser, JsonParser, ZipfileDecoder


def create_zip_from_dict(data: Union[dict, list]) -> bytes:
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, mode="w") as zip_file:
        zip_file.writestr("data.json", data)
    return zip_buffer.getvalue()


def create_multi_zip_from_dict(data: list) -> bytes:
    zip_buffer = BytesIO()

    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
        for i, content in enumerate(data):
            file_content = json.dumps(content).encode("utf-8")
            zip_file.writestr(f"file_{i}.json", file_content)
    return zip_buffer.getvalue()


@pytest.mark.parametrize(
    "json_data",
    [
        {"test": "test"},
        {"responses": [{"id": 1}, {"id": 2}]},
        [{"id": 1}, {"id": 2}],
        {},
    ],
)
def test_zipfile_decoder_with_single_file_response(requests_mock, json_data):
    zipfile_decoder = ZipfileDecoder(parser=GzipParser(inner_parser=JsonParser()))
    compressed_data = gzip.compress(json.dumps(json_data).encode())
    zipped_data = create_zip_from_dict(compressed_data)
    requests_mock.register_uri("GET", "https://airbyte.io/", content=zipped_data)
    response = requests.get("https://airbyte.io/")

    if isinstance(json_data, list):
        for i, actual in enumerate(zipfile_decoder.decode(response=response)):
            assert actual == json_data[i]
    else:
        assert next(zipfile_decoder.decode(response=response)) == json_data


def test_zipfile_decoder_with_multi_file_response(requests_mock):
    data_to_zip = [{"key1": "value1"}, {"key2": "value2"}, {"key3": "value3"}]

    mocked_response = create_multi_zip_from_dict(data_to_zip)

    decoder = ZipfileDecoder(parser=JsonParser())
    requests_mock.register_uri("GET", "https://airbyte.io/", content=mocked_response)
    response = requests.get("https://airbyte.io/")
    results = list(decoder.decode(response))

    assert len(results) == 3
    for i, actual in enumerate(results):
        assert actual == data_to_zip[i]
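
Both tests in this file cover successful decoding; the `zipfile.BadZipFile` branch of `decode` is not exercised here. The condition that triggers it can be reproduced with the standard library alone, as in this sketch. `is_valid_zip` is a hypothetical helper, not part of the CDK, and a real test would more likely assert on the `AirbyteTracedException` that the decoder raises.

```python
import zipfile
from io import BytesIO


def is_valid_zip(payload: bytes) -> bool:
    """Return True if the payload can be opened as a zip archive."""
    try:
        with zipfile.ZipFile(BytesIO(payload)):
            return True
    except zipfile.BadZipFile:
        # Raised by the ZipFile constructor when the payload is not a zip archive.
        return False


buffer = BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("data.json", "{}")

print(is_valid_zip(buffer.getvalue()))
print(is_valid_zip(b"not a zip archive"))
```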