From c552137e312b70c487480fcf1f48e4a8c6086dc0 Mon Sep 17 00:00:00 2001 From: Lohith K S Date: Tue, 7 Nov 2023 01:36:34 -0500 Subject: [PATCH 1/3] Verified that issue #1067 is resolved and added documentation for load pdf functionality --- docs/_toc.yml | 1 + docs/source/reference/evaql/load_pdf.rst | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) create mode 100644 docs/source/reference/evaql/load_pdf.rst diff --git a/docs/_toc.yml b/docs/_toc.yml index a50c653579..ca191ce42d 100644 --- a/docs/_toc.yml +++ b/docs/_toc.yml @@ -45,6 +45,7 @@ parts: - file: source/reference/evaql/load_csv - file: source/reference/evaql/load_image - file: source/reference/evaql/load_video + - file: source/reference/evaql/load_pdf - file: source/reference/evaql/select - file: source/reference/evaql/explain - file: source/reference/evaql/show_functions diff --git a/docs/source/reference/evaql/load_pdf.rst b/docs/source/reference/evaql/load_pdf.rst new file mode 100644 index 0000000000..8ced9aeb3f --- /dev/null +++ b/docs/source/reference/evaql/load_pdf.rst @@ -0,0 +1,16 @@ +LOAD PDF +========== + +.. _load-pdf: + +.. code:: mysql + + LOAD PDF 'test_pdf.pdf' INTO MyPDFs; + +PDFs can be directly imported into a table, where the PDF document is segmented into pages and paragraphs. +Each row in the table corresponds to a paragraph extracted from the PDF, and the resulting table includes columns for ``name`` , ``page``, ``paragraph``, and ``data``. + +| ``name`` signifies the title of the uploaded PDF. +| ``page`` signifies the specific page number from which the data is retrieved. +| ``paragraph`` signifies the individual paragraph within a page from which the data is extracted. +| ``data`` refers to the text extracted from the paragraph on the given page. From 5ad867a164dbbb3ac68772c61b465a496ff820db Mon Sep 17 00:00:00 2001 From: Lohith K S Date: Mon, 20 Nov 2023 17:36:46 -0500 Subject: [PATCH 2/3] Feat: Cost Estimation for Queries: Initial Draft and support for SeqScan and Predicate Filters --- evadb/configuration/constants.py | 1 + evadb/executor/cost_estimator.py | 198 ++++++++++++++++++ evadb/executor/explain_executor.py | 7 +- evadb/functions/function_bootstrap_queries.py | 35 +++- 4 files changed, 238 insertions(+), 3 deletions(-) create mode 100644 evadb/executor/cost_estimator.py diff --git a/evadb/configuration/constants.py b/evadb/configuration/constants.py index 3665a28727..c613ad070f 100644 --- a/evadb/configuration/constants.py +++ b/evadb/configuration/constants.py @@ -36,3 +36,4 @@ DEFAULT_DOCUMENT_CHUNK_OVERLAP = 200 DEFAULT_TRAIN_REGRESSION_METRIC = "rmse" DEFAULT_XGBOOST_TASK = "regression" +EVADB_STATS = {} diff --git a/evadb/executor/cost_estimator.py b/evadb/executor/cost_estimator.py new file mode 100644 index 0000000000..08155ea9f5 --- /dev/null +++ b/evadb/executor/cost_estimator.py @@ -0,0 +1,198 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Iterator, Union + +from evadb.database import EvaDBDatabase +from evadb.executor.abstract_executor import AbstractExecutor +from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor +from evadb.models.storage.batch import Batch +from evadb.parser.create_statement import CreateDatabaseStatement +from evadb.parser.set_statement import SetStatement +from evadb.parser.statement import AbstractStatement +from evadb.parser.use_statement import UseStatement +from evadb.plan_nodes.abstract_plan import AbstractPlan +from evadb.plan_nodes.types import PlanOprType +from evadb.utils.logging_manager import logger +from evadb.configuration import constants +import json + + +class CostEstimator: + """ + This acts as the interface to estimate the cost of a particular query. Now it is implemeted after the query optimization + stage, but ideally this will be present as part of optimization engine and help in deciding the right plan + + Arguments: + plan (AbstractPlan): Physical plan tree which needs to be executed + evadb (EvaDBDatabase): database to execute the query on + """ + + def __init__(self, evadb: EvaDBDatabase, plan: AbstractPlan): + self._db = evadb + self._plan = plan + self._cost = 0 + self._predicateSet = [] + + def getCostFromStats(self,table_name): + if str(table_name) in self._predicateSet: + return 0 + elif str(table_name) in constants.EVADB_STATS: + table_data = constants.EVADB_STATS[str(table_name)] + num_rows = table_data.get('num_rows',0) + return num_rows + else: + return 0 + + def _build_execution_tree( + self, plan: Union[AbstractPlan, AbstractStatement] + ) -> AbstractExecutor: + + root = None + if plan is None: + return root + + # First handle cases when the plan is actually a parser statement + if isinstance(plan, CreateDatabaseStatement): + self._cost += 0 + elif isinstance(plan, UseStatement): + self._cost += 0 + elif isinstance(plan, SetStatement): + self._cost += 0 + + # Get plan node type + plan_opr_type = plan.opr_type + + # Cost Estimation added for Seq Scan and Predicate Scan + # TODO: Add cost estimations for other types of operators + if plan_opr_type == PlanOprType.SEQUENTIAL_SCAN: + self._cost += self.getCostFromStats(plan.alias) + elif plan_opr_type == PlanOprType.UNION: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.STORAGE_PLAN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.PP_FILTER: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.CREATE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.RENAME: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.DROP_OBJECT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.INSERT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.CREATE_FUNCTION: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.LOAD_DATA: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.GROUP_BY: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.ORDER_BY: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.LIMIT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.SAMPLE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.NESTED_LOOP_JOIN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.HASH_JOIN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.HASH_BUILD: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.FUNCTION_SCAN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.EXCHANGE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.PROJECT: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.PREDICATE_FILTER: + self.getCostFromPredicate(plan.predicate) + elif plan_opr_type == PlanOprType.SHOW_INFO: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.EXPLAIN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.CREATE_INDEX: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.APPLY_AND_MERGE: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.VECTOR_INDEX_SCAN: + self._cost += 0.0 + elif plan_opr_type == PlanOprType.DELETE: + self._cost += 0.0 + + for children in plan.children: + self._build_execution_tree(children) + + def get_execution_cost( + self, + do_not_raise_exceptions: bool = False, + do_not_print_exceptions: bool = False, + ) -> Iterator[Batch]: + """cost estimation of the plan tree""" + try: + self._build_execution_tree(self._plan) + return self._cost + except Exception as e: + if do_not_raise_exceptions is False: + if do_not_print_exceptions is False: + logger.exception(str(e)) + + def getCostFromPredicate(self, plan_predicate): + predicate = str(plan_predicate).split(" ") + table_name = predicate[2].split(".")[0] + column = predicate[2].split(".")[1] + condition = predicate[3] + condition_value = predicate[4][:-1] + self._predicateSet.append(table_name) + if str(table_name) in constants.EVADB_STATS: + table_data = constants.EVADB_STATS[str(table_name)] + hist_data = table_data["hist"] + data_list = json.loads(hist_data) + for item in data_list: + level_dict = item.get(column, {}) + for value in level_dict: + if self.evaluate(condition_value,condition,value): + self._cost += level_dict[value] + + + def evaluate(self, condition_value, condition, value): + if condition == '>': + if value > condition_value: + return True + else: + return False + elif condition == '>=': + if value >= condition_value: + return True + else: + return False + elif condition == '<': + if value < condition_value: + return True + else: + return False + elif condition == '<=': + if value <= condition_value: + return True + else: + return False + elif condition == '=': + if value == condition_value: + return True + else: + return False + return False + + + diff --git a/evadb/executor/explain_executor.py b/evadb/executor/explain_executor.py index a579071a7b..d6813f4e61 100644 --- a/evadb/executor/explain_executor.py +++ b/evadb/executor/explain_executor.py @@ -19,7 +19,8 @@ from evadb.models.storage.batch import Batch from evadb.plan_nodes.abstract_plan import AbstractPlan from evadb.plan_nodes.explain_plan import ExplainPlan - +from evadb.configuration import constants +from evadb.executor.cost_estimator import CostEstimator class ExplainExecutor(AbstractExecutor): def __init__(self, db: EvaDBDatabase, node: ExplainPlan): @@ -29,7 +30,11 @@ def exec(self, *args, **kwargs): # Traverse optimized physical plan, which is commonly supported. # Logical plan can be also printed by passing explainable_opr # attribute of the node, but is not done for now. + cost_estimated = CostEstimator(self._db, self._node.children[0]).get_execution_cost( + False, False + ) plan_str = self._exec(self._node.children[0], 0) + plan_str += "estimated rows accessed: " + str(cost_estimated) yield Batch(pd.DataFrame([plan_str])) def _exec(self, node: AbstractPlan, depth: int): diff --git a/evadb/functions/function_bootstrap_queries.py b/evadb/functions/function_bootstrap_queries.py index f8186d4dd3..0d38bff8f2 100644 --- a/evadb/functions/function_bootstrap_queries.py +++ b/evadb/functions/function_bootstrap_queries.py @@ -16,6 +16,7 @@ from evadb.configuration.constants import EvaDB_INSTALLATION_DIR from evadb.database import EvaDBDatabase from evadb.server.command_handler import execute_query_fetch_all +from evadb.configuration import constants NDARRAY_DIR = "ndarray" TUTORIALS_DIR = "tutorials" @@ -197,6 +198,8 @@ EvaDB_INSTALLATION_DIR ) +getdata_from_stats_query = "SELECT * FROM table_stats;" + yolo8n_query = """CREATE FUNCTION IF NOT EXISTS Yolo TYPE ultralytics MODEL 'yolov8n.pt'; @@ -238,6 +241,20 @@ EvaDB_INSTALLATION_DIR ) +create_buckets_query = """CREATE TABLE level_counts ( + level Integer, + row_count INTEGER +);""" + +insert_into_buckets = """INSERT INTO level_counts +SELECT + level, + COUNT(*) AS row_count +FROM mydata3 +GROUP BY level;""" + +extract_data_buckets = """Select * from level_counts""" + def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: """Load the built-in functions into the system during system bootstrapping. @@ -281,6 +298,8 @@ def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: chatgpt_function_query, face_detection_function_query, # Mvit_function_query, + extract_data_buckets, + getdata_from_stats_query, Sift_function_query, Yolo_function_query, stablediffusion_function_query, @@ -306,8 +325,20 @@ def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: # ignore exceptions during the bootstrapping phase due to missing packages for query in queries: try: - execute_query_fetch_all( + if query.startswith("SELECT"): + query_result = execute_query_fetch_all( + db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True + ) + for _, row in query_result.iterrows(): + entry = { + 'table_name': row['table_stats.table_name'], + 'num_rows': row['table_stats.num_rows'], + 'hist': row['table_stats.hist'] + } + constants.EVADB_STATS[row['table_stats.table_name']] = entry + else: + execute_query_fetch_all( db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True ) - except Exception: + except Exception as e: pass From f9325e3bb4299f784ee3e3ca4a92e92ac54cae82 Mon Sep 17 00:00:00 2001 From: Lohith K S Date: Wed, 22 Nov 2023 17:59:20 -0500 Subject: [PATCH 3/3] Feat: Cost Estimation for Query - Added support for dynamic creation of histogram tables and refresh of table_stats table for every query --- evadb/executor/cost_estimator.py | 7 +- evadb/executor/cost_estimator_utils.py | 17 +++++ evadb/functions/function_bootstrap_queries.py | 64 ++++++++++--------- evadb/interfaces/relational/db.py | 3 + 4 files changed, 59 insertions(+), 32 deletions(-) create mode 100644 evadb/executor/cost_estimator_utils.py diff --git a/evadb/executor/cost_estimator.py b/evadb/executor/cost_estimator.py index 08155ea9f5..eda0f3661c 100644 --- a/evadb/executor/cost_estimator.py +++ b/evadb/executor/cost_estimator.py @@ -27,6 +27,7 @@ from evadb.utils.logging_manager import logger from evadb.configuration import constants import json +import ast class CostEstimator: @@ -158,9 +159,11 @@ def getCostFromPredicate(self, plan_predicate): if str(table_name) in constants.EVADB_STATS: table_data = constants.EVADB_STATS[str(table_name)] hist_data = table_data["hist"] - data_list = json.loads(hist_data) + my_list = ast.literal_eval(hist_data) + json_data = json.dumps(my_list) + data_list = json.loads(json_data) for item in data_list: - level_dict = item.get(column, {}) + level_dict = item.get(table_name + "." + column, {}) for value in level_dict: if self.evaluate(condition_value,condition,value): self._cost += level_dict[value] diff --git a/evadb/executor/cost_estimator_utils.py b/evadb/executor/cost_estimator_utils.py new file mode 100644 index 0000000000..9e0a74c7be --- /dev/null +++ b/evadb/executor/cost_estimator_utils.py @@ -0,0 +1,17 @@ +from evadb.server.command_handler import execute_query_fetch_all +from evadb.configuration import constants + + +class CostEstimatorUtils(): + + def fetch_table_stats(db, query): + query_result = execute_query_fetch_all( + db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True + ) + for _, row in query_result.iterrows(): + entry = { + 'table_name': row['table_stats.table_name'], + 'num_rows': row['table_stats.num_rows'], + 'hist': row['table_stats.hist'] + } + constants.EVADB_STATS[row['table_stats.table_name']] = entry \ No newline at end of file diff --git a/evadb/functions/function_bootstrap_queries.py b/evadb/functions/function_bootstrap_queries.py index 0d38bff8f2..293e1a0443 100644 --- a/evadb/functions/function_bootstrap_queries.py +++ b/evadb/functions/function_bootstrap_queries.py @@ -17,6 +17,7 @@ from evadb.database import EvaDBDatabase from evadb.server.command_handler import execute_query_fetch_all from evadb.configuration import constants +from collections import Counter NDARRAY_DIR = "ndarray" TUTORIALS_DIR = "tutorials" @@ -198,8 +199,6 @@ EvaDB_INSTALLATION_DIR ) -getdata_from_stats_query = "SELECT * FROM table_stats;" - yolo8n_query = """CREATE FUNCTION IF NOT EXISTS Yolo TYPE ultralytics MODEL 'yolov8n.pt'; @@ -241,19 +240,7 @@ EvaDB_INSTALLATION_DIR ) -create_buckets_query = """CREATE TABLE level_counts ( - level Integer, - row_count INTEGER -);""" - -insert_into_buckets = """INSERT INTO level_counts -SELECT - level, - COUNT(*) AS row_count -FROM mydata3 -GROUP BY level;""" - -extract_data_buckets = """Select * from level_counts""" +get_all_tables = "show tables" def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: @@ -298,8 +285,7 @@ def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: chatgpt_function_query, face_detection_function_query, # Mvit_function_query, - extract_data_buckets, - getdata_from_stats_query, + get_all_tables, Sift_function_query, Yolo_function_query, stablediffusion_function_query, @@ -325,20 +311,38 @@ def init_builtin_functions(db: EvaDBDatabase, mode: str = "debug") -> None: # ignore exceptions during the bootstrapping phase due to missing packages for query in queries: try: - if query.startswith("SELECT"): - query_result = execute_query_fetch_all( - db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True - ) - for _, row in query_result.iterrows(): - entry = { - 'table_name': row['table_stats.table_name'], - 'num_rows': row['table_stats.num_rows'], - 'hist': row['table_stats.hist'] - } - constants.EVADB_STATS[row['table_stats.table_name']] = entry + if query.startswith("show"): + tables = execute_query_fetch_all( + db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True + ) + # delete all entries in table_stats table. This is a temporary fix because update query is not yet + # available in EvaDB, the table_stats table is deleted and new data is inserted everytime + execute_query_fetch_all(db, "drop table table_stats", False, True) + execute_query_fetch_all(db, "create table table_stats(table_name TEXT, num_rows integer, hist TEXT)", False, True) + # get the histograms for each column for every table + row_count = 0 + for _,table in tables.iterrows(): + if str(table['name']) != "table_stats": + final_result = [] + result_dict = {} + table_data_query = f"select * from {table['name']}" + table_data = execute_query_fetch_all(db, table_data_query, False, True) + for column in table_data.columns: + try: + column_dict = dict(Counter(table_data.column_as_numpy_array(column))) + row_count = len(table_data.column_as_numpy_array(column)) + result_dict[column] = column_dict + except Exception as e: + print(e) + final_result = [{key: value} for key, value in result_dict.items()] + try: + insert_into_table_stats = f"insert into table_stats(table_name, num_rows, hist) values(\"{table['name']}\",{row_count},\"{final_result}\");" + execute_query_fetch_all(db, insert_into_table_stats, False, True) + except Exception as e: + print(e) else: execute_query_fetch_all( - db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True - ) + db, query, do_not_print_exceptions=False, do_not_raise_exceptions=True + ) except Exception as e: pass diff --git a/evadb/interfaces/relational/db.py b/evadb/interfaces/relational/db.py index a8d66a22bf..7ec0a9b80f 100644 --- a/evadb/interfaces/relational/db.py +++ b/evadb/interfaces/relational/db.py @@ -44,6 +44,7 @@ from evadb.server.command_handler import execute_statement from evadb.utils.generic_utils import find_nearest_word, is_ray_enabled_and_installed from evadb.utils.logging_manager import logger +from evadb.executor.cost_estimator_utils import CostEstimatorUtils class EvaDBConnection: @@ -444,6 +445,8 @@ def query(self, sql_query: str) -> EvaDBQuery: 1 3 4 2 5 6 """ + #refresh the table_stats table to get the latest statistics of the database + CostEstimatorUtils.fetch_table_stats(self._evadb, "select * from table_stats;") stmt = parse_query(sql_query) return EvaDBQuery(self._evadb, stmt)