From 661e1ce7f90aaa79de0d32ac43f013291df931d3 Mon Sep 17 00:00:00 2001 From: Tim Reichard Date: Thu, 29 Feb 2024 15:10:13 -0600 Subject: [PATCH] add promote_model_to_production func and update python to 3.12.2 --- .pylintrc | 2 +- HISTORY.rst | 14 ++ Makefile | 2 +- aioradio.egg-info/PKG-INFO | 187 +++++++++++++++++++++++++ aioradio.egg-info/SOURCES.txt | 27 ++++ aioradio.egg-info/dependency_links.txt | 1 + aioradio.egg-info/not-zip-safe | 1 + aioradio.egg-info/requires.txt | 23 +++ aioradio.egg-info/top_level.txt | 2 + aioradio/ds_utils.py | 50 ++++++- aioradio/requirements.txt | 20 +-- setup.py | 4 +- 12 files changed, 319 insertions(+), 14 deletions(-) create mode 100644 aioradio.egg-info/PKG-INFO create mode 100644 aioradio.egg-info/SOURCES.txt create mode 100644 aioradio.egg-info/dependency_links.txt create mode 100644 aioradio.egg-info/not-zip-safe create mode 100644 aioradio.egg-info/requires.txt create mode 100644 aioradio.egg-info/top_level.txt diff --git a/.pylintrc b/.pylintrc index 5d310c5..c4c9cd5 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,2 +1,2 @@ [FORMAT] -max-line-length=150 +max-line-length=160 diff --git a/HISTORY.rst b/HISTORY.rst index fb77fba..aefe6d8 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,20 @@ History ======= +v0.20.9 (2024-02-29) + +* Add new function promote_model_to_production in ds_utils. +* Update ddtrace==2.6.5. +* Update httpx==0.27.0. +* Update orjson==3.9.15. +* Update pandas==2.2.1. +* Update pyodbc==5.1.0. +* Update polars==0.20.13. +* Update redis==5.0.2. +* Update python to 3.12.2. +* Add .show() to end of spark.sql(...) to display merge statistics. + + v0.20.8 (2024-02-19) * Update aioboto3==12.3.0. diff --git a/Makefile b/Makefile index 015d9cb..bef4043 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ clean: install: . env/bin/activate; \ - pip install -r aioradio/requirements.txt + pip install -r aioradio/requirements.txt --use-deprecated=legacy-resolver setup: . 
env/bin/activate; \ diff --git a/aioradio.egg-info/PKG-INFO b/aioradio.egg-info/PKG-INFO new file mode 100644 index 0000000..9c732b0 --- /dev/null +++ b/aioradio.egg-info/PKG-INFO @@ -0,0 +1,187 @@ +Metadata-Version: 2.1 +Name: aioradio +Version: 0.20.9 +Summary: Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more +Home-page: https://github.com/nrccua/aioradio +Author: Encoura DS Team +Author-email: tim.reichard@encoura.org +License: MIT +Classifier: Programming Language :: Python :: 3 +Classifier: Operating System :: OS Independent +Requires-Python: >=3.7 +Description-Content-Type: text/markdown +License-File: LICENSE +Requires-Dist: cython>=0.29.33 +Requires-Dist: aioboto3==12.3.0 +Requires-Dist: aiojobs>=1.0.0 +Requires-Dist: backoff>=2.1.2 +Requires-Dist: botocore==1.34.34 +Requires-Dist: boto3==1.34.34 +Requires-Dist: ddtrace>=0.60.1 +Requires-Dist: faust-cchardet>=2.1.18 +Requires-Dist: fakeredis>=2.20.0 +Requires-Dist: haversine>=2.8.0 +Requires-Dist: httpx>=0.23.0 +Requires-Dist: mandrill>=1.0.60 +Requires-Dist: mlflow>=2.10.2 +Requires-Dist: numpy>=1.19 +Requires-Dist: openpyxl==3.0.10 +Requires-Dist: orjson>=3.6.8 +Requires-Dist: pandas>=1.3.5 +Requires-Dist: polars>=0.19.12 +Requires-Dist: pyarrow>=13.0.0 +Requires-Dist: pysmb>=1.2.7 +Requires-Dist: python-json-logger>=2.0.2 +Requires-Dist: pyspark>=3.4.1 +Requires-Dist: redis==5.0.1 + +# aioradio +Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more. + +## AWS S3 example code +aioradio abstracts using aiobotocore and aioboto3 making async AWS function calls simple one-liners. +Besides what is shown below in the examples, there is also support for SQS, DynamoDB and Secrets Manager. 
+ + +```python +import asyncio + +from aioradio.aws.s3 import ( + create_bucket, + delete_s3_object, + download_file, + get_object, + list_s3_objects, + upload_file +) + +async def main(): + s3_bucket = 'aioradio' + s3_prefix = 'test' + filename = 'hello_world.txt' + s3_key = f'{s3_prefix}/{filename}' + + # create an s3 bucket called aioradio + await create_bucket(bucket=s3_bucket) + + # create hello_world.txt file + with open(filename, 'w') as file_handle: + file_handle.write('hello world of aioradio!') + + # upload the file to s3 and confirm it now exists in s3 + await upload_file(bucket=s3_bucket, filepath=filename, s3_key=s3_key) + assert s3_key in await list_s3_objects(bucket=s3_bucket, s3_prefix=s3_prefix) + + # test downloading the file + await download_file(bucket=s3_bucket, filepath=filename, s3_key=s3_key) + + # test getting file data to object + result = await get_object(bucket=s3_bucket, s3_key=s3_key) + assert result == b'hello world of aioradio!' + + # delete the file from s3 + await delete_s3_object(bucket=s3_bucket, s3_prefix=s3_key) + assert s3_key not in await list_s3_objects(bucket=s3_bucket, s3_prefix=s3_prefix) + +asyncio.get_event_loop().run_until_complete(main()) +``` + +## MSSQL example code +aioradio uses the pyodbc library to work with ODBC databases. +It currently has support for connecting and sending queries to mssql. 
+ +```python +import asyncio + +from aioradio.pyodbc import establish_pyodbc_connection +from aioradio.pyodbc import pyodbc_query_fetchone +from aioradio.pyodbc import pyodbc_query_fetchall + +def main(): + conn = establish_pyodbc_connection(host='your-host', user='your-user', pwd='your-password') + + query = "SELECT homeruns FROM MLB.dbo.LosAngelesAngels WHERE lastname = 'Trout' AND year = '2020'" + row = pyodbc_query_fetchone(conn=conn, query=query) + print(row) + + query = "SELECT homeruns FROM MLB.dbo.LosAngelesAngels WHERE lastname = 'Trout'" + rows = pyodbc_query_fetchall(conn=conn, query=query) + print(rows) + + +asyncio.get_event_loop().run_until_complete(main()) +``` + +## Jira example code +Jira uses the async library httpx behind the scene to send http requests. + +```python +import asyncio + +from aioradio.jira import add_comment_to_jira +from aioradio.jira import get_jira_issue +from aioradio.jira import post_jira_issue + +async def main(): + + # create a jira ticket + url = 'https://aioradio.atlassian.net/rest/api/2/issue/' + payload = { + "fields": { + "project": {"key": "aioradio"}, + "issuetype": {"name": "Task"}, + "reporter": {"accountId": "somebodies-account-id"}, + "priority": {"name": "Medium"}, + "summary": "Aioradio rocks!", + "description": "Aioradio Review", + "labels": ["aioradio"], + "assignee": {"accountId": "somebodies-account-id"} + } + } + resp = await post_jira_issue(url=url, jira_user='your-user', jira_token='your-password', payload=payload) + jira_id = resp.json()['key'] + + # get jira ticket info + resp = await get_jira_issue(url=f'{url}/{jira_id}', jira_user='your-user', jira_token='your-password') + + # add comment to jira ticket + comment = 'aioradio rocks!' 
+ response = await add_comment_to_jira(url=url, jira_user='your-user', jira_token='your-password', comment=comment) + +asyncio.get_event_loop().run_until_complete(main()) +``` + +## INSTALLING FOR DIRECT DEVELOPMENT OF AIORADIO + +Install [python 3.11.X](https://www.python.org/downloads/) + +Make sure you've installed [ODBC drivers](https://docs.microsoft.com/en-us/sql/connect/python/pyodbc/step-1-configure-development-environment-for-pyodbc-python-development), required for using the python package pyodbc. + +Clone aioradio locally and navigate to the root directory + +Install and activate python VirtualEnv +```bash +python3.11 -m venv env +source env/bin/activate +``` + +Install python modules included in requirements.txt +```bash +pip install cython +pip install -r aioradio/requirements.txt +``` + +Run Makefile command from the root directory to test all is good before issuing push to master +``` +make all +``` + +## AUTHORS + +* **Tim Reichard** - [aioradio](https://github.com/nrccua/aioradio) + +See also the list of [contributors](https://github.com/nrccua/aioradio/graphs/contributors) who participated in this project. + +## ACKNOWLEDGEMENTS + +* **Pedro Artiga** - Developer contributing to aioradio. 
diff --git a/aioradio.egg-info/SOURCES.txt b/aioradio.egg-info/SOURCES.txt new file mode 100644 index 0000000..f13b62c --- /dev/null +++ b/aioradio.egg-info/SOURCES.txt @@ -0,0 +1,27 @@ +LICENSE +MANIFEST.in +README.md +setup.cfg +setup.py +aioradio/__init__.py +aioradio/ds_utils.py +aioradio/file_ingestion.py +aioradio/jira.py +aioradio/logger.py +aioradio/long_running_jobs.py +aioradio/psycopg2.py +aioradio/pyodbc.py +aioradio/redis.py +aioradio/utils.py +aioradio.egg-info/PKG-INFO +aioradio.egg-info/SOURCES.txt +aioradio.egg-info/dependency_links.txt +aioradio.egg-info/not-zip-safe +aioradio.egg-info/requires.txt +aioradio.egg-info/top_level.txt +aioradio/aws/dynamodb.py +aioradio/aws/moto_server.py +aioradio/aws/s3.py +aioradio/aws/secrets.py +aioradio/aws/sqs.py +aioradio/aws/utils.py \ No newline at end of file diff --git a/aioradio.egg-info/dependency_links.txt b/aioradio.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/aioradio.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/aioradio.egg-info/not-zip-safe b/aioradio.egg-info/not-zip-safe new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/aioradio.egg-info/not-zip-safe @@ -0,0 +1 @@ + diff --git a/aioradio.egg-info/requires.txt b/aioradio.egg-info/requires.txt new file mode 100644 index 0000000..69bbf50 --- /dev/null +++ b/aioradio.egg-info/requires.txt @@ -0,0 +1,23 @@ +cython>=0.29.33 +aioboto3==12.3.0 +aiojobs>=1.0.0 +backoff>=2.1.2 +botocore==1.34.34 +boto3==1.34.34 +ddtrace>=0.60.1 +faust-cchardet>=2.1.18 +fakeredis>=2.20.0 +haversine>=2.8.0 +httpx>=0.23.0 +mandrill>=1.0.60 +mlflow>=2.10.2 +numpy>=1.19 +openpyxl==3.0.10 +orjson>=3.6.8 +pandas>=1.3.5 +polars>=0.19.12 +pyarrow>=13.0.0 +pysmb>=1.2.7 +python-json-logger>=2.0.2 +pyspark>=3.4.1 +redis==5.0.1 diff --git a/aioradio.egg-info/top_level.txt b/aioradio.egg-info/top_level.txt new file mode 100644 index 0000000..186ca1b --- /dev/null +++ b/aioradio.egg-info/top_level.txt @@ -0,0 
def promote_model_to_production(model_name, tags, max_attempts=10, wait_seconds=1):
    """Transition the newest unstaged model version to production in Databricks.

    The latest version of ``model_name`` in stage "None" is tagged with values
    copied from the run that produced it, polled until the registry reports it
    READY, moved to stage "Production", and the previous Production version
    (if any, and if different) is moved to stage "Archived".

    Args:
        model_name: Name of the registered model in the MLflow model registry.
        tags: Iterable of tag keys to copy from the originating run onto the
            new model version. ``None`` or an empty iterable copies nothing.
        max_attempts: How many times to poll the version status before giving
            up. Defaults to 10, the previously hard-coded value.
        wait_seconds: Delay between status polls, in seconds. Defaults to 1,
            the previously hard-coded value.

    Raises:
        IndexError: No version of ``model_name`` exists in stage "None".
        KeyError: A requested tag key is missing from the originating run.
        RuntimeError: The new version failed registration or did not become
            READY within ``max_attempts`` polls.
    """

    client = MlflowClient()

    # Newest registered version that has not been assigned a stage yet.
    new_model = client.get_latest_versions(name=model_name, stages=["None"])
    logger.info("new_model: %s", new_model)
    if not new_model:
        # Same exception type the bare new_model[0] lookup used to raise, but
        # with a message that explains what is actually missing.
        raise IndexError(f"No version of model '{model_name}' found in stage 'None'")
    candidate = new_model[0]
    new_version = candidate.version
    logger.info("new_version: %s", new_version)

    # Add tags to registered model. The run is fetched once because it does
    # not change between tag keys.
    tag_keys = list(tags) if tags is not None else []
    if tag_keys:
        run_tags = client.get_run(candidate.run_id).data.tags
        for key in tag_keys:
            client.set_model_version_tag(model_name, new_version, key, run_tags[key])

    # Current production version, remembered so it can be archived afterwards.
    current_production = client.get_latest_versions(name=model_name, stages=["Production"])
    if current_production:
        current_production_version = current_production[0].version
        logger.info("current_production_version: %s", current_production_version)
    else:
        current_production_version = None

    # Ensure the new version is ready before touching any stage. Promoting a
    # version that never became READY would also archive the working one.
    for attempt in range(1, max_attempts + 1):
        model_version_details = client.get_model_version(name=model_name, version=new_version)
        status = ModelVersionStatus.from_string(model_version_details.status)
        logger.info("Model status: %s", ModelVersionStatus.to_string(status))
        if status == ModelVersionStatus.READY:
            break
        if status == ModelVersionStatus.FAILED_REGISTRATION:
            # Terminal state: waiting longer cannot help.
            raise RuntimeError(f"Registration failed for version {new_version} of model '{model_name}'")
        if attempt < max_attempts:
            time.sleep(wait_seconds)
    else:
        raise RuntimeError(
            f"Version {new_version} of model '{model_name}' was not READY after {max_attempts} status checks"
        )

    registered_model = client.get_registered_model(model_name)
    logger.info("registered_model: %s", registered_model)

    # Transition to production.
    client.transition_model_version_stage(name=model_name, version=new_version, stage="Production")

    # Archive previous model (can also delete, but that is permanent).
    if current_production_version is not None and new_version != current_production_version:
        client.transition_model_version_stage(name=model_name, version=current_production_version, stage="Archived")
-httpx==0.26.0 +httpx==0.27.0 mandrill==1.0.60 +mlflow==2.10.2 moto==3.1.18 openpyxl==3.0.10 -orjson==3.8.10 -pandas==2.2.0 -polars==0.20.10 +orjson==3.9.15 +pandas==2.2.1 +polars==0.20.13 pre-commit==3.6.2 psycopg2-binary==2.9.9 pyarrow==15.0.0 -pylint==3.0.3 -pyodbc==4.0.39 --no-binary=pyodbc +pylint==3.1.0 +pyodbc==5.1.0 --no-binary=pyodbc pysmb==1.2.9.1 pyspark==3.4.1 -pytest==8.0.1 +pytest==8.0.2 pytest-asyncio==0.21.1 pytest-cov==4.1.0 python-json-logger==2.0.7 -redis==5.0.1 +redis==5.0.2 twine==5.0.0 +typing_extensions==4.10.0 werkzeug==2.1.2 wheel==0.42.0 diff --git a/setup.py b/setup.py index 4cd2d15..87c2700 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ long_description = fileobj.read() setup(name='aioradio', - version='0.20.8', + version='0.20.9', description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more', long_description=long_description, long_description_content_type="text/markdown", @@ -32,6 +32,7 @@ 'haversine>=2.8.0', 'httpx>=0.23.0', 'mandrill>=1.0.60', + 'mlflow>=2.10.2', 'numpy>=1.19', 'openpyxl==3.0.10', 'orjson>=3.6.8', @@ -53,6 +54,7 @@ 'pytest>=7.0.1', 'pytest-asyncio>=0.15.1', 'pytest-cov>=3.0.0', + 'typing_extensions>=4.10.0', 'werkzeug==2.1.2' ], zip_safe=False,