Skip to content

Commit

Permalink
fix(ingest/unity): generate sibling and lineage (datahub-project#9894)
Browse files Browse the repository at this point in the history
  • Loading branch information
dushayntAW authored Mar 18, 2024
1 parent ad4da57 commit 3a4bdef
Show file tree
Hide file tree
Showing 5 changed files with 497 additions and 2,156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ source:
deny:
- ".*\\.unwanted_schema"

# emit_siblings: true
# delta_lake_options:
# platform_instance_name: null
# env: 'PROD'

# profiling:
# method: "analyze"
# enabled: true
Expand Down
17 changes: 17 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/unity/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ class UnityCatalogProfilerConfig(ConfigModel):
)


class DeltaLakeDetails(ConfigModel):
platform_instance_name: Optional[str] = Field(
default=None, description="Delta-lake paltform instance name"
)
env: str = Field(default="PROD", description="Delta-lake environment")


class UnityCatalogAnalyzeProfilerConfig(UnityCatalogProfilerConfig):
method: Literal["analyze"] = "analyze"

Expand Down Expand Up @@ -253,6 +260,16 @@ class UnityCatalogSourceConfig(
discriminator="method",
)

emit_siblings: bool = pydantic.Field(
default=True,
description="Whether to emit siblings relation with corresponding delta-lake platform's table. If enabled, this will also ingest the corresponding delta-lake table.",
)

delta_lake_options: DeltaLakeDetails = Field(
default=DeltaLakeDetails(),
description="Details about the delta lake, incase to emit siblings",
)

scheme: str = DATABRICKS

def get_sql_alchemy_url(self, database: Optional[str] = None) -> str:
Expand Down
63 changes: 62 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
TestConnectionReport,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.aws.s3_util import (
make_s3_urn_for_lineage,
strip_s3_prefix,
)
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
Expand Down Expand Up @@ -80,9 +83,13 @@
)
from datahub.ingestion.source.unity.report import UnityCatalogReport
from datahub.ingestion.source.unity.usage import UnityCatalogUsageExtractor
from datahub.metadata.com.linkedin.pegasus2avro.common import Siblings
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageUpstreamType,
Upstream,
UpstreamLineage,
ViewProperties,
)
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -491,6 +498,25 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
if table.view_definition:
self.view_definitions[dataset_urn] = (table.ref, table.view_definition)

# generate sibling and lineage aspects in case of EXTERNAL DELTA TABLE
if (
table_props.customProperties.get("table_type") == "EXTERNAL"
and table_props.customProperties.get("data_source_format") == "DELTA"
and self.config.emit_siblings
):
storage_location = str(table_props.customProperties.get("storage_location"))
if storage_location.startswith("s3://"):
browse_path = strip_s3_prefix(storage_location)
source_dataset_urn = make_dataset_urn_with_platform_instance(
"delta-lake",
browse_path,
self.config.delta_lake_options.platform_instance_name,
self.config.delta_lake_options.env,
)

yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn)
yield from self.gen_lineage_workunit(dataset_urn, source_dataset_urn)

yield from [
mcp.as_workunit()
for mcp in MetadataChangeProposalWrapper.construct_many(
Expand Down Expand Up @@ -947,3 +973,38 @@ def close(self):
self.sql_parser_schema_resolver.close()

super().close()

def gen_siblings_workunit(
self,
dataset_urn: str,
source_dataset_urn: str,
) -> Iterable[MetadataWorkUnit]:
"""
Generate sibling workunit for both unity-catalog dataset and its connector source dataset
"""
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=Siblings(primary=False, siblings=[source_dataset_urn]),
).as_workunit()

yield MetadataChangeProposalWrapper(
entityUrn=source_dataset_urn,
aspect=Siblings(primary=True, siblings=[dataset_urn]),
).as_workunit(is_primary_source=False)

def gen_lineage_workunit(
self,
dataset_urn: str,
source_dataset_urn: str,
) -> Iterable[MetadataWorkUnit]:
"""
Generate dataset to source connector lineage workunit
"""
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=UpstreamLineage(
upstreams=[
Upstream(dataset=source_dataset_urn, type=DatasetLineageType.VIEW)
]
),
).as_workunit()
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,6 @@ def register_mock_data(workspace_client):
workspace_client.catalogs.list.return_value = [
CatalogInfo.from_dict(d)
for d in [
{
"name": "main",
"owner": "account users",
"comment": "Main catalog (auto-created)",
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"created_at": 1666185153376,
"created_by": "[email protected]",
"updated_at": 1666186071115,
"updated_by": "[email protected]",
"catalog_type": "MANAGED_CATALOG",
},
{
"name": "quickstart_catalog",
"owner": "account users",
Expand All @@ -87,50 +76,13 @@ def register_mock_data(workspace_client):
"updated_at": 1666186064332,
"updated_by": "[email protected]",
"catalog_type": "MANAGED_CATALOG",
},
{
"name": "system",
"owner": SERVICE_PRINCIPAL_ID_2,
"comment": "System catalog (auto-created)",
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"created_at": 1666185153391,
"created_by": "System user",
"updated_at": 1666185153391,
"updated_by": "System user",
"catalog_type": "SYSTEM_CATALOG",
},
}
]
]

