Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code): add items mappings to dynamic schemas #256

Merged
merged 21 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,24 @@ definitions:
$parameters:
type: object
additionalProperties: true
ItemsTypeMap:
  # Title is kept distinct from TypesMap's "Types Map" so the two definitions can be told apart.
  title: Items Type Map
  description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property.
  type: object
  required:
    - items_type_pointer
    - type_mapping
  properties:
    # Optional (not listed under `required`).
    property_name:
      type: string
    # Path, as a list of keys, to the value holding the items type.
    items_type_pointer:
      title: Items Type Path
      description: List of potentially nested fields describing the full path of the items type to extract.
      type: array
      items:
        - type: string
    # The current-type -> target-type rule applied to the extracted items type.
    type_mapping:
      "$ref": "#/definitions/TypesMap"
TypesMap:
title: Types Map
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
Expand All @@ -1824,6 +1842,8 @@ definitions:
type: string
interpolation_context:
- raw_schema
items_type:
"$ref": "#/definitions/ItemsTypeMap"
SchemaTypeIdentifier:
title: Schema Type Identifier
description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,33 +736,6 @@ class HttpResponseFilter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


# Declarative model of a single type-mapping rule: the type to match (`current_type`)
# and the type to use in its place (`target_type`).
class TypesMap(BaseModel):
# Both types accept either one type name or a list of type names.
target_type: Union[str, List[str]]
current_type: Union[str, List[str]]
# Optional condition string; None when the manifest does not provide one.
condition: Optional[str] = None


# Declarative model describing where, inside a schema response, field keys and field
# types are found, plus optional rules for mapping source types to target types.
class SchemaTypeIdentifier(BaseModel):
type: Optional[Literal["SchemaTypeIdentifier"]] = None
# Path to the part of the response that holds the schema fields; defaults to [].
schema_pointer: Optional[List[str]] = Field(
[],
description="List of nested fields defining the schema field path to extract. Defaults to [].",
title="Schema Path",
)
# Required: path to the field key.
key_pointer: List[str] = Field(
...,
description="List of potentially nested fields describing the full path of the field key to extract.",
title="Key Path",
)
# Optional: path to the field type; None when not configured.
type_pointer: Optional[List[str]] = Field(
None,
description="List of potentially nested fields describing the full path of the field type to extract.",
title="Type Path",
)
# Optional list of TypesMap rules.
types_mapping: Optional[List[TypesMap]] = None
# Populated from the "$parameters" key of the manifest (field alias).
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class InlineSchemaLoader(BaseModel):
type: Literal["InlineSchemaLoader"]
schema_: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -2020,6 +1993,44 @@ class HttpRequester(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


# Declarative model describing how the items type of a field is located
# (`items_type_pointer`) and which TypesMap rule is applied to it (`type_mapping`).
class ItemsTypeMap(BaseModel):
# Optional name; None when omitted.
property_name: Optional[str] = None
# Required: path to the items type.
items_type_pointer: List[str] = Field(
...,
description="List of potentially nested fields describing the full path of the items type to extract.",
title="Items Type Path",
)
# TypesMap is declared after this class; the module calls
# ItemsTypeMap.update_forward_refs() so this annotation can be resolved.
type_mapping: TypesMap


# Declarative model of a single type-mapping rule: the type to match (`current_type`)
# and the type to use in its place (`target_type`).
class TypesMap(BaseModel):
# Both types accept either one type name or a list of type names.
target_type: Union[str, List[str]]
current_type: Union[str, List[str]]
# Optional condition string; None when the manifest does not provide one.
condition: Optional[str] = None
# Optional nested mapping for the items type; None when omitted.
items_type: Optional[ItemsTypeMap] = None


# Declarative model describing where, inside a schema response, field keys and field
# types are found, plus optional rules for mapping source types to target types.
class SchemaTypeIdentifier(BaseModel):
type: Optional[Literal["SchemaTypeIdentifier"]] = None
# Path to the part of the response that holds the schema fields; defaults to [].
schema_pointer: Optional[List[str]] = Field(
[],
description="List of nested fields defining the schema field path to extract. Defaults to [].",
title="Schema Path",
)
# Required: path to the field key.
key_pointer: List[str] = Field(
...,
description="List of potentially nested fields describing the full path of the field key to extract.",
title="Key Path",
)
# Optional: path to the field type; None when not configured.
type_pointer: Optional[List[str]] = Field(
None,
description="List of potentially nested fields describing the full path of the field type to extract.",
title="Type Path",
)
# Optional list of TypesMap rules.
types_mapping: Optional[List[TypesMap]] = None
# Populated from the "$parameters" key of the manifest (field alias).
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DynamicSchemaLoader(BaseModel):
type: Literal["DynamicSchemaLoader"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
Expand Down Expand Up @@ -2266,6 +2277,7 @@ class DynamicDeclarativeStream(BaseModel):
# Resolve forward-referenced annotations on the models listed below.
SelectiveAuthenticator.update_forward_refs()
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
# ItemsTypeMap annotates `type_mapping` with TypesMap, which is declared after it.
ItemsTypeMap.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
SimpleRetriever.update_forward_refs()
AsyncRetriever.update_forward_refs()
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
InlineSchemaLoader as InlineSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ItemsTypeMap as ItemsTypeMapModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
IterableDecoder as IterableDecoderModel,
)
Expand Down Expand Up @@ -432,6 +435,7 @@
DefaultSchemaLoader,
DynamicSchemaLoader,
InlineSchemaLoader,
ItemsTypeMap,
JsonFileSchemaLoader,
SchemaTypeIdentifier,
TypesMap,
Expand Down Expand Up @@ -572,6 +576,7 @@ def _init_mappings(self) -> None:
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
SchemaTypeIdentifierModel: self.create_schema_type_identifier,
TypesMapModel: self.create_types_map,
ItemsTypeMapModel: self.create_items_type_map,
JwtAuthenticatorModel: self.create_jwt_authenticator,
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
ListPartitionRouterModel: self.create_list_partition_router,
Expand Down Expand Up @@ -1894,12 +1899,27 @@ def create_inline_schema_loader(
) -> InlineSchemaLoader:
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})

