Skip to content

Commit

Permalink
PROD-2486 Add Dynamic Erasure Email Integration (#5226)
Browse files Browse the repository at this point in the history
Co-authored-by: Adrian Galvan <adrian@ethyca.com>
  • Loading branch information
erosselli and galvana authored Sep 10, 2024
1 parent 77dabdd commit 2e03b9b
Showing 40 changed files with 3,647 additions and 202 deletions.
26 changes: 26 additions & 0 deletions data/dataset/postgres_example_custom_request_field_dataset.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
dataset:
- fides_key: postgres_example_custom_request_field_dataset
data_categories: []
description: Postgres example dataset with a custom request field
name: Postgres Example Custom Request Field Dataset
collections:
- name: dynamic_email_address_config
fields:
- name: id
data_categories: [system.operations]
fides_meta:
data_type: string
primary_key: True
- name: email_address
data_categories: [system.operations]
fides_meta:
data_type: string
- name: vendor_name
data_categories: [system.operations]
fides_meta:
data_type: string
- name: site_id
data_categories: [system.operations]
fides_meta:
data_type: string
custom_request_field: tenant_id
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ types-defusedxml==0.7.0.20240218
expandvars==0.9.0
fastapi[all]==0.111.0
fastapi-pagination[sqlalchemy]==0.12.25
fideslang==3.0.2
fideslang==3.0.3
fideslog==1.2.10
firebase-admin==5.3.0
GitPython==3.1.41
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Add dynamic erasure email connector type
Revision ID: 9de4bb76307a
Revises: a249a089f23b
Create Date: 2024-09-10 11:36:35.020140
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "9de4bb76307a"
down_revision = "a249a089f23b"
branch_labels = None
depends_on = None


def upgrade():
    """Rebuild the ``connectiontype`` enum so it includes 'dynamic_erasure_email'.

    The current type is renamed to ``connectiontype_old``, a new
    ``connectiontype`` is created with the full value list, the
    ``connectionconfig.connection_type`` column is cast over to the new type,
    and finally the renamed type is dropped.
    """
    rename_current_enum = "ALTER TYPE connectiontype RENAME TO connectiontype_old"
    create_extended_enum = """
CREATE TYPE connectiontype AS ENUM (
'mongodb',
'mysql',
'https',
'snowflake',
'redshift',
'mssql',
'mariadb',
'bigquery',
'saas',
'manual',
'manual_webhook',
'timescale',
'fides',
'sovrn',
'attentive',
'dynamodb',
'postgres',
'generic_consent_email',
'generic_erasure_email',
'scylla',
's3',
'google_cloud_sql_mysql',
'google_cloud_sql_postgres',
'dynamic_erasure_email'
)
"""
    retarget_column = """
ALTER TABLE connectionconfig ALTER COLUMN connection_type TYPE connectiontype USING
connection_type::text::connectiontype
"""
    drop_renamed_enum = "DROP TYPE connectiontype_old"

    # The statements depend on each other, so they run strictly in this order:
    # the old type must be renamed before the new one can take its name, and
    # it can only be dropped once the column no longer uses it.
    for statement in (
        rename_current_enum,
        create_extended_enum,
        retarget_column,
        drop_renamed_enum,
    ):
        op.execute(statement)


def downgrade():
    """Rebuild the ``connectiontype`` enum without 'dynamic_erasure_email'.

    Rows in ``connectionconfig`` that use the value being removed are deleted
    first, since they could not be cast to the reduced enum. The type is then
    renamed, recreated without the value, the column is cast to the new type,
    and the renamed type is dropped.
    """
    purge_dynamic_erasure_rows = (
        "DELETE FROM connectionconfig WHERE connection_type IN ('dynamic_erasure_email')"
    )
    rename_current_enum = "ALTER TYPE connectiontype RENAME TO connectiontype_old"
    create_reduced_enum = """
CREATE TYPE connectiontype AS ENUM (
'mongodb',
'mysql',
'https',
'snowflake',
'redshift',
'mssql',
'mariadb',
'bigquery',
'saas',
'manual',
'manual_webhook',
'timescale',
'fides',
'sovrn',
'attentive',
'dynamodb',
'postgres',
'generic_consent_email',
'generic_erasure_email',
'scylla',
's3',
'google_cloud_sql_mysql',
'google_cloud_sql_postgres'
)
"""
    retarget_column = """
ALTER TABLE connectionconfig ALTER COLUMN connection_type TYPE connectiontype USING
connection_type::text::connectiontype
"""
    drop_renamed_enum = "DROP TYPE connectiontype_old"

    # Executed strictly in this order: the delete has to happen before the
    # column is cast, and the old type is dropped only after the cast.
    for statement in (
        purge_dynamic_erasure_rows,
        rename_current_enum,
        create_reduced_enum,
        retarget_column,
        drop_renamed_enum,
    ):
        op.execute(statement)
27 changes: 27 additions & 0 deletions src/fides/api/graph/config.py
Original file line number Diff line number Diff line change
@@ -262,6 +262,8 @@ class Field(BaseModel, ABC):
return_all_elements: Optional[bool] = None
# Should field be returned by query if it is in an entrypoint array field, or just if it matches query?

custom_request_field: Optional[str] = None

"""Known type of held data"""
length: Optional[int] = None
"""Known length of held data"""
@@ -404,6 +406,7 @@ def generate_field(
sub_fields: List[Field],
return_all_elements: Optional[bool],
read_only: Optional[bool],
custom_request_field: Optional[str],
) -> Field:
"""Generate a graph field."""

@@ -427,6 +430,7 @@ def generate_field(
is_array=is_array,
return_all_elements=return_all_elements,
read_only=read_only,
custom_request_field=custom_request_field,
)


@@ -493,6 +497,29 @@ def identities(self) -> Dict[FieldPath, str]:
if field.identity
}

