Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cost estimation #1

Merged
merged 4 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/_toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ parts:
- file: source/reference/evaql/load_csv
- file: source/reference/evaql/load_image
- file: source/reference/evaql/load_video
- file: source/reference/evaql/load_pdf
- file: source/reference/evaql/select
- file: source/reference/evaql/explain
- file: source/reference/evaql/show_functions
Expand Down
16 changes: 16 additions & 0 deletions docs/source/reference/evaql/load_pdf.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
LOAD PDF
==========

.. _load-pdf:

.. code:: mysql

LOAD PDF 'test_pdf.pdf' INTO MyPDFs;

PDFs can be directly imported into a table, where the PDF document is segmented into pages and paragraphs.
Each row in the table corresponds to a paragraph extracted from the PDF, and the resulting table includes columns for ``name`` , ``page``, ``paragraph``, and ``data``.

| ``name`` signifies the title of the uploaded PDF.
| ``page`` signifies the specific page number from which the data is retrieved.
| ``paragraph`` signifies the individual paragraph within a page from which the data is extracted.
| ``data`` refers to the text extracted from the paragraph on the given page.
1 change: 1 addition & 0 deletions evadb/configuration/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@
DEFAULT_DOCUMENT_CHUNK_OVERLAP = 200
DEFAULT_TRAIN_REGRESSION_METRIC = "rmse"
DEFAULT_XGBOOST_TASK = "regression"
EVADB_STATS = {}
201 changes: 201 additions & 0 deletions evadb/executor/cost_estimator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator, Union

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor
from evadb.models.storage.batch import Batch
from evadb.parser.create_statement import CreateDatabaseStatement
from evadb.parser.set_statement import SetStatement
from evadb.parser.statement import AbstractStatement
from evadb.parser.use_statement import UseStatement
from evadb.plan_nodes.abstract_plan import AbstractPlan
from evadb.plan_nodes.types import PlanOprType
from evadb.utils.logging_manager import logger
from evadb.configuration import constants
import json
import ast


class CostEstimator:
"""
This acts as the interface to estimate the cost of a particular query. Now it is implemeted after the query optimization
stage, but ideally this will be present as part of optimization engine and help in deciding the right plan

Arguments:
plan (AbstractPlan): Physical plan tree which needs to be executed
evadb (EvaDBDatabase): database to execute the query on
"""

def __init__(self, evadb: EvaDBDatabase, plan: AbstractPlan):
self._db = evadb
self._plan = plan
self._cost = 0
self._predicateSet = []

def getCostFromStats(self,table_name):
if str(table_name) in self._predicateSet:
return 0
elif str(table_name) in constants.EVADB_STATS:
table_data = constants.EVADB_STATS[str(table_name)]
num_rows = table_data.get('num_rows',0)
return num_rows
else:
return 0

def _build_execution_tree(
self, plan: Union[AbstractPlan, AbstractStatement]
) -> AbstractExecutor:

root = None
if plan is None:
return root

# First handle cases when the plan is actually a parser statement
if isinstance(plan, CreateDatabaseStatement):
self._cost += 0
elif isinstance(plan, UseStatement):
self._cost += 0
elif isinstance(plan, SetStatement):
self._cost += 0

# Get plan node type
plan_opr_type = plan.opr_type

