test version #1503

Draft · wants to merge 1 commit into base: main
118 changes: 118 additions & 0 deletions application/parser/chunking.py
@@ -0,0 +1,118 @@
import re
import logging
from typing import List, Tuple, Union

from application.parser.schema.base import Document
from application.utils import get_encoding

logger = logging.getLogger(__name__)


class Chunker:
    def __init__(
        self,
        chunking_strategy: str = "classic_chunk",
        max_tokens: int = 2000,
        min_tokens: int = 150,
        duplicate_headers: bool = False,
    ):
        if chunking_strategy not in ["classic_chunk"]:
            raise ValueError(f"Unsupported chunking strategy: {chunking_strategy}")
        self.chunking_strategy = chunking_strategy
        self.max_tokens = max_tokens
        self.min_tokens = min_tokens
        self.duplicate_headers = duplicate_headers
        self.encoding = get_encoding()

    def separate_header_and_body(self, text: str) -> Tuple[str, str]:
        header_pattern = r"^(.*?\n){3}"
        match = re.match(header_pattern, text)
        if match:
            header = match.group(0)
            body = text[len(header):]
        else:
            header, body = "", text  # No header; treat the entire text as body
        return header, body

    def combine_documents(self, doc: Document, next_doc: Document) -> Document:
        combined_text = doc.text + " " + next_doc.text
        combined_token_count = len(self.encoding.encode(combined_text))
        new_doc = Document(
            text=combined_text,
            doc_id=doc.doc_id,
            embedding=doc.embedding,
            extra_info={**(doc.extra_info or {}), "token_count": combined_token_count},
        )
        return new_doc

    def split_document(self, doc: Document) -> List[Document]:
        split_docs = []
        header, body = self.separate_header_and_body(doc.text)
        header_tokens = self.encoding.encode(header) if header else []
        body_tokens = self.encoding.encode(body)

        current_position = 0
        part_index = 0
        while current_position < len(body_tokens):
            end_position = current_position + self.max_tokens - len(header_tokens)
            chunk_tokens = (
                header_tokens + body_tokens[current_position:end_position]
                if self.duplicate_headers or part_index == 0
                else body_tokens[current_position:end_position]
            )
            chunk_text = self.encoding.decode(chunk_tokens)
            new_doc = Document(
                text=chunk_text,
                doc_id=f"{doc.doc_id}-{part_index}",
                embedding=doc.embedding,
                extra_info={**(doc.extra_info or {}), "token_count": len(chunk_tokens)},
            )
            split_docs.append(new_doc)
            current_position = end_position
            part_index += 1
            header_tokens = []
        return split_docs

    def classic_chunk(self, documents: List[Document]) -> List[Document]:
        processed_docs = []
        i = 0
        while i < len(documents):
            doc = documents[i]
            tokens = self.encoding.encode(doc.text)
            token_count = len(tokens)

            if self.min_tokens <= token_count <= self.max_tokens:
                doc.extra_info = doc.extra_info or {}
                doc.extra_info["token_count"] = token_count
                processed_docs.append(doc)
                i += 1
            elif token_count < self.min_tokens:
                if i + 1 < len(documents):
                    next_doc = documents[i + 1]
                    next_tokens = self.encoding.encode(next_doc.text)
                    if token_count + len(next_tokens) <= self.max_tokens:
                        # Combine small documents
                        combined_doc = self.combine_documents(doc, next_doc)
                        processed_docs.append(combined_doc)
                        i += 2
                    else:
                        # Keep the small document as is if adding next_doc would exceed max_tokens
                        doc.extra_info = doc.extra_info or {}
                        doc.extra_info["token_count"] = token_count
                        processed_docs.append(doc)
                        i += 1
                else:
                    # No next document to combine with; add the small document as is
                    doc.extra_info = doc.extra_info or {}
                    doc.extra_info["token_count"] = token_count
                    processed_docs.append(doc)
                    i += 1
            else:
                # Split large documents
                processed_docs.extend(self.split_document(doc))
                i += 1
        return processed_docs

    def chunk(
        self,
        documents: List[Document]
    ) -> List[Document]:
        if self.chunking_strategy == "classic_chunk":
            return self.classic_chunk(documents)
        else:
            raise ValueError("Unsupported chunking strategy")
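To see the merge/split policy of `classic_chunk` in action outside the application, here is a minimal, self-contained sketch of the same logic. A whitespace tokenizer stands in for `get_encoding()` and a plain dataclass stands in for the project's `Document`; both are illustrative stand-ins, not the real classes:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Doc:
    text: str
    doc_id: str
    extra_info: dict = field(default_factory=dict)


def classic_chunk(docs: List[Doc], min_tokens: int, max_tokens: int) -> List[Doc]:
    """Merge undersized neighbors; split oversized docs (whitespace 'tokens')."""
    out, i = [], 0
    while i < len(docs):
        tokens = docs[i].text.split()
        n = len(tokens)
        if min_tokens <= n <= max_tokens:
            out.append(docs[i])
            i += 1
        elif n < min_tokens and i + 1 < len(docs) and n + len(docs[i + 1].text.split()) <= max_tokens:
            # Combine two small neighbors into one document
            out.append(Doc(docs[i].text + " " + docs[i + 1].text, docs[i].doc_id))
            i += 2
        elif n < min_tokens:
            # Too small, but nothing suitable to merge with: keep as is
            out.append(docs[i])
            i += 1
        else:
            # Split into max_tokens-sized parts, suffixing the doc_id per part
            for part, start in enumerate(range(0, n, max_tokens)):
                out.append(Doc(" ".join(tokens[start:start + max_tokens]),
                               f"{docs[i].doc_id}-{part}"))
            i += 1
    return out


docs = [Doc("a b", "d0"), Doc("c d", "d1"), Doc("one two three four five six", "d2")]
result = classic_chunk(docs, min_tokens=3, max_tokens=4)
print([d.doc_id for d in result])  # ['d0', 'd2-0', 'd2-1']
```

The first two documents are merged (2 + 2 tokens fits under `max_tokens=4`), and the six-token document is split into two parts with `-0`/`-1` suffixes, mirroring the `doc_id` scheme in `split_document`.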
86 changes: 86 additions & 0 deletions application/parser/embedding_pipeline.py
@@ -0,0 +1,86 @@
import os
import logging

from retry import retry
from tqdm import tqdm

from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator


@retry(tries=10, delay=60)
def add_text_to_store_with_retry(store, doc, source_id):
    """
    Add a document's text and metadata to the vector store with retry logic.

    Args:
        store: The vector store object.
        doc: The document to be added.
        source_id: Unique identifier for the source.
    """
    try:
        doc.metadata["source_id"] = str(source_id)
        store.add_texts([doc.page_content], metadatas=[doc.metadata])
    except Exception as e:
        logging.error(f"Failed to add document with retry: {e}")
        raise


def embed_and_store_documents(docs, folder_name, source_id, task_status):
    """
    Embed documents and store them in a vector store.

    Args:
        docs (list): List of documents to be embedded and stored.
        folder_name (str): Directory to save the vector store.
        source_id (str): Unique identifier for the source.
        task_status: Task state manager for progress updates.

    Returns:
        None
    """
    # Ensure the output folder exists
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)

    # Initialize the vector store (FAISS is seeded with the first document)
    if settings.VECTOR_STORE == "faiss":
        docs_init = [docs.pop(0)]
        store = VectorCreator.create_vectorstore(
            settings.VECTOR_STORE,
            docs_init=docs_init,
            source_id=folder_name,
            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
        )
    else:
        store = VectorCreator.create_vectorstore(
            settings.VECTOR_STORE,
            source_id=source_id,
            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
        )
        store.delete_index()

    total_docs = len(docs)

    # Process and embed documents
    for idx, doc in tqdm(
        enumerate(docs),
        desc="Embedding 🦖",
        unit="docs",
        total=total_docs,
        bar_format="{l_bar}{bar}| Time Left: {remaining}",
    ):
        try:
            # Update task status for progress tracking
            progress = int((idx / total_docs) * 100)
            task_status.update_state(state="PROGRESS", meta={"current": progress})

            # Add document to vector store
            add_text_to_store_with_retry(store, doc, source_id)
        except Exception as e:
            logging.error(f"Error embedding document {idx}: {e}")
            logging.info(f"Saving progress at document {idx} out of {total_docs}")
            store.save_local(folder_name)
            break

    # Save the vector store
    if settings.VECTOR_STORE == "faiss":
        store.save_local(folder_name)
        logging.info("Vector store saved successfully.")
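The pipeline leans on the third-party `retry` decorator (`@retry(tries=10, delay=60)`) to survive transient vector-store failures. As a rough illustration of what that decorator does, here is a simplified hand-rolled stand-in; it is a hypothetical sketch, not the library's actual implementation, and the inter-attempt sleep is omitted so it runs instantly:

```python
import functools


def retry(tries: int, delay: float = 0):
    """Re-run the wrapped function up to `tries` times, re-raising the last error."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, tries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == tries:
                        raise
                    # A real implementation would time.sleep(delay) here
        return wrapper
    return decorator


calls = {"n": 0}


@retry(tries=3)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = flaky()
print(result)  # succeeds on the third attempt
```

With `tries=10, delay=60`, as in the pipeline above, a failed `add_texts` call is retried every minute for up to ten attempts before the exception propagates to the caller's `except` block, which saves partial progress and stops.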
75 changes: 0 additions & 75 deletions application/parser/open_ai_func.py

This file was deleted.

79 changes: 0 additions & 79 deletions application/parser/token_func.py

This file was deleted.
