This repository has been archived by the owner on Jul 18, 2024. It is now read-only.

Commit

Merge pull request #494 from chaojun-zhang/haystack_use_case
[v 1.2] [ISSUE 493] Add RecursiveUrlLoader operator and change Cnvrg website RAG to use this operator
xuechendi authored Dec 20, 2023
2 parents 2b3b3ef + 3204fa2 commit 6ea53f5
Showing 7 changed files with 902 additions and 677 deletions.
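At a high level, this change has the Cnvrg docs RAG pipeline crawl https://app.cnvrg.io/docs/ with the new RecursiveUrlLoader operator instead of loading a fixed URL list through DocumentLoader. A minimal sketch of how the operator slots into a TextPipeline, based on the updated pipeline test below (the add_operations/execute calls and the JsonlWriter output path are assumptions, not shown in this diff):

from pyrecdp.LLM import TextPipeline
from pyrecdp.primitives.operations import RecursiveUrlLoader, RAGTextFix, JsonlWriter

pipeline = TextPipeline()
ops = [
    # Crawl the docs site two levels deep; replaces the old
    # DocumentLoader(loader='UnstructuredURLLoader', ...) step.
    RecursiveUrlLoader("https://app.cnvrg.io/docs/", max_depth=2),
    # Strip markdown heading markers and extra whitespace, as in the updated test.
    RAGTextFix(str_to_replace={'\n###': '', '\n##': '', '\n#': ''}, remove_extra_whitespace=True),
    JsonlWriter("out/cnvrg_docs"),  # hypothetical output path
]
pipeline.add_operations(ops)  # assumed TextPipeline API
pipeline.execute()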
209 changes: 111 additions & 98 deletions RecDP/examples/notebooks/llmutils/rag_cnvrg_pipeline.ipynb

Large diffs are not rendered by default.

1,181 changes: 631 additions & 550 deletions RecDP/examples/notebooks/llmutils/rag_cnvrg_pipeline_step_by_step.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion RecDP/pyrecdp/primitives/operations/__init__.py
@@ -65,7 +65,7 @@
from .text_perplexity_score import TextPerplexityScore
from .random_select import RandomSelect
from .text_ingestion import DocumentIngestion
from .doc_loader import DirectoryLoader, DocumentLoader, UrlLoader, YoutubeLoader
from .doc_loader import DirectoryLoader, DocumentLoader, UrlLoader, RecursiveUrlLoader, YoutubeLoader
from .text_to_qa import TextToQA
from .table_summary import TableSummary
from .text_spell_correct import TextSpellCorrect
130 changes: 120 additions & 10 deletions RecDP/pyrecdp/primitives/operations/doc_loader.py
@@ -1,16 +1,18 @@
import os
from typing import Optional, List, Callable
import re
from typing import Optional, List, Callable, Union, Sequence
from urllib.parse import urlparse, urlunparse, urljoin

import requests

from pyrecdp.core.import_utils import check_availability_and_install
from pyrecdp.core.import_utils import import_langchain, import_markdownify, import_beautiful_soup
from pyrecdp.primitives.llmutils.document.schema import Document
from pyrecdp.primitives.operations.base import BaseLLMOperation, LLMOPERATORS
from pyrecdp.primitives.operations.base import LLMOPERATORS
from pyrecdp.primitives.operations.constant import DEFAULT_HEADER
from pyrecdp.primitives.operations.text_reader import TextReader
from pyrecdp.primitives.operations.logging_utils import logger
from pyrecdp.core.import_utils import check_availability_and_install
from pyrecdp.primitives.operations.text_reader import TextReader


class DocumentLoader(TextReader):
def __init__(self,
@@ -143,16 +145,17 @@ def _get_loader(self) -> Callable[[], List[Document]]:

LLMOPERATORS.register(DirectoryLoader)


class YoutubeLoader(TextReader):
def __init__(self, urls: List[str], save_dir: str = None, model = 'small', **kwargs):
def __init__(self, urls: List[str], save_dir: str = None, model='small', **kwargs):
"""
Loads documents from a list of Youtube URLs.
Args:
urls: The list of Youtube video URLs.
save_dir: The directory to save loaded Youtube audio; the temporary file is removed if save_dir is None.
model: The name of the whisper model; check the available ones using whisper.available_models().
"""
"""
settings = {
'urls': urls,
'save_dir': save_dir,
@@ -162,7 +165,7 @@ def __init__(self, urls: List[str], save_dir: str = None, model = 'small', **kwa
self.urls = urls
self.save_dir = save_dir
self.model_name = model

def _load(self):
import os
import tempfile
@@ -181,7 +184,7 @@ def _load(self):
audio_paths = {}
for url, blob in zip(self.urls[::-1], loader.yield_blobs()):
audio_paths[url] = str(blob.path)
import os
import os
os.system("apt-get -qq -y install ffmpeg")
check_availability_and_install('openai-whisper')
import whisper
@@ -192,9 +195,9 @@ def _load(self):
finally:
if use_temp_dir:
shutil.rmtree(save_dir)

return docs

def load_documents(self):
return [{'text': doc.text, 'metadata': doc.metadata} for doc in self._load()]

@@ -211,8 +214,10 @@ def process_spark(self, spark, spark_df=None):
self.cache = self.union_spark_df(spark_df, self.cache)
return self.cache


LLMOPERATORS.register(YoutubeLoader)


def create_doc_from_html_to_md(page_url, html_text):
import_markdownify()
import markdownify
@@ -382,3 +387,108 @@ def process_spark(self, spark, spark_df=None):


LLMOPERATORS.register(UrlLoader)


class RecursiveUrlLoader(TextReader):
def __init__(
self,
urls: Union[str, List[str]],
max_depth: Optional[int] = 2,
use_async: Optional[bool] = None,
extractor: Optional[Callable[[str], str]] = None,
metadata_extractor: Optional[Callable[[str, str], str]] = None,
exclude_dirs: Optional[Sequence[str]] = (),
timeout: Optional[int] = 10,
prevent_outside: bool = True,
link_regex: Union[str, re.Pattern, None] = None,
headers: Optional[dict] = None,
check_response_status: bool = False,
requirements=None,
) -> None:
"""Initialize with URL to crawl and any subdirectories to exclude.
Args:
urls: The URLs to crawl.
max_depth: The max depth of the recursive loading.
use_async: Whether to use asynchronous loading.
If True, loading will not be lazy, but it will still work in the
expected way.
extractor: A function to extract document contents from raw html.
When the extractor returns an empty string, the document is
ignored. The default extractor uses BeautifulSoup4 to extract the text.
metadata_extractor: A function to extract metadata from raw html and the
source URL (args in that order). The default extractor uses
BeautifulSoup4 to extract the title, description and language
of the page.
exclude_dirs: A list of subdirectories to exclude.
timeout: The timeout for requests, in seconds. If None, the
connection will not time out.
prevent_outside: If True, prevent loading from URLs that are not children
of the root URL.
link_regex: Regex for extracting sub-links from the raw html of a web page.
headers: Default request headers to use for all requests.
check_response_status: If True, check HTTP response status and skip
URLs with error responses (400-599).
"""
if requirements is None:
requirements = []

if extractor is None:
from bs4 import BeautifulSoup
extractor = lambda x: BeautifulSoup(x, "html.parser").text

settings = {
'urls': urls,
'max_depth': max_depth,
'use_async': use_async,
'extractor': extractor,
'metadata_extractor': metadata_extractor,
'exclude_dirs': exclude_dirs,
'timeout': timeout,
'prevent_outside': prevent_outside,
'link_regex': link_regex,
'headers': headers,
'check_response_status': check_response_status,
}
super().__init__(settings, requirements=requirements + ['bs4', 'langchain'])
self.support_spark = True
self.support_ray = True

from langchain.document_loaders import RecursiveUrlLoader as LCRecursiveURLLoader
if isinstance(urls, str):
urls = [urls]

urls = set(urls)

self.loaders = [LCRecursiveURLLoader(
url,
max_depth=max_depth,
use_async=use_async,
extractor=extractor,
metadata_extractor=metadata_extractor,
exclude_dirs=exclude_dirs,
timeout=timeout,
prevent_outside=prevent_outside,
link_regex=link_regex,
headers=headers,
check_response_status=check_response_status,
) for url in urls]

def load_documents(self):
return [{'text': doc.page_content, 'metadata': doc.metadata} for loader in self.loaders for doc in
loader.load()]

def process_rayds(self, ds=None):
import ray
self.cache = ray.data.from_items(self.load_documents())
if ds is not None:
self.cache = self.union_ray_ds(ds, self.cache)
return self.cache

def process_spark(self, spark, spark_df=None):
self.cache = spark.createDataFrame(self.load_documents())
if spark_df is not None:
self.cache = self.union_spark_df(spark_df, self.cache)
return self.cache


LLMOPERATORS.register(RecursiveUrlLoader)
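
For reference, a minimal sketch of using the operator added above on its own with a custom extractor (assumes pyrecdp, bs4 and langchain are installed; the exclude_dirs value is purely illustrative):

from bs4 import BeautifulSoup
from pyrecdp.primitives.operations import RecursiveUrlLoader

def text_only(html: str) -> str:
    # Mirrors the operator's default extractor: plain text via BeautifulSoup.
    return BeautifulSoup(html, "html.parser").text

loader = RecursiveUrlLoader(
    "https://app.cnvrg.io/docs/",
    max_depth=2,
    extractor=text_only,
    exclude_dirs=("https://app.cnvrg.io/docs/legal",),  # hypothetical subdirectory to skip
)
docs = loader.load_documents()  # list of {'text': ..., 'metadata': ...} dicts
print(len(docs))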
28 changes: 12 additions & 16 deletions RecDP/pyrecdp/primitives/operations/text_ingestion.py
@@ -195,21 +195,17 @@ def do_persist(self, ds, **kwargs):
exclude_keys = ['db_handler', 'return_db_handler']
vector_store_args = dict((k, v) for k, v in self.vector_store_args.items() if k not in exclude_keys)
if isinstance(ds, Dataset):
class BatchIndexer:
def __init__(self, text_column: str, vector_store_args: Optional[Dict[str, Any]]):
from haystack.document_stores import ElasticsearchDocumentStore
self.text_column = text_column
self.vector_store_args = vector_store_args
self.elasticsearch = ElasticsearchDocumentStore(
**vector_store_args
)

def __call__(self, batch):
from haystack import Document as SDocument
documents = [SDocument(content=text) for text in batch[self.text_column]]
self.elasticsearch.write_documents(documents)

ds.map_batches(BatchIndexer, fn_constructor_kwargs=vector_store_args)
def batch_index(batch, text_column, vector_store_args: Optional[Dict[str, Any]]):
from haystack.document_stores import ElasticsearchDocumentStore
elasticsearch = ElasticsearchDocumentStore(
**vector_store_args
)
from haystack import Document as SDocument
documents = [SDocument(content=text) for text in batch[text_column]]
elasticsearch.write_documents(documents)

return {}
ds.map_batches(lambda batch: batch_index(batch, self.text_column, vector_store_args)).count()
else:
def batch_index_with_var(batch, bv_value):
from haystack import Document as SDocument
@@ -223,7 +219,7 @@ def batch_index_with_var(batch, bv_value):

ds = cast(DataFrame, ds)

bv = ds.sparkSession.sparkContext.broadcast((self.text_column, self.vector_store_args))
bv = ds.sparkSession.sparkContext.broadcast((self.text_column, vector_store_args))
ds.foreachPartition(lambda p: batch_index_with_var(p, bv))

# share this document store only when rag retrieval want to use document store created from index stage
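
The text_ingestion.py change above swaps the callable BatchIndexer class for a plain function applied through ds.map_batches with a lambda. A self-contained sketch of that pattern (the real operator writes each batch to an ElasticsearchDocumentStore; the indexing step is stubbed out here so the sketch runs without one):

import numpy as np
import ray

def batch_index(batch, text_column):
    # Real code: wrap each text in a haystack Document and write it to Elasticsearch.
    texts = list(batch[text_column])
    return {"indexed": np.array([len(texts)])}

ds = ray.data.from_items([{"text": "doc one"}, {"text": "doc two"}])
# Same shape as the operator: a lambda binds the extra arguments to the batch function.
print(ds.map_batches(lambda batch: batch_index(batch, "text")).count())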
19 changes: 19 additions & 0 deletions RecDP/tests/test_llmutils_operations.py
@@ -499,3 +499,22 @@ def test_spell_correct_spark(self):
op = TextSpellCorrect()
with SparkContext("tests/data/llm_data/tiny_c4_sample_10.jsonl") as ctx:
ctx.show(op.process_spark(ctx.spark, ctx.ds))


def test_recursive_url_loader_spark(self):
urls = ['https://app.cnvrg.io/docs/',
'https://app.cnvrg.io/docs/core_concepts/python_sdk_v2.html',
'https://app.cnvrg.io/docs/cli_v2/cnvrgv2_cli.html',
'https://app.cnvrg.io/docs/collections/tutorials.html']
op = RecursiveUrlLoader(urls, max_depth=2)
with SparkContext("tests/data/llm_data/tiny_c4_sample.jsonl") as ctx:
ctx.show(op.process_spark(ctx.spark))

def test_recursive_url_loader_ray(self):
urls = ['https://app.cnvrg.io/docs/',
'https://app.cnvrg.io/docs/core_concepts/python_sdk_v2.html',
'https://app.cnvrg.io/docs/cli_v2/cnvrgv2_cli.html',
'https://app.cnvrg.io/docs/collections/tutorials.html']
op = RecursiveUrlLoader(urls, max_depth=2)
with RayContext("tests/data/llm_data/tiny_c4_sample.jsonl") as ctx:
ctx.show(op.process_rayds())
10 changes: 8 additions & 2 deletions RecDP/tests/test_llmutils_pipelines.py
@@ -273,9 +273,15 @@ def test_llm_rag_pdf_use_existing_db_pipeline(self):
display(ret)

def test_llm_rag_pipeline_cnvrg(self):
from pyrecdp.primitives.operations import DocumentLoader,RAGTextFix,CustomerDocumentSplit,TextCustomerFilter,JsonlWriter
from pyrecdp.primitives.operations import RecursiveUrlLoader,RAGTextFix,CustomerDocumentSplit,TextCustomerFilter,JsonlWriter
from pyrecdp.LLM import TextPipeline

def prepare_nltk_model(model, lang):
import nltk
nltk.download('punkt')

from pyrecdp.core.model_utils import prepare_model
prepare_model(model_type="nltk", model_key="nltk_rag_cnvrg", prepare_model_func=prepare_nltk_model)
urls = ['https://app.cnvrg.io/docs/',
'https://app.cnvrg.io/docs/core_concepts/python_sdk_v2.html',
'https://app.cnvrg.io/docs/cli_v2/cnvrgv2_cli.html',
@@ -313,7 +319,7 @@ def chunk_doc(text,max_num_of_words):

pipeline = TextPipeline()
ops = [
DocumentLoader(loader='UnstructuredURLLoader', loader_args={'urls': urls}, requirements=['unstructured']),
RecursiveUrlLoader(urls, max_depth=2),
RAGTextFix(str_to_replace={'\n###': '', '\n##': '', '\n#': ''}, remove_extra_whitespace=True),
CustomerDocumentSplit(func=lambda text: text.split('# ')[1:]),
TextCustomerFilter(custom_filter),