# Cost Estimation added for Seq Scan and Predicate Scan
# TODO: Add cost estimations for other types of operators
if plan_opr_type == PlanOprType.SEQUENTIAL_SCAN:
self._cost += self.getCostFromStats(plan.alias)
elif plan_opr_type == PlanOprType.UNION:
self._cost += 0.0
elif plan_opr_type == PlanOprType.STORAGE_PLAN:
self._cost += 0.0
elif plan_opr_type == PlanOprType.PP_FILTER:
self._cost += 0.0
elif plan_opr_type == PlanOprType.CREATE:
self._cost += 0.0
elif plan_opr_type == PlanOprType.RENAME:
self._cost += 0.0
elif plan_opr_type == PlanOprType.DROP_OBJECT:
self._cost += 0.0
elif plan_opr_type == PlanOprType.INSERT:
self._cost += 0.0
elif plan_opr_type == PlanOprType.CREATE_FUNCTION:
self._cost += 0.0
elif plan_opr_type == PlanOprType.LOAD_DATA:
self._cost += 0.0
elif plan_opr_type == PlanOprType.GROUP_BY:
self._cost += 0.0
elif plan_opr_type == PlanOprType.ORDER_BY:
self._cost += 0.0
elif plan_opr_type == PlanOprType.LIMIT:
self._cost += 0.0
elif plan_opr_type == PlanOprType.SAMPLE:
self._cost += 0.0
elif plan_opr_type == PlanOprType.NESTED_LOOP_JOIN:
self._cost += 0.0
elif plan_opr_type == PlanOprType.HASH_JOIN:
self._cost += 0.0
elif plan_opr_type == PlanOprType.HASH_BUILD:
self._cost += 0.0
elif plan_opr_type == PlanOprType.FUNCTION_SCAN:
self._cost += 0.0
elif plan_opr_type == PlanOprType.EXCHANGE:
self._cost += 0.0
elif plan_opr_type == PlanOprType.PROJECT:
self._cost += 0.0
elif plan_opr_type == PlanOprType.PREDICATE_FILTER:
self.getCostFromPredicate(plan.predicate)
elif plan_opr_type == PlanOprType.SHOW_INFO:
self._cost += 0.0
elif plan_opr_type == PlanOprType.EXPLAIN:
self._cost += 0.0
elif plan_opr_type == PlanOprType.CREATE_INDEX:
self._cost += 0.0
elif plan_opr_type == PlanOprType.APPLY_AND_MERGE:
self._cost += 0.0
elif plan_opr_type == PlanOprType.VECTOR_INDEX_SCAN:
self._cost += 0.0
elif plan_opr_type == PlanOprType.DELETE:
self._cost += 0.0

for children in plan.children:
self._build_execution_tree(children)

def get_execution_cost(
self,
do_not_raise_exceptions: bool = False,
do_not_print_exceptions: bool = False,
) -> Iterator[Batch]:
"""cost estimation of the plan tree"""
try:
self._build_execution_tree(self._plan)
return self._cost
except Exception as e:
if do_not_raise_exceptions is False:
if do_not_print_exceptions is False:
logger.exception(str(e))

def getCostFromPredicate(self, plan_predicate):
predicate = str(plan_predicate).split(" ")
table_name = predicate[2].split(".")[0]
column = predicate[2].split(".")[1]
condition = predicate[3]
condition_value = predicate[4][:-1]
self._predicateSet.append(table_name)
if str(table_name) in constants.EVADB_STATS:
table_data = constants.EVADB_STATS[str(table_name)]
hist_data = table_data["hist"]
my_list = ast.literal_eval(hist_data)
json_data = json.dumps(my_list)
data_list = json.loads(json_data)
for item in data_list:
level_dict = item.get(table_name + "." + column, {})
for value in level_dict:
if self.evaluate(condition_value,condition,value):
self._cost += level_dict[value]


def evaluate(self, condition_value, condition, value):
if condition == '>':
if value > condition_value:
return True
else:
return False
elif condition == '>=':
if value >= condition_value:
return True
else:
return False
elif condition == '<':
if value < condition_value:
return True
else:
return False
elif condition == '<=':
if value <= condition_value:
return True
else:
return False
elif condition == '=':
if value == condition_value:
return True
else:
return False
return False



17 changes: 17 additions & 0 deletions evadb/executor/cost_estimator_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from evadb.server.command_handler import execute_query_fetch_all
from evadb.configuration import constants


class CostEstimatorUtils():

def fetch_table_stats(db, query):
query_result = execute_query_fetch_all(
db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True
)
for _, row in query_result.iterrows():
entry = {
'table_name': row['table_stats.table_name'],
'num_rows': row['table_stats.num_rows'],
'hist': row['table_stats.hist']
}
constants.EVADB_STATS[row['table_stats.table_name']] = entry
7 changes: 6 additions & 1 deletion evadb/executor/explain_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.abstract_plan import AbstractPlan
from evadb.plan_nodes.explain_plan import ExplainPlan

