From 9c0f6f9d14596b2b3f0aec45cde10e291093dfda Mon Sep 17 00:00:00 2001
From: Michael Skarlinski
Date: Tue, 12 Mar 2024 09:31:46 -0700
Subject: [PATCH] separate parsing and chunking in read_doc add optional metadata

---
 paperqa/docs.py       |   8 +-
 paperqa/readers.py    | 269 +++++++++++++++++++++++++++++-------------
 paperqa/types.py      |  40 +++++++
 tests/test_paperqa.py |  39 +++++-
 4 files changed, 271 insertions(+), 85 deletions(-)

diff --git a/paperqa/docs.py b/paperqa/docs.py
index cfdab5ce2..5cf36e4a0 100644
--- a/paperqa/docs.py
+++ b/paperqa/docs.py
@@ -353,7 +353,7 @@ async def aadd(
         texts = read_doc(path, fake_doc, chunk_chars=chunk_chars, overlap=100)
         if len(texts) == 0:
             raise ValueError(f"Could not read document {path}. Is it empty?")
-        chain_result = await cite_chain({"text": texts[0].text}, None)
+        chain_result = await cite_chain({"text": texts[0].text}, None)  # type: ignore[union-attr]
         citation = chain_result.text
         if (
             len(citation) < 3  # noqa: PLR2004
@@ -385,13 +385,13 @@ async def aadd(
         # loose check to see if document was loaded
         if (
             len(texts) == 0
-            or len(texts[0].text) < 10  # noqa: PLR2004
-            or (not disable_check and not maybe_is_text(texts[0].text))
+            or len(texts[0].text) < 10  # type: ignore[union-attr]  # noqa: PLR2004
+            or (not disable_check and not maybe_is_text(texts[0].text))  # type: ignore[union-attr]
         ):
             raise ValueError(
                 f"This does not look like a text document: {path}. Path disable_check to ignore this error."
             )
-        if await self.aadd_texts(texts, doc):
+        if await self.aadd_texts(texts, doc):  # type: ignore[arg-type]
             return docname
         return None
diff --git a/paperqa/readers.py b/paperqa/readers.py
index 06b614487..6dc9248ed 100644
--- a/paperqa/readers.py
+++ b/paperqa/readers.py
@@ -4,56 +4,70 @@
 from pathlib import Path
 from typing import List  # noqa: F401
 
+import html2text
 import tiktoken
-from html2text import html2text
 
-from .types import Doc, Text
+from .types import ChunkMetadata, Doc, ParsedMetadata, ParsedText, Text
+from .version import __version__ as pqa_version
 
 
-def parse_pdf_fitz(path: Path, doc: Doc, chunk_chars: int, overlap: int) -> list[Text]:
+def parse_pdf_fitz_to_pages(path: Path) -> ParsedText:
     import fitz
 
-    file = fitz.open(path)
-    split = ""
-    pages: list[str] = []
-    texts: list[Text] = []
-    for i in range(file.page_count):
-        page = file.load_page(i)
-        split += page.get_text("text", sort=True)
-        pages.append(str(i + 1))
-        # split could be so long it needs to be split
-        # into multiple chunks. Or it could be so short
-        # that it needs to be combined with the next chunk.
-        while len(split) > chunk_chars:
-            # pretty formatting of pages (e.g. 1-3, 4, 5-7)
-            pg = "-".join([pages[0], pages[-1]])
-            texts.append(
-                Text(
-                    text=split[:chunk_chars], name=f"{doc.docname} pages {pg}", doc=doc
-                )
-            )
-            split = split[chunk_chars - overlap :]
-            pages = [str(i + 1)]
-    if len(split) > overlap or len(texts) == 0:
-        pg = "-".join([pages[0], pages[-1]])
-        texts.append(
-            Text(text=split[:chunk_chars], name=f"{doc.docname} pages {pg}", doc=doc)
-        )
-    file.close()
-    return texts
+    with fitz.open(path) as file:
+        pages: dict[str, str] = {}
+        total_length = 0
 
+        for i in range(file.page_count):
+            page = file.load_page(i)
+            pages[str(i + 1)] = page.get_text("text", sort=True)
+            total_length += len(pages[str(i + 1)])
 
-def parse_pdf(path: Path, doc: Doc, chunk_chars: int, overlap: int) -> list[Text]:
+    metadata = ParsedMetadata(
+        parsing_libraries=[f"fitz ({fitz.__doc__})"],
+        paperqa_version=str(pqa_version),
+        total_parsed_text_length=total_length,
+        parse_type="pdf",
+    )
+    return ParsedText(content=pages, metadata=metadata)
+
+
+def parse_pdf_to_pages(path: Path) -> ParsedText:
     import pypdf
 
-    pdfFileObj = open(path, "rb")  # noqa: SIM115
-    pdfReader = pypdf.PdfReader(pdfFileObj)
-    split = ""
+    with open(path, "rb") as pdfFileObj:
+        pdfReader = pypdf.PdfReader(pdfFileObj)
+        pages: dict[str, str] = {}
+        total_length = 0
+
+        for i, page in enumerate(pdfReader.pages):
+            pages[str(i + 1)] = page.extract_text()
+            total_length += len(pages[str(i + 1)])
+
+    metadata = ParsedMetadata(
+        parsing_libraries=[f"pypdf ({pypdf.__version__})"],
+        paperqa_version=str(pqa_version),
+        total_parsed_text_length=total_length,
+        parse_type="pdf",
+    )
+    return ParsedText(content=pages, metadata=metadata)
+
+
+def chunk_pdf(
+    parsed_text: ParsedText, doc: Doc, chunk_chars: int, overlap: int
+) -> list[Text]:
     pages: list[str] = []
     texts: list[Text] = []
-    for i, page in enumerate(pdfReader.pages):
-        split += page.extract_text()
-        pages.append(str(i + 1))
+    split: str = ""
+
+    if not isinstance(parsed_text.content, dict):
+        raise NotImplementedError(
+            "pdf chunking only implemented per-page, ParsedText.content must be a `dict`."
+        )
+
+    for page_num, page_text in parsed_text.content.items():
+        split += page_text
+        pages.append(page_num)
         # split could be so long it needs to be split
         # into multiple chunks. Or it could be so short
         # that it needs to be combined with the next chunk.
@@ -66,55 +80,89 @@ def parse_pdf(path: Path, doc: Doc, chunk_chars: int, overlap: int) -> list[Text
                 )
             )
             split = split[chunk_chars - overlap :]
-            pages = [str(i + 1)]
+            pages = [page_num]
+
     if len(split) > overlap or len(texts) == 0:
         pg = "-".join([pages[0], pages[-1]])
         texts.append(
             Text(text=split[:chunk_chars], name=f"{doc.docname} pages {pg}", doc=doc)
         )
-    pdfFileObj.close()
     return texts
 
 
-def parse_txt(
-    path: Path, doc: Doc, chunk_chars: int, overlap: int, html: bool = False
-) -> list[Text]:
-    """Parse a document into chunks, based on tiktoken encoding.
+def parse_text(
+    path: Path, html: bool = False, split_lines=False, use_tiktoken=True
+) -> ParsedText:
+    """Simple text splitter, can optionally use tiktoken, parse html, or split into newlines.
+
+    Args:
+        path: path to file
+        html: flag to use html2text library for parsing
+        split_lines: flag to split lines into a list
+        use_tiktoken: flag to use tiktoken library to encode text
 
-    NOTE: We get some byte continuation errors.
-    Currently ignored, but should explore more to make sure we
-    don't miss anything.
     """
     try:
         with open(path) as f:
-            text = f.read()
+            text = [str(line) for line in f] if split_lines else f.read()
     except UnicodeDecodeError:
         with open(path, encoding="utf-8", errors="ignore") as f:
            text = f.read()
+
+    if html:
+        text = html2text.html2text(text)
+
+    metadata = {
+        "parsing_libraries": ["tiktoken (cl100k_base)"] if use_tiktoken else [],
+        "paperqa_version": str(pqa_version),
+        "total_parsed_text_length": (
+            len(text) if isinstance(text, str) else sum([len(t) for t in text])
+        ),
+        "parse_type": "txt" if not html else "html",
+    }
     if html:
-        text = html2text(text)
+        metadata["parsing_libraries"].append(f"html2text ({html2text.__version__})")  # type: ignore[attr-defined]
+
+    return ParsedText(content=text, metadata=ParsedMetadata(**metadata))
+
+
+def chunk_text(
+    parsed_text: ParsedText, doc: Doc, chunk_chars: int, overlap: int, use_tiktoken=True
+) -> list[Text]:
+    """Parse a document into chunks, based on tiktoken encoding.
+
+    NOTE: We get some byte continuation errors.
+    Currently ignored, but should explore more to make sure we
+    don't miss anything.
+    """
     texts: list[Text] = []
-    # we tokenize using tiktoken so cuts are in reasonable places
-    # See https://github.com/openai/tiktoken
     enc = tiktoken.get_encoding("cl100k_base")
-    encoded = enc.encode_ordinary(text)
     split = []
+
+    if not isinstance(parsed_text.content, str):
+        raise NotImplementedError(
+            "text chunking only implemented on str, ParsedText.content must be a `str`."
+        )
+
+    content = parsed_text.content if not use_tiktoken else parsed_text.encode_content()
+
     # convert from characters to chunks
-    char_count = len(text)  # e.g., 25,000
-    token_count = len(encoded)  # e.g., 4,500
+    char_count = parsed_text.metadata.total_parsed_text_length  # e.g., 25,000
+    token_count = len(content)  # e.g., 4,500
     chars_per_token = char_count / token_count  # e.g., 5.5
     chunk_tokens = chunk_chars / chars_per_token  # e.g., 3000 / 5.5 = 545
     overlap_tokens = overlap / chars_per_token  # e.g., 100 / 5.5 = 18
     chunk_count = ceil(token_count / chunk_tokens)  # e.g., 4500 / 545 = 9
+
     for i in range(chunk_count):
-        split = encoded[
+        split = content[
             max(int(i * chunk_tokens - overlap_tokens), 0) : int(
                 (i + 1) * chunk_tokens + overlap_tokens
             )
         ]
         texts.append(
             Text(
-                text=enc.decode(split),
+                text=enc.decode(split) if use_tiktoken else split,
                 name=f"{doc.docname} chunk {i + 1}",
                 doc=doc,
             )
@@ -122,25 +170,31 @@ def parse_txt(
     return texts
 
 
-def parse_code_txt(path: Path, doc: Doc, chunk_chars: int, overlap: int) -> list[Text]:
+def chunk_code_text(
+    parsed_text: ParsedText, doc: Doc, chunk_chars: int, overlap: int
+) -> list[Text]:
     """Parse a document into chunks, based on line numbers (for code)."""
     split = ""
     texts: list[Text] = []
     last_line = 0
 
-    with open(path) as f:
-        for i, line in enumerate(f):
-            split += line
-            while len(split) > chunk_chars:
-                texts.append(
-                    Text(
-                        text=split[:chunk_chars],
-                        name=f"{doc.docname} lines {last_line}-{i}",
-                        doc=doc,
-                    )
+    if not isinstance(parsed_text.content, list):
+        raise NotImplementedError(
+            "code chunking only implemented for one entry per line, ParsedText.content must be a `list`"
+        )
+
+    for i, line in enumerate(parsed_text.content):
+        split += line
+        while len(split) > chunk_chars:
+            texts.append(
+                Text(
+                    text=split[:chunk_chars],
+                    name=f"{doc.docname} lines {last_line}-{i}",
+                    doc=doc,
                 )
-                split = split[chunk_chars - overlap :]
-                last_line = i
+            )
+            split = split[chunk_chars - overlap :]
+            last_line = i
+
     if len(split) > overlap or len(texts) == 0:
         texts.append(
             Text(
@@ -152,25 +206,80 @@ def parse_code_txt(path: Path, doc: Doc, chunk_chars: int, overlap: int) -> list
     return texts
 
 
-def read_doc(
+def read_doc(  # noqa: PLR0912
     path: Path,
     doc: Doc,
     chunk_chars: int = 3000,
     overlap: int = 100,
     force_pypdf: bool = False,
-) -> list[Text]:
-    """Parse a document into chunks."""
+    parsed_text_only: bool = False,
+    include_metadata: bool = False,
+) -> list[Text] | ParsedText | tuple[list[Text], ParsedMetadata]:
+    """Parse a document and split into chunks.
+
+    Optionally can include just the parsing as well as metadata about the parsing/chunking
+
+    Args:
+        path: local document path
+        doc: object with document metadata
+        chunk_chars: size of chunks
+        overlap: size of overlap between chunks
+        force_pypdf: flag to force use of pypdf in parsing
+        parsed_text_only: return parsed text without chunking
+        include_metadata: return a tuple
+    """
     str_path = str(path)
+    parsed_text = None
+
+    # start with parsing -- users may want to store this separately
     if str_path.endswith(".pdf"):
         if force_pypdf:
-            return parse_pdf(path, doc, chunk_chars, overlap)
-        try:
-            return parse_pdf_fitz(path, doc, chunk_chars, overlap)
-        except ImportError:
-            return parse_pdf(path, doc, chunk_chars, overlap)
+            parsed_text = parse_pdf_to_pages(path)
+        else:
+            try:
+                parsed_text = parse_pdf_fitz_to_pages(path)
+            except ImportError:
+                parsed_text = parse_pdf_to_pages(path)
+
     elif str_path.endswith(".txt"):
-        return parse_txt(path, doc, chunk_chars, overlap)
+        parsed_text = parse_text(path, html=False, split_lines=False, use_tiktoken=True)
     elif str_path.endswith(".html"):
-        return parse_txt(path, doc, chunk_chars, overlap, html=True)
+        parsed_text = parse_text(path, html=True, split_lines=False, use_tiktoken=True)
     else:
-        return parse_code_txt(path, doc, chunk_chars, overlap)
+        parsed_text = parse_text(path, html=False, split_lines=True, use_tiktoken=False)
+
+    if parsed_text_only:
+        return parsed_text
+
+    # next chunk the parsed text
+    if str_path.endswith(".pdf"):
+        chunked_text = chunk_pdf(
+            parsed_text, doc, chunk_chars=chunk_chars, overlap=overlap
+        )
+        chunk_metadata = ChunkMetadata(
+            chunk_chars=chunk_chars, overlap=overlap, chunk_type="overlap_pdf_by_page"
+        )
+    elif str_path.endswith((".txt", ".html")):
+        chunked_text = chunk_text(
+            parsed_text,
+            doc,
+            chunk_chars=chunk_chars,
+            overlap=overlap,
+            use_tiktoken=True,
+        )
+        chunk_metadata = ChunkMetadata(
+            chunk_chars=chunk_chars, overlap=overlap, chunk_type="overlap"
+        )
+    else:
+        chunked_text = chunk_code_text(
+            parsed_text, doc, chunk_chars=chunk_chars, overlap=overlap
+        )
+        chunk_metadata = ChunkMetadata(
+            chunk_chars=chunk_chars, overlap=overlap, chunk_type="overlap_code_by_line"
+        )
+
+    if include_metadata:
+        parsed_text.metadata.chunk_metadata = chunk_metadata
+        return chunked_text, parsed_text.metadata
+
+    return chunked_text
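For reference, the new chunk_text above sizes its chunks by converting the character-based chunk_chars and overlap settings into token counts before slicing the tiktoken encoding. A minimal standalone sketch of that arithmetic, not part of the patch, using the example figures from the inline comments:

# Sketch of chunk_text's character-to-token budget math (illustrative only).
from math import ceil

chunk_chars = 3000     # target chunk size, in characters
overlap = 100          # overlap between chunks, in characters
char_count = 25_000    # parsed_text.metadata.total_parsed_text_length
token_count = 4_500    # len(parsed_text.encode_content())

chars_per_token = char_count / token_count      # ~5.5 characters per token
chunk_tokens = chunk_chars / chars_per_token    # ~545 tokens per chunk
overlap_tokens = overlap / chars_per_token      # ~18 tokens of overlap
chunk_count = ceil(token_count / chunk_tokens)  # ~9 chunks

# Chunk i is then decoded from the token slice
# [max(i * chunk_tokens - overlap_tokens, 0), (i + 1) * chunk_tokens + overlap_tokens).
print(chunk_count)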
diff --git a/paperqa/types.py b/paperqa/types.py
index fd203589f..63a25fd77 100644
--- a/paperqa/types.py
+++ b/paperqa/types.py
@@ -3,6 +3,7 @@
 from typing import Any, Callable
 from uuid import UUID, uuid4
 
+import tiktoken
 from pydantic import (
     BaseModel,
     ConfigDict,
@@ -22,6 +23,7 @@
     summary_prompt,
 )
 from .utils import get_citenames
+from .version import __version__ as pqa_version
 
 # Just for clarity
 DocKey = Any
@@ -221,3 +223,41 @@ def add_tokens(self, result: LLMResult):
         else:
             self.token_counts[result.model][0] += result.prompt_count
             self.token_counts[result.model][1] += result.completion_count
+
+
+class ChunkMetadata(BaseModel):
+    """Metadata for chunking algorithm."""
+
+    chunk_chars: int
+    overlap: int
+    chunk_type: str
+
+
+class ParsedMetadata(BaseModel):
+    """Metadata for parsed text."""
+
+    parsing_libraries: list[str]
+    total_parsed_text_length: int
+    paperqa_version: str = pqa_version
+    parse_type: str | None = None
+    chunk_metadata: ChunkMetadata | None = None
+
+
+class ParsedText(BaseModel):
+    """Parsed text (pre-chunking)."""
+
+    content: dict | str | list[str]
+    metadata: ParsedMetadata
+
+    def encode_content(self):
+        # we tokenize using tiktoken so cuts are in reasonable places
+        # See https://github.com/openai/tiktoken
+        enc = tiktoken.get_encoding("cl100k_base")
+        if isinstance(self.content, str):
+            return enc.encode_ordinary(self.content)
+        elif isinstance(self.content, list):  # noqa: RET505
+            return [enc.encode_ordinary(c) for c in self.content]
+        else:
+            raise NotImplementedError(
+                "Encoding only implemented for str and list[str] content."
+            )
diff --git a/tests/test_paperqa.py b/tests/test_paperqa.py
index b82f3663c..aaa19e1b8 100644
--- a/tests/test_paperqa.py
+++ b/tests/test_paperqa.py
@@ -1234,11 +1234,48 @@ def test_pdf_pypdf_reader():
         chunk_chars=3000,
     )
     assert (
-        strings_similarity(splits1[0].text.casefold(), splits2[0].text.casefold())
+        strings_similarity(splits1[0].text.casefold(), splits2[0].text.casefold())  # type: ignore[union-attr]
         > 0.85
     )
 
 
+def test_parser_only_reader():
+    tests_dir = os.path.dirname(os.path.abspath(__file__))
+    doc_path = os.path.join(tests_dir, "paper.pdf")
+    parsed_text = read_doc(
+        doc_path,  # type: ignore[arg-type]
+        Doc(docname="foo", citation="Foo et al, 2002", dockey="1"),
+        force_pypdf=True,
+        overlap=100,
+        chunk_chars=3000,
+        parsed_text_only=True,
+    )
+    assert parsed_text.metadata.parse_type == "pdf"  # type: ignore[union-attr]
+    assert any("pypdf" in t for t in parsed_text.metadata.parsing_libraries)  # type: ignore[union-attr]
+    assert parsed_text.metadata.chunk_metadata is None  # type: ignore[union-attr]
+    assert parsed_text.metadata.total_parsed_text_length == sum(  # type: ignore[union-attr]
+        [len(t) for t in parsed_text.content.values()]  # type: ignore[misc,union-attr]
+    )
+
+
+def test_chunk_metadata_reader():
+    tests_dir = os.path.dirname(os.path.abspath(__file__))
+    doc_path = os.path.join(tests_dir, "paper.pdf")
+    chunk_text, metadata = read_doc(
+        doc_path,  # type: ignore[arg-type]
+        Doc(docname="foo", citation="Foo et al, 2002", dockey="1"),
+        force_pypdf=True,
+        overlap=100,
+        chunk_chars=3000,
+        parsed_text_only=False,
+        include_metadata=True,
+    )
+    assert metadata.parse_type == "pdf"
+    assert metadata.chunk_metadata.chunk_type == "overlap_pdf_by_page"  # type: ignore[union-attr]
+    assert metadata.chunk_metadata.overlap == 100  # type: ignore[union-attr]
+    assert metadata.chunk_metadata.chunk_chars == 3000  # type: ignore[union-attr]
+
+
 def test_prompt_length():
     doc_path = "example.txt"
     with open(doc_path, "w", encoding="utf-8") as f:
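The two new tests above exercise the modes this patch adds to read_doc. A minimal usage sketch of the same API, not part of the patch; "paper.pdf" stands in for any local PDF:

# Sketch of the new read_doc call patterns (mirrors the tests above).
from paperqa.readers import read_doc
from paperqa.types import Doc

doc = Doc(docname="foo", citation="Foo et al, 2002", dockey="1")

# Parse only: returns a ParsedText with per-page content plus ParsedMetadata.
parsed = read_doc("paper.pdf", doc, parsed_text_only=True)
print(parsed.metadata.parse_type, parsed.metadata.total_parsed_text_length)

# Parse and chunk, keeping metadata: returns (list[Text], ParsedMetadata),
# with chunk_metadata recording chunk_chars, overlap, and chunk_type.
chunks, metadata = read_doc(
    "paper.pdf", doc, chunk_chars=3000, overlap=100, include_metadata=True
)
print(metadata.chunk_metadata.chunk_type)  # "overlap_pdf_by_page" for PDFs

# Default call is unchanged and still returns just the list of Text chunks.
chunks = read_doc("paper.pdf", doc)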