diff --git a/evadb/binder/function_expression_binder.py b/evadb/binder/function_expression_binder.py new file mode 100644 index 0000000000..b1919bdbdd --- /dev/null +++ b/evadb/binder/function_expression_binder.py @@ -0,0 +1,223 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from pathlib import Path + +from evadb.binder.binder_utils import ( + BinderError, + extend_star, + resolve_alias_table_value_expression, +) +from evadb.binder.statement_binder import StatementBinder +from evadb.catalog.catalog_utils import ( + get_metadata_properties, + get_video_table_column_definitions, +) +from evadb.configuration.constants import EvaDB_INSTALLATION_DIR +from evadb.executor.execution_context import Context +from evadb.expression.function_expression import FunctionExpression +from evadb.expression.tuple_value_expression import TupleValueExpression +from evadb.parser.types import FunctionType +from evadb.third_party.huggingface.binder import assign_hf_function +from evadb.utils.generic_utils import ( + load_function_class_from_file, + string_comparison_case_insensitive, +) +from evadb.utils.logging_manager import logger + + +def bind_func_expr(binder: StatementBinder, node: FunctionExpression): + # setup the context + # we read the GPUs from the catalog and populate in the context + gpus_ids = binder._catalog().get_configuration_catalog_value("gpu_ids") + node._context = Context(gpus_ids) + + # handle the special case of "extract_object" + if node.name.upper() == str(FunctionType.EXTRACT_OBJECT): + handle_bind_extract_object_function(node, binder) + return + + # handle the special case of "completion or chatgpt" + if string_comparison_case_insensitive( + node.name, "chatgpt" + ) or string_comparison_case_insensitive(node.name, "completion"): + handle_bind_llm_function(node, binder) + + # Handle Func(*) + if ( + len(node.children) == 1 + and isinstance(node.children[0], TupleValueExpression) + and node.children[0].name == "*" + ): + node.children = extend_star(binder._binder_context) + # bind all the children + for child in node.children: + binder.bind(child) + + function_obj = binder._catalog().get_function_catalog_entry_by_name(node.name) + if function_obj is None: + err_msg = ( + f"Function '{node.name}' does not exist in the catalog. " + "Please create the function using CREATE FUNCTION command." + ) + logger.error(err_msg) + raise BinderError(err_msg) + + if string_comparison_case_insensitive(function_obj.type, "HuggingFace"): + node.function = assign_hf_function(function_obj) + + elif string_comparison_case_insensitive(function_obj.type, "Ludwig"): + function_class = load_function_class_from_file( + function_obj.impl_file_path, + "GenericLudwigModel", + ) + function_metadata = get_metadata_properties(function_obj) + assert "model_path" in function_metadata, "Ludwig models expect 'model_path'." 
+ node.function = lambda: function_class( + model_path=function_metadata["model_path"] + ) + + else: + if function_obj.type == "ultralytics": + # manually set the impl_path for yolo functions we only handle object + # detection for now, hopefully this can be generalized + function_dir = Path(EvaDB_INSTALLATION_DIR) / "functions" + function_obj.impl_file_path = ( + Path(f"{function_dir}/yolo_object_detector.py").absolute().as_posix() + ) + + # Verify the consistency of the function. If the checksum of the function does not + # match the one stored in the catalog, an error will be thrown and the user + # will be asked to register the function again. + # assert ( + # get_file_checksum(function_obj.impl_file_path) == function_obj.checksum + # ), f"""Function file {function_obj.impl_file_path} has been modified from the + # registration. Please use DROP FUNCTION to drop it and re-create it # using CREATE FUNCTION.""" + + try: + function_class = load_function_class_from_file( + function_obj.impl_file_path, + function_obj.name, + ) + # certain functions take additional inputs like yolo needs the model_name + # these arguments are passed by the user as part of metadata + + properties = get_metadata_properties(function_obj) + node.function = lambda: function_class(**properties) + except Exception as e: + err_msg = ( + f"{str(e)}. Please verify that the function class name in the " + "implementation file matches the function name." + ) + logger.error(err_msg) + raise BinderError(err_msg) + + node.function_obj = function_obj + output_objs = binder._catalog().get_function_io_catalog_output_entries(function_obj) + if node.output: + for obj in output_objs: + if obj.name.lower() == node.output: + node.output_objs = [obj] + if not node.output_objs: + err_msg = f"Output {node.output} does not exist for {function_obj.name}." + logger.error(err_msg) + raise BinderError(err_msg) + node.projection_columns = [node.output] + else: + node.output_objs = output_objs + node.projection_columns = [obj.name.lower() for obj in output_objs] + + resolve_alias_table_value_expression(node) + + +def handle_bind_llm_function(node, binder): + # we also handle the special case of ChatGPT where we need to send the + # OpenAPI key as part of the parameter if not provided by the user + function_obj = binder._catalog().get_function_catalog_entry_by_name(node.name) + properties = get_metadata_properties(function_obj) + # if the user didn't provide any API_KEY, check if we have one in the catalog + if "OPENAI_API_KEY" not in properties.keys(): + openapi_key = binder._catalog().get_configuration_catalog_value( + "OPENAI_API_KEY" + ) + properties["openai_api_key"] = openapi_key + + +def handle_bind_extract_object_function( + node: FunctionExpression, binder_context: StatementBinder +): + """Handles the binding of extract_object function. + 1. Bind the source video data + 2. Create and bind the detector function expression using the provided name. + 3. Create and bind the tracker function expression. + Its inputs are id, data, output of detector. + 4. Bind the EXTRACT_OBJECT function expression and append the new children. + 5. Handle the alias and populate the outputs of the EXTRACT_OBJECT function + + Args: + node (FunctionExpression): The function expression representing the extract object operation. + binder_context (StatementBinder): The binder object used to bind expressions in the statement. + + Raises: + AssertionError: If the number of children in the `node` is not equal to 3. 
+ """ + assert ( + len(node.children) == 3 + ), f"Invalid arguments provided to {node}. Example correct usage, (data, Detector, Tracker)" + + # 1. Bind the source video + video_data = node.children[0] + binder_context.bind(video_data) + + # 2. Construct the detector + # convert detector to FunctionExpression before binding + # eg. YoloV5 -> YoloV5(data) + detector = FunctionExpression(None, node.children[1].name) + detector.append_child(video_data.copy()) + binder_context.bind(detector) + + # 3. Construct the tracker + # convert tracker to FunctionExpression before binding + # eg. ByteTracker -> ByteTracker(id, data, labels, bboxes, scores) + tracker = FunctionExpression(None, node.children[2].name) + # create the video id expression + columns = get_video_table_column_definitions() + tracker.append_child( + TupleValueExpression(name=columns[1].name, table_alias=video_data.table_alias) + ) + tracker.append_child(video_data.copy()) + binder_context.bind(tracker) + # append the bound output of detector + for obj in detector.output_objs: + col_alias = "{}.{}".format(obj.function_name.lower(), obj.name.lower()) + child = TupleValueExpression( + obj.name, + table_alias=obj.function_name.lower(), + col_object=obj, + col_alias=col_alias, + ) + tracker.append_child(child) + + # 4. Bind the EXTRACT_OBJECT expression and append the new children. + node.children = [] + node.children = [video_data, detector, tracker] + + # 5. assign the outputs of tracker to the output of extract_object + node.output_objs = tracker.output_objs + node.projection_columns = [obj.name.lower() for obj in node.output_objs] + + # 5. resolve alias based on the what user provided + # we assign the alias to tracker as it governs the output of the extract object + resolve_alias_table_value_expression(node) + tracker.alias = node.alias diff --git a/evadb/binder/statement_binder.py b/evadb/binder/statement_binder.py index 128e6e7eed..e08085a1d8 100644 --- a/evadb/binder/statement_binder.py +++ b/evadb/binder/statement_binder.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
from functools import singledispatchmethod -from pathlib import Path from typing import Callable from evadb.binder.binder_utils import ( @@ -26,15 +25,11 @@ extend_star, get_bound_func_expr_outputs_as_tuple_value_expr, get_column_definition_from_select_target_list, - handle_bind_extract_object_function, - resolve_alias_table_value_expression, ) from evadb.binder.statement_binder_context import StatementBinderContext from evadb.catalog.catalog_type import ColumnType, TableType -from evadb.catalog.catalog_utils import get_metadata_properties, is_document_table +from evadb.catalog.catalog_utils import is_document_table from evadb.catalog.sql_config import RESTRICTED_COL_NAMES -from evadb.configuration.constants import EvaDB_INSTALLATION_DIR -from evadb.executor.execution_context import Context from evadb.expression.abstract_expression import AbstractExpression, ExpressionType from evadb.expression.function_expression import FunctionExpression from evadb.expression.tuple_value_expression import TupleValueExpression @@ -47,13 +42,7 @@ from evadb.parser.select_statement import SelectStatement from evadb.parser.statement import AbstractStatement from evadb.parser.table_ref import TableRef -from evadb.parser.types import FunctionType -from evadb.third_party.huggingface.binder import assign_hf_function -from evadb.utils.generic_utils import ( - load_function_class_from_file, - string_comparison_case_insensitive, -) -from evadb.utils.logging_manager import logger +from evadb.utils.generic_utils import string_comparison_case_insensitive class StatementBinder: @@ -274,115 +263,6 @@ def _bind_tuple_expr(self, node: TupleValueExpression): @bind.register(FunctionExpression) def _bind_func_expr(self, node: FunctionExpression): - # setup the context - # we read the GPUs from the catalog and populate in the context - gpus_ids = self._catalog().get_configuration_catalog_value("gpu_ids") - node._context = Context(gpus_ids) - - # handle the special case of "extract_object" - if node.name.upper() == str(FunctionType.EXTRACT_OBJECT): - handle_bind_extract_object_function(node, self) - return - - # Handle Func(*) - if ( - len(node.children) == 1 - and isinstance(node.children[0], TupleValueExpression) - and node.children[0].name == "*" - ): - node.children = extend_star(self._binder_context) - # bind all the children - for child in node.children: - self.bind(child) - - function_obj = self._catalog().get_function_catalog_entry_by_name(node.name) - if function_obj is None: - err_msg = ( - f"Function '{node.name}' does not exist in the catalog. " - "Please create the function using CREATE FUNCTION command." - ) - logger.error(err_msg) - raise BinderError(err_msg) - - if string_comparison_case_insensitive(function_obj.type, "HuggingFace"): - node.function = assign_hf_function(function_obj) - - elif string_comparison_case_insensitive(function_obj.type, "Ludwig"): - function_class = load_function_class_from_file( - function_obj.impl_file_path, - "GenericLudwigModel", - ) - function_metadata = get_metadata_properties(function_obj) - assert ( - "model_path" in function_metadata - ), "Ludwig models expect 'model_path'." 
- node.function = lambda: function_class( - model_path=function_metadata["model_path"] - ) - - else: - if function_obj.type == "ultralytics": - # manually set the impl_path for yolo functions we only handle object - # detection for now, hopefully this can be generalized - function_dir = Path(EvaDB_INSTALLATION_DIR) / "functions" - function_obj.impl_file_path = ( - Path(f"{function_dir}/yolo_object_detector.py") - .absolute() - .as_posix() - ) - - # Verify the consistency of the function. If the checksum of the function does not - # match the one stored in the catalog, an error will be thrown and the user - # will be asked to register the function again. - # assert ( - # get_file_checksum(function_obj.impl_file_path) == function_obj.checksum - # ), f"""Function file {function_obj.impl_file_path} has been modified from the - # registration. Please use DROP FUNCTION to drop it and re-create it # using CREATE FUNCTION.""" - - try: - function_class = load_function_class_from_file( - function_obj.impl_file_path, - function_obj.name, - ) - # certain functions take additional inputs like yolo needs the model_name - # these arguments are passed by the user as part of metadata - # we also handle the special case of ChatGPT where we need to send the - # OpenAPI key as part of the parameter if not provided by the user - properties = get_metadata_properties(function_obj) - if string_comparison_case_insensitive(node.name, "CHATGPT"): - # if the user didn't provide any API_KEY, check if we have one in the catalog - if "OPENAI_API_KEY" not in properties.keys(): - openapi_key = self._catalog().get_configuration_catalog_value( - "OPENAI_API_KEY" - ) - properties["openai_api_key"] = openapi_key - - node.function = lambda: function_class(**properties) - except Exception as e: - err_msg = ( - f"{str(e)}. Please verify that the function class name in the " - "implementation file matches the function name." - ) - logger.error(err_msg) - raise BinderError(err_msg) - - node.function_obj = function_obj - output_objs = self._catalog().get_function_io_catalog_output_entries( - function_obj - ) - if node.output: - for obj in output_objs: - if obj.name.lower() == node.output: - node.output_objs = [obj] - if not node.output_objs: - err_msg = ( - f"Output {node.output} does not exist for {function_obj.name}." - ) - logger.error(err_msg) - raise BinderError(err_msg) - node.projection_columns = [node.output] - else: - node.output_objs = output_objs - node.projection_columns = [obj.name.lower() for obj in output_objs] + from evadb.binder.function_expression_binder import bind_func_expr - resolve_alias_table_value_expression(node) + bind_func_expr(self, node) diff --git a/evadb/constants.py b/evadb/constants.py index 80777c5ab4..c8693ca203 100644 --- a/evadb/constants.py +++ b/evadb/constants.py @@ -21,3 +21,4 @@ IFRAMES = "IFRAMES" AUDIORATE = "AUDIORATE" DEFAULT_FUNCTION_EXPRESSION_COST = 100 +LLM_FUNCTIONS = ["chatgpt", "completion"] diff --git a/evadb/executor/llm_executor.py b/evadb/executor/llm_executor.py new file mode 100644 index 0000000000..86dc2626a1 --- /dev/null +++ b/evadb/executor/llm_executor.py @@ -0,0 +1,36 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Iterator + +from evadb.database import EvaDBDatabase +from evadb.executor.abstract_executor import AbstractExecutor +from evadb.models.storage.batch import Batch +from evadb.plan_nodes.llm_plan import LLMPlan + + +class LLMExecutor(AbstractExecutor): + def __init__(self, db: EvaDBDatabase, node: LLMPlan): + super().__init__(db, node) + self.llm_expr = node.llm_expr + self.alias = node.alias + + def exec(self, *args, **kwargs) -> Iterator[Batch]: + child_executor = self.children[0] + for batch in child_executor.exec(**kwargs): + llm_result = self.llm_expr.evaluate(batch) + + output = Batch.merge_column_wise([batch, llm_result]) + + yield output diff --git a/evadb/executor/plan_executor.py b/evadb/executor/plan_executor.py index 94d290bdb3..d18073d883 100644 --- a/evadb/executor/plan_executor.py +++ b/evadb/executor/plan_executor.py @@ -32,6 +32,7 @@ from evadb.executor.insert_executor import InsertExecutor from evadb.executor.join_build_executor import BuildJoinExecutor from evadb.executor.limit_executor import LimitExecutor +from evadb.executor.llm_executor import LLMExecutor from evadb.executor.load_executor import LoadDataExecutor from evadb.executor.nested_loop_join_executor import NestedLoopJoinExecutor from evadb.executor.orderby_executor import OrderByExecutor @@ -149,6 +150,8 @@ def _build_execution_tree( executor_node = CreateIndexExecutor(db=self._db, node=plan) elif plan_opr_type == PlanOprType.APPLY_AND_MERGE: executor_node = ApplyAndMergeExecutor(db=self._db, node=plan) + elif plan_opr_type == PlanOprType.LLM: + executor_node = LLMExecutor(db=self._db, node=plan) elif plan_opr_type == PlanOprType.VECTOR_INDEX_SCAN: executor_node = VectorIndexScanExecutor(db=self._db, node=plan) elif plan_opr_type == PlanOprType.DELETE: diff --git a/evadb/expression/expression_utils.py b/evadb/expression/expression_utils.py index b863d1d0d2..5fd413c631 100644 --- a/evadb/expression/expression_utils.py +++ b/evadb/expression/expression_utils.py @@ -15,9 +15,11 @@ from typing import List, Set +from evadb.constants import LLM_FUNCTIONS from evadb.expression.abstract_expression import AbstractExpression, ExpressionType from evadb.expression.comparison_expression import ComparisonExpression from evadb.expression.constant_value_expression import ConstantValueExpression +from evadb.expression.function_expression import FunctionExpression from evadb.expression.logical_expression import LogicalExpression from evadb.expression.tuple_value_expression import TupleValueExpression @@ -296,3 +298,20 @@ def _has_simple_expressions(expr): ] return _has_simple_expressions(predicate) and contains_single_column(predicate) + + +def is_llm_expression(expr: AbstractExpression): + if isinstance(expr, FunctionExpression) and expr.name.lower() in LLM_FUNCTIONS: + return True + return False + + +def extract_llm_expressions_from_project(exprs: List[AbstractExpression]): + remaining_exprs = [] + llm_exprs = [] + for expr in exprs: + if is_llm_expression(expr): + llm_exprs.append(expr.copy()) + else: + remaining_exprs.append(expr) + return llm_exprs, remaining_exprs diff --git 
a/evadb/functions/llms/base.py b/evadb/functions/llms/base.py
new file mode 100644
index 0000000000..726552b7ba
--- /dev/null
+++ b/evadb/functions/llms/base.py
@@ -0,0 +1,91 @@
+# coding=utf-8
+# Copyright 2018-2023 EvaDB
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import json
+import os
+from abc import abstractmethod
+from typing import List, Tuple
+
+import pandas as pd
+
+from evadb.catalog.catalog_type import NdArrayType
+from evadb.functions.abstract.abstract_function import AbstractFunction
+from evadb.functions.decorators.decorators import forward, setup
+from evadb.functions.decorators.io_descriptors.data_types import PandasDataframe
+
+
+class BaseLLM(AbstractFunction):
+    """Base class for LLM functions that generate one text response per input prompt."""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.model_stats = None
+
+    @setup(cacheable=True, function_type="chat-completion", batchable=True)
+    def setup(self, *args, **kwargs) -> None:
+        super().setup(*args, **kwargs)
+
+    @forward(
+        input_signatures=[
+            PandasDataframe(
+                columns=["prompt"],
+                column_types=[
+                    NdArrayType.STR,
+                ],
+                column_shapes=[(None,)],
+            )
+        ],
+        output_signatures=[
+            PandasDataframe(
+                columns=["response"],
+                column_types=[
+                    NdArrayType.STR,
+                ],
+                column_shapes=[(None,)],
+            )
+        ],
+    )
+    def forward(self, text_df):
+        prompts = text_df[text_df.columns[0]]
+        responses = self.generate(prompts)
+        return pd.DataFrame({"response": responses})
+
+    @abstractmethod
+    def generate(self, prompts: List[str]) -> List[str]:
+        """
+        All the child classes should overload this function.
+        """
+        raise NotImplementedError
+
+    @abstractmethod
+    def get_cost(self, prompt: str, response: str = "") -> Tuple[tuple, float]:
+        """
+        Return the token usage as a tuple of (input_token_usage, output_token_usage) along with the dollar cost of running the LLM on the prompt and producing the provided response.
+ """ + pass + + def get_model_stats(self, model_name: str): + # read the statistics if not already read + if self.model_stats is None: + current_file_path = os.path.dirname(os.path.realpath(__file__)) + with open(f"{current_file_path}/llm_stats.json") as f: + self.model_stats = json.load(f) + + assert ( + model_name in self.model_stats + ), f"we do not have statistics for the model {model_name}" + + return self.model_stats[model_name] diff --git a/evadb/functions/llms/llm_stats.json b/evadb/functions/llms/llm_stats.json new file mode 100644 index 0000000000..4910fe4cbd --- /dev/null +++ b/evadb/functions/llms/llm_stats.json @@ -0,0 +1,128 @@ +{ + "gpt-4": { + "max_token_context": 8192, + "input_cost_per_token": 0.00003, + "output_cost_per_token": 0.00006, + "provider": "openai", + "mode": "chat" + }, + "gpt-4-0314": { + "max_token_context": 8192, + "input_cost_per_token": 0.00003, + "output_cost_per_token": 0.00006, + "provider": "openai", + "mode": "chat" + }, + "gpt-4-0613": { + "max_token_context": 8192, + "input_cost_per_token": 0.00003, + "output_cost_per_token": 0.00006, + "provider": "openai", + "mode": "chat" + }, + "gpt-4-32k": { + "max_token_context": 32768, + "input_cost_per_token": 0.00006, + "output_cost_per_token": 0.00012, + "provider": "openai", + "mode": "chat" + }, + "gpt-4-32k-0314": { + "max_token_context": 32768, + "input_cost_per_token": 0.00006, + "output_cost_per_token": 0.00012, + "provider": "openai", + "mode": "chat" + }, + "gpt-4-32k-0613": { + "max_token_context": 32768, + "input_cost_per_token": 0.00006, + "output_cost_per_token": 0.00012, + "provider": "openai", + "mode": "chat" + }, + "gpt-3.5-turbo": { + "max_token_context": 4097, + "input_cost_per_token": 0.0000015, + "output_cost_per_token": 0.000002, + "provider": "openai", + "mode": "chat" + }, + "gpt-3.5-turbo-0301": { + "max_token_context": 4097, + "input_cost_per_token": 0.0000015, + "output_cost_per_token": 0.000002, + "provider": "openai", + "mode": "chat" + }, + "gpt-3.5-turbo-0613": { + "max_token_context": 4097, + "input_cost_per_token": 0.0000015, + "output_cost_per_token": 0.000002, + "provider": "openai", + "mode": "chat" + }, + "gpt-3.5-turbo-16k": { + "max_token_context": 16385, + "input_cost_per_token": 0.000003, + "output_cost_per_token": 0.000004, + "provider": "openai", + "mode": "chat" + }, + "gpt-3.5-turbo-16k-0613": { + "max_token_context": 16385, + "input_cost_per_token": 0.000003, + "output_cost_per_token": 0.000004, + "provider": "openai", + "mode": "chat" + }, + "text-davinci-003": { + "max_token_context": 4097, + "input_cost_per_token": 0.000002, + "output_cost_per_token": 0.000002, + "provider": "openai", + "mode": "completion" + }, + "text-curie-001": { + "max_token_context": 2049, + "input_cost_per_token": 0.000002, + "output_cost_per_token": 0.000002, + "provider": "openai", + "mode": "completion" + }, + "text-babbage-001": { + "max_token_context": 2049, + "input_cost_per_token": 0.0000004, + "output_cost_per_token": 0.0000004, + "provider": "openai", + "mode": "completion" + }, + "text-ada-001": { + "max_token_context": 2049, + "input_cost_per_token": 0.0000004, + "output_cost_per_token": 0.0000004, + "provider": "openai", + "mode": "completion" + }, + "babbage-002": { + "max_token_context": 16384, + "input_cost_per_token": 0.0000004, + "output_cost_per_token": 0.0000004, + "provider": "openai", + "mode": "completion" + }, + "davinci-002": { + "max_token_context": 16384, + "input_cost_per_token": 0.000002, + "output_cost_per_token": 0.000002, + "provider": "openai", + 
"mode": "completion" + }, + "gpt-3.5-turbo-instruct": { + "max_token_context": 8192, + "input_cost_per_token": 0.0000015, + "output_cost_per_token": 0.000002, + "provider": "openai", + "mode": "completion" + } +} \ No newline at end of file diff --git a/evadb/functions/llms/openai.py b/evadb/functions/llms/openai.py new file mode 100644 index 0000000000..240c98c7d7 --- /dev/null +++ b/evadb/functions/llms/openai.py @@ -0,0 +1,113 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +from typing import List + +from retry import retry + +from evadb.functions.llms.base import BaseLLM +from evadb.utils.generic_utils import ( + try_to_import_openai, + try_to_import_tiktoken, + validate_kwargs, +) + +_DEFAULT_MODEL = "gpt-3.5-turbo" +_DEFAULT_PARAMS = { + "model": _DEFAULT_MODEL, + "temperature": 0.0, + "request_timeout": 30, + "max_tokens": 1000, +} + + +class OpenAILLM(BaseLLM): + @property + def name(self) -> str: + return "OpenAILLM" + + def setup( + self, + openai_api_key="", + **kwargs, + ) -> None: + super().setup(**kwargs) + + try_to_import_openai() + import openai + + openai.api_key = self.openai_api_key + if len(openai.api_key) == 0: + openai.api_key = os.environ.get("OPENAI_API_KEY", "") + assert ( + len(openai.api_key) != 0 + ), "Please set your OpenAI API key using SET OPENAI_API_KEY = 'sk-' or environment variable (OPENAI_API_KEY)" + + validate_kwargs(kwargs, allowed_keys=_DEFAULT_PARAMS.keys()) + self.model_params = {**_DEFAULT_PARAMS, **kwargs} + + self.model_name = self.model_params["model"] + + def generate(self, prompts: List[str]) -> List[str]: + import openai + + @retry(tries=6, delay=20) + def completion_with_backoff(**kwargs): + return openai.ChatCompletion.create(**kwargs) + + results = [] + + for prompt in prompts: + def_sys_prompt_message = { + "role": "system", + "content": "You are a helpful assistant that accomplishes user tasks.", + } + + self.model_params["messages"].append(def_sys_prompt_message) + self.model_params["messages"].extend( + [ + { + "role": "user", + "content": f"Complete the following task: {prompt}", + }, + ], + ) + + response = completion_with_backoff(**self.model_params) + answer = response.choices[0].message.content + results.append(answer) + + return results + + def get_cost(self, prompt: str, response: str): + try_to_import_tiktoken() + import tiktoken + + encoding = tiktoken.encoding_for_model(self.model_name) + num_prompt_tokens = len(encoding.encode(prompt)) + num_response_tokens = self.model_params["max_tokens"] + if response: + num_response_tokens = len(encoding.encode(response)) + + model_stats = self.get_model_stats(self.model_name) + + token_consumed = (num_prompt_tokens, num_response_tokens) + dollar_cost = ( + model_stats["input_cost_per_token"] * num_prompt_tokens + + model_stats["output_cost_per_token"] * num_response_tokens + ) + return token_consumed, dollar_cost diff --git a/evadb/optimizer/operators.py b/evadb/optimizer/operators.py index 5b9bbf78de..4f756208e5 100644 --- 
a/evadb/optimizer/operators.py +++ b/evadb/optimizer/operators.py @@ -64,6 +64,7 @@ class OperatorType(IntEnum): LOGICAL_EXTRACT_OBJECT = auto() LOGICAL_VECTOR_INDEX_SCAN = auto() LOGICAL_USE = auto() + LOGICAL_LLM = auto() LOGICALDELIMITER = auto() @@ -1269,3 +1270,27 @@ def __hash__(self) -> int: self.search_query_expr, ) ) + + +class LogicalLLM(Operator): + def __init__( + self, + llm_expr: FunctionExpression, + children: List = None, + ): + super().__init__(OperatorType.LOGICAL_LLM, children) + self.llm_expr = llm_expr + + def __eq__(self, other): + is_subtree_equal = super().__eq__(other) + if not isinstance(other, LogicalLLM): + return False + return is_subtree_equal and self.llm_expr == other.llm_expr + + def __hash__(self) -> int: + return hash( + ( + super().__hash__(), + self.llm_expr, + ) + ) diff --git a/evadb/optimizer/rules/rules.py b/evadb/optimizer/rules/rules.py index cb9ff32742..2b82ef4820 100644 --- a/evadb/optimizer/rules/rules.py +++ b/evadb/optimizer/rules/rules.py @@ -44,6 +44,7 @@ from evadb.plan_nodes.exchange_plan import ExchangePlan from evadb.plan_nodes.explain_plan import ExplainPlan from evadb.plan_nodes.hash_join_build_plan import HashJoinBuildPlan +from evadb.plan_nodes.llm_plan import LLMPlan from evadb.plan_nodes.nested_loop_join_plan import NestedLoopJoinPlan from evadb.plan_nodes.predicate_plan import PredicatePlan from evadb.plan_nodes.project_plan import ProjectPlan @@ -70,6 +71,7 @@ LogicalInsert, LogicalJoin, LogicalLimit, + LogicalLLM, LogicalLoadData, LogicalOrderBy, LogicalProject, @@ -1412,5 +1414,24 @@ def apply(self, before: LogicalProject, context: OptimizerContext): yield exchange_plan +class LogicalLLMToPhysical(Rule): + def __init__(self): + pattern = Pattern(OperatorType.LOGICAL_LLM) + pattern.append_child(Pattern(OperatorType.DUMMY)) + super().__init__(RuleType.LOGICAL_LLM_TO_PHYSICAL, pattern) + + def promise(self): + return Promise.LOGICAL_LLM_TO_PHYSICAL + + def check(self, grp_id: int, context: OptimizerContext): + return True + + def apply(self, before: LogicalLLM, context: OptimizerContext): + after = LLMPlan(before.llm_expr) + for child in before.children: + after.append_child(child) + yield after + + # IMPLEMENTATION RULES END ############################################## diff --git a/evadb/optimizer/rules/rules_base.py b/evadb/optimizer/rules/rules_base.py index 2f61e1e23d..3cbeac1589 100644 --- a/evadb/optimizer/rules/rules_base.py +++ b/evadb/optimizer/rules/rules_base.py @@ -84,6 +84,7 @@ class RuleType(Flag): LOGICAL_CREATE_INDEX_TO_VECTOR_INDEX = auto() LOGICAL_APPLY_AND_MERGE_TO_PHYSICAL = auto() LOGICAL_VECTOR_INDEX_SCAN_TO_PHYSICAL = auto() + LOGICAL_LLM_TO_PHYSICAL = auto() IMPLEMENTATION_DELIMITER = auto() NUM_RULES = auto() @@ -97,6 +98,7 @@ class Promise(IntEnum): """ # IMPLEMENTATION RULES + LOGICAL_LLM_TO_PHYSICAL = auto() LOGICAL_EXCHANGE_TO_PHYSICAL = auto() LOGICAL_UNION_TO_PHYSICAL = auto() LOGICAL_GROUPBY_TO_PHYSICAL = auto() diff --git a/evadb/optimizer/rules/rules_manager.py b/evadb/optimizer/rules/rules_manager.py index cc88a9575d..57169ec78a 100644 --- a/evadb/optimizer/rules/rules_manager.py +++ b/evadb/optimizer/rules/rules_manager.py @@ -46,6 +46,7 @@ LogicalJoinToPhysicalNestedLoopJoin, LogicalLateralJoinToPhysical, LogicalLimitToPhysical, + LogicalLLMToPhysical, LogicalLoadToPhysical, LogicalOrderByToPhysical, LogicalProjectNoTableToPhysical, @@ -115,6 +116,7 @@ def __init__(self, configs: dict = {}): LogicalCreateIndexToVectorIndex(), LogicalVectorIndexScanToPhysical(), 
LogicalProjectNoTableToPhysical(), + LogicalLLMToPhysical(), ] # These rules are enabled only if diff --git a/evadb/optimizer/statement_to_opr_converter.py b/evadb/optimizer/statement_to_opr_converter.py index c60d0b258b..885e156d9f 100644 --- a/evadb/optimizer/statement_to_opr_converter.py +++ b/evadb/optimizer/statement_to_opr_converter.py @@ -14,6 +14,7 @@ # limitations under the License. from evadb.binder.binder_utils import get_bound_func_expr_outputs_as_tuple_value_expr from evadb.expression.abstract_expression import AbstractExpression +from evadb.expression.expression_utils import extract_llm_expressions_from_project from evadb.expression.function_expression import FunctionExpression from evadb.optimizer.operators import ( LogicalCreate, @@ -30,6 +31,7 @@ LogicalInsert, LogicalJoin, LogicalLimit, + LogicalLLM, LogicalLoadData, LogicalOrderBy, LogicalProject, @@ -228,9 +230,35 @@ def _visit_union(self, target, all): self._plan.append_child(right_child_plan) def _visit_projection(self, select_columns): + def __construct_llm_nodes(llm_exprs): + return [LogicalLLM(llm_expr) for llm_expr in llm_exprs] + + llm_exprs, remaining_exprs = extract_llm_expressions_from_project( + select_columns + ) + llm_nodes = __construct_llm_nodes(llm_exprs) + + select_columns = [] + for expr in llm_exprs: + select_columns.extend(get_bound_func_expr_outputs_as_tuple_value_expr(expr)) + + select_columns.extend(remaining_exprs) + + # add llm plan nodes + if llm_nodes: + # add existing plan as a child of llm + plan_root = LogicalLLM(llm_exprs[0]) + plan_root.append_child(self._plan) + for expr in llm_exprs[1:]: + new_root = LogicalLLM(expr) + new_root.append_child(plan_root) + plan_root = new_root + self._plan = plan_root + projection_opr = LogicalProject(select_columns) if self._plan is not None: projection_opr.append_child(self._plan) + self._plan = projection_opr def _visit_select_predicate(self, predicate: AbstractExpression): diff --git a/evadb/plan_nodes/llm_plan.py b/evadb/plan_nodes/llm_plan.py new file mode 100644 index 0000000000..ead61ad5aa --- /dev/null +++ b/evadb/plan_nodes/llm_plan.py @@ -0,0 +1,30 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from evadb.expression.function_expression import FunctionExpression +from evadb.plan_nodes.abstract_plan import AbstractPlan +from evadb.plan_nodes.types import PlanOprType + + +class LLMPlan(AbstractPlan): + def __init__(self, llm_expr: FunctionExpression): + self.llm_expr = llm_expr + self.alias = llm_expr.alias + super().__init__(PlanOprType.LLM) + + def __str__(self): + return f"LLMPlan(llm_expr={self.llm_expr})" + + def __hash__(self) -> int: + return hash((super().__hash__(), self.llm_expr, self.alias)) diff --git a/evadb/plan_nodes/types.py b/evadb/plan_nodes/types.py index 7b989e8e89..3acbec24d1 100644 --- a/evadb/plan_nodes/types.py +++ b/evadb/plan_nodes/types.py @@ -48,4 +48,5 @@ class PlanOprType(Enum): VECTOR_INDEX_SCAN = auto() NATIVE = auto() SQLALCHEMY = auto() + LLM = auto() # add other types diff --git a/evadb/utils/generic_utils.py b/evadb/utils/generic_utils.py index fb6bd9986a..3a1a75ee72 100644 --- a/evadb/utils/generic_utils.py +++ b/evadb/utils/generic_utils.py @@ -526,6 +526,16 @@ def try_to_import_openai(): ) +def try_to_import_tiktoken(): + try: + import tiktoken # noqa: F401 + except ImportError: + raise ValueError( + """Could not import tiktoken python package. + Please install them with `pip install tiktoken`.""" + ) + + def try_to_import_langchain(): try: import langchain # noqa: F401 diff --git a/setup.py b/setup.py index a18796d847..ca2f86d41c 100644 --- a/setup.py +++ b/setup.py @@ -81,6 +81,7 @@ def read(path, encoding="utf-8"): "protobuf", "bs4", "openai>=0.27.4", # CHATGPT + "tiktoken >= 0.3.3", # For calculating tokens "gpt4all", # PRIVATE GPT "sentencepiece", # TRANSFORMERS ] @@ -123,13 +124,11 @@ def read(path, encoding="utf-8"): xgboost_libs = ["flaml[automl]"] forecasting_libs = [ - "statsforecast", # MODEL TRAIN AND FINE TUNING - "neuralforecast" # MODEL TRAIN AND FINE TUNING + "statsforecast", # MODEL TRAIN AND FINE TUNING + "neuralforecast", # MODEL TRAIN AND FINE TUNING ] -imagegen_libs = [ - "replicate" -] +imagegen_libs = ["replicate"] ### NEEDED FOR DEVELOPER TESTING ONLY @@ -174,7 +173,15 @@ def read(path, encoding="utf-8"): "xgboost": xgboost_libs, "forecasting": forecasting_libs, # everything except ray, qdrant, ludwig and postgres. The first three fail on pyhton 3.11. - "dev": dev_libs + vision_libs + document_libs + function_libs + notebook_libs + forecasting_libs + sklearn_libs + imagegen_libs + xgboost_libs + "dev": dev_libs + + vision_libs + + document_libs + + function_libs + + notebook_libs + + forecasting_libs + + sklearn_libs + + imagegen_libs + + xgboost_libs, } setup( diff --git a/test/integration_tests/short/test_llm_executor.py b/test/integration_tests/short/test_llm_executor.py new file mode 100644 index 0000000000..0a17454510 --- /dev/null +++ b/test/integration_tests/short/test_llm_executor.py @@ -0,0 +1,84 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import unittest +from test.util import ( + file_remove, + get_evadb_for_testing, + get_logical_query_plan, + get_physical_query_plan, + load_functions_for_testing, + shutdown_ray, +) + +import pandas as pd +import pytest + +from evadb.models.storage.batch import Batch +from evadb.optimizer.operators import LogicalLLM +from evadb.plan_nodes.llm_plan import LLMPlan +from evadb.server.command_handler import execute_query_fetch_all + +NUM_FRAMES = 10 + + +@pytest.mark.notparallel +class LLMExecutorTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + # add DummyLLM to LLM_FUNCTIONS, CACHEABLE_FUNCTIONS + from evadb.constants import CACHEABLE_FUNCTIONS, LLM_FUNCTIONS + + LLM_FUNCTIONS += ["DummyLLM".lower()] + CACHEABLE_FUNCTIONS += ["DummyLLM".lower()] + + cls.evadb = get_evadb_for_testing() + cls.evadb.catalog().reset() + + load_functions_for_testing(cls.evadb) + execute_query_fetch_all(cls.evadb, "CREATE TABLE fruitTable (data TEXT(100))") + cls.data_list = [ + "The color of apple is red", + "The color of banana is yellow", + ] + for data in cls.data_list: + execute_query_fetch_all( + cls.evadb, f"INSERT INTO fruitTable (data) VALUES ('{data}')" + ) + + @classmethod + def tearDownClass(cls): + shutdown_ray() + + file_remove("dummy.avi") + + def test_llm_in_project_should_call_llm_executor(self): + prompt = '"What is the fruit described in this sentence"' + select_query = f"SELECT DummyLLM({prompt}, data) FROM fruitTable;" + logical_plan = get_logical_query_plan(self.evadb, select_query) + assert len(list(logical_plan.find_all(LogicalLLM))) > 0 + physical_plan = get_physical_query_plan(self.evadb, select_query) + assert len(list(physical_plan.find_all(LLMPlan))) > 0 + batches = execute_query_fetch_all(self.evadb, select_query) + expected = Batch( + pd.DataFrame( + [prompt.strip('"') + " " + data for data in self.data_list], + columns=["dummyllm.response"], + ) + ) + self.assertEqual(batches, expected) + + +if __name__ == "__main__": + unittest.main()
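# ---------------------------------------------------------------------------
# Illustrative note (not part of the patch above): the test assumes a DummyLLM
# function registered by load_functions_for_testing, which is not shown in this
# diff. The sketch below is a hypothetical, minimal BaseLLM subclass (named
# EchoLLM here) that illustrates the contract the new base class expects:
# generate() maps a list of prompts to a list of responses, and get_cost()
# reports token usage plus dollar cost. It is an assumption-laden example, not
# the actual helper used by the test suite.
from typing import List, Tuple

from evadb.functions.llms.base import BaseLLM


class EchoLLM(BaseLLM):
    @property
    def name(self) -> str:
        return "EchoLLM"

    def generate(self, prompts: List[str]) -> List[str]:
        # deterministic "model": echo each prompt back as the response
        return [prompt for prompt in prompts]

    def get_cost(self, prompt: str, response: str = "") -> Tuple[tuple, float]:
        # an echo model consumes no paid tokens and costs nothing
        return (0, 0), 0.0


# Hypothetical registration and use from SQL (the file name is an assumption):
#   CREATE FUNCTION EchoLLM IMPL 'echo_llm.py';
#   SELECT EchoLLM(data) FROM fruitTable;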
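# A worked example (illustrative only) of the cost accounting wired through
# llm_stats.json and OpenAILLM.get_cost: the dollar cost of one call is
# input_cost_per_token * prompt_tokens + output_cost_per_token * response_tokens,
# where response_tokens falls back to max_tokens when no response is available.
# The per-token prices are the gpt-3.5-turbo entry from llm_stats.json; the
# token counts are made-up numbers used only for the arithmetic.
input_cost_per_token = 0.0000015  # gpt-3.5-turbo input price, per llm_stats.json
output_cost_per_token = 0.000002  # gpt-3.5-turbo output price, per llm_stats.json

num_prompt_tokens = 120  # hypothetical prompt length
num_response_tokens = 1000  # worst case: the configured max_tokens

dollar_cost = (
    input_cost_per_token * num_prompt_tokens
    + output_cost_per_token * num_response_tokens
)
# 0.0000015 * 120 + 0.000002 * 1000 = 0.00018 + 0.002 = 0.00218 dollars per call
print(f"estimated cost: ${dollar_cost:.5f}")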