From 5a202f7ed4f0eb89f8c02dbb1b6c33b6eb8d77ba Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Thu, 15 Feb 2024 09:38:07 -0500 Subject: [PATCH 1/2] File-based CDK: enqueue AirbyteMessage of type record instead of sending to the message repository (#35318) --- .../sources/file_based/stream/concurrent/adapters.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py index fdcdec54ad0d..abaa8f7d044f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py @@ -243,6 +243,9 @@ def read(self) -> Iterable[Record]: data_to_return = dict(record_data) self._stream.transformer.transform(data_to_return, self._stream.get_json_schema()) yield Record(data_to_return, self.stream_name()) + elif isinstance(record_data, AirbyteMessage) and record_data.type == Type.RECORD: + # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued + yield Record(record_data.record.data, self.stream_name()) else: self._message_repository.emit_message(record_data) except Exception as e: From 8e7618f845c7c293d5b678321234e9f1ac72135f Mon Sep 17 00:00:00 2001 From: clnoll Date: Thu, 15 Feb 2024 14:57:53 +0000 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=A4=96=20Bump=20patch=20version=20of?= =?UTF-8?q?=20Python=20CDK?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airbyte-cdk/python/.bumpversion.cfg | 2 +- airbyte-cdk/python/CHANGELOG.md | 3 +++ airbyte-cdk/python/Dockerfile | 4 ++-- airbyte-cdk/python/setup.py | 2 +- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/airbyte-cdk/python/.bumpversion.cfg b/airbyte-cdk/python/.bumpversion.cfg index 0e3c50c1fd4c..a2e957bf86c2 100644 --- a/airbyte-cdk/python/.bumpversion.cfg +++ b/airbyte-cdk/python/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.63.0 +current_version = 0.63.1 commit = False [bumpversion:file:setup.py] diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index c0615ff6deb1..3853c061679e 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.63.1 +File-based CDK: fix record enqueuing + ## 0.63.0 Per-stream error reporting and continue syncing on error by default diff --git a/airbyte-cdk/python/Dockerfile b/airbyte-cdk/python/Dockerfile index a910fd1a14d3..bba8e056a664 100644 --- a/airbyte-cdk/python/Dockerfile +++ b/airbyte-cdk/python/Dockerfile @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \ && apk --no-cache add tzdata build-base # install airbyte-cdk -RUN pip install --prefix=/install airbyte-cdk==0.63.0 +RUN pip install --prefix=/install airbyte-cdk==0.63.1 # build a clean environment FROM base @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] # needs to be the same as CDK -LABEL io.airbyte.version=0.63.0 +LABEL io.airbyte.version=0.63.1 LABEL io.airbyte.name=airbyte/source-declarative-manifest diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 68c9482e40da..0598d8a0ef54 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -36,7 +36,7 @@ name="airbyte-cdk", # The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be # updated if our semver format changes such as using release candidate versions. - version="0.63.0", + version="0.63.1", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown",