Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create new feature for running and logging dbt commands #841

Merged
merged 10 commits into from
Jan 18, 2024
336 changes: 336 additions & 0 deletions parsons/utilities/dbt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,336 @@
"""Utility for running and logging output from dbt commands

Enable this utility by installing parsons with a dbt extra:
`pip install parsons[dbt-redshift]`
or `pip install parsons[dbt-postgres]`
or `pip install parsons[dbt-snowflake]`
or `pip install parsons[dbt-bigquery]`

To run dbt commands, you will need to have a dbt project directory
somewhere on the local filesystem.

If slack-related arguments or environment variables are not provided,
no log message will be sent to slack.

Example usage:
```
from parsons.utilities.dbt import dbtRunner

dbt_runner = dbtRunner(
commands=['run', 'test'],
dbt_project_directory='/home/ubuntu/code/dbt_project/',
dbt_schema='dbt_dev'
)
dbt_runner.run()
```
"""


import datetime
import json
import logging
import os
import pathlib
import shutil
import subprocess
import time
from typing import List, Literal, Optional

from parsons.notifications.slack import Slack
from parsons.utilities import check_env

logger = logging.getLogger(__name__)


class dbtLogger:
"""Module for aggregating logs between dbt commands and sending to slack."""

_command_times: dict[str, dict[Literal["start", "end"], float]] = {}

def __init__(
self,
slack_channel: Optional[str] = None,
slack_webhook: Optional[str] = None,
slack_api_key: Optional[str] = None,
):
self.start = time.time()
self.log_messages = []
self.error_messages = []
self.warn_messages = []
self.done_messages = []
self.slack_channel = slack_channel
self.slack_webhook = slack_webhook
self.slack_api_key = slack_api_key

def record_start(self, command: str) -> None:
"""Record start time for command"""
self._command_times[command] = {"start": time.time()}

def record_end(self, command: str) -> None:
"""Record end time for command"""
self._command_times[command]["end"] = time.time()

def seconds_to_time_string(self, seconds: int):
time_str = ""
command_time = time.gmtime(seconds)
if command_time.tm_yday - 1:
time_str += f"{command_time.tm_yday - 1} days, "
if command_time.tm_hour:
time_str += f"{command_time.tm_hour} hours, "
if command_time.tm_min:
time_str += f"{command_time.tm_min} minutes, "
if command_time.tm_sec:
time_str += f"{command_time.tm_sec} seconds"

return time_str

def record_result(
self,
command: str,
error_messages: list[str],
warn_messages: list[str],
skip_messages: list[str],
done_message: str,
):
command_seconds = int(
self._command_times[command]["end"] - self._command_times[command]["start"]
)

log_message = ""
if error_messages:
log_message += ":red_circle:"
status = "Error"
elif warn_messages:
log_message += ":large_orange_circle:"
status = "Warning"
else:
log_message += ":large_green_circle:"
status = "Success"

time_str = self.seconds_to_time_string(command_seconds)
log_message += f"Invoke dbt with `dbt {command}` ({status} in {time_str})"
if done_message:
log_message += f"\n*Summary*: `{done_message}`"

if error_messages:
log_message += "\nError messages:\n```{}```".format(
"\n\n".join(error_messages)
)

if warn_messages:
log_message += "\nWarning messages:\n```{}```".format(
"\n\n".join(warn_messages)
)

if skip_messages:
skips = [
msg.split(" ")[5].split(".")[1]
for msg in skip_messages
if msg.split(" ")[4] == "relation"
]
log_message += "\nSkipped:\n```{}```".format(", ".join(skips))

self.log_messages.append(log_message)

def send_to_slack(self) -> None:
"""Log final result to logger and send to slack."""
end_time = time.time()
duration_seconds = int(end_time - self.start)
duration_time_str = self.seconds_to_time_string(duration_seconds)

