Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Train use mlflow #287

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion nlu/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ Icon?

# IDEs
*.swp
.env
.env
*.pkl
mlruns
3 changes: 2 additions & 1 deletion nlu/boilerplate.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def extra_params(self, value):
def save_dir(self):
return self._save_dir

def save(self):
def save_model(self):
"""Save the model's weights."""
if self._ckpt is None:
self._ckpt = tf.train.Checkpoint(model=self)
Expand All @@ -153,6 +153,7 @@ def save(self):
self.save_dir, "extra_params.json")
with open(extra_params_path, "w") as f:
json.dump(self.extra_params, f, indent=4, sort_keys=True)
return self

def restore(self):
"""Restore the model's latest saved weights."""
Expand Down
23 changes: 13 additions & 10 deletions nlu/data_loaders/jisfdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import numpy as np
from transformers import PreTrainedTokenizerFast, PreTrainedTokenizer


import boilerplate as tfbp
from utils.json_helper import JsonHelper

Expand All @@ -25,6 +24,8 @@ def __init__(self, id, intent, positions, slots, text):

def __repr__(self):
return str(json.dumps(self.__dict__, indent=2)) # type: ignore


##
# JISFDL : Joint Intent and Slot Filling Model Data Loader
##
Expand Down Expand Up @@ -54,7 +55,7 @@ def get_slot_from_token(self, token: str, slot_dict: Dict[str, str]):

def encode_slots(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast],
all_slots: List[Dict[str, str]], all_texts: List[str],
slot_map: Dict[str, int], max_len: int):
slot_map: Dict[str, int], max_len: int):

encoded_slots = np.zeros(
shape=(len(all_texts), max_len), dtype=np.int32)
Expand Down Expand Up @@ -89,7 +90,7 @@ def encode_slots(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizer
# now add to encoded_slots
# the first and the last elements
# in encoded text are special characters
encoded_slots[idx, 1:len(enc)+1] = enc
encoded_slots[idx, 1:len(enc) + 1] = enc

return encoded_slots

Expand All @@ -105,7 +106,9 @@ def parse_dataset_intents(self, data):
if not bool(lang):
examples = all_examples
else:
examples = filter(lambda exp: any(e['entity'] == 'language' and e['value'] == lang for e in exp['entities']), all_examples)
examples = filter(
lambda exp: any(e['entity'] == 'language' and e['value'] == lang for e in exp['entities']),
all_examples)

# Parse raw data
for exp in examples:
Expand All @@ -126,7 +129,7 @@ def parse_dataset_intents(self, data):

return intents

def __call__(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], model_params = None):
def __call__(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], model_params=None):
# I have already transformed the train and test datasets to the new format using
# the transform to new hidden method.

