Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add config component resolver #149

Merged
merged 15 commits into from
Dec 12, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __new__( # type: ignore[misc]
try:
selected_key = str(
dpath.get(
config, # type: ignore [arg-type] # Dpath wants mutable mapping but doesn't need it.
config, # type: ignore[arg-type] # Dpath wants mutable mapping but doesn't need it.
authenticator_selection_path,
)
)
Expand Down
54 changes: 52 additions & 2 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2928,8 +2928,10 @@ definitions:
examples:
- ["data"]
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", 1, "name"]
- ["data", "{{ components_values.name }}"]
- ["data", "*", "record"]
- ["*", "**", "name"]
value:
title: Value
description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.
Expand Down Expand Up @@ -2974,6 +2976,52 @@ definitions:
- type
- retriever
- components_mapping
StreamConfig:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
title: Stream Config
description: (This component is experimental. Use at your own risk.) Describes how to get streams config from the source config.
type: object
required:
- type
- configs_pointer
properties:
type:
type: string
enum: [StreamConfig]
configs_pointer:
title: Configs Pointer
description: A list of potentially nested fields indicating the full path in source config file where streams configs located.
type: array
items:
- type: string
interpolation_context:
- parameters
examples:
- ["data"]
- ["data", "streams"]
- ["data", "{{ parameters.name }}"]
$parameters:
type: object
additionalProperties: true
ConfigComponentsResolver:
type: object
description: (This component is experimental. Use at your own risk.) Resolves and populates stream templates with components fetched from the source config.
properties:
type:
type: string
enum: [ConfigComponentsResolver]
stream_config:
"$ref": "#/definitions/StreamConfig"
components_mapping:
type: array
items:
"$ref": "#/definitions/ComponentMappingDefinition"
$parameters:
type: object
additionalProperties: true
required:
- type
- stream_config
- components_mapping
DynamicDeclarativeStream:
type: object
description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template.
Expand All @@ -2988,7 +3036,9 @@ definitions:
components_resolver:
title: Components Resolver
description: Component resolve and populates stream templates with components values.
"$ref": "#/definitions/HttpComponentsResolver"
anyOf:
- "$ref": "#/definitions/HttpComponentsResolver"
- "$ref": "#/definitions/ConfigComponentsResolver"
required:
- type
- stream_template
Expand Down
18 changes: 17 additions & 1 deletion airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pkgutil
from copy import deepcopy
from importlib import metadata
from typing import Any, Dict, Iterator, List, Mapping, Optional
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set

import yaml
from jsonschema.exceptions import ValidationError
Expand All @@ -20,6 +20,7 @@
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConnectorSpecification,
FailureType,
)
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
Expand Down Expand Up @@ -48,6 +49,7 @@
DebugSliceLogger,
SliceLogger,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class ManifestDeclarativeSource(DeclarativeSource):
Expand Down Expand Up @@ -313,6 +315,7 @@ def _dynamic_stream_configs(
) -> List[Dict[str, Any]]:
dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
dynamic_stream_configs: List[Dict[str, Any]] = []
seen_dynamic_streams: Set[str] = set()

for dynamic_definition in dynamic_stream_definitions:
components_resolver_config = dynamic_definition["components_resolver"]
Expand Down Expand Up @@ -350,6 +353,19 @@ def _dynamic_stream_configs(
if "type" not in dynamic_stream:
dynamic_stream["type"] = "DeclarativeStream"

# Ensure that each stream is created with a unique name
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
name = dynamic_stream.get("name")

if name in seen_dynamic_streams:
error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."

raise AirbyteTracedException(
message=error_message,
internal_message=error_message,
failure_type=FailureType.config_error,
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
)

seen_dynamic_streams.add(name)
dynamic_stream_configs.append(dynamic_stream)

return dynamic_stream_configs
Expand Down
Loading
Loading