diff --git a/evadb/configuration/constants.py b/evadb/configuration/constants.py index 126e6bcfc..eccbf5c1e 100644 --- a/evadb/configuration/constants.py +++ b/evadb/configuration/constants.py @@ -36,5 +36,7 @@ DEFAULT_DOCUMENT_CHUNK_OVERLAP = 200 DEFAULT_TRAIN_REGRESSION_METRIC = "rmse" DEFAULT_XGBOOST_TASK = "regression" +EVADB_STATS = {} DEFAULT_SKLEARN_TRAIN_MODEL = "rf" SKLEARN_SUPPORTED_MODELS = ["rf", "extra_tree", "kneighbor"] + diff --git a/evadb/executor/cost_estimator.py b/evadb/executor/cost_estimator.py new file mode 100644 index 000000000..eda0f3661 --- /dev/null +++ b/evadb/executor/cost_estimator.py @@ -0,0 +1,201 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Iterator, Union + +from evadb.database import EvaDBDatabase +from evadb.executor.abstract_executor import AbstractExecutor +from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor +from evadb.models.storage.batch import Batch +from evadb.parser.create_statement import CreateDatabaseStatement +from evadb.parser.set_statement import SetStatement +from evadb.parser.statement import AbstractStatement +from evadb.parser.use_statement import UseStatement +from evadb.plan_nodes.abstract_plan import AbstractPlan +from evadb.plan_nodes.types import PlanOprType +from evadb.utils.logging_manager import logger +from evadb.configuration import constants +import json +import ast + + +class CostEstimator: + """ + This acts as the interface to estimate the cost of a particular query. Now it is implemeted after the query optimization + stage, but ideally this will be present as part of optimization engine and help in deciding the right plan + + Arguments: + plan (AbstractPlan): Physical plan tree which needs to be executed + evadb (EvaDBDatabase): database to execute the query on + """ + + def __init__(self, evadb: EvaDBDatabase, plan: AbstractPlan): + self._db = evadb + self._plan = plan + self._cost = 0 + self._predicateSet = [] + + def getCostFromStats(self,table_name): + if str(table_name) in self._predicateSet: + return 0 + elif str(table_name) in constants.EVADB_STATS: + table_data = constants.EVADB_STATS[str(table_name)] + num_rows = table_data.get('num_rows',0) + return num_rows + else: + return 0 + + def _build_execution_tree( + self, plan: Union[AbstractPlan, AbstractStatement] + ) -> AbstractExecutor: + + root = None + if plan is None: + return root + + # First handle cases when the plan is actually a parser statement + if isinstance(plan, CreateDatabaseStatement): + self._cost += 0 + elif isinstance(plan, UseStatement): + self._cost += 0 + elif isinstance(plan, SetStatement): + self._cost += 0 + + # Get plan node type + plan_opr_type = plan.opr_type + + # Cost Estimation added for Seq Scan and Predicate Scan + # TODO: Add cost estimations for other types of operators + if plan_opr_type == PlanOprType.SEQUENTIAL_SCAN: + self._cost += self.getCostFromStats(plan.alias) + elif plan_opr_type == PlanOprType.UNION: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.STORAGE_PLAN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.PP_FILTER: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.CREATE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.RENAME: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.DROP_OBJECT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.INSERT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.CREATE_FUNCTION: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.LOAD_DATA: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.GROUP_BY: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.ORDER_BY: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.LIMIT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.SAMPLE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.NESTED_LOOP_JOIN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.HASH_JOIN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.HASH_BUILD: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.FUNCTION_SCAN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.EXCHANGE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.PROJECT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.PREDICATE_FILTER: + self.getCostFromPredicate(plan.predicate) + elif plan_opr_type == PlanOprType.SHOW_INFO: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.EXPLAIN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.CREATE_INDEX: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.APPLY_AND_MERGE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.VECTOR_INDEX_SCAN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.DELETE: + self._cost += 0.0 + + for children in plan.children: + self._build_execution_tree(children) + + def get_execution_cost( + self, + do_not_raise_exceptions: bool = False, + do_not_print_exceptions: bool = False, + ) -> Iterator[Batch]: + """cost estimation of the plan tree""" + try: + self._build_execution_tree(self._plan) + return self._cost + except Exception as e: + if do_not_raise_exceptions is False: + if do_not_print_exceptions is False: + logger.exception(str(e)) + + def getCostFromPredicate(self, plan_predicate): + predicate = str(plan_predicate).split(" ") + table_name = predicate[2].split(".")[0] + column = predicate[2].split(".")[1] + condition = predicate[3] + condition_value = predicate[4][:-1] + self._predicateSet.append(table_name) + if str(table_name) in constants.EVADB_STATS: + table_data = constants.EVADB_STATS[str(table_name)] + hist_data = table_data["hist"] + my_list = ast.literal_eval(hist_data) + json_data = json.dumps(my_list) + data_list = json.loads(json_data) + for item in data_list: + level_dict = item.get(table_name + "." + column, {}) + for value in level_dict: + if self.evaluate(condition_value,condition,value): + self._cost += level_dict[value] + + + def evaluate(self, condition_value, condition, value): + if condition == '>': + if value > condition_value: + return True + else: + return False + elif condition == '>=': + if value >= condition_value: + return True + else: + return False + elif condition == '<': + if value < condition_value: + return True + else: + return False + elif condition == '<=': + if value <= condition_value: + return True + else: + return False + elif condition == '=': + if value == condition_value: + return True + else: + return False + return False + + + diff --git a/evadb/executor/cost_estimator_utils.py b/evadb/executor/cost_estimator_utils.py new file mode 100644 index 000000000..9e0a74c7b --- /dev/null +++ b/evadb/executor/cost_estimator_utils.py @@ -0,0 +1,17 @@ +from evadb.server.command_handler import execute_query_fetch_all +from evadb.configuration import constants + + +class CostEstimatorUtils(): + + def fetch_table_stats(db, query): + query_result = execute_query_fetch_all( + db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True + ) + for _, row in query_result.iterrows(): + entry = { + 'table_name': row['table_stats.table_name'], + 'num_rows': row['table_stats.num_rows'], + 'hist': row['table_stats.hist'] + } + constants.EVADB_STATS[row['table_stats.table_name']] = entry \ No newline at end of file diff --git a/evadb/executor/explain_executor.py b/evadb/executor/explain_executor.py index a579071a7..d6813f4e6 100644 --- a/evadb/executor/explain_executor.py +++ b/evadb/executor/explain_executor.py @@ -19,7 +19,8 @@ from evadb.models.storage.batch import Batch from evadb.plan_nodes.abstract_plan import AbstractPlan from evadb.plan_nodes.explain_plan import ExplainPlan - +from evadb.configuration import constants +from evadb.executor.cost_estimator import CostEstimator class ExplainExecutor(AbstractExecutor): def __init__(self, db: EvaDBDatabase, node: ExplainPlan): @@ -29,7 +30,11 @@ def exec(self, *args, **kwargs): # Traverse optimized physical plan, which is commonly supported. # Logical plan can be also printed by passing explainable_opr # attribute of the node, but is not done for now. + cost_estimated = CostEstimator(self._db, self._node.children[0]).get_execution_cost( + False, False + ) plan_str = self._exec(self._node.children[0], 0) + plan_str += "estimated rows accessed: " + str(cost_estimated) yield Batch(pd.DataFrame([plan_str])) def _exec(self, node: AbstractPlan, depth: int): diff --git a/evadb/functions/function_bootstrap_queries.py b/evadb/functions/function_bootstrap_queries.py index 3b5008586..4956dabb3 100644 --- a/evadb/functions/function_bootstrap_queries.py +++ b/evadb/functions/function_bootstrap_queries.py @@ -16,6 +16,8 @@ from evadb.configuration.constants import EvaDB_INSTALLATION_DIR from evadb.database import EvaDBDatabase from evadb.server.command_handler import execute_query_fetch_all +from evadb.configuration import constants +from collections import Counter NDARRAY_DIR = "ndarray" TUTORIALS_DIR = "tutorials" @@ -241,6 +243,8 @@ EvaDB_INSTALLATION_DIR ) +get_all_tables = "show tables" + def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: """Load the built-in functions into the system during system bootstrapping. @@ -284,6 +288,7 @@ def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: chatgpt_function_query, face_detection_function_query, # Mvit_function_query, + get_all_tables, Sift_function_query, Yolo_function_query, stablediffusion_function_query, @@ -309,8 +314,38 @@ def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: # ignore exceptions during the bootstrapping phase due to missing packages for query in queries: try: - execute_query_fetch_all( - db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True - ) - except Exception: + if query.startswith("show"): + tables = execute_query_fetch_all( + db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True + ) + # delete all entries in table_stats table. This is a temporary fix because update query is not yet + # available in EvaDB, the table_stats table is deleted and new data is inserted everytime + execute_query_fetch_all(db, "drop table table_stats", False, True) + execute_query_fetch_all(db, "create table table_stats(table_name TEXT, num_rows integer, hist TEXT)", False, True) + # get the histograms for each column for every table + row_count = 0 + for _,table in tables.iterrows(): + if str(table['name']) != "table_stats": + final_result = [] + result_dict = {} + table_data_query = f"select * from {table['name']}" + table_data = execute_query_fetch_all(db, table_data_query, False, True) + for column in table_data.columns: + try: + column_dict = dict(Counter(table_data.column_as_numpy_array(column))) + row_count = len(table_data.column_as_numpy_array(column)) + result_dict[column] = column_dict + except Exception as e: + print(e) + final_result = [{key: value} for key, value in result_dict.items()] + try: + insert_into_table_stats = f"insert into table_stats(table_name, num_rows, hist) values(\"{table['name']}\",{row_count},\"{final_result}\");" + execute_query_fetch_all(db, insert_into_table_stats, False, True) + except Exception as e: + print(e) + else: + execute_query_fetch_all( + db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True + ) + except Exception as e: pass diff --git a/evadb/interfaces/relational/db.py b/evadb/interfaces/relational/db.py index 714593a8a..3d81e6156 100644 --- a/evadb/interfaces/relational/db.py +++ b/evadb/interfaces/relational/db.py @@ -46,6 +46,7 @@ from evadb.utils.generic_utils import find_nearest_word, is_ray_enabled_and_installed from evadb.utils.job_scheduler import JobScheduler from evadb.utils.logging_manager import logger +from evadb.executor.cost_estimator_utils import CostEstimatorUtils class EvaDBConnection: @@ -465,6 +466,8 @@ def query(self, sql_query: str) -> EvaDBQuery: 1 3 4 2 5 6 """ + #refresh the table_stats table to get the latest statistics of the database + CostEstimatorUtils.fetch_table_stats(self._evadb, "select * from table_stats;") stmt = parse_query(sql_query) return EvaDBQuery(self._evadb, stmt)