diff --git a/metadata-ingestion/docs/sources/kafka/kafka.md b/metadata-ingestion/docs/sources/kafka/kafka.md
index 2e8baa9516d17c..9fdfc3a3af1d02 100644
--- a/metadata-ingestion/docs/sources/kafka/kafka.md
+++ b/metadata-ingestion/docs/sources/kafka/kafka.md
@@ -130,3 +130,86 @@ message MessageWithMap {
repeated Map1Entry map_1 = 1;
}
```
+
+### Enriching DataHub metadata with automated meta mapping
+
+:::note
+Meta mapping is currently only available for Avro schemas.
+:::
+
+The Avro specification allows schemas to carry additional attributes beyond those it defines; these are exposed as arbitrary metadata. A common pattern is to use these attributes for business metadata. The Kafka source can transform this metadata directly into DataHub Owners, Tags, and Terms.
+
+#### Simple tags
+
+If you simply have a list of tags embedded in an Avro schema (either at the top level or on an individual field), you can use the `schema_tags_field` config option.
+
+Example Avro schema:
+
+```json
+{
+ "name": "sampleRecord",
+ "type": "record",
+ "tags": ["tag1", "tag2"],
+ "fields": [{
+ "name": "field_1",
+ "type": "string",
+ "tags": ["tag3", "tag4"]
+ }]
+}
+```
+
+The name of the field containing a list of tags can be configured with the `schema_tags_field` property:
+
+```yaml
+config:
+ schema_tags_field: tags
+```
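+
+If you want to namespace these tags, the source also exposes a `tag_prefix` option that is prepended to each tag pulled from the schema. A minimal sketch (the prefix value here is illustrative):
+
+```yaml
+config:
+  schema_tags_field: tags
+  tag_prefix: "kafka:"
+```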
+
+#### Meta mapping
+
+You can also map specific Avro schema properties to DataHub Owners, Tags, and Terms using meta
+mapping. Top-level properties are mapped via `meta_mapping`, and field-level properties via `field_meta_mapping`.
+
+Example Avro schema:
+
+```json
+{
+ "name": "sampleRecord",
+ "type": "record",
+ "owning_team": "@Data-Science",
+ "data_tier": "Bronze",
+ "fields": [{
+ "name": "field_1",
+ "type": "string",
+ "gdpr": {
+ "pii": true
+ }
+ }]
+}
+```
+
+This can be mapped to DataHub metadata with the `meta_mapping` and `field_meta_mapping` config options:
+
+```yaml
+config:
+ meta_mapping:
+ owning_team:
+ match: "^@(.*)"
+ operation: "add_owner"
+ config:
+ owner_type: group
+ data_tier:
+ match: "Bronze|Silver|Gold"
+ operation: "add_term"
+ config:
+ term: "{{ $match }}"
+ field_meta_mapping:
+ gdpr.pii:
+ match: true
+ operation: "add_tag"
+ config:
+ tag: "pii"
+```
+
+The underlying implementation is similar to [dbt meta mapping](https://datahubproject.io/docs/generated/ingestion/sources/dbt#dbt-meta-automated-mappings), which has more detailed examples that can be used for reference.
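+
+Putting it together, a recipe that combines these options might look like the following sketch (connection details and values are illustrative):
+
+```yaml
+source:
+  type: kafka
+  config:
+    connection:
+      bootstrap: "localhost:9092"
+      schema_registry_url: "http://localhost:8081"
+    schema_tags_field: tags
+    enable_meta_mapping: true
+    meta_mapping:
+      owning_team:
+        match: "^@(.*)"
+        operation: "add_owner"
+        config:
+          owner_type: group
+    field_meta_mapping:
+      gdpr.pii:
+        match: true
+        operation: "add_tag"
+        config:
+          tag: "pii"
+```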
+
diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
index 75de18e9037eea..4acf99a50e50ed 100644
--- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
+++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
@@ -4,6 +4,7 @@
import avro.schema
+from datahub.emitter import mce_builder
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
BooleanTypeClass,
@@ -21,7 +22,7 @@
TimeTypeClass,
UnionTypeClass,
)
-from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass
+from datahub.utilities.mapping import Constants, OperationProcessor
"""A helper file for Avro schema -> MCE schema transformations"""
@@ -98,7 +99,14 @@ class AvroToMceSchemaConverter:
"uuid": StringTypeClass,
}
- def __init__(self, is_key_schema: bool, default_nullable: bool = False) -> None:
+ def __init__(
+ self,
+ is_key_schema: bool,
+ default_nullable: bool = False,
+ meta_mapping_processor: Optional[OperationProcessor] = None,
+ schema_tags_field: Optional[str] = None,
+ tag_prefix: Optional[str] = None,
+ ) -> None:
# Tracks the prefix name stack for nested name generation.
self._prefix_name_stack: PrefixNameStack = [self.version_string]
# Tracks the fields on the current path.
@@ -112,6 +120,10 @@ def __init__(self, is_key_schema: bool, default_nullable: bool = False) -> None:
if is_key_schema:
# Helps maintain backwards-compatibility. Annotation for any field that is part of key-schema.
self._prefix_name_stack.append("[key=True]")
+ # Meta mapping
+ self._meta_mapping_processor = meta_mapping_processor
+ self._schema_tags_field = schema_tags_field
+ self._tag_prefix = tag_prefix
# Map of avro schema type to the conversion handler
self._avro_type_to_mce_converter_map: Dict[
avro.schema.Schema,
@@ -317,7 +329,25 @@ def emit(self) -> Generator[SchemaField, None, None]:
merged_props.update(self._schema.other_props)
merged_props.update(schema.other_props)
- tags = None
+            # Apply meta mapping rules to the merged schema properties
+ meta_aspects: Dict[str, Any] = {}
+ if self._converter._meta_mapping_processor:
+ meta_aspects = self._converter._meta_mapping_processor.process(
+ merged_props
+ )
+
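+            # Collect plain tags declared under the configured schema_tags_field, applying the tag prefix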
+ tags: List[str] = []
+ if self._converter._schema_tags_field:
+ for tag in merged_props.get(self._converter._schema_tags_field, []):
+ tags.append(self._converter._tag_prefix + tag)
+
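+            # Merge in tags produced by meta mapping, stripping the tag URN prefix so all tags can be rebuilt into one aspect below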
+ meta_tags_aspect = meta_aspects.get(Constants.ADD_TAG_OPERATION)
+ if meta_tags_aspect:
+ tags += [
+ tag_association.tag[len("urn:li:tag:") :]
+ for tag_association in meta_tags_aspect.tags
+ ]
+
if "deprecated" in merged_props:
description = (
f"DEPRECATED: {merged_props['deprecated']}\n"
@@ -325,9 +355,13 @@ def emit(self) -> Generator[SchemaField, None, None]:
if description
else ""
)
- tags = GlobalTagsClass(
- tags=[TagAssociationClass(tag="urn:li:tag:Deprecated")]
- )
+ tags.append("Deprecated")
+
+ tags_aspect = None
+ if tags:
+ tags_aspect = mce_builder.make_global_tag_aspect_with_tag_list(tags)
+
+ meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION)
logical_type_name: Optional[str] = (
# logicalType nested inside type
@@ -349,7 +383,8 @@ def emit(self) -> Generator[SchemaField, None, None]:
recursive=False,
nullable=self._converter._is_nullable(schema),
isPartOfKey=self._converter._is_key_schema,
- globalTags=tags,
+ globalTags=tags_aspect,
+ glossaryTerms=meta_terms_aspect,
jsonProps=json.dumps(merged_props) if merged_props else None,
)
yield field
@@ -447,7 +482,9 @@ def _gen_from_non_field_nested_schemas(
actual_schema = self._get_underlying_type_if_option_as_union(schema, schema)
with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
- schema, actual_schema, self
+ schema,
+ actual_schema,
+ self,
) as fe_schema:
if isinstance(
actual_schema,
@@ -478,7 +515,9 @@ def _gen_non_nested_to_mce_fields(
) -> Generator[SchemaField, None, None]:
"""Handles generation of MCE SchemaFields for non-nested AVRO types."""
with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
- schema, schema, self
+ schema,
+ schema,
+ self,
) as non_nested_emitter:
yield from non_nested_emitter.emit()
@@ -496,9 +535,12 @@ def _to_mce_fields(
@classmethod
def to_mce_fields(
cls,
- avro_schema_string: str,
+ avro_schema: avro.schema.Schema,
is_key_schema: bool,
default_nullable: bool = False,
+ meta_mapping_processor: Optional[OperationProcessor] = None,
+ schema_tags_field: Optional[str] = None,
+ tag_prefix: Optional[str] = None,
) -> Generator[SchemaField, None, None]:
"""
Converts a key or value type AVRO schema string to appropriate MCE SchemaFields.
@@ -506,8 +548,14 @@ def to_mce_fields(
:param is_key_schema: True if it is a key-schema.
:return: An MCE SchemaField generator.
"""
- avro_schema = avro.schema.parse(avro_schema_string)
- converter = cls(is_key_schema, default_nullable)
+ converter = cls(
+ is_key_schema,
+ default_nullable,
+ meta_mapping_processor,
+ schema_tags_field,
+ tag_prefix,
+ )
yield from converter._to_mce_fields(avro_schema)
@@ -516,28 +564,40 @@ def to_mce_fields(
def avro_schema_to_mce_fields(
- avro_schema_string: str,
+ avro_schema: Union[avro.schema.Schema, str],
is_key_schema: bool = False,
default_nullable: bool = False,
+ meta_mapping_processor: Optional[OperationProcessor] = None,
+ schema_tags_field: Optional[str] = None,
+ tag_prefix: Optional[str] = None,
swallow_exceptions: bool = True,
) -> List[SchemaField]:
"""
Converts an avro schema into schema fields compatible with MCE.
- :param avro_schema_string: String representation of the AVRO schema.
+ :param avro_schema: AVRO schema, either as a string or as an avro.schema.Schema object.
:param is_key_schema: True if it is a key-schema. Default is False (value-schema).
:param swallow_exceptions: True if the caller wants exceptions to be suppressed
+    :param meta_mapping_processor: Optional OperationProcessor to be used for meta mappings
:return: The list of MCE compatible SchemaFields.
"""
try:
+ if isinstance(avro_schema, str):
+ avro_schema = avro.schema.parse(avro_schema)
+
return list(
AvroToMceSchemaConverter.to_mce_fields(
- avro_schema_string, is_key_schema, default_nullable
+ avro_schema,
+ is_key_schema,
+ default_nullable,
+ meta_mapping_processor,
+ schema_tags_field,
+ tag_prefix,
)
)
except Exception:
if swallow_exceptions:
- logger.exception(f"Failed to parse {avro_schema_string} into mce fields.")
+ logger.exception(f"Failed to parse {avro_schema} into mce fields.")
return []
else:
raise
diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py
index 0bdcb115b377c8..54475cb509621d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py
@@ -4,6 +4,7 @@
from hashlib import md5
from typing import Any, List, Optional, Set, Tuple
+import avro.schema
import jsonref
from confluent_kafka.schema_registry.schema_registry_client import (
RegisteredSchema,
@@ -22,6 +23,8 @@
SchemaField,
SchemaMetadata,
)
+from datahub.metadata.schema_classes import OwnershipSourceTypeClass
+from datahub.utilities.mapping import OperationProcessor
logger = logging.getLogger(__name__)
@@ -59,6 +62,14 @@ def __init__(
except Exception as e:
logger.warning(f"Failed to get subjects from schema registry: {e}")
+ self.field_meta_processor = OperationProcessor(
+ self.source_config.field_meta_mapping,
+ self.source_config.tag_prefix,
+ OwnershipSourceTypeClass.SERVICE,
+ self.source_config.strip_user_ids_from_email,
+ match_nested_props=True,
+ )
+
@classmethod
def create(
cls, source_config: KafkaSourceConfig, report: KafkaSourceReport
@@ -290,10 +301,19 @@ def _get_schema_fields(
fields: List[SchemaField] = []
if schema.schema_type == "AVRO":
cleaned_str: str = self.get_schema_str_replace_confluent_ref_avro(schema)
+ avro_schema = avro.schema.parse(cleaned_str)
+
# "value.id" or "value.[type=string]id"
fields = schema_util.avro_schema_to_mce_fields(
- cleaned_str, is_key_schema=is_key_schema
+ avro_schema,
+ is_key_schema=is_key_schema,
+ meta_mapping_processor=self.field_meta_processor
+ if self.source_config.enable_meta_mapping
+ else None,
+ schema_tags_field=self.source_config.schema_tags_field,
+ tag_prefix=self.source_config.tag_prefix,
)
+
elif schema.schema_type == "PROTOBUF":
imported_schemas: List[
ProtobufSchema
diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py
index 61f6103347eb37..566304e1999b79 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -5,6 +5,7 @@
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Type
+import avro.schema
import confluent_kafka
import confluent_kafka.admin
import pydantic
@@ -18,6 +19,7 @@
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.source_common import DatasetSourceConfigMixin
+from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
@@ -56,8 +58,10 @@
DataPlatformInstanceClass,
DatasetPropertiesClass,
KafkaSchemaClass,
+ OwnershipSourceTypeClass,
SubTypesClass,
)
+from datahub.utilities.mapping import Constants, OperationProcessor
from datahub.utilities.registries.domain_registry import DomainRegistry
logger = logging.getLogger(__name__)
@@ -89,6 +93,29 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
default="datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry",
description="The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.",
)
+    schema_tags_field: str = pydantic.Field(
+ default="tags",
+ description="The field name in the schema metadata that contains the tags to be added to the dataset.",
+ )
+    enable_meta_mapping: bool = pydantic.Field(
+ default=True,
+ description="When enabled, applies the mappings that are defined through the meta_mapping directives.",
+ )
+ meta_mapping: Dict = pydantic.Field(
+ default={},
+ description="mapping rules that will be executed against top-level schema properties. Refer to the section below on meta automated mappings.",
+ )
+ field_meta_mapping: Dict = pydantic.Field(
+ default={},
+ description="mapping rules that will be executed against field-level schema properties. Refer to the section below on meta automated mappings.",
+ )
+ strip_user_ids_from_email: bool = pydantic.Field(
+ default=False,
+ description="Whether or not to strip email id while adding owners using meta mappings.",
+ )
+ tag_prefix: str = pydantic.Field(
+ default="", description="Prefix added to tags during ingestion."
+ )
ignore_warnings_on_schema_type: bool = pydantic.Field(
default=False,
description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.",
@@ -167,6 +194,14 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
graph=self.ctx.graph,
)
+ self.meta_processor = OperationProcessor(
+ self.source_config.meta_mapping,
+ self.source_config.tag_prefix,
+ OwnershipSourceTypeClass.SERVICE,
+ self.source_config.strip_user_ids_from_email,
+ match_nested_props=True,
+ )
+
def init_kafka_admin_client(self) -> None:
try:
# TODO: Do we require separate config than existing consumer_config ?
@@ -227,7 +262,6 @@ def _extract_record(
logger.debug(f"topic = {topic}")
AVRO = "AVRO"
- DOC_KEY = "doc"
# 1. Create the default dataset snapshot for the topic.
dataset_name = topic
@@ -261,8 +295,8 @@ def _extract_record(
topic, topic_detail, extra_topic_config
)
- # 4. Set dataset's description as top level doc, if topic schema type is avro
- description = None
+        # 4. Set dataset's description, tags, ownership, etc., if the topic schema type is Avro
+ description: Optional[str] = None
if (
schema_metadata is not None
and isinstance(schema_metadata.platformSchema, KafkaSchemaClass)
@@ -271,9 +305,41 @@ def _extract_record(
# Point to note:
# In Kafka documentSchema and keySchema both contains "doc" field.
# DataHub Dataset "description" field is mapped to documentSchema's "doc" field.
- schema = json.loads(schema_metadata.platformSchema.documentSchema)
- if isinstance(schema, dict):
- description = schema.get(DOC_KEY)
+
+ avro_schema = avro.schema.parse(
+ schema_metadata.platformSchema.documentSchema
+ )
+ description = avro_schema.doc
+ # set the tags
+ all_tags: List[str] = []
+ for tag in avro_schema.other_props.get(
+ self.source_config.schema_tags_field, []
+ ):
+ all_tags.append(self.source_config.tag_prefix + tag)
+
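+            # Apply the configured meta mapping rules to the schema's top-level custom properties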
+ if self.source_config.enable_meta_mapping:
+ meta_aspects = self.meta_processor.process(avro_schema.other_props)
+
+ meta_owners_aspects = meta_aspects.get(Constants.ADD_OWNER_OPERATION)
+ if meta_owners_aspects:
+ dataset_snapshot.aspects.append(meta_owners_aspects)
+
+ meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION)
+ if meta_terms_aspect:
+ dataset_snapshot.aspects.append(meta_terms_aspect)
+
+ # Create the tags aspect
+ meta_tags_aspect = meta_aspects.get(Constants.ADD_TAG_OPERATION)
+ if meta_tags_aspect:
+ all_tags += [
+ tag_association.tag[len("urn:li:tag:") :]
+ for tag_association in meta_tags_aspect.tags
+ ]
+
+ if all_tags:
+ dataset_snapshot.aspects.append(
+ mce_builder.make_global_tag_aspect_with_tag_list(all_tags)
+ )
dataset_properties = DatasetPropertiesClass(
name=topic, customProperties=custom_props, description=description
diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
index 8865254e885795..4fcef990ae4f43 100644
--- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
+++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
@@ -269,7 +269,7 @@ def get_schema_fields_for_hive_column(
hive_column_name=hive_column_name, hive_column_type=hive_column_type
)
schema_fields = avro_schema_to_mce_fields(
- avro_schema_string=json.dumps(avro_schema_json),
+ avro_schema=json.dumps(avro_schema_json),
default_nullable=default_nullable,
swallow_exceptions=False,
)
diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py
index 32666ceecdf85c..793eccfb22c7e8 100644
--- a/metadata-ingestion/src/datahub/utilities/mapping.py
+++ b/metadata-ingestion/src/datahub/utilities/mapping.py
@@ -1,6 +1,8 @@
import contextlib
import logging
+import operator
import re
+from functools import reduce
from typing import Any, Dict, List, Match, Optional, Union
from datahub.emitter import mce_builder
@@ -94,11 +96,13 @@ def __init__(
tag_prefix: str = "",
owner_source_type: Optional[str] = None,
strip_owner_email_id: bool = False,
+ match_nested_props: bool = False,
):
self.operation_defs = operation_defs
self.tag_prefix = tag_prefix
self.strip_owner_email_id = strip_owner_email_id
self.owner_source_type = owner_source_type
+ self.match_nested_props = match_nested_props
def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]:
# Defining the following local variables -
@@ -121,9 +125,18 @@ def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]:
)
if not operation_type or not operation_config:
continue
+ raw_props_value = raw_props.get(operation_key)
+ if not raw_props_value and self.match_nested_props:
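+                # Fall back to a dotted-path lookup (e.g. "gdpr.pii") through nested dicts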
+ try:
+ raw_props_value = reduce(
+ operator.getitem, operation_key.split("."), raw_props
+ )
+ except KeyError:
+ pass
+
maybe_match = self.get_match(
self.operation_defs[operation_key][Constants.MATCH],
- raw_props.get(operation_key),
+ raw_props_value,
)
if maybe_match is not None:
operation = self.get_operation_value(
diff --git a/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json b/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json
index e51eaa10b8b10c..7dd328168e84c0 100644
--- a/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json
+++ b/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json
@@ -86,7 +86,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "kafka-test"
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -103,7 +104,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "kafka-test"
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -118,7 +120,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "kafka-test"
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -144,10 +147,10 @@
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
- "hash": "cc452cf58242cdb9d09cf33d657497d8",
+ "hash": "a79a2fe3adab60b21d272a9cc3e93595",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.KafkaSchema": {
- "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"}]}",
+ "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}",
"documentSchemaType": "AVRO",
"keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}",
"keySchemaType": "AVRO"
@@ -188,7 +191,15 @@
},
"nativeDataType": "email",
"recursive": false,
- "isPartOfKey": false
+ "globalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:Email"
+ }
+ ]
+ },
+ "isPartOfKey": false,
+ "jsonProps": "{\"tags\": [\"Email\"]}"
},
{
"fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName",
@@ -200,7 +211,15 @@
},
"nativeDataType": "firstName",
"recursive": false,
- "isPartOfKey": false
+ "globalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:Name"
+ }
+ ]
+ },
+ "isPartOfKey": false,
+ "jsonProps": "{\"tags\": [\"Name\"]}"
},
{
"fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName",
@@ -212,7 +231,15 @@
},
"nativeDataType": "lastName",
"recursive": false,
- "isPartOfKey": false
+ "globalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:Name"
+ }
+ ]
+ },
+ "isPartOfKey": false,
+ "jsonProps": "{\"tags\": [\"Name\"]}"
}
]
}
@@ -224,6 +251,15 @@
]
}
},
+ {
+ "com.linkedin.pegasus2avro.common.GlobalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:PII"
+ }
+ ]
+ }
+ },
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
@@ -246,7 +282,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "kafka-test"
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -263,7 +300,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "kafka-test"
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -280,7 +318,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "kafka-test"
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -295,7 +334,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "kafka-test"
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -321,10 +361,10 @@
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
- "hash": "dc1cf32c2688cc3d2d27fe6e856f06d2",
+ "hash": "62c7c400ec5760797a59c45e59c2f2dc",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.KafkaSchema": {
- "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"}]}",
+ "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}",
"documentSchemaType": "AVRO",
"keySchema": "\"string\"",
"keySchemaType": "AVRO"
@@ -353,7 +393,15 @@
},
"nativeDataType": "email",
"recursive": false,
- "isPartOfKey": false
+ "globalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:Email"
+ }
+ ]
+ },
+ "isPartOfKey": false,
+ "jsonProps": "{\"tags\": [\"Email\"]}"
},
{
"fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName",
@@ -365,7 +413,15 @@
},
"nativeDataType": "firstName",
"recursive": false,
- "isPartOfKey": false
+ "globalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:Name"
+ }
+ ]
+ },
+ "isPartOfKey": false,
+ "jsonProps": "{\"tags\": [\"Name\"]}"
},
{
"fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName",
@@ -377,7 +433,15 @@
},
"nativeDataType": "lastName",
"recursive": false,
- "isPartOfKey": false
+ "globalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:Name"
+ }
+ ]
+ },
+ "isPartOfKey": false,
+ "jsonProps": "{\"tags\": [\"Name\"]}"
}
]
}
@@ -389,6 +453,15 @@
]
}
},
+ {
+ "com.linkedin.pegasus2avro.common.GlobalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:PII"
+ }
+ ]
+ }
+ },
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
@@ -411,7 +484,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "kafka-test"
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -428,7 +502,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "kafka-test"
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -443,7 +518,56 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "kafka-test"
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:Email",
+ "changeType": "UPSERT",
+ "aspectName": "tagKey",
+ "aspect": {
+ "json": {
+ "name": "Email"
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1586847600000,
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:Name",
+ "changeType": "UPSERT",
+ "aspectName": "tagKey",
+ "aspect": {
+ "json": {
+ "name": "Name"
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1586847600000,
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:PII",
+ "changeType": "UPSERT",
+ "aspectName": "tagKey",
+ "aspect": {
+ "json": {
+ "name": "PII"
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1586847600000,
+ "runId": "kafka-test",
+ "lastRunId": "no-run-id-provided"
}
}
]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/integration/kafka/value_schema.avsc b/metadata-ingestion/tests/integration/kafka/value_schema.avsc
index 788cb94c47a72b..8cb6c42cb03f45 100644
--- a/metadata-ingestion/tests/integration/kafka/value_schema.avsc
+++ b/metadata-ingestion/tests/integration/kafka/value_schema.avsc
@@ -3,18 +3,22 @@
"type": "record",
"name": "CreateUserRequest",
"doc": "Value schema for kafka topic",
+ "tags": ["PII"],
"fields": [
{
"name": "email",
- "type": "string"
+ "type": "string",
+ "tags": ["Email"]
},
{
"name": "firstName",
- "type": "string"
+ "type": "string",
+ "tags": ["Name"]
},
{
"name": "lastName",
- "type": "string"
+ "type": "string",
+ "tags": ["Name"]
}
]
}
diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py
index b48ebf12ee37a1..603068780d0a7b 100644
--- a/metadata-ingestion/tests/unit/test_kafka_source.py
+++ b/metadata-ingestion/tests/unit/test_kafka_source.py
@@ -1,3 +1,4 @@
+import json
from itertools import chain
from typing import Dict, Optional, Tuple
from unittest.mock import MagicMock, patch
@@ -7,11 +8,17 @@
RegisteredSchema,
Schema,
)
+from freezegun import freeze_time
from datahub.emitter.mce_builder import (
+ OwnerType,
make_dataplatform_instance_urn,
make_dataset_urn,
make_dataset_urn_with_platform_instance,
+ make_global_tag_aspect_with_tag_list,
+ make_glossary_terms_aspect_from_urn_list,
+ make_owner_urn,
+ make_ownership_aspect_from_urn_list,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -20,7 +27,10 @@
from datahub.metadata.schema_classes import (
BrowsePathsClass,
DataPlatformInstanceClass,
+ GlobalTagsClass,
+ GlossaryTermsClass,
KafkaSchemaClass,
+ OwnershipClass,
SchemaMetadataClass,
)
@@ -521,3 +531,148 @@ def test_kafka_source_succeeds_with_describe_configs_error(
mock_admin_client_instance.describe_configs.assert_called_once()
assert len(workunits) == 2
+
+
+@freeze_time("2023-09-20 10:00:00")
+@patch(
+ "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
+ autospec=True,
+)
+@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
+def test_kafka_source_topic_meta_mappings(
+ mock_kafka_consumer, mock_schema_registry_client, mock_admin_client
+):
+ # Setup the topic to key/value schema mappings for all types of schema registry subject name strategies.
+ topic_subject_schema_map: Dict[str, Tuple[RegisteredSchema, RegisteredSchema]] = {
+ "topic1": (
+ RegisteredSchema(
+ schema_id="schema_id_2",
+ schema=Schema(
+ schema_str='{"type":"record", "name":"Topic1Key", "namespace": "test.acryl", "fields": [{"name":"t1key", "type": "string"}]}',
+ schema_type="AVRO",
+ ),
+ subject="topic1-key",
+ version=1,
+ ),
+ RegisteredSchema(
+ schema_id="schema_id_1",
+ schema=Schema(
+ schema_str=json.dumps(
+ {
+ "type": "record",
+ "name": "Topic1Value",
+ "namespace": "test.acryl",
+ "fields": [{"name": "t1value", "type": "string"}],
+ "owner": "@charles",
+ "business_owner": "jdoe.last@gmail.com",
+ "data_governance.team_owner": "Finance",
+ "has_pii": True,
+ "int_property": 1,
+ "double_property": 2.5,
+ }
+ ),
+ schema_type="AVRO",
+ ),
+ subject="topic1-value",
+ version=1,
+ ),
+ )
+ }
+
+ # Mock the kafka consumer
+ mock_kafka_instance = mock_kafka_consumer.return_value
+ mock_cluster_metadata = MagicMock()
+ mock_cluster_metadata.topics = {k: None for k in topic_subject_schema_map.keys()}
+ mock_kafka_instance.list_topics.return_value = mock_cluster_metadata
+
+ # Mock the schema registry client
+ # - mock get_subjects: all subjects in topic_subject_schema_map
+ mock_schema_registry_client.return_value.get_subjects.return_value = [
+ v.subject for v in chain(*topic_subject_schema_map.values())
+ ]
+
+ # - mock get_latest_version
+ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
+ for registered_schema in chain(*topic_subject_schema_map.values()):
+ if registered_schema.subject == subject_name:
+ return registered_schema
+ return None
+
+ mock_schema_registry_client.return_value.get_latest_version = (
+ mock_get_latest_version
+ )
+
+ ctx = PipelineContext(run_id="test1")
+ kafka_source = KafkaSource.create(
+ {
+ "connection": {"bootstrap": "localhost:9092"},
+ "meta_mapping": {
+ "owner": {
+ "match": "^@(.*)",
+ "operation": "add_owner",
+ "config": {"owner_type": "user"},
+ },
+ "business_owner": {
+ "match": ".*",
+ "operation": "add_owner",
+ "config": {"owner_type": "user"},
+ },
+ "has_pii": {
+ "match": True,
+ "operation": "add_tag",
+ "config": {"tag": "has_pii_test"},
+ },
+ "int_property": {
+ "match": 1,
+ "operation": "add_tag",
+ "config": {"tag": "int_meta_property"},
+ },
+ "double_property": {
+ "match": 2.5,
+ "operation": "add_term",
+ "config": {"term": "double_meta_property"},
+ },
+ "data_governance.team_owner": {
+ "match": "Finance",
+ "operation": "add_term",
+ "config": {"term": "Finance_test"},
+ },
+ },
+ },
+ ctx,
+ )
+ workunits = [w for w in kafka_source.get_workunits()]
+ assert len(workunits) == 4
+ mce = workunits[0].metadata
+ assert isinstance(mce, MetadataChangeEvent)
+
+ ownership_aspect = [
+ asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass)
+ ][0]
+ assert ownership_aspect == make_ownership_aspect_from_urn_list(
+ [
+ make_owner_urn("charles", OwnerType.USER),
+ make_owner_urn("jdoe.last@gmail.com", OwnerType.USER),
+ ],
+ "SERVICE",
+ )
+
+ tags_aspect = [
+ asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass)
+ ][0]
+ assert tags_aspect == make_global_tag_aspect_with_tag_list(
+ ["has_pii_test", "int_meta_property"]
+ )
+
+ terms_aspect = [
+ asp
+ for asp in mce.proposedSnapshot.aspects
+ if isinstance(asp, GlossaryTermsClass)
+ ][0]
+ assert terms_aspect == make_glossary_terms_aspect_from_urn_list(
+ [
+ "urn:li:glossaryTerm:Finance_test",
+ "urn:li:glossaryTerm:double_meta_property",
+ ]
+ )
diff --git a/metadata-ingestion/tests/unit/test_mapping.py b/metadata-ingestion/tests/unit/test_mapping.py
index aea1d8ddd9a548..d69dd4a8a96b0d 100644
--- a/metadata-ingestion/tests/unit/test_mapping.py
+++ b/metadata-ingestion/tests/unit/test_mapping.py
@@ -231,3 +231,51 @@ def test_operation_processor_advanced_matching_tags():
tag_aspect: GlobalTagsClass = aspect_map["add_tag"]
assert len(tag_aspect.tags) == 1
assert tag_aspect.tags[0].tag == "urn:li:tag:case_4567"
+
+
+def test_operation_processor_matching_nested_props():
+ raw_props = {
+ "gdpr": {
+ "pii": True,
+ },
+ }
+ processor = OperationProcessor(
+ operation_defs={
+ "gdpr.pii": {
+ "match": True,
+ "operation": "add_tag",
+ "config": {"tag": "pii"},
+ },
+ },
+ owner_source_type="SOURCE_CONTROL",
+ match_nested_props=True,
+ )
+ aspect_map = processor.process(raw_props)
+ assert "add_tag" in aspect_map
+
+ tag_aspect: GlobalTagsClass = aspect_map["add_tag"]
+ assert len(tag_aspect.tags) == 1
+ assert tag_aspect.tags[0].tag == "urn:li:tag:pii"
+
+
+def test_operation_processor_matching_dot_props():
+ raw_props = {
+ "gdpr.pii": True,
+ }
+ processor = OperationProcessor(
+ operation_defs={
+ "gdpr.pii": {
+ "match": True,
+ "operation": "add_tag",
+ "config": {"tag": "pii"},
+ },
+ },
+ owner_source_type="SOURCE_CONTROL",
+ match_nested_props=True,
+ )
+ aspect_map = processor.process(raw_props)
+ assert "add_tag" in aspect_map
+
+ tag_aspect: GlobalTagsClass = aspect_map["add_tag"]
+ assert len(tag_aspect.tags) == 1
+ assert tag_aspect.tags[0].tag == "urn:li:tag:pii"
diff --git a/metadata-ingestion/tests/unit/test_schema_util.py b/metadata-ingestion/tests/unit/test_schema_util.py
index e81c335e178a2c..0a111d700cf8ce 100644
--- a/metadata-ingestion/tests/unit/test_schema_util.py
+++ b/metadata-ingestion/tests/unit/test_schema_util.py
@@ -6,7 +6,12 @@
from typing import Dict, List, Type
import pytest
+from freezegun import freeze_time
+from datahub.emitter.mce_builder import (
+ make_global_tag_aspect_with_tag_list,
+ make_glossary_terms_aspect_from_urn_list,
+)
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
DateTypeClass,
@@ -15,6 +20,7 @@
StringTypeClass,
TimeTypeClass,
)
+from datahub.utilities.mapping import OperationProcessor
logger = logging.getLogger(__name__)
@@ -771,3 +777,106 @@ def test_ignore_exceptions():
"""
fields: List[SchemaField] = avro_schema_to_mce_fields(malformed_schema)
assert not fields
+
+
+@freeze_time("2023-09-12")
+def test_avro_schema_to_mce_fields_with_field_meta_mapping():
+ schema = """
+{
+ "type": "record",
+ "name": "Payment",
+ "namespace": "some.event.namespace",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "amount", "type": "double", "doc": "amountDoc","has_pii": "False"},
+ {"name": "name","type": "string","default": "","has_pii": "True"},
+ {"name": "phoneNumber",
+ "type": [{
+ "type": "record",
+ "name": "PhoneNumber",
+ "doc": "testDoc",
+ "fields": [{
+ "name": "areaCode",
+ "type": "string",
+ "doc": "areaCodeDoc",
+ "default": ""
+ }, {
+ "name": "countryCode",
+ "type": "string",
+ "default": ""
+ }, {
+ "name": "prefix",
+ "type": "string",
+ "default": ""
+ }, {
+ "name": "number",
+ "type": "string",
+ "default": ""
+ }]
+ },
+ "null"
+ ],
+ "default": "null",
+ "has_pii": "True",
+ "glossary_field": "TERM_PhoneNumber"
+ },
+ {"name": "address",
+ "type": [{
+ "type": "record",
+ "name": "Address",
+ "fields": [{
+ "name": "street",
+ "type": "string",
+ "default": ""
+ }]
+ },
+ "null"
+ ],
+ "doc": "addressDoc",
+ "default": "null",
+ "has_pii": "True",
+ "glossary_field": "TERM_Address"
+ }
+ ]
+}
+"""
+ processor = OperationProcessor(
+ operation_defs={
+ "has_pii": {
+ "match": "True",
+ "operation": "add_tag",
+ "config": {"tag": "has_pii_test"},
+ },
+ "glossary_field": {
+ "match": "TERM_(.*)",
+ "operation": "add_term",
+ "config": {"term": "{{ $match }}"},
+ },
+ }
+ )
+ fields = avro_schema_to_mce_fields(schema, meta_mapping_processor=processor)
+ expected_field_paths = [
+ "[version=2.0].[type=Payment].[type=string].id",
+ "[version=2.0].[type=Payment].[type=double].amount",
+ "[version=2.0].[type=Payment].[type=string].name",
+ "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber",
+ "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].areaCode",
+ "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode",
+ "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix",
+ "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number",
+ "[version=2.0].[type=Payment].[type=Address].address",
+ "[version=2.0].[type=Payment].[type=Address].address.[type=string].street",
+ ]
+ assert_field_paths_match(fields, expected_field_paths)
+
+ pii_tag_aspect = make_global_tag_aspect_with_tag_list(["has_pii_test"])
+ assert fields[1].globalTags is None
+ assert fields[2].globalTags == pii_tag_aspect
+ assert fields[3].globalTags == pii_tag_aspect
+ assert fields[3].glossaryTerms == make_glossary_terms_aspect_from_urn_list(
+ ["urn:li:glossaryTerm:PhoneNumber"]
+ )
+ assert fields[8].globalTags == pii_tag_aspect
+ assert fields[8].glossaryTerms == make_glossary_terms_aspect_from_urn_list(
+ ["urn:li:glossaryTerm:Address"]
+ )