Improved data modeling for dbtLoggerDatabase
Now outputs to two database tables: one for the run and one for the nodes.
austinweisgrau committed Sep 21, 2024
1 parent b143a63 commit 72dbeef
Showing 2 changed files with 75 additions and 26 deletions.
93 changes: 67 additions & 26 deletions parsons/utilities/dbt/logging.py
@@ -2,6 +2,7 @@
 
 import datetime
 import logging
+import uuid
 import time
 from abc import ABC, abstractmethod
 from typing import Optional
@@ -92,6 +93,7 @@ def format_command_result(
         log_message += "\n*Summary*: `{}`".format(log_summary_str)
 
         log_message += "\n*GB Processed*: {:.2f}".format(manifest.total_gb_processed)
+        log_message += "\n*Slot hours*: {:.2f}".format(manifest.total_slot_hours)
 
         # Errors
         if manifest.errors or manifest.fails:
@@ -181,63 +183,102 @@ def send(self, manifests: list[Manifest]) -> None:
 
 
 class dbtLoggerDatabase(dbtLogger, ABC):
-    """Log dbt artifacts by loading to a database."""
+    """Log dbt artifacts by loading to a database.
+
+    This class is an abstract base class for logging dbt artifacts to
+    a database.
+    """
 
     def __init__(
         self,
         database_connector: DatabaseConnector,
-        destination_table: str,
+        destination_table_runs: str,
+        destination_table_nodes: str,
+        extra_run_table_fields: dict,
         **copy_kwargs,
     ) -> None:
+        """Initialize the logger.
+
+        Args:
+            database_connector: A DatabaseConnector object.
+            destination_table_runs: The name of the table to log run information.
+            destination_table_nodes: The name of the table to log node information.
+            extra_run_table_fields: A dictionary of additional fields to include in the run table.
+            **copy_kwargs: Additional keyword arguments to pass to the `copy` method.
+        """
         self.db_connector = database_connector
-        self.destination_table = destination_table
+        self.destination_table_runs = destination_table_runs
+        self.destination_table_nodes = destination_table_nodes
+        self.extra_run_table_fields = extra_run_table_fields
         self.copy_kwargs = copy_kwargs
 
-    def format_command_result(self, manifest: Manifest) -> Table:
+    def format_command_result(self, manifest: Manifest) -> tuple[Table, Table]:
         """Loads all artifact results into a Parsons Table."""
+        dbt_run_id = str(uuid.uuid4())
         run_metadata = {
-            key: getattr(manifest, key)
-            for key in (
-                "command",
-                "generated_at",
-            )
+            key: getattr(manifest, key) for key in ("command", "generated_at", "elapsed_time")
         }
-        rows = []
+        run_metadata.update(**self.extra_run_table_fields)
+        run_metadata["run_id"] = dbt_run_id
+        run_tbl = Table([run_metadata])
+
+        node_rows = []
         for result in manifest.results:
-            row = run_metadata.copy()
-            row.update(
+            node_row = {"dbt_run_id": dbt_run_id}
+            node_row.update(
                 {
                     key: value
                     for key, value in result.__dict__.items()
                     if key in ("execution_time", "message")
                 }
             )
-            row["status"] = str(result.status)
+            node_row["status"] = str(result.status)
             node_info = {
                 key: value
                 for key, value in result.node.__dict__.items()
                 if key in ("database", "schema", "name", "path")
             }
            node_info["node"] = result.node.unique_id
-            row.update(node_info)
+            node_row.update(node_info)
 
-            row["bytes_processed"] = result.adapter_response.get("bytes_processed", 0)
+            adapter_response_data = {
+                key: value
+                for key, value in result.adapter_response.items()
+                if key in ("bytes_processed", "bytes_billed", "job_id", "slot_ms")
+            }
+            node_row.update(adapter_response_data)
 
-            rows.append(row)
-        tbl = Table(rows)
-        return tbl
+            node_rows.append(node_row)
+
+        nodes_tbl = Table(node_rows)
+        return run_tbl, nodes_tbl
 
-    def format_result(self) -> Table:
-        tbls = [self.format_command_result(command) for command in self.commands]
-        all_rows_lists = [tbl.to_dicts() for tbl in tbls]
-        all_rows_flat = [item for sublist in all_rows_lists for item in sublist]
-        tbl = Table(all_rows_flat)
-        return tbl
+    def format_result(self) -> tuple[Table, Table]:
+        """Returns a table for the dbt runs and a table for the node runs."""
+        run_rows = []
+        node_rows = []
+        for command in self.commands:
+            run_tbl, nodes_tbl = self.format_command_result(command)
+            run_rows.extend(run_tbl.to_dicts())
+            node_rows.extend(nodes_tbl.to_dicts())
+
+        all_runs_tbl = Table(run_rows)
+        all_nodes_tbl = Table(node_rows)
+        return all_runs_tbl, all_nodes_tbl
 
     def send(self, manifests: list[Manifest]) -> None:
         self.commands = manifests
-        log_tbl = self.format_result()
+        runs_tbl, nodes_tbl = self.format_result()
 
         self.db_connector.copy(
-            log_tbl, self.destination_table, if_exists="append", **self.copy_kwargs
+            runs_tbl,
+            self.destination_table_runs,
+            if_exists="append",
+            **self.copy_kwargs,
         )
+        self.db_connector.copy(
+            nodes_tbl,
+            self.destination_table_nodes,
+            if_exists="append",
+            **self.copy_kwargs,
+        )
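
For orientation, here is a minimal usage sketch of the reworked interface (not part of the commit): the constructor now takes separate run and node destination tables plus an `extra_run_table_fields` dict, and `send()` copies each Parsons Table to its own destination. The concrete subclass, connector, table names, and extra fields below are illustrative assumptions.

from parsons import Redshift
from parsons.utilities.dbt.logging import dbtLoggerDatabase


class RedshiftDbtLogger(dbtLoggerDatabase):
    """Hypothetical concrete logger; assumes no further abstract methods need overriding."""


dbt_logger = RedshiftDbtLogger(
    database_connector=Redshift(),
    destination_table_runs="dbt_logs.runs",  # one row per dbt command run, keyed by run_id
    destination_table_nodes="dbt_logs.run_nodes",  # one row per node result, keyed by dbt_run_id
    extra_run_table_fields={"pipeline": "nightly"},  # merged into every run row
)

# `manifests` would be the list of Manifest objects collected from the dbt commands:
# dbt_logger.send(manifests)
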
8 changes: 8 additions & 0 deletions parsons/utilities/dbt/models.py
@@ -66,3 +66,11 @@ def total_gb_processed(self) -> float:
             / 1000000000
         )
         return result
+
+    @property
+    def total_slot_hours(self) -> float:
+        """Total slot hours used by full dbt command run."""
+        result = (
+            sum([node.adapter_response.get("slot_ms", 0) for node in self.dbt_manifest]) / 3600000
+        )
+        return result
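
A note on the 3600000 divisor above (not part of the diff, just a worked check): the BigQuery adapter response reports `slot_ms`, i.e. slot-milliseconds, and one hour is 3,600 s = 3,600,000 ms, so summing `slot_ms` across nodes and dividing by 3600000 yields slot-hours. With made-up adapter responses:

# Made-up adapter responses for illustration only: 1.5 + 0.5 slot-hours of work.
adapter_responses = [{"slot_ms": 5_400_000}, {"slot_ms": 1_800_000}]

total_slot_hours = sum(node.get("slot_ms", 0) for node in adapter_responses) / 3600000
assert total_slot_hours == 2.0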
