Skip to content

Commit

Permalink
feat(low-code): add items and property mappings to dynamic schemas (#256
Browse files Browse the repository at this point in the history
)

Co-authored-by: octavia-squidington-iii <[email protected]>
Co-authored-by: Aaron ("AJ") Steers <[email protected]>
  • Loading branch information
3 people authored Jan 24, 2025
1 parent 0d22bdd commit 4ea9d94
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 16 deletions.
14 changes: 14 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,19 @@ definitions:
$parameters:
type: object
additionalProperties: true
ComplexFieldType:
title: Schema Field Type
description: (This component is experimental. Use at your own risk.) Represents a complex field type.
type: object
required:
- field_type
properties:
field_type:
type: string
items:
anyOf:
- type: string
- "$ref": "#/definitions/ComplexFieldType"
TypesMap:
title: Types Map
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
Expand All @@ -1814,6 +1827,7 @@ definitions:
- type: array
items:
type: string
- "$ref": "#/definitions/ComplexFieldType"
current_type:
anyOf:
- type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,13 @@ class HttpResponseFilter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ComplexFieldType(BaseModel):
field_type: str
items: Optional[Union[str, ComplexFieldType]] = None


class TypesMap(BaseModel):
target_type: Union[str, List[str]]
target_type: Union[str, List[str], ComplexFieldType]
current_type: Union[str, List[str]]
condition: Optional[str] = None

Expand Down Expand Up @@ -2260,6 +2265,7 @@ class DynamicDeclarativeStream(BaseModel):
)


ComplexFieldType.update_forward_refs()
CompositeErrorHandler.update_forward_refs()
DeclarativeSource1.update_forward_refs()
DeclarativeSource2.update_forward_refs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CheckStream as CheckStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ComplexFieldType as ComplexFieldTypeModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ComponentMappingDefinition as ComponentMappingDefinitionModel,
)
Expand Down Expand Up @@ -429,6 +432,7 @@
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
DynamicSchemaLoader,
InlineSchemaLoader,
Expand Down Expand Up @@ -572,6 +576,7 @@ def _init_mappings(self) -> None:
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
SchemaTypeIdentifierModel: self.create_schema_type_identifier,
TypesMapModel: self.create_types_map,
ComplexFieldTypeModel: self.create_complex_field_type,
JwtAuthenticatorModel: self.create_jwt_authenticator,
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
ListPartitionRouterModel: self.create_list_partition_router,
Expand Down Expand Up @@ -1894,10 +1899,26 @@ def create_inline_schema_loader(
) -> InlineSchemaLoader:
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})

@staticmethod
def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap:
def create_complex_field_type(
self, model: ComplexFieldTypeModel, config: Config, **kwargs: Any
) -> ComplexFieldType:
items = (
self._create_component_from_model(model=model.items, config=config)
if isinstance(model.items, ComplexFieldTypeModel)
else model.items
)

return ComplexFieldType(field_type=model.field_type, items=items)

def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) -> TypesMap:
target_type = (
self._create_component_from_model(model=model.target_type, config=config)
if isinstance(model.target_type, ComplexFieldTypeModel)
else model.target_type
)

