From b25aed6536e4aa70a5295bfc58bc738c5713c56a Mon Sep 17 00:00:00 2001
From: hexastack
Date: Tue, 10 Dec 2024 20:20:32 +0100
Subject: [PATCH] feat: add ludwig serve API to handle multiple models + extra
 refactoring

---
 nlu/.env.example                    |   2 +-
 nlu/.gitignore                      |   6 +-
 nlu/docker-compose.predict.yml      |   9 +-
 nlu/docker-compose.serve.yml        |  13 +-
 nlu/docker-compose.train.yml        |   5 +-
 nlu/docker-compose.visualize.yml    |  10 +-
 nlu/requirements.txt                |   7 +-
 nlu/serve.py                        | 272 ++++++++++++++++++++++++++++
 nlu/src/intent-classifier-lstm.yaml |   6 +-
 nlu/src/language-classifier.yaml    |   7 +-
 nlu/src/slot-filler-lstm.yaml       |   2 +-
 11 files changed, 311 insertions(+), 28 deletions(-)
 create mode 100644 nlu/serve.py

diff --git a/nlu/.env.example b/nlu/.env.example
index f323bd34..9edabcbb 100644
--- a/nlu/.env.example
+++ b/nlu/.env.example
@@ -1 +1 @@
-MODEL_NAME='intent_classifier'
\ No newline at end of file
+MODEL_NAME='intent-classifier-lstm'
\ No newline at end of file
diff --git a/nlu/.gitignore b/nlu/.gitignore
index 0c079cd0..4c732979 100644
--- a/nlu/.gitignore
+++ b/nlu/.gitignore
@@ -1,6 +1,8 @@
 results/
-predicitions/
+predictions/
 visualizations/
 data/*
 .env
-venv
\ No newline at end of file
+venv/
+.idea/
+.aim/
diff --git a/nlu/docker-compose.predict.yml b/nlu/docker-compose.predict.yml
index 0826bcba..5e683709 100644
--- a/nlu/docker-compose.predict.yml
+++ b/nlu/docker-compose.predict.yml
@@ -5,9 +5,12 @@ services:
     image: ludwigai/ludwig:master
     command: >
       predict
-      --model_path /src/results/experiment_run/model
+      --model_path /results/experiment_run/model
       --dataset /data/predict.csv
-      --output_directory /src/predictions
+      --output_directory /predictions
     volumes:
       - ./data:/data
-      - ./src:/src
\ No newline at end of file
+      - ./src:/src
+      - ./results:/results
+      - ./predictions:/predictions
+      
\ No newline at end of file
diff --git a/nlu/docker-compose.serve.yml b/nlu/docker-compose.serve.yml
index 8579b847..f8adaf71 100644
--- a/nlu/docker-compose.serve.yml
+++ b/nlu/docker-compose.serve.yml
@@ -3,12 +3,15 @@
 services:
   ludwig:
     image: ludwigai/ludwig:master
-    command: serve --model_path /src/results/experiment_run/model
+    command: >-
+      serve
+      --model_paths '{"slot": "/results/slot-filler-lstm/experiment_run/model", "intent": "/results/language-classifier-cnn/experiment_run/model"}'
     stdin_open: true
     tty: true
     volumes:
-    - ./data:/data
-    - ./src:/src
-    ports:
-    - "8000:8000" # This exposes port 8000 on the host and maps it to port 8000 inside the container
+      - ./data:/data
+      - ./results:/results
+      - ./serve.py:/usr/local/lib/python3.8/site-packages/ludwig/serve.py
+    ports:
+      - "8000:8000"
 
diff --git a/nlu/docker-compose.train.yml b/nlu/docker-compose.train.yml
index 3e17dec0..16b0eb0c 100644
--- a/nlu/docker-compose.train.yml
+++ b/nlu/docker-compose.train.yml
@@ -5,8 +5,9 @@ services:
     image: ludwigai/ludwig:master
     command: >
       experiment --config /src/${MODEL_NAME}.yaml
-      --dataset /data/data.json
-      --output_directory /src/results
+      --dataset /data/train_data.json
+      --output_directory /results
     volumes:
       - ./data:/data
       - ./src:/src
+      - ./results:/results
diff --git a/nlu/docker-compose.visualize.yml b/nlu/docker-compose.visualize.yml
index 42a68397..09d7b7b6 100644
--- a/nlu/docker-compose.visualize.yml
+++ b/nlu/docker-compose.visualize.yml
@@ -5,15 +5,15 @@ services:
     image: ludwigai/ludwig:master
     command: >
       visualize --visualization learning_curves
-      --ground_truth_metadata /src/results/experiment_run_0/model/training_set_metadata.json
-      --training_statistics /src/results/experiment_run_0/training_statistics.json
+      --ground_truth_metadata /results/experiment_run_0/model/training_set_metadata.json
+      --training_statistics /results/experiment_run_0/training_statistics.json
       --file_format png
-      --output_directory /src/results/visualizations
+      --output_directory /visualizations
     volumes:
-      - ./src:/src
+      - ./visualizations:/visualizations
+      - ./results:/results
     environment:
       - PYTHONUNBUFFERED=1 # Optional, to ensure logs are printed immediately
-    working_dir: /src/results
     networks:
       - default
 
diff --git a/nlu/requirements.txt b/nlu/requirements.txt
index a8bcb705..71203ea5 100644
--- a/nlu/requirements.txt
+++ b/nlu/requirements.txt
@@ -9,9 +9,9 @@ scipy>=0.18
 tabulate>=0.7
 scikit-learn
 tqdm
-torch>=2.0.0
+torch==2.0.1
 torchaudio
-torchtext
+torchtext==0.15.2
 torchvision
 pydantic<2.0
 transformers>=4.42.3
@@ -68,4 +68,5 @@
 html5lib # html
 datasets
 # pin required for torch 2.1.0
-urllib3<2
\ No newline at end of file
+urllib3<2
+ludwig==0.10.2 # must pin to 0.10.2, otherwise models fail to load in the dockerized environment based on Ludwig's official image
diff --git a/nlu/serve.py b/nlu/serve.py
new file mode 100644
index 00000000..83854664
--- /dev/null
+++ b/nlu/serve.py
@@ -0,0 +1,272 @@
+import argparse
+import io
+import json
+import logging
+import os
+import sys
+import tempfile
+
+import pandas as pd
+import torch
+from torchvision.io import decode_image
+
+from ludwig.api import LudwigModel
+from ludwig.constants import AUDIO, COLUMN
+from ludwig.contrib import add_contrib_callback_args
+from ludwig.globals import LUDWIG_VERSION
+from ludwig.utils.print_utils import get_logging_level_registry, print_ludwig
+from ludwig.utils.server_utils import NumpyJSONResponse
+
+logger = logging.getLogger(__name__)
+
+try:
+    import uvicorn
+    from fastapi import FastAPI
+    from starlette.datastructures import UploadFile
+    from starlette.middleware import Middleware
+    from starlette.middleware.cors import CORSMiddleware
+    from starlette.requests import Request
+except ImportError as e:
+    logger.error(e)
+    logger.error(
+        "fastapi and other serving dependencies cannot be loaded "
+        "and may not have been installed. "
+        "In order to install all serving dependencies run "
+        "pip install ludwig[serve]"
+    )
+    sys.exit(-1)
+
+ALL_FEATURES_PRESENT_ERROR = {"error": "entry must contain all input features"}
+
+COULD_NOT_RUN_INFERENCE_ERROR = {"error": "Unexpected Error: could not run inference on model"}
+
+
+def server(models, allowed_origins=None):
+    middleware = [Middleware(CORSMiddleware, allow_origins=allowed_origins)] if allowed_origins else None
+    app = FastAPI(middleware=middleware)
+
+    @app.get("/")
+    def check_health():
+        return NumpyJSONResponse({"message": "Ludwig server is up", "models": list(models.keys())})
+
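+    # Each request must include a "model_name" form field selecting one of
+    # the loaded models; the other form fields supply the input features for
+    # that model.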
+    @app.post("/predict")
+    async def predict(request: Request):
+        try:
+            form = await request.form()
+            model_name = form.get("model_name")
+            if model_name not in models:
+                return NumpyJSONResponse(
+                    {"error": f"Model '{model_name}' not found. Available models: {list(models.keys())}."},
+                    status_code=400,
+                )
+            model = models[model_name]
+            input_features = {f[COLUMN] for f in model.config["input_features"]}
+            entry, files = convert_input(form, model.model.input_features)
+        except Exception:
+            logger.exception("Failed to parse predict form")
+            return NumpyJSONResponse(COULD_NOT_RUN_INFERENCE_ERROR, status_code=500)
+
+        try:
+            if (entry.keys() & input_features) != input_features:
+                missing_features = set(input_features) - set(entry.keys())
+                return NumpyJSONResponse(
+                    {
+                        "error": "Data received does not contain all input features. "
+                        f"Missing features: {missing_features}."
+                    },
+                    status_code=400,
+                )
+            try:
+                resp, _ = model.predict(dataset=[entry], data_format=dict)
+                resp = resp.to_dict("records")[0]
+                return NumpyJSONResponse(resp)
+            except Exception as exc:
+                logger.exception(f"Failed to run predict: {exc}")
+                return NumpyJSONResponse(COULD_NOT_RUN_INFERENCE_ERROR, status_code=500)
+        finally:
+            for f in files:
+                os.remove(f.name)
+
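+    # Batch variant of /predict: expects a "dataset" form field containing a
+    # pandas DataFrame serialized in "split" orientation.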
+    @app.post("/batch_predict")
+    async def batch_predict(request: Request):
+        try:
+            form = await request.form()
+            model_name = form.get("model_name")
+            if model_name not in models:
+                return NumpyJSONResponse(
+                    {"error": f"Model '{model_name}' not found. Available models: {list(models.keys())}."},
+                    status_code=400,
+                )
+            model = models[model_name]
+            input_features = {f[COLUMN] for f in model.config["input_features"]}
+            data, files = convert_batch_input(form, model.model.input_features)
+            data_df = pd.DataFrame.from_records(data["data"], index=data.get("index"), columns=data["columns"])
+        except Exception:
+            logger.exception("Failed to parse batch_predict form")
+            return NumpyJSONResponse(COULD_NOT_RUN_INFERENCE_ERROR, status_code=500)
+
+        try:
+            if (set(data_df.columns) & input_features) != input_features:
+                missing_features = set(input_features) - set(data_df.columns)
+                return NumpyJSONResponse(
+                    {
+                        "error": "Data received does not contain all input features. "
+                        f"Missing features: {missing_features}."
+                    },
+                    status_code=400,
+                )
+            try:
+                resp, _ = model.predict(dataset=data_df)
+                resp = resp.to_dict("split")
+                return NumpyJSONResponse(resp)
+            except Exception:
+                logger.exception("Failed to run batch_predict")
+                return NumpyJSONResponse(COULD_NOT_RUN_INFERENCE_ERROR, status_code=500)
+        finally:
+            for f in files:
+                os.remove(f.name)
+
+    return app
+
+
+def _write_file(v, files):
+    # Convert UploadFile to a NamedTemporaryFile to ensure it's on the disk
+    suffix = os.path.splitext(v.filename)[1]
+    named_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
+    files.append(named_file)
+    named_file.write(v.file.read())
+    named_file.close()
+    return named_file.name
+
+
+def _read_image_buffer(v):
+    # read bytes sent via REST API and convert to image tensor
+    # in [channels, height, width] format
+    byte_string = io.BytesIO(v.file.read()).read()
+    image = decode_image(torch.frombuffer(byte_string, dtype=torch.uint8))
+    return image  # channels, height, width
+
+
+def convert_input(form, input_features):
+    """Returns a new input and a list of files to be cleaned up."""
+    new_input = {}
+    files = []
+    for k, v in form.multi_items():
+        if isinstance(v, UploadFile):
+            # check if audio or image file
+            if input_features.get(k).type() == AUDIO:
+                new_input[k] = _write_file(v, files)
+            else:
+                new_input[k] = _read_image_buffer(v)
+        else:
+            new_input[k] = v
+
+    return new_input, files
+
+
+def convert_batch_input(form, input_features):
+    """Returns a new input and a list of files to be cleaned up."""
+    file_index = {}
+    files = []
+    for k, v in form.multi_items():
+        if isinstance(v, UploadFile):
+            file_index[v.filename] = v
+
+    data = json.loads(form["dataset"])
+    for row in data["data"]:
+        for i, value in enumerate(row):
+            if value in file_index:
+                feature_name = data["columns"][i]
+                if input_features.get(feature_name).type() == AUDIO:
+                    row[i] = _write_file(file_index[value], files)
+                else:
+                    row[i] = _read_image_buffer(file_index[value])
+
+    return data, files
+
+
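+# The CLI passes --model_paths as a JSON string mapping model names to model
+# directories (see docker-compose.serve.yml for an example); an already-parsed
+# dict is also accepted.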
+def run_server(
+    model_paths: dict,  # Dictionary of model names to model directories
+    host: str,
+    port: int,
+    allowed_origins: list,
+) -> None:
+    """Loads pre-trained models and serves them on an http server."""
+    # If model_paths is a string, convert it to a dictionary
+    if isinstance(model_paths, str):
+        model_paths = json.loads(model_paths)
+
+    models = {}
+    for model_name, path in model_paths.items():
+        models[model_name] = LudwigModel.load(path, backend="local")
+
+    app = server(models, allowed_origins)
+    uvicorn.run(app, host=host, port=port)
+
+
+def cli(sys_argv):
+    parser = argparse.ArgumentParser(
+        description="This script serves one or more pretrained models", prog="ludwig serve", usage="%(prog)s [options]"
+    )
+
+    # ----------------
+    # Model parameters
+    # ----------------
+    parser.add_argument("-m", "--model_paths", help="JSON object mapping model names to model directories", required=True)
+
+    parser.add_argument(
+        "-l",
+        "--logging_level",
+        default="info",
+        help="the level of logging to use",
+        choices=["critical", "error", "warning", "info", "debug", "notset"],
+    )
+
+    # ----------------
+    # Server parameters
+    # ----------------
+    parser.add_argument(
+        "-p",
+        "--port",
+        help="port for server (default: 8000)",
+        default=8000,
+        type=int,
+    )
+
+    parser.add_argument("-H", "--host", help="host for server (default: 0.0.0.0)", default="0.0.0.0")
+
+    parser.add_argument(
+        "-ao",
+        "--allowed_origins",
+        nargs="*",
+        help="A list of origins that should be permitted to make cross-origin requests. "
+        'Use "*" to allow any origin. See https://www.starlette.io/middleware/#corsmiddleware.',
+    )
+
+    add_contrib_callback_args(parser)
+    args = parser.parse_args(sys_argv)
+
+    args.callbacks = args.callbacks or []
+    for callback in args.callbacks:
+        callback.on_cmdline("serve", *sys_argv)
+
+    args.logging_level = get_logging_level_registry()[args.logging_level]
+    logging.getLogger("ludwig").setLevel(args.logging_level)
+    global logger
+    logger = logging.getLogger("ludwig.serve")
+
+    print_ludwig("Serve", LUDWIG_VERSION)
+
+    run_server(args.model_paths, args.host, args.port, args.allowed_origins)
+
+
+if __name__ == "__main__":
+    cli(sys.argv[1:])
diff --git a/nlu/src/intent-classifier-lstm.yaml b/nlu/src/intent-classifier-lstm.yaml
index 86213dfc..49dcccdc 100644
--- a/nlu/src/intent-classifier-lstm.yaml
+++ b/nlu/src/intent-classifier-lstm.yaml
@@ -3,7 +3,7 @@ input_features:
     type: sequence
     encoder:
       type: rnn
-      dropout: 0.2 
+      dropout: 0.2
       cell_type: lstm
       bidirectional: true
       num_layers: 2
@@ -15,7 +15,7 @@ input_features:
 output_features:
   - name: intent
     type: category
-    dropout: 0.35
+    dropout: 0.2
     reduce_input: sum
 
 preprocessing:
@@ -31,5 +31,5 @@
 trainer:
   decay: true
   early_stop: 0
   epochs: 10
-  learning_rate: 0.0005
+  learning_rate: 0.001
\ No newline at end of file
diff --git a/nlu/src/language-classifier.yaml b/nlu/src/language-classifier.yaml
index 66ee6365..77705e26 100644
--- a/nlu/src/language-classifier.yaml
+++ b/nlu/src/language-classifier.yaml
@@ -1,6 +1,6 @@
 input_features:
   - name: text
-    type: text # We assume the data consists of text for language classification
+    type: text
     preprocessing:
       tokenizer: ngram
       ngram: 3
@@ -10,13 +10,14 @@
 output_features:
   - name: language
-    type: category # Multi-class classification problem, with language labels
+    type: category
     decoder:
       type: classifier
 
 preprocessing:
   split:
-    type: random
+    type: stratify
+    column: language
     probabilities:
       - 0.85
       - 0.1
       - 0.05
diff --git a/nlu/src/slot-filler-lstm.yaml b/nlu/src/slot-filler-lstm.yaml
index 6f822666..318d1b36 100644
--- a/nlu/src/slot-filler-lstm.yaml
+++ b/nlu/src/slot-filler-lstm.yaml
@@ -28,4 +28,4 @@ preprocessing:
 trainer:
   early_stop: 0
   epochs: 10
-  
+  
\ No newline at end of file
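
-- 
Smoke test for the multi-model serve API (assumes both models have been
trained into ./results at the paths wired up in docker-compose.serve.yml, and
that "text" matches the input feature name declared in the model YAMLs):

    import requests

    # The health-check route lists the loaded model names.
    print(requests.get("http://localhost:8000/").json())

    # Route a prediction to the "intent" model; the other form fields are
    # passed through as input features.
    resp = requests.post(
        "http://localhost:8000/predict",
        data={"model_name": "intent", "text": "hello, are you open tomorrow?"},
    )
    print(resp.json())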