diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 32ccb8e1..71d47bbd 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -14,6 +14,7 @@ from __future__ import annotations import json +from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Literal import airbyte_api @@ -858,3 +859,102 @@ def check_connector( "response": json_result, }, ) + + +@dataclass +class DockerImageOverride: + """Defines a connector image override.""" + + docker_image_override: str + override_level: Literal["workspace", "actor"] = "actor" + + +def get_connector_image_override( + *, + workspace_id: str, + actor_id: str, + actor_type: Literal["source", "destination"], + api_root: str = CLOUD_API_ROOT, + client_id: SecretString, + client_secret: SecretString, +) -> DockerImageOverride | None: + """Get the docker image and tag for a specific connector. + + Result is a tuple of two values: + - A boolean indicating if an override is set. + - The docker image and tag, either from the override if set, or from the . + + https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L5066 + """ + json_result = _make_config_api_request( + path="/scoped_configuration/list", + json={ + "config_key": "docker_image", # TODO: Fix this. + }, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + try: + scoped_configurations = json_result["scopedConfigurations"] + except KeyError as ex: + raise AirbyteError( + message="Could not find 'scoped_configurations' in response.", + context={ + "workspace_id": workspace_id, + "actor_id": actor_id, + "actor_type": actor_type, + "response_keys": list(json_result.keys()), + }, + ) from ex + overrides = [ + DockerImageOverride( + docker_image_override=entry["value"], + override_level=entry["scope_type"], + ) + for entry in scoped_configurations + ] + if len(overrides) == 0: + return None + + if len(overrides) > 1: + raise NotImplementedError( + "Multiple overrides found. This is not yet supported.", + ) + + return overrides[0] + + +def set_actor_override( + *, + workspace_id: str, + actor_id: str, + actor_type: Literal["source", "destination"], + override: DockerImageOverride, + api_root: str = CLOUD_API_ROOT, + client_id: SecretString, + client_secret: SecretString, +) -> None: + """Override the docker image and tag for a specific connector. + + https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L5111 + """ + _ = workspace_id + json_result = _make_config_api_request( + path="/scoped_configuration/create", + json={ + # https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L7376 + "value": override.docker_image_override, + "config_key": "docker_image", # TODO: Fix this. + "scope_id": actor_id, + "scope_type": actor_type, + "resource_id": "", # TODO: Need to call something like get_actor_definition + "resource_type": "ACTOR_DEFINITION", + "origin": "", # TODO: Need to get user ID somehow or use another origin type + "origin_type": "USER", + }, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + _ = json_result # Not used (yet) diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index 4aebb424..7dec08e0 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -15,6 +15,7 @@ check_connector, AirbyteError, CLOUD_API_ROOT, + get_connector_image_override, ) from airbyte_api.models import DestinationDuckdb, SourceFaker @@ -280,5 +281,36 @@ def test_check_connector( client_secret=airbyte_cloud_client_secret, ) assert result == expect_success + except NotImplementedError: + pytest.fail("check_connector function is not implemented") + except AirbyteError as e: + pytest.fail(f"API call failed: {e}") + + +@pytest.mark.parametrize( + "connector_id, connector_type, expect_success", + [ + ("f45dd701-d1f0-4e8e-97c4-2b89c40ac928", "source", True), + # ("......-....-....-....-............", "destination", True), + ], +) +def test_get_connector_overrides( + airbyte_cloud_client_id: SecretString, + airbyte_cloud_client_secret: SecretString, + connector_id: str, + connector_type: Literal["source", "destination"], + expect_success: bool, +) -> None: + try: + result = get_connector_image_override( + workspace_id=None, # type: ignore # Unused anyway + actor_id=connector_id, + actor_type=connector_type, + client_id=airbyte_cloud_client_id, + client_secret=airbyte_cloud_client_secret, + ) + # assert result is not None # No overrides on this item + except NotImplementedError: + pytest.fail("check_connector function is not implemented") except AirbyteError as e: pytest.fail(f"API call failed: {e}")