🔥 Discover Model Lifecycle Automation and Orchestration
In the previous unit, you implemented a full model lifecycle in the cloud:
- Sourcing data from a data warehouse (Google BigQuery) and storing model weights on a bucket (GCS)
- Launching a training task on a virtual machine (VM), including evaluating the model performance and making predictions
The WagonCab team is really happy with your work and assigns you to a new mission: ensure the validity of the trained model over time.
As you might imagine, the fare amount of a taxi ride tends to change over time with the economy, and the model could be accurate right now but obsolete in the future.
🤯 After a quick brainstorming session with your team, you come up with a plan:
- Implement a process to monitor the performance of the `Production` model over time
- Implement an automated workflow to:
  - Fetch fresh data
  - Preprocess the fresh data
  - Evaluate the performance of the `Production` model on the fresh data
  - Train a `Staging` model on the fresh data, in parallel to the task above
  - Compare `Production` vs `Staging` performance
  - Set a threshold for a model being good enough for production
  - If `Staging` is better than both `Production` and the threshold, put it into production automatically
  - Otherwise, if `Production` is better and still above the threshold, leave it in production
  - If neither meets the threshold, notify a human who will decide whether or not to deploy the `Staging` model to `Production`, and what other fixes are needed!
- Deploy this workflow and wait for fresh data to come
❓ Instructions (expand me)
💻 Install version `0.0.10` of the `taxifare` package:
make reinstall_package
Notice we've added 3 new packages: `mlflow`, `prefect` and `psycopg2-binary`.
❗ Check your `taxifare` package version:
pip list | grep taxifare
# taxifare 0.0.10
💻 Copy the `.env.sample` file, fill in `.env`, and allow `direnv`.
We want to see a proper learning curve today, so let's set:
DATA_SIZE='200k'
We'll move to `all` at the very end!
🚀 You are up and ready!
❓ Instructions (expand me)
🤯 You may remember that handling model versioning with local storage or GCS was quite shaky! We had to store weights as `model/{current_timestamp}.h5`, then sort by most recent, etc.
🤗 Welcome MLflow! It will:
- store both the trained model weights and the results of our experiments (metrics, params) in the cloud!
- allow us to tag our models
- allow us to visually monitor the evolution of the performance of our models, experiment after experiment!
👉 We have only slightly updated your `taxifare` package compared with unit 02:
- `interface/main.py`: `train()` and `evaluate()` are now decorated with `@mlflow_run`
- `ml_logic/registry.py`: defines `mlflow_run()` to automatically log TF training params!
- `interface/workflow.py`: (keep for later) entry point to run the "re-train-if-performance-decreases" workflow
The WagonCab tech team has put an MLflow server into production at https://mlflow.lewagon.ai, which you will use to track your experiments and store your trained models.
👉 Look at your `.env` file and discover 4 new variables to edit:
- `MODEL_TARGET` (`local`, `gcs`, or now `mlflow`), which defines how the `taxifare` package should save the outputs of the training
- `MLFLOW_TRACKING_URI`
- `MLFLOW_EXPERIMENT`, which is the name of the experiment; it should contain `taxifare_experiment_<user.github_nickname>`
- `MLFLOW_MODEL_NAME`, which is the name of your model; it should contain `taxifare_<user.github_nickname>`
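Purely as an illustration (values depend on your own setup and GitHub nickname), the MLflow section of your `.env` could look like this:

```bash
# .env (illustrative values)
MODEL_TARGET=mlflow
MLFLOW_TRACKING_URI=https://mlflow.lewagon.ai
MLFLOW_EXPERIMENT=taxifare_experiment_<user.github_nickname>
MLFLOW_MODEL_NAME=taxifare_<user.github_nickname>
```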
🧪 Run the tests with `make test_mlflow_config`
Now that your MLflow config is set up, you need to update your package so that the trained model, its params and its performance metrics are pushed to MLflow every time you run a new experiment, i.e. a new training.
❓ Which module of your `taxifare` package is responsible for saving the training outputs?
Answer
It is the role of the `taxifare.ml_logic.registry` module to save the trained model, its parameters, and its performance metrics, thanks to the `save_model()`, `save_results()`, and `mlflow_run()` functions:
- `save_model()` saves the model weights
- `save_results()` saves parameters and metrics
- `mlflow_run()` is a decorator that starts the run and enables TensorFlow autologging
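Out of curiosity, here is a minimal sketch of what such a decorator could look like. The actual `mlflow_run` in your `registry.py` may differ; `MLFLOW_TRACKING_URI` and `MLFLOW_EXPERIMENT` are assumed to come from `taxifare.params`:

```python
import functools
import mlflow

# Assumption: these constants are exposed by your params module
from taxifare.params import MLFLOW_TRACKING_URI, MLFLOW_EXPERIMENT

def mlflow_run(func):
    """Start an MLflow run around `func` and enable TensorFlow autologging."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
        mlflow.set_experiment(experiment_name=MLFLOW_EXPERIMENT)
        with mlflow.start_run():
            mlflow.tensorflow.autolog()
            results = func(*args, **kwargs)
        print("✅ mlflow_run autolog done")
        return results
    return wrapper
```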
First, check whether you already have a processed dataset available with the correct `DATA_SIZE`:
make show_sources_all
If not, run:
make run_preprocess
Now, let's do a first training run to see what our `@mlflow_run` decorator creates for us, thanks to `mlflow.tensorflow.autolog()`:
make run_train
☝️ This time, you should see the print "✅ mlflow_run autolog done"
❓ Check out what is logged for your experiment on https://mlflow.lewagon.ai/
- Try to plot your learning curves of `mae` and `val_mae` as a function of epochs, directly in the website UI!
Beyond TensorFlow-specific training metrics, what else do you think we would want to log as well?
💡 Solution
We can give more context:
- Was this a `train()` run or an `evaluate()` run?
- Data: how much data was used for this training run?
- etc.
❗ Edit `registry::save_results` so that when the model target is `mlflow`, it also saves our additional params and metrics to MLflow.
💡 Try Cmd-Shift-R for global symbol search - thank me later =)
🎁 Solution
For params and metrics:

```python
if MODEL_TARGET == "mlflow":
    if params is not None:
        mlflow.log_params(params)
    if metrics is not None:
        mlflow.log_metrics(metrics)
    print("✅ Results saved on mlflow")
```
Let's have a look at `taxifare.ml_logic.registry::save_model`:
- 🤯 Handling model versioning manually with local storage or GCS was quite shaky! We had to store weights as `model/{current_timestamp}.h5`, then sort by most recent, etc.
- Let's use the `mlflow.tensorflow.log_model` method to store the model for us instead! MLflow will use its own AWS S3 bucket (equivalent to GCS)!
💻 Complete the first step of the `save_model` function:

```python
# registry.py
def save_model():
    # [...]
    if MODEL_TARGET == "mlflow":
        # YOUR CODE HERE
```
🎁 Solution
```python
mlflow.tensorflow.log_model(
    model=model,
    artifact_path="model",
    registered_model_name=MLFLOW_MODEL_NAME
)
print("✅ Model saved to mlflow")
```
Once a new model is trained, it should be moved into staging and then compared with the model in production; if there is an improvement, it should be moved into production instead!
❗ Add your code at the marked section in `interface.main`, using `registry.mlflow_transition_model`:

```python
def train():
    # [...]
    # The latest model should be moved to staging
    pass  # YOUR CODE HERE
```
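If you want a hint, here is a minimal sketch of what that step could look like, assuming `mlflow_transition_model(current_stage, new_stage)` is the signature used in your `registry.py` (check it before copying):

```python
from taxifare.ml_logic.registry import mlflow_transition_model

# Assumption: the model you just saved is registered in stage "None"
# and should be promoted to "Staging" right after training
mlflow_transition_model(current_stage="None", new_stage="Staging")
```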
Make a final training run so as to save the model to MLflow in the "Staging" stage.
🤔 Why Staging? We never want to put a model in production without checking its metrics first!
make run_train
It should print something like this:
- ✅ Model saved to mlflow
- ✅ Model <model_name> version 1 transitioned from None to Staging
Take a look at your model now on https://mlflow.lewagon.ai
"What's the point of storing my model on MLflow", you say? Well, for starters, MLflow allows you to very easily handle the lifecycle stage of the model (None, Staging or Production) to synchronize the information across the team. And more importantly, it allows any application to load a trained model at any given stage to make a prediction.
First, notice that `make run_pred` requires a model in Production by default (not in Staging).
👉 Let's manually change your model from "Staging" to "Production" in the MLflow graphical UI!
💻 Then, complete the `load_model` function in the `taxifare.ml_logic.registry` module
- And try to run a prediction using `make run_pred`
- 💡 Hint: have a look at the MLflow Python API for TensorFlow and find a function to retrieve your trained model.
🎁 Solution
```python
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
client = MlflowClient()

try:
    model_versions = client.get_latest_versions(name=MLFLOW_MODEL_NAME, stages=[stage])
    model_uri = model_versions[0].source
    assert model_uri is not None
except:
    print(f"\n❌ No model found with name {MLFLOW_MODEL_NAME} in stage {stage}")
    return None

model = mlflow.tensorflow.load_model(model_uri=model_uri)
print("✅ model loaded from mlflow")
```
💻 Check that you can also evaluate your production model by calling `make run_evaluate`
🏁 When you are all set, track your progress on Kitt with `make test_kitt`
🏆 Congrats! Your `taxifare` package is now persisting every aspect of your experiments on MLflow, and you have a production-ready model!
❓ Instructions (expand me)
Currently, our retraining process relies on us running commands and comparing results manually. Let's build a Prefect workflow to automate this process!
- Check out the `.env` file and make sure `PREFECT_FLOW_NAME` is filled in.
- Go to https://www.prefect.io/, log in and create a workspace!
- Authenticate via the CLI:
prefect cloud login
👉 Edit your `.env` project configuration file:
- `PREFECT_FLOW_NAME` should follow the `taxifare_lifecycle_<user.github_nickname>` convention
- `PREFECT_LOG_LEVEL` should say `WARNING` (more info here).
🧪 Run the tests with `make test_prefect_config`
Now, by running `make run_workflow`, you should see an empty flow run appear on your Prefect Cloud dashboard.
🎯 Now you need to work on completing `train_flow()`, which you will find in `workflow.py`.
```python
@flow(name=PREFECT_FLOW_NAME)
def train_flow():
    """
    Build the prefect workflow for the `taxifare` package. It should:
    - preprocess 1 month of new data, starting from EVALUATION_START_DATE
    - compute `old_mae` by evaluating the current production model in this new month period
    - compute `new_mae` by re-training, then evaluating the current production model on this new month period
    - if new is better than old, replace the current production model with the new one
    - if neither model is good enough, send a notification!
    """
```
💡 Keep your code DRY: our tasks simply call our various `main.py` entrypoints with the arguments of our choice! We could even get rid of them entirely and simply decorate our main entrypoints with `@task`. How elegant is that! (See the sketch after the TLDR below.)
💡 Quick TLDR on how Prefect works:

```python
# Define your tasks
@task
def task1():
    pass

@task
def task2():
    pass

# Define your workflow
@flow
def myworkflow():
    # Define the orchestration graph ("DAG")
    task1_future = task1.submit()
    task2_future = task2.submit(..., wait_for=[task1_future])  # <-- task2 starts only after task1

    # Compute your results as actual python objects
    task1_result = task1_future.result()
    task2_result = task2_future.result()

    # Do something with the results (e.g. compare them)
    assert task1_result < task2_result

# Actually launch your workflow
myworkflow()
```
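Applied to our package, the tasks could look something like the sketch below. The entrypoint names, arguments and return values here are assumptions based on unit 02's `main.py`; adapt them to your actual code:

```python
from prefect import task

# Assumption: these entrypoints exist under these names in your package
from taxifare.interface.main import preprocess, train, evaluate

@task
def preprocess_new_data(min_date: str, max_date: str):
    # Preprocess the fresh month of data
    return preprocess(min_date=min_date, max_date=max_date)

@task
def evaluate_production_model(min_date: str, max_date: str):
    # Return the MAE of the current Production model on the fresh period
    return evaluate(min_date=min_date, max_date=max_date)

@task
def re_train(min_date: str, max_date: str, split_ratio: float):
    # Return the validation MAE of the newly trained model
    return train(min_date=min_date, max_date=max_date, split_ratio=split_ratio)
```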
🧪 Check your code with `make run_workflow`.
You should see two tasks run one after the other, like below 👇
💡 In the flow task `re_train`, make sure to set the split ratio to 0.2: a split of only 0.02 won't be enough when we are getting new data for just one month.
🧪 Run `make run_workflow` again: you should see a workflow like this in your Prefect dashboard
There is a scenario where neither model is good enough; in that case, we want to send a message to our team describing what happened with the models after retraining!
❗ Implement the `notify` task
🎁 Code to copy-paste
```python
# flow.py
import requests
from prefect import task

@task
def notify(old_mae, new_mae):
    """
    Notify about the performance
    """
    base_url = 'https://chat.api.lewagon.com'
    channel = 'YOUR_BATCH_NUMBER'  # Change to your batch number
    url = f"{base_url}/{channel}/messages"
    author = 'YOUR_GITHUB_NICKNAME'  # Change this to your github nickname

    if new_mae < old_mae and new_mae < 2.5:
        content = f"🚀 New model replacing old in production with MAE: {new_mae} the Old MAE was: {old_mae}"
    elif old_mae < 2.5:
        content = f"✅ Old model still good enough: Old MAE: {old_mae} - New MAE: {new_mae}"
    else:
        content = f"🚨 No model good enough: Old MAE: {old_mae} - New MAE: {new_mae}"

    data = dict(author=author, content=content)
    response = requests.post(url, data=data)
    response.raise_for_status()
```
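Once `old_mae` and `new_mae` have been computed in `train_flow()`, you can submit this task like any other, e.g. `notify.submit(old_mae, new_mae)`.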
🏁 When you are all set, track your results on Kitt with `make test_kitt`
❓ Instructions (expand me)
First, train a full model up to Jan 2015 on `all` data:
DATA_SIZE='all'
MLFLOW_MODEL_NAME=taxifare_<user.github_nickname>_all
direnv reload
ONLY IF you haven't done it yet with `all` data in the past:
# make run_preprocess
Then:
make run_train
❗ And manually put this first model into Production (via the MLflow UI, as before).
📆 We are now at the end of January
EVALUATION_START_DATE="2015-01-01"
Compare your current model with a newly trained one
make run_workflow
🎉 Our new model, retrained on the January data, should perform slightly better, so it gets rolled into production!
✅ Check your notification on https://chat.api.lewagon.com/<user.batch_slug>
📆 We are now at the end of February
EVALUATION_START_DATE="2015-02-01"
direnv reload
make run_workflow
📆 We are now at the end of March
EVALUATION_START_DATE="2015-03-01"
direnv reload
make run_workflow
📆 We are now at the end of April ...
🎉🎉🎉🎉 Congrats on plugging the `taxifare` package into a fully automated model lifecycle workflow!
❓ Instructions (expand me)
- Before deciding which model version to put in production, try a couple of hyperparameter combinations during the training phase, by wisely testing (grid-searching?) various values for `batch_size`, `learning_rate` and `patience` (see the sketch after this list).
- In addition, after fine-tuning and deciding on a model, try to re-train using the whole new dataset of each month, and not just the "train_new" split.
- Try to replace Prefect Cloud with a locally run Prefect UI.
- Add a work queue.
- Put this onto a VM with a schedule to have a truly automated model lifecycle!
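As a hedged starting point for the grid-search idea, something like the sketch below could work, assuming your `train()` entrypoint accepts these hyperparameters as keyword arguments and returns the validation MAE (it may not yet; you would have to expose them first):

```python
from itertools import product

# Assumption: train() has been modified to accept these keyword arguments
# and to return the validation MAE
from taxifare.interface.main import train

grid = {
    "batch_size": [64, 256],
    "learning_rate": [0.001, 0.0005],
    "patience": [2, 5],
}

results = []
for batch_size, learning_rate, patience in product(*grid.values()):
    val_mae = train(batch_size=batch_size, learning_rate=learning_rate, patience=patience)
    results.append((val_mae, batch_size, learning_rate, patience))

# Keep the best combination (lowest validation MAE)
best = min(results)
print(f"Best combo: MAE={best[0]} with batch_size={best[1]}, learning_rate={best[2]}, patience={best[3]}")
```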