Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: create index from single document #1073

Merged
merged 5 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions evadb/catalog/sql_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@

from evadb.utils.generic_utils import is_postgres_uri, parse_config_yml

# Permanent identifier column.
IDENTIFIER_COLUMN = "_row_id"

# Runtime generated column.
ROW_NUM_COLUMN = "_row_number"
ROW_NUM_MAGIC = 0xFFFFFFFF

CATALOG_TABLES = [
"column_catalog",
"table_catalog",
Expand Down
6 changes: 3 additions & 3 deletions evadb/executor/create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import pandas as pd

from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params
Expand Down Expand Up @@ -87,7 +87,7 @@ def _create_index(self):
# array. Use zero index to get the actual numpy array.
feat = input_batch.column_as_numpy_array(feat_col_name)

row_id = input_batch.column_as_numpy_array(IDENTIFIER_COLUMN)
row_num = input_batch.column_as_numpy_array(ROW_NUM_COLUMN)

for i in range(len(input_batch)):
row_feat = feat[i].reshape(1, -1)
Expand All @@ -103,7 +103,7 @@ def _create_index(self):
self.index.create(input_dim)

# Row ID for mapping back to the row.
self.index.add([FeaturePayload(row_id[i], row_feat)])
self.index.add([FeaturePayload(row_num[i], row_feat)])

# Persist index.
self.index.persist()
Expand Down
22 changes: 11 additions & 11 deletions evadb/executor/vector_index_scan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import pandas as pd

from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import handle_vector_store_params
Expand All @@ -27,11 +27,11 @@
from evadb.utils.logging_manager import logger


# Helper function for getting row_id column alias.
def get_row_id_column_alias(column_list):
# Helper function for getting row_num column alias.
def get_row_num_column_alias(column_list):
for column in column_list:
alias, col_name = column.split(".")
if col_name == IDENTIFIER_COLUMN:
if col_name == ROW_NUM_COLUMN:
return alias


Expand Down Expand Up @@ -74,10 +74,10 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
)
# TODO: support queries over distance as well
# distance_list = index_result.similarities
row_id_np = index_result.ids
row_num_np = index_result.ids

# Load projected columns from disk and join with search results.
row_id_col_name = None
row_num_col_name = None

# Handle the case where there are fewer index results than self.limit_count.value
num_required_results = self.limit_count.value
Expand All @@ -90,14 +90,14 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
res_row_list = [None for _ in range(num_required_results)]
for batch in self.children[0].exec(**kwargs):
column_list = batch.columns
if not row_id_col_name:
row_id_alias = get_row_id_column_alias(column_list)
row_id_col_name = "{}.{}".format(row_id_alias, IDENTIFIER_COLUMN)
if not row_num_col_name:
row_num_alias = get_row_num_column_alias(column_list)
row_num_col_name = "{}.{}".format(row_num_alias, ROW_NUM_COLUMN)

# Nested join.
for _, row in batch.frames.iterrows():
for idx, rid in enumerate(row_id_np):
if rid == row[row_id_col_name]:
for idx, row_num in enumerate(row_num_np):
if row_num == row[row_num_col_name]:
res_row = dict()
for col_name in column_list:
res_row[col_name] = row[col_name]
Expand Down
3 changes: 3 additions & 0 deletions evadb/readers/decord_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import numpy as np

from evadb.catalog.catalog_type import VideoColumnName
from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.constants import AUDIORATE, IFRAMES
from evadb.expression.abstract_expression import AbstractExpression
from evadb.expression.expression_utils import extract_range_list_from_predicate
Expand Down Expand Up @@ -126,6 +127,7 @@ def __get_video_frame(self, frame_id):

return {
VideoColumnName.id.name: frame_id,
ROW_NUM_COLUMN: frame_id,
VideoColumnName.data.name: frame_video,
VideoColumnName.seconds.name: round(timestamp, 2),
}
Expand All @@ -136,6 +138,7 @@ def __get_audio_frame(self, frame_id):

