Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code): add items mappings to dynamic schemas #256

Merged
merged 21 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,19 @@ definitions:
$parameters:
type: object
additionalProperties: true
ComplexFieldType:
title: Schema Field Type
description: (This component is experimental. Use at your own risk.) Represents a complex field type.
type: object
required:
- field_type
properties:
field_type:
type: string
items:
anyOf:
- type: string
- "$ref": "#/definitions/ComplexFieldType"
TypesMap:
title: Types Map
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
Expand All @@ -1814,6 +1827,7 @@ definitions:
- type: array
items:
type: string
- "$ref": "#/definitions/ComplexFieldType"
current_type:
anyOf:
- type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,13 @@ class HttpResponseFilter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ComplexFieldType(BaseModel):
field_type: str
items: Optional[Union[str, ComplexFieldType]] = None


class TypesMap(BaseModel):
target_type: Union[str, List[str]]
target_type: Union[str, List[str], ComplexFieldType]
current_type: Union[str, List[str]]
condition: Optional[str] = None

Expand Down Expand Up @@ -2260,6 +2265,7 @@ class DynamicDeclarativeStream(BaseModel):
)


ComplexFieldType.update_forward_refs()
CompositeErrorHandler.update_forward_refs()
DeclarativeSource1.update_forward_refs()
DeclarativeSource2.update_forward_refs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CheckStream as CheckStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ComplexFieldType as ComplexFieldTypeModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ComponentMappingDefinition as ComponentMappingDefinitionModel,
)
Expand Down Expand Up @@ -429,6 +432,7 @@
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
DynamicSchemaLoader,
InlineSchemaLoader,
Expand Down Expand Up @@ -572,6 +576,7 @@ def _init_mappings(self) -> None:
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
SchemaTypeIdentifierModel: self.create_schema_type_identifier,
TypesMapModel: self.create_types_map,
ComplexFieldTypeModel: self.create_complex_field_type,
JwtAuthenticatorModel: self.create_jwt_authenticator,
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
ListPartitionRouterModel: self.create_list_partition_router,
Expand Down Expand Up @@ -1894,10 +1899,26 @@ def create_inline_schema_loader(
) -> InlineSchemaLoader:
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})

@staticmethod
def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap:
def create_complex_field_type(
self, model: ComplexFieldTypeModel, config: Config, **kwargs: Any
) -> ComplexFieldType:
items = (
self._create_component_from_model(model=model.items, config=config)
if isinstance(model.items, ComplexFieldTypeModel)
else model.items
)

return ComplexFieldType(field_type=model.field_type, items=items)

def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) -> TypesMap:
target_type = (
self._create_component_from_model(model=model.target_type, config=config)
if isinstance(model.target_type, ComplexFieldTypeModel)
else model.target_type
)

return TypesMap(
target_type=model.target_type,
target_type=target_type,
current_type=model.current_type,
condition=model.condition if model.condition is not None else "True",
)
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import (
ComplexFieldType,
DynamicSchemaLoader,
SchemaTypeIdentifier,
TypesMap,
Expand All @@ -18,6 +19,7 @@
"SchemaLoader",
"InlineSchemaLoader",
"DynamicSchemaLoader",
"ComplexFieldType",
"TypesMap",
"SchemaTypeIdentifier",
]
45 changes: 40 additions & 5 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = {
AIRBYTE_DATA_TYPES: Mapping[str, MutableMapping[str, Any]] = {
"string": {"type": ["null", "string"]},
"boolean": {"type": ["null", "boolean"]},
"date": {"type": ["null", "string"], "format": "date"},
Expand All @@ -45,14 +45,33 @@
}


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class ComplexFieldType:
"""
Identifies complex field type
"""

field_type: str
items: Optional[Union[str, "ComplexFieldType"]] = None

def __post_init__(self) -> None:
"""
Enforces that `items` is only used when `field_type` is a array
"""
# `items_type` is valid only for array target types
if self.items and self.field_type != "array":
raise ValueError("'items' can only be used when 'field_type' is an array.")


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class TypesMap:
"""
Represents a mapping between a current type and its corresponding target type.
"""

target_type: Union[List[str], str]
target_type: Union[List[str], str, ComplexFieldType]
current_type: Union[List[str], str]
condition: Optional[str]

Expand Down Expand Up @@ -135,8 +154,9 @@ def get_json_schema(self) -> Mapping[str, Any]:
transformed_properties = self._transform(properties, {})

return {
"$schema": "http://json-schema.org/draft-07/schema#",
"$schema": "https://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": transformed_properties,
}

Expand Down Expand Up @@ -188,18 +208,33 @@ def _get_type(
first_type = self._get_airbyte_type(mapped_field_type[0])
second_type = self._get_airbyte_type(mapped_field_type[1])
return {"oneOf": [first_type, second_type]}

elif isinstance(mapped_field_type, str):
return self._get_airbyte_type(mapped_field_type)

elif isinstance(mapped_field_type, ComplexFieldType):
return self._resolve_complex_type(mapped_field_type)

else:
raise ValueError(
f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
)

def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]:
if not complex_type.items:
return self._get_airbyte_type(complex_type.field_type)

field_type = self._get_airbyte_type(complex_type.field_type)
field_type["items"] = self._get_airbyte_type(complex_type.items) if isinstance(complex_type.items, str) else self._resolve_complex_type(
complex_type.items)

return field_type

def _replace_type_if_not_valid(
self,
field_type: Union[List[str], str],
raw_schema: MutableMapping[str, Any],
) -> Union[List[str], str]:
) -> Union[List[str], str, ComplexFieldType]:
"""
Replaces a field type if it matches a type mapping in `types_map`.
"""
Expand All @@ -216,7 +251,7 @@ def _replace_type_if_not_valid(
return field_type

@staticmethod
def _get_airbyte_type(field_type: str) -> Mapping[str, Any]:
def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]:
"""
Maps a field type to its corresponding Airbyte type definition.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,14 @@
"schema_pointer": ["fields"],
"key_pointer": ["name"],
"type_pointer": ["type"],
"types_mapping": [{"target_type": "string", "current_type": "singleLineText"}],
"types_mapping": [
{"target_type": "string", "current_type": "singleLineText"},
{
"target_type": {"field_type": "array", "items": {"field_type": "array", "items": "integer"}},
"current_type": "formula",
"condition": "{{ raw_schema['result']['type'] == 'customInteger' }}",
},
],
},
},
},
Expand Down Expand Up @@ -314,7 +321,8 @@ def test_dynamic_schema_loader_with_type_conditions():
]["types_mapping"].append({"target_type": "array", "current_type": "formula"})

expected_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"id": {"type": ["null", "integer"]},
Expand All @@ -324,6 +332,7 @@ def test_dynamic_schema_loader_with_type_conditions():
"currency": {"type": ["null", "number"]},
"salary": {"type": ["null", "number"]},
"working_days": {"type": ["null", "array"]},
"avg_salary": {"type": ["null", "array"], "items": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}}},
},
}
source = ConcurrentDeclarativeSource(
Expand Down Expand Up @@ -365,6 +374,12 @@ def test_dynamic_schema_loader_with_type_conditions():
{"name": "FirstName", "type": "string"},
{"name": "Description", "type": "singleLineText"},
{"name": "Salary", "type": "formula", "result": {"type": "number"}},
{
"name": "AvgSalary",
"type": "formula",
"result": {"type": "customInteger"},
},
{"name": "Currency", "type": "formula", "result": {"type": "currency"}},
{"name": "Currency", "type": "formula", "result": {"type": "currency"}},
{"name": "WorkingDays", "type": "formula"},
]
Expand Down
Loading