Skip to content

Commit

Permalink
Merge pull request #120 from nrccua/DS-467-in-merge_spark_df_in_db-ma…
Browse files Browse the repository at this point in the history
…ke-separation-explicit-i

In merge_spark_df_in_db make separation explicit in the operation condition to avoid ConcurrentAppendException
  • Loading branch information
nrccua-timr authored Dec 14, 2023
2 parents b7718ac + 6c6aaf8 commit 124db44
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
5 changes: 5 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ History
=======


v0.20.7 (2023-12-14)

* In merge_spark_df_in_db make separation explicit in the operation condition to avoid ConcurrentAppendException.


v0.20.6 (2023-12-14)

* Add stage_table optional argument to merge_spark_df_in_db ds.utils function.
Expand Down
5 changes: 4 additions & 1 deletion aioradio/ds_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def does_db_table_exists(name):
return exists


def merge_spark_df_in_db(df, target, on, partition_by=None, stage_table=None):
def merge_spark_df_in_db(df, target, on, partition_by=None, stage_table=None, partition_by_values=None):
"""Convert spark DF to staging table than merge with target table in
Databricks."""

Expand All @@ -113,6 +113,9 @@ def merge_spark_df_in_db(df, target, on, partition_by=None, stage_table=None):
df.write.option("delta.columnMapping.mode", "name").mode('overwrite').partitionBy(partition_by).saveAsTable(stage)

on_clause = ' AND '.join(f'{target}.{col} = {stage}.{col}' for col in on)
if partition_by_values is not None:
explicit_separation = ' AND '.join(f'{target}.{col} IN ({str(values)[1:-1]})' for col, values in partition_by_values.items())
on_clause += f' AND {explicit_separation}'
match_clause = ', '.join(f'{target}.{col} = {stage}.{col}' for col in df.columns if col != 'CREATED_DATETIME')

try:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
long_description = fileobj.read()

setup(name='aioradio',
version='0.20.6',
version='0.20.7',
description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit 124db44

Please sign in to comment.