diff --git a/evadb/configuration/constants.py b/evadb/configuration/constants.py index 3665a28727..c613ad070f 100644 --- a/evadb/configuration/constants.py +++ b/evadb/configuration/constants.py @@ -36,3 +36,4 @@ DEFAULT_DOCUMENT_CHUNK_OVERLAP = 200 DEFAULT_TRAIN_REGRESSION_METRIC = "rmse" DEFAULT_XGBOOST_TASK = "regression" +EVADB_STATS = {} diff --git a/evadb/executor/cost_estimator.py b/evadb/executor/cost_estimator.py new file mode 100644 index 0000000000..08155ea9f5 --- /dev/null +++ b/evadb/executor/cost_estimator.py @@ -0,0 +1,198 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Iterator, Union + +from evadb.database import EvaDBDatabase +from evadb.executor.abstract_executor import AbstractExecutor +from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor +from evadb.models.storage.batch import Batch +from evadb.parser.create_statement import CreateDatabaseStatement +from evadb.parser.set_statement import SetStatement +from evadb.parser.statement import AbstractStatement +from evadb.parser.use_statement import UseStatement +from evadb.plan_nodes.abstract_plan import AbstractPlan +from evadb.plan_nodes.types import PlanOprType +from evadb.utils.logging_manager import logger +from evadb.configuration import constants +import json + + +class CostEstimator: + """ + This acts as the interface to estimate the cost of a particular query. Now it is implemeted after the query optimization + stage, but ideally this will be present as part of optimization engine and help in deciding the right plan + + Arguments: + plan (AbstractPlan): Physical plan tree which needs to be executed + evadb (EvaDBDatabase): database to execute the query on + """ + + def __init__(self, evadb: EvaDBDatabase, plan: AbstractPlan): + self._db = evadb + self._plan = plan + self._cost = 0 + self._predicateSet = [] + + def getCostFromStats(self,table_name): + if str(table_name) in self._predicateSet: + return 0 + elif str(table_name) in constants.EVADB_STATS: + table_data = constants.EVADB_STATS[str(table_name)] + num_rows = table_data.get('num_rows',0) + return num_rows + else: + return 0 + + def _build_execution_tree( + self, plan: Union[AbstractPlan, AbstractStatement] + ) -> AbstractExecutor: + + root = None + if plan is None: + return root + + # First handle cases when the plan is actually a parser statement + if isinstance(plan, CreateDatabaseStatement): + self._cost += 0 + elif isinstance(plan, UseStatement): + self._cost += 0 + elif isinstance(plan, SetStatement): + self._cost += 0 + + # Get plan node type + plan_opr_type = plan.opr_type + + # Cost Estimation added for Seq Scan and Predicate Scan + # TODO: Add cost estimations for other types of operators + if plan_opr_type == PlanOprType.SEQUENTIAL_SCAN: + self._cost += self.getCostFromStats(plan.alias) + elif plan_opr_type == PlanOprType.UNION: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.STORAGE_PLAN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.PP_FILTER: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.CREATE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.RENAME: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.DROP_OBJECT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.INSERT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.CREATE_FUNCTION: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.LOAD_DATA: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.GROUP_BY: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.ORDER_BY: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.LIMIT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.SAMPLE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.NESTED_LOOP_JOIN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.HASH_JOIN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.HASH_BUILD: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.FUNCTION_SCAN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.EXCHANGE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.PROJECT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.PREDICATE_FILTER: + self.getCostFromPredicate(plan.predicate) + elif plan_opr_type == PlanOprType.SHOW_INFO: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.EXPLAIN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.CREATE_INDEX: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.APPLY_AND_MERGE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.VECTOR_INDEX_SCAN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.DELETE: + self._cost += 0.0 + + for children in plan.children: + self._build_execution_tree(children) + + def get_execution_cost( + self, + do_not_raise_exceptions: bool = False, + do_not_print_exceptions: bool = False, + ) -> Iterator[Batch]: + """cost estimation of the plan tree""" + try: + self._build_execution_tree(self._plan) + return self._cost + except Exception as e: + if do_not_raise_exceptions is False: + if do_not_print_exceptions is False: + logger.exception(str(e)) + + def getCostFromPredicate(self, plan_predicate): + predicate = str(plan_predicate).split(" ") + table_name = predicate[2].split(".")[0] + column = predicate[2].split(".")[1] + condition = predicate[3] + condition_value = predicate[4][:-1] + self._predicateSet.append(table_name) + if str(table_name) in constants.EVADB_STATS: + table_data = constants.EVADB_STATS[str(table_name)] + hist_data = table_data["hist"] + data_list = json.loads(hist_data) + for item in data_list: + level_dict = item.get(column, {}) + for value in level_dict: + if self.evaluate(condition_value,condition,value): + self._cost += level_dict[value] + + + def evaluate(self, condition_value, condition, value): + if condition == '>': + if value > condition_value: + return True + else: + return False + elif condition == '>=': + if value >= condition_value: + return True + else: + return False + elif condition == '<': + if value < condition_value: + return True + else: + return False + elif condition == '<=': + if value <= condition_value: + return True + else: + return False + elif condition == '=': + if value == condition_value: + return True + else: + return False + return False + + + diff --git a/evadb/executor/explain_executor.py b/evadb/executor/explain_executor.py index a579071a7b..d6813f4e61 100644 --- a/evadb/executor/explain_executor.py +++ b/evadb/executor/explain_executor.py @@ -19,7 +19,8 @@ from evadb.models.storage.batch import Batch from evadb.plan_nodes.abstract_plan import AbstractPlan from evadb.plan_nodes.explain_plan import ExplainPlan - +from evadb.configuration import constants +from evadb.executor.cost_estimator import CostEstimator class ExplainExecutor(AbstractExecutor): def __init__(self, db: EvaDBDatabase, node: ExplainPlan): @@ -29,7 +30,11 @@ def exec(self, *args, **kwargs): # Traverse optimized physical plan, which is commonly supported. # Logical plan can be also printed by passing explainable_opr # attribute of the node, but is not done for now. + cost_estimated = CostEstimator(self._db, self._node.children[0]).get_execution_cost( + False, False + ) plan_str = self._exec(self._node.children[0], 0) + plan_str += "estimated rows accessed: " + str(cost_estimated) yield Batch(pd.DataFrame([plan_str])) def _exec(self, node: AbstractPlan, depth: int): diff --git a/evadb/functions/function_bootstrap_queries.py b/evadb/functions/function_bootstrap_queries.py index f8186d4dd3..0d38bff8f2 100644 --- a/evadb/functions/function_bootstrap_queries.py +++ b/evadb/functions/function_bootstrap_queries.py @@ -16,6 +16,7 @@ from evadb.configuration.constants import EvaDB_INSTALLATION_DIR from evadb.database import EvaDBDatabase from evadb.server.command_handler import execute_query_fetch_all +from evadb.configuration import constants NDARRAY_DIR = "ndarray" TUTORIALS_DIR = "tutorials" @@ -197,6 +198,8 @@ EvaDB_INSTALLATION_DIR ) +getdata_from_stats_query = "SELECT * FROM table_stats;" + yolo8n_query = """CREATE FUNCTION IF NOT EXISTS Yolo TYPE ultralytics MODEL 'yolov8n.pt'; @@ -238,6 +241,20 @@ EvaDB_INSTALLATION_DIR ) +create_buckets_query = """CREATE TABLE level_counts ( + level Integer, + row_count INTEGER +);""" + +insert_into_buckets = """INSERT INTO level_counts +SELECT + level, + COUNT(*) AS row_count +FROM mydata3 +GROUP BY level;""" + +extract_data_buckets = """Select * from level_counts""" + def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: """Load the built-in functions into the system during system bootstrapping. @@ -281,6 +298,8 @@ def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: chatgpt_function_query, face_detection_function_query, # Mvit_function_query, + extract_data_buckets, + getdata_from_stats_query, Sift_function_query, Yolo_function_query, stablediffusion_function_query, @@ -306,8 +325,20 @@ def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: # ignore exceptions during the bootstrapping phase due to missing packages for query in queries: try: - execute_query_fetch_all( + if query.startswith("SELECT"): + query_result = execute_query_fetch_all( + db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True + ) + for _, row in query_result.iterrows(): + entry = { + 'table_name': row['table_stats.table_name'], + 'num_rows': row['table_stats.num_rows'], + 'hist': row['table_stats.hist'] + } + constants.EVADB_STATS[row['table_stats.table_name']] = entry + else: + execute_query_fetch_all( db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True ) - except Exception: + except Exception as e: pass