Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code): add items mappings to dynamic schemas #256

Merged
merged 21 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,44 @@ definitions:
$parameters:
type: object
additionalProperties: true

PropertyTypesMap:
title: Types Map
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property.
type: object
required:
- property_name
- property_type_pinter
- type_mapping
properties:
property_name:
type: string
property_type_pointer:
title: Key Path
description: List of potentially nested fields describing the full path of the property type to extract.
type: array
items:
- type: string
type_mapping:
"$ref": "#/definitions/TypesMap"
ItemsTypeMap:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
title: Types Map
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property.
type: object
required:
- items_type_pinter
- type_mapping
properties:
property_name:
type: string
items_type_pointer:
title: Items Type Path
description: List of potentially nested fields describing the full path of the items type to extract.
type: array
items:
- type: string
type_mapping:
"$ref": "#/definitions/TypesMap"
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
TypesMap:
title: Types Map
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
Expand All @@ -1824,6 +1862,12 @@ definitions:
type: string
interpolation_context:
- raw_schema
items_type:
"$ref": "#/definitions/ItemsTypeMap"
properties_types:
type: array
items:
- "$ref": "#/definitions/PropertyTypesMap"
SchemaTypeIdentifier:
title: Schema Type Identifier
description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,33 +736,6 @@ class HttpResponseFilter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class TypesMap(BaseModel):
target_type: Union[str, List[str]]
current_type: Union[str, List[str]]
condition: Optional[str] = None


class SchemaTypeIdentifier(BaseModel):
type: Optional[Literal["SchemaTypeIdentifier"]] = None
schema_pointer: Optional[List[str]] = Field(
[],
description="List of nested fields defining the schema field path to extract. Defaults to [].",
title="Schema Path",
)
key_pointer: List[str] = Field(
...,
description="List of potentially nested fields describing the full path of the field key to extract.",
title="Key Path",
)
type_pointer: Optional[List[str]] = Field(
None,
description="List of potentially nested fields describing the full path of the field type to extract.",
title="Type Path",
)
types_mapping: Optional[List[TypesMap]] = None
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class InlineSchemaLoader(BaseModel):
type: Literal["InlineSchemaLoader"]
schema_: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -2020,6 +1993,55 @@ class HttpRequester(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class PropertyTypesMap(BaseModel):
property_name: str
property_type_pointer: Optional[List[str]] = Field(
None,
description="List of potentially nested fields describing the full path of the property type to extract.",
title="Key Path",
)
type_mapping: TypesMap


class ItemsTypeMap(BaseModel):
property_name: Optional[str] = None
items_type_pointer: Optional[List[str]] = Field(
None,
description="List of potentially nested fields describing the full path of the items type to extract.",
title="Items Type Path",
)
type_mapping: TypesMap


class TypesMap(BaseModel):
target_type: Union[str, List[str]]
current_type: Union[str, List[str]]
condition: Optional[str] = None
items_type: Optional[ItemsTypeMap] = None
properties_types: Optional[List[PropertyTypesMap]] = None


class SchemaTypeIdentifier(BaseModel):
type: Optional[Literal["SchemaTypeIdentifier"]] = None
schema_pointer: Optional[List[str]] = Field(
[],
description="List of nested fields defining the schema field path to extract. Defaults to [].",
title="Schema Path",
)
key_pointer: List[str] = Field(
...,
description="List of potentially nested fields describing the full path of the field key to extract.",
title="Key Path",
)
type_pointer: Optional[List[str]] = Field(
None,
description="List of potentially nested fields describing the full path of the field type to extract.",
title="Type Path",
)
types_mapping: Optional[List[TypesMap]] = None
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DynamicSchemaLoader(BaseModel):
type: Literal["DynamicSchemaLoader"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
Expand Down Expand Up @@ -2266,6 +2288,8 @@ class DynamicDeclarativeStream(BaseModel):
SelectiveAuthenticator.update_forward_refs()
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
PropertyTypesMap.update_forward_refs()
ItemsTypeMap.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
SimpleRetriever.update_forward_refs()
AsyncRetriever.update_forward_refs()
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
InlineSchemaLoader as InlineSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ItemsTypeMap as ItemsTypeMapModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
IterableDecoder as IterableDecoderModel,
)
Expand Down Expand Up @@ -310,6 +313,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ParentStreamConfig as ParentStreamConfigModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
PropertyTypesMap as PropertyTypesMapModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
RecordFilter as RecordFilterModel,
)
Expand Down Expand Up @@ -432,7 +438,9 @@
DefaultSchemaLoader,
DynamicSchemaLoader,
InlineSchemaLoader,
ItemsTypeMap,
JsonFileSchemaLoader,
PropertyTypesMap,
SchemaTypeIdentifier,
TypesMap,
)
Expand Down Expand Up @@ -572,6 +580,8 @@ def _init_mappings(self) -> None:
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
SchemaTypeIdentifierModel: self.create_schema_type_identifier,
TypesMapModel: self.create_types_map,
PropertyTypesMapModel: self.create_property_types_map,
ItemsTypeMapModel: self.create_items_type_map,
JwtAuthenticatorModel: self.create_jwt_authenticator,
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
ListPartitionRouterModel: self.create_list_partition_router,
Expand Down Expand Up @@ -1894,12 +1904,50 @@ def create_inline_schema_loader(
) -> InlineSchemaLoader:
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})

@staticmethod
def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap:
def create_property_types_map(
self, model: PropertyTypesMapModel, config: Config, **kwargs: Any
) -> PropertyTypesMap:
type_mapping = self._create_component_from_model(model=model.type_mapping, config=config)
model_property_type_pointer: List[Union[InterpolatedString, str]] = (
[x for x in model.property_type_pointer] if model.property_type_pointer else []
)
return PropertyTypesMap(
property_name=model.property_name,
property_type_pointer=model_property_type_pointer,
type_mapping=type_mapping,
)

def create_items_type_map(
self, model: ItemsTypeMapModel, config: Config, **kwargs: Any
) -> ItemsTypeMap:
type_mapping = self._create_component_from_model(model=model.type_mapping, config=config)
model_items_type_pointer: List[Union[InterpolatedString, str]] = (
[x for x in model.items_type_pointer] if model.items_type_pointer else []
)
return ItemsTypeMap(items_type_pointer=model_items_type_pointer, type_mapping=type_mapping)

def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) -> TypesMap:
items_type = (
self._create_component_from_model(model=model.items_type, config=config)
if model.items_type
else model.items_type
)

properties_types = []
if model.properties_types:
properties_types.extend(
[
self._create_component_from_model(property_type, config=config)
for property_type in model.properties_types
]
)

return TypesMap(
target_type=model.target_type,
current_type=model.current_type,
condition=model.condition if model.condition is not None else "True",
items_type=items_type,
properties_types=properties_types,
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
)

def create_schema_type_identifier(
Expand Down
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import (
DynamicSchemaLoader,
ItemsTypeMap,
PropertyTypesMap,
SchemaTypeIdentifier,
TypesMap,
)
Expand All @@ -19,5 +21,7 @@
"InlineSchemaLoader",
"DynamicSchemaLoader",
"TypesMap",
"PropertyTypesMap",
"ItemsTypeMap",
"SchemaTypeIdentifier",
]
Loading
Loading