def custom_request_fields(self) -> Dict[FieldPath, str]:
    """
    Map every field that declares a custom request field to that declared name.

    These are the fields whose values may arrive in a custom request field
    on a DSR. Fields whose `custom_request_field` is unset (or otherwise falsy)
    are left out of the result.

    For example, given a collection defined as:
      - name: publishers
        fields:
          - name: id
            fides_meta:
              identity: true
          - name: site_id
            fides_meta:
              custom_request_field: tenant_id
    the result is {FieldPath("site_id"): "tenant_id"}.
    """
    mapping = {}
    for path, graph_field in self.field_dict.items():
        request_field_name = graph_field.custom_request_field
        if request_field_name:
            mapping[path] = request_field_name
    return mapping

def field(self, field_path: FieldPath) -> Optional[Field]:
    """Look up a Field on this Collection by FieldPath; None when it is not present."""
    if field_path not in self.field_dict:
        return None
    return self.field_dict[field_path]
11 changes: 11 additions & 0 deletions src/fides/api/graph/traversal.py
Original file line number Diff line number Diff line change
@@ -380,6 +380,16 @@ def edge_ends_with_collection(_edge: Edge) -> bool:
[{','.join([str(tn.address) for tn in running_node_queue.data])}]"""
)

# Remove nodes that have custom request fields, since we don't care if these are reachable or not.
# They will be used independently by the Dynamic Email Erasure Connector.
# TODO: ideally we'll update the Traversal code to include these "custom request field datasets"
# as part of the main graph. This is a targeted workaround for now.
remaining_node_keys = set(
key
for key in remaining_node_keys
if not self.traversal_node_dict[key].node.collection.custom_request_fields()
)

# error if there are nodes that have not been visited
if remaining_node_keys:
logger.error(
@@ -389,6 +399,7 @@ def edge_ends_with_collection(_edge: Edge) -> bool:
raise TraversalError(
f"Some nodes were not reachable: {','.join([str(x) for x in remaining_node_keys])}"
)

# error if there are edges that have not been visited
if remaining_edges:
logger.error(
2 changes: 2 additions & 0 deletions src/fides/api/models/connectionconfig.py
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ class ConnectionType(enum.Enum):
fides = "fides"
generic_consent_email = "generic_consent_email" # Run after the traversal
generic_erasure_email = "generic_erasure_email" # Run after the traversal
dynamic_erasure_email = "dynamic_erasure_email" # Run after the traversal
google_cloud_sql_mysql = "google_cloud_sql_mysql"
google_cloud_sql_postgres = "google_cloud_sql_postgres"
https = "https"
@@ -67,6 +68,7 @@ def human_readable(self) -> str:
readable_mapping: Dict[str, str] = {
ConnectionType.attentive.value: "Attentive",
ConnectionType.bigquery.value: "BigQuery",
ConnectionType.dynamic_erasure_email.value: "Dynamic Erasure Email",
ConnectionType.dynamodb.value: "DynamoDB",
ConnectionType.fides.value: "Fides Connector",
ConnectionType.generic_consent_email.value: "Generic Consent Email",
5 changes: 5 additions & 0 deletions src/fides/api/models/datasetconfig.py
Original file line number Diff line number Diff line change
@@ -204,10 +204,14 @@ def to_graph_field(
length = None
data_type_name = None
read_only = None
custom_request_field = None

if meta_section:
identity = meta_section.identity
if meta_section.primary_key:
is_pk = meta_section.primary_key
if meta_section.custom_request_field:
custom_request_field = meta_section.custom_request_field
if meta_section.references:
for reference in meta_section.references:
# Split the "field" address (e.g. "customers.id") into its component
@@ -272,6 +276,7 @@ def to_graph_field(
sub_fields=sub_fields,
return_all_elements=return_all_elements,
read_only=read_only,
custom_request_field=custom_request_field,
)


17 changes: 17 additions & 0 deletions src/fides/api/models/privacy_request.py
Original file line number Diff line number Diff line change
@@ -1835,6 +1835,23 @@ class TraversalDetails(FidesSchema):
outgoing_edges: List[Tuple[str, str]]
input_keys: List[str]

# TODO: remove this method once we support custom request fields in DSR graph.
@classmethod
def create_empty_traversal(cls, connection_key: str) -> TraversalDetails:
    """
    Build a placeholder TraversalDetails carrying only the dataset connection key.

    All edge and input-key lists are left empty. This is a workaround for the
    Dynamic Erasure Emails feature: custom request field nodes are not currently
    reachable, so they have no real TraversalNode to derive details from. It should
    only be needed until custom request fields are supported as entry points to the
    DSR graph.
    """
    placeholder_fields = {
        "dataset_connection_key": connection_key,
        "incoming_edges": [],
        "outgoing_edges": [],
        "input_keys": [],
    }
    return cls(**placeholder_fields)


class RequestTask(Base):
"""
14 changes: 11 additions & 3 deletions src/fides/api/schemas/connection_configuration/__init__.py
Original file line number Diff line number Diff line change
@@ -19,6 +19,12 @@
from fides.api.schemas.connection_configuration.connection_secrets_bigquery import (
BigQuerySchema as BigQuerySchema,
)
from fides.api.schemas.connection_configuration.connection_secrets_dynamic_erasure_email import (
DynamicErasureEmailDocsSchema as DynamicErasureEmailDocsSchema,
)
from fides.api.schemas.connection_configuration.connection_secrets_dynamic_erasure_email import (
DynamicErasureEmailSchema as DynamicErasureEmailSchema,
)
from fides.api.schemas.connection_configuration.connection_secrets_dynamodb import (
DynamoDBDocsSchema as DynamoDBDocsSchema,
)
@@ -51,10 +57,10 @@
GoogleCloudSQLPostgresSchema as GoogleCloudSQLPostgresSchema,
)
from fides.api.schemas.connection_configuration.connection_secrets_manual_webhook import (
ManualWebhookSchema as ManualWebhookSchema,
ManualWebhookDocsSchema,
)
from fides.api.schemas.connection_configuration.connection_secrets_manual_webhook import (
ManualWebhookSchemaforDocs as ManualWebhookSchemaforDocs,
ManualWebhookSchema as ManualWebhookSchema,
)
from fides.api.schemas.connection_configuration.connection_secrets_mariadb import (
MariaDBDocsSchema as MariaDBDocsSchema,
@@ -132,6 +138,7 @@
secrets_schemas: Dict[str, Any] = {
ConnectionType.attentive.value: AttentiveSchema,
ConnectionType.bigquery.value: BigQuerySchema,
ConnectionType.dynamic_erasure_email.value: DynamicErasureEmailSchema,
ConnectionType.dynamodb.value: DynamoDBSchema,
ConnectionType.fides.value: FidesConnectorSchema,
ConnectionType.generic_consent_email.value: ExtendedEmailSchema,
@@ -198,11 +205,12 @@ def get_connection_secrets_schema(
BigQueryDocsSchema,
SaaSSchema,
EmailDocsSchema,
ManualWebhookSchemaforDocs,
ManualWebhookDocsSchema,
TimescaleDocsSchema,
FidesDocsSchema,
SovrnDocsSchema,
DynamoDBDocsSchema,
S3DocsSchema,
ScyllaDocsSchema,
DynamicErasureEmailDocsSchema,
]
Loading

0 comments on commit 2e03b9b

Please sign in to comment.