Skip to content

Commit

Permalink
make the summary change non-breaking (#1576)
Browse files Browse the repository at this point in the history
* make the summary change non-breaking

* rollbk
  • Loading branch information
emrgnt-cmplxty authored Nov 12, 2024
1 parent 960543d commit a8e05de
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 177 deletions.
122 changes: 84 additions & 38 deletions py/core/providers/database/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,41 @@ async def create_tables(self):
logger.info(
f"Creating table, if not exists: {self._get_table_name(PostgresDocumentHandler.TABLE_NAME)}"
)
query = f"""
CREATE TABLE IF NOT EXISTS {self._get_table_name(PostgresDocumentHandler.TABLE_NAME)} (
document_id UUID PRIMARY KEY,
collection_ids UUID[],
user_id UUID,
type TEXT,
metadata JSONB,
title TEXT,
summary TEXT,
summary_embedding vector({self.dimension}) NULL,
version TEXT,
size_in_bytes INT,
ingestion_status TEXT DEFAULT 'pending',
kg_extraction_status TEXT DEFAULT 'pending',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
ingestion_attempt_number INT DEFAULT 0,
doc_search_vector tsvector GENERATED ALWAYS AS (
setweight(to_tsvector('english', COALESCE(title, '')), 'A') ||
setweight(to_tsvector('english', COALESCE(summary, '')), 'B') ||
setweight(to_tsvector('english', COALESCE((metadata->>'description')::text, '')), 'C')
) STORED
);
CREATE INDEX IF NOT EXISTS idx_collection_ids_{self.project_name}
ON {self._get_table_name(PostgresDocumentHandler.TABLE_NAME)} USING GIN (collection_ids);
-- Full text search index
CREATE INDEX IF NOT EXISTS idx_doc_search_{self.project_name}
ON {self._get_table_name(PostgresDocumentHandler.TABLE_NAME)}
USING GIN (doc_search_vector);
"""
await self.connection_manager.execute_query(query)
try:
query = f"""
CREATE TABLE IF NOT EXISTS {self._get_table_name(PostgresDocumentHandler.TABLE_NAME)} (
document_id UUID PRIMARY KEY,
collection_ids UUID[],
user_id UUID,
type TEXT,
metadata JSONB,
title TEXT,
summary TEXT NULL,
summary_embedding vector({self.dimension}) NULL,
version TEXT,
size_in_bytes INT,
ingestion_status TEXT DEFAULT 'pending',
kg_extraction_status TEXT DEFAULT 'pending',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
ingestion_attempt_number INT DEFAULT 0,
doc_search_vector tsvector GENERATED ALWAYS AS (
setweight(to_tsvector('english', COALESCE(title, '')), 'A') ||
setweight(to_tsvector('english', COALESCE(summary, '')), 'B') ||
setweight(to_tsvector('english', COALESCE((metadata->>'description')::text, '')), 'C')
) STORED
);
CREATE INDEX IF NOT EXISTS idx_collection_ids_{self.project_name}
ON {self._get_table_name(PostgresDocumentHandler.TABLE_NAME)} USING GIN (collection_ids);
-- Full text search index
CREATE INDEX IF NOT EXISTS idx_doc_search_{self.project_name}
ON {self._get_table_name(PostgresDocumentHandler.TABLE_NAME)}
USING GIN (doc_search_vector);
"""
await self.connection_manager.execute_query(query)
except Exception as e:
logger.warning(f"Error {e} when creating document table.")

async def upsert_documents_overview(
self, documents_overview: Union[DocumentInfo, list[DocumentInfo]]
Expand Down Expand Up @@ -415,11 +418,51 @@ async def get_documents_overview(
if conditions:
base_query += " WHERE " + " AND ".join(conditions)

# query = f"""
# SELECT document_id, collection_ids, user_id, type, metadata, title, version,
# size_in_bytes, ingestion_status, kg_extraction_status, created_at, updated_at,
# summary, summary_embedding,
# COUNT(*) OVER() AS total_entries
# {base_query}
# ORDER BY created_at DESC
# OFFSET ${param_index}
# """

# First check if the new columns exist
try:
check_query = f"""
SELECT EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_name = '{self._get_table_name(PostgresDocumentHandler.TABLE_NAME)}'
AND column_name = 'summary'
);
"""
has_new_columns = await self.connection_manager.fetch_query(
check_query
)
has_new_columns = has_new_columns[0]["exists"]
except Exception as e:
logger.warning(f"Error checking for new columns: {e}")
has_new_columns = False

# Construct the SELECT part of the query based on column existence
if has_new_columns:
select_fields = """
SELECT document_id, collection_ids, user_id, type, metadata, title, version,
size_in_bytes, ingestion_status, kg_extraction_status, created_at, updated_at,
summary, summary_embedding,
COUNT(*) OVER() AS total_entries
"""
else:
select_fields = """
SELECT document_id, collection_ids, user_id, type, metadata, title, version,
size_in_bytes, ingestion_status, kg_extraction_status, created_at, updated_at,
COUNT(*) OVER() AS total_entries
"""

query = f"""
SELECT document_id, collection_ids, user_id, type, metadata, title, version,
size_in_bytes, ingestion_status, kg_extraction_status, created_at, updated_at,
summary, summary_embedding,
COUNT(*) OVER() AS total_entries
{select_fields}
{base_query}
ORDER BY created_at DESC
OFFSET ${param_index}
Expand All @@ -440,7 +483,10 @@ async def get_documents_overview(
for row in results:
# Safely handle the embedding
embedding = None
if row["summary_embedding"] is not None:
if (
"summary_embedding" in row
and row["summary_embedding"] is not None
):
try:
# Parse the vector string returned by Postgres
embedding_str = row["summary_embedding"]
Expand Down Expand Up @@ -475,7 +521,7 @@ async def get_documents_overview(
),
created_at=row["created_at"],
updated_at=row["updated_at"],
summary=row["summary"],
summary=row["summary"] if "summary" in row else None,
summary_embedding=embedding,
)
)
Expand Down
Loading

0 comments on commit a8e05de

Please sign in to comment.