From 5c40390a923a2a44f23290e4f1a2168820993fca Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Sat, 23 Sep 2023 05:41:42 +0530 Subject: [PATCH] feat(ingest/kafka): support metadata mapping from kafka avro schemas (#8825) Co-authored-by: Daniel Messias Co-authored-by: Deepankarkr Co-authored-by: Harshal Sheth --- .../docs/sources/kafka/kafka.md | 83 +++++++++ .../ingestion/extractor/schema_util.py | 92 ++++++++-- .../source/confluent_schema_registry.py | 22 ++- .../src/datahub/ingestion/source/kafka.py | 78 ++++++++- .../datahub/utilities/hive_schema_to_avro.py | 2 +- .../src/datahub/utilities/mapping.py | 15 +- .../integration/kafka/kafka_mces_golden.json | 164 +++++++++++++++--- .../tests/integration/kafka/value_schema.avsc | 10 +- .../tests/unit/test_kafka_source.py | 155 +++++++++++++++++ metadata-ingestion/tests/unit/test_mapping.py | 48 +++++ .../tests/unit/test_schema_util.py | 109 ++++++++++++ 11 files changed, 730 insertions(+), 48 deletions(-) diff --git a/metadata-ingestion/docs/sources/kafka/kafka.md b/metadata-ingestion/docs/sources/kafka/kafka.md index 2e8baa9516d17c..9fdfc3a3af1d02 100644 --- a/metadata-ingestion/docs/sources/kafka/kafka.md +++ b/metadata-ingestion/docs/sources/kafka/kafka.md @@ -130,3 +130,86 @@ message MessageWithMap { repeated Map1Entry map_1 = 1; } ``` + +### Enriching DataHub metadata with automated meta mapping + +:::note +Meta mapping is currently only available for Avro schemas +::: + +Avro schemas are permitted to have additional attributes not defined by the specification as arbitrary metadata. A common pattern is to utilize this for business metadata. The Kafka source has the ability to transform this directly into DataHub Owners, Tags and Terms. + +#### Simple tags + +If you simply have a list of tags embedded into an Avro schema (either at the top-level or for an individual field), you can use the `schema_tags_field` config. + +Example Avro schema: + +```json +{ + "name": "sampleRecord", + "type": "record", + "tags": ["tag1", "tag2"], + "fields": [{ + "name": "field_1", + "type": "string", + "tags": ["tag3", "tag4"] + }] +} +``` + +The name of the field containing a list of tags can be configured with the `schema_tags_field` property: + +```yaml +config: + schema_tags_field: tags +``` + +#### meta mapping + +You can also map specific Avro fields into Owners, Tags and Terms using meta +mapping. + +Example Avro schema: + +```json +{ + "name": "sampleRecord", + "type": "record", + "owning_team": "@Data-Science", + "data_tier": "Bronze", + "fields": [{ + "name": "field_1", + "type": "string", + "gdpr": { + "pii": true + } + }] +} +``` + +This can be mapped to DataHub metadata with `meta_mapping` config: + +```yaml +config: + meta_mapping: + owning_team: + match: "^@(.*)" + operation: "add_owner" + config: + owner_type: group + data_tier: + match: "Bronze|Silver|Gold" + operation: "add_term" + config: + term: "{{ $match }}" + field_meta_mapping: + gdpr.pii: + match: true + operation: "add_tag" + config: + tag: "pii" +``` + +The underlying implementation is similar to [dbt meta mapping](https://datahubproject.io/docs/generated/ingestion/sources/dbt#dbt-meta-automated-mappings), which has more detailed examples that can be used for reference. + diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py index 75de18e9037eea..4acf99a50e50ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py @@ -4,6 +4,7 @@ import avro.schema +from datahub.emitter import mce_builder from datahub.metadata.com.linkedin.pegasus2avro.schema import ( ArrayTypeClass, BooleanTypeClass, @@ -21,7 +22,7 @@ TimeTypeClass, UnionTypeClass, ) -from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass +from datahub.utilities.mapping import Constants, OperationProcessor """A helper file for Avro schema -> MCE schema transformations""" @@ -98,7 +99,14 @@ class AvroToMceSchemaConverter: "uuid": StringTypeClass, } - def __init__(self, is_key_schema: bool, default_nullable: bool = False) -> None: + def __init__( + self, + is_key_schema: bool, + default_nullable: bool = False, + meta_mapping_processor: Optional[OperationProcessor] = None, + schema_tags_field: Optional[str] = None, + tag_prefix: Optional[str] = None, + ) -> None: # Tracks the prefix name stack for nested name generation. self._prefix_name_stack: PrefixNameStack = [self.version_string] # Tracks the fields on the current path. @@ -112,6 +120,10 @@ def __init__(self, is_key_schema: bool, default_nullable: bool = False) -> None: if is_key_schema: # Helps maintain backwards-compatibility. Annotation for any field that is part of key-schema. self._prefix_name_stack.append("[key=True]") + # Meta mapping + self._meta_mapping_processor = meta_mapping_processor + self._schema_tags_field = schema_tags_field + self._tag_prefix = tag_prefix # Map of avro schema type to the conversion handler self._avro_type_to_mce_converter_map: Dict[ avro.schema.Schema, @@ -317,7 +329,25 @@ def emit(self) -> Generator[SchemaField, None, None]: merged_props.update(self._schema.other_props) merged_props.update(schema.other_props) - tags = None + # Parse meta_mapping + meta_aspects: Dict[str, Any] = {} + if self._converter._meta_mapping_processor: + meta_aspects = self._converter._meta_mapping_processor.process( + merged_props + ) + + tags: List[str] = [] + if self._converter._schema_tags_field: + for tag in merged_props.get(self._converter._schema_tags_field, []): + tags.append(self._converter._tag_prefix + tag) + + meta_tags_aspect = meta_aspects.get(Constants.ADD_TAG_OPERATION) + if meta_tags_aspect: + tags += [ + tag_association.tag[len("urn:li:tag:") :] + for tag_association in meta_tags_aspect.tags + ] + if "deprecated" in merged_props: description = ( f"DEPRECATED: {merged_props['deprecated']}\n" @@ -325,9 +355,13 @@ def emit(self) -> Generator[SchemaField, None, None]: if description else "" ) - tags = GlobalTagsClass( - tags=[TagAssociationClass(tag="urn:li:tag:Deprecated")] - ) + tags.append("Deprecated") + + tags_aspect = None + if tags: + tags_aspect = mce_builder.make_global_tag_aspect_with_tag_list(tags) + + meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION) logical_type_name: Optional[str] = ( # logicalType nested inside type @@ -349,7 +383,8 @@ def emit(self) -> Generator[SchemaField, None, None]: recursive=False, nullable=self._converter._is_nullable(schema), isPartOfKey=self._converter._is_key_schema, - globalTags=tags, + globalTags=tags_aspect, + glossaryTerms=meta_terms_aspect, jsonProps=json.dumps(merged_props) if merged_props else None, ) yield field @@ -447,7 +482,9 @@ def _gen_from_non_field_nested_schemas( actual_schema = self._get_underlying_type_if_option_as_union(schema, schema) with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager( - schema, actual_schema, self + schema, + actual_schema, + self, ) as fe_schema: if isinstance( actual_schema, @@ -478,7 +515,9 @@ def _gen_non_nested_to_mce_fields( ) -> Generator[SchemaField, None, None]: """Handles generation of MCE SchemaFields for non-nested AVRO types.""" with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager( - schema, schema, self + schema, + schema, + self, ) as non_nested_emitter: yield from non_nested_emitter.emit() @@ -496,9 +535,12 @@ def _to_mce_fields( @classmethod def to_mce_fields( cls, - avro_schema_string: str, + avro_schema: avro.schema.Schema, is_key_schema: bool, default_nullable: bool = False, + meta_mapping_processor: Optional[OperationProcessor] = None, + schema_tags_field: Optional[str] = None, + tag_prefix: Optional[str] = None, ) -> Generator[SchemaField, None, None]: """ Converts a key or value type AVRO schema string to appropriate MCE SchemaFields. @@ -506,8 +548,14 @@ def to_mce_fields( :param is_key_schema: True if it is a key-schema. :return: An MCE SchemaField generator. """ - avro_schema = avro.schema.parse(avro_schema_string) - converter = cls(is_key_schema, default_nullable) + # avro_schema = avro.schema.parse(avro_schema) + converter = cls( + is_key_schema, + default_nullable, + meta_mapping_processor, + schema_tags_field, + tag_prefix, + ) yield from converter._to_mce_fields(avro_schema) @@ -516,28 +564,40 @@ def to_mce_fields( def avro_schema_to_mce_fields( - avro_schema_string: str, + avro_schema: Union[avro.schema.Schema, str], is_key_schema: bool = False, default_nullable: bool = False, + meta_mapping_processor: Optional[OperationProcessor] = None, + schema_tags_field: Optional[str] = None, + tag_prefix: Optional[str] = None, swallow_exceptions: bool = True, ) -> List[SchemaField]: """ Converts an avro schema into schema fields compatible with MCE. - :param avro_schema_string: String representation of the AVRO schema. + :param avro_schema: AVRO schema, either as a string or as an avro.schema.Schema object. :param is_key_schema: True if it is a key-schema. Default is False (value-schema). :param swallow_exceptions: True if the caller wants exceptions to be suppressed + :param action_processor: Optional OperationProcessor to be used for meta mappings :return: The list of MCE compatible SchemaFields. """ try: + if isinstance(avro_schema, str): + avro_schema = avro.schema.parse(avro_schema) + return list( AvroToMceSchemaConverter.to_mce_fields( - avro_schema_string, is_key_schema, default_nullable + avro_schema, + is_key_schema, + default_nullable, + meta_mapping_processor, + schema_tags_field, + tag_prefix, ) ) except Exception: if swallow_exceptions: - logger.exception(f"Failed to parse {avro_schema_string} into mce fields.") + logger.exception(f"Failed to parse {avro_schema} into mce fields.") return [] else: raise diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py index 0bdcb115b377c8..54475cb509621d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py @@ -4,6 +4,7 @@ from hashlib import md5 from typing import Any, List, Optional, Set, Tuple +import avro.schema import jsonref from confluent_kafka.schema_registry.schema_registry_client import ( RegisteredSchema, @@ -22,6 +23,8 @@ SchemaField, SchemaMetadata, ) +from datahub.metadata.schema_classes import OwnershipSourceTypeClass +from datahub.utilities.mapping import OperationProcessor logger = logging.getLogger(__name__) @@ -59,6 +62,14 @@ def __init__( except Exception as e: logger.warning(f"Failed to get subjects from schema registry: {e}") + self.field_meta_processor = OperationProcessor( + self.source_config.field_meta_mapping, + self.source_config.tag_prefix, + OwnershipSourceTypeClass.SERVICE, + self.source_config.strip_user_ids_from_email, + match_nested_props=True, + ) + @classmethod def create( cls, source_config: KafkaSourceConfig, report: KafkaSourceReport @@ -290,10 +301,19 @@ def _get_schema_fields( fields: List[SchemaField] = [] if schema.schema_type == "AVRO": cleaned_str: str = self.get_schema_str_replace_confluent_ref_avro(schema) + avro_schema = avro.schema.parse(cleaned_str) + # "value.id" or "value.[type=string]id" fields = schema_util.avro_schema_to_mce_fields( - cleaned_str, is_key_schema=is_key_schema + avro_schema, + is_key_schema=is_key_schema, + meta_mapping_processor=self.field_meta_processor + if self.source_config.enable_meta_mapping + else None, + schema_tags_field=self.source_config.schema_tags_field, + tag_prefix=self.source_config.tag_prefix, ) + elif schema.schema_type == "PROTOBUF": imported_schemas: List[ ProtobufSchema diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 61f6103347eb37..566304e1999b79 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -5,6 +5,7 @@ from enum import Enum from typing import Any, Dict, Iterable, List, Optional, Type +import avro.schema import confluent_kafka import confluent_kafka.admin import pydantic @@ -18,6 +19,7 @@ from datahub.configuration.common import AllowDenyPattern from datahub.configuration.kafka import KafkaConsumerConnectionConfig from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.emitter import mce_builder from datahub.emitter.mce_builder import ( make_data_platform_urn, make_dataplatform_instance_urn, @@ -56,8 +58,10 @@ DataPlatformInstanceClass, DatasetPropertiesClass, KafkaSchemaClass, + OwnershipSourceTypeClass, SubTypesClass, ) +from datahub.utilities.mapping import Constants, OperationProcessor from datahub.utilities.registries.domain_registry import DomainRegistry logger = logging.getLogger(__name__) @@ -89,6 +93,29 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): default="datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry", description="The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.", ) + schema_tags_field = pydantic.Field( + default="tags", + description="The field name in the schema metadata that contains the tags to be added to the dataset.", + ) + enable_meta_mapping = pydantic.Field( + default=True, + description="When enabled, applies the mappings that are defined through the meta_mapping directives.", + ) + meta_mapping: Dict = pydantic.Field( + default={}, + description="mapping rules that will be executed against top-level schema properties. Refer to the section below on meta automated mappings.", + ) + field_meta_mapping: Dict = pydantic.Field( + default={}, + description="mapping rules that will be executed against field-level schema properties. Refer to the section below on meta automated mappings.", + ) + strip_user_ids_from_email: bool = pydantic.Field( + default=False, + description="Whether or not to strip email id while adding owners using meta mappings.", + ) + tag_prefix: str = pydantic.Field( + default="", description="Prefix added to tags during ingestion." + ) ignore_warnings_on_schema_type: bool = pydantic.Field( default=False, description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.", @@ -167,6 +194,14 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext): graph=self.ctx.graph, ) + self.meta_processor = OperationProcessor( + self.source_config.meta_mapping, + self.source_config.tag_prefix, + OwnershipSourceTypeClass.SERVICE, + self.source_config.strip_user_ids_from_email, + match_nested_props=True, + ) + def init_kafka_admin_client(self) -> None: try: # TODO: Do we require separate config than existing consumer_config ? @@ -227,7 +262,6 @@ def _extract_record( logger.debug(f"topic = {topic}") AVRO = "AVRO" - DOC_KEY = "doc" # 1. Create the default dataset snapshot for the topic. dataset_name = topic @@ -261,8 +295,8 @@ def _extract_record( topic, topic_detail, extra_topic_config ) - # 4. Set dataset's description as top level doc, if topic schema type is avro - description = None + # 4. Set dataset's description, tags, ownership, etc, if topic schema type is avro + description: Optional[str] = None if ( schema_metadata is not None and isinstance(schema_metadata.platformSchema, KafkaSchemaClass) @@ -271,9 +305,41 @@ def _extract_record( # Point to note: # In Kafka documentSchema and keySchema both contains "doc" field. # DataHub Dataset "description" field is mapped to documentSchema's "doc" field. - schema = json.loads(schema_metadata.platformSchema.documentSchema) - if isinstance(schema, dict): - description = schema.get(DOC_KEY) + + avro_schema = avro.schema.parse( + schema_metadata.platformSchema.documentSchema + ) + description = avro_schema.doc + # set the tags + all_tags: List[str] = [] + for tag in avro_schema.other_props.get( + self.source_config.schema_tags_field, [] + ): + all_tags.append(self.source_config.tag_prefix + tag) + + if self.source_config.enable_meta_mapping: + meta_aspects = self.meta_processor.process(avro_schema.other_props) + + meta_owners_aspects = meta_aspects.get(Constants.ADD_OWNER_OPERATION) + if meta_owners_aspects: + dataset_snapshot.aspects.append(meta_owners_aspects) + + meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION) + if meta_terms_aspect: + dataset_snapshot.aspects.append(meta_terms_aspect) + + # Create the tags aspect + meta_tags_aspect = meta_aspects.get(Constants.ADD_TAG_OPERATION) + if meta_tags_aspect: + all_tags += [ + tag_association.tag[len("urn:li:tag:") :] + for tag_association in meta_tags_aspect.tags + ] + + if all_tags: + dataset_snapshot.aspects.append( + mce_builder.make_global_tag_aspect_with_tag_list(all_tags) + ) dataset_properties = DatasetPropertiesClass( name=topic, customProperties=custom_props, description=description diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py index 8865254e885795..4fcef990ae4f43 100644 --- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py +++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py @@ -269,7 +269,7 @@ def get_schema_fields_for_hive_column( hive_column_name=hive_column_name, hive_column_type=hive_column_type ) schema_fields = avro_schema_to_mce_fields( - avro_schema_string=json.dumps(avro_schema_json), + avro_schema=json.dumps(avro_schema_json), default_nullable=default_nullable, swallow_exceptions=False, ) diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index 32666ceecdf85c..793eccfb22c7e8 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -1,6 +1,8 @@ import contextlib import logging +import operator import re +from functools import reduce from typing import Any, Dict, List, Match, Optional, Union from datahub.emitter import mce_builder @@ -94,11 +96,13 @@ def __init__( tag_prefix: str = "", owner_source_type: Optional[str] = None, strip_owner_email_id: bool = False, + match_nested_props: bool = False, ): self.operation_defs = operation_defs self.tag_prefix = tag_prefix self.strip_owner_email_id = strip_owner_email_id self.owner_source_type = owner_source_type + self.match_nested_props = match_nested_props def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]: # Defining the following local variables - @@ -121,9 +125,18 @@ def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]: ) if not operation_type or not operation_config: continue + raw_props_value = raw_props.get(operation_key) + if not raw_props_value and self.match_nested_props: + try: + raw_props_value = reduce( + operator.getitem, operation_key.split("."), raw_props + ) + except KeyError: + pass + maybe_match = self.get_match( self.operation_defs[operation_key][Constants.MATCH], - raw_props.get(operation_key), + raw_props_value, ) if maybe_match is not None: operation = self.get_operation_value( diff --git a/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json b/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json index e51eaa10b8b10c..7dd328168e84c0 100644 --- a/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json +++ b/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json @@ -86,7 +86,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -103,7 +104,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -118,7 +120,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -144,10 +147,10 @@ "time": 0, "actor": "urn:li:corpuser:unknown" }, - "hash": "cc452cf58242cdb9d09cf33d657497d8", + "hash": "a79a2fe3adab60b21d272a9cc3e93595", "platformSchema": { "com.linkedin.pegasus2avro.schema.KafkaSchema": { - "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"}]}", + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", "documentSchemaType": "AVRO", "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", "keySchemaType": "AVRO" @@ -188,7 +191,15 @@ }, "nativeDataType": "email", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" }, { "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", @@ -200,7 +211,15 @@ }, "nativeDataType": "firstName", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" }, { "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", @@ -212,7 +231,15 @@ }, "nativeDataType": "lastName", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" } ] } @@ -224,6 +251,15 @@ ] } }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { @@ -246,7 +282,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -263,7 +300,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -280,7 +318,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -295,7 +334,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -321,10 +361,10 @@ "time": 0, "actor": "urn:li:corpuser:unknown" }, - "hash": "dc1cf32c2688cc3d2d27fe6e856f06d2", + "hash": "62c7c400ec5760797a59c45e59c2f2dc", "platformSchema": { "com.linkedin.pegasus2avro.schema.KafkaSchema": { - "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"}]}", + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", "documentSchemaType": "AVRO", "keySchema": "\"string\"", "keySchemaType": "AVRO" @@ -353,7 +393,15 @@ }, "nativeDataType": "email", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" }, { "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", @@ -365,7 +413,15 @@ }, "nativeDataType": "firstName", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" }, { "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", @@ -377,7 +433,15 @@ }, "nativeDataType": "lastName", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" } ] } @@ -389,6 +453,15 @@ ] } }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { @@ -411,7 +484,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -428,7 +502,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -443,7 +518,56 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Email", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Email" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Name", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Name" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:PII", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "PII" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka/value_schema.avsc b/metadata-ingestion/tests/integration/kafka/value_schema.avsc index 788cb94c47a72b..8cb6c42cb03f45 100644 --- a/metadata-ingestion/tests/integration/kafka/value_schema.avsc +++ b/metadata-ingestion/tests/integration/kafka/value_schema.avsc @@ -3,18 +3,22 @@ "type": "record", "name": "CreateUserRequest", "doc": "Value schema for kafka topic", + "tags": ["PII"], "fields": [ { "name": "email", - "type": "string" + "type": "string", + "tags": ["Email"] }, { "name": "firstName", - "type": "string" + "type": "string", + "tags": ["Name"] }, { "name": "lastName", - "type": "string" + "type": "string", + "tags": ["Name"] } ] } diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py index b48ebf12ee37a1..603068780d0a7b 100644 --- a/metadata-ingestion/tests/unit/test_kafka_source.py +++ b/metadata-ingestion/tests/unit/test_kafka_source.py @@ -1,3 +1,4 @@ +import json from itertools import chain from typing import Dict, Optional, Tuple from unittest.mock import MagicMock, patch @@ -7,11 +8,17 @@ RegisteredSchema, Schema, ) +from freezegun import freeze_time from datahub.emitter.mce_builder import ( + OwnerType, make_dataplatform_instance_urn, make_dataset_urn, make_dataset_urn_with_platform_instance, + make_global_tag_aspect_with_tag_list, + make_glossary_terms_aspect_from_urn_list, + make_owner_urn, + make_ownership_aspect_from_urn_list, ) from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -20,7 +27,10 @@ from datahub.metadata.schema_classes import ( BrowsePathsClass, DataPlatformInstanceClass, + GlobalTagsClass, + GlossaryTermsClass, KafkaSchemaClass, + OwnershipClass, SchemaMetadataClass, ) @@ -521,3 +531,148 @@ def test_kafka_source_succeeds_with_describe_configs_error( mock_admin_client_instance.describe_configs.assert_called_once() assert len(workunits) == 2 + + +@freeze_time("2023-09-20 10:00:00") +@patch( + "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient", + autospec=True, +) +@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True) +def test_kafka_source_topic_meta_mappings( + mock_kafka_consumer, mock_schema_registry_client, mock_admin_client +): + # Setup the topic to key/value schema mappings for all types of schema registry subject name strategies. + # ,) + topic_subject_schema_map: Dict[str, Tuple[RegisteredSchema, RegisteredSchema]] = { + "topic1": ( + RegisteredSchema( + schema_id="schema_id_2", + schema=Schema( + schema_str='{"type":"record", "name":"Topic1Key", "namespace": "test.acryl", "fields": [{"name":"t1key", "type": "string"}]}', + schema_type="AVRO", + ), + subject="topic1-key", + version=1, + ), + RegisteredSchema( + schema_id="schema_id_1", + schema=Schema( + schema_str=json.dumps( + { + "type": "record", + "name": "Topic1Value", + "namespace": "test.acryl", + "fields": [{"name": "t1value", "type": "string"}], + "owner": "@charles", + "business_owner": "jdoe.last@gmail.com", + "data_governance.team_owner": "Finance", + "has_pii": True, + "int_property": 1, + "double_property": 2.5, + } + ), + schema_type="AVRO", + ), + subject="topic1-value", + version=1, + ), + ) + } + + # Mock the kafka consumer + mock_kafka_instance = mock_kafka_consumer.return_value + mock_cluster_metadata = MagicMock() + mock_cluster_metadata.topics = {k: None for k in topic_subject_schema_map.keys()} + mock_kafka_instance.list_topics.return_value = mock_cluster_metadata + + # Mock the schema registry client + # - mock get_subjects: all subjects in topic_subject_schema_map + mock_schema_registry_client.return_value.get_subjects.return_value = [ + v.subject for v in chain(*topic_subject_schema_map.values()) + ] + + # - mock get_latest_version + def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: + for registered_schema in chain(*topic_subject_schema_map.values()): + if registered_schema.subject == subject_name: + return registered_schema + return None + + mock_schema_registry_client.return_value.get_latest_version = ( + mock_get_latest_version + ) + + ctx = PipelineContext(run_id="test1") + kafka_source = KafkaSource.create( + { + "connection": {"bootstrap": "localhost:9092"}, + "meta_mapping": { + "owner": { + "match": "^@(.*)", + "operation": "add_owner", + "config": {"owner_type": "user"}, + }, + "business_owner": { + "match": ".*", + "operation": "add_owner", + "config": {"owner_type": "user"}, + }, + "has_pii": { + "match": True, + "operation": "add_tag", + "config": {"tag": "has_pii_test"}, + }, + "int_property": { + "match": 1, + "operation": "add_tag", + "config": {"tag": "int_meta_property"}, + }, + "double_property": { + "match": 2.5, + "operation": "add_term", + "config": {"term": "double_meta_property"}, + }, + "data_governance.team_owner": { + "match": "Finance", + "operation": "add_term", + "config": {"term": "Finance_test"}, + }, + }, + }, + ctx, + ) + workunits = [w for w in kafka_source.get_workunits()] + assert len(workunits) == 4 + mce = workunits[0].metadata + assert isinstance(mce, MetadataChangeEvent) + + ownership_aspect = [ + asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass) + ][0] + assert ownership_aspect == make_ownership_aspect_from_urn_list( + [ + make_owner_urn("charles", OwnerType.USER), + make_owner_urn("jdoe.last@gmail.com", OwnerType.USER), + ], + "SERVICE", + ) + + tags_aspect = [ + asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass) + ][0] + assert tags_aspect == make_global_tag_aspect_with_tag_list( + ["has_pii_test", "int_meta_property"] + ) + + terms_aspect = [ + asp + for asp in mce.proposedSnapshot.aspects + if isinstance(asp, GlossaryTermsClass) + ][0] + assert terms_aspect == make_glossary_terms_aspect_from_urn_list( + [ + "urn:li:glossaryTerm:Finance_test", + "urn:li:glossaryTerm:double_meta_property", + ] + ) diff --git a/metadata-ingestion/tests/unit/test_mapping.py b/metadata-ingestion/tests/unit/test_mapping.py index aea1d8ddd9a548..d69dd4a8a96b0d 100644 --- a/metadata-ingestion/tests/unit/test_mapping.py +++ b/metadata-ingestion/tests/unit/test_mapping.py @@ -231,3 +231,51 @@ def test_operation_processor_advanced_matching_tags(): tag_aspect: GlobalTagsClass = aspect_map["add_tag"] assert len(tag_aspect.tags) == 1 assert tag_aspect.tags[0].tag == "urn:li:tag:case_4567" + + +def test_operation_processor_matching_nested_props(): + raw_props = { + "gdpr": { + "pii": True, + }, + } + processor = OperationProcessor( + operation_defs={ + "gdpr.pii": { + "match": True, + "operation": "add_tag", + "config": {"tag": "pii"}, + }, + }, + owner_source_type="SOURCE_CONTROL", + match_nested_props=True, + ) + aspect_map = processor.process(raw_props) + assert "add_tag" in aspect_map + + tag_aspect: GlobalTagsClass = aspect_map["add_tag"] + assert len(tag_aspect.tags) == 1 + assert tag_aspect.tags[0].tag == "urn:li:tag:pii" + + +def test_operation_processor_matching_dot_props(): + raw_props = { + "gdpr.pii": True, + } + processor = OperationProcessor( + operation_defs={ + "gdpr.pii": { + "match": True, + "operation": "add_tag", + "config": {"tag": "pii"}, + }, + }, + owner_source_type="SOURCE_CONTROL", + match_nested_props=True, + ) + aspect_map = processor.process(raw_props) + assert "add_tag" in aspect_map + + tag_aspect: GlobalTagsClass = aspect_map["add_tag"] + assert len(tag_aspect.tags) == 1 + assert tag_aspect.tags[0].tag == "urn:li:tag:pii" diff --git a/metadata-ingestion/tests/unit/test_schema_util.py b/metadata-ingestion/tests/unit/test_schema_util.py index e81c335e178a2c..0a111d700cf8ce 100644 --- a/metadata-ingestion/tests/unit/test_schema_util.py +++ b/metadata-ingestion/tests/unit/test_schema_util.py @@ -6,7 +6,12 @@ from typing import Dict, List, Type import pytest +from freezegun import freeze_time +from datahub.emitter.mce_builder import ( + make_global_tag_aspect_with_tag_list, + make_glossary_terms_aspect_from_urn_list, +) from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields from datahub.metadata.com.linkedin.pegasus2avro.schema import ( DateTypeClass, @@ -15,6 +20,7 @@ StringTypeClass, TimeTypeClass, ) +from datahub.utilities.mapping import OperationProcessor logger = logging.getLogger(__name__) @@ -771,3 +777,106 @@ def test_ignore_exceptions(): """ fields: List[SchemaField] = avro_schema_to_mce_fields(malformed_schema) assert not fields + + +@freeze_time("2023-09-12") +def test_avro_schema_to_mce_fields_with_field_meta_mapping(): + schema = """ +{ + "type": "record", + "name": "Payment", + "namespace": "some.event.namespace", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "amount", "type": "double", "doc": "amountDoc","has_pii": "False"}, + {"name": "name","type": "string","default": "","has_pii": "True"}, + {"name": "phoneNumber", + "type": [{ + "type": "record", + "name": "PhoneNumber", + "doc": "testDoc", + "fields": [{ + "name": "areaCode", + "type": "string", + "doc": "areaCodeDoc", + "default": "" + }, { + "name": "countryCode", + "type": "string", + "default": "" + }, { + "name": "prefix", + "type": "string", + "default": "" + }, { + "name": "number", + "type": "string", + "default": "" + }] + }, + "null" + ], + "default": "null", + "has_pii": "True", + "glossary_field": "TERM_PhoneNumber" + }, + {"name": "address", + "type": [{ + "type": "record", + "name": "Address", + "fields": [{ + "name": "street", + "type": "string", + "default": "" + }] + }, + "null" + ], + "doc": "addressDoc", + "default": "null", + "has_pii": "True", + "glossary_field": "TERM_Address" + } + ] +} +""" + processor = OperationProcessor( + operation_defs={ + "has_pii": { + "match": "True", + "operation": "add_tag", + "config": {"tag": "has_pii_test"}, + }, + "glossary_field": { + "match": "TERM_(.*)", + "operation": "add_term", + "config": {"term": "{{ $match }}"}, + }, + } + ) + fields = avro_schema_to_mce_fields(schema, meta_mapping_processor=processor) + expected_field_paths = [ + "[version=2.0].[type=Payment].[type=string].id", + "[version=2.0].[type=Payment].[type=double].amount", + "[version=2.0].[type=Payment].[type=string].name", + "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber", + "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].areaCode", + "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode", + "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix", + "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number", + "[version=2.0].[type=Payment].[type=Address].address", + "[version=2.0].[type=Payment].[type=Address].address.[type=string].street", + ] + assert_field_paths_match(fields, expected_field_paths) + + pii_tag_aspect = make_global_tag_aspect_with_tag_list(["has_pii_test"]) + assert fields[1].globalTags is None + assert fields[2].globalTags == pii_tag_aspect + assert fields[3].globalTags == pii_tag_aspect + assert fields[3].glossaryTerms == make_glossary_terms_aspect_from_urn_list( + ["urn:li:glossaryTerm:PhoneNumber"] + ) + assert fields[8].globalTags == pii_tag_aspect + assert fields[8].glossaryTerms == make_glossary_terms_aspect_from_urn_list( + ["urn:li:glossaryTerm:Address"] + )