diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 0f89324f5efc60..267f7cf0749099 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -1,5 +1,6 @@ from typing import List, Optional +from datahub.configuration.common import AllowDenyPattern from datahub.configuration.time_window_config import BucketDuration from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain from datahub.ingestion.source.snowflake.snowflake_config import DEFAULT_TABLES_DENY_LIST @@ -551,6 +552,8 @@ def usage_per_object_per_time_bucket_for_time_window( use_base_objects: bool, top_n_queries: int, include_top_n_queries: bool, + email_domain: Optional[str], + email_filter: AllowDenyPattern, ) -> str: if not include_top_n_queries: top_n_queries = 0 @@ -561,6 +564,9 @@ def usage_per_object_per_time_bucket_for_time_window( objects_column = ( "BASE_OBJECTS_ACCESSED" if use_base_objects else "DIRECT_OBJECTS_ACCESSED" ) + email_filter_query = SnowflakeQuery.gen_email_filter_query(email_filter) + + email_domain = f"@{email_domain}" if email_domain else "" return f""" WITH object_access_history AS @@ -578,12 +584,16 @@ def usage_per_object_per_time_bucket_for_time_window( query_id, query_start_time, user_name, + NVL(USERS.email, CONCAT(user_name, '{email_domain}')) AS user_email, {objects_column} from snowflake.account_usage.access_history + LEFT JOIN + snowflake.account_usage.users USERS WHERE query_start_time >= to_timestamp_ltz({start_time_millis}, 3) AND query_start_time < to_timestamp_ltz({end_time_millis}, 3) + {email_filter_query} ) t, lateral flatten(input => t.{objects_column}) object @@ -705,6 +715,34 @@ def usage_per_object_per_time_bucket_for_time_window( basic_usage_counts.bucket_start_time """ + @staticmethod + def gen_email_filter_query(email_filter: AllowDenyPattern) -> str: + allow_filters = [] + allow_filter = "" + if len(email_filter.allow) == 1 and email_filter.allow[0] == ".*": + allow_filter = "" + else: + for allow_pattern in email_filter.allow: + allow_filters.append( + f"rlike(user_name, '{allow_pattern}','{'i' if email_filter.ignoreCase else 'c'}')" + ) + if allow_filters: + allow_filter = " OR ".join(allow_filters) + allow_filter = f"AND ({allow_filter})" + deny_filters = [] + deny_filter = "" + for deny_pattern in email_filter.deny: + deny_filters.append( + f"rlike(user_name, '{deny_pattern}','{'i' if email_filter.ignoreCase else 'c'}')" + ) + if deny_filters: + deny_filter = " OR ".join(deny_filters) + deny_filter = f"({deny_filter})" + email_filter_query = allow_filter + ( + " AND" + f" NOT {deny_filter}" if deny_filter else "" + ) + return email_filter_query + @staticmethod def table_upstreams_with_column_lineage( start_time_millis: int, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py index 8f571313f18883..f75e994303954d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py @@ -214,6 +214,8 @@ def _get_workunits_internal( use_base_objects=self.config.apply_view_usage_to_tables, top_n_queries=self.config.top_n_queries, include_top_n_queries=self.config.include_top_n_queries, + email_domain=self.config.email_domain, + email_filter=self.config.user_email_pattern, ), ) except Exception as e: diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index 78e54996973119..b21cea5f0988d0 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -1,6 +1,7 @@ import json from datetime import datetime, timezone +from datahub.configuration.common import AllowDenyPattern from datahub.configuration.time_window_config import BucketDuration from datahub.ingestion.source.snowflake import snowflake_query from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery @@ -263,6 +264,8 @@ def default_query_results( # noqa: C901 top_n_queries=10, include_top_n_queries=True, time_bucket_size=BucketDuration.DAY, + email_domain=None, + email_filter=AllowDenyPattern.allow_all(), ) ): return [] diff --git a/metadata-ingestion/tests/unit/test_snowflake_source.py b/metadata-ingestion/tests/unit/test_snowflake_source.py index aaff878b81eeef..343f4466fd6fdf 100644 --- a/metadata-ingestion/tests/unit/test_snowflake_source.py +++ b/metadata-ingestion/tests/unit/test_snowflake_source.py @@ -3,6 +3,7 @@ import pytest from pydantic import ValidationError +from datahub.configuration.common import AllowDenyPattern from datahub.configuration.oauth import OAuthConfiguration from datahub.configuration.pattern_utils import UUID_REGEX from datahub.ingestion.api.source import SourceCapability @@ -16,6 +17,7 @@ SnowflakeV2Config, ) from datahub.ingestion.source.snowflake.snowflake_query import ( + SnowflakeQuery, create_deny_regex_sql_filter, ) from datahub.ingestion.source.snowflake.snowflake_usage_v2 import ( @@ -661,3 +663,44 @@ def test_snowflake_temporary_patterns_config_rename(): } ) assert conf.temporary_tables_pattern == [".*tmp.*"] + + +def test_email_filter_query_generation_with_one_deny(): + email_filter = AllowDenyPattern(deny=[".*@example.com"]) + filter_query = SnowflakeQuery.gen_email_filter_query(email_filter) + assert filter_query == " AND NOT (rlike(user_name, '.*@example.com','i'))" + + +def test_email_filter_query_generation_without_any_filter(): + email_filter = AllowDenyPattern() + filter_query = SnowflakeQuery.gen_email_filter_query(email_filter) + assert filter_query == "" + + +def test_email_filter_query_generation_one_allow(): + email_filter = AllowDenyPattern(allow=[".*@example.com"]) + filter_query = SnowflakeQuery.gen_email_filter_query(email_filter) + assert filter_query == "AND (rlike(user_name, '.*@example.com','i'))" + + +def test_email_filter_query_generation_one_allow_and_deny(): + email_filter = AllowDenyPattern( + allow=[".*@example.com", ".*@example2.com"], + deny=[".*@example2.com", ".*@example4.com"], + ) + filter_query = SnowflakeQuery.gen_email_filter_query(email_filter) + assert ( + filter_query + == "AND (rlike(user_name, '.*@example.com','i') OR rlike(user_name, '.*@example2.com','i')) AND NOT (rlike(user_name, '.*@example2.com','i') OR rlike(user_name, '.*@example4.com','i'))" + ) + + +def test_email_filter_query_generation_with_case_insensitive_filter(): + email_filter = AllowDenyPattern( + allow=[".*@example.com"], deny=[".*@example2.com"], ignoreCase=False + ) + filter_query = SnowflakeQuery.gen_email_filter_query(email_filter) + assert ( + filter_query + == "AND (rlike(user_name, '.*@example.com','c')) AND NOT (rlike(user_name, '.*@example2.com','c'))" + )