@staticmethod
def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap:
def create_items_type_map(
    self, model: ItemsTypeMapModel, config: Config, **kwargs: Any
) -> ItemsTypeMap:
    """
    Build the runtime `ItemsTypeMap` component from its parsed manifest model.

    The nested `type_mapping` model is turned into a component through
    `_create_component_from_model`, and the pointer is copied into a fresh list
    (an empty list when the model's pointer is empty or unset).
    """
    nested_mapping = self._create_component_from_model(model=model.type_mapping, config=config)

    pointer: List[Union[InterpolatedString, str]] = []
    if model.items_type_pointer:
        # Copy so the component does not share the model's list object.
        pointer.extend(model.items_type_pointer)

    return ItemsTypeMap(items_type_pointer=pointer, type_mapping=nested_mapping)

def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) -> TypesMap:
    """
    Build the runtime `TypesMap` component from its parsed manifest model.

    A nested `items_type` model, when present, is built through
    `_create_component_from_model`; a missing condition is replaced by the
    string "True".
    """
    nested_items = None
    if model.items_type:
        nested_items = self._create_component_from_model(model=model.items_type, config=config)

    effective_condition = "True" if model.condition is None else model.condition

    return TypesMap(
        target_type=model.target_type,
        current_type=model.current_type,
        condition=effective_condition,
        items_type=nested_items,
    )

def create_schema_type_identifier(
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import (
DynamicSchemaLoader,
ItemsTypeMap,
SchemaTypeIdentifier,
TypesMap,
)
Expand All @@ -19,5 +20,6 @@
"InlineSchemaLoader",
"DynamicSchemaLoader",
"TypesMap",
"ItemsTypeMap",
"SchemaTypeIdentifier",
]
85 changes: 74 additions & 11 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from copy import deepcopy
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, MutableMapping, Optional, Union
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union

import dpath
from typing_extensions import deprecated
Expand All @@ -18,7 +18,7 @@
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = {
AIRBYTE_DATA_TYPES: Mapping[str, MutableMapping[str, Any]] = {
"string": {"type": ["null", "string"]},
"boolean": {"type": ["null", "boolean"]},
"date": {"type": ["null", "string"], "format": "date"},
Expand All @@ -45,6 +45,17 @@
}


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class ItemsTypeMap:
"""
Describes how the type of a field's items is resolved.

`items_type_pointer` is the path used to extract the raw items type, and `type_mapping` is the
`TypesMap` holding the current type to match and the target type to use for the items.
"""

items_type_pointer: List[Union[InterpolatedString, str]]
# Quoted because TypesMap is declared after this class.
type_mapping: "TypesMap"


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class TypesMap:
Expand All @@ -55,6 +66,15 @@ class TypesMap:
target_type: Union[List[str], str]
current_type: Union[List[str], str]
condition: Optional[str]
items_type: Optional[ItemsTypeMap] = None

def __post_init__(self) -> None:
    """
    Enforce that `items_type` is only used when `target_type` is an array.

    :raises ValueError: if `items_type` is set while `target_type` is not "array".
    """
    if not self.items_type:
        # No items mapping supplied, so there is nothing to validate.
        return
    if self.target_type == "array":
        return
    raise ValueError("'items_type' can only be used when 'target_type' is an array.")


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
Expand Down Expand Up @@ -179,7 +199,10 @@ def _get_type(
if field_type_path
else "string"
)
mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema)
mapped_field_type, mapped_additional_types = self._replace_type_if_not_valid(
raw_field_type, raw_schema
)

