Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds ZipfileDecoder component #169

Merged
merged 40 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e68f36f
initial JsonParser component
pnilan Dec 10, 2024
a8a7bb3
update parser
pnilan Dec 11, 2024
254f877
add tests for json parser
pnilan Dec 11, 2024
8df239a
update parser and tests to yield empty dict if unparseable.
pnilan Dec 11, 2024
8c7d5f8
add zipfile_decoder
pnilan Dec 11, 2024
92574df
chore: format code
pnilan Dec 11, 2024
6e4b376
Merge branch 'pnilan/declarative/parsers' into pnilan/declarative/zip…
pnilan Dec 11, 2024
ab3f404
update zipfile_decoder and relevants tests
pnilan Dec 12, 2024
82a15c9
Merge branch 'main' into pnilan/declarative/parsers
pnilan Dec 12, 2024
49d0ec8
Merge branch 'pnilan/declarative/parsers' into pnilan/declarative/zip…
pnilan Dec 12, 2024
96ec874
remove errant comment
pnilan Dec 13, 2024
0b3b5e1
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 10, 2025
9fd93cb
conform tests
pnilan Jan 10, 2025
1892a03
initial test updates
pnilan Jan 10, 2025
51118f1
update JsonParser and relevant tests
pnilan Jan 10, 2025
34a710d
chore: format/type-check
pnilan Jan 10, 2025
060178a
remove orjson from composite_raw_decoder file
pnilan Jan 14, 2025
bf8dd26
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 14, 2025
d9b6df3
chore: format code
pnilan Jan 14, 2025
f20fffc
add additional test
pnilan Jan 14, 2025
9ce2c28
update to fallback to json library if orjson fails, update test to us…
pnilan Jan 14, 2025
7e7b2c4
add `JsonParser` to GzipDecoder and CompositeRawDecoder "anyOf" list
pnilan Jan 14, 2025
23cbfb7
update to simplify orjson/json parsing
pnilan Jan 14, 2025
1c2a832
chore: type-check
pnilan Jan 14, 2025
66aaae9
unlock `CompositeRawDecoder` w/ `JsonParser` support for pagination
pnilan Jan 14, 2025
00cf7b1
update conditional validations for decoders/parsers for pagination
pnilan Jan 15, 2025
b7aa78f
remove errant print
pnilan Jan 15, 2025
7b41732
chore: coderabbitai suggestions
pnilan Jan 15, 2025
3f550f2
update parservalidation method
pnilan Jan 15, 2025
27bf5a7
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 15, 2025
bd724bf
Merge branch 'pnilan/declarative/parsers' into pnilan/declarative/zip…
pnilan Jan 15, 2025
350fcdb
remove unnecessary parser
pnilan Jan 15, 2025
aae3e77
update ZipfileDecoder to hanlde underlying parsers
pnilan Jan 15, 2025
fe6859f
update types
pnilan Jan 15, 2025
907f628
Merge branch 'main' into pnilan/declarative/zipfiledecoder
pnilan Jan 16, 2025
8a1ccf0
add `ZipfileDecoder` to `anyOf` validator in declarative component sc…
pnilan Jan 16, 2025
fcb3184
update `anyOf` validations to include `ZipfileDecoder` in declarative…
pnilan Jan 16, 2025
1c8cd66
update zipfiledecoder and relevant tests
pnilan Jan 17, 2025
1777cac
adds JsonLineParser and CsvParser to available underlying parsers for…
pnilan Jan 17, 2025
c0b2130
close zipfile context, add exception logging at decoder level, and ad…
pnilan Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ definitions:
properties:
type:
type: string
enum: [ CustomSchemaNormalization ]
enum: [CustomSchemaNormalization]
class_name:
title: Class Name
description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.
Expand Down Expand Up @@ -2042,6 +2042,24 @@ definitions:
$parameters:
type: object
additionalProperties: true
ZipfileDecoder:
title: Zipfile Decoder
description: Decoder for response data that is returned as zipfile(s).
type: object
additionalProperties: true
required:
- type
- parser
properties:
type:
type: string
enum: [ZipfileDecoder]
parser:
title: Parser
description: Parser to parse the decompressed data from the zipfile(s).
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonParser"
ListPartitionRouter:
title: List Partition Router
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
Expand Down Expand Up @@ -2886,6 +2904,7 @@ definitions:
parser:
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonParser"
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
# PARSERS
Expand All @@ -2902,6 +2921,21 @@ definitions:
anyOf:
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
- "$ref": "#/definitions/JsonParser"
JsonParser:
title: JsonParser
description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format.
type: object
additionalProperties: true
required:
- type
properties:
type:
type: string
enum: [JsonParser]
encoding:
type: string
default: utf-8
JsonLineParser:
type: object
required:
Expand Down
10 changes: 9 additions & 1 deletion airbyte_cdk/sources/declarative/decoders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import CompositeRawDecoder
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
CompositeRawDecoder,
GzipParser,
JsonParser,
Parser,
)
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import (
GzipJsonDecoder,
Expand All @@ -15,15 +20,18 @@
PaginationDecoderDecorator,
)
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
from airbyte_cdk.sources.declarative.decoders.zipfile_decoder import ZipfileDecoder

