Update for Horizon reconciliation
sydneynotthecity committed Aug 31, 2021
1 parent 7d9fd4b commit 857625c
Showing 9 changed files with 291 additions and 45 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -51,21 +51,21 @@ Create a new Cloud Composer environment using the command below or the [UI](http
```bash
gcloud composer environments create <environment_name> --location=<project_location> \
--zone=<project_zone> --disk-size=100GB --machine-type=n1-standard-4 \
--node-count=3 --python-version=3 --image-version=composer-1.12.0-airflow-1.10.10 \
--oauth-scopes=['https://www.googleapis.com/auth/cloud-platform'] \
--node-count=3 --python-version=3 --image-version=composer-<version>-airflow-<version> \
--service-account=<service_account>

gcloud composer environments update <environment_name> \
--location=<project_location> --update-pypi-package=docker==3.7.3
```
_Note_: If no service account is provided, GCP will use the default GKE service account. For quick setup this is an easy option.
Remember to adjust the disk size, machine type, and node count to fit your needs. The python version must be 3, and the image must be `composer-1.12.0-airflow-1.10.10` or later. See [the command reference page](https://cloud.google.com/sdk/gcloud/reference/composer/environments/create) for a detailed list of parameters.
> **_TROUBLESHOOTING:_** If the environment creation fails because the "Composer Backend timed out" try disabling and enabling the Cloud Composer API. If the creation fails again, try creating a service account with Owner permissions and use it to create the Composer environment.
Cloud Composer may take a while to setup the environment. Once the process is finished, you can view the environment by going to the [Composer section of the Cloud Console](https://console.cloud.google.com/composer/environments).
> **_NOTE:_** Creating an environment will also create a new Google Cloud Storage bucket. You can check this bucket's name by clicking on the DAGs folder link in the Composer section of the Cloud Console.
### Upload DAGs and Schemas to Cloud Composer
After the environment is created, select the environment and navigate to the environment configuration tab. Look for the value under **DAGs folder**. It will be of the form `gs://airflow_bucket/dags`. The `airflow_bucket` value will be used in this step and the next . Run the command below in order to upload the DAGs and schemas to your Airflow bucket.
After the environment is created, select the environment and navigate to the environment configuration tab. Look for the value under **DAGs folder**. It will be of the form `gs://airflow_bucket/dags`. The `airflow_bucket` value will be used in this step and the next. Run the command below in order to upload the DAGs and schemas to your Airflow bucket.
```bash
> bash upload_static_to_gcs.sh <airflow_bucket>
```
22 changes: 11 additions & 11 deletions airflow_variables.txt
@@ -1,9 +1,9 @@
{
"affinity": "{}",
"api_key_path": "/home/airflow/gcs/data/apiKey.json",
"bq_dataset": "test_dataset",
"bq_project": "stellar-testing-281223",
"gcs_exported_data_bucket_name": "stellar-testing-us",
"bq_dataset": "test_gcp_airflow_internal",
"bq_project": "test-hubble-319619",
"gcs_exported_data_bucket_name": "us-central1-internal-airflo-8d08728f-bucket",
"image_name": "stellar/stellar-etl:latest",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "Always",
@@ -38,18 +38,18 @@
},
"schema_filepath": "/home/airflow/gcs/dags/schemas/",
"table_ids": {
"accounts": "acc_table",
"assets": "asset_table",
"accounts": "acccounts",
"assets": "history_assets",
"dimAccounts": "dim_accounts",
"dimMarkets": "dim_markets",
"dimOffers": "dim_offers",
"factEvents": "fact_offer_events",
"ledgers": "ledger_table",
"offers": "off_table",
"operations": "op_table",
"trades": "trade_table",
"transactions": "tx_table",
"trustlines": "trust_table"
"ledgers": "history_ledgers",
"offers": "offers",
"operations": "history_operations",
"trades": "history_trades",
"transactions": "history_transactions",
"trustlines": "trust_lines"
},
"use_kubernetes_pod_exporter": "False",
"volume_config": "{}",
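For context, the renamed `table_ids` values above are read inside the DAGs through Airflow's Variable API. A minimal sketch of that lookup, assembling a fully qualified BigQuery table id the same way the load code below does (the variable names match this file; everything else is illustrative):

```python
# Minimal sketch: how the Airflow variables above are consumed in the DAGs.
from airflow.models import Variable

table_ids = Variable.get("table_ids", deserialize_json=True)
ledgers_table = table_ids["ledgers"]   # "history_ledgers" after this commit
project = Variable.get("bq_project")   # "test-hubble-319619"
dataset = Variable.get("bq_dataset")   # "test_gcp_airflow_internal"

# Fully qualified table id, as assembled in the apply_gcs_changes task below.
true_table_id = f"{project}.{dataset}.{ledgers_table}"
print(true_table_id)
```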
12 changes: 11 additions & 1 deletion dags/history_archive_dag.py
@@ -8,15 +8,20 @@
from stellar_etl_airflow.default import get_default_dag_args
from stellar_etl_airflow.build_load_task import build_load_task
from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import build_apply_gcs_changes_to_bq_task
import logging
import sys
import time

from airflow import DAG
from airflow.models import Variable

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

dag = DAG(
'history_archive_export',
default_args=get_default_dag_args(),
description='This DAG exports ledgers, transactions, operations, and trades from the history archive to BigQuery.',
schedule_interval="*/5 * * * *",
schedule_interval="0 */3 * * *",
user_defined_filters={'fromjson': lambda s: json.loads(s)},
)

@@ -34,10 +34,15 @@
can be exported from the history archives.
'''
ledger_export_task = build_export_task(dag, 'archive', 'export_ledgers', file_names['ledgers'])
ledger_export_task.post_execute = lambda **x: time.sleep(30)
tx_export_task = build_export_task(dag, 'archive', 'export_transactions', file_names['transactions'])
tx_export_task.post_execute = lambda **x: time.sleep(30)
op_export_task = build_export_task(dag, 'archive', 'export_operations', file_names['operations'])
op_export_task.post_execute = lambda **x: time.sleep(30)
trade_export_task = build_export_task(dag, 'archive', 'export_trades', file_names['trades'])
trade_export_task.post_execute = lambda **x: time.sleep(30)
asset_export_task = build_export_task(dag, 'archive', 'export_assets', file_names['assets'])
asset_export_task.post_execute = lambda **x: time.sleep(30)

'''
The load tasks receive the location of the exported file through Airflow's XCOM system.
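The `post_execute` assignments added above stagger the archive exports by pausing after each task finishes. A hedged sketch of the same pattern as a small helper (the `pause_after` helper is hypothetical, not part of the repo):

```python
# Hypothetical helper illustrating the pattern added in this commit.
# Airflow invokes post_execute with keyword arguments (context, result) once a
# task's execute() returns, so a **kwargs lambda that sleeps is enough to add a pause.
import time

def pause_after(task, seconds=30):
    task.post_execute = lambda **kwargs: time.sleep(seconds)

# Usage, mirroring the DAG above:
# pause_after(ledger_export_task)
# pause_after(tx_export_task)
```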
dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py
@@ -198,7 +198,9 @@ def apply_gcs_changes(data_type, **kwargs):
table_id = f'{splitext(basename(gcs_filepath))[0]}'
table_id = table_id.replace('-', '_')

job_config = bigquery.QueryJobConfig(table_definitions={table_id: external_config})
job_config = bigquery.QueryJobConfig(
table_definitions={table_id: external_config}
)

#check if the table already exists; if it does not then we need to create it using the schema that we have already read in
true_table_id = f'{Variable.get("bq_project")}.{Variable.get("bq_dataset")}.{Variable.get("table_ids", deserialize_json=True)[data_type]}'
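The `QueryJobConfig` reformatted above registers the exported GCS file as a temporary external table so it can be queried directly. A minimal standalone sketch of that pattern; the URI, file format, and schema below are assumptions for illustration, not values from the repo:

```python
# Minimal sketch: querying a GCS export through a temporary external table.
from google.cloud import bigquery

client = bigquery.Client()

external_config = bigquery.ExternalConfig("NEWLINE_DELIMITED_JSON")  # assumed format
external_config.source_uris = ["gs://<airflow_bucket>/exported/ledgers.txt"]  # hypothetical path
external_config.schema = [
    bigquery.SchemaField("sequence", "INTEGER"),   # illustrative schema
    bigquery.SchemaField("ledger_hash", "STRING"),
]

table_id = "ledgers"
job_config = bigquery.QueryJobConfig(
    table_definitions={table_id: external_config}
)

rows = client.query(f"SELECT * FROM {table_id}", job_config=job_config).result()
print(f"read {rows.total_rows} rows from the external table")
```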
12 changes: 9 additions & 3 deletions dags/stellar_etl_airflow/build_export_task.py
@@ -3,9 +3,13 @@
'''

import json
import logging
import sys
from airflow import AirflowException
from airflow.models import Variable

logger = logging.getLogger("airflow.task")

def get_path_variables():
'''
Returns the image output path, core executable path, and core config path.
@@ -22,6 +26,7 @@ def select_correct_filename(cmd_type, base_name, batched_name):
filename = switch.get(cmd_type, 'No file')
if filename == 'No file':
raise AirflowException("Command type is not supported: ", cmd_type)
logger.info(f"Final filename: {filename}")
return filename

def generate_etl_cmd(command, base_filename, cmd_type):
@@ -59,6 +64,7 @@ def generate_etl_cmd(command, base_filename, cmd_type):

batch_filename = '-'.join([start_ledger, end_ledger, base_filename])
batched_path = image_output_path + batch_filename
logger.info(f'Batched file name: {batch_filename} batch file path: {batched_path}')
base_path = image_output_path + base_filename

correct_filename = select_correct_filename(cmd_type, base_filename, batch_filename)
@@ -87,8 +93,7 @@ def build_kubernetes_pod_exporter(dag, command, etl_cmd_string, output_file):
'''
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from stellar_etl_airflow.kubernetes_pod_operator import KubernetesPodOperator

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
data_mount = VolumeMount(Variable.get('volume_name'), Variable.get("image_output_path"), '', False)
volume_config = Variable.get('volume_config', deserialize_json=True)

@@ -131,7 +136,7 @@ def build_docker_exporter(dag, command, etl_cmd_string, output_file):
Returns:
the DockerOperator for the export task
'''
from stellar_etl_airflow.docker_operator import DockerOperator
from airflow.operators.docker_operator import DockerOperator

full_cmd = f'bash -c "{etl_cmd_string} >> /dev/null && echo \"{output_file}\""'
force_pull = True if Variable.get('image_pull_policy')=='Always' else False
@@ -143,6 +148,7 @@ def build_docker_exporter(dag, command, etl_cmd_string, output_file):
dag=dag,
xcom_push=True,
auto_remove=True,
tty=True,
force_pull=force_pull,
)

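Putting the changed pieces of `build_docker_exporter` together, a hedged sketch of the operator it now constructs; the `task_id` and the wrapper function are assumptions, while the import, `tty`, `xcom_push`, and `force_pull` settings mirror the diff above:

```python
# Hedged sketch of the DockerOperator assembled in build_docker_exporter.
from airflow.models import Variable
from airflow.operators.docker_operator import DockerOperator

def build_docker_exporter_sketch(dag, etl_cmd_string, output_file):
    # Echo the output file path so it is captured as the task's XCom value.
    full_cmd = f'bash -c "{etl_cmd_string} >> /dev/null && echo \"{output_file}\""'
    force_pull = Variable.get('image_pull_policy') == 'Always'
    return DockerOperator(
        task_id='export_task',                # hypothetical task id
        image=Variable.get('image_name'),     # e.g. stellar/stellar-etl:latest
        command=full_cmd,
        dag=dag,
        xcom_push=True,
        auto_remove=True,
        tty=True,                             # keeps a pseudo-terminal attached (added in this commit)
        force_pull=force_pull,
    )
```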
6 changes: 3 additions & 3 deletions dags/stellar_etl_airflow/default.py
@@ -1,4 +1,4 @@
from datetime import timedelta
from datetime import timedelta, datetime
from airflow.models import Variable

def get_base_dag_args():
@@ -11,10 +11,10 @@ def get_base_dag_args():

def get_default_dag_args():
base = get_base_dag_args()
base['start_date'] = "2015-09-30T16:41:54+00:00"
base['start_date'] = datetime(2021, 8, 16)
return base

def get_orderbook_dag_args():
base = get_base_dag_args()
base['start_date'] = "2015-09-30T17:26:17+00:00"
base['start_date'] = datetime(2021, 8, 16)
return base
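One note on the `start_date` change: Airflow expects `start_date` in a DAG's default args to be a `datetime`, and the first scheduled run lands one `schedule_interval` after it. A minimal sketch of the resulting defaults; only `start_date` reflects this commit, the other keys are illustrative:

```python
# Minimal sketch of DAG default args with a datetime start_date.
# Only start_date is taken from this commit; owner/retries are assumptions.
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',                    # assumed
    'start_date': datetime(2021, 8, 16),   # matches the change above
    'retries': 3,                          # assumed
    'retry_delay': timedelta(minutes=5),   # assumed
}
```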