from evadb.configuration import constants
from evadb.executor.cost_estimator import CostEstimator

class ExplainExecutor(AbstractExecutor):
def __init__(self, db: EvaDBDatabase, node: ExplainPlan):
Expand All @@ -29,7 +30,11 @@ def exec(self, *args, **kwargs):
# Traverse optimized physical plan, which is commonly supported.
# Logical plan can be also printed by passing explainable_opr
# attribute of the node, but is not done for now.
cost_estimated = CostEstimator(self._db, self._node.children[0]).get_execution_cost(
False, False
)
plan_str = self._exec(self._node.children[0], 0)
plan_str += "estimated rows accessed: " + str(cost_estimated)
yield Batch(pd.DataFrame([plan_str]))

def _exec(self, node: AbstractPlan, depth: int):
Expand Down
43 changes: 39 additions & 4 deletions evadb/functions/function_bootstrap_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from evadb.configuration.constants import EvaDB_INSTALLATION_DIR
from evadb.database import EvaDBDatabase
from evadb.server.command_handler import execute_query_fetch_all
from evadb.configuration import constants
from collections import Counter

NDARRAY_DIR = "ndarray"
TUTORIALS_DIR = "tutorials"
Expand Down Expand Up @@ -238,6 +240,8 @@
EvaDB_INSTALLATION_DIR
)

get_all_tables = "show tables"


def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None:
"""Load the built-in functions into the system during system bootstrapping.
Expand Down Expand Up @@ -281,6 +285,7 @@ def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None:
chatgpt_function_query,
face_detection_function_query,
# Mvit_function_query,
get_all_tables,
Sift_function_query,
Yolo_function_query,
stablediffusion_function_query,
Expand All @@ -306,8 +311,38 @@ def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None:
# ignore exceptions during the bootstrapping phase due to missing packages
for query in queries:
try:
execute_query_fetch_all(
db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True
)
except Exception:
if query.startswith("show"):
tables = execute_query_fetch_all(
db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True
)
# delete all entries in table_stats table. This is a temporary fix because update query is not yet
# available in EvaDB, the table_stats table is deleted and new data is inserted everytime
execute_query_fetch_all(db, "drop table table_stats", False, True)
execute_query_fetch_all(db, "create table table_stats(table_name TEXT, num_rows integer, hist TEXT)", False, True)
# get the histograms for each column for every table
row_count = 0
for _,table in tables.iterrows():
if str(table['name']) != "table_stats":
final_result = []
result_dict = {}
table_data_query = f"select * from {table['name']}"
table_data = execute_query_fetch_all(db, table_data_query, False, True)
for column in table_data.columns:
try:
column_dict = dict(Counter(table_data.column_as_numpy_array(column)))
row_count = len(table_data.column_as_numpy_array(column))
result_dict[column] = column_dict
except Exception as e:
print(e)
final_result = [{key: value} for key, value in result_dict.items()]
try:
insert_into_table_stats = f"insert into table_stats(table_name, num_rows, hist) values(\"{table['name']}\",{row_count},\"{final_result}\");"
execute_query_fetch_all(db, insert_into_table_stats, False, True)
except Exception as e:
print(e)
else:
execute_query_fetch_all(
db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True
)
except Exception as e:
pass
3 changes: 3 additions & 0 deletions evadb/interfaces/relational/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from evadb.server.command_handler import execute_statement
from evadb.utils.generic_utils import find_nearest_word, is_ray_enabled_and_installed
from evadb.utils.logging_manager import logger
from evadb.executor.cost_estimator_utils import CostEstimatorUtils


class EvaDBConnection:
Expand Down Expand Up @@ -444,6 +445,8 @@ def query(self, sql_query: str) -> EvaDBQuery:
1 3 4
2 5 6
"""
#refresh the table_stats table to get the latest statistics of the database
CostEstimatorUtils.fetch_table_stats(self._evadb, "select * from table_stats;")
stmt = parse_query(sql_query)
return EvaDBQuery(self._evadb, stmt)

Expand Down