full_log_message = ""
if any(":red_circle:" in log_message for log_message in self.log_messages):
status = "failed"
full_log_message += ":red_circle:"
else:
status = "succeeded"
full_log_message += ":large_green_circle:"

now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M")
full_log_message += f"*dbt run {status} - {now}*"
full_log_message += f"\n*Duration:* {duration_time_str}\n\n"
full_log_message += "\n".join(self.log_messages)

if self.slack_webhook:
Slack.message(self.slack_channel, full_log_message, self.slack_webhook)
elif self.slack_api_key:
Slack(self.slack_api_key).message(self.slack_channel, full_log_message)

def log_results(self, command_str: str, stdout: str, stderr: str) -> None:
"""Parsed logs from dbt command and log to logger and slack."""

message = ""
parsed_rows = []

for output in (stdout, stderr):
for row in output.split("\n"):
if not row:
continue
try:
parsed_row = json.loads(row)
parsed_rows.append(parsed_row)
except json.JSONDecodeError:
message += row + "\n"

log_messages = []
error_messages = []
warn_messages = []
skip_messages = []

for row in parsed_rows:
if not row["info"]["msg"]:
continue

log_message = row["info"]["msg"]
log_messages.append(log_message)

if row["info"]["level"] == "error":
logger.error(log_message)
error_messages.append(log_message)
# Capture model/test warnings but exclude verbose top-level warnings
elif (
row["info"]["level"] == "warn" and "[WARNING]" not in row["info"]["msg"]
):
logger.warning(log_message)
warn_messages.append(log_message)
elif "SKIP " in row["info"]["msg"]:
logger.warning(log_message)
skip_messages.append(log_message)
else:
logger.info(log_message)

done_messages = [i for i in log_messages if "Done. PASS" in i]
if done_messages:
done_message = done_messages[0]
else:
done_message = ""

self.record_result(
command_str, error_messages, warn_messages, skip_messages, done_message
)


class dbtRunner:
def __init__(
self,
commands: List[str],
dbt_project_directory: pathlib.Path,
dbt_schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
host: Optional[str] = None,
port: Optional[int] = None,
db: Optional[str] = None,
raise_errors: bool = False,
slack_channel: Optional[str] = None,
slack_webhook: Optional[str] = None,
slack_api_key: Optional[str] = None,
) -> None:
"""Initialize dbtRunner client with commands, credentials, and options.

`Args:`
commands: List[str]
A list of strings, each string a dbt command with
options separated by spaces.
e.g. ["seed", "build -s models/staging", "test"]
dbt_project_directory: pathlib.Path
The path to find the dbt project, as a working
directory for dbt commands to run
dbt_schema: Optional[str]
Populates an environment variable DBT_SCHEMA
which can be used in your dbt profile.
Not required if the `DBT_SCHEMA` environment variable set.
username: Optional[str]
Populates an environment variable REDSHIFT_USERNAME
which can be used in your dbt profile
Not requried if the `REDSHIFT_USERNAME`
environment variable set.
password: Optional[str]
Populates an environment variable REDSHIFT_PASSWORD
which can be used in your dbt profile
Not requried if the `REDSHIFT_PASSWORD`
environment variable set.
host: Optional[str]
Populates an environment variable REDSHIFT_HOST
which can be used in your dbt profile
Not requried if the `REDSHIFT_HOST`
environment variable set.
port: Optional[str]
Populates an environment variable REDSHIFT_PORT
which can be used in your dbt profile
Not requried if the `REDSHIFT_PORT`
environment variable set.
db: Optional[str]
Populates an environment variable REDSHIFT_DB
which can be used in your dbt profile
Not requried if the `REDSHIFT_DB`
environment variable set.
raise_errors: bool
Default value: False
A flag indicating whether errors encountered by
the dbt command should be raised as exceptions.
slack_channel: Optional[str]
If set, will be used to send log results. Can be set
with environment variable `SLACK_CHANNEL`
slack_webhook: Optional[str]
If set, will be used to send log results. Only one
of slack_webhook or slack_api_key is necessary.
Can be set with environment variable `SLACK_WEBHOOK`
slack_api_key: Optional[str]
If set, will be used to send log results. Only one
of slack_webhook or slack_api_key is necessary.
Can be set with environment variable `SLACK_API_KEY`
"""
self.commands = commands
self.dbt_schema = check_env.check("DBT_SCHEMA", dbt_schema)
self.username = check_env.check("REDSHIFT_USERNAME", username)
self.password = check_env.check("REDSHIFT_PASSWORD", password)
self.host = check_env.check("REDSHIFT_HOST", host)
self.port = check_env.check("REDSHIFT_PORT", port)
self.db = check_env.check("REDSHIFT_DB", db)
self.dbt_project_directory = dbt_project_directory
self.raise_errors = raise_errors
self.dbt_logger = dbtLogger(
slack_channel=slack_channel or os.environ.get("SLACK_CHANNEL"),
slack_webhook=slack_webhook or os.environ.get("SLACK_WEBHOOK"),
slack_api_key=slack_api_key or os.environ.get("SLACK_API_KEY"),
)

