From ae35b401bec8d8ad5660e1268b4b5fc41507bb78 Mon Sep 17 00:00:00 2001 From: Tim Reichard Date: Fri, 5 Apr 2024 16:54:35 -0500 Subject: [PATCH] Improve logging spark merge query results using logger --- HISTORY.rst | 5 +++++ aioradio/ds_utils.py | 8 ++++++-- setup.py | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 3116117..a876dbb 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,11 @@ History ======= +v0.20.17 (2024-04-05) + +* Improve logging spark merge query results using logger instead of show. + + v0.20.16 (2024-03-19) * Remove databricks session spark object and use original method to convert spark to polars df. diff --git a/aioradio/ds_utils.py b/aioradio/ds_utils.py index a9ee92b..ce97d39 100644 --- a/aioradio/ds_utils.py +++ b/aioradio/ds_utils.py @@ -124,7 +124,9 @@ def merge_spark_df_in_db(df, target, on, partition_by=None, stage_table=None, pa match_clause = ', '.join(f'{target}.{col} = {stage}.{col}' for col in df.columns if col != 'CREATED_DATETIME') try: - spark.sql(f'MERGE INTO {target} USING {stage} ON {on_clause} WHEN MATCHED THEN UPDATE SET {match_clause} WHEN NOT MATCHED THEN INSERT *').show() + sql = f'MERGE INTO {target} USING {stage} ON {on_clause} WHEN MATCHED THEN UPDATE SET {match_clause} WHEN NOT MATCHED THEN INSERT *' + stats = spark.sql(sql).toPandas() + logger.info(f"New records: {stats['num_inserted_rows'][0]:,} | Updated records: {stats['num_updated_rows'][0]:,}") spark.sql(f'DROP TABLE {stage}') except Exception: spark.sql(f'DROP TABLE {stage}') @@ -160,7 +162,9 @@ def merge_pandas_df_in_db(df, target, on, partition_by=None, stage_table=None): match_clause = ', '.join(f'{target}.{col} = {stage}.{col}' for col in df.columns if col != 'CREATED_DATETIME') try: - spark.sql(f'MERGE INTO {target} USING {stage} ON {on_clause} WHEN MATCHED THEN UPDATE SET {match_clause} WHEN NOT MATCHED THEN INSERT *').show() + sql = f'MERGE INTO {target} USING {stage} ON {on_clause} WHEN MATCHED THEN UPDATE SET {match_clause} WHEN NOT MATCHED THEN INSERT *' + stats = spark.sql(sql).toPandas() + logger.info(f"New records: {stats['num_inserted_rows'][0]:,} | Updated records: {stats['num_updated_rows'][0]:,}") spark.sql(f'DROP TABLE {stage}') except Exception: spark.sql(f'DROP TABLE {stage}') diff --git a/setup.py b/setup.py index 2a3cfd9..618ec36 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ long_description = fileobj.read() setup(name='aioradio', - version='0.20.16', + version='0.20.17', description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more', long_description=long_description, long_description_content_type="text/markdown",