add promote_model_to_production func and update python to 3.12.2 #122

Merged
2 changes: 1 addition & 1 deletion .pylintrc
@@ -1,2 +1,2 @@
[FORMAT]
-max-line-length=150
+max-line-length=160
14 changes: 14 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,20 @@ History
=======


v0.20.9 (2024-02-29)

* Add new function promote_model_to_production in ds_utils.
* Update ddtrace==2.6.5.
* Update httpx==0.27.0.
* Update orjson==3.9.15.
* Update pandas==2.2.1.
* Update pyodbc==5.1.0.
* Update polars==0.20.13.
* Update redis==5.0.2.
* Update python to 3.12.2.
* Add .show() to end of spark.sql(...) to display merge statistics.


v0.20.8 (2024-02-19)

* Update aioboto3==12.3.0.
2 changes: 1 addition & 1 deletion Makefile
@@ -6,7 +6,7 @@ clean:

install:
	. env/bin/activate; \
-	pip install -r aioradio/requirements.txt
+	pip install -r aioradio/requirements.txt --use-deprecated=legacy-resolver

setup:
	. env/bin/activate; \
187 changes: 187 additions & 0 deletions aioradio.egg-info/PKG-INFO
@@ -0,0 +1,187 @@
Metadata-Version: 2.1
Name: aioradio
Version: 0.20.9
Summary: Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more
Home-page: https://github.com/nrccua/aioradio
Author: Encoura DS Team
Author-email: [email protected]
License: MIT
Classifier: Programming Language :: Python :: 3
Classifier: Operating System :: OS Independent
Requires-Python: >=3.7
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: cython>=0.29.33
Requires-Dist: aioboto3==12.3.0
Requires-Dist: aiojobs>=1.0.0
Requires-Dist: backoff>=2.1.2
Requires-Dist: botocore==1.34.34
Requires-Dist: boto3==1.34.34
Requires-Dist: ddtrace>=0.60.1
Requires-Dist: faust-cchardet>=2.1.18
Requires-Dist: fakeredis>=2.20.0
Requires-Dist: haversine>=2.8.0
Requires-Dist: httpx>=0.23.0
Requires-Dist: mandrill>=1.0.60
Requires-Dist: mlflow>=2.10.2
Requires-Dist: numpy>=1.19
Requires-Dist: openpyxl==3.0.10
Requires-Dist: orjson>=3.6.8
Requires-Dist: pandas>=1.3.5
Requires-Dist: polars>=0.19.12
Requires-Dist: pyarrow>=13.0.0
Requires-Dist: pysmb>=1.2.7
Requires-Dist: python-json-logger>=2.0.2
Requires-Dist: pyspark>=3.4.1
Requires-Dist: redis==5.0.1

# aioradio
Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more.

## AWS S3 example code
aioradio abstracts aiobotocore and aioboto3, making async AWS function calls simple one-liners.
Besides the S3 examples below, there is also support for SQS, DynamoDB and Secrets Manager (a Secrets Manager sketch follows the S3 example).


```python
import asyncio

from aioradio.aws.s3 import (
    create_bucket,
    delete_s3_object,
    download_file,
    get_object,
    list_s3_objects,
    upload_file
)

async def main():
    s3_bucket = 'aioradio'
    s3_prefix = 'test'
    filename = 'hello_world.txt'
    s3_key = f'{s3_prefix}/{filename}'

    # create an s3 bucket called aioradio
    await create_bucket(bucket=s3_bucket)

    # create hello_world.txt file
    with open(filename, 'w') as file_handle:
        file_handle.write('hello world of aioradio!')

    # upload the file to s3 and confirm it now exists in s3
    await upload_file(bucket=s3_bucket, filepath=filename, s3_key=s3_key)
    assert s3_key in await list_s3_objects(bucket=s3_bucket, s3_prefix=s3_prefix)

    # test downloading the file
    await download_file(bucket=s3_bucket, filepath=filename, s3_key=s3_key)

    # test getting file data as an object
    result = await get_object(bucket=s3_bucket, s3_key=s3_key)
    assert result == b'hello world of aioradio!'

    # delete the file from s3
    await delete_s3_object(bucket=s3_bucket, s3_prefix=s3_key)
    assert s3_key not in await list_s3_objects(bucket=s3_bucket, s3_prefix=s3_prefix)

asyncio.run(main())
```
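
The same pattern applies to the other AWS helpers. A minimal Secrets Manager sketch (assuming `aioradio.aws.secrets` exposes `get_secret(secret_name, region)`; the secret name and region are placeholders):

```python
import asyncio

from aioradio.aws.secrets import get_secret  # assumed helper signature

async def main():
    # fetch a secret value from AWS Secrets Manager; name and region are placeholders
    secret = await get_secret(secret_name='test-secret', region='us-east-2')
    print(secret)

asyncio.run(main())
```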

## MSSQL example code
aioradio uses the pyodbc library to work with ODBC databases.
It currently supports connecting and sending queries to MSSQL.

```python
from aioradio.pyodbc import establish_pyodbc_connection
from aioradio.pyodbc import pyodbc_query_fetchone
from aioradio.pyodbc import pyodbc_query_fetchall

def main():
    conn = establish_pyodbc_connection(host='your-host', user='your-user', pwd='your-password')

    # fetch a single row
    query = "SELECT homeruns FROM MLB.dbo.LosAngelesAngels WHERE lastname = 'Trout' AND year = '2020'"
    row = pyodbc_query_fetchone(conn=conn, query=query)
    print(row)

    # fetch all matching rows
    query = "SELECT homeruns FROM MLB.dbo.LosAngelesAngels WHERE lastname = 'Trout'"
    rows = pyodbc_query_fetchall(conn=conn, query=query)
    print(rows)


main()
```

## Jira example code
The Jira helpers use the async library httpx behind the scenes to send HTTP requests.

```python
import asyncio

from aioradio.jira import add_comment_to_jira
from aioradio.jira import get_jira_issue
from aioradio.jira import post_jira_issue

async def main():

    # create a jira ticket
    url = 'https://aioradio.atlassian.net/rest/api/2/issue/'
    payload = {
        "fields": {
            "project": {"key": "aioradio"},
            "issuetype": {"name": "Task"},
            "reporter": {"accountId": "somebodies-account-id"},
            "priority": {"name": "Medium"},
            "summary": "Aioradio rocks!",
            "description": "Aioradio Review",
            "labels": ["aioradio"],
            "assignee": {"accountId": "somebodies-account-id"}
        }
    }
    resp = await post_jira_issue(url=url, jira_user='your-user', jira_token='your-password', payload=payload)
    jira_id = resp.json()['key']

    # get jira ticket info
    resp = await get_jira_issue(url=f'{url}/{jira_id}', jira_user='your-user', jira_token='your-password')

    # add comment to jira ticket
    comment = 'aioradio rocks!'
    response = await add_comment_to_jira(url=url, jira_user='your-user', jira_token='your-password', comment=comment)

asyncio.run(main())
```

## INSTALLING FOR DIRECT DEVELOPMENT OF AIORADIO

Install [python 3.11.X](https://www.python.org/downloads/)

Make sure you've installed [ODBC drivers](https://docs.microsoft.com/en-us/sql/connect/python/pyodbc/step-1-configure-development-environment-for-pyodbc-python-development), required for using the python package pyodbc.

Clone aioradio locally and navigate to the root directory.

Install and activate a python virtual environment:
```bash
python3.11 -m venv env
source env/bin/activate
```

Install the python modules listed in requirements.txt:
```bash
pip install cython
pip install -r aioradio/requirements.txt
```

Run the Makefile command from the root directory to verify everything passes before pushing to master:
```bash
make all
```

## AUTHORS

* **Tim Reichard** - [aioradio](https://github.com/nrccua/aioradio)

See also the list of [contributors](https://github.com/nrccua/aioradio/graphs/contributors) who participated in this project.

## ACKNOWLEDGEMENTS

* **Pedro Artiga** - Developer contributing to aioradio.
27 changes: 27 additions & 0 deletions aioradio.egg-info/SOURCES.txt
@@ -0,0 +1,27 @@
LICENSE
MANIFEST.in
README.md
setup.cfg
setup.py
aioradio/__init__.py
aioradio/ds_utils.py
aioradio/file_ingestion.py
aioradio/jira.py
aioradio/logger.py
aioradio/long_running_jobs.py
aioradio/psycopg2.py
aioradio/pyodbc.py
aioradio/redis.py
aioradio/utils.py
aioradio.egg-info/PKG-INFO
aioradio.egg-info/SOURCES.txt
aioradio.egg-info/dependency_links.txt
aioradio.egg-info/not-zip-safe
aioradio.egg-info/requires.txt
aioradio.egg-info/top_level.txt
aioradio/aws/dynamodb.py
aioradio/aws/moto_server.py
aioradio/aws/s3.py
aioradio/aws/secrets.py
aioradio/aws/sqs.py
aioradio/aws/utils.py
1 change: 1 addition & 0 deletions aioradio.egg-info/dependency_links.txt
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions aioradio.egg-info/not-zip-safe
@@ -0,0 +1 @@

23 changes: 23 additions & 0 deletions aioradio.egg-info/requires.txt
@@ -0,0 +1,23 @@
cython>=0.29.33
aioboto3==12.3.0
aiojobs>=1.0.0
backoff>=2.1.2
botocore==1.34.34
boto3==1.34.34
ddtrace>=0.60.1
faust-cchardet>=2.1.18
fakeredis>=2.20.0
haversine>=2.8.0
httpx>=0.23.0
mandrill>=1.0.60
mlflow>=2.10.2
numpy>=1.19
openpyxl==3.0.10
orjson>=3.6.8
pandas>=1.3.5
polars>=0.19.12
pyarrow>=13.0.0
pysmb>=1.2.7
python-json-logger>=2.0.2
pyspark>=3.4.1
redis==5.0.1
2 changes: 2 additions & 0 deletions aioradio.egg-info/top_level.txt
@@ -0,0 +1,2 @@
aioradio
aioradio/aws
50 changes: 48 additions & 2 deletions aioradio/ds_utils.py
@@ -29,6 +29,8 @@
import pandas as pd
import polars as pl
from haversine import haversine, Unit
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus
from mlflow.tracking.client import MlflowClient
from pyspark.sql import SparkSession
from smb.SMBConnection import SMBConnection

@@ -119,7 +121,7 @@ def merge_spark_df_in_db(df, target, on, partition_by=None, stage_table=None, pa
    match_clause = ', '.join(f'{target}.{col} = {stage}.{col}' for col in df.columns if col != 'CREATED_DATETIME')

    try:
-       spark.sql(f'MERGE INTO {target} USING {stage} ON {on_clause} WHEN MATCHED THEN UPDATE SET {match_clause} WHEN NOT MATCHED THEN INSERT *')
+       spark.sql(f'MERGE INTO {target} USING {stage} ON {on_clause} WHEN MATCHED THEN UPDATE SET {match_clause} WHEN NOT MATCHED THEN INSERT *').show()
        spark.sql(f'DROP TABLE {stage}')
    except Exception:
        spark.sql(f'DROP TABLE {stage}')
@@ -155,7 +157,7 @@ def merge_pandas_df_in_db(df, target, on, partition_by=None, stage_table=None):
    match_clause = ', '.join(f'{target}.{col} = {stage}.{col}' for col in df.columns if col != 'CREATED_DATETIME')

    try:
-       spark.sql(f'MERGE INTO {target} USING {stage} ON {on_clause} WHEN MATCHED THEN UPDATE SET {match_clause} WHEN NOT MATCHED THEN INSERT *')
+       spark.sql(f'MERGE INTO {target} USING {stage} ON {on_clause} WHEN MATCHED THEN UPDATE SET {match_clause} WHEN NOT MATCHED THEN INSERT *').show()
        spark.sql(f'DROP TABLE {stage}')
    except Exception:
        spark.sql(f'DROP TABLE {stage}')
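
For context on the `.show()` addition above: on Databricks, `spark.sql('MERGE INTO ...')` against a Delta table returns a one-row DataFrame of merge metrics (e.g. `num_affected_rows`, `num_updated_rows`, `num_inserted_rows`), so chaining `.show()` prints the merge statistics. A minimal sketch, assuming a Databricks/Delta environment and hypothetical table names:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# MERGE on a Delta table yields a one-row metrics DataFrame; .show() prints it.
# prod.users and stage.users are hypothetical table names.
spark.sql(
    'MERGE INTO prod.users AS t USING stage.users AS s ON t.id = s.id '
    'WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *'
).show()
```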
@@ -191,6 +193,50 @@ def read_constants_from_db(constants_list=None):
    return mapping


def promote_model_to_production(model_name, tags):
    """Transition new model to production in Databricks."""

    client = MlflowClient()

    # newest registered version (stage "None")
    new_model = client.get_latest_versions(name=model_name, stages=["None"])
    logger.info(f"new_model: {new_model}")
    new_version = new_model[0].version
    logger.info(f"new_version: {new_version}")

    # copy the requested tag keys from the training run onto the new model version
    for key in tags:
        value = client.get_run(new_model[0].run_id).data.tags[key]
        client.set_model_version_tag(model_name, new_version, key, value)

    # current production version
    current_production = client.get_latest_versions(name=model_name, stages=["Production"])
    if len(current_production) > 0:
        current_production_version = current_production[0].version
        logger.info(f"current_production_version: {current_production_version}")
    else:
        current_production_version = None

    # poll (up to ~10s) until the new version reaches READY status
    for _ in range(10):
        model_version_details = client.get_model_version(name=model_name, version=new_version)
        status = ModelVersionStatus.from_string(model_version_details.status)
        logger.info(f"Model status: {ModelVersionStatus.to_string(status)}")
        if status == ModelVersionStatus.READY:
            break
        time.sleep(1)

    registered_model = client.get_registered_model(model_name)
    logger.info(f"registered_model: {registered_model}")

    # transition to production
    client.transition_model_version_stage(name=model_name, version=new_version, stage="Production")

    # archive previous model (can also delete, but that is permanent)
    if current_production_version is not None and new_version != current_production_version:
        client.transition_model_version_stage(name=model_name, version=current_production_version, stage="Archived")
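
A minimal usage sketch for the new helper (the model name and tag keys below are hypothetical; assumes MLflow is configured against your Databricks model registry):

```python
from aioradio.ds_utils import promote_model_to_production

# Promote the newest registered version of a model to Production, first copying
# the listed tag keys from its training run onto the model version.
# 'churn-model' and the tag keys are hypothetical examples.
promote_model_to_production(model_name='churn-model', tags=['git_commit', 'trained_by'])
```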


################################## DataFrame functions ####################################

