diff --git a/.sqlfluff b/.sqlfluff index 8521efd6..8ad9bc69 100644 --- a/.sqlfluff +++ b/.sqlfluff @@ -73,6 +73,7 @@ unwrap_wrapped_queries = True [sqlfluff:templater:python:context] project_id = project_id dataset_id = dataset_id +target_project = target_project target_dataset = target_dataset table_id = table_id batch_id = batch_id diff --git a/dags/queries/create_table.sql b/dags/queries/create_table.sql index 3f5a8569..cf4880b3 100644 --- a/dags/queries/create_table.sql +++ b/dags/queries/create_table.sql @@ -1,4 +1,4 @@ -create or replace table `{project_id}.{target_dataset}.{table_id}` +create or replace table `{target_project}.{target_dataset}.{table_id}` partition by date_trunc(batch_run_date, day) options (partition_expiration_days = 180) as ( select * diff --git a/dags/queries/create_view.sql b/dags/queries/create_view.sql index 0a94e0ab..902aefff 100644 --- a/dags/queries/create_view.sql +++ b/dags/queries/create_view.sql @@ -1,3 +1,3 @@ -create or replace view `{project_id}.{target_dataset}.{table_id}` as ( +create or replace view `{target_project}.{target_dataset}.{table_id}` as ( select * from `{project_id}.{dataset_id}.{table_id}` ) diff --git a/dags/queries/update_table.sql b/dags/queries/update_table.sql index 9b1d6337..2dd78698 100644 --- a/dags/queries/update_table.sql +++ b/dags/queries/update_table.sql @@ -1,4 +1,4 @@ -insert into {project_id}.{target_dataset}.{table_id} +insert into `{target_project}.{target_dataset}.{table_id}` select * -from {project_id}.{dataset_id}.{table_id} +from `{project_id}.{dataset_id}.{table_id}` where date_trunc(batch_run_date, day) = date_trunc(cast('{batch_run_date}' as datetime), day) diff --git a/dags/sandbox_create_dag.py b/dags/sandbox_create_dag.py index 0f52aab3..e1b7b0b0 100644 --- a/dags/sandbox_create_dag.py +++ b/dags/sandbox_create_dag.py @@ -32,6 +32,7 @@ ) as dag: PROJECT = Variable.get("public_project") DATASET = Variable.get("public_dataset") + SANDBOX_PROJECT = Variable.get("bq_project") SANDBOX_DATASET = Variable.get("sandbox_dataset") DBT_DATASET = Variable.get("dbt_mart_dataset") TABLES_ID = Variable.get("table_ids", deserialize_json=True) @@ -47,6 +48,7 @@ "project_id": PROJECT, "dataset_id": DATASET, "table_id": TABLES_ID[table_id], + "target_project": SANDBOX_PROJECT, "target_dataset": SANDBOX_DATASET, } query = query.format(**sql_params) @@ -70,6 +72,7 @@ "project_id": PROJECT, "dataset_id": DBT_DATASET, "table_id": DBT_TABLES[dbt_table], + "target_project": SANDBOX_PROJECT, "target_dataset": SANDBOX_DATASET, } query = query.format(**sql_params) diff --git a/dags/sandbox_update_dag.py b/dags/sandbox_update_dag.py index 1b5b8807..2e7e662e 100644 --- a/dags/sandbox_update_dag.py +++ b/dags/sandbox_update_dag.py @@ -40,6 +40,7 @@ TABLES_ID = Variable.get("table_ids", deserialize_json=True) PROJECT = Variable.get("public_project") BQ_DATASET = Variable.get("public_dataset") + SANDBOX_PROJECT = Variable.get("bq_project") SANDBOX_DATASET = Variable.get("sandbox_dataset") batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" @@ -63,6 +64,7 @@ "project_id": PROJECT, "dataset_id": BQ_DATASET, "table_id": TABLES_ID[table_id], + "target_project": SANDBOX_PROJECT, "target_dataset": SANDBOX_DATASET, "batch_run_date": batch_run_date, }