forked from datahub-project/datahub
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(ingestion/transformer): Add dataset dataproduct transformer (dat…
…ahub-project#9491) Co-authored-by: Harshal Sheth <[email protected]>
- Loading branch information
1 parent
f4b05a8
commit cb80024
Showing
7 changed files
with
390 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
133 changes: 133 additions & 0 deletions
133
metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import logging | ||
from typing import Callable, Dict, List, Optional, Union | ||
|
||
import pydantic | ||
|
||
from datahub.configuration.common import ConfigModel, KeyValuePattern | ||
from datahub.configuration.import_resolver import pydantic_resolve_key | ||
from datahub.emitter.mce_builder import Aspect | ||
from datahub.emitter.mcp import MetadataChangeProposalWrapper | ||
from datahub.ingestion.api.common import PipelineContext | ||
from datahub.ingestion.transformer.dataset_transformer import ( | ||
DatasetDataproductTransformer, | ||
) | ||
from datahub.metadata.schema_classes import MetadataChangeProposalClass | ||
from datahub.specific.dataproduct import DataProductPatchBuilder | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class AddDatasetDataProductConfig(ConfigModel): | ||
# dataset_urn -> data product urn | ||
get_data_product_to_add: Callable[[str], Optional[str]] | ||
|
||
_resolve_data_product_fn = pydantic_resolve_key("get_data_product_to_add") | ||
|
||
|
||
class AddDatasetDataProduct(DatasetDataproductTransformer): | ||
"""Transformer that adds dataproduct entity for provided dataset as its asset according to a callback function.""" | ||
|
||
ctx: PipelineContext | ||
config: AddDatasetDataProductConfig | ||
|
||
def __init__(self, config: AddDatasetDataProductConfig, ctx: PipelineContext): | ||
super().__init__() | ||
self.ctx = ctx | ||
self.config = config | ||
|
||
@classmethod | ||
def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetDataProduct": | ||
config = AddDatasetDataProductConfig.parse_obj(config_dict) | ||
return cls(config, ctx) | ||
|
||
def transform_aspect( | ||
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] | ||
) -> Optional[Aspect]: | ||
return None | ||
|
||
def handle_end_of_stream( | ||
self, | ||
) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: | ||
data_products: Dict[str, DataProductPatchBuilder] = {} | ||
|
||
logger.debug("Generating dataproducts") | ||
for entity_urn in self.entity_map.keys(): | ||
data_product_urn = self.config.get_data_product_to_add(entity_urn) | ||
if data_product_urn: | ||
if data_product_urn not in data_products: | ||
data_products[data_product_urn] = DataProductPatchBuilder( | ||
data_product_urn | ||
).add_asset(entity_urn) | ||
else: | ||
data_products[data_product_urn] = data_products[ | ||
data_product_urn | ||
].add_asset(entity_urn) | ||
|
||
mcps: List[ | ||
Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass] | ||
] = [] | ||
for data_product in data_products.values(): | ||
mcps.extend(list(data_product.build())) | ||
return mcps | ||
|
||
|
||
class SimpleDatasetDataProductConfig(ConfigModel): | ||
dataset_to_data_product_urns: Dict[str, str] | ||
|
||
|
||
class SimpleAddDatasetDataProduct(AddDatasetDataProduct): | ||
"""Transformer that adds a specified dataproduct entity for provided dataset as its asset.""" | ||
|
||
def __init__(self, config: SimpleDatasetDataProductConfig, ctx: PipelineContext): | ||
|
||
generic_config = AddDatasetDataProductConfig( | ||
get_data_product_to_add=lambda dataset_urn: config.dataset_to_data_product_urns.get( | ||
dataset_urn | ||
), | ||
) | ||
super().__init__(generic_config, ctx) | ||
|
||
@classmethod | ||
def create( | ||
cls, config_dict: dict, ctx: PipelineContext | ||
) -> "SimpleAddDatasetDataProduct": | ||
config = SimpleDatasetDataProductConfig.parse_obj(config_dict) | ||
return cls(config, ctx) | ||
|
||
|
||
class PatternDatasetDataProductConfig(ConfigModel): | ||
dataset_to_data_product_urns_pattern: KeyValuePattern = KeyValuePattern.all() | ||
|
||
@pydantic.root_validator(pre=True) | ||
def validate_pattern_value(cls, values: Dict) -> Dict: | ||
rules = values["dataset_to_data_product_urns_pattern"]["rules"] | ||
for key, value in rules.items(): | ||
if isinstance(value, list) and len(value) > 1: | ||
raise ValueError( | ||
"Same dataset cannot be an asset of two different data product." | ||
) | ||
elif isinstance(value, str): | ||
rules[key] = [rules[key]] | ||
return values | ||
|
||
|
||
class PatternAddDatasetDataProduct(AddDatasetDataProduct): | ||
"""Transformer that adds a specified dataproduct entity for provided dataset as its asset.""" | ||
|
||
def __init__(self, config: PatternDatasetDataProductConfig, ctx: PipelineContext): | ||
dataset_to_data_product = config.dataset_to_data_product_urns_pattern | ||
generic_config = AddDatasetDataProductConfig( | ||
get_data_product_to_add=lambda dataset_urn: dataset_to_data_product.value( | ||
dataset_urn | ||
)[0] | ||
if dataset_to_data_product.value(dataset_urn) | ||
else None, | ||
) | ||
super().__init__(generic_config, ctx) | ||
|
||
@classmethod | ||
def create( | ||
cls, config_dict: dict, ctx: PipelineContext | ||
) -> "PatternAddDatasetDataProduct": | ||
config = PatternDatasetDataProductConfig.parse_obj(config_dict) | ||
return cls(config, ctx) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.