This repository has been archived by the owner on Jul 18, 2024. It is now read-only.

[v1.2][ISSUE-306] Support adding embeddings to vector store. (#458)
* Support adding embeddings to the vector store.

* Integrate RAG-related operators into one.

* Wrap the RAG pipeline into a function for Easydata.

* Refine the documentation about the RAG Data Pipeline and the RAG notebook.

* Update test_llmutils_operations.py

---------

Co-authored-by: Chendi.Xue <[email protected]>
yao531441 and xuechendi authored Nov 30, 2023
1 parent 32328fe commit 67ae0b7
Showing 13 changed files with 287 additions and 245 deletions.
93 changes: 82 additions & 11 deletions RecDP/examples/notebooks/llmutils/rag_pipeline.ipynb
@@ -48,9 +48,19 @@
"id": "bMqBJ9eckIs6"
},
"source": [
"### 2. Set parameters"
"## 2. Set parameters according to your environment\n"
]
},
{
"cell_type": "markdown",
"source": [
"\n",
"### 2.1 Parametera about vector store.\n"
],
"metadata": {
"id": "nBa-OiRcQhLr"
}
},
{
"cell_type": "code",
"execution_count": null,
@@ -59,26 +69,84 @@
},
"outputs": [],
"source": [
"# Where to store vectore store data\n",
"out_dir=/content/vs_store\n",
"vector_store_type=\"FAISS\"\n",
"index_name=\"knowledge_db\"\n",
"\n",
"index_name=\"knowledge_db\""
]
},
{
"cell_type": "markdown",
"source": [
"### 2.2 Parametera about TextSplitter"
],
"metadata": {
"id": "PmgACKQzQv7z"
}
},
{
"cell_type": "code",
"source": [
"text_splitter = \"RecursiveCharacterTextSplitter\"\n",
"text_splitter_args = {\"chunk_size\": 500, \"chunk_overlap\": 0}\n",
"\n",
"target_urls = [\"https://www.intc.com/news-events/press-releases/detail/1655/intel-reports-third-quarter-2023-financial-results\"]\n",
"\n",
"text_splitter_args = {\"chunk_size\": 500, \"chunk_overlap\": 0}"
],
"metadata": {
"id": "tvXP1IysQyza"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"### 2.3 Parametera about Embedding"
],
"metadata": {
"id": "WrdD1PdBQ0ax"
}
},
{
"cell_type": "code",
"source": [
"embeddings_type=\"HuggingFaceEmbeddings\"\n",
"embeddings_args={'model_name': f\"sentence-transformers/all-mpnet-base-v2\"}"
]
],
"metadata": {
"id": "Pr_caSYPQ21R"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"### 2.4 Specify the data you need to process"
],
"metadata": {
"id": "YjMrPVCJQ56w"
}
},
{
"cell_type": "code",
"source": [
"# web data\n",
"target_urls = [\"https://www.intc.com/news-events/press-releases/detail/1655/intel-reports-third-quarter-2023-financial-results\"]\n",
"# or some file data\n",
"# data_path = \"/content/my_pdf_path\""
],
"metadata": {
"id": "lfl_tOq6Q5fY"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "JjTnnzw_kRVV"
},
"source": [
"## 3. Extract data and build a knowledge database"
"## 3. Use recdp to extract data and build a knowledge database"
]
},
{
@@ -102,7 +170,7 @@
"outputs": [],
"source": [
"from pyrecdp.LLM import TextPipeline\n",
"from pyrecdp.primitives.operations import Url_Loader, DocumentSplit, DocumentIngestion"
"from pyrecdp.primitives.operations import Url_Loader, DocumentSplit, DocumentIngestion, RAGTextFix"
]
},
{
@@ -128,7 +196,10 @@
"source": [
"pipeline = TextPipeline()\n",
"ops = [\n",
" Url_Loader(urls=target_urls, target_tag='div', target_attrs={'class': 'main-content'}),\n",
" Url_Loader(urls=target_urls),\n",
" # DirectoryLoader(data_path, glob=\"**/*.pdf\"),\n",
" # Use operators provided by Recdp to process the data\n",
" RAGTextFix(),\n",
" DocumentSplit(text_splitter=text_splitter, text_splitter_args=text_splitter_args),\n",
" DocumentIngestion(\n",
" vector_store=vector_store_type,\n",
13 changes: 11 additions & 2 deletions RecDP/pyrecdp/LLM/README.md
@@ -48,14 +48,23 @@ pip install pyrecdp[LLM] --pre
### Data pipeline

#### 1. RAG Data Pipeline - Build from public HTML [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/intel/e2eAIOK/blob/main/RecDP/examples/notebooks/llmutils/rag_pipeline.ipynb)

Retrieval-augmented generation (RAG) for large language models (LLMs) aims to improve prediction quality by using an external datastore at inference time to build a richer prompt that combines context, history, and recent or relevant knowledge.
RecDP LLM provides a pipeline for ingesting data from a source and indexing it. It mainly provides the following capabilities:
- **Load Data**: Load your data from a source. You can use `UrlLoader` or `DirectoryLoader` for this.
- **Improve Data Quality**: Clean up text for RAG to use. `RAGTextFix` rejoins sentences that were split by spurious line breaks during file parsing, removes special characters, fixes Unicode errors, and so on.
- **Split Text**: `DocumentSplit` breaks large documents into smaller chunks. This is useful for indexing the data and makes it easier for the model to consume.
- **Vector Store**: To make your data retrievable, `DocumentIngestion` uses a vector store and an embeddings model to store and index your data.

Here is a basic RAG Data Pipeline example:
```python
from pyrecdp.primitives.operations import *
from pyrecdp.LLM import TextPipeline

pipeline = TextPipeline()
ops = [
    UrlLoader(urls=["https://www.intc.com/news-events/press-releases/detail/1655/intel-reports-third-quarter-2023-financial-results"], target_tag='div', target_attrs={'class': 'main-content'}),
    # DirectoryLoader(files_path, glob="**/*.pdf"),
    RAGTextFix(),
    DocumentSplit(),
    DocumentIngestion(
        vector_store='FAISS',
        vector_store_args={"output_dir": "recdp_vs", "index": "recdp_index"},
        embeddings='HuggingFaceEmbeddings',
        embeddings_args={'model_name': "sentence-transformers/all-mpnet-base-v2"}
    ),
]
pipeline.add_operations(ops)
pipeline.execute()
```
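Once the pipeline has run, the persisted index can be queried. A minimal sketch, assuming `DocumentIngestion` writes the index in LangChain's FAISS format under `output_dir` (an assumption; this diff does not show the ingestion internals):

```python
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS

# Assumes the index was produced by the pipeline above with these names.
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
db = FAISS.load_local("recdp_vs", embeddings, index_name="recdp_index")
docs = db.similarity_search("What was Intel's Q3 2023 revenue?", k=3)
for doc in docs:
    print(doc.page_content[:200])
```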
83 changes: 83 additions & 0 deletions RecDP/pyrecdp/primitives/llmutils/rag_data_extractor.py
@@ -0,0 +1,83 @@
import argparse
from typing import Optional, List

from pyrecdp.core.utils import Timer
from pyrecdp.primitives.operations.logging_utils import logger

from pyrecdp.LLM import TextPipeline
from pyrecdp.primitives.operations import UrlLoader, DocumentSplit, DocumentIngestion, RAGTextFix, DirectoryLoader


def rag_data_prepare(
        files_path: str = None,
        target_urls: List[str] = None,
        text_splitter: str = "RecursiveCharacterTextSplitter",
        text_splitter_args: Optional[dict] = None,
        vs_output_dir: str = "recdp_vs",
        vector_store_type: str = 'FAISS',
        index_name: str = 'recdp_index',
        embeddings_type: str = 'HuggingFaceEmbeddings',
        embeddings_args: Optional[dict] = None,
):
    if files_path:
        loader = DirectoryLoader(files_path, glob="**/*.pdf")
    elif target_urls:
        loader = UrlLoader(urls=target_urls, target_tag='div')
    else:
        logger.error("You must specify at least one of files_path and target_urls")
        exit(1)
    if text_splitter_args is None:
        text_splitter_args = {"chunk_size": 500, "chunk_overlap": 0}
    if embeddings_args is None:
        embeddings_args = {'model_name': "sentence-transformers/all-mpnet-base-v2"}
    pipeline = TextPipeline()
    ops = [
        loader,
        RAGTextFix(),
        DocumentSplit(text_splitter=text_splitter, text_splitter_args=text_splitter_args),
        DocumentIngestion(
            vector_store=vector_store_type,
            vector_store_args={
                "output_dir": vs_output_dir,
                "index": index_name
            },
            embeddings=embeddings_type,
            embeddings_args=embeddings_args
        ),
    ]
    pipeline.add_operations(ops)
    pipeline.execute()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--files_path", dest="files_path", type=str)
    parser.add_argument("--target_urls", dest="target_urls", type=str)
    parser.add_argument("--text_splitter", dest="text_splitter", type=str, default='RecursiveCharacterTextSplitter')
    parser.add_argument("--vs_output_dir", dest="vs_output_dir", type=str, default='recdp_vs')
    parser.add_argument("--vector_store_type", dest="vector_store_type", type=str, default='FAISS')
    parser.add_argument("--index_name", dest="index_name", type=str, default='recdp_index')
    parser.add_argument("--embeddings_type", dest="embeddings_type", type=str, default='HuggingFaceEmbeddings')
    args = parser.parse_args()
    files_path = args.files_path
    # --target_urls takes a comma-separated list of URLs
    if args.target_urls:
        target_urls = args.target_urls.split(",")
    else:
        target_urls = []
    text_splitter = args.text_splitter
    vs_output_dir = args.vs_output_dir
    vector_store_type = args.vector_store_type
    index_name = args.index_name
    embeddings_type = args.embeddings_type

    with Timer("Process RAG data"):
        rag_data_prepare(files_path=files_path,
                         target_urls=target_urls,
                         text_splitter=text_splitter,
                         vs_output_dir=vs_output_dir,
                         vector_store_type=vector_store_type,
                         index_name=index_name,
                         embeddings_type=embeddings_type,
                         )
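A minimal usage sketch of the new helper, called directly rather than via the CLI (the URL and names below are illustrative, not from the commit):

```python
from pyrecdp.primitives.llmutils.rag_data_extractor import rag_data_prepare

# Build a FAISS index from a web page; all argument values here are examples.
rag_data_prepare(
    target_urls=["https://example.com/press-release"],
    vs_output_dir="recdp_vs",
    vector_store_type="FAISS",
    index_name="recdp_index",
)
```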
8 changes: 2 additions & 6 deletions RecDP/pyrecdp/primitives/operations/__init__.py
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@
from .text_normalize import TextNormalize
from .text_bytesize import TextBytesize
from .filter import *
from .text_fixer import TextFix, RAGTextFix
from .text_language_identify import LanguageIdentify
from .text_split import DocumentSplit, ParagraphsTextSplitter
from .text_pii_remove import PIIRemoval
@@ -65,11 +65,7 @@
from .text_perplexity_score import TextPerplexityScore
from .random_select import RandomSelect
from .text_ingestion import DocumentIngestion
from .doc_loader import DirectoryLoader, DocumentLoader, UrlLoader
from .text_to_qa import TextToQA
except:
pass
4 changes: 2 additions & 2 deletions RecDP/pyrecdp/primitives/operations/doc_loader.py
Original file line number Diff line number Diff line change
@@ -148,7 +148,7 @@ def load_html_to_md(page_url, target_tag: str = None, target_attrs: dict = None)
)


class UrlLoader(BaseLLMOperation):
    def __init__(self, urls: list = None, target_tag: str = None, target_attrs: dict = None,
                 args_dict: Optional[dict] = None):
        settings = {
@@ -183,4 +183,4 @@ def process_spark(self, spark, spark_df=None):
return self.cache


LLMOPERATORS.register(UrlLoader)
74 changes: 72 additions & 2 deletions RecDP/pyrecdp/primitives/operations/text_fixer.py
Original file line number Diff line number Diff line change
@@ -2,11 +2,14 @@
from ray.data import Dataset
from pyspark.sql import DataFrame

import os
import re
from typing import Dict, List, Union
from selectolax.parser import HTMLParser

from .constant import VARIOUS_WHITESPACES
from pyrecdp.core.model_utils import prepare_model, get_model

CPAT = re.compile("copyright", re.IGNORECASE)
PAT = re.compile("/\\*[^*]*\\*+(?:[^/*][^*]*\\*+)*/")

@@ -258,7 +261,7 @@ def process_rayds(self, ds: Dataset) -> Dataset:
        if self.actual_func is None:
            self.actual_func = get_fixer_by_type(self.text_type)
        return ds.map(lambda x: self.process_row(x, self.text_key, new_name, self.actual_func))

    def process_spark(self, spark, spark_df: DataFrame) -> DataFrame:
        import pyspark.sql.functions as F
        fix_by_type_udf = F.udf(get_fixer_by_type(self.text_type))
@@ -270,3 +273,70 @@ def process_spark(self, spark, spark_df: DataFrame) -> DataFrame:


LLMOPERATORS.register(TextFix)


class RAGTextFix(BaseLLMOperation):
    def __init__(self, text_key='text', chars_to_remove: Union[str, List[str]] = '◆●■►▼▲▴∆▻▷❖♡□',
                 language: str = 'en'):
        """
        Clean up text for LLM RAG to use.
        Step 1: Fix unicode errors in text using ftfy.
        Step 2: Normalize different kinds of whitespaces to an ASCII space ' ' (0x20).
        Different kinds of whitespaces can be found here:
        https://en.wikipedia.org/wiki/Whitespace_character
        Step 3: Remove specific chars from text.
        Step 4: Re-segment sentences in the text to avoid segmentation errors caused by unnecessary line breaks.
        :param text_key: The name of the text field to process. Default: 'text'
        :param chars_to_remove: Chars to remove. Default: '◆●■►▼▲▴∆▻▷❖♡□'
        :param language: Supported language. Default: 'en'
        """
        settings = {'chars_to_remove': chars_to_remove, 'text_key': text_key, 'language': language}
        super().__init__(settings)
        self.support_spark = True
        self.support_ray = True
        self.text_key = text_key
        self.inplace = True
        self.chars_to_remove = chars_to_remove
        self.language = language

    def process_rayds(self, ds: Dataset) -> Dataset:
        remover = self.get_compute_func()
        new_ds = ds.map(lambda x: self.process_row(x, self.text_key, self.text_key, remover))
        return new_ds

    def process_spark(self, spark, spark_df: DataFrame) -> DataFrame:
        import pyspark.sql.functions as F
        custom_udf = F.udf(self.get_compute_func())
        return spark_df.withColumn(self.text_key, custom_udf(F.col(self.text_key)))

    def get_compute_func(self):
        import ftfy
        pattern = '[' + '|'.join(self.chars_to_remove) + ']'
        model_key = prepare_model(lang=self.language, model_type='nltk')
        nltk_model = get_model(model_key, lang=self.language, model_type='nltk')

        def compute(text):
            # Fix unicode errors
            text = ftfy.fix_text(text)
            # Normalize different kinds of whitespaces to a plain space
            text = ''.join([
                char if char not in VARIOUS_WHITESPACES else ' ' for char in text
            ])
            # Remove the configured special chars
            text = re.sub(pattern=pattern, repl='',
                          string=text, flags=re.DOTALL)
            # Re-segment sentences: protect real paragraph breaks with a sentinel,
            # then drop the remaining line breaks inside sentences.
            paragraph_break_pattern = r"\n\s*\n"
            replace_str = '*^*^*'
            text = re.sub(pattern=paragraph_break_pattern, repl=replace_str,
                          string=text, flags=re.DOTALL)
            sentences = nltk_model.tokenize(text)
            new_sentences = []
            for sentence in sentences:
                new_sentences.append(sentence.replace("\n", " "))
            new_text = ' '.join(new_sentences).replace(replace_str, "\n\n")
            return new_text

        return compute


LLMOPERATORS.register(RAGTextFix)
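For intuition, a small sketch of what the compute function does to a messy string (illustrative only; assumes the English NLTK sentence model can be fetched via `prepare_model`):

```python
# Hypothetical example, not part of the commit.
fixer = RAGTextFix(chars_to_remove='◆●■', language='en')
clean = fixer.get_compute_func()

messy = "Intel reported◆ strong\nresults.\n\nRevenue grew."
print(clean(messy))
# Roughly: the '◆' is removed, the mid-sentence line break becomes a space,
# and the blank-line paragraph break is preserved:
# "Intel reported strong results.\n\nRevenue grew."
```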