diff --git a/elementary/operations/cli.py b/elementary/operations/cli.py index 84d905703..fa0d9d8e7 100644 --- a/elementary/operations/cli.py +++ b/elementary/operations/cli.py @@ -45,8 +45,14 @@ def run_operation(): default=Config.DEFAULT_TARGET_PATH, help="Absolute target path for saving edr files such as logs and reports", ) +@click.option( + "--rows-per-insert", + type=int, + default=25, + help="Amount of rows to insert per insert statement.", +) @click.pass_context -def upload_source_freshness(ctx, **conf): +def upload_source_freshness(ctx, rows_per_insert: int, **conf): """ Upload the results of `dbt source freshness` to Elementary's schema. This is required in order to monitor and get alerts on source freshness failures. @@ -54,5 +60,5 @@ def upload_source_freshness(ctx, **conf): config = Config(**conf) anonymous_tracking = AnonymousCommandLineTracking(config) anonymous_tracking.track_cli_start(_MODULE_NAME, None, ctx.command.name) - UploadSourceFreshnessOperation(config).run() + UploadSourceFreshnessOperation(config).run(rows_per_insert) anonymous_tracking.track_cli_end(_MODULE_NAME, None, ctx.command.name) diff --git a/elementary/operations/upload_source_freshness.py b/elementary/operations/upload_source_freshness.py index 106cc6b12..92d566c06 100644 --- a/elementary/operations/upload_source_freshness.py +++ b/elementary/operations/upload_source_freshness.py @@ -15,7 +15,7 @@ class UploadSourceFreshnessOperation: def __init__(self, config: Config): self.config = config - def run(self): + def run(self, rows_per_insert: int): if not self.config.project_dir: raise click.ClickException( "Path to dbt project is missing. Please run the command with `--project-dir `." @@ -23,7 +23,7 @@ def run(self): sources_file_contents = self.get_sources_file_contents() results = sources_file_contents["results"] metadata = sources_file_contents["metadata"] - self.upload_results(results, metadata) + self.upload_results(results, metadata, rows_per_insert) click.echo("Uploaded source freshness results successfully.") def get_sources_file_contents(self) -> dict: @@ -35,7 +35,7 @@ def get_sources_file_contents(self) -> dict: ) return json.loads(source_path.read_text()) - def upload_results(self, results: dict, metadata: dict): + def upload_results(self, results: dict, metadata: dict, rows_per_insert: int): dbt_runner = DbtRunner( dbt_project_utils.CLI_DBT_PROJECT_PATH, self.config.profiles_dir, @@ -57,7 +57,7 @@ def upload_results(self, results: dict, metadata: dict): f"Source freshness for invocation id {invocation_id} were already uploaded." ) - chunk_size = 100 + chunk_size = rows_per_insert chunk_list = list(range(0, len(results), chunk_size)) upload_with_progress_bar = alive_it( chunk_list, title="Uploading source freshness results"