Skip to content

Commit

Permalink
airbyte-ci: load built image to local docker host for java connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere committed Jan 23, 2025
1 parent 7333ff4 commit 58c60ca
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 64 deletions.
1 change: 1 addition & 0 deletions airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ airbyte-ci connectors --language=low-code migrate-to-manifest-only

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------- |
| 4.49.3 | [#52102](https://github.com/airbytehq/airbyte/pull/52102) | Load docker image to local docker host for java connectors |
| 4.49.2 | [#52090](https://github.com/airbytehq/airbyte/pull/52090) | Re-add custom task parameters in GradleTask |
| 4.49.1 | [#52087](https://github.com/airbytehq/airbyte/pull/52087) | Wire the `--enable-report-auto-open` correctly for connector tests |
| 4.49.0 | [#52033](https://github.com/airbytehq/airbyte/pull/52033) | Run gradle as a subprocess and not via Dagger |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def run_connector_build_pipeline(context: ConnectorContext, semaphore: any
per_platform_built_containers = build_result.output
step_results.append(build_result)
if context.is_local and build_result.status is StepStatus.SUCCESS:
load_image_result = await LoadContainerToLocalDockerHost(context, per_platform_built_containers, image_tag).run()
load_image_result = await LoadContainerToLocalDockerHost(context, image_tag).run(per_platform_built_containers)
step_results.append(load_image_result)
report = ConnectorReport(context, step_results, name="BUILD RESULTS")
context.report = report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,9 @@ async def get_image_user(cls: Type[T], base_container: Container) -> str:
class LoadContainerToLocalDockerHost(Step):
context: ConnectorContext

def __init__(self, context: ConnectorContext, containers: dict[Platform, Container], image_tag: str = "dev") -> None:
def __init__(self, context: ConnectorContext, image_tag: str = "dev") -> None:
super().__init__(context)
self.image_tag = image_tag
self.containers = containers

def _generate_dev_tag(self, platform: Platform, multi_platforms: bool) -> str:
"""
Expand All @@ -118,11 +117,11 @@ def title(self) -> str:
def image_name(self) -> str:
return f"airbyte/{self.context.connector.technical_name}"

async def _run(self) -> StepResult:
async def _run(self, containers: dict[Platform, Container]) -> StepResult:
loaded_images = []
image_sha = None
multi_platforms = len(self.containers) > 1
for platform, container in self.containers.items():
multi_platforms = len(containers) > 1
for platform, container in containers.items():
_, exported_tar_path = await export_container_to_tarball(self.context, container, platform)
if not exported_tar_path:
return StepResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class CONNECTOR_TEST_STEP_ID(str, Enum):
MANIFEST_ONLY_CHECK = "migrate_to_manifest_only.check"
MANIFEST_ONLY_STRIP = "migrate_to_manifest_only.strip"
MANIFEST_ONLY_UPDATE = "migrate_to_manifest_only.update"
LOAD_IMAGE_TO_LOCAL_DOCKER_HOST = "load_image_to_local_docker_host"

def __str__(self) -> str:
return self.value
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import anyio
from dagger import File, QueryError

from pipelines.airbyte_ci.connectors.build_image.steps.common import LoadContainerToLocalDockerHost
from pipelines.airbyte_ci.connectors.build_image.steps.java_connectors import (
BuildConnectorDistributionTar,
BuildConnectorImages,
Expand Down Expand Up @@ -52,29 +53,6 @@ def default_params(self) -> STEP_PARAMS:
"-x": ["assemble"],
}

async def _load_normalization_image(self, normalization_tar_file: File) -> None:
normalization_image_tag = f"{self.context.connector.normalization_repository}:dev"
self.context.logger.info("Load the normalization image to the docker host.")
await docker.load_image_to_docker_host(self.context, normalization_tar_file, normalization_image_tag)
self.context.logger.info("Successfully loaded the normalization image to the docker host.")

async def _load_connector_image(self, connector_tar_file: File) -> None:
connector_image_tag = f"airbyte/{self.context.connector.technical_name}:dev"
self.context.logger.info("Load the connector image to the docker host")
await docker.load_image_to_docker_host(self.context, connector_tar_file, connector_image_tag)
self.context.logger.info("Successfully loaded the connector image to the docker host.")

async def _run(self, connector_tar_file: File, normalization_tar_file: Optional[File]) -> StepResult:
try:
async with anyio.create_task_group() as tg:
if normalization_tar_file:
tg.start_soon(self._load_normalization_image, normalization_tar_file)
tg.start_soon(self._load_connector_image, connector_tar_file)
except QueryError as e:
return StepResult(step=self, status=StepStatus.FAILURE, stderr=str(e))
# Run the gradle integration test task now that the required docker images have been loaded.
return await super()._run()


class UnitTests(GradleTask):
"""A step to run unit tests for Java connectors."""
Expand All @@ -85,31 +63,6 @@ class UnitTests(GradleTask):
with_test_artifacts = True


def _create_integration_step_args_factory(context: ConnectorTestContext) -> Callable:
"""
Create a function that can process the args for the integration step.
"""

async def _create_integration_step_args(results: RESULTS_DICT) -> Dict[str, Optional[File]]:
connector_container = results["build"].output[LOCAL_BUILD_PLATFORM]
connector_image_tar_file, _ = await export_container_to_tarball(context, connector_container, LOCAL_BUILD_PLATFORM)

if context.connector.supports_normalization:
tar_file_name = f"{context.connector.normalization_repository}_{context.git_revision}.tar"
build_normalization_results = results["build_normalization"]

normalization_container = build_normalization_results.output
normalization_tar_file, _ = await export_container_to_tarball(
context, normalization_container, LOCAL_BUILD_PLATFORM, tar_file_name=tar_file_name
)
else:
normalization_tar_file = None

return {"connector_tar_file": connector_image_tar_file, "normalization_tar_file": normalization_tar_file}

return _create_integration_step_args


def _get_normalization_steps(context: ConnectorTestContext) -> List[StepToRun]:
normalization_image = f"{context.connector.normalization_repository}:dev"
context.logger.info(f"This connector supports normalization: will build {normalization_image}.")
Expand All @@ -134,7 +87,6 @@ def _get_acceptance_test_steps(context: ConnectorTestContext) -> List[StepToRun]
StepToRun(
id=CONNECTOR_TEST_STEP_ID.INTEGRATION,
step=IntegrationTests(context, secrets=context.get_secrets_for_step_id(CONNECTOR_TEST_STEP_ID.INTEGRATION)),
args=_create_integration_step_args_factory(context),
depends_on=[CONNECTOR_TEST_STEP_ID.BUILD],
),
StepToRun(
Expand Down Expand Up @@ -168,6 +120,14 @@ def get_test_steps(context: ConnectorTestContext) -> STEP_TREE:
step=BuildConnectorImages(context),
args=lambda results: {"dist_dir": results[CONNECTOR_TEST_STEP_ID.BUILD_TAR].output.directory("build/distributions")},
depends_on=[CONNECTOR_TEST_STEP_ID.BUILD_TAR],
)
],
[
StepToRun(
id=CONNECTOR_TEST_STEP_ID.LOAD_IMAGE_TO_LOCAL_DOCKER_HOST,
step=LoadContainerToLocalDockerHost(context, image_tag="dev"),
args=lambda results: {"containers": results[CONNECTOR_TEST_STEP_ID.BUILD].output},
depends_on=[CONNECTOR_TEST_STEP_ID.BUILD],
),
],
]
Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "4.49.2"
version = "4.49.3"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <[email protected]>"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def test_run(self, dagger_client, test_context, platforms):
platform: dagger_client.container(platform=platform).from_(f'{test_context.connector.metadata["dockerRepository"]}:latest')
for platform in platforms
}
step = common.LoadContainerToLocalDockerHost(test_context, built_containers)
step = common.LoadContainerToLocalDockerHost(test_context)

assert step.image_tag == "dev"
docker_client = docker.from_env()
Expand All @@ -72,7 +72,7 @@ async def test_run(self, dagger_client, test_context, platforms):
docker_client.images.remove(full_image_name, force=True)
except docker.errors.ImageNotFound:
pass
result = await step.run()
result = await step.run(built_containers)
assert result.status is StepStatus.SUCCESS
multi_platforms = len(platforms) > 1
for platform in platforms:
Expand All @@ -94,10 +94,10 @@ async def test_run_export_failure(self, dagger_client, test_context, mocker):
f'{test_context.connector.metadata["dockerRepository"]}:latest'
)
}
step = common.LoadContainerToLocalDockerHost(test_context, built_containers)
step = common.LoadContainerToLocalDockerHost(test_context)

mocker.patch.object(common, "export_container_to_tarball", return_value=(None, None))
result = await step.run()
result = await step.run(built_containers)
assert result.status is StepStatus.FAILURE
assert "Failed to export the connector image" in result.stderr

Expand All @@ -108,9 +108,9 @@ async def test_run_connection_error(self, dagger_client, test_context, bad_docke
f'{test_context.connector.metadata["dockerRepository"]}:latest'
)
}
step = common.LoadContainerToLocalDockerHost(test_context, built_containers)
step = common.LoadContainerToLocalDockerHost(test_context)
os.environ["DOCKER_HOST"] = bad_docker_host
result = await step.run()
result = await step.run(built_containers)
assert result.status is StepStatus.FAILURE
assert "Something went wrong while interacting with the local docker client" in result.stderr

Expand All @@ -121,11 +121,11 @@ async def test_run_import_failure(self, dagger_client, test_context, mocker):
f'{test_context.connector.metadata["dockerRepository"]}:latest'
)
}
step = common.LoadContainerToLocalDockerHost(test_context, built_containers)
step = common.LoadContainerToLocalDockerHost(test_context)
mock_docker_client = mocker.MagicMock()
mock_docker_client.api.import_image_from_file.return_value = "bad response"
mock_docker_client.images.load.side_effect = docker.errors.DockerException("test error")
mocker.patch.object(common.docker, "from_env", return_value=mock_docker_client)
result = await step.run()
result = await step.run(built_containers)
assert result.status is StepStatus.FAILURE
assert "Something went wrong while interacting with the local docker client: test error" in result.stderr

0 comments on commit 58c60ca

Please sign in to comment.