diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index fec1b1f..b52375b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,7 @@ default_language_version: python: python3.11 repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v4.5.0 hooks: - id: check-added-large-files - id: check-ast diff --git a/.pylintrc b/.pylintrc index 417d671..5d310c5 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,2 +1,2 @@ [FORMAT] -max-line-length=140 +max-line-length=150 diff --git a/HISTORY.rst b/HISTORY.rst index 797950a..df05343 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,21 @@ History ======= +v0.19.4 (2023-11-03) + +* Add common data science functions to ds_utils.py. +* Update cython==3.0.5. +* Update httpx==0.25.1. +* Update pandas==2.1.2. +* Update pylint==3.0.2. +* Update pytest==7.4.3. +* Update wheel==0.41.3. +* Add python library haversine==2.8.0. +* Add python library polars==0.19.12. +* Add python library pyarrow==13.0.0. +* Add python library pyspark==3.4.1. + + v0.19.3 (2023-10-20) * Add TrustServerCertificate option in sqlserver connection string, enabling use of driver {ODBC Driver 18 for SQL Server}. diff --git a/aioradio/ds_utils.py b/aioradio/ds_utils.py index 2de6693..7897db2 100644 --- a/aioradio/ds_utils.py +++ b/aioradio/ds_utils.py @@ -1,9 +1,11 @@ """utils.py.""" +# pylint: disable=broad-except # pylint: disable=import-outside-toplevel # pylint: disable=invalid-name # pylint: disable=logging-fstring-interpolation # pylint: disable=no-member +# pylint: disable=protected-access # pylint: disable=too-many-arguments # pylint: disable=too-many-boolean-expressions # pylint: disable=unnecessary-comprehension @@ -16,12 +18,18 @@ import os import pickle import warnings +from math import cos, degrees, radians, sin from platform import system from tempfile import NamedTemporaryFile from time import sleep, time import boto3 +import numpy as np +import pyarrow as pa import pandas as pd +import polars as pl +from haversine import haversine, Unit +from pyspark.sql import SparkSession from smb.SMBConnection import SMBConnection warnings.simplefilter(action='ignore', category=UserWarning) @@ -36,6 +44,158 @@ c_handler.setFormatter(c_format) logger.addHandler(c_handler) +spark = SparkSession.builder.getOrCreate() + + +############################### Databricks functions ################################ + + +def db_catalog(env): + """Return the DataBricks catalog based on the passed in environment.""" + + catalog = '' + if env == 'sandbox': + catalog = 'dsc_sbx' + elif env == 'prod': + catalog = 'dsc_prd' + + return catalog + + +def sql_to_polars_df(sql): + """Get polars DataFrame from SQL query results.""" + + return pl.from_arrow(pa.Table.from_batches(spark.sql(sql)._collect_as_arrow())) + + +def does_db_table_exists(name): + """Check if delta table exists in databricks.""" + + exists = False + try: + spark.sql(f"describe formatted {name}") + exists = True + except Exception: + pass + + return exists + + +def merge_spark_df_in_db(df, target, on, partition_by=None): + """Convert spark DF to staging table than merge with target table in + Databricks.""" + + stage = f"{target}_stage" + + if not does_db_table_exists(target): + if partition_by is None: + df.write.option("delta.columnMapping.mode", "name").saveAsTable(target) + else: + df.write.option("delta.columnMapping.mode", "name").partitionBy(partition_by).saveAsTable(target) + else: + if partition_by is None: + df.write.option("delta.columnMapping.mode", "name").mode('overwrite').saveAsTable(stage) + else: + df.write.option("delta.columnMapping.mode", "name").mode('overwrite').partitionBy(partition_by).saveAsTable(stage) + + on_clause = ' AND '.join(f'{target}.{col} = {stage}.{col}' for col in on) + match_clause = ', '.join(f'{target}.{col} = {stage}.{col}' for col in df.columns if col != 'CREATED_DATETIME') + + try: + spark.sql(f'MERGE INTO {target} USING {stage} ON {on_clause} WHEN MATCHED THEN UPDATE SET {match_clause} WHEN NOT MATCHED THEN INSERT *') + spark.sql(f'DROP TABLE {stage}') + except Exception: + spark.sql(f'DROP TABLE {stage}') + raise + + +def merge_pandas_df_in_db(df, target, on, partition_by=None): + """Convert pandas DF to staging table than merge with target table in + Databricks.""" + + stage = f"{target}_stage" + + for col, dtype in df.dtypes.apply(lambda x: x.name).to_dict().items(): + if dtype == 'object': + df[col] = df[col].astype('string[pyarrow]') + df[col].mask(df[col].isna(), '', inplace=True) + elif dtype == 'string': + # pyspark will throw an exception if strings are set to so convert to empty string + df[col].mask(df[col].isna(), '', inplace=True) + + if not does_db_table_exists(target): + if partition_by is None: + spark.createDataFrame(df).write.option("delta.columnMapping.mode", "name").saveAsTable(target) + else: + spark.createDataFrame(df).write.option("delta.columnMapping.mode", "name").partitionBy(partition_by).saveAsTable(target) + else: + if partition_by is None: + spark.createDataFrame(df).write.option("delta.columnMapping.mode", "name").mode('overwrite').saveAsTable(stage) + else: + spark.createDataFrame(df).write.option("delta.columnMapping.mode", "name").mode('overwrite').partitionBy(partition_by).saveAsTable(stage) + + on_clause = ' AND '.join(f'{target}.{col} = {stage}.{col}' for col in on) + match_clause = ', '.join(f'{target}.{col} = {stage}.{col}' for col in df.columns if col != 'CREATED_DATETIME') + + try: + spark.sql(f'MERGE INTO {target} USING {stage} ON {on_clause} WHEN MATCHED THEN UPDATE SET {match_clause} WHEN NOT MATCHED THEN INSERT *') + spark.sql(f'DROP TABLE {stage}') + except Exception: + spark.sql(f'DROP TABLE {stage}') + raise + + +################################## DataFrame functions #################################### + + +def convert_pyspark_dtypes_to_pandas(df): + """The pyspark toPandas function converts strings to objects. + + This function takes the resulting df and converts the object dtypes + to string[pyarrow], then it converts empty strings to pd.NA. + """ + + for col, dtype in df.dtypes.apply(lambda x: x.name).to_dict().items(): + + if dtype == 'object': + df[col] = df[col].astype('string[pyarrow]') + df[col].mask(df[col] == '', pd.NA, inplace=True) + elif (dtype.startswith('int') or dtype.startswith('float')) and not dtype.endswith('[pyarrow]'): + df[col] = df[col].astype(f'{dtype}[pyarrow]') + elif 'string' in dtype: + df[col] = df[col].astype('string[pyarrow]') + df[col].mask(df[col] == '', pd.NA, inplace=True) + + return df + + +def remove_pyarrow_dtypes(df): + """Switch pyarrow dtype to non pyarrow dtype (int8['pyarrow'] to int8)""" + + df = df.astype({k: v.replace('[pyarrow]', '') for k, v in df.dtypes.apply(lambda x: x.name).to_dict().items()}) + return df + + +################################## AWS functions #################################### + + +def get_boto3_session(env): + """Get Boto3 Session.""" + + aws_profile = os.getenv('AWS_PROFILE') + + try: + if aws_profile is not None: + del os.environ['AWS_PROFILE'] + aws_creds = get_aws_creds(env) + boto3_session = boto3.Session(**aws_creds) + except ValueError: + if aws_profile is not None: + os.environ["AWS_PROFILE"] = aws_profile + boto3_session = boto3.Session() + + return boto3_session + def file_to_s3(s3_client, local_filepath, s3_bucket, key): """Write file to s3.""" @@ -82,20 +242,6 @@ def delete_s3_object(s3_client, bucket, s3_prefix): return s3_client.delete_object(Bucket=bucket, Key=s3_prefix) -def get_fice_institutions_map(db_config): - """Get mapping of fice to college from mssql table.""" - - from aioradio.pyodbc import pyodbc_query_fetchall - - result = {} - with DbInfo(db_config) as target_db: - query = "SELECT FICE, Institution FROM EESFileuploadAssignments WHERE FileCategory = 'EnrollmentLens'" - rows = pyodbc_query_fetchall(conn=target_db.conn, query=query) - result = {fice: institution for fice, institution in rows} - - return result - - def bytes_to_s3(s3_client, s3_bucket, key, body): """Write data in bytes to s3.""" @@ -185,24 +331,6 @@ def get_s3_pickle_to_object(s3_client, s3_bucket, key): return data -def get_ftp_connection(secret_id, port=139, is_direct_tcp=False, env='sandbox'): - """Get SMB Connection.""" - - secret_client = get_boto3_session(env).client("secretsmanager", region_name='us-east-1') - creds = json.loads(secret_client.get_secret_value(SecretId=secret_id)['SecretString']) - conn = SMBConnection( - creds['user'], - creds['password'], - secret_id, - creds['server'], - use_ntlm_v2=True, - is_direct_tcp=is_direct_tcp - ) - conn.connect(creds['server'], port) - - return conn - - def get_aws_creds(env): """Get AWS credentials from environment variables.""" @@ -223,6 +351,79 @@ def get_aws_creds(env): return aws_creds +get_s3_csv_to_df = get_large_s3_csv_to_df +get_s3_parquet_to_df = get_large_s3_parquet_to_df + + +################################# Misc functions #################################### + + +def bearing(slat, elat, slon, elon): + """Bearing function.""" + + slat, elat, slon, elon = radians(slat), radians(elat), radians(slon), radians(elon) + var_dl = elon - slon + var_x = cos(elat) * sin(var_dl) + var_y = cos(slat) * sin(elat) - sin(slat) * cos(elat) * cos(var_dl) + return (degrees(np.arctan2(var_x, var_y)) + 360) % 360 + + +def apply_bearing(dataframe, latitude, longitude): + """Apply bearing function on split dataframe.""" + + return dataframe.apply(lambda x: bearing(x.LATITUDE, latitude, x.LONGITUDE, longitude), axis=1) + + +def apply_haversine(dataframe, latitude, longitude): + """Apply haversine function on split dataframe.""" + + return dataframe.apply(lambda x: haversine((x.LATITUDE, x.LONGITUDE), (latitude, longitude), unit=Unit.MILES), axis=1) + + +def logit(x, a, b, c, d): + """Logit function.""" + + return a / (1 + np.exp(-c * (x - d))) + b + + +def apply_logit(dataframe, a, b, c, d): + """Apply logit function on split dataframe.""" + + return dataframe.apply(lambda x: logit(x, a, b, c, d)) + + +def get_fice_institutions_map(db_config): + """Get mapping of fice to college from mssql table.""" + + from aioradio.pyodbc import pyodbc_query_fetchall + + result = {} + with DbInfo(db_config) as target_db: + query = "SELECT FICE, Institution FROM EESFileuploadAssignments WHERE FileCategory = 'EnrollmentLens'" + rows = pyodbc_query_fetchall(conn=target_db.conn, query=query) + result = {fice: institution for fice, institution in rows} + + return result + + +def get_ftp_connection(secret_id, port=139, is_direct_tcp=False, env='sandbox'): + """Get SMB Connection.""" + + secret_client = get_boto3_session(env).client("secretsmanager", region_name='us-east-1') + creds = json.loads(secret_client.get_secret_value(SecretId=secret_id)['SecretString']) + conn = SMBConnection( + creds['user'], + creds['password'], + secret_id, + creds['server'], + use_ntlm_v2=True, + is_direct_tcp=is_direct_tcp + ) + conn.connect(creds['server'], port) + + return conn + + def monitor_domino_run(domino, run_id, sleep_time=10): """Monitor domino job run and return True/False depending if job was successful.""" @@ -241,24 +442,6 @@ def monitor_domino_run(domino, run_id, sleep_time=10): return status -def get_boto3_session(env): - """Get Boto3 Session.""" - - aws_profile = os.getenv('AWS_PROFILE') - - try: - if aws_profile is not None: - del os.environ['AWS_PROFILE'] - aws_creds = get_aws_creds(env) - boto3_session = boto3.Session(**aws_creds) - except ValueError: - if aws_profile is not None: - os.environ["AWS_PROFILE"] = aws_profile - boto3_session = boto3.Session() - - return boto3_session - - def get_domino_connection(secret_id, project, host, env='sandbox'): """Get domino connection.""" @@ -268,6 +451,9 @@ def get_domino_connection(secret_id, project, host, env='sandbox'): return Domino(project=project, api_key=api_key, host=host) +######################## Postgres or MSSQL Connection Classes ####################### + + class DB_CONNECT(): """[Class for database connection] diff --git a/aioradio/requirements.txt b/aioradio/requirements.txt index 0556a83..aedc1af 100644 --- a/aioradio/requirements.txt +++ b/aioradio/requirements.txt @@ -3,29 +3,33 @@ aiojobs==1.2.0 backoff==2.2.1 boto3==1.28.17 botocore==1.31.17 -cython==3.0.4 +cython==3.0.5 ddtrace==1.11.2 dominodatalab==1.2.4 fakeredis==1.10.1 faust-cchardet==2.1.19 flask==2.1.2 flask-cors==3.0.10 -httpx==0.25.0 +haversine==2.8.0 +httpx==0.25.1 mandrill==1.0.60 moto==3.1.18 openpyxl==3.0.10 orjson==3.8.10 -pandas==2.1.1 +pandas==2.1.2 +polars==0.19.12 pre-commit==3.5.0 psycopg2-binary==2.9.9 -pylint==3.0.1 +pyarrow==13.0.0 +pylint==3.0.2 pyodbc==4.0.39 --no-binary=pyodbc pysmb==1.2.9.1 -pytest==7.4.2 +pyspark==3.4.1 +pytest==7.4.3 pytest-asyncio==0.21.1 pytest-cov==4.1.0 python-json-logger==2.0.7 redis==3.5.3 twine==4.0.2 werkzeug==2.1.2 -wheel==0.41.2 +wheel==0.41.3 diff --git a/setup.py b/setup.py index e0386fb..490f86a 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ long_description = fileobj.read() setup(name='aioradio', - version='0.19.3', + version='0.19.4', description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more', long_description=long_description, long_description_content_type="text/markdown", @@ -26,17 +26,21 @@ 'backoff>=2.1.2', 'botocore==1.31.17', 'boto3==1.28.17', - 'faust-cchardet>=2.1.18', 'ddtrace>=0.60.1', + 'faust-cchardet>=2.1.18', 'fakeredis>=1.7.1', + 'haversine>=2.8.0', 'httpx>=0.23.0', 'mandrill>=1.0.60', 'numpy>=1.19', 'openpyxl==3.0.10', 'orjson>=3.6.8', 'pandas>=1.3.5', + 'polars>=0.19.12', + 'pyarrow>=13.0.0', 'pysmb>=1.2.7', 'python-json-logger>=2.0.2', + 'pyspark>=3.4.1', 'redis==3.5.3' ], include_package_data=True,