Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Source Freshness Results #437

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4c79422
Add SQL for source freshness results
dpguthrie Jul 13, 2024
b07abca
Include source_executions when looping through
dpguthrie Jul 13, 2024
7270396
Fix for wrong macro name
dpguthrie Jul 13, 2024
23769ed
Add column names for source executions
dpguthrie Jul 13, 2024
baac8a5
Fix error passing in old argument name
dpguthrie Jul 13, 2024
a583226
Add flag to integration test project
dpguthrie Jul 13, 2024
24b5272
Oof
dpguthrie Jul 13, 2024
114e772
Count values shouldn't be strings
dpguthrie Jul 14, 2024
c93bbfa
Fix attributes not on node
dpguthrie Jul 14, 2024
bf5312e
Copy/paste fun
dpguthrie Jul 14, 2024
43285e2
Missing comma
dpguthrie Jul 14, 2024
5240631
Add missing comma
dpguthrie Jul 14, 2024
68b1288
Add source executions model and yml
dpguthrie Jul 14, 2024
8f7a167
Add missing docs blocks for new source_executions model/yml
dpguthrie Jul 14, 2024
9f115e7
Add staging model for source executions
dpguthrie Jul 14, 2024
3a2b0c5
Fix id name
dpguthrie Jul 14, 2024
d85d62d
Add staging / fact models and yml for source executions
dpguthrie Jul 14, 2024
1a511bc
Remove unnecessary doc block
dpguthrie Jul 14, 2024
5b61f19
Fix ref
dpguthrie Jul 14, 2024
1e28b7c
Should be a string
dpguthrie Jul 14, 2024
4a330a7
Fix other adapter dml
dpguthrie Jul 14, 2024
53f49f4
Remove full refresh column from 'source'
dpguthrie Jul 14, 2024
df041c6
Add source freshness command to testing suite
dpguthrie Jul 14, 2024
b00dfab
Update README for instructions for opting in
dpguthrie Jul 14, 2024
ba03537
Merge branch 'main' into feat/add-source-freshness
michelley-an Aug 20, 2024
35dc474
Take config from profiles.yml and put in dbt_project.yml
dpguthrie Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ fct_dbt__model_executions
fct_dbt__seed_executions
fct_dbt__snapshot_executions
fct_dbt__test_executions
fct_dbt__source_executions
```

See the generated [dbt docs site](https://brooklyn-data.github.io/dbt_artifacts/#!/overview) for documentation on each model.
Expand Down Expand Up @@ -73,6 +74,13 @@ packages:
dbt run --select dbt_artifacts
```

5. If you want to include results from the `dbt source freshness` command, you'll need to add the following flag in your `dbt_project.yml`. This was recently added and made backwards-compatible to older core versions, so it's something that you have to opt into.

```yml
flags:
source_freshness_run_project_hooks: true
```

### Notes on upgrading

Due to the structure of the project, when additional fields are added, the package needs to be re-run to ensure the tables include the new field, or it will simply error on the hook. These changes will always be implemented within a new **minor** version, so make sure that the version you use in `packages.yml` reflects this.
Expand Down
3 changes: 3 additions & 0 deletions integration_test_project/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ models:
seeds:
+quote_columns: false

flags:
michelley-an marked this conversation as resolved.
Show resolved Hide resolved
source_freshness_run_project_hooks: true

on-run-end:
- "{{ dbt_artifacts.upload_results(results) }}"
209 changes: 209 additions & 0 deletions macros/upload_individual_datasets/upload_source_executions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
{% macro upload_source_executions(sources) %}
{{ return(adapter.dispatch('get_source_executions_dml_sql', 'dbt_artifacts')(sources)) }}
{% endmacro %}

{% macro default__get_source_executions_dml_sql(sources) -%}
{% if sources != [] %}

