chore(decoder): clean decoders and make csvdecoder available #326
def decode(
    self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Given the response is an empty string or an empty list, the function will return a generator with an empty mapping.
    """
    has_yielded = False
The default behavior of this decoder was a bit weird, so I decided to keep it here rather than push the weird logic into JsonParser.
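To make the "weird" default concrete, here is a minimal stdlib-only sketch of the fallback described in the docstring above: an empty body or an empty list still produces one empty mapping. The names `parse_json_body` and `decode_with_empty_fallback` are hypothetical helpers for illustration, not the CDK's API, and returning right after the error fallback is an assumption of this sketch.

```python
import json
from typing import Any, Generator, Iterable, MutableMapping

Record = MutableMapping[str, Any]


def parse_json_body(body: bytes) -> Generator[Record, None, None]:
    """Hypothetical parser: yield each item of a JSON list, or a single JSON object."""
    parsed = json.loads(body)  # raises json.JSONDecodeError on an empty body
    if isinstance(parsed, list):
        yield from parsed
    else:
        yield parsed


def decode_with_empty_fallback(records: Iterable[Record]) -> Generator[Record, None, None]:
    """Yield every record; if nothing could be decoded, yield one empty mapping."""
    has_yielded = False
    try:
        for record in records:
            yield record
            has_yielded = True
    except Exception:
        # Broad catch, as discussed in this review; assumption: stop after the fallback.
        yield {}
        return
    if not has_yielded:
        yield {}
```

Keeping this fallback in a thin wrapper means the underlying parser can stay strict while existing callers still see the legacy single empty mapping.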
@dataclass
class GzipJsonDecoder(JsonDecoder):
Removed because it wasn't used anyway. We can reintroduce it for source-amazon-seller-partner, though.
📝 Walkthrough
This PR refactors the decoding and parsing architecture. It removes several deprecated decoders and parsers (e.g., GzipJsonDecoder, JsonParser, JsonLineParser, CsvParser) and introduces a unified approach with a new GzipDecoder and renamed CsvDecoder. The CompositeRawDecoder now supports configurable streaming.
Sequence Diagram(s)
sequenceDiagram
    participant C as Caller
    participant CRD as CompositeRawDecoder
    participant P as Parser
    C->>CRD: decode(response)
    alt stream_response is True
        CRD->>P: parse(response.raw)
    else stream_response is False
        CRD->>CRD: wrap response.content in BytesIO
        CRD->>P: parse(wrapped content)
    end
    P-->>CRD: return parsed data
    CRD-->>C: yield decoded data
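The two branches of the diagram can be sketched in plain Python. This is an illustrative sketch only: `FakeResponse` is a hypothetical stand-in for `requests.Response` exposing just `raw` and `content`, and `SketchCompositeDecoder` and `parse_json_lines` are assumed names, not the CDK classes.

```python
import io
import json
from dataclasses import dataclass
from typing import Any, BinaryIO, Callable, Generator, MutableMapping

Record = MutableMapping[str, Any]


def parse_json_lines(data: BinaryIO) -> Generator[Record, None, None]:
    """Yield one record per non-empty line of a binary file-like object."""
    for line in data:
        if line.strip():
            yield json.loads(line)


@dataclass
class FakeResponse:
    """Hypothetical stand-in for requests.Response (only the attributes used here)."""
    content: bytes
    raw: BinaryIO


@dataclass
class SketchCompositeDecoder:
    parse: Callable[[BinaryIO], Generator[Record, None, None]]
    stream_response: bool = True

    def decode(self, response: FakeResponse) -> Generator[Record, None, None]:
        if self.stream_response:
            # Streaming branch: hand the underlying stream to the parser (read once).
            yield from self.parse(response.raw)
        else:
            # Buffered branch: wrap the in-memory body so it can be re-read on every call.
            yield from self.parse(io.BytesIO(response.content))
```

In this sketch a consumed `BytesIO` simply yields nothing on a second pass; the PR's own test instead expects an exception when a real streamed response is decoded twice. The buffered branch can be decoded repeatedly because a fresh `BytesIO` is created on each call.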
Actionable comments posted: 0
🧹 Nitpick comments (15)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
2035-2036: Consider leveraging the model parameters or removing the unused argument.
Right now, this method always returns a new JsonDecoder with empty parameters, ignoring the passed-in model. Would it make sense to incorporate model parameters or drop the unused argument to avoid confusion, wdyt?
2039-2040: Make stream_response configurable or confirm it's always false.
Here, you set stream_response=False for CSV. Are you certain that no streaming scenario is needed for CSV data, or would making it configurable benefit some use cases, wdyt?
2061-2061: Check for ZipfileDecoder parameters.
Currently, the created ZipfileDecoder ignores additional parameters in model.decoder or model.parameters. Do you want to forward them to the parser, or is this intentional, wdyt?
2064-2077: Consider exposing parameter checks & fallback for decoders.
- The _get_parser method doesn't incorporate model.parameters. If additional settings (like encoding) are required, you might unify that logic here.
- There's a potential for infinitely nested GzipParser if the user misconfigures the inner_decoder repeatedly. A recursion limit or check might help.
Wdyt about adding these safeguards?
airbyte_cdk/sources/declarative/decoders/json_decoder.py (2)
24-25: Consider making 'stream_response' a parameter.
It's currently hardcoded to False. Would you like to introduce a parameter to toggle streaming for future flexibility, wdyt?
36-41: Catching broad exceptions.
Catching Exception might mask unexpected errors. Would you like to handle a more specific exception type, wdyt?
unit_tests/sources/declarative/decoders/test_json_decoder.py (2)
11-13: Great alignment with the new composite decoders!
This import approach looks consistent. Would you consider adding more test coverage to verify interplay between CompositeRawDecoder and JsonDecoder, wdyt?
44-45: Testing partial streaming scenarios?
We now set stream=True. Would you like to add tests confirming that partial lines or chunked responses are handled gracefully, wdyt?
unit_tests/sources/declarative/auth/test_token_provider.py (1)
58-60: Testing updated token response.
This properly simulates a new token. Maybe we could also test invalid JSON scenarios to ensure robustness, wdyt?
unit_tests/sources/declarative/extractors/test_dpath_extractor.py (1)
24-24: Consider adding a comment explaining the stream_response flag.
The initialization looks good, but since this is a test file, it might be helpful to add a comment explaining why stream_response=True is needed here, wdyt?
-decoder_jsonl = CompositeRawDecoder(parser=JsonLineParser(), stream_response=True)
+# stream_response=True is required for JSONL parsing to handle streaming responses correctly
+decoder_jsonl = CompositeRawDecoder(parser=JsonLineParser(), stream_response=True)
airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (2)
142-145: Consider adding docstring for the decode method.
The implementation looks good, but since this is a significant change in behavior, would you consider adding a docstring explaining the difference between streaming and non-streaming modes, wdyt?
 def decode(
     self, response: requests.Response
 ) -> Generator[MutableMapping[str, Any], None, None]:
+    """Decode the response based on stream_response setting.
+
+    When stream_response is True:
+    - Uses response.raw for streaming parsing
+    - Suitable for large responses or JSONL format
+    When stream_response is False:
+    - Uses response.content with BytesIO
+    - Suitable for responses that need to be parsed multiple times
+    """
     if self.is_stream_response():
         yield from self.parser.parse(data=response.raw)  # type: ignore[arg-type]
     else:
         yield from self.parser.parse(data=io.BytesIO(response.content))
134-134: Nice addition of streaming control! Consider adding docstring?
The new stream_response flag and its implementation look good. Would you consider adding a docstring to explain when to use each mode? For example:
 stream_response: bool = True
+"""
+Controls how responses are processed:
+- True: Streams response.raw directly (memory efficient for large responses)
+- False: Loads response.content into memory (allows multiple iterations)
+"""
Also applies to: 136-137, 142-145
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
1268-1272: Consider adding docstring for CsvDecoder.
The implementation looks good, but would you consider adding a docstring explaining the purpose and configuration options of the CSV decoder, wdyt?
 class CsvDecoder(BaseModel):
     type: Literal["CsvDecoder"]
+    """Decoder for CSV formatted data.
+
+    Attributes:
+        encoding: The character encoding to use (default: utf-8)
+        delimiter: The character used to separate fields (default: comma)
+    """
     encoding: Optional[str] = "utf-8"
     delimiter: Optional[str] = ","
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
3012-3025: CsvDecoder – Making CSV decoding available
Introducing the CsvDecoder with clear defaults (utf-8 encoding and a comma delimiter) is a clean and welcome addition. It looks like it accomplishes the PR objective to make CSV decoding available while cleaning up the decoders. Would you be open to adding some tests for different CSV configurations to ensure robustness? wdyt?
unit_tests/sources/declarative/decoders/test_composite_decoder.py (1)
203-213: Great test for stream consumption! Consider adding error message check?
The test for streamed response consumption looks good. Would you consider also asserting the specific error message to ensure the right error is being raised? Something like:
-    with pytest.raises(Exception):
+    with pytest.raises(Exception, match="Response body has already been consumed"):
         list(composite_raw_decoder.decode(response))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
airbyte_cdk/sources/declarative/decoders/__init__.py (0 hunks)
airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (2 hunks)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (1 hunks)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (8 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
unit_tests/sources/declarative/auth/test_token_provider.py (3 hunks)
unit_tests/sources/declarative/decoders/test_composite_decoder.py (1 hunks)
unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (0 hunks)
unit_tests/sources/declarative/decoders/test_json_decoder.py (2 hunks)
unit_tests/sources/declarative/extractors/test_dpath_extractor.py (1 hunks)
💤 Files with no reviewable changes (2)
- airbyte_cdk/sources/declarative/decoders/__init__.py
- unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py
🧰 Additional context used
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/decoders/test_json_decoder.py
[warning] 1-1: Code would be reformatted to adhere to style guidelines.
unit_tests/sources/declarative/auth/test_token_provider.py
[warning] 1-1: Code would be reformatted to adhere to style guidelines.
unit_tests/sources/declarative/decoders/test_composite_decoder.py
[warning] 1-1: Code would be reformatted to adhere to style guidelines.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[warning] 1-1: Code would be reformatted to adhere to style guidelines.
airbyte_cdk/sources/declarative/decoders/json_decoder.py
[warning] 1-1: Code would be reformatted to adhere to style guidelines.
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (21)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
152-152: Looks good!
The added import for CsvDecoderModel synchronizes well with the rest of the codebase. No issues found, wdyt?
227-227: Nice addition for GzipDecoderModel.
This import appears consistent with your usage in _get_parser. No concerns, wdyt?
2045-2046: Validate streaming approach for JSONL.
Here, the method sets stream_response=True for JSONL. This is likely correct given JSON lines are commonly processed in a streaming manner. Have you tested large JSONL data with this approach, wdyt?
airbyte_cdk/sources/declarative/decoders/json_decoder.py (3)
13-13: Thank you for adopting the composite approach.
This new import ensures we unify decoding logic with JsonParser. Would you like to verify usage in other parts of the codebase for consistency, wdyt?
28-28: Pass-through of 'is_stream_response' looks good.
No issues here!
44-44: Verify empty response behavior.
We yield an empty dict when nothing was decoded. Are we certain we want a single empty mapping rather than not yielding at all or returning an empty list, wdyt?
unit_tests/sources/declarative/auth/test_token_provider.py (2)
4-4: Importing 'json' is good.
This helps us easily create mock responses. No concerns here!
21-21: Switching to '.content' is more realistic.
Setting the token via encoded JSON simulates real response behavior. Would you like to confirm that bytes-to-JSON decoding logic is correctly handled in production, wdyt?
unit_tests/sources/declarative/extractors/test_dpath_extractor.py (2)
12-13: LGTM! Clean import changes.
The imports are correctly updated to use the new decoder architecture.
12-13: LGTM! Nice refactoring of the JsonlDecoder.
The change to use CompositeRawDecoder with JsonLineParser looks good and aligns with the decoder cleanup objectives. The test cases continue to pass with the new implementation.
Also applies to: 24-24
airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (2)
134-134: LGTM! Good default value choice.
Setting stream_response=True as default maintains backward compatibility while allowing opt-out when needed.
3-3: LGTM! Added required import.
The addition of the io import is necessary for using BytesIO in the non-streaming mode.
unit_tests/sources/declarative/decoders/test_composite_decoder.py (3)
203-213: LGTM! Good test for consumed stream behavior.
The test verifies that attempting to decode an already consumed stream raises an exception, which is the expected behavior.
215-223: LGTM! Good test for non-streaming mode.
The test verifies that non-streaming mode allows multiple decodes of the same response.
215-223: LGTM! Good test for non-streamed mode.
The test effectively verifies that non-streamed responses can be decoded multiple times.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4)
1664-1666: LGTM! Good design for GzipDecoder.
The GzipDecoder with inner_decoder support allows for flexible composition of decoders.
1268-1272: LGTM! Clean CsvDecoder implementation.
The CsvDecoder class looks good with appropriate default values for encoding and delimiter.
1664-1666: LGTM! Nice GzipDecoder implementation.
The GzipDecoder class with inner_decoder support looks good and aligns with the decoder cleanup objectives.
1912-1914: LGTM! Simplified decoder options.
The update to SessionTokenAuthenticator's decoder field to only allow JsonDecoder and XmlDecoder makes sense.
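As an illustration of what the CsvDecoder's two settings (encoding and delimiter, defaulting to utf-8 and a comma) control, here is a stdlib-only sketch. `parse_csv` is a hypothetical function written for this example; it is not the CDK's CsvParser, whose internals are not shown in this thread.

```python
import csv
import io
from typing import BinaryIO, Dict, Generator


def parse_csv(
    data: BinaryIO, encoding: str = "utf-8", delimiter: str = ","
) -> Generator[Dict[str, str], None, None]:
    """Decode a binary stream as text and yield one dict per CSV row (header row = keys)."""
    # newline="" lets the csv module handle line endings itself, as its docs recommend.
    text = io.TextIOWrapper(data, encoding=encoding, newline="")
    reader = csv.DictReader(text, delimiter=delimiter)
    for row in reader:
        yield dict(row)
```

For example, a semicolon-separated latin-1 export would be read with `parse_csv(stream, encoding="latin-1", delimiter=";")`, while the defaults cover plain utf-8, comma-separated data.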
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)
2141-2155: ZipfileDecoder – Updated property from "parser" to "decoder"
The new changes now require a "decoder" property (instead of the old "parser") and correctly reference the unified decoders (CsvDecoder, GzipDecoder, JsonDecoder, and JsonlDecoder). Would you consider renaming the inner title from "Parser" to "Decoder" for consistency with the rest of the schema? wdyt?
2997-3011: GzipDecoder – Flexible inner_decoder implementation
The new GzipDecoder correctly requires an inner_decoder that can be one of CsvDecoder, GzipDecoder, JsonDecoder, or JsonlDecoder, which aligns with the unified decoding strategy. Have you thought about potential recursion issues if a GzipDecoder is nested inside another? It might be useful to add a note or safeguard if deep-nested gzip decoding isn’t expected. wdyt?
/autofix
Actionable comments posted: 0
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (2)
24-25: Consider documenting the stream_response=False choice?
The initialization looks good! Would it be helpful to add a comment explaining why stream_response=False is the default here? This could help future maintainers understand the design decision, wdyt?
30-45: Consider more specific error handling?
The code looks good overall! A few thoughts:
- The generic Exception catch might hide specific issues. Would it be helpful to catch and log specific exceptions like JSONDecodeError separately, wdyt?
- The empty dict fallback is a nice safety net, but should we log a warning when this happens to help with debugging?
 try:
     for element in self._decoder.decode(response):
         yield element
         has_yielded = True
-except Exception:
+except json.JSONDecodeError as e:
+    logger.warning(f"Failed to decode JSON response: {e}")
+    yield {}
+except Exception as e:
+    logger.warning(f"Unexpected error while decoding response: {e}")
     yield {}
unit_tests/sources/declarative/decoders/test_json_decoder.py (1)
44-48: Consider adding error case tests?
The happy path tests look good! Would it be valuable to add some error case tests, wdyt? For example:
- Malformed JSON lines
- Mixed valid/invalid JSON lines
- Empty lines between valid JSON
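The three cases listed above could be exercised against a small stand-in parser like the one below. This is a hedged sketch: `parse_json_lines_lenient` is a hypothetical helper, and skipping malformed lines is an assumption made for illustration. The thread does not show how the real JsonLineParser treats bad input (it may log or raise instead).

```python
import io
import json
from typing import Any, BinaryIO, Generator


def parse_json_lines_lenient(data: BinaryIO) -> Generator[Any, None, None]:
    """Yield the decoded value of each valid JSON line; skip blank and malformed lines."""
    for raw_line in data:
        line = raw_line.strip()
        if not line:
            continue  # empty line between records
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            # Assumption of this sketch: malformed lines are skipped silently.
            continue


def collect(payload: bytes) -> list:
    """Small helper for tests: parse an in-memory payload into a list."""
    return list(parse_json_lines_lenient(io.BytesIO(payload)))
```

Tests built on `collect` can then cover a fully malformed payload, a payload mixing valid and invalid lines, and a payload with blank lines between records, asserting on whichever policy (skip, log, or raise) the team settles on.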
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
2035-2048: LGTM! Consider adding docstrings for better maintainability?
The implementation looks good. The stream_response flag is correctly set based on the decoder type. Would you consider adding docstrings to explain the purpose and behavior of each decoder method? This could help future maintainers understand the differences between them, wdyt?
Example docstring for create_csv_decoder:
def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder:
    """Creates a CSV decoder using CompositeRawDecoder with CsvParser.

    Args:
        model: The CSV decoder model containing encoding and delimiter settings.
        config: The connector configuration.
        **kwargs: Additional keyword arguments.

    Returns:
        A CompositeRawDecoder instance configured for CSV parsing.
    """
2066-2083: LGTM! Consider enhancing error messages?
The implementation is clean and handles all decoder types appropriately. Would you consider making the error messages more specific by including the list of supported decoders in the error message? This could help users quickly understand what decoders are available, wdyt?
Example enhanced error message:
-raise ValueError(f"Decoder type {model} does not have parser associated to it")
+raise ValueError(f"Decoder type {model} does not have parser associated to it. Supported decoders are: JsonDecoder, JsonlDecoder, CsvDecoder, and GzipDecoder")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (1 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
unit_tests/sources/declarative/auth/test_token_provider.py (3 hunks)
unit_tests/sources/declarative/decoders/test_composite_decoder.py (1 hunks)
unit_tests/sources/declarative/decoders/test_json_decoder.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- unit_tests/sources/declarative/decoders/test_composite_decoder.py
- unit_tests/sources/declarative/auth/test_token_provider.py
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (1)
13-13: LGTM! Nice simplification of the decoder structure.
The removal of the dataclass decorator and delegation to CompositeRawDecoder makes the code more maintainable and follows the composition over inheritance principle.
Also applies to: 19-22
unit_tests/sources/declarative/decoders/test_json_decoder.py (1)
11-13: LGTM! Clean import updates.
The imports are properly aligned with the new decoder structure.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
152-152: LGTM! Clean addition of CsvDecoderModel.
The import and mapping follow the established pattern in the codebase.
Also applies to: 521-521
227-227: LGTM! Clean addition of GzipDecoderModel.
The import follows the established pattern in the codebase.
2063-2063: LGTM! Clean refactor of create_zipfile_decoder.
The change nicely leverages the new _get_parser method, making the code more maintainable and consistent.
I haven't looked into what that would mean for our connectors and published manifests. I think the only concern is around GzipJsonDecoder (not used?) and CompositeRawDecoder, which I believe is used in a few spots, but it's a very simple manifest change to update, right?
class JsonDecoder(Decoder):
    """
    Decoder strategy that returns the json-encoded content of a response, if any.
    """

    parameters: InitVar[Mapping[str, Any]]
    def __init__(self, parameters: Mapping[str, Any]):
Nit: a comment on what _decoder is would be really good to have a few months from now.
@@ -1720,18 +1701,13 @@ class Config:
         extra = Extra.allow

     type: Literal["ZipfileDecoder"]
-    parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] = Field(
+    decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] = Field(
Asking to learn: why is it that GzipDecoder has inner_decoder, but ZipfileDecoder has just decoder? Should these names be consistent?
    return CsvParser(encoding=model.encoding, delimiter=model.delimiter)
elif isinstance(model, GzipDecoderModel):
    return GzipParser(
        inner_parser=ModelToComponentFactory._get_parser(model.inner_decoder, config)
Just to make sure I'm not tripping: this calls the func recursively, but for the inner decoder, right?
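The recursion being asked about can be sketched as follows: a gzip model wraps an inner model, and the factory function calls itself on the inner one. All classes here are simplified, hypothetical stand-ins that borrow the names from the diff for readability; they are not the CDK implementations, and `get_parser` omits the `config` argument.

```python
import gzip
import io
import json
from dataclasses import dataclass
from typing import Any, BinaryIO, Dict, Generator, Union


@dataclass
class JsonlDecoderModel:
    """Stand-in model with no settings (hypothetical)."""


@dataclass
class GzipDecoderModel:
    """Stand-in model: gzip wraps another decoder model (hypothetical)."""
    inner_decoder: Union["GzipDecoderModel", JsonlDecoderModel]


class JsonLineParser:
    def parse(self, data: BinaryIO) -> Generator[Dict[str, Any], None, None]:
        for line in data:
            if line.strip():
                yield json.loads(line)


@dataclass
class GzipParser:
    inner_parser: Any

    def parse(self, data: BinaryIO) -> Generator[Dict[str, Any], None, None]:
        # Decompress lazily, then delegate to whatever parser sits inside.
        with gzip.GzipFile(fileobj=data, mode="rb") as decompressed:
            yield from self.inner_parser.parse(decompressed)


def get_parser(model: Any) -> Any:
    if isinstance(model, JsonlDecoderModel):
        return JsonLineParser()
    if isinstance(model, GzipDecoderModel):
        # Recursive call: build the parser for the inner decoder, then wrap it.
        return GzipParser(inner_parser=get_parser(model.inner_decoder))
    raise ValueError(f"Decoder type {model} does not have parser associated to it")
```

Each level of nesting in the model produces one wrapping `GzipParser`, so the recursion depth is bounded by how deeply the manifest nests `inner_decoder`.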
type: string
default: utf-8
CsvParser:
- "$ref": "#/definitions/CsvDecoder"
Nothing stops us from shoving XmlDecoder here as well, but we're just not sure if there is a use case for it?
What
https://github.com/airbytehq/airbyte-internal-issues/issues/11616
This is a breaking change, but only for an experimental component or one that is only used in source-amplitude, so I'm fine keeping this a minor change.
Summary by CodeRabbit
New Features
Refactor
Tests
CompositeRawDecoder to ensure correct behavior with consumed and non-streamed responses.