diff --git a/docker/.env.example b/docker/.env.example
index f9daf1a90..f213a6aee 100644
--- a/docker/.env.example
+++ b/docker/.env.example
@@ -48,6 +48,13 @@ TFLC_REPO_ID=Hexastack/tflc
 INTENT_CLASSIFIER_REPO_ID=Hexastack/intent-classifier
 SLOT_FILLER_REPO_ID=Hexastack/slot-filler
 NLU_ENGINE_PORT=5000
+MLFLOW_SERVER_PORT=5002
+POSTGRES_DB_PORT=5432
+POSTGRES_USER=postgres
+POSTGRES_PASSWORD=postgres
+POSTGRES_DB=mlflow_db
+BACKEND_STORE_URI=postgresql://postgres:postgres@mlflow_postgres:5432/mlflow_db
+ARTIFACT_STORE_URI=./mlruns
 
 # Frontend (Next.js)
 APP_FRONTEND_PORT=8080
diff --git a/docker/docker-compose.nlu.dev.yml b/docker/docker-compose.nlu.dev.yml
index f4649846e..be1f735fd 100644
--- a/docker/docker-compose.nlu.dev.yml
+++ b/docker/docker-compose.nlu.dev.yml
@@ -8,3 +8,22 @@ services:
     pull_policy: build
     ports:
       - ${NLU_ENGINE_PORT}:5000
+  mlflow-server:
+    build:
+      context: ../nlu/docker
+      dockerfile: Dockerfile
+    pull_policy: build
+    ports:
+      - ${MLFLOW_SERVER_PORT}:5000
+
+  mlflow_postgres:
+    image: bitnami/postgresql
+    container_name: postgres_db
+    environment:
+      - POSTGRES_USER=${POSTGRES_USER}
+      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
+      - POSTGRES_DB=${POSTGRES_DB}
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+    ports:
+      - ${POSTGRES_DB_PORT}:${POSTGRES_DB_PORT}
diff --git a/docker/docker-compose.nlu.yml b/docker/docker-compose.nlu.yml
index e715c7cb1..04c7a69b5 100644
--- a/docker/docker-compose.nlu.yml
+++ b/docker/docker-compose.nlu.yml
@@ -23,8 +23,35 @@ services:
         retries: 5
         start_period: 10s
 
+  mlflow_postgres:
+    image: bitnami/postgresql
+    container_name: postgres_db
+    environment:
+      - POSTGRES_USER=postgres
+      - POSTGRES_PASSWORD=postgres
+      - POSTGRES_DB=mlflow_db
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+    ports:
+      - ${POSTGRES_DB_PORT}:${POSTGRES_DB_PORT}
+
+  mlflow_server:
+    restart: always
+    image: hexastack/hexabot-mlflow-server:latest
+    container_name: mlflow_server
+    environment:
+      - BACKEND_STORE_URI=${BACKEND_STORE_URI} # Connection string to Postgres
+      - ARTIFACT_STORE_URI=${ARTIFACT_STORE_URI} # Local directory for storing artifacts
+    ports:
+      - ${MLFLOW_SERVER_PORT}:5000 # Expose MLflow UI
+    volumes:
+      - mlruns:/mlruns # Mount local directory for MLflow artifacts
+    command: mlflow server --backend-store-uri postgresql://postgres:postgres@mlflow_postgres:5432/mlflow_db --default-artifact-root ./mlruns --host 0.0.0.0 --port 5000
+
 volumes:
   nlu-data:
+  postgres_data:
+  mlruns:
 
 networks:
   nlu-network:
diff --git a/nlu/.gitignore b/nlu/.gitignore
index 783e0c8ff..fde3dfc36 100644
--- a/nlu/.gitignore
+++ b/nlu/.gitignore
@@ -20,4 +20,6 @@ Icon?
 
 # IDEs
 *.swp
-.env
\ No newline at end of file
+.env
+*.pkl
+mlruns
\ No newline at end of file
diff --git a/nlu/boilerplate.py b/nlu/boilerplate.py
index 8e7c35ad2..1b6966fb1 100644
--- a/nlu/boilerplate.py
+++ b/nlu/boilerplate.py
@@ -138,7 +138,7 @@ def extra_params(self, value):
     def save_dir(self):
         return self._save_dir
 
-    def save(self):
+    def save_model(self):
         """Save the model's weights."""
         if self._ckpt is None:
             self._ckpt = tf.train.Checkpoint(model=self)
@@ -153,6 +153,7 @@ def save(self):
             self.save_dir, "extra_params.json")
         with open(extra_params_path, "w") as f:
             json.dump(self.extra_params, f, indent=4, sort_keys=True)
+        return self
 
     def restore(self):
         """Restore the model's latest saved weights."""
diff --git a/nlu/data_loaders/jisfdl.py b/nlu/data_loaders/jisfdl.py
index ce4979185..fbe5ba6f9 100644
--- a/nlu/data_loaders/jisfdl.py
+++ b/nlu/data_loaders/jisfdl.py
@@ -4,7 +4,6 @@
 import numpy as np
 from transformers import PreTrainedTokenizerFast, PreTrainedTokenizer
 
-
 import boilerplate as tfbp
 from utils.json_helper import JsonHelper
 
@@ -25,6 +24,8 @@ def __init__(self, id, intent, positions, slots, text):
     def __repr__(self):
         return str(json.dumps(self.__dict__, indent=2))  # type: ignore
 
+
+
 ##
 # JISFDL : Joint Intent and Slot Filling Model Data Loader
 ##
@@ -54,7 +55,7 @@ def get_slot_from_token(self, token: str, slot_dict: Dict[str, str]):
     def encode_slots(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast],
                      all_slots: List[Dict[str, str]], all_texts: List[str],
-                    slot_map: Dict[str, int], max_len: int):
+                     slot_map: Dict[str, int], max_len: int):
 
         encoded_slots = np.zeros(
             shape=(len(all_texts), max_len), dtype=np.int32)
 
@@ -89,7 +90,7 @@ def encode_slots(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizer
             # now add to encoded_slots
             # the first and the last elements
             # in encoded text are special characters
-            encoded_slots[idx, 1:len(enc)+1] = enc
+            encoded_slots[idx, 1:len(enc) + 1] = enc
 
         return encoded_slots
 
@@ -105,7 +106,9 @@ def parse_dataset_intents(self, data):
         if not bool(lang):
             examples = all_examples
         else:
-            examples = filter(lambda exp: any(e['entity'] == 'language' and e['value'] == lang for e in exp['entities']), all_examples)
+            examples = filter(
+                lambda exp: any(e['entity'] == 'language' and e['value'] == lang for e in exp['entities']),
+                all_examples)
 
         # Parse raw data
         for exp in examples:
@@ -126,7 +129,7 @@ def parse_dataset_intents(self, data):
         return intents
 
-    def __call__(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], model_params = None):
+    def __call__(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], model_params=None):
 
         # I have already transformed the train and test datasets to the new format using
         # the transform to new hidden method.
 
@@ -143,7 +146,8 @@ def __call__(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast
         else:
             raise ValueError("Unknown method!")
 
-    def _transform_dataset(self, dataset: List[JointRawData], tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], model_params = None):
+    def _transform_dataset(self, dataset: List[JointRawData],
+                           tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], model_params=None):
         # We have to encode the texts using the tokenizer to create tensors for training
         # the classifier.
         texts = [d.text for d in dataset]
@@ -167,7 +171,7 @@ def _transform_dataset(self, dataset: List[JointRawData], tokenizer: Union[PreTr
                 intent_names = model_params["intent_names"]
             else:
                 intent_names = None
-
+
             if "slot_names" in model_params:
                 slot_names = model_params["slot_names"]
             else:
@@ -201,15 +205,14 @@ def _transform_dataset(self, dataset: List[JointRawData], tokenizer: Union[PreTr
         max_len = len(encoded_texts["input_ids"][0])  # type: ignore
         all_slots = [td.slots for td in dataset]
         all_texts = [td.text for td in dataset]
-
+
         if slot_map:
             encoded_slots = self.encode_slots(tokenizer,
-                                             all_slots, all_texts, slot_map, max_len)
+                                              all_slots, all_texts, slot_map, max_len)
         else:
             encoded_slots = None
 
         return encoded_texts, encoded_intents, encoded_slots, intent_names, slot_names
 
-
     def encode_text(self, text: str, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast]):
         return self.encode_texts([text], tokenizer)
diff --git a/nlu/docker-compose.yml b/nlu/docker-compose.yml
new file mode 100644
index 000000000..d64134250
--- /dev/null
+++ b/nlu/docker-compose.yml
@@ -0,0 +1,33 @@
+version: '3.9'
+services:
+  mlflow_postgres:
+    image: bitnami/postgresql
+    container_name: postgres_db
+    environment:
+      - POSTGRES_USER=postgres
+      - POSTGRES_PASSWORD=postgres
+      - POSTGRES_DB=mlflow_db
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+    ports:
+      - "5432:5432"
+
+  mlflow_server:
+    restart: always
+    build:
+      context: ./docker
+      dockerfile: Dockerfile # Specify the Dockerfile explicitly
+    image: mlflow
+    container_name: mlflow_server
+    environment:
+      - BACKEND_STORE_URI=postgresql://postgres:postgres@mlflow_postgres:5432/mlflow_db # Connection string to Postgres
+      - ARTIFACT_STORE_URI=./mlruns # Local directory for storing artifacts
+    ports:
+      - "5002:5000" # Expose MLflow UI
+    volumes:
+      - mlruns:/mlruns # Mount local directory for MLflow artifacts
+    command: mlflow server --backend-store-uri postgresql://postgres:postgres@mlflow_postgres:5432/mlflow_db --default-artifact-root ./mlruns --host 0.0.0.0 --port 5000
+
+volumes:
+  postgres_data: {}
+  mlruns:
\ No newline at end of file
diff --git a/nlu/docker/Dockerfile b/nlu/docker/Dockerfile
new file mode 100644
index 000000000..eed577682
--- /dev/null
+++ b/nlu/docker/Dockerfile
@@ -0,0 +1,8 @@
+FROM python:3.11
+
+# Install python package
+COPY requirements.txt /tmp/
+
+EXPOSE 5000
+
+RUN pip install --no-cache-dir -r /tmp/requirements.txt
\ No newline at end of file
diff --git a/nlu/docker/requirements.txt b/nlu/docker/requirements.txt
new file mode 100644
index 000000000..a45d5c0df
--- /dev/null
+++ b/nlu/docker/requirements.txt
@@ -0,0 +1,3 @@
+mlflow==2.16.2
+psycopg2-binary==2.9.10
+boto3==1.35.47
\ No newline at end of file
diff --git a/nlu/models/intent_classifier.py b/nlu/models/intent_classifier.py
index 0863dd3bd..c0c077553 100644
--- a/nlu/models/intent_classifier.py
+++ b/nlu/models/intent_classifier.py
@@ -19,7 +19,9 @@
 from data_loaders.jisfdl import JISFDL
 import boilerplate as tfbp
-
+import mlflow
+import time
+import pickle
 
 ##
 # Intent Classification with BERT
 # This code is based on the paper BERT for Joint Intent Classification and Slot Filling by Chen et al. (2019),
@@ -34,7 +36,7 @@
     'fr': "dbmdz/bert-base-french-europeana-cased",
 }
 
-
+mlflow.set_tracking_uri("http://mlflow_server:5002") # 0.0.0.0 for local development
 @tfbp.default_export
 class IntentClassifier(tfbp.Model):
     default_hparams = {
@@ -42,7 +44,7 @@ class IntentClassifier(tfbp.Model):
         "num_epochs": 2,
         "dropout_prob": 0.1,
         "intent_num_labels": 7,
-        "gamma": 2,
+        "gamma": 2.0,
         "k": 3
     }
     data_loader: JISFDL
@@ -119,35 +121,70 @@ def format_scores(self, scores: Dict[str, dict]):
     @tfbp.runnable
     def fit(self):
         """Training"""
-        encoded_texts, encoded_intents, encoded_slots, intent_names, slot_names = self.data_loader(
-            self.tokenizer)
-
-        if self.hparams.intent_num_labels != len(intent_names):
-            raise ValueError(
-                f"Hyperparam intent_num_labels mismatch, should be : {len(intent_names)}"
-            )
-
-        # Hyperparams, Optimizer and Loss function
-        opt = Adam(learning_rate=3e-5, epsilon=1e-08)
-
-        losses = SparseCategoricalFocalLoss(gamma=self.hparams.gamma)
-
-        metrics = [SparseCategoricalAccuracy("accuracy")]
-
-        # Compile model
-        self.compile(optimizer=opt, loss=losses, metrics=metrics)
-
-        x = {"input_ids": encoded_texts["input_ids"], "token_type_ids": encoded_texts["token_type_ids"],
-             "attention_mask": encoded_texts["attention_mask"]}
-
-        super().fit(
-            x, encoded_intents, epochs=self.hparams.num_epochs, batch_size=32, shuffle=True)
-
-        # Persist the model
-        self.extra_params["intent_names"] = intent_names
-
-        self.save()
-
+        # Start MLflow run
+        with mlflow.start_run() as run:
+            # Log hyperparameters
+            mlflow.log_param("language", self.hparams.language)
+            mlflow.log_param("num_epochs", self.hparams.num_epochs)
+            mlflow.log_param("dropout_prob", self.hparams.dropout_prob)
+            mlflow.log_param("intent_num_labels", self.hparams.intent_num_labels)
+
+            encoded_texts, encoded_intents, encoded_slots, intent_names, slot_names = self.data_loader(
+                self.tokenizer)
+
+            if self.hparams.intent_num_labels != len(intent_names):
+                raise ValueError(
+                    f"Hyperparam intent_num_labels mismatch, should be : {len(intent_names)}"
+                )
+
+            # Hyperparams, Optimizer and Loss function
+            opt = Adam(learning_rate=3e-5, epsilon=1e-08)
+
+            losses = SparseCategoricalFocalLoss(gamma=self.hparams.gamma)
+
+            metrics = [SparseCategoricalAccuracy("accuracy")]
+
+            # Compile model
+            self.compile(optimizer=opt, loss=losses, metrics=metrics)
+
+            x = {"input_ids": encoded_texts["input_ids"], "token_type_ids": encoded_texts["token_type_ids"],
+                 "attention_mask": encoded_texts["attention_mask"]}
+
+            start_time = time.time()
+            history = super().fit(
+                x, encoded_intents, epochs=self.hparams.num_epochs, batch_size=32, shuffle=True)
+            end_time = time.time()
+
+            # Log training time
+            mlflow.log_metric("training_time", end_time - start_time)
+
+            # Log training metrics
+            for epoch in range(len(history.history['loss'])):
+                mlflow.log_metric("loss", history.history["loss"][epoch], step=epoch)
+                mlflow.log_metric("accuracy", history.history["accuracy"][epoch], step=epoch)
+
+            # Persist the model and log the model in MLflow
+            self.extra_params["intent_names"] = intent_names
+            mlflow.log_params(self.extra_params)
+            model_instance = self.save_model() # Save the model using the internal method
+            # Log the model in MLflow
+            mlflow.keras.log_model(model_instance, "intent_classifier_model")
+            # Register the model in MLflow's Model Registry
+            model_uri = f"runs:/{run.info.run_id}/intent_classifier_model"
+            mlflow.register_model(model_uri, "IntentClassifierModel")
+
+    def get_model(self):
+        # Define input layers
+        input_ids = tf.keras.Input(shape=(None,), dtype=tf.int32, name='input_ids')
+        attention_mask = tf.keras.Input(shape=(None,), dtype=tf.int32, name='attention_mask')
+        token_type_ids = tf.keras.Input(shape=(None,), dtype=tf.int32, name='token_type_ids')
+
+        # Call the model on the inputs
+        outputs = self.call(
+            {'input_ids': input_ids, 'attention_mask': attention_mask, 'token_type_ids': token_type_ids})
+
+        # Return a Keras Model
+        return tf.keras.Model(inputs=[input_ids, attention_mask, token_type_ids], outputs=outputs)
     @tfbp.runnable
     def evaluate(self):
         encoded_texts, encoded_intents, _, _, _ = self.data_loader(
@@ -168,9 +205,16 @@ def evaluate(self):
         scores["Overall Scores"] = overall_score
         scores = self.format_scores(scores)
 
+        # Log evaluation results to MLflow
+        with mlflow.start_run():
+            mlflow.log_metrics({
+                "intent_confidence": overall_score["intent_confidence"],
+                "loss": overall_score["loss"]
+            })
+
         print("\nScores per intent:")
         for intent, score in scores.items():
-            print("{}: {}".format(intent, score))
+            print(f"{intent}: {score}")
 
         return scores
 
diff --git a/nlu/requirements.txt b/nlu/requirements.txt
index 4c861bbe1..ff1312939 100644
--- a/nlu/requirements.txt
+++ b/nlu/requirements.txt
@@ -7,4 +7,5 @@ fastapi==0.100.0
 uvicorn[standard]==0.23.1
 autopep8==2.0.2
 focal-loss==0.0.7
+mlflow==2.16.2
 h5py --only-binary=h5py
diff --git a/nlu/run.py b/nlu/run.py
index 5ec628b95..c334542df 100644
--- a/nlu/run.py
+++ b/nlu/run.py
@@ -96,7 +96,7 @@
 if os.path.isfile(os.path.join(model.save_dir, "checkpoint")):
     model.restore()
 else:
-    model.save()
+    model.save_model()
 
 # Run the specified model method.
 if FLAGS.method not in Model._methods:
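
Usage sketch (an assumption, not guaranteed by this patch): once the MLflow stack from docker/docker-compose.nlu.yml is up and a fit() run has registered "IntentClassifierModel", the model can in principle be pulled back from the registry for a quick smoke test. The tracking URL below assumes the default MLFLOW_SERVER_PORT=5002 host mapping from docker/.env.example, and the version "1" is a placeholder.

    import mlflow

    # MLFLOW_SERVER_PORT on the host maps to port 5000 inside the mlflow_server container
    mlflow.set_tracking_uri("http://localhost:5002")

    # Load a registered version of the intent classifier as a generic pyfunc model
    model = mlflow.pyfunc.load_model("models:/IntentClassifierModel/1")
    print(model.metadata)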