__all__ = [
"Decoder",
"CompositeRawDecoder",
"JsonDecoder",
"JsonParser",
"JsonlDecoder",
"IterableDecoder",
"GzipJsonDecoder",
"NoopDecoder",
"PaginationDecoderDecorator",
"XmlDecoder",
"ZipfileDecoder",
]
43 changes: 43 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
from io import BufferedIOBase, TextIOWrapper
from typing import Any, Generator, MutableMapping, Optional

import orjson
import requests

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.utils import AirbyteTracedException

logger = logging.getLogger("airbyte")

Expand Down Expand Up @@ -42,6 +45,46 @@ def parse(
yield from self.inner_parser.parse(gzipobj)


@dataclass
class JsonParser(Parser):
encoding: str = "utf-8"

def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]:
"""
Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
"""
raw_data = data.read()
body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data)

if body_json is None:
raise AirbyteTracedException(
message="Response JSON data failed to be parsed. See logs for more information.",
internal_message=f"Response JSON data failed to be parsed.",
failure_type=FailureType.system_error,
)

if isinstance(body_json, list):
yield from body_json
else:
yield from [body_json]

def _parse_orjson(self, raw_data: bytes) -> Optional[Any]:
try:
return orjson.loads(raw_data.decode(self.encoding))
except Exception as exc:
logger.debug(
f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}"
)
return None

def _parse_json(self, raw_data: bytes) -> Optional[Any]:
try:
return json.loads(raw_data.decode(self.encoding))
except Exception as exc:
logger.error(f"Failed to parse JSON data using json library. {exc}")
return None


@dataclass
class JsonLineParser(Parser):
encoding: Optional[str] = "utf-8"
Expand Down
43 changes: 43 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import gzip
import io
import logging
import zipfile
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping, MutableMapping, Optional

import requests

from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser

logger = logging.getLogger("airbyte")


@dataclass
class ZipfileDecoder(Decoder):
parser: Parser

def is_stream_response(self) -> bool:
return True

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
try:
zip_file = zipfile.ZipFile(io.BytesIO(response.content))
except zipfile.BadZipFile as e:
logger.exception(e)
logger.error(
f"Received an invalid zip file in response to URL: {response.request.url}. "
f"The size of the response body is: {len(response.content)}"
)
yield {}

pnilan marked this conversation as resolved.
Show resolved Hide resolved
for filename in zip_file.namelist():
with zip_file.open(filename) as file:
buffered_file = io.BufferedReader(file)
yield from self.parser.parse(buffered_file)
pnilan marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading