@task
def execute_dbt(command: str, model: str = ""):
    """
    Download the queries repository and execute a command in DBT.

    Any existing checkout at the local path is removed and the repository is
    cloned again before DBT is invoked. The cloned directory is passed to DBT
    as both ``--profiles-dir`` and ``--project-dir``.

    Failures are logged (level "error") rather than raised: if the clone
    fails, DBT is not invoked; if DBT reports an exception or produces no
    results, that is logged and the task returns.

    Args:
        command (str): Command to be executed by DBT. Can be "run" or "build".
        model (str): Name of model. Can be empty, in which case no
            ``--models`` filter is passed to DBT.
    """
    repo_url = "https://github.com/prefeitura-rio/queries-rj-sms.git"
    path = "pipelines/rj_sms/github/queries-rj-sms/"

    # Repository download. Only the clone can raise GitCommandError, so the
    # try body is limited to it.
    try:
        if os.path.exists(path):
            # Best-effort removal; if it fails, clone_from reports the problem.
            shutil.rmtree(path, ignore_errors=True)
        git.Repo.clone_from(repo_url, path)
    except git.GitCommandError as e:
        log(f"Error when cloning repository: {e}", level="error")
        return
    log(f"Cloned repository in {path}")

    # Execute DBT
    cli_args = [command, "--profiles-dir", path, "--project-dir", path]
    if model:
        cli_args.extend(["--models", model])

    dbt = dbtRunner()
    res: dbtRunnerResult = dbt.invoke(cli_args)

    # invoke() reports failures through the result object; surface them
    # instead of crashing on a missing result below.
    if res.exception is not None:
        log(f"Error when executing DBT: {res.exception}", level="error")
    if res.result is None:
        return

    for r in res.result:
        log(f"{r.node.name}: {r.status}")

    # NOTE(review): the task still finishes successfully when DBT reports
    # failure; confirm whether the flow should fail in that case.
    if not res.success:
        log(f"DBT command '{command}' did not complete successfully", level="error")