Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added '--rows-per-insert' to 'upload-source-freshness' operation and changed default to 25. #1379

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions elementary/operations/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,20 @@ def run_operation():
default=Config.DEFAULT_TARGET_PATH,
help="Absolute target path for saving edr files such as logs and reports",
)
@click.option(
"--rows-per-insert",
type=int,
default=25,
help="Amount of rows to insert per insert statement.",
)
@click.pass_context
def upload_source_freshness(ctx, **conf):
def upload_source_freshness(ctx, rows_per_insert: int, **conf):
"""
Upload the results of `dbt source freshness` to Elementary's schema.
This is required in order to monitor and get alerts on source freshness failures.
"""
config = Config(**conf)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.track_cli_start(_MODULE_NAME, None, ctx.command.name)
UploadSourceFreshnessOperation(config).run()
UploadSourceFreshnessOperation(config).run(rows_per_insert)
anonymous_tracking.track_cli_end(_MODULE_NAME, None, ctx.command.name)
8 changes: 4 additions & 4 deletions elementary/operations/upload_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ class UploadSourceFreshnessOperation:
def __init__(self, config: Config):
self.config = config

def run(self):
def run(self, rows_per_insert: int):
if not self.config.project_dir:
raise click.ClickException(
"Path to dbt project is missing. Please run the command with `--project-dir <DBT_PROJECT_DIR>`."
)
sources_file_contents = self.get_sources_file_contents()
results = sources_file_contents["results"]
metadata = sources_file_contents["metadata"]
self.upload_results(results, metadata)
self.upload_results(results, metadata, rows_per_insert)
click.echo("Uploaded source freshness results successfully.")

def get_sources_file_contents(self) -> dict:
Expand All @@ -35,7 +35,7 @@ def get_sources_file_contents(self) -> dict:
)
return json.loads(source_path.read_text())

def upload_results(self, results: dict, metadata: dict):
def upload_results(self, results: dict, metadata: dict, rows_per_insert: int):
dbt_runner = DbtRunner(
dbt_project_utils.CLI_DBT_PROJECT_PATH,
self.config.profiles_dir,
Expand All @@ -57,7 +57,7 @@ def upload_results(self, results: dict, metadata: dict):
f"Source freshness for invocation id {invocation_id} were already uploaded."
)

chunk_size = 100
chunk_size = rows_per_insert
chunk_list = list(range(0, len(results), chunk_size))
upload_with_progress_bar = alive_it(
chunk_list, title="Uploading source freshness results"
Expand Down