workspace_client.schemas.list.return_value = [
SchemaInfo.from_dict(d)
for d in [
{
"name": "default",
"catalog_name": "quickstart_catalog",
"owner": "[email protected]",
"comment": "Default schema (auto-created)",
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"full_name": "quickstart_catalog.default",
"created_at": 1666185610021,
"created_by": "[email protected]",
"updated_at": 1666185610021,
"updated_by": "[email protected]",
"catalog_type": "MANAGED_CATALOG",
},
{
"name": "information_schema",
"catalog_name": "quickstart_catalog",
"owner": SERVICE_PRINCIPAL_ID_1,
"comment": "Information schema (auto-created)",
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"full_name": "quickstart_catalog.information_schema",
"created_at": 1666185610024,
"created_by": "System user",
"updated_at": 1666185610024,
"updated_by": "System user",
"catalog_type": "MANAGED_CATALOG",
},
{
"name": "quickstart_schema",
"catalog_name": "quickstart_catalog",
Expand Down Expand Up @@ -199,7 +151,57 @@ def register_mock_data(workspace_client):
"updated_by": "[email protected]",
"table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896",
}
)
),
databricks.sdk.service.catalog.TableInfo.from_dict(
{
"name": "quickstart_table_external",
"catalog_name": "quickstart_catalog",
"schema_name": "quickstart_schema",
"table_type": "EXTERNAL",
"data_source_format": "DELTA",
"columns": [
{
"name": "columnA",
"type_text": "int",
"type_json": '{"name":"columnA","type":"integer","nullable":true,"metadata":{}}',
"type_name": "INT",
"type_precision": 0,
"type_scale": 0,
"position": 0,
"nullable": True,
},
{
"name": "columnB",
"type_text": "string",
"type_json": '{"name":"columnB","type":"string","nullable":true,"metadata":{}}',
"type_name": "STRING",
"type_precision": 0,
"type_scale": 0,
"position": 1,
"nullable": True,
},
],
"storage_location": "s3://db-02eec1f70bfe4115445be9fdb1aac6ac-s3-root-bucket/metastore/2c983545-d403-4f87-9063-5b7e3b6d3736/tables/cff27aa1-1c6a-4d78-b713-562c660c2896",
"owner": "account users",
"properties": {
"delta.lastCommitTimestamp": "1666185711000",
"delta.lastUpdateVersion": "1",
"delta.minReaderVersion": "1",
"delta.minWriterVersion": "2",
"spark.sql.statistics.numRows": "10",
"spark.sql.statistics.totalSize": "512",
},
"generation": 2,
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"full_name": "quickstart_catalog.quickstart_schema.quickstart_table_external",
"data_access_configuration_id": "00000000-0000-0000-0000-000000000000",
"created_at": 1666185698688,
"created_by": "[email protected]",
"updated_at": 1666186049633,
"updated_by": "[email protected]",
"table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896",
}
),
]

workspace_client.tables.get = lambda *args, **kwargs: databricks.sdk.service.catalog.TableInfo.from_dict(
Expand Down Expand Up @@ -409,6 +411,11 @@ def test_ingestion(pytestconfig, tmp_path, requests_mock):
"include_ownership": True,
"include_hive_metastore": True,
"warehouse_id": "test",
"emit_siblings": True,
"delta_lake_options": {
"platform_instance_name": None,
"env": "PROD",
},
"profiling": {
"enabled": True,
"method": "analyze",
Expand Down
Loading

0 comments on commit 3a4bdef

Please sign in to comment.