{% set source_execution_values %}
select
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(1) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(2) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(3) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(4) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(5) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(6) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(7) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(8) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(12) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(13) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(14) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(15) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(16) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(17) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(18) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(19) }},
{{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(20)) }}
from values
{% for source in sources -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ source.node.unique_id }}', {# node_id #}
'{{ run_started_at }}', {# run_started_at #}
'{{ source.thread_id }}', {# thread_id #}
'{{ source.status }}', {# status #}

{% set compile_started_at = (source.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (source.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ source.execution_time }}, {# total_node_runtime #}
'{{ source.node.schema }}', {# schema #}
'{{ source.node.name }}', {# name #}
'{{ source.node.source_name }}', {# source_name #}
'{{ source.node.loaded_at_field }}', {# loaded_at_field #}
{{ source.node.freshness.warn_after.count }}, {# warn_after_count #}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
{{ source.node.freshness.warn_after.count }}, {# warn_after_count #}
{{ 'null' if source.node.freshness.warn_after.count is none else source.node.freshness.warn_after.count }}

Was throwing an error when warn after not provided

'{{ source.node.freshness.warn_after.period }}', {# warn_after_period #}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
'{{ source.node.freshness.warn_after.period }}', {# warn_after_period #}
{% if source.node.freshness.warn_after.period is none %} null {% else %} '{{ source.node.freshness.warn_after.period }}' {% endif %}, {# warn_after_period #}

Handle None values

{{ source.node.freshness.error_after.count }}, {# error_after_count #}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
{{ source.node.freshness.error_after.count }}, {# error_after_count #}
{{ 'null' if source.node.freshness.error_after.count is none else source.node.freshness.error_after.count }}, {# error_after_count #}

Handle None values

'{{ source.node.freshness.error_after.period }}', {# error_after_period #}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
'{{ source.node.freshness.error_after.period }}', {# error_after_period #}
{% if source.node.freshness.error_after.period is none %} null {% else %} '{{ source.node.freshness.error_after.period }}' {% endif %}, {# error_after_period #}

Handle None value

'{{ source.max_loaded_at }}', {# max_loaded_at #}
'{{ source.snapshotted_at }}', {# snapshotted_at #}
{{ source.age }}, {# age #}
'{{ tojson(source.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{% endfor %}
{% endset %}
{{ source_execution_values }}
{% else %}
{{ return ("") }}
{% endif %}
{% endmacro -%}


{% macro bigquery__get_source_executions_dml_sql(sources) -%}
{% if sources != [] %}

{% set source_execution_values %}
{% for source in sources -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ source.node.unique_id }}', {# node_id #}
'{{ run_started_at }}', {# run_started_at #}
'{{ source.thread_id }}', {# thread_id #}
'{{ source.status }}', {# status #}

{% set compile_started_at = (source.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (source.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ source.execution_time }}, {# total_node_runtime #}
'{{ source.node.schema }}', {# schema #}
'{{ source.node.name }}', {# name #}
'{{ source.node.source_name }}', {# source_name #}
'{{ source.node.loaded_at_field }}', {# loaded_at_field #}
'{{ source.node.freshness.warn_after.count }}', {# warn_after_count #}
'{{ source.node.freshness.warn_after.period }}', {# warn_after_period #}
'{{ source.node.freshness.error_after.count }}', {# error_after_count #}
'{{ source.node.freshness.error_after.period }}', {# error_after_period #}
'{{ source.max_loaded_at }}', {# max_loaded_at #}
'{{ source.snapshotted_at }}', {# snapshotted_at #}
{{ source.age }}, {# age #}
{{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }} {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{% endfor %}
{% endset %}
{{ source_execution_values }}
{% else %}
{{ return ("") }}
{% endif %}
{% endmacro -%}


{% macro snowflake__get_source_executions_dml_sql(sources) -%}
{% if sources != [] %}

{% set source_execution_values %}
select
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(1) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(2) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(3) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(4) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(5) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(6) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(7) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(8) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(12) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(13) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(14) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(15) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(16) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(17) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(18) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(19) }},
{{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(20)) }}
from values
{% for source in sources -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ source.node.unique_id }}', {# node_id #}
'{{ run_started_at }}', {# run_started_at #}
'{{ source.thread_id }}', {# thread_id #}
'{{ source.status }}', {# status #}

{% set compile_started_at = (source.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (source.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ source.execution_time }}, {# total_node_runtime #}
'{{ source.node.schema }}', {# schema #}
'{{ source.node.name }}', {# name #}
'{{ source.node.source_name }}', {# source_name #}
'{{ source.node.loaded_at_field }}', {# loaded_at_field #}
{{ source.node.freshness.warn_after.count }}, {# warn_after_count #}
'{{ source.node.freshness.warn_after.period }}', {# warn_after_period #}
{{ source.node.freshness.error_after.count }}, {# error_after_count #}
'{{ source.node.freshness.error_after.period }}', {# error_after_period #}
'{{ source.max_loaded_at }}', {# max_loaded_at #}
'{{ source.snapshotted_at }}', {# snapshotted_at #}
{{ source.age }}, {# age #}
'{{ tojson(source.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{% endfor %}
{% endset %}
{{ source_execution_values }}
{% else %}
{{ return ("") }}
{% endif %}
{% endmacro -%}

{% macro postgres__get_source_executions_dml_sql(sources) -%}
{% if sources != [] %}

{% set source_execution_values %}
{% for source in sources -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ source.node.unique_id }}', {# node_id #}
'{{ run_started_at }}', {# run_started_at #}
'{{ source.thread_id }}', {# thread_id #}
'{{ source.status }}', {# status #}

{% set compile_started_at = (source.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (source.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ source.execution_time }}, {# total_node_runtime #}
'{{ source.node.schema }}', {# schema #}
'{{ source.node.name }}', {# name #}
'{{ source.node.source_name }}', {# source_name #}
'{{ source.node.loaded_at_field }}', {# loaded_at_field #}
'{{ source.node.freshness.warn_after.count }}', {# warn_after_count #}
'{{ source.node.freshness.warn_after.period }}', {# warn_after_period #}
'{{ source.node.freshness.error_after.count }}', {# error_after_count #}
'{{ source.node.freshness.error_after.period }}', {# error_after_period #}
'{{ source.max_loaded_at }}', {# max_loaded_at #}
'{{ source.snapshotted_at }}', {# snapshotted_at #}
{{ source.age }}, {# age #}
$${{ tojson(model.adapter_response) }}$$ {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{% endfor %}
{% endset %}
{{ source_execution_values }}
{% else %}
{{ return ("") }}
{% endif %}
{% endmacro -%}
25 changes: 25 additions & 0 deletions macros/upload_results/get_column_name_lists.sql
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,31 @@
all_results
)

{% elif dataset == 'source_executions' %}

(
command_invocation_id,
node_id,
run_started_at,
thread_id,
status,
compile_started_at,
query_completed_at,
total_node_runtime,
schema,
name,
source_name,
loaded_at_field,
warn_after_count,
warn_after_period,
error_after_count,
error_after_period,
max_loaded_at,
snapshotted_at,
age,
adapter_response
)

{% else %}

/* No column list available */
Expand Down
2 changes: 1 addition & 1 deletion macros/upload_results/get_dataset_content.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{% macro get_dataset_content(dataset) %}

{% if dataset in ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] %}
{% if dataset in ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions', 'source_executions'] %}
{# Executions make use of the results object #}
{% set objects = results | selectattr("node.resource_type", "equalto", dataset.split("_")[0]) | list %}
{% elif dataset in ['seeds', 'snapshots', 'tests', 'models'] %}
Expand Down
2 changes: 2 additions & 0 deletions macros/upload_results/get_table_content_values.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
{% set content = dbt_artifacts.upload_test_executions(objects_to_upload) %}
{% elif dataset == 'snapshot_executions' %}
{% set content = dbt_artifacts.upload_snapshot_executions(objects_to_upload) %}
{% elif dataset == 'source_executions' %}
{% set content = dbt_artifacts.upload_source_executions(objects_to_upload) %}
{% elif dataset == 'exposures' %}
{% set content = dbt_artifacts.upload_exposures(objects_to_upload) %}
{% elif dataset == 'models' %}
Expand Down
2 changes: 1 addition & 1 deletion macros/upload_results/upload_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
{% set datasets_to_load = ['exposures', 'seeds', 'snapshots', 'invocations', 'sources', 'tests', 'models'] %}
{% if results != [] %}
{# When executing, and results are available, then upload the results #}
{% set datasets_to_load = ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] + datasets_to_load %}
{% set datasets_to_load = ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions', 'source_executions'] + datasets_to_load %}
{% endif %}

{# Upload each data set in turn #}
Expand Down
42 changes: 42 additions & 0 deletions models/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -412,3 +412,45 @@ Response provided by the adapter as JSON.
All results as a JSON blob

{% enddocs %}

{% docs warn_after_count %}

Positive integer indicating a threshold to warn (used alongside warn_after_period)

{% enddocs %}

{% docs warn_after_period %}

Used alongside warn_after_count to indicate number of periods (minutes, hours, days) for a warn threshold

{% enddocs %}

{% docs error_after_count %}

Positive integer indicating a threshold to error (used alongside error_after_period)

{% enddocs %}

{% docs error_after_period %}

Used alongside error_after_count to indicate number of periods (minutes, hours, days) for a error threshold

{% enddocs %}

{% docs max_loaded_at %}

Max value of loaded_at_field timestamp in the source table when queried

{% enddocs %}

{% docs snapshotted_at %}

Current timestamp when querying

{% enddocs %}

{% docs age %}

Interval between max_loaded_at and snapshotted_at, calculated in python to handle timezone complexity

{% enddocs %}
36 changes: 36 additions & 0 deletions models/fct_dbt__source_executions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
with base as (

select *
from {{ ref('stg_dbt__source_executions') }}

),

source_executions as (

select
source_execution_id,
command_invocation_id,
node_id,
run_started_at,
thread_id,
status,
compile_started_at,
query_completed_at,
total_node_runtime,
schema, -- noqa
name,
source_name,
loaded_at_field,
warn_after_count,
warn_after_period,
error_after_count,
error_after_period,
max_loaded_at,
snapshotted_at,
age,
adapter_response
from base

)

select * from source_executions
Loading
Loading