diff --git a/application/parser/chunking.py b/application/parser/chunking.py
new file mode 100644
index 000000000..26f05dbaa
--- /dev/null
+++ b/application/parser/chunking.py
@@ -0,0 +1,118 @@
+import re
+from typing import List, Tuple, Union
+import logging
+from application.parser.schema.base import Document
+from application.utils import get_encoding
+
+logger = logging.getLogger(__name__)
+
+class Chunker:
+    def __init__(
+        self,
+        chunking_strategy: str = "classic_chunk",
+        max_tokens: int = 2000,
+        min_tokens: int = 150,
+        duplicate_headers: bool = False,
+    ):
+        if chunking_strategy not in ["classic_chunk"]:
+            raise ValueError(f"Unsupported chunking strategy: {chunking_strategy}")
+        self.chunking_strategy = chunking_strategy
+        self.max_tokens = max_tokens
+        self.min_tokens = min_tokens
+        self.duplicate_headers = duplicate_headers
+        self.encoding = get_encoding()
+
+    def separate_header_and_body(self, text: str) -> Tuple[str, str]:
+        header_pattern = r"^(.*?\n){3}"
+        match = re.match(header_pattern, text)
+        if match:
+            header = match.group(0)
+            body = text[len(header):]
+        else:
+            header, body = "", text  # No header, treat entire text as body
+        return header, body
+
+    def combine_documents(self, doc: Document, next_doc: Document) -> Document:
+        combined_text = doc.text + " " + next_doc.text
+        combined_token_count = len(self.encoding.encode(combined_text))
+        new_doc = Document(
+            text=combined_text,
+            doc_id=doc.doc_id,
+            embedding=doc.embedding,
+            extra_info={**(doc.extra_info or {}), "token_count": combined_token_count}
+        )
+        return new_doc
+
+    def split_document(self, doc: Document) -> List[Document]:
+        split_docs = []
+        header, body = self.separate_header_and_body(doc.text)
+        header_tokens = self.encoding.encode(header) if header else []
+        body_tokens = self.encoding.encode(body)
+
+        current_position = 0
+        part_index = 0
+        while current_position < len(body_tokens):
+            end_position = current_position + self.max_tokens - len(header_tokens)
+            chunk_tokens = (header_tokens + body_tokens[current_position:end_position]
+                            if self.duplicate_headers or part_index == 0 else body_tokens[current_position:end_position])
+            chunk_text = self.encoding.decode(chunk_tokens)
+            new_doc = Document(
+                text=chunk_text,
+                doc_id=f"{doc.doc_id}-{part_index}",
+                embedding=doc.embedding,
+                extra_info={**(doc.extra_info or {}), "token_count": len(chunk_tokens)}
+            )
+            split_docs.append(new_doc)
+            current_position = end_position
+            part_index += 1
+            header_tokens = []
+        return split_docs
+
+    def classic_chunk(self, documents: List[Document]) -> List[Document]:
+        processed_docs = []
+        i = 0
+        while i < len(documents):
+            doc = documents[i]
+            tokens = self.encoding.encode(doc.text)
+            token_count = len(tokens)
+
+            if self.min_tokens <= token_count <= self.max_tokens:
+                doc.extra_info = doc.extra_info or {}
+                doc.extra_info["token_count"] = token_count
+                processed_docs.append(doc)
+                i += 1
+            elif token_count < self.min_tokens:
+                if i + 1 < len(documents):
+                    next_doc = documents[i + 1]
+                    next_tokens = self.encoding.encode(next_doc.text)
+                    if token_count + len(next_tokens) <= self.max_tokens:
+                        # Combine small documents
+                        combined_doc = self.combine_documents(doc, next_doc)
+                        processed_docs.append(combined_doc)
+                        i += 2
+                    else:
+                        # Keep the small document as is if adding next_doc would exceed max_tokens
+                        doc.extra_info = doc.extra_info or {}
+                        doc.extra_info["token_count"] = token_count
+                        processed_docs.append(doc)
+                        i += 1
+                else:
+                    # No next document to combine with; add the small document as is
+                    doc.extra_info = doc.extra_info or {}
+                    doc.extra_info["token_count"] = token_count
+                    processed_docs.append(doc)
+                    i += 1
+            else:
+                # Split large documents
+                processed_docs.extend(self.split_document(doc))
+                i += 1
+        return processed_docs
+
+    def chunk(
+        self,
+        documents: List[Document]
+    ) -> List[Document]:
+        if self.chunking_strategy == "classic_chunk":
+            return self.classic_chunk(documents)
+        else:
+            raise ValueError("Unsupported chunking strategy")
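Reviewer note: a minimal usage sketch of the new `Chunker` (not part of the diff). The sample text, `doc_id`, and `extra_info` values are made-up assumptions; the `Document` constructor arguments mirror the ones `chunking.py` itself passes.

```python
# Hypothetical sketch: chunk a list of parsed documents with the new Chunker.
from application.parser.chunking import Chunker
from application.parser.schema.base import Document

# Illustrative input; fields mirror those used inside chunking.py.
docs = [
    Document(
        text="Title\nAuthor\nDate\n" + "Some body text. " * 400,
        doc_id="example-doc",
        embedding=None,
        extra_info={"source": "example.md"},
    )
]

chunker = Chunker(
    chunking_strategy="classic_chunk",
    max_tokens=2000,
    min_tokens=150,
    duplicate_headers=False,
)
chunks = chunker.chunk(documents=docs)

for chunk in chunks:
    # classic_chunk/split_document record a "token_count" entry per chunk.
    print(chunk.doc_id, chunk.extra_info["token_count"])
```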
diff --git a/application/parser/embedding_pipeline.py b/application/parser/embedding_pipeline.py
new file mode 100755
index 000000000..6cf40048c
--- /dev/null
+++ b/application/parser/embedding_pipeline.py
@@ -0,0 +1,86 @@
+import os
+import logging
+from retry import retry
+from tqdm import tqdm
+from application.core.settings import settings
+from application.vectorstore.vector_creator import VectorCreator
+
+
+@retry(tries=10, delay=60)
+def add_text_to_store_with_retry(store, doc, source_id):
+    """
+    Add a document's text and metadata to the vector store with retry logic.
+    Args:
+        store: The vector store object.
+        doc: The document to be added.
+        source_id: Unique identifier for the source.
+    """
+    try:
+        doc.metadata["source_id"] = str(source_id)
+        store.add_texts([doc.page_content], metadatas=[doc.metadata])
+    except Exception as e:
+        logging.error(f"Failed to add document with retry: {e}")
+        raise
+
+
+def embed_and_store_documents(docs, folder_name, source_id, task_status):
+    """
+    Embeds documents and stores them in a vector store.
+
+    Args:
+        docs (list): List of documents to be embedded and stored.
+        folder_name (str): Directory to save the vector store.
+        source_id (str): Unique identifier for the source.
+        task_status: Task state manager for progress updates.
+
+    Returns:
+        None
+    """
+    # Ensure the folder exists
+    if not os.path.exists(folder_name):
+        os.makedirs(folder_name)
+
+    # Initialize vector store
+    if settings.VECTOR_STORE == "faiss":
+        docs_init = [docs.pop(0)]
+        store = VectorCreator.create_vectorstore(
+            settings.VECTOR_STORE,
+            docs_init=docs_init,
+            source_id=folder_name,
+            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
+        )
+    else:
+        store = VectorCreator.create_vectorstore(
+            settings.VECTOR_STORE,
+            source_id=source_id,
+            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
+        )
+        store.delete_index()
+
+    total_docs = len(docs)
+
+    # Process and embed documents
+    for idx, doc in tqdm(
+        enumerate(docs),
+        desc="Embedding 🦖",
+        unit="docs",
+        total=total_docs,
+        bar_format="{l_bar}{bar}| Time Left: {remaining}",
+    ):
+        try:
+            # Update task status for progress tracking
+            progress = int((idx / total_docs) * 100)
+            task_status.update_state(state="PROGRESS", meta={"current": progress})
+
+            # Add document to vector store
+            add_text_to_store_with_retry(store, doc, source_id)
+        except Exception as e:
+            logging.error(f"Error embedding document {idx}: {e}")
+            logging.info(f"Saving progress at document {idx} out of {total_docs}")
+            store.save_local(folder_name)
+            break
+
+    # Save the vector store
+    if settings.VECTOR_STORE == "faiss":
+        store.save_local(folder_name)
+    logging.info("Vector store saved successfully.")
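Reviewer note: a wiring sketch for the new pipeline entry point (not part of the diff). The `temp/{source_id}` folder layout and the `_FakeTask` helper are assumptions for illustration; in `worker.py` the Celery task itself is passed as `task_status`, and `EMBEDDINGS_KEY` / `settings.VECTOR_STORE` must be configured as before.

```python
# Hypothetical sketch: embed chunked documents and persist the vector store.
from bson.objectid import ObjectId

from application.parser.embedding_pipeline import embed_and_store_documents
from application.parser.schema.base import Document


class _FakeTask:
    """Stand-in exposing update_state(), the only method the pipeline calls."""

    def update_state(self, state, meta):
        print(state, meta)


# `chunks` comes from the Chunker sketch above; convert to LangChain format
# exactly as worker.py does before embedding.
langchain_docs = [Document.to_langchain_format(chunk) for chunk in chunks]

source_id = ObjectId()
embed_and_store_documents(
    langchain_docs,
    folder_name=f"temp/{source_id}",  # assumed output directory
    source_id=source_id,
    task_status=_FakeTask(),
)
```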
diff --git a/application/parser/open_ai_func.py b/application/parser/open_ai_func.py
deleted file mode 100755
index 3109f5839..000000000
--- a/application/parser/open_ai_func.py
+++ /dev/null
@@ -1,75 +0,0 @@
-import os
-
-from retry import retry
-
-from application.core.settings import settings
-
-from application.vectorstore.vector_creator import VectorCreator
-
-
-# from langchain_community.embeddings import HuggingFaceEmbeddings
-# from langchain_community.embeddings import HuggingFaceInstructEmbeddings
-# from langchain_community.embeddings import CohereEmbeddings
-
-
-@retry(tries=10, delay=60)
-def store_add_texts_with_retry(store, i, id):
-    # add source_id to the metadata
-    i.metadata["source_id"] = str(id)
-    store.add_texts([i.page_content], metadatas=[i.metadata])
-    # store_pine.add_texts([i.page_content], metadatas=[i.metadata])
-
-
-def call_openai_api(docs, folder_name, id, task_status):
-    # Function to create a vector store from the documents and save it to disk
-
-    if not os.path.exists(f"{folder_name}"):
-        os.makedirs(f"{folder_name}")
-
-    from tqdm import tqdm
-
-    c1 = 0
-    if settings.VECTOR_STORE == "faiss":
-        docs_init = [docs[0]]
-        docs.pop(0)
-
-        store = VectorCreator.create_vectorstore(
-            settings.VECTOR_STORE,
-            docs_init=docs_init,
-            source_id=f"{folder_name}",
-            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
-        )
-    else:
-        store = VectorCreator.create_vectorstore(
-            settings.VECTOR_STORE,
-            source_id=str(id),
-            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
-        )
-        store.delete_index()
-    # Uncomment for MPNet embeddings
-    # model_name = "sentence-transformers/all-mpnet-base-v2"
-    # hf = HuggingFaceEmbeddings(model_name=model_name)
-    # store = FAISS.from_documents(docs_test, hf)
-    s1 = len(docs)
-    for i in tqdm(
-        docs,
-        desc="Embedding 🦖",
-        unit="docs",
-        total=len(docs),
-        bar_format="{l_bar}{bar}| Time Left: {remaining}",
-    ):
-        try:
-            task_status.update_state(
-                state="PROGRESS", meta={"current": int((c1 / s1) * 100)}
-            )
-            store_add_texts_with_retry(store, i, id)
-        except Exception as e:
-            print(e)
-            print("Error on ", i)
-            print("Saving progress")
-            print(f"stopped at {c1} out of {len(docs)}")
-            store.save_local(f"{folder_name}")
-            break
-        c1 += 1
-    if settings.VECTOR_STORE == "faiss":
-        store.save_local(f"{folder_name}")
diff --git a/application/parser/token_func.py b/application/parser/token_func.py
deleted file mode 100644
index 7511cde09..000000000
--- a/application/parser/token_func.py
+++ /dev/null
@@ -1,79 +0,0 @@
-import re
-from math import ceil
-from typing import List
-
-import tiktoken
-from application.parser.schema.base import Document
-
-
-def separate_header_and_body(text):
-    header_pattern = r"^(.*?\n){3}"
-    match = re.match(header_pattern, text)
-    header = match.group(0)
-    body = text[len(header):]
-    return header, body
-
-
-def group_documents(documents: List[Document], min_tokens: int, max_tokens: int) -> List[Document]:
-    docs = []
-    current_group = None
-
-    for doc in documents:
-        doc_len = len(tiktoken.get_encoding("cl100k_base").encode(doc.text))
-
-        # Check if current group is empty or if the document can be added based on token count and matching metadata
-        if (current_group is None or
-                (len(tiktoken.get_encoding("cl100k_base").encode(current_group.text)) + doc_len < max_tokens and
-                 doc_len < min_tokens and
-                 current_group.extra_info == doc.extra_info)):
-            if current_group is None:
-                current_group = doc  # Use the document directly to retain its metadata
-            else:
-                current_group.text += " " + doc.text  # Append text to the current group
-        else:
-            docs.append(current_group)
-            current_group = doc  # Start a new group with the current document
-
-    if current_group is not None:
-        docs.append(current_group)
-
-    return docs
-
-
-def split_documents(documents: List[Document], max_tokens: int) -> List[Document]:
-    docs = []
-    for doc in documents:
-        token_length = len(tiktoken.get_encoding("cl100k_base").encode(doc.text))
-        if token_length <= max_tokens:
-            docs.append(doc)
-        else:
-            header, body = separate_header_and_body(doc.text)
-            if len(tiktoken.get_encoding("cl100k_base").encode(header)) > max_tokens:
-                body = doc.text
-                header = ""
-            num_body_parts = ceil(token_length / max_tokens)
-            part_length = ceil(len(body) / num_body_parts)
-            body_parts = [body[i:i + part_length] for i in range(0, len(body), part_length)]
-            for i, body_part in enumerate(body_parts):
-                new_doc = Document(text=header + body_part.strip(),
-                                   doc_id=f"{doc.doc_id}-{i}",
-                                   embedding=doc.embedding,
-                                   extra_info=doc.extra_info)
-                docs.append(new_doc)
-    return docs
-
-
-def group_split(documents: List[Document], max_tokens: int = 2000, min_tokens: int = 150, token_check: bool = True):
-    if not token_check:
-        return documents
-    print("Grouping small documents")
-    try:
-        documents = group_documents(documents=documents, min_tokens=min_tokens, max_tokens=max_tokens)
-    except Exception:
-        print("Grouping failed, try running without token_check")
-    print("Separating large documents")
-    try:
-        documents = split_documents(documents=documents, max_tokens=max_tokens)
-    except Exception:
-        print("Grouping failed, try running without token_check")
-    return documents
diff --git a/application/worker.py b/application/worker.py
index 33cd90e5a..0edb46ff1 100755
--- a/application/worker.py
+++ b/application/worker.py
@@ -12,10 +12,10 @@
 from application.core.mongo_db import MongoDB
 from application.core.settings import settings
 from application.parser.file.bulk import SimpleDirectoryReader
-from application.parser.open_ai_func import call_openai_api
+from application.parser.embedding_pipeline import embed_and_store_documents
 from application.parser.remote.remote_creator import RemoteCreator
 from application.parser.schema.base import Document
-from application.parser.token_func import group_split
+from application.parser.chunking import Chunker
 from application.utils import count_tokens_docs
 
 mongo = MongoDB.get_client()
@@ -153,17 +153,19 @@ def ingest_worker(
         exclude_hidden=exclude,
         file_metadata=metadata_from_filename,
     ).load_data()
-    raw_docs = group_split(
-        documents=raw_docs,
-        min_tokens=MIN_TOKENS,
+
+    chunker = Chunker(
+        chunking_strategy="classic_chunk",
         max_tokens=MAX_TOKENS,
-        token_check=token_check,
+        min_tokens=MIN_TOKENS,
+        duplicate_headers=False
     )
+    raw_docs = chunker.chunk(documents=raw_docs)
 
     docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
 
     id = ObjectId()
 
-    call_openai_api(docs, full_path, id, self)
+    embed_and_store_documents(docs, full_path, id, self)
     tokens = count_tokens_docs(docs)
     self.update_state(state="PROGRESS", meta={"current": 100})
@@ -217,21 +219,23 @@ def remote_worker(
         remote_loader = RemoteCreator.create_loader(loader)
         raw_docs = remote_loader.load_data(source_data)
 
-        docs = group_split(
-            documents=raw_docs,
-            min_tokens=MIN_TOKENS,
+        chunker = Chunker(
+            chunking_strategy="classic_chunk",
             max_tokens=MAX_TOKENS,
-            token_check=token_check,
+            min_tokens=MIN_TOKENS,
+            duplicate_headers=False
         )
+        docs = chunker.chunk(documents=raw_docs)
+
         tokens = count_tokens_docs(docs)
 
         if operation_mode == "upload":
             id = ObjectId()
-            call_openai_api(docs, full_path, id, self)
+            embed_and_store_documents(docs, full_path, id, self)
         elif operation_mode == "sync":
             if not doc_id or not ObjectId.is_valid(doc_id):
                 raise ValueError("doc_id must be provided for sync operation.")
             id = ObjectId(doc_id)
-            call_openai_api(docs, full_path, id, self)
+            embed_and_store_documents(docs, full_path, id, self)
 
         self.update_state(state="PROGRESS", meta={"current": 100})
 
         file_data = {
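Reviewer note on call sites: as the `worker.py` hunks above show, `group_split(documents, min_tokens, max_tokens, token_check)` is replaced by `Chunker(chunking_strategy="classic_chunk", max_tokens=..., min_tokens=..., duplicate_headers=False).chunk(documents=...)`, and `call_openai_api(docs, full_path, id, task)` by `embed_and_store_documents(docs, full_path, id, task)` with the same argument order; callers of the deleted `token_func.py` or `open_ai_func.py` helpers outside `worker.py`, if any, would need the same update.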