Skip to content

Commit

Permalink
Merge pull request #28 from flexanalytics/redshift
Browse files Browse the repository at this point in the history
Create redshift__create_constraints.sql
  • Loading branch information
sfc-gh-dflippo authored Sep 27, 2022
2 parents c61bd39 + 35eccb6 commit f15affc
Showing 1 changed file with 269 additions and 0 deletions.
269 changes: 269 additions & 0 deletions macros/redshift__create_constraints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
{# Redshift specific implementation to create a primary key #}
{%- macro redshift__create_primary_key(table_relation, column_names, verify_permissions, quote_columns=false) -%}
{%- set constraint_name = (table_relation.identifier ~ "_" ~ column_names|join('_') ~ "_PK") | upper -%}

{%- if constraint_name|length > 127 %}
{%- set constraint_name_query %}
select 'PK_' || md5( '{{ constraint_name }}' )::varchar as "constraint_name"
{%- endset -%}
{%- set results = run_query(constraint_name_query) -%}
{%- set constraint_name = results.columns[0].values()[0] -%}
{% endif %}

{%- set columns_csv = dbt_constraints.get_quoted_column_csv(column_names, quote_columns) -%}

{#- Check that the table does not already have this PK/UK -#}
{%- if not dbt_constraints.unique_constraint_exists(table_relation, column_names) -%}

{%- if dbt_constraints.have_ownership_priv(table_relation, verify_permissions) -%}

{%- do log("Creating primary key: " ~ constraint_name, info=true) -%}
{%- call statement('add_pk', fetch_result=False, auto_begin=True) -%}
ALTER TABLE {{table_relation}} ADD CONSTRAINT {{constraint_name}} PRIMARY KEY ( {{columns_csv}} )
{%- endcall -%}
{{ adapter.commit() }}

{%- else -%}
{%- do log("Skipping " ~ constraint_name ~ " because of insufficient privileges: " ~ table_relation, info=false) -%}
{%- endif -%}

{%- else -%}
{%- do log("Skipping " ~ constraint_name ~ " because PK/UK already exists: " ~ table_relation ~ " " ~ column_names, info=false) -%}
{%- endif -%}

{%- endmacro -%}



{# Redshift specific implementation to create a unique key #}
{%- macro redshift__create_unique_key(table_relation, column_names, verify_permissions, quote_columns=false) -%}
{%- set constraint_name = (table_relation.identifier ~ "_" ~ column_names|join('_') ~ "_UK") | upper -%}

{%- if constraint_name|length > 127 %}
{%- set constraint_name_query %}
select 'UK_' || md5( '{{ constraint_name }}' )::varchar as "constraint_name"
{%- endset -%}
{%- set results = run_query(constraint_name_query) -%}
{%- set constraint_name = results.columns[0].values()[0] -%}
{% endif %}

{%- set columns_csv = dbt_constraints.get_quoted_column_csv(column_names, quote_columns) -%}

{#- Check that the table does not already have this PK/UK -#}
{%- if not dbt_constraints.unique_constraint_exists(table_relation, column_names) -%}

{%- if dbt_constraints.have_ownership_priv(table_relation, verify_permissions) -%}

{%- do log("Creating unique key: " ~ constraint_name, info=true) -%}
{%- call statement('add_uk', fetch_result=False, auto_begin=True) -%}
ALTER TABLE {{table_relation}} ADD CONSTRAINT {{constraint_name}} UNIQUE ( {{columns_csv}} )
{%- endcall -%}
{{ adapter.commit() }}

{%- else -%}
{%- do log("Skipping " ~ constraint_name ~ " because of insufficient privileges: " ~ table_relation, info=false) -%}
{%- endif -%}

{%- else -%}
{%- do log("Skipping " ~ constraint_name ~ " because PK/UK already exists: " ~ table_relation ~ " " ~ column_names, info=false) -%}
{%- endif -%}

{%- endmacro -%}

{# Redshift specific implementation to create a not null constraint #}
{%- macro redshift__create_not_null(table_relation, column_names, verify_permissions, quote_columns=false) -%}
{%- set columns_list = dbt_constraints.get_quoted_column_list(column_names, quote_columns) -%}

{%- do log("Skipping not null constraint for " ~ columns_list | join(", ") ~ " in " ~ table_relation ~ " because ALTER COLUMN SET NOT NULL is not supported", info=true) -%}
{%- endmacro -%}

{# Redshift specific implementation to create a foreign key #}
{%- macro redshift__create_foreign_key(pk_table_relation, pk_column_names, fk_table_relation, fk_column_names, verify_permissions, quote_columns=true) -%}
{%- set constraint_name = (fk_table_relation.identifier ~ "_" ~ fk_column_names|join('_') ~ "_FK") | upper -%}

{%- if constraint_name|length > 127 %}
{%- set constraint_name_query %}
select 'FK_' || md5( '{{ constraint_name }}' )::varchar as "constraint_name"
{%- endset -%}
{%- set results = run_query(constraint_name_query) -%}
{%- set constraint_name = results.columns[0].values()[0] -%}
{% endif %}

{%- set fk_columns_csv = dbt_constraints.get_quoted_column_csv(fk_column_names, quote_columns) -%}
{%- set pk_columns_csv = dbt_constraints.get_quoted_column_csv(pk_column_names, quote_columns) -%}
{#- Check that the PK table has a PK or UK -#}
{%- if dbt_constraints.unique_constraint_exists(pk_table_relation, pk_column_names) -%}
{#- Check if the table already has this foreign key -#}
{%- if not dbt_constraints.foreign_key_exists(fk_table_relation, fk_column_names) -%}

{%- if dbt_constraints.have_ownership_priv(fk_table_relation, verify_permissions) and dbt_constraints.have_references_priv(pk_table_relation, verify_permissions) -%}

{%- do log("Creating foreign key: " ~ constraint_name ~ " referencing " ~ pk_table_relation.identifier ~ " " ~ pk_column_names, info=true) -%}
{%- call statement('add_fk', fetch_result=False, auto_begin=True) -%}
--Note: ON DELETE not supported in Redshift
ALTER TABLE {{fk_table_relation}} ADD CONSTRAINT {{constraint_name}} FOREIGN KEY ( {{fk_columns_csv}} ) REFERENCES {{pk_table_relation}} ( {{pk_columns_csv}} ) --ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED
{%- endcall -%}
{{ adapter.commit() }}

{%- else -%}
{%- do log("Skipping " ~ constraint_name ~ " because of insufficient privileges: " ~ fk_table_relation ~ " referencing " ~ pk_table_relation, info=true) -%}
{%- endif -%}

{%- else -%}
{%- do log("Skipping " ~ constraint_name ~ " because FK already exists: " ~ fk_table_relation ~ " " ~ fk_column_names, info=false) -%}
{%- endif -%}
{%- else -%}
{%- do log("Skipping " ~ constraint_name ~ " because a PK/UK was not found on the PK table: " ~ pk_table_relation ~ " " ~ pk_column_names, info=true) -%}
{%- endif -%}

{%- endmacro -%}



{#- This macro is used in create macros to avoid duplicate PK/UK constraints
and to skip FK where no PK/UK constraint exists on the parent table -#}
{%- macro redshift__unique_constraint_exists(table_relation, column_names) -%}
{%- set lookup_query -%}
SELECT
kc.constraint_name
, lower(kc.column_name) as column_name
FROM information_schema.key_column_usage kc
JOIN information_schema.table_constraints tc
ON kc.table_name = tc.table_name
AND kc.table_schema = tc.table_schema
AND kc.constraint_name = tc.constraint_name
WHERE tc.constraint_type in ('PRIMARY KEY', 'UNIQUE')
AND kc.table_schema ilike '{{table_relation.schema}}'
AND kc.table_name ilike '{{table_relation.identifier}}'
order by kc.constraint_name
{%- endset -%}
{%- do log("Lookup: " ~ lookup_query, info=false) -%}
{%- set constraint_list = run_query(lookup_query) -%}
{%- if constraint_list.columns["column_name"].values() | count > 0 -%}
{%- for constraint in constraint_list.group_by("constraint_name") -%}
{%- if dbt_constraints.column_list_matches(constraint.columns["column_name"].values(), column_names ) -%}
{%- do log("Found PK/UK key: " ~ table_relation ~ " " ~ column_names, info=false) -%}
{{ return(true) }}
{%- endif -%}
{% endfor %}
{%- endif -%}#}

{#- If we get this far then the table does not have either constraint -#}
{%- do log("No PK/UK key: " ~ table_relation ~ " " ~ column_names, info=false) -%}
{{ return(false) }}
{%- endmacro -%}



{#- This macro is used in create macros to avoid duplicate FK constraints -#}
{%- macro redshift__foreign_key_exists(table_relation, column_names) -%}
{%- set lookup_query -%}
SELECT
kc.constraint_name fk_name
, lower(kc.column_name) as fk_column_name
FROM information_schema.key_column_usage kc
JOIN information_schema.table_constraints tc
ON kc.table_name = tc.table_name
AND kc.table_schema = tc.table_schema
AND kc.constraint_name = tc.constraint_name
WHERE tc.constraint_type='FOREIGN KEY'
AND kc.table_schema ilike '{{table_relation.schema}}'
AND kc.table_name ilike '{{table_relation.identifier}}'
order by kc.constraint_name
{%- endset -%}
{%- do log("Lookup: " ~ lookup_query, info=false) -%}
{%- set constraint_list = run_query(lookup_query) -%}
{%- if constraint_list.columns["fk_column_name"].values() | count > 0 -%}
{%- for constraint in constraint_list.group_by("fk_name") -%}
{%- if dbt_constraints.column_list_matches(constraint.columns["fk_column_name"].values(), column_names ) -%}
{%- do log("Found FK key: " ~ table_relation ~ " " ~ column_names, info=false) -%}
{{ return(true) }}
{%- endif -%}
{% endfor %}
{%- endif -%}

{#- If we get this far then the table does not have this constraint -#}
{%- do log("No FK key: " ~ table_relation ~ " " ~ column_names, info=false) -%}
{{ return(false) }}
{%- endmacro -%}


{%- macro redshift__have_references_priv(table_relation, verify_permissions) -%}
{%- if verify_permissions is sameas true -%}

{%- set lookup_query -%}
select case when count(*) > 0 then 'y' else 'n' end as "have_references"
from information_schema.table_privileges t
join information_schema.enabled_roles er on t.grantee = er.role_name
where upper(t.table_schema) = upper('{{table_relation.schema}}')
and upper(t.table_name) = upper('{{table_relation.identifier}}')
{%- endset -%}
{%- do log("Lookup: " ~ lookup_query, info=false) -%}
{%- set results = run_query(lookup_query) -%}
{%- if "y" in( results.columns["have_references"].values() ) -%}
{{ return(true) }}
{%- endif -%}

{{ return(false) }}
{%- else -%}
{{ return(true) }}
{%- endif -%}
{%- endmacro -%}


{%- macro redshift__have_ownership_priv(table_relation, verify_permissions) -%}
{%- if verify_permissions is sameas true -%}

{%- set lookup_query -%}
select case when count(*) > 0 then 'y' else 'n' end as "have_ownership"
from pg_catalog.pg_tables t
join information_schema.enabled_roles er on t.tableowner = er.role_name
where upper(t.schemaname) = upper('{{table_relation.schema}}')
and upper(t.tablename) = upper('{{table_relation.identifier}}')
{%- endset -%}
{%- do log("Lookup: " ~ lookup_query, info=false) -%}
{%- set results = run_query(lookup_query) -%}
{%- if "y" in( results.columns["have_ownership"].values() ) -%}
{{ return(true) }}
{%- endif -%}

{{ return(false) }}
{%- else -%}
{{ return(true) }}
{%- endif -%}
{%- endmacro -%}


{% macro redshift__drop_referential_constraints(relation) -%}
{%- set lookup_query -%}
select constraint_name
from information_schema.table_constraints
where table_schema = '{{relation.schema}}'
and table_name='{{relation.identifier}}'
and constraint_type in ('FOREIGN KEY', 'PRIMARY KEY', 'UNIQUE')
{%- endset -%}
{%- set constraint_list = run_query(lookup_query) -%}

{%- for constraint_name in constraint_list.columns["constraint_name"].values() -%}
{%- do log("Dropping constraint: " ~ constraint_name ~ " from table " ~ relation, info=false) -%}
{%- call statement('drop_constraint_cascade', fetch_result=False, auto_begin=True) -%}
ALTER TABLE {{relation}} DROP CONSTRAINT "{{constraint_name}}" CASCADE
{%- endcall -%}
{{ adapter.commit() }}
{% endfor %}

{% endmacro %}

{#- Redshift will error if you try to truncate tables with FK constraints or tables with PK/UK constraints
referenced by FK so we will drop all constraints before truncating tables -#}
{% macro redshift__truncate_relation(relation) -%}
{{ redshift__drop_referential_constraints(relation) }}
{{ return(adapter.dispatch('truncate_relation', 'dbt')(relation)) }}
{% endmacro %}

{#- Redshift will get deadlocks if you try to drop tables with FK constraints or tables with PK/UK constraints
referenced by FK so we will drop all constraints before dropping tables -#}
{% macro redshift__drop_relation(relation) -%}
{{ redshift__drop_referential_constraints(relation) }}
{{ return(adapter.dispatch('drop_relation', 'dbt')(relation)) }}
{% endmacro %}

0 comments on commit f15affc

Please sign in to comment.