Skip to content

Commit

Permalink
feat(ingest/kafka): support metadata mapping from kafka avro schemas (d…
Browse files Browse the repository at this point in the history
…atahub-project#8825)

Co-authored-by: Daniel Messias <[email protected]>
Co-authored-by: Deepankarkr <[email protected]>
Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
4 people authored Sep 23, 2023
1 parent 5bb9f30 commit 5c40390
Show file tree
Hide file tree
Showing 11 changed files with 730 additions and 48 deletions.
83 changes: 83 additions & 0 deletions metadata-ingestion/docs/sources/kafka/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,86 @@ message MessageWithMap {
repeated Map1Entry map_1 = 1;
}
```
### Enriching DataHub metadata with automated meta mapping
:::note
Meta mapping is currently only available for Avro schemas
:::
The Avro specification allows schemas to carry additional, arbitrary attributes beyond those it defines, and a common pattern is to use these attributes for business metadata. The Kafka source can transform this metadata directly into DataHub Owners, Tags, and Terms.
#### Simple tags
If you simply have a list of tags embedded into an Avro schema (either at the top-level or for an individual field), you can use the `schema_tags_field` config.
Example Avro schema:
```json
{
"name": "sampleRecord",
"type": "record",
"tags": ["tag1", "tag2"],
"fields": [{
"name": "field_1",
"type": "string",
"tags": ["tag3", "tag4"]
}]
}
```

The name of the field containing a list of tags can be configured with the `schema_tags_field` property:

```yaml
config:
schema_tags_field: tags
```
#### Meta mapping
You can also map specific Avro schema attributes (at the record level or the field level) into Owners, Tags, and Terms using meta
mapping.
Example Avro schema:
```json
{
"name": "sampleRecord",
"type": "record",
"owning_team": "@Data-Science",
"data_tier": "Bronze",
"fields": [{
"name": "field_1",
"type": "string",
"gdpr": {
"pii": true
}
}]
}
```

This can be mapped to DataHub metadata with `meta_mapping` config:

```yaml
config:
meta_mapping:
owning_team:
match: "^@(.*)"
operation: "add_owner"
config:
owner_type: group
data_tier:
match: "Bronze|Silver|Gold"
operation: "add_term"
config:
term: "{{ $match }}"
field_meta_mapping:
gdpr.pii:
match: true
operation: "add_tag"
config:
tag: "pii"
```
The underlying implementation is similar to [dbt meta mapping](https://datahubproject.io/docs/generated/ingestion/sources/dbt#dbt-meta-automated-mappings), which has more detailed examples that can be used for reference.
92 changes: 76 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import avro.schema

from datahub.emitter import mce_builder
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
BooleanTypeClass,
Expand All @@ -21,7 +22,7 @@
TimeTypeClass,
UnionTypeClass,
)
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass
from datahub.utilities.mapping import Constants, OperationProcessor

"""A helper file for Avro schema -> MCE schema transformations"""

Expand Down Expand Up @@ -98,7 +99,14 @@ class AvroToMceSchemaConverter:
"uuid": StringTypeClass,
}

def __init__(self, is_key_schema: bool, default_nullable: bool = False) -> None:
def __init__(
self,
is_key_schema: bool,
default_nullable: bool = False,
meta_mapping_processor: Optional[OperationProcessor] = None,
schema_tags_field: Optional[str] = None,
tag_prefix: Optional[str] = None,
) -> None:
# Tracks the prefix name stack for nested name generation.
self._prefix_name_stack: PrefixNameStack = [self.version_string]
# Tracks the fields on the current path.
Expand All @@ -112,6 +120,10 @@ def __init__(self, is_key_schema: bool, default_nullable: bool = False) -> None:
if is_key_schema:
# Helps maintain backwards-compatibility. Annotation for any field that is part of key-schema.
self._prefix_name_stack.append("[key=True]")
# Meta mapping
self._meta_mapping_processor = meta_mapping_processor
self._schema_tags_field = schema_tags_field
self._tag_prefix = tag_prefix
# Map of avro schema type to the conversion handler
self._avro_type_to_mce_converter_map: Dict[
avro.schema.Schema,
Expand Down Expand Up @@ -317,17 +329,39 @@ def emit(self) -> Generator[SchemaField, None, None]:
merged_props.update(self._schema.other_props)
merged_props.update(schema.other_props)

tags = None
# Parse meta_mapping
meta_aspects: Dict[str, Any] = {}
if self._converter._meta_mapping_processor:
meta_aspects = self._converter._meta_mapping_processor.process(
merged_props
)

tags: List[str] = []
if self._converter._schema_tags_field:
for tag in merged_props.get(self._converter._schema_tags_field, []):
tags.append(self._converter._tag_prefix + tag)

meta_tags_aspect = meta_aspects.get(Constants.ADD_TAG_OPERATION)
if meta_tags_aspect:
tags += [
tag_association.tag[len("urn:li:tag:") :]
for tag_association in meta_tags_aspect.tags
]

if "deprecated" in merged_props:
description = (
f"<span style=\"color:red\">DEPRECATED: {merged_props['deprecated']}</span>\n"
+ description
if description
else ""
)
tags = GlobalTagsClass(
tags=[TagAssociationClass(tag="urn:li:tag:Deprecated")]
)
tags.append("Deprecated")

tags_aspect = None
if tags:
tags_aspect = mce_builder.make_global_tag_aspect_with_tag_list(tags)

meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION)

logical_type_name: Optional[str] = (
# logicalType nested inside type
Expand All @@ -349,7 +383,8 @@ def emit(self) -> Generator[SchemaField, None, None]:
recursive=False,
nullable=self._converter._is_nullable(schema),
isPartOfKey=self._converter._is_key_schema,
globalTags=tags,
globalTags=tags_aspect,
glossaryTerms=meta_terms_aspect,
jsonProps=json.dumps(merged_props) if merged_props else None,
)
yield field
Expand Down Expand Up @@ -447,7 +482,9 @@ def _gen_from_non_field_nested_schemas(
actual_schema = self._get_underlying_type_if_option_as_union(schema, schema)

with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
schema, actual_schema, self
schema,
actual_schema,
self,
) as fe_schema:
if isinstance(
actual_schema,
Expand Down Expand Up @@ -478,7 +515,9 @@ def _gen_non_nested_to_mce_fields(
) -> Generator[SchemaField, None, None]:
"""Handles generation of MCE SchemaFields for non-nested AVRO types."""
with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
schema, schema, self
schema,
schema,
self,
) as non_nested_emitter:
yield from non_nested_emitter.emit()

Expand All @@ -496,18 +535,27 @@ def _to_mce_fields(
@classmethod
def to_mce_fields(
cls,
avro_schema_string: str,
avro_schema: avro.schema.Schema,
is_key_schema: bool,
default_nullable: bool = False,
meta_mapping_processor: Optional[OperationProcessor] = None,
schema_tags_field: Optional[str] = None,
tag_prefix: Optional[str] = None,
) -> Generator[SchemaField, None, None]:
"""
Converts a key or value type AVRO schema string to appropriate MCE SchemaFields.
:param avro_schema_string: String representation of the AVRO schema.
:param is_key_schema: True if it is a key-schema.
:return: An MCE SchemaField generator.
"""
avro_schema = avro.schema.parse(avro_schema_string)
converter = cls(is_key_schema, default_nullable)
# avro_schema = avro.schema.parse(avro_schema)
converter = cls(
is_key_schema,
default_nullable,
meta_mapping_processor,
schema_tags_field,
tag_prefix,
)
yield from converter._to_mce_fields(avro_schema)


Expand All @@ -516,28 +564,40 @@ def to_mce_fields(


def avro_schema_to_mce_fields(
avro_schema_string: str,
avro_schema: Union[avro.schema.Schema, str],
is_key_schema: bool = False,
default_nullable: bool = False,
meta_mapping_processor: Optional[OperationProcessor] = None,
schema_tags_field: Optional[str] = None,
tag_prefix: Optional[str] = None,
swallow_exceptions: bool = True,
) -> List[SchemaField]:
"""
Converts an avro schema into schema fields compatible with MCE.
:param avro_schema_string: String representation of the AVRO schema.
:param avro_schema: AVRO schema, either as a string or as an avro.schema.Schema object.
:param is_key_schema: True if it is a key-schema. Default is False (value-schema).
:param swallow_exceptions: True if the caller wants exceptions to be suppressed
:param action_processor: Optional OperationProcessor to be used for meta mappings
:return: The list of MCE compatible SchemaFields.
"""

try:
if isinstance(avro_schema, str):
avro_schema = avro.schema.parse(avro_schema)

return list(
AvroToMceSchemaConverter.to_mce_fields(
avro_schema_string, is_key_schema, default_nullable
avro_schema,
is_key_schema,
default_nullable,
meta_mapping_processor,
schema_tags_field,
tag_prefix,
)
)
except Exception:
if swallow_exceptions:
logger.exception(f"Failed to parse {avro_schema_string} into mce fields.")
logger.exception(f"Failed to parse {avro_schema} into mce fields.")
return []
else:
raise
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from hashlib import md5
from typing import Any, List, Optional, Set, Tuple

import avro.schema
import jsonref
from confluent_kafka.schema_registry.schema_registry_client import (
RegisteredSchema,
Expand All @@ -22,6 +23,8 @@
SchemaField,
SchemaMetadata,
)
from datahub.metadata.schema_classes import OwnershipSourceTypeClass
from datahub.utilities.mapping import OperationProcessor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -59,6 +62,14 @@ def __init__(
except Exception as e:
logger.warning(f"Failed to get subjects from schema registry: {e}")

self.field_meta_processor = OperationProcessor(
self.source_config.field_meta_mapping,
self.source_config.tag_prefix,
OwnershipSourceTypeClass.SERVICE,
self.source_config.strip_user_ids_from_email,
match_nested_props=True,
)

@classmethod
def create(
cls, source_config: KafkaSourceConfig, report: KafkaSourceReport
Expand Down Expand Up @@ -290,10 +301,19 @@ def _get_schema_fields(
fields: List[SchemaField] = []
if schema.schema_type == "AVRO":
cleaned_str: str = self.get_schema_str_replace_confluent_ref_avro(schema)
avro_schema = avro.schema.parse(cleaned_str)

# "value.id" or "value.[type=string]id"
fields = schema_util.avro_schema_to_mce_fields(
cleaned_str, is_key_schema=is_key_schema
avro_schema,
is_key_schema=is_key_schema,
meta_mapping_processor=self.field_meta_processor
if self.source_config.enable_meta_mapping
else None,
schema_tags_field=self.source_config.schema_tags_field,
tag_prefix=self.source_config.tag_prefix,
)

elif schema.schema_type == "PROTOBUF":
imported_schemas: List[
ProtobufSchema
Expand Down
Loading

0 comments on commit 5c40390

Please sign in to comment.