create ledger transaction dag #230
Conversation
@cayod do you mind giving me an overview of the data pipeline design?
I think this is how the pipeline will run, but want to confirm:
- Airflow will specify a time range, converting that to a ledger range to export
- one export job will spin up a captive core, read the ledgers and dump tx data to a file
- Each separate ledger will be read and converted to its own JSON file (lt_lake_export_task)
- Each ledger will individually be loaded to BQ (lt_bq_task)
Is this correct?
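Purely as a sketch of the dependency shape I have in mind (the task ids, EmptyOperator placeholders, and hardcoded ledger range are my own illustrations, not taken from this PR):

```python
# Hypothetical sketch of the chain described above, using EmptyOperator
# placeholders (Airflow 2.x). Task ids and the ledger range are illustrative only.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="ledger_transaction_sketch",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    time_task = EmptyOperator(task_id="get_ledger_range_from_times")  # time range -> ledger range
    lt_export_task = EmptyOperator(task_id="export_transactions")     # captive core export to a file

    # one lake-export + BQ-load pair per ledger in the range
    for ledger in range(100, 103):
        lt_lake_export_task = EmptyOperator(task_id=f"lt_lake_export_{ledger}")  # per-ledger JSON file
        lt_bq_task = EmptyOperator(task_id=f"lt_bq_{ledger}")                    # load that file to BQ
        time_task >> lt_export_task >> lt_lake_export_task >> lt_bq_task
```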
init_sentry()

def build_data_lake_to_bq_task(dag, project, dataset, data_type, ledger_range):
We have multiple tasks that load GCS files to BQ using the GoogleCloudStorageToBigQueryOperator. Can you explain how this one differs and the thinking behind loading files with a different design?
This task will be used to load multiple ledger files, since the data lake will be organized by ledger number.
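For comparison, here is a minimal, hypothetical sketch of loading several per-ledger files in a single task with the existing operator (GCSToBigQueryOperator is the newer name for GoogleCloudStorageToBigQueryOperator); the bucket, object paths, and table names below are made up:

```python
# Hypothetical sketch: loading multiple per-ledger JSON files in one task with the
# existing GCS-to-BigQuery operator. All names and paths are illustrative only.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with DAG(
    dag_id="gcs_to_bq_sketch",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    load_ledgers_to_bq = GCSToBigQueryOperator(
        task_id="load_ledger_transactions_to_bq",
        bucket="exported-data-bucket",                          # hypothetical bucket name
        source_objects=["ledger_transactions/ledger_*.json"],   # wildcard: one JSON file per ledger
        destination_project_dataset_table="my-project.my_dataset.ledger_transactions",
        source_format="NEWLINE_DELIMITED_JSON",
        write_disposition="WRITE_APPEND",
        autodetect=True,  # or pass an explicit schema via schema_fields / schema_object
    )
```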
ledger_range = "{{ task_instance.xcom_pull(task_ids='time_task.task_id') }}" | ||
ledger_range = ast.literal_eval(ledger_range) | ||
for ledger in range(ledger_range["start"], ledger_range["end"]): | ||
lt_lake_export_task = export_to_lake(dag, lt_export_task.task_id, ledger) |
will this spin up individual captive cores for each ledger we export?
Correct. This ledger will be loaded into a temp table to be consumed by dbt, which will use the js-stellar-base package and generate the history_transactions table.
def export_to_lake(dag, export_task_id, ledger_sequence):
    bucket_source = Variable.get("gcs_exported_data_bucket_name")
You should be using a Jinja template to avoid resolving variables at DAG parse time, e.g. {{ var.value.variable_name }} instead of Variable.get("variable_name").
See https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code for more details.
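A minimal sketch of that suggestion, assuming a templated operator field; the BashOperator and the gsutil command are just for illustration:

```python
# Hypothetical sketch: reference the Airflow Variable through Jinja in a templated
# operator field, so it is resolved at task run time rather than at DAG parse time.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="jinja_variable_sketch",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # Avoid (resolved at parse time, hits the metadata DB on every DAG file parse):
    #   from airflow.models import Variable
    #   bucket_source = Variable.get("gcs_exported_data_bucket_name")
    #
    # Prefer: let Jinja resolve the variable when the task actually runs.
    list_exported_files = BashOperator(
        task_id="list_exported_files",
        bash_command="gsutil ls gs://{{ var.value.gcs_exported_data_bucket_name }}/",
    )
```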
This PR creates a ledger transaction dag.