From 9f761c488c3afb108038edb4b504f0842bf449b7 Mon Sep 17 00:00:00 2001
From: Tim Reichard
Date: Wed, 29 Nov 2023 09:13:17 -0600
Subject: [PATCH] Add Databricks functions to read/write constants for Data Science

---
 HISTORY.rst               |  5 +++++
 aioradio/ds_utils.py      | 39 +++++++++++++++++++++++++++++++++++++++
 aioradio/requirements.txt |  8 ++++----
 setup.py                  |  2 +-
 4 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/HISTORY.rst b/HISTORY.rst
index 6fbdca7..d7b3866 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,11 @@ History
 =======
 
 
+v0.20.3 (2023-11-29)
+
+* Add Databricks functions to read/write constants for Data Science to use across projects.
+
+
 v0.20.2 (2023-11-16)
 
 * Hard-code 'us-east-1' and 'us-east-2' as regions in SQS AwsServiceManager.
diff --git a/aioradio/ds_utils.py b/aioradio/ds_utils.py
index 7897db2..0b02074 100644
--- a/aioradio/ds_utils.py
+++ b/aioradio/ds_utils.py
@@ -145,6 +145,45 @@ def merge_pandas_df_in_db(df, target, on, partition_by=None):
         raise
 
 
+def write_constants_to_db(df, df_library='pandas'):
+    """Write constants defined in a dataframe to databricks.
+
+    Each constant value is defined as a JSON string. See schema of
+    dsc_prd.student_data.constants. The dataframe is merged into that
+    table on the 'key' column; df_library names the dataframe flavor
+    ('pandas', 'polars' or 'spark', case-insensitive).
+    """
+
+    table = f"{db_catalog('prod')}.student_data.constants"
+    library = df_library.lower()
+
+    if library == 'pandas':
+        merge_spark_df_in_db(spark.createDataFrame(df), table, on=['key'])
+    elif library == 'polars':
+        merge_spark_df_in_db(spark.createDataFrame(df.to_pandas()), table, on=['key'])
+    elif library == 'spark':
+        merge_spark_df_in_db(df, table, on=['key'])
+    else:
+        logger.info(f'Unknown dataframe library {df_library}')
+
+
+def read_constants_from_db(constants_list=None):
+    """Read all constants, or only those whose key is in constants_list.
+
+    Returns a dict mapping each constant's key to its JSON-decoded value.
+    """
+
+    table = f"{db_catalog('prod')}.student_data.constants"
+    rows = sql_to_polars_df(f'SELECT * FROM {table}').to_dicts()
+
+    # constants_list holds row keys, not column names, so filter on 'key'
+    # here rather than interpolating caller-supplied strings into the SQL.
+    wanted = None if constants_list is None else set(constants_list)
+    mapping = {i['key']: json.loads(i['value']) for i in rows if wanted is None or i['key'] in wanted}
+
+    return mapping
+
+
 ################################## DataFrame functions ####################################
 
 
diff --git a/aioradio/requirements.txt b/aioradio/requirements.txt
index a2b8376..f100ad8 100644
--- a/aioradio/requirements.txt
+++ b/aioradio/requirements.txt
@@ -3,7 +3,7 @@ aiojobs==1.2.0
 backoff==2.2.1
 boto3==1.28.17
 botocore==1.31.17
-cython==3.0.5
+cython==3.0.6
 ddtrace==1.11.2
 dominodatalab==1.2.4
 fakeredis==2.20.0
@@ -11,13 +11,13 @@ faust-cchardet==2.1.19
 flask==2.1.2
 flask-cors==3.0.10
 haversine==2.8.0
-httpx==0.25.1
+httpx==0.25.2
 mandrill==1.0.60
 moto==3.1.18
 openpyxl==3.0.10
 orjson==3.8.10
 pandas==2.1.3
-polars==0.19.13
+polars==0.19.17
 pre-commit==3.5.0
 psycopg2-binary==2.9.9
 pyarrow==13.0.0
@@ -32,4 +32,4 @@ python-json-logger==2.0.7
 redis==5.0.1
 twine==4.0.2
 werkzeug==2.1.2
-wheel==0.41.3
+wheel==0.42.0
diff --git a/setup.py b/setup.py
index 9e2f062..c815fb5 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@
     long_description = fileobj.read()
 
 setup(name='aioradio',
-      version='0.20.2',
+      version='0.20.3',
       description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
       long_description=long_description,
       long_description_content_type="text/markdown",