return {
VideoColumnName.id.name: frame_id,
ROW_NUM_COLUMN: frame_id,
VideoColumnName.data.name: np.empty(0),
VideoColumnName.seconds.name: 0.0,
VideoColumnName.audio.name: frame_audio,
Expand Down
9 changes: 8 additions & 1 deletion evadb/readers/document/document_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pathlib import Path
from typing import Dict, Iterator

from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.readers.abstract_reader import AbstractReader
from evadb.readers.document.registry import (
_lazy_import_loader,
Expand Down Expand Up @@ -44,8 +45,14 @@ def _read(self) -> Iterator[Dict]:
chunk_size=self._chunk_size, chunk_overlap=self._chunk_overlap
)

row_num = 0
for data in loader.load():
for chunk_id, row in enumerate(
langchain_text_splitter.split_documents([data])
):
yield {"chunk_id": chunk_id, "data": row.page_content}
yield {
"chunk_id": chunk_id,
"data": row.page_content,
ROW_NUM_COLUMN: row_num,
}
row_num += 1
4 changes: 4 additions & 0 deletions evadb/readers/pdf_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
from typing import Dict, Iterator

from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.readers.abstract_reader import AbstractReader
from evadb.utils.generic_utils import try_to_import_fitz

Expand All @@ -35,6 +36,7 @@ def _read(self) -> Iterator[Dict]:
doc = fitz.open(self.file_url)

# PAGE ID, PARAGRAPH ID, STRING
row_num = 0
for page_no, page in enumerate(doc):
blocks = page.get_text("dict")["blocks"]
# iterate through the text blocks
Expand All @@ -51,7 +53,9 @@ def _read(self) -> Iterator[Dict]:
if span["text"].strip():
block_string += span["text"]
yield {
ROW_NUM_COLUMN: row_num,
"page": page_no + 1,
"paragraph": paragraph_no + 1,
"data": block_string,
}
row_num += 1
6 changes: 5 additions & 1 deletion evadb/storage/document_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Iterator

from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.readers.document.document_reader import DocumentReader
Expand All @@ -28,7 +29,7 @@ def __init__(self, db: EvaDBDatabase):

def read(self, table: TableCatalogEntry, chunk_params: dict) -> Iterator[Batch]:
for doc_files in self._rdb_handler.read(self._get_metadata_table(table), 12):
for _, (row_id, file_name) in doc_files.iterrows():
for _, (row_id, file_name, _) in doc_files.iterrows():
system_file_name = self._xform_file_url_to_file_name(file_name)
doc_file = Path(table.file_url) / system_file_name
# setting batch_mem_size = 1; we need to fix it
Expand All @@ -38,4 +39,7 @@ def read(self, table: TableCatalogEntry, chunk_params: dict) -> Iterator[Batch]:
for batch in reader.read():
batch.frames[table.columns[0].name] = row_id
batch.frames[table.columns[1].name] = str(file_name)
batch.frames[ROW_NUM_COLUMN] = (
row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN]
)
yield batch
4 changes: 3 additions & 1 deletion evadb/storage/image_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Iterator

from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.readers.image.opencv_image_reader import CVImageReader
Expand All @@ -28,12 +29,13 @@ def __init__(self, db: EvaDBDatabase):

def read(self, table: TableCatalogEntry) -> Iterator[Batch]:
for image_files in self._rdb_handler.read(self._get_metadata_table(table)):
for _, (row_id, file_name) in image_files.iterrows():
for _, (row_id, file_name, _) in image_files.iterrows():
system_file_name = self._xform_file_url_to_file_name(file_name)
image_file = Path(table.file_url) / system_file_name
# setting batch_mem_size = 1; we need to fix it
reader = CVImageReader(str(image_file), batch_mem_size=1)
for batch in reader.read():
batch.frames[table.columns[0].name] = row_id
batch.frames[table.columns[1].name] = str(file_name)
batch.frames[ROW_NUM_COLUMN] = batch.frames[table.columns[0].name]
yield batch
6 changes: 5 additions & 1 deletion evadb/storage/pdf_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Iterator