return TypesMap(
target_type=model.target_type,
target_type=target_type,
current_type=model.current_type,
condition=model.condition if model.condition is not None else "True",
)
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import (
ComplexFieldType,
DynamicSchemaLoader,
SchemaTypeIdentifier,
TypesMap,
Expand All @@ -18,6 +19,7 @@
"SchemaLoader",
"InlineSchemaLoader",
"DynamicSchemaLoader",
"ComplexFieldType",
"TypesMap",
"SchemaTypeIdentifier",
]
48 changes: 43 additions & 5 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = {
AIRBYTE_DATA_TYPES: Mapping[str, MutableMapping[str, Any]] = {
"string": {"type": ["null", "string"]},
"boolean": {"type": ["null", "boolean"]},
"date": {"type": ["null", "string"], "format": "date"},
Expand All @@ -45,14 +45,33 @@
}


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class ComplexFieldType:
"""
Identifies complex field type
"""

field_type: str
items: Optional[Union[str, "ComplexFieldType"]] = None

def __post_init__(self) -> None:
"""
Enforces that `items` is only used when `field_type` is a array
"""
# `items_type` is valid only for array target types
if self.items and self.field_type != "array":
raise ValueError("'items' can only be used when 'field_type' is an array.")


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class TypesMap:
"""
Represents a mapping between a current type and its corresponding target type.
"""

target_type: Union[List[str], str]
target_type: Union[List[str], str, ComplexFieldType]
current_type: Union[List[str], str]
condition: Optional[str]

Expand Down Expand Up @@ -135,8 +154,9 @@ def get_json_schema(self) -> Mapping[str, Any]:
transformed_properties = self._transform(properties, {})

return {
"$schema": "http://json-schema.org/draft-07/schema#",
"$schema": "https://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": transformed_properties,
}

Expand Down Expand Up @@ -188,18 +208,36 @@ def _get_type(
first_type = self._get_airbyte_type(mapped_field_type[0])
second_type = self._get_airbyte_type(mapped_field_type[1])
return {"oneOf": [first_type, second_type]}

elif isinstance(mapped_field_type, str):
return self._get_airbyte_type(mapped_field_type)

elif isinstance(mapped_field_type, ComplexFieldType):
return self._resolve_complex_type(mapped_field_type)

else:
raise ValueError(
f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
)

def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]:
if not complex_type.items:
return self._get_airbyte_type(complex_type.field_type)

field_type = self._get_airbyte_type(complex_type.field_type)
field_type["items"] = (
self._get_airbyte_type(complex_type.items)
if isinstance(complex_type.items, str)
else self._resolve_complex_type(complex_type.items)
)

return field_type

def _replace_type_if_not_valid(
self,
field_type: Union[List[str], str],
raw_schema: MutableMapping[str, Any],
) -> Union[List[str], str]:
) -> Union[List[str], str, ComplexFieldType]:
"""
Replaces a field type if it matches a type mapping in `types_map`.
"""
Expand All @@ -216,7 +254,7 @@ def _replace_type_if_not_valid(
return field_type

@staticmethod
def _get_airbyte_type(field_type: str) -> Mapping[str, Any]:
def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]:
"""
Maps a field type to its corresponding Airbyte type definition.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,17 @@
"schema_pointer": ["fields"],
"key_pointer": ["name"],
"type_pointer": ["type"],
"types_mapping": [{"target_type": "string", "current_type": "singleLineText"}],
"types_mapping": [
{"target_type": "string", "current_type": "singleLineText"},
{
"target_type": {
"field_type": "array",
"items": {"field_type": "array", "items": "integer"},
},
"current_type": "formula",
"condition": "{{ raw_schema['result']['type'] == 'customInteger' }}",
},
],
},
},
},
Expand Down Expand Up @@ -150,7 +160,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier):
]
),
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"name": {"type": ["null", "string"]},
Expand All @@ -171,7 +182,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier):
]
),
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"name": {"type": ["null", "string"]},
Expand All @@ -191,7 +203,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier):
]
),
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"address": {
Expand All @@ -204,7 +217,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier):
# Test case: Empty record set
iter([]),
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {},
},
Expand Down Expand Up @@ -242,7 +256,8 @@ def test_dynamic_schema_loader_invalid_type(dynamic_schema_loader):

def test_dynamic_schema_loader_manifest_flow():
expected_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"id": {"type": ["null", "integer"]},
Expand Down Expand Up @@ -314,7 +329,8 @@ def test_dynamic_schema_loader_with_type_conditions():
]["types_mapping"].append({"target_type": "array", "current_type": "formula"})

expected_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"id": {"type": ["null", "integer"]},
Expand All @@ -324,6 +340,10 @@ def test_dynamic_schema_loader_with_type_conditions():
"currency": {"type": ["null", "number"]},
"salary": {"type": ["null", "number"]},
"working_days": {"type": ["null", "array"]},
"avg_salary": {
"type": ["null", "array"],
"items": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}},
},
},
}
source = ConcurrentDeclarativeSource(
Expand Down Expand Up @@ -365,6 +385,12 @@ def test_dynamic_schema_loader_with_type_conditions():
{"name": "FirstName", "type": "string"},
{"name": "Description", "type": "singleLineText"},
{"name": "Salary", "type": "formula", "result": {"type": "number"}},
{
"name": "AvgSalary",
"type": "formula",
"result": {"type": "customInteger"},
},
{"name": "Currency", "type": "formula", "result": {"type": "currency"}},
{"name": "Currency", "type": "formula", "result": {"type": "currency"}},
{"name": "WorkingDays", "type": "formula"},
]
Expand Down

0 comments on commit 4ea9d94

Please sign in to comment.