diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1395dfa6..a79973ee 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,4 +15,11 @@ repos: - id: black language_version: python3 args: # arguments to configure black - - --line-length=128 \ No newline at end of file + - --line-length=128 + + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.0.0 # Use the latest version + hooks: + - id: flake8 + args: # arguments to configure black + - --ignore=E402,E501 \ No newline at end of file diff --git a/aixplain/cli_groups.py b/aixplain/cli_groups.py index c5f05826..ea5e28be 100644 --- a/aixplain/cli_groups.py +++ b/aixplain/cli_groups.py @@ -21,7 +21,7 @@ CLI Runner """ import click -from aixplain.factories.cli.model_factory_cli import list_host_machines, list_functions, create_asset_repo, asset_repo_login, onboard_model, deploy_huggingface_model, get_huggingface_model_status +from aixplain.factories.cli.model_factory_cli import list_host_machines, list_functions, create_asset_repo, asset_repo_login, onboard_model, deploy_huggingface_model, get_huggingface_model_status, list_gpus @click.group('cli') def cli(): @@ -51,6 +51,7 @@ def onboard(): create.add_command(create_asset_repo) list.add_command(list_host_machines) list.add_command(list_functions) +list.add_command(list_gpus) get.add_command(asset_repo_login) get.add_command(get_huggingface_model_status) onboard.add_command(onboard_model) @@ -58,4 +59,4 @@ def onboard(): def run_cli(): - cli() \ No newline at end of file + cli() diff --git a/aixplain/factories/cli/model_factory_cli.py b/aixplain/factories/cli/model_factory_cli.py index 264fadd9..b83d61cc 100644 --- a/aixplain/factories/cli/model_factory_cli.py +++ b/aixplain/factories/cli/model_factory_cli.py @@ -44,7 +44,7 @@ def list_host_machines(api_key: Optional[Text] = None) -> None: click.echo(ret_val_yaml) @click.command("functions") -@click.option("--verbose", default=False, +@click.option("--verbose", 
is_flag=True, help="List all function details, False by default.") @click.option("--api-key", default=None, help="TEAM_API_KEY if not already set in environment.") @@ -62,21 +62,37 @@ def list_functions(verbose: bool, api_key: Optional[Text] = None) -> None: ret_val_yaml = yaml.dump(ret_val) click.echo(ret_val_yaml) +@click.command("gpus") +@click.option("--api-key", default=None, + help="TEAM_API_KEY if not already set in environment.") +def list_gpus(api_key: Optional[Text] = None) -> None: + """CLI wrapper function for the LIST_GPUS function in ModelFactory. + + Args: + api_key (Text, optional): Team API key. Defaults to None. + Returns: + None + """ + ret_val = ModelFactory.list_gpus(api_key) + ret_val_yaml = yaml.dump(ret_val) + click.echo(ret_val_yaml) + @click.command("image-repo") @click.option("--name", help="Model name.") -@click.option("--hosting-machine", - help="Hosting machine code obtained from LIST_HOSTS.") -@click.option("--version", help="Model version.") @click.option("--description", help="Description of model.") @click.option("--function", help="Function name obtained from LIST_FUNCTIONS.") @click.option("--source-language", default="en", help="Model source language in 2-character 639-1 code or 3-character 639-3 code.") +@click.option("--input-modality", help="Input type (text, video, image, etc.)") +@click.option("--output-modality", help="Output type (text, video, image, etc.)") +@click.option("--documentation-url", default="", help="Link to model documentation.") @click.option("--api-key", default=None, help="TEAM_API_KEY if not already set in environment.") -def create_asset_repo(name: Text, hosting_machine: Text, version: Text, - description: Text, function: Text, - source_language: Text, - api_key: Optional[Text] = None) -> None: +def create_asset_repo(name: Text, description: Text, function: Text, + source_language: Text, input_modality: Text, + output_modality: Text, + documentation_url: Optional[Text] = "", + api_key: Optional[Text] = 
None) -> None: """CLI wrapper function for the CREATE_ASSET_REPO function in ModelFactory. Args: @@ -93,9 +109,10 @@ def create_asset_repo(name: Text, hosting_machine: Text, version: Text, Returns: None """ - ret_val = ModelFactory.create_asset_repo(name, hosting_machine, version, - description, function, - source_language, api_key) + ret_val = ModelFactory.create_asset_repo(name, description, function, + source_language, input_modality, + output_modality, documentation_url, + api_key) ret_val_yaml = yaml.dump(ret_val) click.echo(ret_val_yaml) @@ -119,8 +136,10 @@ def asset_repo_login(api_key: Optional[Text] = None) -> None: @click.option("--model-id", help="Model ID from CREATE_IMAGE_REPO.") @click.option("--image-tag", help="The tag of the image that you would like hosted.") @click.option("--image-hash", help="The hash of the image you would like onboarded.") +@click.option("--host-machine", default="", help="The machine on which to host the model.") @click.option("--api-key", default=None, help="TEAM_API_KEY if not already set in environment.") def onboard_model(model_id: Text, image_tag: Text, image_hash: Text, + host_machine: Optional[Text] = "", api_key: Optional[Text] = None) -> None: """CLI wrapper function for the ONBOARD_MODEL function in ModelFactory. 
@@ -132,17 +151,20 @@ def onboard_model(model_id: Text, image_tag: Text, image_hash: Text, Returns: None """ - ret_val = ModelFactory.onboard_model(model_id, image_tag, image_hash, api_key) + ret_val = ModelFactory.onboard_model(model_id, image_tag, image_hash, + host_machine, api_key) ret_val_yaml = yaml.dump(ret_val) click.echo(ret_val_yaml) @click.command("hf-model") @click.option("--name", help="User-defined name for Hugging Face model.") @click.option("--hf-repo-id", help="Repository ID from Hugging Face in {supplier}/{model name} form.") -@click.option("--hf-token", help="Hugging Face token used to authenticate to this model.") +@click.option("--revision", default="", help="Commit hash of repository.") +@click.option("--hf-token", default=None, help="Hugging Face token used to authenticate to this model.") @click.option("--api-key", default=None, help="TEAM_API_KEY if not already set in environment.") def deploy_huggingface_model(name: Text, hf_repo_id: Text, hf_token: Optional[Text] = None, + revision: Optional[Text] = None, api_key: Optional[Text] = None) -> None: """CLI wrapper function for the DEPLOY_HUGGINGFACE_MODEL function in ModelFactory. 
@@ -153,7 +175,7 @@ def deploy_huggingface_model(name: Text, hf_repo_id: Text, Returns: None """ - ret_val = ModelFactory.deploy_huggingface_model(name, hf_repo_id, hf_token, api_key) + ret_val = ModelFactory.deploy_huggingface_model(name, hf_repo_id, revision, hf_token, api_key) ret_val_yaml = yaml.dump(ret_val) click.echo(ret_val_yaml) @@ -172,4 +194,4 @@ def get_huggingface_model_status(model_id: Text, api_key: Optional[Text] = None) """ ret_val = ModelFactory.get_huggingface_model_status(model_id, api_key) ret_val_yaml = yaml.dump(ret_val) - click.echo(ret_val_yaml) \ No newline at end of file + click.echo(ret_val_yaml) diff --git a/aixplain/factories/model_factory.py b/aixplain/factories/model_factory.py index 9ed3138f..0fb845f1 100644 --- a/aixplain/factories/model_factory.py +++ b/aixplain/factories/model_factory.py @@ -24,6 +24,7 @@ import json import logging from aixplain.modules.model import Model +from aixplain.modules.model.llm_model import LLM from aixplain.enums import Function, Language, OwnershipType, Supplier, SortBy, SortOrder from aixplain.utils import config from aixplain.utils.file_utils import _request_with_retry @@ -60,13 +61,18 @@ def _create_model_from_response(cls, response: Dict) -> Model: if "language" in param["name"]: parameters[param["name"]] = [w["value"] for w in param["values"]] - return Model( + function = Function(response["function"]["id"]) + ModelClass = Model + if function == Function.TEXT_GENERATION: + ModelClass = LLM + + return ModelClass( response["id"], response["name"], supplier=response["supplier"], api_key=response["api_key"], cost=response["pricing"], - function=Function(response["function"]["id"]), + function=function, parameters=parameters, is_subscribed=True if "subscription" in response else False, version=response["version"]["id"], @@ -100,7 +106,7 @@ def get(cls, model_id: Text, api_key: Optional[Text] = None) -> Model: model = cls._create_model_from_response(resp) logging.info(f"Model Creation: Model {model_id} 
instantiated.") return model - except Exception as e: + except Exception: if resp is not None and "statusCode" in resp: status_code = resp["statusCode"] message = resp["message"] @@ -135,7 +141,7 @@ def _get_assets_from_page( sort_order: SortOrder = SortOrder.ASCENDING, ) -> List[Model]: try: - url = urljoin(cls.backend_url, f"sdk/models/paginate") + url = urljoin(cls.backend_url, "sdk/models/paginate") filter_params = {"q": query, "pageNumber": page_number, "pageSize": page_size} if is_finetunable is not None: filter_params["isFineTunable"] = is_finetunable @@ -253,7 +259,7 @@ def list_host_machines(cls, api_key: Optional[Text] = None) -> List[Dict]: List[Dict]: List of dictionaries containing information about each hosting machine. """ - machines_url = urljoin(config.BACKEND_URL, f"sdk/hosting-machines") + machines_url = urljoin(config.BACKEND_URL, "sdk/hosting-machines") logging.debug(f"URL: {machines_url}") if api_key: headers = {"x-api-key": f"{api_key}", "Content-Type": "application/json"} @@ -264,6 +270,25 @@ def list_host_machines(cls, api_key: Optional[Text] = None) -> List[Dict]: for dictionary in response_dicts: del dictionary["id"] return response_dicts + + @classmethod + def list_gpus(cls, api_key: Optional[Text] = None) -> List[List[Text]]: + """List GPU names on which you can host your language model. + + Args: + api_key (Text, optional): Team API key. Defaults to None. + + Returns: + List[List[Text]]: List of all available GPUs and their prices. 
+ """ + gpu_url = urljoin(config.BACKEND_URL, "sdk/model-onboarding/gpus") + if api_key: + headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"} + else: + headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + response = _request_with_retry("get", gpu_url, headers=headers) + response_list = json.loads(response.text) + return response_list @classmethod def list_functions(cls, verbose: Optional[bool] = False, api_key: Optional[Text] = None) -> List[Dict]: @@ -278,7 +303,7 @@ def list_functions(cls, verbose: Optional[bool] = False, api_key: Optional[Text] List[Dict]: List of dictionaries containing information about each supported function. """ - functions_url = urljoin(config.BACKEND_URL, f"sdk/functions") + functions_url = urljoin(config.BACKEND_URL, "sdk/functions") logging.debug(f"URL: {functions_url}") if api_key: headers = {"x-api-key": f"{api_key}", "Content-Type": "application/json"} @@ -304,12 +329,13 @@ def list_functions(cls, verbose: Optional[bool] = False, api_key: Optional[Text] def create_asset_repo( cls, name: Text, - hosting_machine: Text, - version: Text, description: Text, function: Text, source_language: Text, - api_key: Optional[Text] = None, + input_modality: Text, + output_modality: Text, + documentation_url: Optional[Text] = "", + api_key: Optional[Text] = None ) -> Dict: """Creates an image repository for this model and registers it in the platform backend. 
@@ -336,27 +362,36 @@ def create_asset_repo( function_id = function_dict["id"] if function_id is None: raise Exception("Invalid function name") - create_url = urljoin(config.BACKEND_URL, f"sdk/models/register") + create_url = urljoin(config.BACKEND_URL, f"sdk/models/onboard") logging.debug(f"URL: {create_url}") if api_key: headers = {"x-api-key": f"{api_key}", "Content-Type": "application/json"} else: headers = {"x-api-key": f"{config.TEAM_API_KEY}", "Content-Type": "application/json"} - always_on = False - is_async = False # Hard-coded to False for first release + payload = { - "name": name, - "hostingMachine": hosting_machine, - "alwaysOn": always_on, - "version": version, - "description": description, - "function": function_id, - "isAsync": is_async, - "sourceLanguage": source_language, + "model": { + "name": name, + "description": description, + "connectionType": [ + "synchronous" + ], + "function": function_id, + "modalities": [ + f"{input_modality}-{output_modality}" + ], + "documentationUrl": documentation_url, + "sourceLanguage": source_language + }, + "source": "aixplain-ecr", + "onboardingParams": { + } } - payload = json.dumps(payload) logging.debug(f"Body: {str(payload)}") - response = _request_with_retry("post", create_url, headers=headers, data=payload) + response = _request_with_retry("post", create_url, headers=headers, json=payload) + + assert response.status_code == 201 + return response.json() @classmethod @@ -370,23 +405,26 @@ def asset_repo_login(cls, api_key: Optional[Text] = None) -> Dict: Returns: Dict: Backend response """ - login_url = urljoin(config.BACKEND_URL, f"sdk/ecr/login") + login_url = urljoin(config.BACKEND_URL, "sdk/ecr/login") logging.debug(f"URL: {login_url}") if api_key: - headers = {"x-api-key": f"{api_key}", "Content-Type": "application/json"} + headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"} else: - headers = {"x-api-key": f"{config.TEAM_API_KEY}", "Content-Type": "application/json"} + 
headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} response = _request_with_retry("post", login_url, headers=headers) + print(f"Response: {response}") response_dict = json.loads(response.text) return response_dict @classmethod - def onboard_model(cls, model_id: Text, image_tag: Text, image_hash: Text, api_key: Optional[Text] = None) -> Dict: + def onboard_model(cls, model_id: Text, image_tag: Text, image_hash: Text, host_machine: Optional[Text] = "", api_key: Optional[Text] = None) -> Dict: """Onboard a model after its image has been pushed to ECR. Args: model_id (Text): Model ID obtained from CREATE_ASSET_REPO. image_tag (Text): Image tag to be onboarded. + image_hash (Text): Image digest. + host_machine (Text, optional): Machine on which to host model. api_key (Text, optional): Team API key. Defaults to None. Returns: Dict: Backend response @@ -397,18 +435,18 @@ def onboard_model(cls, model_id: Text, image_tag: Text, image_hash: Text, api_ke headers = {"x-api-key": f"{api_key}", "Content-Type": "application/json"} else: headers = {"x-api-key": f"{config.TEAM_API_KEY}", "Content-Type": "application/json"} - payload = {"image": image_tag, "sha": image_hash} - payload = json.dumps(payload) + payload = {"image": image_tag, "sha": image_hash, "hostMachine": host_machine} logging.debug(f"Body: {str(payload)}") - response = _request_with_retry("post", onboard_url, headers=headers, data=payload) - message = "Your onboarding request has been submitted to an aiXplain specialist for finalization. We will notify you when the process is completed." - logging.info(message) + response = _request_with_retry("post", onboard_url, headers=headers, json=payload) + if response.status_code == 201: + message = "Your onboarding request has been submitted to an aiXplain specialist for finalization. We will notify you when the process is completed." + logging.info(message) + else: + message = "An error has occurred. 
Please make sure your model_id is valid and your host_machine, if set, is a valid option from the LIST_GPUS function." return response @classmethod - def deploy_huggingface_model( - cls, name: Text, hf_repo_id: Text, hf_token: Optional[Text] = "", api_key: Optional[Text] = None - ) -> Dict: + def deploy_huggingface_model(cls, name: Text, hf_repo_id: Text, revision: Optional[Text] = "", hf_token: Optional[Text] = "", api_key: Optional[Text] = None) -> Dict: """Onboards and deploys a Hugging Face large language model. Args: @@ -420,7 +458,7 @@ def deploy_huggingface_model( Dict: Backend response """ supplier, model_name = hf_repo_id.split("/") - deploy_url = urljoin(config.BACKEND_URL, f"sdk/model-onboarding/onboard") + deploy_url = urljoin(config.BACKEND_URL, "sdk/model-onboarding/onboard") if api_key: headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"} else: @@ -435,7 +473,12 @@ def deploy_huggingface_model( "sourceLanguage": "en", }, "source": "huggingface", - "onboardingParams": {"hf_model_name": model_name, "hf_supplier": supplier, "hf_token": hf_token}, + "onboardingParams": { + "hf_supplier": supplier, + "hf_model_name": model_name, + "hf_token": hf_token, + "revision": revision + } } response = _request_with_retry("post", deploy_url, headers=headers, json=body) logging.debug(response.text) diff --git a/aixplain/factories/pipeline_factory.py b/aixplain/factories/pipeline_factory.py index 404a5556..cc94ff79 100644 --- a/aixplain/factories/pipeline_factory.py +++ b/aixplain/factories/pipeline_factory.py @@ -73,7 +73,9 @@ def get(cls, pipeline_id: Text, api_key: Optional[Text] = None) -> Pipeline: resp = None try: url = urljoin(cls.backend_url, f"sdk/pipelines/{pipeline_id}") - if cls.aixplain_key != "": + if api_key is not None: + headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"} + elif cls.aixplain_key != "": headers = {"x-aixplain-key": f"{cls.aixplain_key}", "Content-Type": "application/json"} 
else: headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} @@ -86,7 +88,7 @@ def get(cls, pipeline_id: Text, api_key: Optional[Text] = None) -> Pipeline: resp["api_key"] = api_key pipeline = cls.__from_response(resp) return pipeline - except Exception as e: + except Exception: status_code = 400 if resp is not None and "statusCode" in resp: status_code = resp["statusCode"] @@ -172,7 +174,7 @@ def list( else: headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} - assert 0 < page_size <= 100, f"Pipeline List Error: Page size must be greater than 0 and not exceed 100." + assert 0 < page_size <= 100, "Pipeline List Error: Page size must be greater than 0 and not exceed 100." payload = { "pageSize": page_size, "pageNumber": page_number, @@ -223,13 +225,16 @@ def list( return {"results": pipelines, "page_total": page_total, "page_number": page_number, "total": total} @classmethod - def create(cls, name: Text, pipeline: Union[Text, Dict], status: Text = "draft") -> Pipeline: + def create( + cls, name: Text, pipeline: Union[Text, Dict], status: Text = "draft", api_key: Optional[Text] = None + ) -> Pipeline: """Pipeline Creation Args: name (Text): Pipeline Name pipeline (Union[Text, Dict]): Pipeline as a Python dictionary or in a JSON file status (Text, optional): Status of the pipeline. Currently only draft pipelines can be saved. Defaults to "draft". + api_key (Optional[Text], optional): API Key. Defaults to None. 
Raises: Exception: Currently just the creation of draft pipelines are supported @@ -250,11 +255,12 @@ def create(cls, name: Text, pipeline: Union[Text, Dict], status: Text = "draft") # prepare payload payload = {"name": name, "status": "draft", "architecture": pipeline} url = urljoin(cls.backend_url, "sdk/pipelines") - headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + api_key = api_key if api_key is not None else config.TEAM_API_KEY + headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"} logging.info(f"Start service for POST Create Pipeline - {url} - {headers} - {json.dumps(payload)}") r = _request_with_retry("post", url, headers=headers, json=payload) response = r.json() - return Pipeline(response["id"], name, config.TEAM_API_KEY) + return Pipeline(response["id"], name, api_key) except Exception as e: raise Exception(e) diff --git a/aixplain/modules/__init__.py b/aixplain/modules/__init__.py index bb9e696b..488c8c2f 100644 --- a/aixplain/modules/__init__.py +++ b/aixplain/modules/__init__.py @@ -27,6 +27,7 @@ from .metadata import MetaData from .metric import Metric from .model import Model +from .model.llm_model import LLM from .pipeline import Pipeline from .finetune import Finetune, FinetuneCost from .finetune.status import FinetuneStatus diff --git a/aixplain/modules/model.py b/aixplain/modules/model/__init__.py similarity index 98% rename from aixplain/modules/model.py rename to aixplain/modules/model/__init__.py index 983737c7..285cbe55 100644 --- a/aixplain/modules/model.py +++ b/aixplain/modules/model/__init__.py @@ -45,7 +45,7 @@ class Model(Asset): url (Text, optional): endpoint of the model. Defaults to config.MODELS_RUN_URL. supplier (Union[Dict, Text, Supplier, int], optional): supplier of the asset. Defaults to "aiXplain". version (Text, optional): version of the model. Defaults to "1.0". - function (Text, optional): model AI function. Defaults to None. 
+ function (Function, optional): model AI function. Defaults to None. url (str): URL to run the model. backend_url (str): URL of the backend. pricing (Dict, optional): model price. Defaults to None. @@ -60,7 +60,7 @@ def __init__( api_key: Optional[Text] = None, supplier: Union[Dict, Text, Supplier, int] = "aiXplain", version: Optional[Text] = None, - function: Optional[Text] = None, + function: Optional[Function] = None, is_subscribed: bool = False, cost: Optional[Dict] = None, **additional_info, @@ -102,7 +102,7 @@ def __repr__(self): except Exception: return f"" - def __polling(self, poll_url: Text, name: Text = "model_process", wait_time: float = 0.5, timeout: float = 300) -> Dict: + def sync_poll(self, poll_url: Text, name: Text = "model_process", wait_time: float = 0.5, timeout: float = 300) -> Dict: """Keeps polling the platform to check whether an asynchronous call is done. Args: @@ -198,7 +198,7 @@ def run( return response poll_url = response["url"] end = time.time() - response = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time) + response = self.sync_poll(poll_url, name=name, timeout=timeout, wait_time=wait_time) return response except Exception as e: msg = f"Error in request for {name} - {traceback.format_exc()}" @@ -267,7 +267,7 @@ def check_finetune_status(self, after_epoch: Optional[int] = None): """ from aixplain.enums.asset_status import AssetStatus from aixplain.modules.finetune.status import FinetuneStatus - + headers = {"x-api-key": self.api_key, "Content-Type": "application/json"} resp = None try: @@ -278,7 +278,6 @@ def check_finetune_status(self, after_epoch: Optional[int] = None): finetune_status = AssetStatus(resp["finetuneStatus"]) model_status = AssetStatus(resp["modelStatus"]) logs = sorted(resp["logs"], key=lambda x: float(x["epoch"])) - target_epoch = None if after_epoch is not None: logs = [log for log in logs if float(log["epoch"]) > after_epoch] @@ -286,7 +285,6 @@ def check_finetune_status(self, after_epoch: 
Optional[int] = None): target_epoch = float(logs[0]["epoch"]) elif len(logs) > 0: target_epoch = float(logs[-1]["epoch"]) - if target_epoch is not None: log = None for log_ in logs: @@ -298,7 +296,6 @@ def check_finetune_status(self, after_epoch: Optional[int] = None): log["trainLoss"] = log_["trainLoss"] if log_["evalLoss"] is not None: log["evalLoss"] = log_["evalLoss"] - status = FinetuneStatus( status=finetune_status, model_status=model_status, diff --git a/aixplain/modules/model/llm_model.py b/aixplain/modules/model/llm_model.py new file mode 100644 index 00000000..349ea595 --- /dev/null +++ b/aixplain/modules/model/llm_model.py @@ -0,0 +1,227 @@ +__author__ = "lucaspavanelli" + +""" +Copyright 2024 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Thiago Castro Ferreira, Shreyas Sharma and Lucas Pavanelli +Date: June 4th 2024 +Description: + Large Language Model Class +""" +import time +import json +import logging +import traceback +from aixplain.factories.file_factory import FileFactory +from aixplain.enums import Function, Supplier +from aixplain.modules.model import Model +from aixplain.utils import config +from aixplain.utils.file_utils import _request_with_retry +from typing import Union, Optional, List, Text, Dict + + +class LLM(Model): + """Ready-to-use LLM model. This model can be run in both synchronous and asynchronous manner. 
+ + Attributes: + id (Text): ID of the Model + name (Text): Name of the Model + description (Text, optional): description of the model. Defaults to "". + api_key (Text, optional): API key of the Model. Defaults to None. + url (Text, optional): endpoint of the model. Defaults to config.MODELS_RUN_URL. + supplier (Union[Dict, Text, Supplier, int], optional): supplier of the asset. Defaults to "aiXplain". + version (Text, optional): version of the model. Defaults to "1.0". + function (Text, optional): model AI function. Defaults to None. + url (str): URL to run the model. + backend_url (str): URL of the backend. + pricing (Dict, optional): model price. Defaults to None. + **additional_info: Any additional Model info to be saved + """ + + def __init__( + self, + id: Text, + name: Text, + description: Text = "", + api_key: Optional[Text] = None, + supplier: Union[Dict, Text, Supplier, int] = "aiXplain", + version: Optional[Text] = None, + function: Optional[Function] = None, + is_subscribed: bool = False, + cost: Optional[Dict] = None, + **additional_info, + ) -> None: + """LLM Init + + Args: + id (Text): ID of the Model + name (Text): Name of the Model + description (Text, optional): description of the model. Defaults to "". + api_key (Text, optional): API key of the Model. Defaults to None. + supplier (Union[Dict, Text, Supplier, int], optional): supplier of the asset. Defaults to "aiXplain". + version (Text, optional): version of the model. Defaults to "1.0". + function (Function, optional): model AI function. Defaults to None. + is_subscribed (bool, optional): Is the user subscribed. Defaults to False. + cost (Dict, optional): model price. Defaults to None. + **additional_info: Any additional Model info to be saved + """ + assert function == Function.TEXT_GENERATION, "LLM only supports large language models (i.e. 
text generation function)" + super().__init__( + id=id, + name=name, + description=description, + supplier=supplier, + version=version, + cost=cost, + function=function, + is_subscribed=is_subscribed, + api_key=api_key, + **additional_info, + ) + self.url = config.MODELS_RUN_URL + self.backend_url = config.BACKEND_URL + + def run( + self, + data: Text, + context: Optional[Text] = None, + prompt: Optional[Text] = None, + history: Optional[List[Dict]] = None, + temperature: float = 0.001, + max_tokens: int = 128, + top_p: float = 1.0, + name: Text = "model_process", + timeout: float = 300, + parameters: Dict = {}, + wait_time: float = 0.5, + ) -> Dict: + """Synchronously running a Large Language Model (LLM) model. + + Args: + data (Union[Text, Dict]): Text to LLM or last user utterance of a conversation. + context (Optional[Text], optional): System message. Defaults to None. + prompt (Optional[Text], optional): Prompt Message which comes on the left side of the last utterance. Defaults to None. + history (Optional[List[Dict]], optional): Conversation history in OpenAI format ([{ "role": "assistant", "content": "Hello, world!"}]). Defaults to None. + temperature (float, optional): LLM temperature. Defaults to 0.001. + max_tokens (int, optional): Maximum Generation Tokens. Defaults to 128. + top_p (float, optional): Top P. Defaults to 1.0. + name (Text, optional): ID given to a call. Defaults to "model_process". + timeout (float, optional): total polling time. Defaults to 300. + parameters (Dict, optional): optional parameters to the model. Defaults to "{}". + wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5. 
+ + Returns: + Dict: parsed output from model + """ + start = time.time() + try: + response = self.run_async( + data, + name=name, + temperature=temperature, + max_tokens=max_tokens, + top_p=top_p, + context=context, + prompt=prompt, + history=history, + parameters=parameters, + ) + if response["status"] == "FAILED": + end = time.time() + response["elapsed_time"] = end - start + return response + poll_url = response["url"] + end = time.time() + response = self.sync_poll(poll_url, name=name, timeout=timeout, wait_time=wait_time) + return response + except Exception as e: + msg = f"Error in request for {name} - {traceback.format_exc()}" + logging.error(f"LLM Run: Error in running for {name}: {e}") + end = time.time() + return {"status": "FAILED", "error": msg, "elapsed_time": end - start} + + def run_async( + self, + data: Text, + context: Optional[Text] = None, + prompt: Optional[Text] = None, + history: Optional[List[Dict]] = None, + temperature: float = 0.001, + max_tokens: int = 128, + top_p: float = 1.0, + name: Text = "model_process", + parameters: Dict = {}, + ) -> Dict: + """Runs asynchronously a model call. + + Args: + data (Union[Text, Dict]): Text to LLM or last user utterance of a conversation. + context (Optional[Text], optional): System message. Defaults to None. + prompt (Optional[Text], optional): Prompt Message which comes on the left side of the last utterance. Defaults to None. + history (Optional[List[Dict]], optional): Conversation history in OpenAI format ([{ "role": "assistant", "content": "Hello, world!"}]). Defaults to None. + temperature (float, optional): LLM temperature. Defaults to 0.001. + max_tokens (int, optional): Maximum Generation Tokens. Defaults to 128. + top_p (float, optional): Top P. Defaults to 1.0. + name (Text, optional): ID given to a call. Defaults to "model_process". + parameters (Dict, optional): optional parameters to the model. Defaults to "{}". 
+ + Returns: + dict: polling URL in response + """ + headers = {"x-api-key": self.api_key, "Content-Type": "application/json"} + + data = FileFactory.to_link(data) + if isinstance(data, dict): + payload = data + else: + try: + payload = json.loads(data) + if isinstance(payload, dict) is False: + if isinstance(payload, int) is True or isinstance(payload, float) is True: + payload = str(payload) + payload = {"data": payload} + except Exception: + payload = {"data": data} + parameters.update( + { + "context": context, + "prompt": prompt, + "history": history, + "temperature": temperature, + "max_tokens": max_tokens, + "top_p": top_p, + } + ) + payload.update(parameters) + payload = json.dumps(payload) + + call_url = f"{self.url}/{self.id}" + r = _request_with_retry("post", call_url, headers=headers, data=payload) + logging.info(f"Model Run Async: Start service for {name} - {self.url} - {payload} - {headers}") + + resp = None + try: + resp = r.json() + logging.info(f"Result of request for {name} - {r.status_code} - {resp}") + + poll_url = resp["data"] + response = {"status": "IN_PROGRESS", "url": poll_url} + except Exception: + response = {"status": "FAILED"} + msg = f"Error in request for {name} - {traceback.format_exc()}" + logging.error(f"Model Run Async: Error in running for {name}: {resp}") + if resp is not None: + response["error"] = msg + return response diff --git a/docs/user/user_doc.md b/docs/user/user_doc.md index 89efa478..4466e121 100644 --- a/docs/user/user_doc.md +++ b/docs/user/user_doc.md @@ -57,80 +57,85 @@ poll_url = start_response["url"] ## Poll to see current job status poll_response = model.poll(poll_url) ``` + +You may also set special parameters for Large Language Models in the platform. 
+ +```python +from aixplain.factories import ModelFactory +from aixplain.enums import Function +model = ModelFactory.list(query="GPT-4o", function=Function.TEXT_GENERATION)["results"][0] +response = model.run( + data="What is my name?", # last utterance + context="Always assist with care, respect, and truth. Respond with utmost utility yet securely. Avoid harmful, unethical, prejudiced, or negative content. Ensure replies promote fairness and positivity.", # system prompt + history=[ + { "role": "user", "content": "Hello! My name is James." }, + { "role": "assistant", "content": "Hello!" } + ], # conversation history, + temperature=0.7 +) +``` + ### Deploying Hugging Face Large Language Models -This feature is currently undergoing maintenance. - +Once the on-boarding process has completed, you can use this newly-deployed large language model just like any other private model on our platform. Note that our platform currently only supports language models up to 7 billion parameters in size (~30 GB), so any attempts to deploy larger models will result in an error message. ### Uploading Models -This feature is currently undergoing maintenance. - +This will send an email to an aiXplain associate to finalize the onboarding process. ## Pipelines [Design](https://aixplain.com/platform/studio/) is aiXplain’s no-code AI pipeline builder tool that accelerates AI development by providing a seamless experience to build complex AI systems and deploy them within minutes. You can visit our platform and design your own custom pipeline [here](https://platform.aixplain.com/studio). 
diff --git a/tests/functional/finetune/data/finetune_test_cost_estimation.json b/tests/functional/finetune/data/finetune_test_cost_estimation.json index 80f4d331..44707255 100644 --- a/tests/functional/finetune/data/finetune_test_cost_estimation.json +++ b/tests/functional/finetune/data/finetune_test_cost_estimation.json @@ -9,4 +9,4 @@ {"model_name": "MPT 7b storywriter", "model_id": "6551a870bf42e6037ab109db", "dataset_name": "Test text generation dataset"}, {"model_name": "BloomZ 7b", "model_id": "6551ab17bf42e6037ab109e0", "dataset_name": "Test text generation dataset"}, {"model_name": "BloomZ 7b MT", "model_id": "656e80147ca71e334752d5a3", "dataset_name": "Test text generation dataset"} -] \ No newline at end of file +] diff --git a/tests/functional/finetune/data/finetune_test_end2end.json b/tests/functional/finetune/data/finetune_test_end2end.json index 80768de9..90232a03 100644 --- a/tests/functional/finetune/data/finetune_test_end2end.json +++ b/tests/functional/finetune/data/finetune_test_end2end.json @@ -23,4 +23,4 @@ "required_dev": false, "search_metadata": false } -] \ No newline at end of file +] diff --git a/tests/functional/finetune/finetune_functional_test.py b/tests/functional/finetune/finetune_functional_test.py index ffa9ad5a..7b45613c 100644 --- a/tests/functional/finetune/finetune_functional_test.py +++ b/tests/functional/finetune/finetune_functional_test.py @@ -130,4 +130,4 @@ def test_prompt_validator(validate_prompt_input_map): finetune = FinetuneFactory.create( str(uuid.uuid4()), dataset_list, model, prompt_template=validate_prompt_input_map["prompt_template"] ) - assert exc_info.type is AssertionError \ No newline at end of file + assert exc_info.type is AssertionError diff --git a/tests/functional/general_assets/asset_functional_test.py b/tests/functional/general_assets/asset_functional_test.py index 93a3b297..d35a4d9a 100644 --- a/tests/functional/general_assets/asset_functional_test.py +++ 
b/tests/functional/general_assets/asset_functional_test.py @@ -3,6 +3,7 @@ load_dotenv() from aixplain.factories import ModelFactory, DatasetFactory, MetricFactory, PipelineFactory +from aixplain.modules import LLM from pathlib import Path from aixplain.enums import Function, Language, OwnershipType, Supplier, SortBy, SortOrder @@ -90,7 +91,7 @@ def test_model_sort(): def test_model_ownership(): models = ModelFactory.list(ownership=OwnershipType.SUBSCRIBED)["results"] for model in models: - assert model.is_subscribed == True + assert model.is_subscribed is True def test_model_query(): @@ -101,6 +102,13 @@ def test_model_query(): def test_model_deletion(): + """Test that a model cannot be deleted.""" model = ModelFactory.get("640b517694bf816d35a59125") with pytest.raises(Exception): model.delete() + + +def test_llm_instantiation(): + """Test that the LLM model is correctly instantiated.""" + models = ModelFactory.list(function=Function.TEXT_GENERATION)["results"] + assert isinstance(models[0], LLM) diff --git a/tests/functional/model/run_model_test.py b/tests/functional/model/run_model_test.py new file mode 100644 index 00000000..79979357 --- /dev/null +++ b/tests/functional/model/run_model_test.py @@ -0,0 +1,22 @@ +__author__ = "thiagocastroferreira" + +import pytest + +from aixplain.enums import Function +from aixplain.factories import ModelFactory +from aixplain.modules import LLM + + +@pytest.mark.parametrize("llm_model", ["Groq Llama 3 70B", "Chat GPT 3.5", "GPT-4o", "GPT 4 (32k)"]) +def test_llm_run(llm_model): + """Testing LLMs with history context""" + model = ModelFactory.list(query=llm_model, function=Function.TEXT_GENERATION)["results"][0] + + assert isinstance(model, LLM) + + response = model.run( + data="What is my name?", + history=[{"role": "user", "content": "Hello! 
My name is Thiago."}, {"role": "assistant", "content": "Hello!"}], + ) + assert response["status"] == "SUCCESS" + assert "thiago" in response["data"].lower() diff --git a/tests/functional/pipelines/run_test.py b/tests/functional/pipelines/run_test.py index e4389587..e8bc4d9c 100644 --- a/tests/functional/pipelines/run_test.py +++ b/tests/functional/pipelines/run_test.py @@ -18,6 +18,7 @@ import pytest import os +import requests from aixplain.factories import DatasetFactory, PipelineFactory @@ -38,61 +39,110 @@ def test_get_pipeline(): assert hypothesis_pipeline.id == reference_pipeline.id -@pytest.mark.parametrize("batchmode", [True, False]) -def test_run_single_str(batchmode: bool): +@pytest.mark.parametrize( + "batchmode,version", + [ + (True, "2.0"), + (True, "3.0"), + (False, "2.0"), + (False, "3.0"), + ], +) +def test_run_single_str(batchmode: bool, version: str): pipeline = PipelineFactory.list(query="SingleNodePipeline")["results"][0] - response = pipeline.run(data="Translate this thing", **{"batchmode": batchmode}) + response = pipeline.run(data="Translate this thing", **{"batchmode": batchmode, "version": version}) assert response["status"] == "SUCCESS" -@pytest.mark.parametrize("batchmode", [True, False]) -def test_run_single_local_file(batchmode: bool): +@pytest.mark.parametrize( + "batchmode,version", + [ + (True, "2.0"), + (True, "3.0"), + (False, "2.0"), + (False, "3.0"), + ], +) +def test_run_single_local_file(batchmode: bool, version: str): pipeline = PipelineFactory.list(query="SingleNodePipeline")["results"][0] fname = "translate_this.txt" with open(fname, "w") as f: f.write("Translate this thing") - response = pipeline.run(data=fname, **{"batchmode": batchmode}) + response = pipeline.run(data=fname, **{"batchmode": batchmode, "version": version}) os.remove(fname) assert response["status"] == "SUCCESS" -@pytest.mark.parametrize("batchmode", [True, False]) -def test_run_with_url(batchmode: bool): +@pytest.mark.parametrize( + "batchmode,version", + 
[ + (True, "2.0"), + (True, "3.0"), + (False, "2.0"), + (False, "3.0"), + ], +) +def test_run_with_url(batchmode: bool, version: str): pipeline = PipelineFactory.list(query="SingleNodePipeline")["results"][0] response = pipeline.run( data="https://aixplain-platform-assets.s3.amazonaws.com/data/dev/64c81163f8bdcac7443c2dad/data/f8.txt", - **{"batchmode": batchmode} + **{"batchmode": batchmode, "version": version} ) assert response["status"] == "SUCCESS" -@pytest.mark.parametrize("batchmode", [True, False]) -def test_run_with_dataset(batchmode: bool): +@pytest.mark.parametrize( + "batchmode,version", + [ + (True, "2.0"), + (True, "3.0"), + (False, "2.0"), + (False, "3.0"), + ], +) +def test_run_with_dataset(batchmode: bool, version: str): dataset = DatasetFactory.list(query="for_functional_tests")["results"][0] data_asset_id = dataset.id data_id = dataset.source_data["en"].id pipeline = PipelineFactory.list(query="SingleNodePipeline")["results"][0] - response = pipeline.run(data=data_id, data_asset=data_asset_id, **{"batchmode": batchmode}) + response = pipeline.run(data=data_id, data_asset=data_asset_id, **{"batchmode": batchmode, "version": version}) assert response["status"] == "SUCCESS" -@pytest.mark.parametrize("batchmode", [True, False]) -def test_run_multipipe_with_strings(batchmode: bool): +@pytest.mark.parametrize( + "batchmode,version", + [ + (True, "2.0"), + (True, "3.0"), + (False, "2.0"), + (False, "3.0"), + ], +) +def test_run_multipipe_with_strings(batchmode: bool, version: str): pipeline = PipelineFactory.list(query="MultiInputPipeline")["results"][0] response = pipeline.run( - data={"Input": "Translate this thing.", "Reference": "Traduza esta coisa."}, **{"batchmode": batchmode} + data={"Input": "Translate this thing.", "Reference": "Traduza esta coisa."}, + **{"batchmode": batchmode, "version": version} ) assert response["status"] == "SUCCESS" -@pytest.mark.parametrize("batchmode", [True, False]) -def test_run_multipipe_with_datasets(batchmode: 
bool): +@pytest.mark.parametrize( + "batchmode,version", + [ + (True, "2.0"), + (True, "3.0"), + (False, "2.0"), + (False, "3.0"), + ], +) +def test_run_multipipe_with_datasets(batchmode: bool, version: str): pipeline = PipelineFactory.list(query="MultiInputPipeline")["results"][0] dataset = DatasetFactory.list(query="for_functional_tests")["results"][0] @@ -104,27 +154,50 @@ def test_run_multipipe_with_datasets(batchmode: bool): response = pipeline.run( data={"Input": input_id, "Reference": reference_id}, data_asset={"Input": data_asset_id, "Reference": data_asset_id}, - **{"batchmode": batchmode} + **{"batchmode": batchmode, "version": version} ) assert response["status"] == "SUCCESS" -def test_run_segment_reconstruct(): +@pytest.mark.parametrize("version", ["2.0", "3.0"]) +def test_run_segment_reconstruct(version: str): pipeline = PipelineFactory.list(query="Segmentation/Reconstruction Functional Test - DO NOT DELETE")["results"][0] - response = pipeline.run("https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav") + response = pipeline.run("https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav", **{"version": version}) assert response["status"] == "SUCCESS" output = response["data"][0] assert output["label"] == "Output 1" -def test_run_metric(): +@pytest.mark.parametrize("version", ["2.0", "3.0"]) +def test_run_translation_metric(version: str): + dataset = DatasetFactory.list(query="for_functional_tests")["results"][0] + data_asset_id = dataset.id + + reference_id = dataset.target_data["pt"][0].id + + pipeline = PipelineFactory.list(query="Translation Metric Functional Test - DO NOT DELETE")["results"][0] + response = pipeline.run( + data={"TextInput": reference_id, "ReferenceInput": reference_id}, + data_asset={"TextInput": data_asset_id, "ReferenceInput": data_asset_id}, + **{"version": version} + ) + + assert response["status"] == "SUCCESS" + data = response["data"][0]["segments"][0]["response"] + data = 
requests.get(data).text + assert float(data) == 100.0 + + +@pytest.mark.parametrize("version", ["2.0", "3.0"]) +def test_run_metric(version: str): pipeline = PipelineFactory.list(query="ASR Metric Functional Test - DO NOT DELETE")["results"][0] response = pipeline.run( { "AudioInput": "https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav", "ReferenceInput": "https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.txt", - } + }, + **{"version": version} ) assert response["status"] == "SUCCESS" @@ -134,15 +207,17 @@ def test_run_metric(): @pytest.mark.parametrize( - "input_data,output_data", + "input_data,output_data,version", [ - ("https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav", "AudioOutput"), - ("https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.txt", "TextOutput"), + ("https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav", "AudioOutput", "2.0"), + ("https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.txt", "TextOutput", "2.0"), + ("https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav", "AudioOutput", "3.0"), + ("https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.txt", "TextOutput", "3.0"), ], ) -def test_run_router(input_data: str, output_data: str): +def test_run_router(input_data: str, output_data: str, version: str): pipeline = PipelineFactory.list(query="Router Test - DO NOT DELETE")["results"][0] - response = pipeline.run(input_data) + response = pipeline.run(input_data, **{"version": version}) assert response["status"] == "SUCCESS" assert response["data"][0]["label"] == output_data @@ -151,13 +226,15 @@ def test_run_router(input_data: str, output_data: str): @pytest.mark.parametrize( - "input_data,output_data", + "input_data,output_data,version", [ - ("I love it.", "PositiveOutput"), - ("I hate it.", "NegativeOutput"), + ("I love it.", "PositiveOutput", "2.0"), + ("I hate it.", "NegativeOutput", "2.0"), + ("I love it.", "PositiveOutput", 
"3.0"), + ("I hate it.", "NegativeOutput", "3.0"), ], ) -def test_run_decision(input_data: str, output_data: str): +def test_run_decision(input_data: str, output_data: str, version: str): pipeline = PipelineFactory.list(query="Decision Test - DO NOT DELETE")["results"][0] - response = pipeline.run(input_data) + response = pipeline.run(input_data, **{"version": version}) assert response["status"] == "SUCCESS" assert response["data"][0]["label"] == output_data