from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.readers.pdf_reader import PDFReader
Expand All @@ -28,12 +29,15 @@ def __init__(self, db: EvaDBDatabase):

def read(self, table: TableCatalogEntry) -> Iterator[Batch]:
for image_files in self._rdb_handler.read(self._get_metadata_table(table), 12):
for _, (row_id, file_name) in image_files.iterrows():
for _, (row_id, file_name, _) in image_files.iterrows():
system_file_name = self._xform_file_url_to_file_name(file_name)
image_file = Path(table.file_url) / system_file_name
# setting batch_mem_size = 1; we need to fix it
reader = PDFReader(str(image_file), batch_mem_size=1)
for batch in reader.read():
batch.frames[table.columns[0].name] = row_id
batch.frames[table.columns[1].name] = str(file_name)
batch.frames[ROW_NUM_COLUMN] = (
row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN]
)
yield batch
19 changes: 15 additions & 4 deletions evadb/storage/sqlite_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.schema_utils import SchemaUtils
from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, ROW_NUM_COLUMN
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.parser.table_ref import TableInfo
Expand Down Expand Up @@ -67,6 +67,7 @@ def _deserialize_sql_row(self, sql_row: dict, columns: List[ColumnCatalogEntry])
dict_row[col.name] = self._serializer.deserialize(sql_row[col.name])
else:
dict_row[col.name] = sql_row[col.name]
dict_row[ROW_NUM_COLUMN] = dict_row[IDENTIFIER_COLUMN]
return dict_row

def _try_loading_table_via_reflection(self, table_name: str):
Expand Down Expand Up @@ -94,7 +95,11 @@ def create(self, table: TableCatalogEntry, **kwargs):

# During table creation, assume row_id is automatically handled by
# the sqlalchemy engine.
table_columns = [col for col in table.columns if col.name != IDENTIFIER_COLUMN]
table_columns = [
col
for col in table.columns
if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN)
]
sqlalchemy_schema = SchemaUtils.xform_to_sqlalchemy_schema(table_columns)
attr_dict.update(sqlalchemy_schema)

Expand Down Expand Up @@ -148,12 +153,18 @@ def write(self, table: TableCatalogEntry, rows: Batch):
# the sqlalchemy engine. Another assumption we make here is that the
# updated data does not need to take care of row_id.
table_columns = [
col for col in table.columns if col.name != IDENTIFIER_COLUMN
col
for col in table.columns
if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN)
]

# Todo: validate the data type before inserting into the table
for record in rows.frames.values:
row_data = {col: record[idx] for idx, col in enumerate(columns)}
row_data = {
col: record[idx]
for idx, col in enumerate(columns)
if col != ROW_NUM_COLUMN
}
data.append(self._dict_to_sql_row(row_data, table_columns))
self._sql_session.execute(table_to_update.insert(), data)
self._sql_session.commit()
Expand Down
6 changes: 5 additions & 1 deletion evadb/storage/video_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Iterator

from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC
from evadb.database import EvaDBDatabase
from evadb.expression.abstract_expression import AbstractExpression
from evadb.models.storage.batch import Batch
Expand All @@ -39,7 +40,7 @@ def read(
read_video: bool = True,
) -> Iterator[Batch]:
for video_files in self._rdb_handler.read(self._get_metadata_table(table), 12):
for _, (row_id, video_file_name) in video_files.iterrows():
for _, (row_id, video_file_name, _) in video_files.iterrows():
system_file_name = self._xform_file_url_to_file_name(video_file_name)
video_file = Path(table.file_url) / system_file_name
# increase batch size when reading audio so that
Expand All @@ -58,4 +59,7 @@ def read(
for batch in reader.read():
batch.frames[table.columns[0].name] = row_id
batch.frames[table.columns[1].name] = str(video_file_name)
batch.frames[ROW_NUM_COLUMN] = (
row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN]
)
yield batch
Loading