Skip to content

Commit

Permalink
feat: megaparse v54 (#3594)
Browse files Browse the repository at this point in the history
  • Loading branch information
AmineDiro authored Feb 12, 2025
1 parent 699b549 commit bc6d75d
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 27 deletions.
4 changes: 4 additions & 0 deletions core/.flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[flake8]
; Minimal configuration for Flake8 to work with Black.
max-line-length = 100
ignore = E101,E111,E112,E221,E222,E501,E711,E712,W503,W504,F401,E203
2 changes: 1 addition & 1 deletion core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies = [
"faiss-cpu>=1.8.0.post1",
"rapidfuzz>=3.10.1",
"markupsafe>=2.1.5",
"megaparse-sdk>=0.1.9",
"megaparse-sdk>=0.1.11",
"langchain-mistralai>=0.2.3",
"fasttext-langdetect>=1.0.5",
"langfuse>=2.57.0",
Expand Down
10 changes: 6 additions & 4 deletions core/quivr_core/processor/implementations/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
)
from langchain_community.document_loaders.base import BaseLoader
from langchain_community.document_loaders.text import TextLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter, TextSplitter

from quivr_core.files.file import FileExtension, QuivrFile
from quivr_core.processor.processor_base import ProcessorBase
from quivr_core.processor.processor_base import ProcessedDocument, ProcessorBase
from quivr_core.processor.splitter import SplitterConfig

logger = logging.getLogger("quivr_core")
Expand Down Expand Up @@ -74,7 +73,7 @@ def processor_metadata(self) -> dict[str, Any]:
"splitter": self.splitter_config.model_dump(),
}

async def process_file_inner(self, file: QuivrFile) -> list[Document]:
async def process_file_inner(self, file: QuivrFile) -> ProcessedDocument[None]:
if hasattr(self.loader_cls, "__init__"):
# NOTE: mypy can't correctly type this as BaseLoader doesn't have a constructor method
loader = self.loader_cls(file_path=str(file.path), **self.loader_kwargs) # type: ignore
Expand All @@ -85,9 +84,12 @@ async def process_file_inner(self, file: QuivrFile) -> list[Document]:
docs = self.text_splitter.split_documents(documents)

for doc in docs:
# TODO: This metadata info should be typed
doc.metadata = {"chunk_size": len(enc.encode(doc.page_content))}

return docs
return ProcessedDocument(
chunks=docs, processor_cls=cls_name, processor_response=None
)

return type(cls_name, (ProcessorInit,), dict(_Processor.__dict__))

Expand Down
24 changes: 15 additions & 9 deletions core/quivr_core/processor/implementations/megaparse_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
from langchain_text_splitters import RecursiveCharacterTextSplitter, TextSplitter
from megaparse_sdk.client import MegaParseNATSClient
from megaparse_sdk.config import ClientNATSConfig
from megaparse_sdk.schema.document import Document as MPDocument

from quivr_core.config import MegaparseConfig
from quivr_core.files.file import QuivrFile
from quivr_core.processor.processor_base import ProcessorBase
from quivr_core.processor.processor_base import ProcessedDocument, ProcessorBase
from quivr_core.processor.registry import FileExtension
from quivr_core.processor.splitter import SplitterConfig

logger = logging.getLogger("quivr_core")