if (
isinstance(mapped_field_type, list)
and len(mapped_field_type) == 2
Expand All @@ -189,7 +212,7 @@ def _get_type(
second_type = self._get_airbyte_type(mapped_field_type[1])
return {"oneOf": [first_type, second_type]}
elif isinstance(mapped_field_type, str):
return self._get_airbyte_type(mapped_field_type)
return self._get_airbyte_type(mapped_field_type, mapped_additional_types)
else:
raise ValueError(
f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
Expand All @@ -199,10 +222,11 @@ def _replace_type_if_not_valid(
self,
field_type: Union[List[str], str],
raw_schema: MutableMapping[str, Any],
) -> Union[List[str], str]:
) -> Tuple[Union[List[str], str], List[Union[List[str], str]]]:
"""
Replaces a field type if it matches a type mapping in `types_map`.
"""
additional_types: List[Union[List[str], str]] = []
if self.schema_type_identifier.types_mapping:
for types_map in self.schema_type_identifier.types_mapping:
# conditional is optional param, setting to true if not provided
Expand All @@ -212,18 +236,57 @@ def _replace_type_if_not_valid(
).eval(config=self.config, raw_schema=raw_schema)

if field_type == types_map.current_type and condition:
return types_map.target_type
return field_type

@staticmethod
def _get_airbyte_type(field_type: str) -> Mapping[str, Any]:
if types_map.items_type:
items_type = self._extract_data(
raw_schema, types_map.items_type.items_type_pointer
)
items_type_condition = InterpolatedBoolean(
condition=types_map.items_type.type_mapping.condition
if types_map.items_type.type_mapping.condition is not None
else "True",
parameters={},
).eval(config=self.config, raw_schema=raw_schema)

if (
items_type == types_map.items_type.type_mapping.current_type
and items_type_condition
):
additional_types = [types_map.items_type.type_mapping.target_type]
return types_map.target_type, additional_types
return field_type, additional_types

def _get_airbyte_type(
    self, field_type: str, additional_types: Optional[List[Union[List[str], str]]] = None
) -> Mapping[str, Any]:
    """
    Maps a field type to its corresponding Airbyte type definition.

    :param field_type: key of `AIRBYTE_DATA_TYPES` to resolve.
    :param additional_types: optional list whose first entry describes the items type of an
        "array" field: either one type name, or a two-element list of type names that is
        rendered as a "oneOf". Ignored for non-array field types and when empty or None.
    :return: an independent copy of the type definition, with "items" set for arrays when an
        items type was supplied.
    :raises ValueError: if `field_type` or an items type is not a known Airbyte data type, or
        if the items type is neither a string nor a two-element list of strings.
    """
    if field_type not in AIRBYTE_DATA_TYPES:
        raise ValueError(f"Invalid Airbyte data type: {field_type}")

    # Copy so callers can never mutate the shared module-level definitions.
    airbyte_type = deepcopy(AIRBYTE_DATA_TYPES[field_type])

    # Only arrays carry an items definition; everything else is returned unchanged.
    if field_type != "array" or not additional_types:
        return airbyte_type

    items_spec = additional_types[0]
    if isinstance(items_spec, str):
        # Resolve through this method so an unknown name raises ValueError, not KeyError.
        airbyte_type["items"] = self._get_airbyte_type(items_spec)
    elif (
        isinstance(items_spec, list)
        and len(items_spec) == 2
        and all(isinstance(item, str) for item in items_spec)
    ):
        airbyte_type["items"] = {
            "oneOf": [self._get_airbyte_type(item) for item in items_spec]
        }
    else:
        raise ValueError(
            f"Invalid data type. Available string or two items list of string. Got {items_spec}."
        )

    return airbyte_type

def _extract_data(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,21 @@
"schema_pointer": ["fields"],
"key_pointer": ["name"],
"type_pointer": ["type"],
"types_mapping": [{"target_type": "string", "current_type": "singleLineText"}],
"types_mapping": [
{"target_type": "string", "current_type": "singleLineText"},
{
"target_type": "array",
"current_type": "formula",
"items_type": {
"items_type_pointer": ["result", "type"],
"type_mapping": {
"target_type": "integer",
"current_type": "customInteger",
},
},
"condition": "{{ raw_schema['result']['type'] == 'customInteger' }}",
},
],
},
},
},
Expand Down Expand Up @@ -324,6 +338,7 @@ def test_dynamic_schema_loader_with_type_conditions():
"currency": {"type": ["null", "number"]},
"salary": {"type": ["null", "number"]},
"working_days": {"type": ["null", "array"]},
"avg_salary": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}},
},
}
source = ConcurrentDeclarativeSource(
Expand Down Expand Up @@ -365,6 +380,12 @@ def test_dynamic_schema_loader_with_type_conditions():
{"name": "FirstName", "type": "string"},
{"name": "Description", "type": "singleLineText"},
{"name": "Salary", "type": "formula", "result": {"type": "number"}},
{
"name": "AvgSalary",
"type": "formula",
"result": {"type": "customInteger"},
},
{"name": "Currency", "type": "formula", "result": {"type": "currency"}},
{"name": "Currency", "type": "formula", "result": {"type": "currency"}},
{"name": "WorkingDays", "type": "formula"},
]
Expand Down
Loading