def run(self) -> None:
for command in self.commands:
self.dbt_command(command)
self.dbt_logger.send_to_slack()

def dbt_command(self, command: str) -> None:
"""Runs dbt command and logs results after process is completed.

If raise_error is set, this method will raise an error if the dbt
command hits any errors.
"""

self.dbt_logger.record_start(command)
dbt_executable_path = shutil.which("dbt")
commands = [dbt_executable_path, "--log-format", "json"] + command.split(" ")

shell_environment = {
"REDSHIFT_USERNAME": self.username,
"REDSHIFT_PASSWORD": self.password,
"REDSHIFT_HOST": self.host,
"REDSHIFT_PORT": self.port,
"REDSHIFT_DB": self.db,
"DBT_SCHEMA": self.dbt_schema,
}

process = subprocess.run(
commands,
env=shell_environment,
cwd=self.dbt_project_directory,
text=True,
capture_output=True,
)
self.dbt_logger.record_end(command)

self.dbt_logger.log_results(command, process.stdout, process.stderr)

if self.raise_errors:
process.check_returncode()
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
requests==2.31.0
petl==1.6.8
boto3==1.17.98
boto3>=1.17.98
boxsdk==2.10.0
civis==1.16.0
slackclient==1.3.0
Expand Down Expand Up @@ -38,6 +38,7 @@ surveygizmo==1.2.3
PyJWT==2.4.0 # Otherwise `import jwt` would refer to python-jwt package
SQLAlchemy==1.3.23
requests_oauthlib==1.3.0
dbt_redshift==1.4.0

# Testing Requirements
requests-mock==1.5.2
Expand All @@ -50,4 +51,4 @@ pytest-datadir==1.3.0
# Stuff for TMC scripts
# TODO Remove when we have a TMC-specific Docker image
selenium==3.141.0
jinja2==3.0.2
jinja2>=3.0.2
7 changes: 6 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
from setuptools import find_packages
from distutils.core import setup

from setuptools import find_packages


def main():
limited_deps = os.environ.get("PARSONS_LIMITED_DEPENDENCIES", "")
Expand All @@ -20,6 +21,10 @@ def main():
"box": ["boxsdk"],
"braintree": ["braintree"],
"civis": ["civis"],
"dbt-redshift": ["dbt-redshift", "slackclient<2"],
"dbt-bigquery": ["dbt-bigquery", "slackclient<2"],
"dbt-postgres": ["dbt-postgres", "slackclient<2"],
"dbt-snowflake": ["dbt-snowflake", "slackclient<2"],
"facebook": ["joblib", "facebook-business"],
"geocode": ["censusgeocode"],
"github": ["PyGitHub"],
Expand Down