Skip to content

Commit

Permalink
Added '--rows-per-insert' to 'upload-source-freshness' operation and …
Browse files Browse the repository at this point in the history
…change default to 25.
  • Loading branch information
elongl committed Jan 17, 2024
1 parent 1ceb6d9 commit 39c6c33
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
10 changes: 8 additions & 2 deletions elementary/operations/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,20 @@ def run_operation():
default=Config.DEFAULT_TARGET_PATH,
help="Absolute target path for saving edr files such as logs and reports",
)
@click.option(
"--rows-per-insert",
type=int,
default=25,
help="Amount of rows to insert per insert statement.",
)
@click.pass_context
def upload_source_freshness(ctx, **conf):
def upload_source_freshness(ctx, rows_per_insert: int, **conf):
"""
Upload the results of `dbt source freshness` to Elementary's schema.
This is required in order to monitor and get alerts on source freshness failures.
"""
config = Config(**conf)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.track_cli_start(_MODULE_NAME, None, ctx.command.name)
UploadSourceFreshnessOperation(config).run()
UploadSourceFreshnessOperation(config).run(rows_per_insert)
anonymous_tracking.track_cli_end(_MODULE_NAME, None, ctx.command.name)
8 changes: 4 additions & 4 deletions elementary/operations/upload_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ class UploadSourceFreshnessOperation:
def __init__(self, config: Config):
self.config = config

def run(self):
def run(self, rows_per_insert: int):
if not self.config.project_dir:
raise click.ClickException(
"Path to dbt project is missing. Please run the command with `--project-dir <DBT_PROJECT_DIR>`."
)
sources_file_contents = self.get_sources_file_contents()
results = sources_file_contents["results"]
metadata = sources_file_contents["metadata"]
self.upload_results(results, metadata)
self.upload_results(results, metadata, rows_per_insert)
click.echo("Uploaded source freshness results successfully.")

def get_sources_file_contents(self) -> dict:
Expand All @@ -35,7 +35,7 @@ def get_sources_file_contents(self) -> dict:
)
return json.loads(source_path.read_text())

def upload_results(self, results: dict, metadata: dict):
def upload_results(self, results: dict, metadata: dict, rows_per_insert: int):
dbt_runner = DbtRunner(
dbt_project_utils.CLI_DBT_PROJECT_PATH,
self.config.profiles_dir,
Expand All @@ -57,7 +57,7 @@ def upload_results(self, results: dict, metadata: dict):
f"Source freshness for invocation id {invocation_id} were already uploaded."
)

chunk_size = 100
chunk_size = rows_per_insert
chunk_list = list(range(0, len(results), chunk_size))
upload_with_progress_bar = alive_it(
chunk_list, title="Uploading source freshness results"
Expand Down

0 comments on commit 39c6c33

Please sign in to comment.