Skip to content

Commit

Permalink
feat(low-code cdk): add config component resolver (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi authored Dec 12, 2024
1 parent 5801cd8 commit ceebfda
Show file tree
Hide file tree
Showing 10 changed files with 579 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __new__( # type: ignore[misc]
try:
selected_key = str(
dpath.get(
config, # type: ignore [arg-type] # Dpath wants mutable mapping but doesn't need it.
config, # type: ignore[arg-type] # Dpath wants mutable mapping but doesn't need it.
authenticator_selection_path,
)
)
Expand Down
54 changes: 52 additions & 2 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3015,8 +3015,10 @@ definitions:
examples:
- ["data"]
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", 1, "name"]
- ["data", "{{ components_values.name }}"]
- ["data", "*", "record"]
- ["*", "**", "name"]
value:
title: Value
description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.
Expand Down Expand Up @@ -3061,6 +3063,52 @@ definitions:
- type
- retriever
- components_mapping
StreamConfig:
title: Stream Config
description: (This component is experimental. Use at your own risk.) Describes how to get streams config from the source config.
type: object
required:
- type
- configs_pointer
properties:
type:
type: string
enum: [StreamConfig]
configs_pointer:
title: Configs Pointer
description: A list of potentially nested fields indicating the full path in source config file where streams configs located.
type: array
items:
- type: string
interpolation_context:
- parameters
examples:
- ["data"]
- ["data", "streams"]
- ["data", "{{ parameters.name }}"]
$parameters:
type: object
additionalProperties: true
ConfigComponentsResolver:
type: object
description: (This component is experimental. Use at your own risk.) Resolves and populates stream templates with components fetched from the source config.
properties:
type:
type: string
enum: [ConfigComponentsResolver]
stream_config:
"$ref": "#/definitions/StreamConfig"
components_mapping:
type: array
items:
"$ref": "#/definitions/ComponentMappingDefinition"
$parameters:
type: object
additionalProperties: true
required:
- type
- stream_config
- components_mapping
DynamicDeclarativeStream:
type: object
description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template.
Expand All @@ -3075,7 +3123,9 @@ definitions:
components_resolver:
title: Components Resolver
description: Component resolve and populates stream templates with components values.
"$ref": "#/definitions/HttpComponentsResolver"
anyOf:
- "$ref": "#/definitions/HttpComponentsResolver"
- "$ref": "#/definitions/ConfigComponentsResolver"
required:
- type
- stream_template
Expand Down
23 changes: 22 additions & 1 deletion airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pkgutil
from copy import deepcopy
from importlib import metadata
from typing import Any, Dict, Iterator, List, Mapping, Optional
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set

import yaml
from jsonschema.exceptions import ValidationError
Expand All @@ -20,6 +20,7 @@
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConnectorSpecification,
FailureType,
)
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
Expand Down Expand Up @@ -48,6 +49,7 @@
DebugSliceLogger,
SliceLogger,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class ManifestDeclarativeSource(DeclarativeSource):
Expand Down Expand Up @@ -313,6 +315,7 @@ def _dynamic_stream_configs(
) -> List[Dict[str, Any]]:
dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
dynamic_stream_configs: List[Dict[str, Any]] = []
seen_dynamic_streams: Set[str] = set()

for dynamic_definition in dynamic_stream_definitions:
components_resolver_config = dynamic_definition["components_resolver"]
Expand Down Expand Up @@ -350,6 +353,24 @@ def _dynamic_stream_configs(
if "type" not in dynamic_stream:
dynamic_stream["type"] = "DeclarativeStream"

# Ensure that each stream is created with a unique name
name = dynamic_stream.get("name")

if name in seen_dynamic_streams:
error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
failure_type = FailureType.system_error

if resolver_type == "ConfigComponentsResolver":
error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
failure_type = FailureType.config_error

raise AirbyteTracedException(
message=error_message,
internal_message=error_message,
failure_type=failure_type,
)

seen_dynamic_streams.add(name)
dynamic_stream_configs.append(dynamic_stream)

return dynamic_stream_configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,8 +1192,10 @@ class ComponentMappingDefinition(BaseModel):
examples=[
["data"],
["data", "records"],
["data", "{{ parameters.name }}"],
["data", 1, "name"],
["data", "{{ components_values.name }}"],
["data", "*", "record"],
["*", "**", "name"],
],
title="Field Path",
)
Expand All @@ -1215,6 +1217,24 @@ class ComponentMappingDefinition(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class StreamConfig(BaseModel):
type: Literal["StreamConfig"]
configs_pointer: List[str] = Field(
...,
description="A list of potentially nested fields indicating the full path in source config file where streams configs located.",
examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]],
title="Configs Pointer",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ConfigComponentsResolver(BaseModel):
type: Literal["ConfigComponentsResolver"]
stream_config: StreamConfig
components_mapping: List[ComponentMappingDefinition]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class AddedFieldDefinition(BaseModel):
type: Literal["AddedFieldDefinition"]
path: List[str] = Field(
Expand Down Expand Up @@ -2010,7 +2030,7 @@ class DynamicDeclarativeStream(BaseModel):
stream_template: DeclarativeStream = Field(
..., description="Reference to the stream template.", title="Stream Template"
)
components_resolver: HttpComponentsResolver = Field(
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
...,
description="Component resolve and populates stream templates with components values.",
title="Components Resolver",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
"DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
# DynamicDeclarativeStream
"DynamicDeclarativeStream.stream_template": "DeclarativeStream",
"DynamicDeclarativeStream.components_resolver": "HttpComponentsResolver",
"DynamicDeclarativeStream.components_resolver": "ConfigComponentResolver",
# HttpComponentsResolver
"HttpComponentsResolver.retriever": "SimpleRetriever",
"HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
# ConfigComponentResolver
"ConfigComponentsResolver.stream_config": "StreamConfig",
"ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
# DefaultErrorHandler
"DefaultErrorHandler.response_filters": "HttpResponseFilter",
# DefaultPaginator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConfigComponentsResolver as ConfigComponentsResolverModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConstantBackoffStrategy as ConstantBackoffStrategyModel,
)
Expand Down Expand Up @@ -294,6 +297,9 @@
SimpleRetriever as SimpleRetrieverModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
StreamConfig as StreamConfigModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SubstreamPartitionRouter as SubstreamPartitionRouterModel,
)
Expand Down Expand Up @@ -356,7 +362,9 @@
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.resolvers import (
ComponentMappingDefinition,
ConfigComponentsResolver,
HttpComponentsResolver,
StreamConfig,
)
from airbyte_cdk.sources.declarative.retrievers import (
AsyncRetriever,
Expand Down Expand Up @@ -494,6 +502,8 @@ def _init_mappings(self) -> None:
WaitUntilTimeFromHeaderModel: self.create_wait_until_time_from_header,
AsyncRetrieverModel: self.create_async_retriever,
HttpComponentsResolverModel: self.create_http_components_resolver,
ConfigComponentsResolverModel: self.create_config_components_resolver,
StreamConfigModel: self.create_stream_config,
ComponentMappingDefinitionModel: self.create_components_mapping_definition,
}

Expand Down Expand Up @@ -1884,8 +1894,8 @@ def create_record_selector(
self,
model: RecordSelectorModel,
config: Config,
name: str,
*,
name: str,
transformations: List[RecordTransformation],
decoder: Optional[Decoder] = None,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
Expand Down Expand Up @@ -2364,3 +2374,41 @@ def create_http_components_resolver(
components_mapping=components_mapping,
parameters=model.parameters or {},
)

@staticmethod
def create_stream_config(
model: StreamConfigModel, config: Config, **kwargs: Any
) -> StreamConfig:
model_configs_pointer: List[Union[InterpolatedString, str]] = (
[x for x in model.configs_pointer] if model.configs_pointer else []
)

return StreamConfig(
configs_pointer=model_configs_pointer,
parameters=model.parameters or {},
)

def create_config_components_resolver(
self, model: ConfigComponentsResolverModel, config: Config
) -> Any:
stream_config = self._create_component_from_model(
model.stream_config, config=config, parameters=model.parameters or {}
)

components_mapping = [
self._create_component_from_model(
model=components_mapping_definition_model,
value_type=ModelToComponentFactory._json_schema_type_name_to_type(
components_mapping_definition_model.value_type
),
config=config,
)
for components_mapping_definition_model in model.components_mapping
]

return ConfigComponentsResolver(
stream_config=stream_config,
config=config,
components_mapping=components_mapping,
parameters=model.parameters or {},
)
11 changes: 8 additions & 3 deletions airbyte_cdk/sources/declarative/resolvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@

from airbyte_cdk.sources.declarative.resolvers.components_resolver import ComponentsResolver, ComponentMappingDefinition, ResolvedComponentMappingDefinition
from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import HttpComponentsResolver
from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import ConfigComponentsResolver, StreamConfig
from airbyte_cdk.sources.declarative.models import HttpComponentsResolver as HttpComponentsResolverModel
from airbyte_cdk.sources.declarative.models import ConfigComponentsResolver as ConfigComponentsResolverModel
from pydantic.v1 import BaseModel
from typing import Mapping

COMPONENTS_RESOLVER_TYPE_MAPPING = {
"HttpComponentsResolver": HttpComponentsResolverModel
COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
"HttpComponentsResolver": HttpComponentsResolverModel,
"ConfigComponentsResolver": ConfigComponentsResolverModel
}

__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition", "COMPONENTS_RESOLVER_TYPE_MAPPING"]
__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition", "StreamConfig", "ConfigComponentsResolver", "COMPONENTS_RESOLVER_TYPE_MAPPING"]
Loading

0 comments on commit ceebfda

Please sign in to comment.