class MegaparseProcessor(ProcessorBase):
class MegaparseProcessor(ProcessorBase[MPDocument]):
"""
Megaparse processor for PDF files.
Expand Down Expand Up @@ -72,17 +73,22 @@ def processor_metadata(self):
"chunk_overlap": self.splitter_config.chunk_overlap,
}

async def process_file_inner(
    self, file: QuivrFile
) -> ProcessedDocument[MPDocument | str]:
    """Parse *file* with the MegaParse service and split it into chunks.

    Uploads the file over NATS, stringifies the parse response into a
    single langchain ``Document``, splits it, and tags each chunk with
    its token count.

    Args:
        file: The file to parse.

    Returns:
        A ``ProcessedDocument`` whose ``processor_response`` is the raw
        MegaParse response (an ``MPDocument`` or a plain string,
        depending on the backend).
    """
    logger.info(f"Uploading file {file.path} to MegaParse")
    async with MegaParseNATSClient(ClientNATSConfig()) as client:
        response = await client.parse_file(file=file.path)

    logger.info(f"File : {response}")
    document = Document(
        page_content=str(response),
    )

    chunks = self.text_splitter.split_documents([document])
    for chunk in chunks:
        # Merge instead of replacing the whole dict: split_documents
        # copies the source document's metadata onto each chunk, and
        # process_file later looks for keys such as
        # "original_file_name" — overwriting the dict here would
        # silently drop them.
        chunk.metadata["chunk_size"] = len(self.enc.encode(chunk.page_content))
    return ProcessedDocument(
        chunks=chunks,
        processor_cls="MegaparseProcessor",
        processor_response=response,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from langchain_core.documents import Document

from quivr_core.files.file import QuivrFile
from quivr_core.processor.processor_base import ProcessorBase
from quivr_core.processor.processor_base import ProcessedDocument, ProcessorBase
from quivr_core.processor.registry import FileExtension
from quivr_core.processor.splitter import SplitterConfig

Expand Down Expand Up @@ -47,7 +47,7 @@ def processor_metadata(self) -> dict[str, Any]:
"splitter": self.splitter_config.model_dump(),
}

async def process_file_inner(self, file: QuivrFile) -> list[Document]:
async def process_file_inner(self, file: QuivrFile) -> ProcessedDocument[str]:
async with aiofiles.open(file.path, mode="r") as f:
content = await f.read()

Expand All @@ -57,4 +57,6 @@ async def process_file_inner(self, file: QuivrFile) -> list[Document]:
doc, self.splitter_config.chunk_size, self.splitter_config.chunk_overlap
)

return docs
return ProcessedDocument(
chunks=docs, processor_cls="SimpleTxtProcessor", processor_response=content
)
10 changes: 6 additions & 4 deletions core/quivr_core/processor/implementations/tika_processor.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import tiktoken
import logging
import os
from typing import AsyncIterable

import httpx
import tiktoken
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter, TextSplitter

from quivr_core.files.file import QuivrFile
from quivr_core.processor.processor_base import ProcessorBase
from quivr_core.processor.processor_base import ProcessedDocument, ProcessorBase
from quivr_core.processor.registry import FileExtension
from quivr_core.processor.splitter import SplitterConfig

Expand Down Expand Up @@ -70,12 +70,14 @@ def processor_metadata(self):
"chunk_overlap": self.splitter_config.chunk_overlap,
}

async def process_file_inner(self, file: QuivrFile) -> ProcessedDocument[None]:
    """Parse *file* with the Tika server and split it into chunks.

    Args:
        file: The file to parse.

    Returns:
        A ``ProcessedDocument`` with the split chunks; Tika's raw
        response is not retained (``processor_response`` is ``None``).
    """
    async with file.open() as f:
        txt = await self._send_parse_tika(f)
        document = Document(page_content=txt)
        docs = self.text_splitter.split_documents([document])
        for doc in docs:
            # Merge instead of replacing the whole dict: split_documents
            # copies the source document's metadata onto each chunk, and
            # process_file later looks for keys such as
            # "original_file_name" — overwriting the dict here would
            # silently drop them.
            doc.metadata["chunk_size"] = len(self.enc.encode(doc.page_content))

        return ProcessedDocument(
            chunks=docs, processor_cls="TikaProcessor", processor_response=None
        )
23 changes: 17 additions & 6 deletions core/quivr_core/processor/processor_base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
from abc import ABC, abstractmethod
from importlib.metadata import PackageNotFoundError, version
from typing import Any
from typing import Any, Generic, List, TypeVar

from attr import dataclass
from langchain_core.documents import Document

from quivr_core.files.file import FileExtension, QuivrFile
Expand All @@ -11,13 +12,23 @@
logger = logging.getLogger("quivr_core")


R = TypeVar("R", covariant=True)


# NOTE(review): `@dataclass` is imported from `attr` (see the import block),
# not from stdlib `dataclasses` — confirm this is intentional.
@dataclass
class ProcessedDocument(Generic[R]):
    """A processor's output: the split chunks plus the raw backend response.

    Generic in ``R``, the type of the backend response (e.g.
    ``ProcessedDocument[None]``, ``ProcessedDocument[str]``, or
    ``ProcessedDocument[MPDocument | str]`` in the concrete processors).
    """

    # Langchain Documents produced by splitting the parsed file.
    chunks: List[Document]
    # Name of the processor class that produced this result.
    processor_cls: str
    # Raw parser/backend response, typed by R (None when not retained).
    processor_response: R


# TODO: processors should be cached somewhere ?
# The processor should be cached by processor type
# The cache should use a single
class ProcessorBase(ABC):
class ProcessorBase(ABC, Generic[R]):
supported_extensions: list[FileExtension | str]

def check_supported(self, file: QuivrFile) -> None:
    """Ensure this processor handles *file*'s extension.

    Raises:
        ValueError: If the extension is not in ``supported_extensions``.
    """
    if file.file_extension in self.supported_extensions:
        return
    raise ValueError(f"can't process a file of type {file.file_extension}")

Expand All @@ -26,7 +37,7 @@ def check_supported(self, file: QuivrFile):
def processor_metadata(self) -> dict[str, Any]:
raise NotImplementedError

async def process_file(self, file: QuivrFile) -> list[Document]:
async def process_file(self, file: QuivrFile) -> ProcessedDocument[R]:
logger.debug(f"Processing file {file}")
self.check_supported(file)
docs = await self.process_file_inner(file)
Expand All @@ -35,7 +46,7 @@ async def process_file(self, file: QuivrFile) -> list[Document]:
except PackageNotFoundError:
qvr_version = "dev"

for idx, doc in enumerate(docs, start=1):
for idx, doc in enumerate(docs.chunks, start=1):
if "original_file_name" in doc.metadata:
doc.page_content = f"Filename: {doc.metadata['original_file_name']} Content: {doc.page_content}"
doc.page_content = doc.page_content.replace("\u0000", "")
Expand All @@ -56,5 +67,5 @@ async def process_file(self, file: QuivrFile) -> list[Document]:
return docs

@abstractmethod
async def process_file_inner(self, file: QuivrFile) -> list[Document]:
async def process_file_inner(self, file: QuivrFile) -> ProcessedDocument[R]:
    """Parse *file* and return its chunks plus the raw backend response.

    Subclasses implement the actual parsing; ``process_file`` calls this
    and then post-processes the returned chunks' content and metadata.
    """
    raise NotImplementedError

0 comments on commit bc6d75d

Please sign in to comment.