Merge branch 'master' into Hubble-398-Combine-Del-Ins-Tasks
harsha-stellar-data authored Jul 12, 2024
2 parents f30f9ce + cb5a1a3 commit 82922cb
Showing 7 changed files with 139 additions and 21 deletions.
@@ -1,4 +1,4 @@
-name: CI
+name: CI-CD-DEV

on:
  pull_request:
@@ -1,4 +1,4 @@
-name: release
+name: CI-CD-PROD

on:
  pull_request:
39 changes: 20 additions & 19 deletions README.md
@@ -450,25 +450,26 @@ The `airflow_variables_*.txt` files provide a set of default values for variable

### **DBT Variables**

| Variable name | Description | Should be changed? |
| --- | --- | --- |
| dbt_full_refresh_models | JSON object. Each key should be a DBT model, and the value is a boolean controlling whether the model should be run with `--full-refresh` | Yes, if desired for models that need to be full-refreshed |
| dbt_image_name | Name of the `stellar-dbt` image to use | No, unless you need a specific image version |
| dbt_job_execution_timeout_seconds | Timeout for dbt tasks in seconds | No, unless you want a different timeout |
| dbt_job_retries | Number of times dbt jobs will retry | No, unless you want a different retry limit |
| dbt_mart_dataset | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) for DBT marts | Yes. Change to your dataset name |
| dbt_maximum_bytes_billed | Maximum number of BigQuery bytes that can be billed when running DBT | No, unless you want a different limit |
| dbt_project | Name of the BigQuery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name |
| dbt_target | The `target` that will be used to run dbt | No, unless you want a different target |
| dbt_threads | Number of threads that dbt will spawn to build a model | No, unless you want a different thread count |
| dbt_tables | Names of the dbt tables to copy to the sandbox | No |
| dbt_internal_source_db | Name of the BigQuery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name. |
| dbt_internal_source_schema | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. |
| dbt_public_source_db | Name of the BigQuery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name. |
| dbt_public_source_schema | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. |
| dbt_slack_elementary_channel | Name of the Slack channel for Elementary alerts | Yes. Change to your Slack channel name. |
| dbt_elementary_dataset | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. |
| dbt_elementary_secret | Required argument for the Elementary task | No |
| dbt_transient_errors_patterns | Dictionary mapping the name of a known dbt transient error (key) to the list of string sentences that identify that error in the task logs (value) | Yes, for every known error added; see the sketch after this table |
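The matching rule applied to this variable by the new `check_dbt_transient_errors` helper (added in `dags/stellar_etl_airflow/utils.py` further down in this commit) is: a failed attempt is treated as transient only when every sentence listed for some pattern appears in the same log line. A minimal sketch of that rule; the log line below is illustrative, and the patterns mirror the value this commit adds to `airflow_variables_dev.json` and `airflow_variables_prod.json`:

```python
# Minimal sketch of the pattern-matching rule; the sample log line is illustrative.
patterns = {
    "elementary_concurrent_access": [
        "Could not serialize access to table",
        "due to concurrent update",
    ]
}
line = "Database Error: Could not serialize access to table x due to concurrent update"
is_transient = any(
    all(sentence in line for sentence in sentences) for sentences in patterns.values()
)
print(is_transient)  # True -> the retry is allowed to proceed
```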

### **Kubernetes-Specific Variables**

6 changes: 6 additions & 0 deletions airflow_variables_dev.json
@@ -146,6 +146,12 @@
},
"dbt_target": "test",
"dbt_threads": 12,
"dbt_transient_errors_patterns": {
"elementary_concurrent_access": [
"Could not serialize access to table",
"due to concurrent update"
]
},
"gcs_exported_data_bucket_name": "us-central1-test-hubble-2-5f1f2dbf-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:98bea9a",
6 changes: 6 additions & 0 deletions airflow_variables_prod.json
@@ -147,6 +147,12 @@
},
"dbt_target": "prod",
"dbt_threads": 12,
"dbt_transient_errors_patterns": {
"elementary_concurrent_access": [
"Could not serialize access to table",
"due to concurrent update"
]
},
"gcs_exported_data_bucket_name": "us-central1-hubble-14c4ca64-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:98bea9a",
3 changes: 3 additions & 0 deletions dags/stellar_etl_airflow/build_dbt_task.py
@@ -8,6 +8,7 @@
)
from kubernetes.client import models as k8s
from stellar_etl_airflow.default import alert_after_max_retries
from stellar_etl_airflow.utils import skip_retry_dbt_errors


def create_dbt_profile(project="prod"):
@@ -142,6 +143,7 @@ def dbt_task(
        config_file=config_file_location,
        container_resources=container_resources,
        on_failure_callback=alert_after_max_retries,
        on_retry_callback=skip_retry_dbt_errors,
        image_pull_policy="IfNotPresent",
        image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
        sla=timedelta(
@@ -223,6 +225,7 @@ def build_dbt_task(
        config_file=config_file_location,
        container_resources=resources_requests,
        on_failure_callback=alert_after_max_retries,
        on_retry_callback=skip_retry_dbt_errors,
        image_pull_policy="IfNotPresent",
        image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
        sla=timedelta(
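Airflow invokes `on_retry_callback` with the task's context after a failed attempt that still has retries left, so attaching `skip_retry_dbt_errors` here lets the task inspect its own attempt log and flip itself to SKIPPED when the dbt failure is not transient, which prevents the retry. A minimal sketch of the same wiring on a plain task (the DAG name, task id, and bash command are illustrative, not part of this commit; the real DAGs attach the callback to the dbt KubernetesPodOperator as shown above):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from stellar_etl_airflow.utils import skip_retry_dbt_errors

with DAG("example_dbt_dag", start_date=datetime(2024, 7, 1), schedule_interval=None) as dag:
    run_dbt = BashOperator(
        task_id="dbt_run_example",                 # hypothetical task name
        bash_command="dbt run --select example_model",  # placeholder command
        retries=2,
        on_retry_callback=skip_retry_dbt_errors,   # may set the task to SKIPPED instead of retrying
    )
```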
102 changes: 102 additions & 0 deletions dags/stellar_etl_airflow/utils.py
@@ -0,0 +1,102 @@
import logging
import re
import time

from airflow.configuration import conf
from airflow.models import Variable
from airflow.utils.state import TaskInstanceState

base_log_folder = conf.get("logging", "base_log_folder")


def get_log_file_name(context):
    ti = context["ti"]
    log_params = [
        base_log_folder,
        f"dag_id={ti.dag_id}",
        f"run_id={ti.run_id}",
        f"task_id={ti.task_id}",
        f"attempt={ti.try_number - 1}.log",
    ]
    return "/".join(log_params)


def check_dbt_transient_errors(context):
    """
    Searches through the logs to find failure messages
    and returns True if the errors found are transient.
    """
    log_file_path = get_log_file_name(context)
    log_contents = read_log_file(log_file_path)

    dbt_transient_error_patterns = Variable.get(
        "dbt_transient_errors_patterns", deserialize_json=True
    )

    dbt_summary_line = None
    for line in log_contents:
        # Check for transient errors message patterns
        for transient_error, patterns in dbt_transient_error_patterns.items():
            if all(sentence in line for sentence in patterns):
                logging.info(
                    f"Found {transient_error} dbt transient error, proceeding to retry"
                )
                return True
            elif "Done. PASS=" in line:
                dbt_summary_line = line
                break
    # Check if dbt summary has been logged
    if dbt_summary_line:
        match = re.search(r"ERROR=(\d+)", dbt_summary_line)
        if match:
            dbt_errors = int(match.group(1))
            # Check if dbt pipeline returned errors
            if dbt_errors > 0:
                logging.info("Could not find dbt transient errors, skipping retry")
                return False
            else:
                logging.info(
                    "dbt pipeline finished without errors, task failed but will not retry"
                )
                return False
    # Logic could not identify the error and assumes it is transient
    logging.info("Task failed due to unforeseen error, proceeding to retry")
    return True


def read_log_file(log_file_path):
    max_retries = 3
    retry_delay = 30
    log_contents = []

    for attempt in range(max_retries):
        try:
            with open(log_file_path, "r") as log_file:
                log_contents = log_file.readlines()
                break
        except FileNotFoundError as file_error:
            if attempt < max_retries - 1:
                logging.warning(
                    f"Log file {log_file_path} not found, retrying in {retry_delay} seconds..."
                )
                time.sleep(retry_delay)
            else:
                logging.error(file_error)
                raise
    return log_contents


def skip_retry_dbt_errors(context) -> None:
    """
    Set task state to SKIPPED in case errors found in dbt are not transient.
    """
    if not check_dbt_transient_errors(context):
        ti = context["ti"]
        logging.info(
            f"Set task instance {ti} state to "
            f"{TaskInstanceState.SKIPPED} to skip retrying"
        )
        ti.set_state(TaskInstanceState.SKIPPED)
        return
    else:
        return
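In short, `check_dbt_transient_errors` returns True (allow the retry) when a configured pattern matches or when no dbt summary line is found at all, and returns False (skip the retry) once dbt has printed its closing `Done. PASS=...` summary, whether or not that summary reports errors. A small sketch of the summary parsing, with an illustrative log line:

```python
import re

# Illustrative dbt summary line in the "Done. PASS=..." format the parser looks for.
dbt_summary_line = "Done. PASS=41 WARN=0 ERROR=1 SKIP=0 TOTAL=42"

match = re.search(r"ERROR=(\d+)", dbt_summary_line)
dbt_errors = int(match.group(1)) if match else 0
# A non-zero ERROR count with no known transient pattern means the task is not
# retried; skip_retry_dbt_errors then sets it to SKIPPED.
print(dbt_errors)  # 1
```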
