Skip to content

Commit

Permalink
Remove a job task for define a field as default and create a ddl for it
Browse files Browse the repository at this point in the history
  • Loading branch information
cayod committed Nov 28, 2023
1 parent 385b41f commit 9a2f5a9
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 25 deletions.
25 changes: 25 additions & 0 deletions dags/ddls/create_default_value_field.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# bin/bash -e
#
#
###############################################################################
# Define a default value for a field in a table
#
# Author: Cayo Dias
# Date: 27 November 2023
#
# The script will define a column as default in case of necessity to recreate or
# rename the table.
# The project id, dataset ids, table id and field are hardcoded; if you need
# to update tables for a different environment, change the parameters.
###############################################################################

PROJECT_ID=test-hubble-319619
DATASET_ID=test_crypto_stellar_internal
TABLE=raw_mgi_stellar_transactions
FIELD=batch_insert_ts

echo "Creating default value field $FIELD in $TABLE in $DATASET_ID"

bq query --use_legacy_sql=false \
"ALTER TABLE \`$PROJECT_ID.$DATASET_ID.$TABLE\` \
ALTER COLUMN $FIELD SET DEFAULT CURRENT_TIMESTAMP();"
26 changes: 1 addition & 25 deletions dags/partner_pipeline_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@
BUCKET_NAME = Variable.get("partners_bucket")
PARTNERS = Variable.get("partners_data", deserialize_json=True)
TODAY = "{{ next_ds_nodash }}"
QUERY = """ UPDATE {project}.{dataset}.{table}
SET update_timestamp = CURRENT_TIMESTAMP()
WHERE update_timestamp IS NULL
"""
start_tables_task = EmptyOperator(task_id="start_update_task")

for partner in PARTNERS:
Expand Down Expand Up @@ -79,24 +75,4 @@
on_failure_callback=alert_after_max_retries,
)

# insert_ts_field = BigQueryInsertJobOperator(
# task_id=f"insert_ts_field_{partner}",
# project_id=PROJECT,
# on_failure_callback=alert_after_max_retries,
# configuration={
# "query": {
# "query": QUERY.format(
# project=PROJECT,
# dataset=DATASET,
# table=PARTNERS[partner]["table"],
# ),
# "useLegacySql": False,
# }
# },
# )
(
start_tables_task
>> check_gcs_file
>> send_partner_to_bq_internal_task
# >> insert_ts_field
)
(start_tables_task >> check_gcs_file >> send_partner_to_bq_internal_task)

0 comments on commit 9a2f5a9

Please sign in to comment.