Expand All @@ -143,7 +146,8 @@ def __call__(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast
else:
raise ValueError("Unknown method!")

def _transform_dataset(self, dataset: List[JointRawData], tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], model_params = None):
def _transform_dataset(self, dataset: List[JointRawData],
tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], model_params=None):
# We have to encode the texts using the tokenizer to create tensors for training
# the classifier.
texts = [d.text for d in dataset]
Expand All @@ -167,7 +171,7 @@ def _transform_dataset(self, dataset: List[JointRawData], tokenizer: Union[PreTr
intent_names = model_params["intent_names"]
else:
intent_names = None

if "slot_names" in model_params:
slot_names = model_params["slot_names"]
else:
Expand Down Expand Up @@ -201,15 +205,14 @@ def _transform_dataset(self, dataset: List[JointRawData], tokenizer: Union[PreTr
max_len = len(encoded_texts["input_ids"][0]) # type: ignore
all_slots = [td.slots for td in dataset]
all_texts = [td.text for td in dataset]

if slot_map:
encoded_slots = self.encode_slots(tokenizer,
all_slots, all_texts, slot_map, max_len)
all_slots, all_texts, slot_map, max_len)
else:
encoded_slots = None

return encoded_texts, encoded_intents, encoded_slots, intent_names, slot_names


def encode_text(self, text: str, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast]):
return self.encode_texts([text], tokenizer)
32 changes: 32 additions & 0 deletions nlu/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
version: '3.9'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add these ROOT/docker/docker-compose.nlu.yml and ROOT/docker/docker-compose.nlu.dev.yml

services:
mlflow_postgres:
image: bitnami/postgresql
container_name: postgres_db
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=mlflow_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"

mlflow_server:
restart: always
build:
context: ./docker
dockerfile: Dockerfile # Specify the Dockerfile explicitly
image: mlflow
container_name: mlflow_server
environment:
- BACKEND_STORE_URI=postgresql://postgres:postgres@mlflow_postgres:5432/mlflow_db # Connection string to Postgres
- ARTIFACT_STORE_URI=./mlruns # Local directory for storing artifacts
ports:
- "5002:5000" # Expose MLflow UI
volumes:
- ./mlruns:/mlruns # Mount local directory for MLflow artifacts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- ./mlruns:/mlruns # Mount local directory for MLflow artifacts
- mlruns:/mlruns # Mount local directory for MLflow artifacts

Add "mlruns:" to the volumes

command: mlflow server --backend-store-uri postgresql://postgres:postgres@mlflow_postgres:5432/mlflow_db --default-artifact-root ./mlruns --host 0.0.0.0 --port 5000

volumes:
postgres_data: {}
5 changes: 5 additions & 0 deletions nlu/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM python:3.11

# Install python package
COPY requirements.txt /tmp/
RUN pip install --no-cache-dir -r /tmp/requirements.txt
3 changes: 3 additions & 0 deletions nlu/docker/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mlflow==2.16.2
psycopg2-binary==2.9.10
boto3==1.35.47
109 changes: 77 additions & 32 deletions nlu/models/intent_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from data_loaders.jisfdl import JISFDL

import boilerplate as tfbp

import mlflow
import time
import pickle
##
# Intent Classification with BERT
# This code is based on the paper BERT for Joint Intent Classification and Slot Filling by Chen et al. (2019),
Expand All @@ -34,6 +36,7 @@
'fr': "dbmdz/bert-base-french-europeana-cased",
}

mlflow.set_tracking_uri("http://0.0.0.0:5002")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
mlflow.set_tracking_uri("http://0.0.0.0:5002")
mlflow.set_tracking_uri("http://0.0.0.0:5002")

This should be the docker hostname


@tfbp.default_export
class IntentClassifier(tfbp.Model):
Expand All @@ -42,7 +45,7 @@ class IntentClassifier(tfbp.Model):
"num_epochs": 2,
"dropout_prob": 0.1,
"intent_num_labels": 7,
"gamma": 2,
"gamma": 2.0,
"k": 3
}
data_loader: JISFDL
Expand Down Expand Up @@ -119,35 +122,70 @@ def format_scores(self, scores: Dict[str, dict]):
@tfbp.runnable
def fit(self):
"""Training"""
encoded_texts, encoded_intents, encoded_slots, intent_names, slot_names = self.data_loader(
self.tokenizer)

if self.hparams.intent_num_labels != len(intent_names):
raise ValueError(
f"Hyperparam intent_num_labels mismatch, should be : {len(intent_names)}"
)

# Hyperparams, Optimizer and Loss function
opt = Adam(learning_rate=3e-5, epsilon=1e-08)

losses = SparseCategoricalFocalLoss(gamma=self.hparams.gamma)

metrics = [SparseCategoricalAccuracy("accuracy")]

# Compile model
self.compile(optimizer=opt, loss=losses, metrics=metrics)

x = {"input_ids": encoded_texts["input_ids"], "token_type_ids": encoded_texts["token_type_ids"],
"attention_mask": encoded_texts["attention_mask"]}

super().fit(
x, encoded_intents, epochs=self.hparams.num_epochs, batch_size=32, shuffle=True)

# Persist the model
self.extra_params["intent_names"] = intent_names

self.save()

# Start MLflow run
with mlflow.start_run() as run:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be implement in the parent class so that it would work for all models ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally it should be implemented as a helper class. Otherwise, we're logging metrics during training. The base class doesn't have a fit method.

