Skip to content

Commit

Permalink
Create new feature for running and logging dbt commands (#841)
Browse files Browse the repository at this point in the history
* Create new feature for running and logging dbt commands

* Make boto3 & jinja2 requirements open ended to enable compatibility

* Allow for different dbt engines

* Move to utilities folder, add some documentation

* Correct typo in docstring

* Log warnings and skipped models in addition to errored out models

* Fix use of check_env

Prior implementation had the arguments in the wrong order

* Include slackclient in all dbt extras

* Filter out verbose top-level warnings

---------

Co-authored-by: Shauna <[email protected]>
  • Loading branch information
austinweisgrau and shaunagm authored Jan 18, 2024
1 parent e84ac1c commit 7f24b4c
Show file tree
Hide file tree
Showing 3 changed files with 346 additions and 4 deletions.
336 changes: 336 additions & 0 deletions parsons/utilities/dbt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,336 @@
"""Utility for running and logging output from dbt commands
Enable this utility by installing parsons with a dbt extra:
`pip install parsons[dbt-redshift]`
or `pip install parsons[dbt-postgres]`
or `pip install parsons[dbt-snowflake]`
or `pip install parsons[dbt-bigquery]`
To run dbt commands, you will need to have a dbt project directory
somewhere on the local filesystem.
If slack-related arguments or environment variables are not provided,
no log message will be sent to slack.
Example usage:
```
from parsons.utilities.dbt import dbtRunner
dbt_runner = dbtRunner(
commands=['run', 'test'],
dbt_project_directory='/home/ubuntu/code/dbt_project/',
dbt_schema='dbt_dev'
)
dbt_runner.run()
```
"""


import datetime
import json
import logging
import os
import pathlib
import shutil
import subprocess
import time
from typing import List, Literal, Optional

from parsons.notifications.slack import Slack
from parsons.utilities import check_env

logger = logging.getLogger(__name__)


class dbtLogger:
"""Module for aggregating logs between dbt commands and sending to slack."""

_command_times: dict[str, dict[Literal["start", "end"], float]] = {}

def __init__(
self,
slack_channel: Optional[str] = None,
slack_webhook: Optional[str] = None,
slack_api_key: Optional[str] = None,
):
self.start = time.time()
self.log_messages = []
self.error_messages = []
self.warn_messages = []
self.done_messages = []
self.slack_channel = slack_channel
self.slack_webhook = slack_webhook
self.slack_api_key = slack_api_key

def record_start(self, command: str) -> None:
"""Record start time for command"""
self._command_times[command] = {"start": time.time()}

def record_end(self, command: str) -> None:
"""Record end time for command"""
self._command_times[command]["end"] = time.time()

def seconds_to_time_string(self, seconds: int):
time_str = ""
command_time = time.gmtime(seconds)
if command_time.tm_yday - 1:
time_str += f"{command_time.tm_yday - 1} days, "
if command_time.tm_hour:
time_str += f"{command_time.tm_hour} hours, "
if command_time.tm_min:
time_str += f"{command_time.tm_min} minutes, "
if command_time.tm_sec:
time_str += f"{command_time.tm_sec} seconds"

return time_str

def record_result(
self,
command: str,
error_messages: list[str],
warn_messages: list[str],
skip_messages: list[str],
done_message: str,
):
command_seconds = int(
self._command_times[command]["end"] - self._command_times[command]["start"]
)

log_message = ""
if error_messages:
log_message += ":red_circle:"
status = "Error"
elif warn_messages:
log_message += ":large_orange_circle:"
status = "Warning"
else:
log_message += ":large_green_circle:"
status = "Success"

time_str = self.seconds_to_time_string(command_seconds)
log_message += f"Invoke dbt with `dbt {command}` ({status} in {time_str})"
if done_message:
log_message += f"\n*Summary*: `{done_message}`"

if error_messages:
log_message += "\nError messages:\n```{}```".format(
"\n\n".join(error_messages)
)

if warn_messages:
log_message += "\nWarning messages:\n```{}```".format(
"\n\n".join(warn_messages)
)

if skip_messages:
skips = [
msg.split(" ")[5].split(".")[1]
for msg in skip_messages
if msg.split(" ")[4] == "relation"
]
log_message += "\nSkipped:\n```{}```".format(", ".join(skips))

self.log_messages.append(log_message)

def send_to_slack(self) -> None:
"""Log final result to logger and send to slack."""
end_time = time.time()
duration_seconds = int(end_time - self.start)
duration_time_str = self.seconds_to_time_string(duration_seconds)

full_log_message = ""
if any(":red_circle:" in log_message for log_message in self.log_messages):
status = "failed"
full_log_message += ":red_circle:"
else:
status = "succeeded"
full_log_message += ":large_green_circle:"

now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M")
full_log_message += f"*dbt run {status} - {now}*"
full_log_message += f"\n*Duration:* {duration_time_str}\n\n"
full_log_message += "\n".join(self.log_messages)

if self.slack_webhook:
Slack.message(self.slack_channel, full_log_message, self.slack_webhook)
elif self.slack_api_key:
Slack(self.slack_api_key).message(self.slack_channel, full_log_message)

def log_results(self, command_str: str, stdout: str, stderr: str) -> None:
"""Parsed logs from dbt command and log to logger and slack."""

message = ""
parsed_rows = []

for output in (stdout, stderr):
for row in output.split("\n"):
if not row:
continue
try:
parsed_row = json.loads(row)
parsed_rows.append(parsed_row)
except json.JSONDecodeError:
message += row + "\n"

log_messages = []
error_messages = []
warn_messages = []
skip_messages = []

for row in parsed_rows:
if not row["info"]["msg"]:
continue

log_message = row["info"]["msg"]
log_messages.append(log_message)

if row["info"]["level"] == "error":
logger.error(log_message)
error_messages.append(log_message)
# Capture model/test warnings but exclude verbose top-level warnings
elif (
row["info"]["level"] == "warn" and "[WARNING]" not in row["info"]["msg"]
):
logger.warning(log_message)
warn_messages.append(log_message)
elif "SKIP " in row["info"]["msg"]:
logger.warning(log_message)
skip_messages.append(log_message)
else:
logger.info(log_message)

done_messages = [i for i in log_messages if "Done. PASS" in i]
if done_messages:
done_message = done_messages[0]
else:
done_message = ""

self.record_result(
command_str, error_messages, warn_messages, skip_messages, done_message
)


class dbtRunner:
def __init__(
self,
commands: List[str],
dbt_project_directory: pathlib.Path,
dbt_schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
host: Optional[str] = None,
port: Optional[int] = None,
db: Optional[str] = None,
raise_errors: bool = False,
slack_channel: Optional[str] = None,
slack_webhook: Optional[str] = None,
slack_api_key: Optional[str] = None,
) -> None:
"""Initialize dbtRunner client with commands, credentials, and options.
`Args:`
commands: List[str]
A list of strings, each string a dbt command with
options separated by spaces.
e.g. ["seed", "build -s models/staging", "test"]
dbt_project_directory: pathlib.Path
The path to find the dbt project, as a working
directory for dbt commands to run
dbt_schema: Optional[str]
Populates an environment variable DBT_SCHEMA
which can be used in your dbt profile.
Not required if the `DBT_SCHEMA` environment variable set.
username: Optional[str]
Populates an environment variable REDSHIFT_USERNAME
which can be used in your dbt profile
Not requried if the `REDSHIFT_USERNAME`
environment variable set.
password: Optional[str]
Populates an environment variable REDSHIFT_PASSWORD
which can be used in your dbt profile
Not requried if the `REDSHIFT_PASSWORD`
environment variable set.
host: Optional[str]
Populates an environment variable REDSHIFT_HOST
which can be used in your dbt profile
Not requried if the `REDSHIFT_HOST`
environment variable set.
port: Optional[str]
Populates an environment variable REDSHIFT_PORT
which can be used in your dbt profile
Not requried if the `REDSHIFT_PORT`
environment variable set.
db: Optional[str]
Populates an environment variable REDSHIFT_DB
which can be used in your dbt profile
Not requried if the `REDSHIFT_DB`
environment variable set.
raise_errors: bool
Default value: False
A flag indicating whether errors encountered by
the dbt command should be raised as exceptions.
slack_channel: Optional[str]
If set, will be used to send log results. Can be set
with environment variable `SLACK_CHANNEL`
slack_webhook: Optional[str]
If set, will be used to send log results. Only one
of slack_webhook or slack_api_key is necessary.
Can be set with environment variable `SLACK_WEBHOOK`
slack_api_key: Optional[str]
If set, will be used to send log results. Only one
of slack_webhook or slack_api_key is necessary.
Can be set with environment variable `SLACK_API_KEY`
"""
self.commands = commands
self.dbt_schema = check_env.check("DBT_SCHEMA", dbt_schema)
self.username = check_env.check("REDSHIFT_USERNAME", username)
self.password = check_env.check("REDSHIFT_PASSWORD", password)
self.host = check_env.check("REDSHIFT_HOST", host)
self.port = check_env.check("REDSHIFT_PORT", port)
self.db = check_env.check("REDSHIFT_DB", db)
self.dbt_project_directory = dbt_project_directory
self.raise_errors = raise_errors
self.dbt_logger = dbtLogger(
slack_channel=slack_channel or os.environ.get("SLACK_CHANNEL"),
slack_webhook=slack_webhook or os.environ.get("SLACK_WEBHOOK"),
slack_api_key=slack_api_key or os.environ.get("SLACK_API_KEY"),
)

def run(self) -> None:
for command in self.commands:
self.dbt_command(command)
self.dbt_logger.send_to_slack()

def dbt_command(self, command: str) -> None:
"""Runs dbt command and logs results after process is completed.
If raise_error is set, this method will raise an error if the dbt
command hits any errors.
"""

self.dbt_logger.record_start(command)
dbt_executable_path = shutil.which("dbt")
commands = [dbt_executable_path, "--log-format", "json"] + command.split(" ")

shell_environment = {
"REDSHIFT_USERNAME": self.username,
"REDSHIFT_PASSWORD": self.password,
"REDSHIFT_HOST": self.host,
"REDSHIFT_PORT": self.port,
"REDSHIFT_DB": self.db,
"DBT_SCHEMA": self.dbt_schema,
}

process = subprocess.run(
commands,
env=shell_environment,
cwd=self.dbt_project_directory,
text=True,
capture_output=True,
)
self.dbt_logger.record_end(command)

self.dbt_logger.log_results(command, process.stdout, process.stderr)

if self.raise_errors:
process.check_returncode()
7 changes: 4 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
requests==2.31.0
petl==1.6.8
boto3==1.17.98
boto3>=1.17.98
boxsdk==2.10.0
civis==1.16.0
slackclient==1.3.0
Expand Down Expand Up @@ -39,10 +39,11 @@ surveygizmo==1.2.3
PyJWT==2.4.0 # Otherwise `import jwt` would refer to python-jwt package
SQLAlchemy==1.3.23
requests_oauthlib==1.3.0
dbt_redshift==1.4.0
bs4==0.0.1

# Stuff for TMC scripts
# TODO Remove when we have a TMC-specific Docker image
selenium==3.141.0
jinja2==3.0.2
us==3.1.1
jinja2>=3.0.2
us==3.1.1
7 changes: 6 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
from setuptools import find_packages
from distutils.core import setup

from setuptools import find_packages


def main():
limited_deps = os.environ.get("PARSONS_LIMITED_DEPENDENCIES", "")
Expand All @@ -21,6 +22,10 @@ def main():
"braintree": ["braintree"],
"catalist": ["paramiko"],
"civis": ["civis"],
"dbt-redshift": ["dbt-redshift", "slackclient<2"],
"dbt-bigquery": ["dbt-bigquery", "slackclient<2"],
"dbt-postgres": ["dbt-postgres", "slackclient<2"],
"dbt-snowflake": ["dbt-snowflake", "slackclient<2"],
"facebook": ["joblib", "facebook-business"],
"geocode": ["censusgeocode", "urllib3==1.26.18"],
"github": ["PyGitHub"],
Expand Down

0 comments on commit 7f24b4c

Please sign in to comment.