Skip to content

Commit

Permalink
fix(ingest/snowflake): Apply email filter on all usage metrics (datah…
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Nov 23, 2023
1 parent 37ea292 commit f794a90
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Optional

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BucketDuration
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_config import DEFAULT_TABLES_DENY_LIST
Expand Down Expand Up @@ -551,6 +552,8 @@ def usage_per_object_per_time_bucket_for_time_window(
use_base_objects: bool,
top_n_queries: int,
include_top_n_queries: bool,
email_domain: Optional[str],
email_filter: AllowDenyPattern,
) -> str:
if not include_top_n_queries:
top_n_queries = 0
Expand All @@ -561,6 +564,9 @@ def usage_per_object_per_time_bucket_for_time_window(
objects_column = (
"BASE_OBJECTS_ACCESSED" if use_base_objects else "DIRECT_OBJECTS_ACCESSED"
)
email_filter_query = SnowflakeQuery.gen_email_filter_query(email_filter)

email_domain = f"@{email_domain}" if email_domain else ""

return f"""
WITH object_access_history AS
Expand All @@ -578,12 +584,16 @@ def usage_per_object_per_time_bucket_for_time_window(
query_id,
query_start_time,
user_name,
NVL(USERS.email, CONCAT(user_name, '{email_domain}')) AS user_email,
{objects_column}
from
snowflake.account_usage.access_history
LEFT JOIN
snowflake.account_usage.users USERS
WHERE
query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
{email_filter_query}
)
t,
lateral flatten(input => t.{objects_column}) object
Expand Down Expand Up @@ -705,6 +715,34 @@ def usage_per_object_per_time_bucket_for_time_window(
basic_usage_counts.bucket_start_time
"""

@staticmethod
def gen_email_filter_query(email_filter: AllowDenyPattern) -> str:
    """Render an allow/deny pattern as extra SQL conditions for a WHERE clause.

    Each pattern becomes an ``rlike(user_name, '<pattern>','<flag>')`` call,
    where the flag is ``i`` when ``email_filter.ignoreCase`` is truthy and
    ``c`` otherwise. Allow patterns are OR-ed together and emitted as
    ``AND (...)``; deny patterns are OR-ed together and emitted as
    `` AND NOT (...)`` (note the leading space on the deny part only).

    An allow list consisting of the single pattern ``.*`` is treated as
    "no restriction" and produces no allow condition.

    Returns:
        The concatenated allow and deny conditions, or an empty string when
        neither applies.
    """

    # NOTE(review): patterns are placed verbatim inside single-quoted SQL
    # literals; nothing in this method escapes quotes or backslashes.
    def _rlike(pattern: str) -> str:
        flag = "i" if email_filter.ignoreCase else "c"
        return f"rlike(user_name, '{pattern}','{flag}')"

    allows_everything = (
        len(email_filter.allow) == 1 and email_filter.allow[0] == ".*"
    )

    allow_clause = ""
    if not allows_everything and email_filter.allow:
        allow_clause = "AND (" + " OR ".join(map(_rlike, email_filter.allow)) + ")"

    deny_clause = ""
    if email_filter.deny:
        deny_clause = " AND NOT (" + " OR ".join(map(_rlike, email_filter.deny)) + ")"

    return allow_clause + deny_clause

@staticmethod
def table_upstreams_with_column_lineage(
start_time_millis: int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ def _get_workunits_internal(
use_base_objects=self.config.apply_view_usage_to_tables,
top_n_queries=self.config.top_n_queries,
include_top_n_queries=self.config.include_top_n_queries,
email_domain=self.config.email_domain,
email_filter=self.config.user_email_pattern,
),
)
except Exception as e:
Expand Down
3 changes: 3 additions & 0 deletions metadata-ingestion/tests/integration/snowflake/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
from datetime import datetime, timezone

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BucketDuration
from datahub.ingestion.source.snowflake import snowflake_query
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
Expand Down Expand Up @@ -263,6 +264,8 @@ def default_query_results( # noqa: C901
top_n_queries=10,
include_top_n_queries=True,
time_bucket_size=BucketDuration.DAY,
email_domain=None,
email_filter=AllowDenyPattern.allow_all(),
)
):
return []
Expand Down
43 changes: 43 additions & 0 deletions metadata-ingestion/tests/unit/test_snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
from pydantic import ValidationError

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.oauth import OAuthConfiguration
from datahub.configuration.pattern_utils import UUID_REGEX
from datahub.ingestion.api.source import SourceCapability
Expand All @@ -16,6 +17,7 @@
SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_query import (
SnowflakeQuery,
create_deny_regex_sql_filter,
)
from datahub.ingestion.source.snowflake.snowflake_usage_v2 import (
Expand Down Expand Up @@ -661,3 +663,44 @@ def test_snowflake_temporary_patterns_config_rename():
}
)
assert conf.temporary_tables_pattern == [".*tmp.*"]


def test_email_filter_query_generation_with_one_deny():
    """A single deny pattern produces only the negated clause."""
    deny_only = AllowDenyPattern(deny=[".*@example.com"])
    expected = " AND NOT (rlike(user_name, '.*@example.com','i'))"
    assert SnowflakeQuery.gen_email_filter_query(deny_only) == expected


def test_email_filter_query_generation_without_any_filter():
    """A default-constructed pattern produces an empty filter string."""
    generated = SnowflakeQuery.gen_email_filter_query(AllowDenyPattern())
    assert generated == ""


def test_email_filter_query_generation_one_allow():
    """A single allow pattern produces only the ``AND (...)`` clause."""
    allow_only = AllowDenyPattern(allow=[".*@example.com"])
    expected = "AND (rlike(user_name, '.*@example.com','i'))"
    assert SnowflakeQuery.gen_email_filter_query(allow_only) == expected


def test_email_filter_query_generation_one_allow_and_deny():
    """Two allow and two deny patterns are OR-ed per group, then AND-ed."""
    allowed = [".*@example.com", ".*@example2.com"]
    denied = [".*@example2.com", ".*@example4.com"]
    expected = "AND (rlike(user_name, '.*@example.com','i') OR rlike(user_name, '.*@example2.com','i')) AND NOT (rlike(user_name, '.*@example2.com','i') OR rlike(user_name, '.*@example4.com','i'))"

    generated = SnowflakeQuery.gen_email_filter_query(
        AllowDenyPattern(allow=allowed, deny=denied)
    )

    assert generated == expected


def test_email_filter_query_generation_with_case_insensitive_filter():
    """With ``ignoreCase=False`` every rlike call carries the ``c`` flag.

    NOTE(review): despite the test name, this exercises ``ignoreCase=False``.
    """
    pattern = AllowDenyPattern(
        allow=[".*@example.com"],
        deny=[".*@example2.com"],
        ignoreCase=False,
    )
    expected = "AND (rlike(user_name, '.*@example.com','c')) AND NOT (rlike(user_name, '.*@example2.com','c'))"
    assert SnowflakeQuery.gen_email_filter_query(pattern) == expected

0 comments on commit f794a90

Please sign in to comment.