# Log hyperparameters
mlflow.log_param("language", self.hparams.language)
mlflow.log_param("num_epochs", self.hparams.num_epochs)
mlflow.log_param("dropout_prob", self.hparams.dropout_prob)
mlflow.log_param("intent_num_labels", self.hparams.intent_num_labels)

encoded_texts, encoded_intents, encoded_slots, intent_names, slot_names = self.data_loader(
self.tokenizer)

if self.hparams.intent_num_labels != len(intent_names):
raise ValueError(
f"Hyperparam intent_num_labels mismatch, should be : {len(intent_names)}"
)

# Hyperparams, Optimizer and Loss function
opt = Adam(learning_rate=3e-5, epsilon=1e-08)

losses = SparseCategoricalFocalLoss(gamma=self.hparams.gamma)

metrics = [SparseCategoricalAccuracy("accuracy")]

# Compile model
self.compile(optimizer=opt, loss=losses, metrics=metrics)

x = {"input_ids": encoded_texts["input_ids"], "token_type_ids": encoded_texts["token_type_ids"],
"attention_mask": encoded_texts["attention_mask"]}

start_time = time.time()
history = super().fit(
x, encoded_intents, epochs=self.hparams.num_epochs, batch_size=32, shuffle=True)
end_time = time.time()

# Log training time
mlflow.log_metric("training_time", end_time - start_time)

# Log training metrics
for epoch in range(len(history.history['loss'])):
mlflow.log_metric("loss", history.history["loss"][epoch], step=epoch)
mlflow.log_metric("accuracy", history.history["accuracy"][epoch], step=epoch)

# Persist the model and log the model in MLflow
self.extra_params["intent_names"] = intent_names
mlflow.log_params(self.extra_params)
model_instance = self.save_model() # Save the model using the internal method
# Log the model in MLflow
mlflow.keras.log_model(model_instance, "intent_classifier_model")
# Register the model in MLflow's Model Registry
model_uri = f"runs:/{run.info.run_id}/intent_classifier_model"
mlflow.register_model(model_uri, "IntentClassifierModel")

def get_model(self):
# Define input layers
input_ids = tf.keras.Input(shape=(None,), dtype=tf.int32, name='input_ids')
attention_mask = tf.keras.Input(shape=(None,), dtype=tf.int32, name='attention_mask')
token_type_ids = tf.keras.Input(shape=(None,), dtype=tf.int32, name='token_type_ids')

# Call the model on the inputs
outputs = self.call(
{'input_ids': input_ids, 'attention_mask': attention_mask, 'token_type_ids': token_type_ids})

# Return a Keras Model
return tf.keras.Model(inputs=[input_ids, attention_mask, token_type_ids], outputs=outputs)
@tfbp.runnable
def evaluate(self):
encoded_texts, encoded_intents, _, _, _ = self.data_loader(
Expand All @@ -168,9 +206,16 @@ def evaluate(self):
scores["Overall Scores"] = overall_score
scores = self.format_scores(scores)

# Log evaluation results to MLflow
with mlflow.start_run():
mlflow.log_metrics({
"intent_confidence": overall_score["intent_confidence"],
"loss": overall_score["loss"]
})

print("\nScores per intent:")
for intent, score in scores.items():
print("{}: {}".format(intent, score))
print(f"{intent}: {score}")

return scores

Expand Down
1 change: 1 addition & 0 deletions nlu/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ fastapi==0.100.0
uvicorn[standard]==0.23.1
autopep8==2.0.2
focal-loss==0.0.7
mlflow==2.16.2
h5py --only-binary=h5py
2 changes: 1 addition & 1 deletion nlu/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
if os.path.isfile(os.path.join(model.save_dir, "checkpoint")):
model.restore()
else:
model.save()
model.save_model()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we need to rename this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid confusion with keras' built-in save method. It's been throwing aberrant exceptions. Solved it by renaming the boilerplate method


# Run the specified model method.
if FLAGS.method not in Model._methods:
Expand Down