diff --git a/HISTORY.rst b/HISTORY.rst
index 426c33e..197d2ba 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,11 @@ History
 =======
 
+v0.20.7 (2023-12-14)
+
+* In merge_spark_df_in_db make separation explicit in the operation condition to avoid ConcurrentAppendException.
+
+
 v0.20.6 (2023-12-14)
 
 * Add stage_table optional argument to merge_spark_df_in_db ds.utils function.
diff --git a/aioradio/ds_utils.py b/aioradio/ds_utils.py
index c494b88..b069834 100644
--- a/aioradio/ds_utils.py
+++ b/aioradio/ds_utils.py
@@ -95,7 +95,7 @@ def does_db_table_exists(name):
     return exists
 
 
-def merge_spark_df_in_db(df, target, on, partition_by=None, stage_table=None):
+def merge_spark_df_in_db(df, target, on, partition_by=None, stage_table=None, partition_by_values=None):
     """Convert spark DF to staging table than merge with target table in
     Databricks."""
 
@@ -113,6 +113,9 @@ def merge_spark_df_in_db(df, target, on, partition_by=None, stage_table=None):
         df.write.option("delta.columnMapping.mode", "name").mode('overwrite').partitionBy(partition_by).saveAsTable(stage)
 
     on_clause = ' AND '.join(f'{target}.{col} = {stage}.{col}' for col in on)
+    if partition_by_values is not None:
+        explicit_separation = ' AND '.join(f'{target}.{col} IN ({str(values)[1:-1]})' for col, values in partition_by_values.items())
+        on_clause += f' AND {explicit_separation}'
     match_clause = ', '.join(f'{target}.{col} = {stage}.{col}' for col in df.columns if col != 'CREATED_DATETIME')
 
     try:
diff --git a/setup.py b/setup.py
index 115d229..6afe104 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@
     long_description = fileobj.read()
 
 setup(name='aioradio',
-      version='0.20.6',
+      version='0.20.7',
       description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
       long_description=long_description,
       long_description_content_type="text/markdown",