From a6030de3443d6e6c559bc2612fe5267fcd1f8ed8 Mon Sep 17 00:00:00 2001 From: lgaliana Date: Wed, 20 Nov 2024 10:43:33 +0000 Subject: [PATCH] Gros cleaning --- src/chain_building/build_chain.py | 47 +--- src/chain_building/build_chain_validator.py | 10 +- src/data/anonymize.py | 5 +- src/data/create_evaluation_dataset.py | 14 +- src/data/insee_contact.py | 8 +- src/data/ner.py | 80 ------ src/data/to_s3.py | 5 +- src/data/utils.py | 4 +- src/db_building/build_database.py | 60 +---- src/db_building/corpus_building.py | 104 +++----- src/db_building/document_chunker.py | 22 +- src/db_building/loading.py | 38 +-- src/db_building/utils_db.py | 18 +- src/evaluation/basic_evaluation.py | 28 +- src/evaluation/eval_configuration.py | 20 +- src/evaluation/reranking_perf.py | 12 +- .../retrieval_evaluation_measures.py | 16 +- src/evaluation/retrieval_evaluator.py | 64 ++--- src/evaluation/utils.py | 74 ++---- src/insee_data_processing.py | 46 +--- .../custom_hf_pipeline.py | 74 ++---- src/legacy/reranking.py | 232 +++++++++++++++++ src/model_building/build_llm_model.py | 12 +- src/model_building/fetch_llm_model.py | 49 ++-- src/reranking/reranking_functions.py | 246 +----------------- src/results_logging/log_conversations.py | 4 +- src/utils/formatting_utilities.py | 6 +- 27 files changed, 426 insertions(+), 872 deletions(-) delete mode 100644 src/data/ner.py rename src/{model_building => legacy}/custom_hf_pipeline.py (74%) create mode 100644 src/legacy/reranking.py diff --git a/src/chain_building/build_chain.py b/src/chain_building/build_chain.py index 2833178..1cd901f 100644 --- a/src/chain_building/build_chain.py +++ b/src/chain_building/build_chain.py @@ -32,9 +32,7 @@ def format_docs(docs: list): # Define the compression function -def compress_documents_lambda( - documents: Sequence[Document], query: str, k: int = 5, **kwargs: dict[str, Any] -) -> Sequence[Document]: +def compress_documents_lambda(documents: Sequence[Document], query: str, k: int = 5, **kwargs: dict[str, Any]) -> Sequence[Document]: """Compress retrieved documents given the query context.""" # Initialize the retriever with the documents @@ -58,17 +56,13 @@ def build_chain( accepted_rerankers = [None, "BM25", "Cross-encoder", "Ensemble"] if reranker not in accepted_rerankers: - raise ValueError( - f"Invalid reranker: {reranker}. Accepted values are: {', '.join(accepted_rerankers)}" - ) + raise ValueError(f"Invalid reranker: {reranker}. 
Accepted values are: {', '.join(accepted_rerankers)}") # Define the retrieval reranker strategy if reranker is None: retrieval_agent = retriever elif reranker == "BM25": - retrieval_agent = RunnableParallel( - {"documents": retriever, "query": RunnablePassthrough()} - ) | RunnableLambda( + retrieval_agent = RunnableParallel({"documents": retriever, "query": RunnablePassthrough()}) | RunnableLambda( lambda r: compress_documents_lambda( documents=r["documents"], query=r["query"], @@ -77,17 +71,11 @@ def build_chain( ) elif reranker == "Cross-encoder": model = HuggingFaceCrossEncoder(model_name=RERANKER_CROSS_ENCODER) - compressor = CrossEncoderReranker( - model=model, top_n=number_candidates_reranking - ) - retrieval_agent = ContextualCompressionRetriever( - base_compressor=compressor, base_retriever=retriever - ) + compressor = CrossEncoderReranker(model=model, top_n=number_candidates_reranking) + retrieval_agent = ContextualCompressionRetriever(base_compressor=compressor, base_retriever=retriever) elif reranker == "Ensemble": # BM25 - reranker_1 = RunnableParallel( - {"documents": retriever, "query": RunnablePassthrough()} - ) | RunnableLambda( + reranker_1 = RunnableParallel({"documents": retriever, "query": RunnablePassthrough()}) | RunnableLambda( lambda r: compress_documents_lambda( documents=r["documents"], query=r["query"], @@ -99,31 +87,18 @@ def build_chain( model=HuggingFaceCrossEncoder(model_name=RERANKER_CROSS_ENCODER), top_n=number_candidates_reranking, ) - reranker_2 = ContextualCompressionRetriever( - base_compressor=compressor, base_retriever=retriever - ) + reranker_2 = ContextualCompressionRetriever(base_compressor=compressor, base_retriever=retriever) - retrieval_agent = EnsembleRetriever( - retrievers=[reranker_1, reranker_2], weigths=[1 / 2, 1 / 2] - ) + retrieval_agent = EnsembleRetriever(retrievers=[reranker_1, reranker_2], weigths=[1 / 2, 1 / 2]) else: raise ValueError(f"Reranking method {reranker} is not implemented.") if llm is not None: # Create a Langchain LLM Chain - chain = ( - RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"]))) - | prompt - | llm - | StrOutputParser() - ) - rag_chain_with_source = RunnableParallel( - {"context": retrieval_agent, "question": RunnablePassthrough()} - ).assign(answer=chain) + chain = RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"]))) | prompt | llm | StrOutputParser() + rag_chain_with_source = RunnableParallel({"context": retrieval_agent, "question": RunnablePassthrough()}).assign(answer=chain) else: # retriever mode - rag_chain_with_source = RunnableParallel( - {"context": retrieval_agent, "question": RunnablePassthrough()} - ) + rag_chain_with_source = RunnableParallel({"context": retrieval_agent, "question": RunnablePassthrough()}) return rag_chain_with_source diff --git a/src/chain_building/build_chain_validator.py b/src/chain_building/build_chain_validator.py index c6931c6..72eb18e 100644 --- a/src/chain_building/build_chain_validator.py +++ b/src/chain_building/build_chain_validator.py @@ -48,13 +48,7 @@ def build_chain_validator(evaluator_llm=None, tokenizer=None): defining a chain to check if a given query is related to INSEE expertise. 
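As a usage note for the two chains reformatted above, the sketch below shows how they are typically wired together. It is illustrative only: the import paths follow this repository layout, `build_chain` is assumed to accept the `retriever`, `prompt`, `llm` and `reranker` arguments used in its body, and the retriever, prompt, LLM and tokenizer are assumed to be built elsewhere.

```python
# Illustrative sketch, not part of the patch: import paths and surrounding objects are assumed.
from src.chain_building.build_chain import build_chain
from src.chain_building.build_chain_validator import build_chain_validator

validator = build_chain_validator(evaluator_llm=llm, tokenizer=tokenizer)
rag_chain = build_chain(retriever=retriever, prompt=prompt, llm=llm, reranker="BM25")

question = "Quelle est la population de la France ?"
if validator.invoke({"query": question}):  # True when the generation contains "oui"
    result = rag_chain.invoke(question)
    print(result["answer"])  # the chain returns a dict with "context", "question" and "answer"
```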
""" - prompt_template = tokenizer.apply_chat_template( - EVAL_TEMPLATE, tokenize=False, add_generation_prompt=True - ) + prompt_template = tokenizer.apply_chat_template(EVAL_TEMPLATE, tokenize=False, add_generation_prompt=True) prompt = PromptTemplate(template=prompt_template, input_variables=["query"]) - return ( - prompt - | evaluator_llm - | RunnableLambda(func=lambda generation: generation.lower().find("oui") != -1) - ) + return prompt | evaluator_llm | RunnableLambda(func=lambda generation: generation.lower().find("oui") != -1) diff --git a/src/data/anonymize.py b/src/data/anonymize.py index d2b6743..32815f3 100644 --- a/src/data/anonymize.py +++ b/src/data/anonymize.py @@ -6,6 +6,7 @@ import re import pandas as pd + from utils import fs @@ -86,9 +87,7 @@ def anonymize_insee_contact_message(message: str, message_ner: list[dict]) -> st if dictionary["entity_group"] == "PER": message = message.replace(dictionary["word"], "[PER]") elif dictionary["signature"]: - message = message.replace( - dictionary["word"], f"[{dictionary['entity_group']}]" - ) + message = message.replace(dictionary["word"], f"[{dictionary['entity_group']}]") # Identification of email addresses email_regex = r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+" diff --git a/src/data/create_evaluation_dataset.py b/src/data/create_evaluation_dataset.py index c0e718d..49f13bb 100644 --- a/src/data/create_evaluation_dataset.py +++ b/src/data/create_evaluation_dataset.py @@ -9,6 +9,7 @@ import pandas as pd from constants import LS_ANNOTATIONS_PATH + from utils import fs @@ -28,10 +29,7 @@ def create_insee_contact_eval_dataset(): entry_urls = [] # Parse annotations for result_element in annotation_data["result"]: - if ( - result_element["from_name"] == "keep_pair" - and "O" in result_element["value"]["choices"] - ): + if result_element["from_name"] == "keep_pair" and "O" in result_element["value"]["choices"]: keep_pair = True elif result_element["from_name"] == "urls": entry_urls += result_element["value"]["text"] @@ -43,12 +41,8 @@ def create_insee_contact_eval_dataset(): answers.append(answer) urls.append("|".join(entry_urls)) - with fs.open( - "projet-llm-insee-open-data/data/eval_data/eval_dataset_insee_contact.csv", "w" - ) as f: - pd.DataFrame({"questions": questions, "answers": answers, "urls": urls}).to_csv( - f, index=False - ) + with fs.open("projet-llm-insee-open-data/data/eval_data/eval_dataset_insee_contact.csv", "w") as f: + pd.DataFrame({"questions": questions, "answers": answers, "urls": urls}).to_csv(f, index=False) if __name__ == "__main__": diff --git a/src/data/insee_contact.py b/src/data/insee_contact.py index bc24389..8762302 100644 --- a/src/data/insee_contact.py +++ b/src/data/insee_contact.py @@ -12,9 +12,7 @@ def process_insee_contact_data(path: str): """ Process raw Insee contact data. 
""" - fs = s3fs.S3FileSystem( - client_kwargs={"endpoint_url": "https://" + os.environ["AWS_S3_ENDPOINT"]} - ) + fs = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "https://" + os.environ["AWS_S3_ENDPOINT"]}) with fs.open(path) as f: df = pd.read_csv(f) @@ -26,9 +24,7 @@ def process_insee_contact_data(path: str): df_eval = df.sample(200, random_state=42) # Save to s3 - with fs.open( - "projet-llm-insee-open-data/data/insee_contact/data_2019_eval.csv", "w" - ) as f: + with fs.open("projet-llm-insee-open-data/data/insee_contact/data_2019_eval.csv", "w") as f: df_eval.to_csv(f, index=False) diff --git a/src/data/ner.py b/src/data/ner.py deleted file mode 100644 index 0b10422..0000000 --- a/src/data/ner.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -Apply named entity recognition to data in order to identify: - - names; - - addresses; - - email addresses; -""" - -import json - -import numpy as np -import pandas as pd -from transformers import TokenClassificationPipeline, pipeline -from utils import fs - -ner = pipeline( - task="ner", - model="cmarkea/distilcamembert-base-ner", - tokenizer="cmarkea/distilcamembert-base-ner", - aggregation_strategy="simple", -) - - -def custom_ner(ner_pipeline: TokenClassificationPipeline, text: str) -> list[dict]: - """ - Ner except return empty string if text is empty. - - Args: - ner_pipeline (TokenClassificationPipeline): NER pipeline. - text (str): Text. - - Returns: - Dict: NER output. - """ - if (text == "") or (text is None): - return "" - else: - return ner(text) - - -def ner_series(strings: pd.Series) -> pd.Series: - """ - Apply named entity recognition to data in order to identify: - - - PER: personality; - - LOC: location; - - ORG: organization; - - MISC: miscellaneous entities (movies title, books, etc.); - - O: background (Outside entity). - - Args: - strings (pd.Series): Series of strings to apply NER on. - Returns: - pd.Series: Series of NER outputs. 
- """ - output = strings.apply(lambda x: custom_ner(ner, x)) - for sublist in output: - for dictionary in sublist: - for key, value in dictionary.items(): - if isinstance(value, np.float32): - dictionary[key] = float(value) - return output - - -if __name__ == "__main__": - # Apply NER on data from Insee Contact - path = "projet-llm-insee-open-data/data/insee_contact/data_2019_eval.csv" - with fs.open(path) as f: - df = pd.read_csv(f) - - # Save NER outputs - with fs.open( - "projet-llm-insee-open-data/data/insee_contact/ner/data_2019_eval_exchange1_ner.json", - "w", - ) as fp: - json.dump(ner_series(df["Exchange1"].fillna("")).to_list(), fp) - with fs.open( - "projet-llm-insee-open-data/data/insee_contact/ner/data_2019_eval_exchange2_ner.json", - "w", - ) as fp: - json.dump(ner_series(df["Exchange2"].fillna("")).to_list(), fp) diff --git a/src/data/to_s3.py b/src/data/to_s3.py index fa66d07..86305ca 100644 --- a/src/data/to_s3.py +++ b/src/data/to_s3.py @@ -10,6 +10,7 @@ from anonymize import anonymize_insee_contact_message from constants import LS_DATA_PATH, RAW_DATA from ner import ner_series + from utils import create_ls_task, fs @@ -38,9 +39,7 @@ def insee_contact_to_s3(): anonymized_answers.append(anonymize_insee_contact_message(message, ner)) # Json tasks creation - for idx, (question, answer) in enumerate( - zip(anonymized_questions, anonymized_answers, strict=False) - ): + for idx, (question, answer) in enumerate(zip(anonymized_questions, anonymized_answers, strict=False)): ls_task = create_ls_task(question, answer) with fs.open(LS_DATA_PATH + f"{idx}.json", "w") as f: json.dump(ls_task, f) diff --git a/src/data/utils.py b/src/data/utils.py index c026698..963a268 100644 --- a/src/data/utils.py +++ b/src/data/utils.py @@ -6,9 +6,7 @@ import s3fs -fs = s3fs.S3FileSystem( - client_kwargs={"endpoint_url": "https://" + os.environ["AWS_S3_ENDPOINT"]} -) +fs = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "https://" + os.environ["AWS_S3_ENDPOINT"]}) def create_ls_task( diff --git a/src/db_building/build_database.py b/src/db_building/build_database.py index b0df806..adc573c 100644 --- a/src/db_building/build_database.py +++ b/src/db_building/build_database.py @@ -1,11 +1,10 @@ -import logging import gc -import s3fs +import logging +import s3fs from chromadb.config import Settings -from langchain_huggingface import HuggingFaceEmbeddings from langchain_community.vectorstores import Chroma - +from langchain_huggingface import HuggingFaceEmbeddings from src.config import ( CHROMA_DB_LOCAL_DIRECTORY, @@ -15,47 +14,13 @@ S3_BUCKET, ) - from .corpus_building import ( - build_or_use_from_cache, DEFAULT_LOCATIONS, + DEFAULT_LOCATIONS, + build_or_use_from_cache, ) from .utils_db import split_list -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) - - -def parse_collection_name(collection_name: str): - """ - Parse a concatenated string to extract the embedding model name, chunk size, and overlap size. - :param concatenated_string: A string in the format 'embeddingmodelname_chunkSize_overlapSize' - :return: A dictionary with the parsed values - """ - try: - # Split the string by the underscore delimiter - parts = collection_name.split("_") - - # Ensure there are exactly three parts - if len(parts) != 3: - raise ValueError( - "String format is incorrect. 
Expected format: 'modelname_chunkSize_overlapSize'" - ) - - # Extract and assign the parts - model_name = parts[0] - chunk_size = int(parts[1]) - overlap_size = int(parts[2]) - - # Return the parsed values in a dictionary - return { - "model_name": model_name, - "chunk_size": chunk_size, - "overlap_size": overlap_size, - } - except Exception as e: - print(f"Error parsing string: {e}") - return None +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") # BUILD VECTOR DATABASE FROM COLLECTION ------------------------- @@ -69,7 +34,6 @@ def build_vector_database( location_dataset: dict = DEFAULT_LOCATIONS, **kwargs, ) -> Chroma: - logging.info(f"The database will temporarily be stored in {persist_directory}") logging.info("Start building the database") @@ -82,7 +46,7 @@ def build_vector_database( s3_bucket=s3_bucket, location_dataset=location_dataset, model_id=model_id, - **kwargs + **kwargs, ) logging.info("Document chunking is over, starting to embed them") @@ -151,15 +115,13 @@ def reload_database_from_local_dir( embedding_function=emb_model, ) - logging.info( - f"The database (collection {collection_name}) " - f"has been reloaded from directory {persist_directory}" - ) + logging.info(f"The database (collection {collection_name}) " f"has been reloaded from directory {persist_directory}") return db # LOAD RETRIEVER ------------------------------- + def load_retriever( emb_model_name, vectorstore=None, @@ -186,7 +148,5 @@ def load_retriever( search_kwargs = retriever_params.get("search_kwargs", {"k": 20}) # Set up a retriever - retriever = vectorstore.as_retriever( - search_type="similarity", search_kwargs=search_kwargs - ) + retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs=search_kwargs) return retriever, vectorstore diff --git a/src/db_building/corpus_building.py b/src/db_building/corpus_building.py index e2e9236..95ba318 100644 --- a/src/db_building/corpus_building.py +++ b/src/db_building/corpus_building.py @@ -1,27 +1,19 @@ import logging -from typing import Tuple -import pandas as pd -import s3fs - import typing as t + import jsonlines +import pandas as pd +import s3fs from langchain.schema import Document from src.config import ( - CHROMA_DB_LOCAL_DIRECTORY, - COLLECTION_NAME, - EMB_DEVICE, - EMB_MODEL_NAME, S3_BUCKET, ) -from .utils_db import parse_xmls from .document_chunker import chunk_documents +from .utils_db import parse_xmls - -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") # PARAMETERS --------------------------------------------------- @@ -30,10 +22,7 @@ DEFAULT_RMES_PATH = "data/raw_data/applishare_solr_joined.parquet" -DEFAULT_LOCATIONS = { - "web4g_data": DEFAULT_WEB4G_PATH, - "rmes_data": DEFAULT_RMES_PATH -} +DEFAULT_LOCATIONS = {"web4g_data": DEFAULT_WEB4G_PATH, "rmes_data": DEFAULT_RMES_PATH} # CHUNKING ALL DATASET --------------------------------------------------- @@ -44,36 +33,36 @@ def preprocess_and_store_data( s3_bucket: str, location_dataset: dict = DEFAULT_LOCATIONS, model_id: str = "OrdalieTech/Solon-embeddings-large-0.1", - **kwargs -) -> Tuple[pd.DataFrame, t.Iterable[Document]]: + **kwargs, +) -> tuple[pd.DataFrame, t.Iterable[Document]]: """ Process data from S3, chunk the documents, and store them in an intermediate location. - + Parameters: - filesystem: s3fs.S3FileSystem object for interacting with S3. - s3_bucket: str, the name of the S3 bucket. 
- location_dataset: dict, paths to the main data files in the S3 bucket. - kwargs: Optional keyword arguments for data processing and chunking (e.g., 'max_pages', 'chunk_overlap', 'chunk_size'). - + Returns: - Tuple containing the processed DataFrame and chunked documents (list of Document objects). """ - + # Handle data loading, parsing, and splitting df, all_splits = _preprocess_data( model_id=model_id, filesystem=filesystem, s3_bucket=s3_bucket, location_dataset=location_dataset, - **kwargs + **kwargs, ) logging.info("Saving chunked documents in an intermediate location") # Extract parameters from kwargs - chunk_overlap = kwargs.get('chunk_overlap', None) - chunk_size = kwargs.get('chunk_size', None) - max_pages = kwargs.get('max_pages', None) + chunk_overlap = kwargs.get("chunk_overlap") + chunk_size = kwargs.get("chunk_size") + max_pages = kwargs.get("max_pages") # Create the storage path for intermediate data data_intermediate_storage = _s3_path_intermediate_collection( @@ -81,18 +70,15 @@ def preprocess_and_store_data( model_id=model_id, chunk_overlap=chunk_overlap, chunk_size=chunk_size, - max_pages=max_pages + max_pages=max_pages, ) - # Save chunked documents to JSONL using S3 and s3fs save_docs_to_jsonl(all_splits, data_intermediate_storage, filesystem) # Save the DataFrame as a Parquet file in the same directory parquet_file_path = data_intermediate_storage.replace("docs.jsonl", "corpus.parquet") - df.loc[:, ~df.columns.isin(["dateDiffusion"])] = ( - df.loc[:, ~df.columns.isin(["dateDiffusion"])].astype(str) - ) + df.loc[:, ~df.columns.isin(["dateDiffusion"])] = df.loc[:, ~df.columns.isin(["dateDiffusion"])].astype(str) df.to_parquet(parquet_file_path, filesystem=filesystem, index=False) logging.info(f"DataFrame saved to {parquet_file_path}") @@ -103,18 +89,18 @@ def _preprocess_data( filesystem: s3fs.S3FileSystem, s3_bucket: str = S3_BUCKET, location_dataset: dict = DEFAULT_LOCATIONS, - **kwargs + **kwargs, ): """ - Process and merge data from multiple parquet sources, parse XML content, + Process and merge data from multiple parquet sources, parse XML content, and split the documents into chunks for embedding. - + Parameters: - s3_bucket: str, the S3 bucket name. - filesystem: object, the filesystem handler (e.g., s3fs). - location_dataset: dict, path to the main data files in the S3 bucket. - kwargs: optional keyword arguments to control parser behavior, including 'max_pages' for limiting the number of rows. - + Returns: - all_splits: list or DataFrame containing the processed and chunked documents. 
""" @@ -122,7 +108,7 @@ def _preprocess_data( # Defining data locations web4g_path = location_dataset.get("web4g_data", DEFAULT_WEB4G_PATH) data_path_rmes = location_dataset.get("rmes_data", DEFAULT_RMES_PATH) - + # Load main data from parquet file logging.info(f"Input data extracted from s3://{s3_bucket}") data = pd.read_parquet(f"s3://{s3_bucket}/{web4g_path}", filesystem=filesystem) @@ -135,9 +121,7 @@ def _preprocess_data( parsed_pages = parse_xmls(data) # Merge parsed XML data with the original data - df = data.set_index("id").merge( - pd.DataFrame(parsed_pages), left_index=True, right_index=True - ) + df = data.set_index("id").merge(pd.DataFrame(parsed_pages), left_index=True, right_index=True) # Select relevant columns df = df[ @@ -179,11 +163,11 @@ def build_or_use_from_cache( s3_bucket: str, location_dataset: dict = DEFAULT_LOCATIONS, force_rebuild: bool = False, - **kwargs -) -> Tuple[pd.DataFrame, t.Iterable[Document]]: + **kwargs, +) -> tuple[pd.DataFrame, t.Iterable[Document]]: """ Either load the chunked documents and DataFrame from cache or process and store the data. - + Parameters: - filesystem: s3fs.S3FileSystem object for interacting with S3. - s3_bucket: str, the name of the S3 bucket. @@ -196,9 +180,9 @@ def build_or_use_from_cache( """ # Extract parameters from kwargs - chunk_overlap = kwargs.get('chunk_overlap', None) - chunk_size = kwargs.get('chunk_size', None) - max_pages = kwargs.get('max_pages', None) + chunk_overlap = kwargs.get("chunk_overlap") + chunk_size = kwargs.get("chunk_size") + max_pages = kwargs.get("max_pages") # Create the storage path for intermediate data data_intermediate_storage = _s3_path_intermediate_collection( @@ -206,7 +190,7 @@ def build_or_use_from_cache( model_id=model_id, chunk_overlap=chunk_overlap, chunk_size=chunk_size, - max_pages=max_pages + max_pages=max_pages, ) # Create the Parquet file path (corpus.parquet) @@ -219,7 +203,7 @@ def build_or_use_from_cache( logging.info(f"Attempting to load chunked documents from {data_intermediate_storage}") all_splits = load_docs_from_jsonl(data_intermediate_storage, filesystem) logging.info("Loaded chunked documents from cache") - + # Attempt to load the cached DataFrame from S3 logging.info(f"Attempting to load DataFrame from {parquet_file_path}") df = pd.read_parquet(parquet_file_path, filesystem=filesystem) @@ -237,13 +221,12 @@ def build_or_use_from_cache( s3_bucket=s3_bucket, location_dataset=location_dataset, model_id=model_id, - **kwargs + **kwargs, ) return df, all_splits - # PATH BUILDER FOR S3 STORAGE ------------------------------------ @@ -252,17 +235,17 @@ def _s3_path_intermediate_collection( model_id: str = "OrdalieTech/Solon-embeddings-large-0.1", chunk_overlap: int = None, chunk_size: int = None, - max_pages: int = None + max_pages: int = None, ) -> str: """ Build the intermediate storage path for chunked documents on S3. - + Parameters: - s3_bucket: str, the name of the S3 bucket. - chunk_overlap: int, the chunk overlap value (can be None). - chunk_size: int, the chunk size (can be None). - max_pages: int, the maximum number of pages (can be None). - + Returns: - str: The constructed S3 path for saving the chunked documents. 
""" @@ -273,6 +256,7 @@ def _s3_path_intermediate_collection( "docs.jsonl" ) + # SAVE AND LOAD DOCUMENTS AS JSON --------------------------------- @@ -284,10 +268,9 @@ def save_docs_to_jsonl(documents: t.Iterable[Document], file_path: str, fs: s3fs :param file_path: The S3 path where the JSONL file will be saved (e.g., "s3://bucket-name/path/to/file.jsonl"). :param fs: s3fs.S3FileSystem object for handling S3 file operations. """ - with fs.open(file_path, mode="w") as f: - with jsonlines.Writer(f) as writer: - for doc in documents: - writer.write(doc.dict()) # Assuming Document has a .dict() method + with fs.open(file_path, mode="w") as f, jsonlines.Writer(f) as writer: + for doc in documents: + writer.write(doc.dict()) # Assuming Document has a .dict() method def load_docs_from_jsonl(file_path: str, fs: s3fs.S3FileSystem) -> t.Iterable[Document]: @@ -299,8 +282,7 @@ def load_docs_from_jsonl(file_path: str, fs: s3fs.S3FileSystem) -> t.Iterable[Do :return: Iterable of Document objects loaded from the JSONL file. """ documents = [] - with fs.open(file_path, mode="r") as f: - with jsonlines.Reader(f) as reader: - for doc in reader: - documents.append(Document(**doc)) # Assuming Document can be instantiated from a dict - return documents \ No newline at end of file + with fs.open(file_path, mode="r") as f, jsonlines.Reader(f) as reader: + for doc in reader: + documents.append(Document(**doc)) # Assuming Document can be instantiated from a dict + return documents diff --git a/src/db_building/document_chunker.py b/src/db_building/document_chunker.py index c9aef0f..92d7ed6 100644 --- a/src/db_building/document_chunker.py +++ b/src/db_building/document_chunker.py @@ -47,9 +47,7 @@ def chunk_documents( logging.info("Applying markdown spliter") if kwargs.get("markdown_split", False): - markdown_splitter = MarkdownHeaderTextSplitter( - headers_to_split_on=HEADERS_TO_SPLIT_ON, strip_headers=False - ) + markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=HEADERS_TO_SPLIT_ON, strip_headers=False) document_list = make_md_splits(document_list, markdown_splitter) logging.info("Initializing token splitter") @@ -68,9 +66,7 @@ def chunk_documents( unique_texts.add(doc.page_content) docs_processed_unique.append(doc) - logging.info( - f"Number of created chunks: {len(docs_processed_unique)} in the Vector Database" - ) + logging.info(f"Number of created chunks: {len(docs_processed_unique)} in the Vector Database") return docs_processed_unique @@ -88,7 +84,7 @@ def compute_autokenizer_chunk_size(hf_tokenizer_name: str, **kwargs) -> tuple: """ logging.info(f"Using model {hf_tokenizer_name} to tokenize text") - hf_token = kwargs.get("HF_token", os.environ.get("HF_token", None)) + hf_token = kwargs.get("HF_TOKEN", os.environ.get("HF_TOKEN", None)) # Load the tokenizer autokenizer = AutoTokenizer.from_pretrained(hf_tokenizer_name, token=hf_token) @@ -118,9 +114,7 @@ def get_text_splitter(**kwargs) -> tuple[RecursiveCharacterTextSplitter, dict]: """ if kwargs.get("use_tokenizer_to_chunk", True): - autokenizer, chunk_size, chunk_overlap = compute_autokenizer_chunk_size( - kwargs.get("embedding_model") - ) + autokenizer, chunk_size, chunk_overlap = compute_autokenizer_chunk_size(kwargs.get("embedding_model")) text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer( autokenizer, @@ -130,9 +124,7 @@ def get_text_splitter(**kwargs) -> tuple[RecursiveCharacterTextSplitter, dict]: ) else: if kwargs.get("chunk_size") is None or kwargs.get("chunk_overlap") is None: - raise ValueError( - 
"chunk_size and chunk_overlap must be specified if use_tokenizer_to_chunk is set to True" - ) + raise ValueError("chunk_size and chunk_overlap must be specified if use_tokenizer_to_chunk is set to True") text_splitter = RecursiveCharacterTextSplitter( chunk_size=kwargs.get("chunk_size"), @@ -143,9 +135,7 @@ def get_text_splitter(**kwargs) -> tuple[RecursiveCharacterTextSplitter, dict]: return text_splitter -def make_md_splits( - document_list: list[Document], markdown_splitter: MarkdownHeaderTextSplitter -) -> list[Document]: +def make_md_splits(document_list: list[Document], markdown_splitter: MarkdownHeaderTextSplitter) -> list[Document]: """ Splits the content of each document in the document list based on Markdown headers, and preserves the original metadata in each split section. diff --git a/src/db_building/loading.py b/src/db_building/loading.py index 25d8c16..d5a740d 100644 --- a/src/db_building/loading.py +++ b/src/db_building/loading.py @@ -1,5 +1,5 @@ -import os import logging +import os import tempfile import warnings @@ -11,7 +11,6 @@ from src.config import S3_BUCKET from .build_database import reload_database_from_local_dir -from .corpus_building import save_docs_to_jsonl, load_docs_from_jsonl def load_vector_database(filesystem: s3fs.S3FileSystem, **kwargs) -> Chroma: @@ -40,14 +39,10 @@ def load_vector_database(filesystem: s3fs.S3FileSystem, **kwargs) -> Chroma: # LOADING FROM MLFLOW ---------------------------------------- -def _load_database_from_mlflow( - run_id: str, dst_path: str = None, force: bool = False -) -> Chroma: +def _load_database_from_mlflow(run_id: str, dst_path: str = None, force: bool = False) -> Chroma: """Helper function to load database from MLflow artifacts.""" - local_path = _download_mlflow_artifacts_if_exists( - run_id=run_id, dst_path=dst_path, force=force - ) + local_path = _download_mlflow_artifacts_if_exists(run_id=run_id, dst_path=dst_path, force=force) run_params = mlflow.get_run(run_id).data.params @@ -62,7 +57,6 @@ def _load_database_from_mlflow( def _download_mlflow_artifacts_if_exists(run_id, dst_path=None, force=False): - # Construct the destination path if dst_path is None: tmpdir = tempfile.gettempdir() @@ -70,9 +64,7 @@ def _download_mlflow_artifacts_if_exists(run_id, dst_path=None, force=False): # Check if dst_path exists and force is False if os.path.exists(dst_path) and not force: - logging.info( - f"Destination path {dst_path} exists. Skipping download because force is set to False." - ) + logging.info(f"Destination path {dst_path} exists. 
Skipping download because force is set to False.") local_path = f"{dst_path}/chroma" else: # If dst_path doesn't exist or force is True, @@ -80,9 +72,7 @@ def _download_mlflow_artifacts_if_exists(run_id, dst_path=None, force=False): os.makedirs(dst_path, exist_ok=True) # Download artifacts to the specific dst_path - local_path = mlflow.artifacts.download_artifacts( - run_id=run_id, artifact_path="chroma", dst_path=dst_path - ) + local_path = mlflow.artifacts.download_artifacts(run_id=run_id, artifact_path="chroma", dst_path=dst_path) logging.info(f"Artifacts downloaded to: {local_path}") @@ -92,9 +82,7 @@ def _download_mlflow_artifacts_if_exists(run_id, dst_path=None, force=False): # LOADING FROM S3 ---------------------------------------- -def _load_database_from_s3( - filesystem: s3fs.S3FileSystem, kwargs: dict[str, str] -) -> Chroma: +def _load_database_from_s3(filesystem: s3fs.S3FileSystem, kwargs: dict[str, str]) -> Chroma: """Helper function to load database from S3 based on provided parameters.""" required_keys = [ "data_raw_s3_path", @@ -116,18 +104,14 @@ def _load_database_from_s3( stacklevel=2, ) - logging.info( - f"Searching for database with the following parameters: {kwargs_subset}" - ) + logging.info(f"Searching for database with the following parameters: {kwargs_subset}") db_path_prefix = f"{S3_BUCKET}/data/chroma_database/{kwargs.get('embedding_model')}" - + logging.info(f"Checking if a database has been stored at location '{db_path_prefix}'") if not filesystem.exists(db_path_prefix): - raise FileNotFoundError( - f"Database with model '{kwargs.get('embedding_model')}' not found" - ) + raise FileNotFoundError(f"Database with model '{kwargs.get('embedding_model')}' not found") for db_path in filesystem.ls(db_path_prefix): with filesystem.open(f"{db_path}/parameters.yaml") as f: @@ -141,9 +125,7 @@ def _load_database_from_s3( raise FileNotFoundError(f"Database with parameters {kwargs} not found") -def _reload_database_from_s3( - filesystem: s3fs.S3FileSystem, db_path: str, kwargs: dict[str, str] -) -> Chroma: +def _reload_database_from_s3(filesystem: s3fs.S3FileSystem, db_path: str, kwargs: dict[str, str]) -> Chroma: """Helper function to reload database from S3 to a local temporary directory.""" with tempfile.TemporaryDirectory() as temp_dir: filesystem.get(f"{db_path}", temp_dir, recursive=True) diff --git a/src/db_building/utils_db.py b/src/db_building/utils_db.py index 6ef03c6..4d749c7 100644 --- a/src/db_building/utils_db.py +++ b/src/db_building/utils_db.py @@ -143,9 +143,7 @@ def prepend_text_to_tag(tag, text): tag.string = text -def parse_xmls( - data: pd.DataFrame, id: str = "id", xml_column: str = "xml_content" -) -> pd.DataFrame: +def parse_xmls(data: pd.DataFrame, id: str = "id", xml_column: str = "xml_content") -> pd.DataFrame: """ Parses XML content from a DataFrame, extracts data, formats it, and returns a new DataFrame with the formatted content. @@ -238,13 +236,7 @@ def format_tags(soup: Tag, tags_to_ignore: list[str]) -> Tag: # in the children tags (graphique, tableau...) # Maybe we still want to keep it ? 
if any(remove_figure) and tag.name == "figure": - if all( - tag.find(child_tag) is None - for child_tag, remove in zip( - TAGS_FIGURE_CHILDREN, remove_figure, strict=False - ) - if not remove - ): + if all(tag.find(child_tag) is None for child_tag, remove in zip(TAGS_FIGURE_CHILDREN, remove_figure, strict=False) if not remove): tag.decompose() continue @@ -291,11 +283,7 @@ def format_tags(soup: Tag, tags_to_ignore: list[str]) -> Tag: # Rename intertitre tags if tag.name == "intertitre": - tag.name = ( - f"h{int(tag.get("niveau")) + 2}" - if tag.get("niveau") is not None - else "h3" - ) + tag.name = f"h{int(tag.get("niveau")) + 2}" if tag.get("niveau") is not None else "h3" continue ## TABLES diff --git a/src/evaluation/basic_evaluation.py b/src/evaluation/basic_evaluation.py index 441512e..5788c94 100644 --- a/src/evaluation/basic_evaluation.py +++ b/src/evaluation/basic_evaluation.py @@ -6,9 +6,7 @@ def transform_answers_bot(answers_bot: pd.DataFrame, k: int): # Accurate page or document response nbre_documents_answers_bot = ( - answers_bot.groupby("question") - .agg({"url_expected": "sum", "number_pages_expected": "mean"}) - .sort_values("url_expected", ascending=False) + answers_bot.groupby("question").agg({"url_expected": "sum", "number_pages_expected": "mean"}).sort_values("url_expected", ascending=False) ) nbre_pages_answers_bot = ( @@ -19,9 +17,7 @@ def transform_answers_bot(answers_bot: pd.DataFrame, k: int): ) eval_reponses_bot = ( - nbre_pages_answers_bot.merge( - nbre_documents_answers_bot, on=["question", "number_pages_expected"] - ) + nbre_pages_answers_bot.merge(nbre_documents_answers_bot, on=["question", "number_pages_expected"]) .rename( columns={ "url_expected_x": "Nombre de pages citées par le bot qui sont OK", @@ -33,20 +29,12 @@ def transform_answers_bot(answers_bot: pd.DataFrame, k: int): ) # Top k answers - answers_bot["cumsum_url_expected"] = answers_bot.groupby(["question"])[ - "url_expected" - ].cumsum() + answers_bot["cumsum_url_expected"] = answers_bot.groupby(["question"])["url_expected"].cumsum() answers_bot["document_position"] = answers_bot.groupby("question").cumcount() + 1 - answers_bot["cumsum_url_expected"] = answers_bot["cumsum_url_expected"].clip( - upper=1 - ) + answers_bot["cumsum_url_expected"] = answers_bot["cumsum_url_expected"].clip(upper=1) answers_bot["cumsum_url_expected"] = answers_bot["cumsum_url_expected"].astype(bool) - answers_bot_topk = ( - answers_bot.groupby("document_position") - .agg({"cumsum_url_expected": "mean"}) - .reset_index() - ) + answers_bot_topk = answers_bot.groupby("document_position").agg({"cumsum_url_expected": "mean"}).reset_index() answers_bot_topk = answers_bot_topk.loc[answers_bot_topk["document_position"] <= k] return eval_reponses_bot, answers_bot_topk @@ -66,10 +54,6 @@ def _answer_faq_by_bot(retriever, question, valid_urls): def answer_faq_by_bot(retriever, faq): answers_bot = [] for _idx, faq_items in faq.iterrows(): - answers_bot.append( - _answer_faq_by_bot( - retriever, faq_items["titre"], faq_items["urls"].split(", ") - ) - ) + answers_bot.append(_answer_faq_by_bot(retriever, faq_items["titre"], faq_items["urls"].split(", "))) answers_bot = pd.concat(answers_bot) return answers_bot diff --git a/src/evaluation/eval_configuration.py b/src/evaluation/eval_configuration.py index 48b773f..06c27fa 100644 --- a/src/evaluation/eval_configuration.py +++ b/src/evaluation/eval_configuration.py @@ -28,9 +28,7 @@ def copy(self): @dataclass class RetrievalConfiguration(EvalConfiguration): # Embedding model - 
embedding_model_name: str = field( - default=EMB_MODEL_NAME, metadata={"description": "embedding model"} - ) + embedding_model_name: str = field(default=EMB_MODEL_NAME, metadata={"description": "embedding model"}) collection: str = field(default=None) chunk_size: int = field(default=None, metadata={"description": "chunk size"}) overlap_size: int = field(default=None, metadata={"description": "overlap size"}) @@ -38,26 +36,18 @@ class RetrievalConfiguration(EvalConfiguration): # Reranker model reranker_type: str = field( default=None, - metadata={ - "description": """Reranker type, choose among: ["BM25", "Cross-encoder", "ColBERT", "Metadata"]""" - }, + metadata={"description": """Reranker type, choose among: ["BM25", "Cross-encoder", "ColBERT", "Metadata"]"""}, ) reranker_name: str = field( default=None, metadata={"description": "Reranker model name (when it exists)"}, ) - param_ensemble: dict[str, str | None] = field( - default_factory=dict, metadata={"description": "list of reranker configs"} - ) - use_metadata: str | None = field( - default=None, metadata={"description": "field metadata"} - ) + param_ensemble: dict[str, str | None] = field(default_factory=dict, metadata={"description": "list of reranker configs"}) + use_metadata: str | None = field(default=None, metadata={"description": "field metadata"}) rerank_k: int = field(default=None, metadata={"description": "field metadata"}) # Retrieval parameters k_values: list[int] = field(default_factory=lambda: [5, 10, 15, 20, 25, 30, 40, 50]) # Parsing metadata - markdown_separator: list[str] = field( - default_factory=lambda: ["\n\n", "\n", ".", " ", ""] - ) + markdown_separator: list[str] = field(default_factory=lambda: ["\n\n", "\n", ".", " ", ""]) diff --git a/src/evaluation/reranking_perf.py b/src/evaluation/reranking_perf.py index 805d88d..42363eb 100644 --- a/src/evaluation/reranking_perf.py +++ b/src/evaluation/reranking_perf.py @@ -1,6 +1,4 @@ -def compare_performance_reranking( - eval_reponses_bot_after_reranker, eval_reponses_bot_before_reranker -): +def compare_performance_reranking(eval_reponses_bot_after_reranker, eval_reponses_bot_before_reranker): comparaison_eval_reponses = eval_reponses_bot_after_reranker.merge( eval_reponses_bot_before_reranker, on=["question", "Nombre de pages citées dans la réponse de la FAQ"], @@ -10,12 +8,8 @@ def compare_performance_reranking( # Reordering columns columns = comparaison_eval_reponses.columns.sort_values() question_cols = ["question"] - pages_cited_cols = [ - col for col in columns if col.startswith("Nombre de pages citées") - ] - documents_cited_cols = [ - col for col in columns if col.startswith("Nombre de documents cités") - ] + pages_cited_cols = [col for col in columns if col.startswith("Nombre de pages citées")] + documents_cited_cols = [col for col in columns if col.startswith("Nombre de documents cités")] # Combine all lists in the desired order new_order = question_cols + pages_cited_cols + documents_cited_cols diff --git a/src/evaluation/retrieval_evaluation_measures.py b/src/evaluation/retrieval_evaluation_measures.py index f69f521..2243258 100644 --- a/src/evaluation/retrieval_evaluation_measures.py +++ b/src/evaluation/retrieval_evaluation_measures.py @@ -8,15 +8,11 @@ class RetrievalEvaluationMeasure: def recall(self, retrieved, relevant): intersection = set(retrieved) & set(relevant) - return ( - np.round(len(intersection) / len(relevant), 3) if len(relevant) > 0 else 0 - ) + return np.round(len(intersection) / len(relevant), 3) if len(relevant) > 0 else 0 def 
precision(self, retrieved, relevant): intersection = set(retrieved) & set(relevant) - return ( - np.round(len(intersection) / len(retrieved), 3) if len(retrieved) > 0 else 0 - ) + return np.round(len(intersection) / len(retrieved), 3) if len(retrieved) > 0 else 0 def hit_rate(self, retrieved, relevant): """ @@ -39,17 +35,13 @@ def mrr(self, retrieved, relevant): return mrr_score def relevance_score(self, retrieved, relevant): - return [ - 1 if retrieved_source in relevant else 0 for retrieved_source in retrieved - ] + return [1 if retrieved_source in relevant else 0 for retrieved_source in retrieved] def dcg(self, relevance_scores, k=None): if k is None: k = len(relevance_scores) relevance_scores = np.asfarray(relevance_scores)[:k] - return np.sum( - relevance_scores / np.log2(np.arange(2, relevance_scores.size + 2)) - ) + return np.sum(relevance_scores / np.log2(np.arange(2, relevance_scores.size + 2))) def idcg(self, relevance_scores, k=None): if k is None: diff --git a/src/evaluation/retrieval_evaluator.py b/src/evaluation/retrieval_evaluator.py index 020903f..77b09da 100644 --- a/src/evaluation/retrieval_evaluator.py +++ b/src/evaluation/retrieval_evaluator.py @@ -17,9 +17,7 @@ from .utils import build_chain_reranker_test ## Utility function ## -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") def _build_vector_database(path_data: str, config: RetrievalConfiguration) -> Chroma: @@ -88,15 +86,11 @@ def run( for configuration in eval_configurations: t0 = time.time() config_name = configuration.name - logging.info( - f"Start of evaluation run for dataset: {df_name} and configuration: {config_name}" - ) + logging.info(f"Start of evaluation run for dataset: {df_name} and configuration: {config_name}") results[df_name][config_name] = {} # Load ref Corpus - vector_db = _build_vector_database( - path_data=configuration.database_path, config=configuration - ) + vector_db = _build_vector_database(path_data=configuration.database_path, config=configuration) # create a retriever # note : define the type of search "similarity", "mmr", "similarity_score_threshold" @@ -127,32 +121,18 @@ def run( golden_source = row.get("source_doc") # based retriever - retrieved_docs = vector_db.similarity_search_by_vector( - embedding=embedded_queries[i], k=max(configuration.k_values) - ) + retrieved_docs = vector_db.similarity_search_by_vector(embedding=embedded_queries[i], k=max(configuration.k_values)) # reranker - retrieved_docs = reranker.invoke( - {"documents": retrieved_docs, "query": queries[i]} - ) + retrieved_docs = reranker.invoke({"documents": retrieved_docs, "query": queries[i]}) - retrieved_sources = [ - doc.metadata["source"] for doc in retrieved_docs - ] + retrieved_sources = [doc.metadata["source"] for doc in retrieved_docs] for k in configuration.k_values: - recall_at_k = ir_measures.recall( - retrieved_sources[:k], [golden_source] - ) - precision_at_k = ir_measures.precision( - retrieved_sources[:k], [golden_source] - ) - mrr_at_k = ir_measures.mrr( - retrieved_sources[:k], [golden_source] - ) - ndcg_at_k = ir_measures.ndcg( - retrieved_sources[:k], [golden_source] - ) + recall_at_k = ir_measures.recall(retrieved_sources[:k], [golden_source]) + precision_at_k = ir_measures.precision(retrieved_sources[:k], [golden_source]) + mrr_at_k = ir_measures.mrr(retrieved_sources[:k], [golden_source]) + ndcg_at_k = ir_measures.ndcg(retrieved_sources[:k], [golden_source]) 
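As a quick sanity check on the measures defined in this class, the toy example below shows the call pattern and the values it yields; the import path is inferred from the file layout and is an assumption.

```python
# Illustrative sketch, not part of the patch: the import path is an assumption.
from src.evaluation.retrieval_evaluation_measures import RetrievalEvaluationMeasure

measures = RetrievalEvaluationMeasure()
retrieved = ["doc_a", "doc_b", "doc_c"]  # sources returned by the retriever, best first
relevant = ["doc_b"]                     # golden source for the query

print(measures.recall(retrieved, relevant))     # 1.0   -> the golden source was retrieved
print(measures.precision(retrieved, relevant))  # 0.333 -> one relevant document out of three retrieved
# mrr and ndcg follow the same call pattern; with the first hit at rank 2, a standard
# reciprocal-rank definition gives 0.5.
```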
individual_recalls.append(recall_at_k) individual_precisions.append(precision_at_k) @@ -166,29 +146,21 @@ def run( # length of all_individual_recalls/precisions equals len(configuration.k_values) assert len(df) == len(all_individual_recalls), "nb of individuals error" - logging.info( - f" length of all_individual recalls is {len(all_individual_recalls)}" - ) + logging.info(f" length of all_individual recalls is {len(all_individual_recalls)}") all_k_precisions = list(zip(*all_individual_precisions, strict=False)) all_k_recalls = list(zip(*all_individual_recalls, strict=False)) all_k_mrrs = list(zip(*all_individual_mrrs, strict=False)) all_k_ndcgs = list(zip(*all_individual_ndcgs, strict=False)) - assert len(configuration.k_values) == len( - all_k_recalls - ), "Number of ks error" + assert len(configuration.k_values) == len(all_k_recalls), "Number of ks error" results[df_name][config_name]["recall"] = {} results[df_name][config_name]["precision"] = {} results[df_name][config_name]["mrr"] = {} results[df_name][config_name]["ndcg"] = {} for i, k in enumerate(configuration.k_values): - results[df_name][config_name]["recall"][k] = np.mean( - all_k_recalls[i] - ) - results[df_name][config_name]["precision"][k] = np.mean( - all_k_precisions[i] - ) + results[df_name][config_name]["recall"][k] = np.mean(all_k_recalls[i]) + results[df_name][config_name]["precision"][k] = np.mean(all_k_precisions[i]) results[df_name][config_name]["mrr"][k] = np.mean(all_k_mrrs[i]) results[df_name][config_name]["ndcg"][k] = np.mean(all_k_ndcgs[i]) @@ -208,12 +180,8 @@ def build_reference_matrix(df: pd.DataFrame) -> tuple[csr_matrix, dict, dict, di # for questions use "standard" indices (indices relative to the whole set) digit_df["question"] = df["question"].replace(val_to_id, inplace=False) # for source docs use "relative" numeric indices ("relative" to the matrix) - source_val_to_local_id = { - v: local_id for local_id, v in enumerate(unique_sources) - } - digit_df["source_doc"] = df["source_doc"].replace( - source_val_to_local_id, inplace=False - ) + source_val_to_local_id = {v: local_id for local_id, v in enumerate(unique_sources)} + digit_df["source_doc"] = df["source_doc"].replace(source_val_to_local_id, inplace=False) # Build a matrix using the indices of all the objects question_ids = digit_df["question"].values source_ids = digit_df["source_doc"].values diff --git a/src/evaluation/utils.py b/src/evaluation/utils.py index f521362..afc61a7 100644 --- a/src/evaluation/utils.py +++ b/src/evaluation/utils.py @@ -50,9 +50,7 @@ def unique_by_key(iterable: Iterable[T], key: Callable[[T], H]) -> Iterator[T]: yield e -def weighted_reciprocal_rank( - doc_lists: list[list[Document]], weights, c=60 -) -> list[Document]: +def weighted_reciprocal_rank(doc_lists: list[list[Document]], weights, c=60) -> list[Document]: """ This function comes from Langchain documentation. Perform weighted Reciprocal Rank Fusion on multiple rank lists. 
@@ -127,9 +125,7 @@ def use_sbert_retrieval_evaluator(df: pd.DataFrame, model: SentenceTransformer) # Define the compression function -def compress_BM25_lambda( - documents: Sequence[Document], query: str, k: int = 5, **kwargs: dict[str, Any] -) -> Sequence[Document]: +def compress_BM25_lambda(documents: Sequence[Document], query: str, k: int = 5, **kwargs: dict[str, Any]) -> Sequence[Document]: """Compress retrieved documents given the query context.""" # Initialize the retriever with the documents @@ -138,9 +134,7 @@ def compress_BM25_lambda( # Define the compression function using Metadata -def compress_metadata_lambda( - documents: Sequence[LangchainDocument], query: str, config: dict -) -> Sequence[LangchainDocument]: +def compress_metadata_lambda(documents: Sequence[LangchainDocument], query: str, config: dict) -> Sequence[LangchainDocument]: rerank_k = config.get("rerank_k", len(documents)) metadata_field = config.get("use_metadata") @@ -148,11 +142,7 @@ def compress_metadata_lambda( new_data = [] for doc in documents: meta = doc.metadata - page_content = ( - meta[metadata_field] - if metadata_field in meta and len(meta[metadata_field]) > 0 - else doc.page_content - ) + page_content = meta[metadata_field] if metadata_field in meta and len(meta[metadata_field]) > 0 else doc.page_content new_data.append( LangchainDocument( page_content=page_content, @@ -161,21 +151,13 @@ def compress_metadata_lambda( ) # Use BM25 to rerank the new data - new_retrieved_docs = compress_BM25_lambda( - documents=new_data, query=query, k=rerank_k - ) + new_retrieved_docs = compress_BM25_lambda(documents=new_data, query=query, k=rerank_k) # Map sources to original documents - source_to_doc_map = { - doc.metadata.get("source", "unknown"): doc for doc in documents - } + source_to_doc_map = {doc.metadata.get("source", "unknown"): doc for doc in documents} # Return the reranked documents based on the source mapping - return [ - source_to_doc_map[new_doc.metadata["source"]] - for new_doc in new_retrieved_docs - if new_doc.metadata["source"] in source_to_doc_map - ] + return [source_to_doc_map[new_doc.metadata["source"]] for new_doc in new_retrieved_docs if new_doc.metadata["source"] in source_to_doc_map] else: # If no metadata field specified, return top k documents return documents[:rerank_k] @@ -196,38 +178,20 @@ def choosing_reranker_test(config: dict): retrieval_agent = RunnableLambda(lambda r: r["documents"]) elif reranker_type == "BM25": # need to format {"documents" : ..., "query" : ...} - retrieval_agent = RunnableLambda( - lambda r: compress_BM25_lambda( - documents=r["documents"], query=r["query"], k=rerank_k - ) - ) + retrieval_agent = RunnableLambda(lambda r: compress_BM25_lambda(documents=r["documents"], query=r["query"], k=rerank_k)) elif reranker_type == "Cross-encoder": # need to format {"documents" : ..., "query" : ...} - model = HuggingFaceCrossEncoder( - model_name=reranker_name, model_kwargs={"device": "cuda"} - ) + model = HuggingFaceCrossEncoder(model_name=reranker_name, model_kwargs={"device": "cuda"}) compressor = CrossEncoderReranker(model=model, top_n=rerank_k) - retrieval_agent = RunnableLambda( - func=lambda inputs: compressor.compress_documents( - documents=inputs["documents"], query=inputs["query"] - ) - ) + retrieval_agent = RunnableLambda(func=lambda inputs: compressor.compress_documents(documents=inputs["documents"], query=inputs["query"])) elif reranker_type == "ColBERT": # need to format {"documents" : ..., "query" : ...} colBERT = 
RAGPretrainedModel.from_pretrained(reranker_name) compressor = colBERT.as_langchain_document_compressor(k=rerank_k) - retrieval_agent = RunnableLambda( - func=lambda r: compressor.compress_documents( - documents=r["documents"], query=r["query"], top_n=rerank_k - ) - ) + retrieval_agent = RunnableLambda(func=lambda r: compressor.compress_documents(documents=r["documents"], query=r["query"], top_n=rerank_k)) elif reranker_type == "Metadata": # need to format {"documents" : ..., "query" : ...} - retrieval_agent = RunnableLambda( - func=lambda r: compress_metadata_lambda( - documents=r["documents"], query=r["query"], config=config - ) - ) + retrieval_agent = RunnableLambda(func=lambda r: compress_metadata_lambda(documents=r["documents"], query=r["query"], config=config)) return retrieval_agent @@ -270,16 +234,12 @@ def build_chain_reranker_test(config=RetrievalConfiguration): results[f"model_{i}"] = reranker_model weights.append(r_w) - weights = ( - [1 / len(weights) for _ in weights] if np.sum(weights) != 1 else weights - ) # uniform weights. + weights = [1 / len(weights) for _ in weights] if np.sum(weights) != 1 else weights # uniform weights. retrieval_agent = ( results | RunnableLambda(func=lambda d: aggregate_ensemble_reranker(d)) - | RunnableLambda( - func=lambda d: weighted_reciprocal_rank(doc_lists=d, weights=weights) - ) + | RunnableLambda(func=lambda d: weighted_reciprocal_rank(doc_lists=d, weights=weights)) ) return retrieval_agent @@ -399,11 +359,7 @@ def hist_results( for j, config in enumerate(eval_configs): config_results = results.get(config.name, {}) metric_values = config_results.get(metric, {}) - value = ( - metric_values.get(k) - if isinstance(metric_values, dict) - else metric_values - ) + value = metric_values.get(k) if isinstance(metric_values, dict) else metric_values if value is not None: values.append(value) diff --git a/src/insee_data_processing.py b/src/insee_data_processing.py index 2c9ddb3..bb75f93 100644 --- a/src/insee_data_processing.py +++ b/src/insee_data_processing.py @@ -3,9 +3,10 @@ import pandas as pd import s3fs +from markdownify import markdownify as md + from config import S3_BUCKET, S3_ENDPOINT_URL from db_building.utils_db import complete_url_builder -from markdownify import markdownify as md FILES = [ "applishare_extract", @@ -50,9 +51,7 @@ def extract_rmes_data(data: dict): """ id = get_content(data, "id") titre = data.get("titre", "") - note_historique = md( - data.get("noteHistorique", [{}])[0].get("contenu", ""), bullets="-" - ) + note_historique = md(data.get("noteHistorique", [{}])[0].get("contenu", ""), bullets="-") label = get_content(data, "label", 0) frequence_collecte = get_content(data, "frequenceCollecte", "label", 0) resume = md(data.get("resume", [{}])[0].get("contenu", ""), bullets="-") @@ -72,16 +71,8 @@ def extract_rmes_data(data: dict): parts.append(f"### Historique\n{note_historique}\n") if note_historique else None parts.append(f"### Famille\n{famille}\n") if famille else None - ( - parts.append(f"### Organisme responsable\n{organismes_responsables}\n") - if organismes_responsables - else None - ) - ( - parts.append(f"### Fréquence de collecte des données\n{frequence_collecte}\n") - if frequence_collecte - else None - ) + (parts.append(f"### Organisme responsable\n{organismes_responsables}\n") if organismes_responsables else None) + (parts.append(f"### Fréquence de collecte des données\n{frequence_collecte}\n") if frequence_collecte else None) parts.append(f"### Partenaires\n{partenaires}\n") if partenaires else None 
formatted_page = "\n".join(parts).replace("\\.", ".").replace("\\-", "-") @@ -93,29 +84,18 @@ def process_row(row): def main(): - logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" - ) + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") fs = s3fs.S3FileSystem(client_kwargs={"endpoint_url": S3_ENDPOINT_URL}) - tables = { - file: pd.read_parquet( - f"s3://{S3_BUCKET}/data/raw_data/{file}.parquet", filesystem=fs - ) - for file in FILES - } + tables = {file: pd.read_parquet(f"s3://{S3_BUCKET}/data/raw_data/{file}.parquet", filesystem=fs) for file in FILES} for key, table in tables.items(): logging.info(f"Size of {key} : {len(table)}") - joined_table = tables["applishare_extract"].merge( - tables["solr_extract"], how="inner", on="id" - ) + joined_table = tables["applishare_extract"].merge(tables["solr_extract"], how="inner", on="id") joined_table["url"] = complete_url_builder(joined_table) - joined_table["theme"] = [ - x[0] if x is not None else x for x in joined_table["theme"] - ] + joined_table["theme"] = [x[0] if x is not None else x for x in joined_table["theme"]] subset_table = joined_table.reset_index(drop=True)[ [ "id", @@ -132,12 +112,8 @@ def main(): "xml_content", ] ] - subset_table["dateDiffusion"] = pd.to_datetime( - subset_table["dateDiffusion"], format="mixed" - ).dt.strftime("%Y-%m-%d %H:%M") - subset_table.to_parquet( - f"s3://{S3_BUCKET}/data/raw_data/applishare_solr_joined.parquet", filesystem=fs - ) + subset_table["dateDiffusion"] = pd.to_datetime(subset_table["dateDiffusion"], format="mixed").dt.strftime("%Y-%m-%d %H:%M") + subset_table.to_parquet(f"s3://{S3_BUCKET}/data/raw_data/applishare_solr_joined.parquet", filesystem=fs) rmes_sources_content = pd.DataFrame( tables["rmes_extract_sources"]["xml_content"].apply(process_row).to_list(), diff --git a/src/model_building/custom_hf_pipeline.py b/src/legacy/custom_hf_pipeline.py similarity index 74% rename from src/model_building/custom_hf_pipeline.py rename to src/legacy/custom_hf_pipeline.py index 4b8c496..f92aef7 100644 --- a/src/model_building/custom_hf_pipeline.py +++ b/src/legacy/custom_hf_pipeline.py @@ -69,9 +69,7 @@ def from_model_id( from transformers import pipeline as hf_pipeline except ImportError as err: - raise ValueError( - "Could not import transformers python package. Please install it with `pip install transformers`." - ) from err + raise ValueError("Could not import transformers python package. Please install it with `pip install transformers`.") from err _model_kwargs = model_kwargs or {} tokenizer = AutoTokenizer.from_pretrained(model_id, **_model_kwargs) @@ -82,22 +80,14 @@ def from_model_id( elif task in ("text2text-generation", "summarization"): model = AutoModelForSeq2SeqLM.from_pretrained(model_id, **_model_kwargs) else: - raise ValueError( - f"Got invalid task {task}, " - f"currently only {VALID_TASKS} are supported" - ) + raise ValueError(f"Got invalid task {task}, " f"currently only {VALID_TASKS} are supported") except ImportError as e: - raise ValueError( - f"Could not load the {task} model due to missing dependencies." 
- ) from e + raise ValueError(f"Could not load the {task} model due to missing dependencies.") from e if tokenizer.pad_token is None: tokenizer.pad_token_id = model.config.eos_token_id - if ( - getattr(model, "is_loaded_in_4bit", False) - or getattr(model, "is_loaded_in_8bit", False) - ) and device is not None: + if (getattr(model, "is_loaded_in_4bit", False) or getattr(model, "is_loaded_in_8bit", False)) and device is not None: logger.warning( f"Setting the `device` argument to None from {device} to avoid " "the error caused by attempting to move the model that was already " @@ -111,10 +101,7 @@ def from_model_id( cuda_device_count = torch.cuda.device_count() if device < -1 or (device >= cuda_device_count): - raise ValueError( - f"Got device=={device}, " - f"device is required to be within [-1, {cuda_device_count})" - ) + raise ValueError(f"Got device=={device}, " f"device is required to be within [-1, {cuda_device_count})") if device_map is not None and device < 0: device = None if device is not None and device < 0 and cuda_device_count > 0: @@ -126,9 +113,7 @@ def from_model_id( cuda_device_count, ) if "trust_remote_code" in _model_kwargs: - _model_kwargs = { - k: v for k, v in _model_kwargs.items() if k != "trust_remote_code" - } + _model_kwargs = {k: v for k, v in _model_kwargs.items() if k != "trust_remote_code"} _pipeline_kwargs = pipeline_kwargs or {} pipeline = hf_pipeline( task=task, @@ -141,10 +126,7 @@ def from_model_id( **_pipeline_kwargs, ) if pipeline.task not in VALID_TASKS: - raise ValueError( - f"Got invalid task {pipeline.task}, " - f"currently only {VALID_TASKS} are supported" - ) + raise ValueError(f"Got invalid task {pipeline.task}, " f"currently only {VALID_TASKS} are supported") return cls( pipeline=pipeline, model_id=model_id, @@ -192,30 +174,19 @@ def _generate( try: from transformers.pipelines.text_generation import ReturnType - remove_prompt = ( - self.pipeline._postprocess_params.get("return_type") - != ReturnType.NEW_TEXT - ) + remove_prompt = self.pipeline._postprocess_params.get("return_type") != ReturnType.NEW_TEXT except Exception as e: - logger.warning( - f"Unable to extract pipeline return_type. Received error:\n\n{e}" - ) + logger.warning(f"Unable to extract pipeline return_type. Received error:\n\n{e}") remove_prompt = True - text = ( - response["generated_text"][len(batch_prompts[j]) :] - if remove_prompt - else response["generated_text"] - ) + text = response["generated_text"][len(batch_prompts[j]) :] if remove_prompt else response["generated_text"] elif self.pipeline.task == "text2text-generation": text = response["generated_text"] elif self.pipeline.task == "summarization": text = response["summary_text"] else: - raise ValueError( - f"Got invalid task {self.pipeline.task}, currently only {VALID_TASKS} are supported" - ) + raise ValueError(f"Got invalid task {self.pipeline.task}, currently only {VALID_TASKS} are supported") if stop: # Enforce stop tokens @@ -224,9 +195,7 @@ def _generate( # Append the processed text to results text_generations.append(text) - return LLMResult( - generations=[[Generation(text=text)] for text in text_generations] - ) + return LLMResult(generations=[[Generation(text=text)] for text in text_generations]) async def _astream( self, @@ -241,28 +210,17 @@ async def _astream( from transformers import TextIteratorStreamer except ImportError as err: - raise ValueError( - "Could not import transformers python package. Please install it with `pip install transformers`." 
-            ) from err
+            raise ValueError("Could not import transformers python package. Please install it with `pip install transformers`.") from err
 
         try:
             streamer = self.pipeline._forward_params["streamer"]
             if streamer is None:
-                raise ValueError(
-                    "Could not get TextIteratorStreamer from pipeline. "
-                    "Please check your pipeline."
-                )
+                raise ValueError("Could not get TextIteratorStreamer from pipeline. " "Please check your pipeline.")
             elif type(streamer) is not TextIteratorStreamer:
-                raise ValueError(
-                    "Passed Streamer is not supported. Please use TextIteratorStreamer."
-                    "Please check your pipeline."
-                )
+                raise ValueError("Passed Streamer is not supported. Please use TextIteratorStreamer." "Please check your pipeline.")
         except Exception as e:
-            raise ValueError(
-                "Could not get TextIteratorStreamer from pipeline. "
-                "Please check your pipeline."
-            ) from e
+            raise ValueError("Could not get TextIteratorStreamer from pipeline. " "Please check your pipeline.") from e
 
         # Prepare the inputs for the model
         tok = self.pipeline.tokenizer
diff --git a/src/legacy/reranking.py b/src/legacy/reranking.py
new file mode 100644
index 0000000..6d48a4c
--- /dev/null
+++ b/src/legacy/reranking.py
@@ -0,0 +1,232 @@
+import numpy as np
+import torch
+import torch.nn.functional as F
+
+###### LLM Reranker functions ######
+
+
+def expected_relevance_values(logits, grades_token_ids, list_grades):
+    next_token_logits = logits[:, -1, :]
+    next_token_logits = next_token_logits.cpu()[0]
+    probabilities = F.softmax(next_token_logits[grades_token_ids], dim=-1).numpy()
+    return np.dot(np.array(list_grades), probabilities)
+
+
+def peak_relevance_likelihood(logits, grades_token_ids, list_grades):
+    index_max_grade = np.array(list_grades).argmax()
+    next_token_logits = logits[:, -1, :]
+    probabilities = F.softmax(next_token_logits, dim=-1).cpu().numpy()[0]
+    return probabilities[grades_token_ids[index_max_grade]]
+
+
+def compute_sequence_log_probs(tokenizer, model, sequence):
+    # Tokenize the input sequence
+    inputs = tokenizer(sequence, return_tensors="pt")
+    input_ids = inputs["input_ids"]
+
+    # Move input tensors to the same device as the model
+    inputs = inputs.to(model.device)
+
+    # Get the logits from the model
+    with torch.no_grad():
+        outputs = model(**inputs)
+        logits = outputs.logits
+
+    # Calculate the probabilities
+    log_probs = F.log_softmax(logits, dim=-1)
+
+    # Compute the probability of the sequence
+    sequence_probability = 0.0
+    for i in range(len(input_ids[0]) - 1):
+        token_id = input_ids[0][i + 1]
+        sequence_probability += log_probs[0, i, token_id].item()
+
+    return sequence_probability
+
+
+def RG_S(tokenizer, model, query, document, aggregating_method, k=5):
+    list_grades = list(range(k))
+    grades_token_ids = [tokenizer(str(grade))["input_ids"][1] for grade in list_grades]
+
+    RG_S_template = """
+    Sur une échelle de 0 à {k}, jugez la pertinence entre la requête et le document.
+ Requête : {query} + Document : {document} + Réponse : """ + + messages = [ + { + "role": "system", + "content": "Tu es un assistant chatbot expert en Statistique Publique.", + }, + { + "role": "user", + "content": RG_S_template.format(query=query, document=document, k=k), + }, + ] + + input_text = tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False) + + inputs = tokenizer(input_text, return_tensors="pt").to(model.device) + + with torch.no_grad(): + outputs = model(**inputs) + logits = outputs.logits + + return aggregating_method(logits, grades_token_ids, list_grades) + + +def RG_4L(tokenizer, model, query, document, args): + possible_judgements = [ + " Parfaitement Pertinent", + " Très Pertinent", + " Assez Pertinent", + " Non Pertinent", + ] + list_grades = np.array([3, 2, 1, 0]) + RG_4L_template = """ + Evaluez la pertinence du document donné par rapport à la question posée. + Répondez uniquement parmi : Parfaitement Pertinent, Très Pertinent, Assez Pertinent ou Non Pertinent. + Requête : {query} + Document : {document} + Réponse : {judgement}""" + + messages = [ + { + "role": "system", + "content": "Tu es un assistant chatbot expert en Statistique Publique.", + }, + {"role": "user", "content": RG_4L_template}, + ] + + log_probs = [] + for judgement in possible_judgements: + input_text = tokenizer.apply_chat_template(messages, add_generation_prompt=False, tokenize=False).format( + query=query, document=document, judgement=judgement + ) + log_probs.append(compute_sequence_log_probs(sequence=input_text)) + + probs = F.softmax(torch.tensor(log_probs), dim=-1).numpy() + return np.dot(probs, list_grades) + + +def RG_3L(tokenizer, model, query, document, args): + possible_judgements = [" Très Pertinent", " Assez Pertinent", " Non Pertinent"] + list_grades = np.array([2, 1, 0]) + RG_3L_template = """ + Evaluez la pertinence du document donné par rapport à la question posée. + Répondez uniquement parmi : Très Pertinent, Assez Pertinent ou Non Pertinent. + Requête : {query} + Document : {document} + Réponse : {judgement}""" + + messages = [ + { + "role": "system", + "content": "Tu es un assistant chatbot expert en Statistique Publique.", + }, + {"role": "user", "content": RG_3L_template}, + ] + + log_probs = [] + for judgement in possible_judgements: + input_text = tokenizer.apply_chat_template(messages, add_generation_prompt=False, tokenize=False).format( + query=query, document=document, judgement=judgement + ) + log_probs.append(compute_sequence_log_probs(sequence=input_text)) + + probs = F.softmax(torch.tensor(log_probs), dim=-1).numpy() + return np.dot(probs, list_grades) + + +def RG_YN(tokenizer, model, query, document, aggregating_method): + list_judgements = [" Oui", " Non"] + grades_token_ids = [tokenizer(j)["input_ids"][1] for j in list_judgements] + list_grades = [1, 0] + + RG_YN_template = """ + Pour la requête et le document suivants, jugez s'ils sont pertinents. Répondez UNIQUEMENT par Oui ou Non. 
+ Requête : {query} + Document : {document} + Réponse : """ + + messages = [ + { + "role": "system", + "content": "Tu es un assistant chatbot expert en Statistique Publique.", + }, + { + "role": "user", + "content": RG_YN_template.format(query=query, document=document), + }, + ] + + input_text = tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False) + + inputs = tokenizer(input_text, return_tensors="pt").to(model.device) + + with torch.no_grad(): + outputs = model(**inputs) + logits = outputs.logits + + return aggregating_method(logits, grades_token_ids, list_grades) + + +def llm_reranking(tokenizer, model, query, retrieved_documents, assessing_method, aggregating_method): + docs_content = retrieved_documents.copy() # [doc.page_content for doc in retrieved_documents] + + scores = [] + for document in docs_content: + score = assessing_method(query, document, aggregating_method) + scores.append(score) + + docs_with_scores = list(zip(retrieved_documents, scores, strict=False)) + docs_with_scores.sort(key=lambda x: x[1], reverse=True) + sorted_documents = [doc for doc, score in docs_with_scores] # docs_with_scores + return sorted_documents + + +""" +def compute_proba_judgement(sequence, judgement): + # Tokenize the input sequence and judgement + inputs = tokenizer(sequence, return_tensors='pt') + input_ids = inputs['input_ids'] + judgement_ids = tokenizer(judgement, return_tensors='pt')['input_ids'][0][1:] + + # print("Input IDs:", input_ids) + # print("Judgement IDs:", judgement_ids) + + # Move input tensors to the same device as the model + input_ids = input_ids.to(model.device) + + # Get the logits from the model + with torch.no_grad(): + outputs = model(**inputs) + logits = outputs.logits + + # Calculate the probabilities + log_probs = F.log_softmax(logits, dim=-1) + probs = F.softmax(logits, dim=-1) + + # Convert input IDs tensor to list for indexing + input_ids_list = input_ids[0].tolist()[1:] + judgement_ids_list = judgement_ids.tolist() + + start_index = find_sublist_indices(main_list=input_ids_list, sublist=judgement_ids_list) + #print("Start index:", start_index, "End index:", end_index) + + if start_index == -1 or end_index == -1: + raise ValueError("Judgement sublist not found in the input sequence") + + # Compute the probability of the sequence + sequence_probability = 0.0 + list_indexes = list(range(0, start_index + len(judgement_ids_list), 1)) + for i in list_indexes: + token_id = input_ids_list[i] + token_log_prob = log_probs[0, i, token_id].item() + token_prob = probs[0, i, token_id].item() + print(" Token:", tokenizer.decode([token_id]),"/ Probability:", token_prob * 100, "%") + sequence_probability += token_log_prob + + return sequence_probability/len(list_indexes) +""" diff --git a/src/model_building/build_llm_model.py b/src/model_building/build_llm_model.py index d0fd430..0356df1 100644 --- a/src/model_building/build_llm_model.py +++ b/src/model_building/build_llm_model.py @@ -11,7 +11,7 @@ pipeline, ) -# from src.model_building.custom_hf_pipeline import CustomHuggingFacePipeline +# from legacy.model_building.custom_hf_pipeline import CustomHuggingFacePipeline from .fetch_llm_model import cache_model_from_hf_hub # Add the project root directory to sys.path @@ -55,18 +55,12 @@ def build_llm_model( else None ), # Load LLM config - "config": ( - AutoConfig.from_pretrained(model_name, trust_remote_code=True, token=token) - if config - else None - ), + "config": (AutoConfig.from_pretrained(model_name, trust_remote_code=True, token=token) if config 
else None), "token": token, } # Load LLM tokenizer - tokenizer = AutoTokenizer.from_pretrained( - model_name, use_fast=True, device_map="auto", token=configs["token"] - ) + tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True, device_map="auto", token=configs["token"]) streamer = None if streaming: streamer = TextStreamer(tokenizer=tokenizer, skip_prompt=True) diff --git a/src/model_building/fetch_llm_model.py b/src/model_building/fetch_llm_model.py index cc104fc..538319e 100644 --- a/src/model_building/fetch_llm_model.py +++ b/src/model_building/fetch_llm_model.py @@ -1,34 +1,33 @@ import logging import os import subprocess + import s3fs from transformers import AutoModelForCausalLM logger = logging.getLogger(__name__) -logging.basicConfig( - format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %I:%M:%S %p", level=logging.INFO -) +logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %I:%M:%S %p", level=logging.INFO) def get_file_system(token=None) -> s3fs.S3FileSystem: """ Creates and returns an S3 file system instance using the s3fs library. - This function configures the S3 file system with endpoint URL and credentials - obtained from environment variables, enabling interactions with the specified - S3-compatible storage. Optionally, a security token can be provided for session-based + This function configures the S3 file system with endpoint URL and credentials + obtained from environment variables, enabling interactions with the specified + S3-compatible storage. Optionally, a security token can be provided for session-based authentication. Parameters: ----------- token : str, optional - A temporary security token for session-based authentication. This is optional and + A temporary security token for session-based authentication. This is optional and should be provided when using session-based credentials. Returns: -------- s3fs.S3FileSystem - An instance of the S3 file system configured with the specified endpoint and + An instance of the S3 file system configured with the specified endpoint and credentials, ready to interact with S3-compatible storage. Environment Variables: @@ -48,15 +47,13 @@ def get_file_system(token=None) -> s3fs.S3FileSystem: options = { "client_kwargs": {"endpoint_url": f"https://{os.environ['AWS_S3_ENDPOINT']}"}, "key": os.environ["AWS_ACCESS_KEY_ID"], - "secret": os.environ["AWS_SECRET_ACCESS_KEY"] + "secret": os.environ["AWS_SECRET_ACCESS_KEY"], } if token is not None: - options['token'] = token + options["token"] = token - return s3fs.S3FileSystem( - **options - ) + return s3fs.S3FileSystem(**options) def cache_model_from_hf_hub( @@ -64,7 +61,7 @@ def cache_model_from_hf_hub( s3_bucket="models-hf", s3_cache_dir="hf_hub", s3_token=None, - hf_token=None + hf_token=None, ): """Use S3 as proxy cache from HF hub if a model is not already cached locally. @@ -73,9 +70,7 @@ def cache_model_from_hf_hub( s3_bucket (str): Name of the S3 bucket to use. s3_cache_dir (str): Path of the cache directory on S3. """ - assert ( - "MC_HOST_s3" in os.environ - ), "Please set the MC_HOST_s3 environment variable." + assert "MC_HOST_s3" in os.environ, "Please set the MC_HOST_s3 environment variable." 
# Local cache config LOCAL_HF_CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "huggingface", "hub") @@ -84,39 +79,27 @@ def cache_model_from_hf_hub( # Remote cache config fs = get_file_system(token=s3_token) - available_models_s3 = [ - os.path.basename(path) for path in fs.ls(os.path.join(s3_bucket, s3_cache_dir)) - ] + available_models_s3 = [os.path.basename(path) for path in fs.ls(os.path.join(s3_bucket, s3_cache_dir))] dir_model_s3 = os.path.join(s3_bucket, s3_cache_dir, model_name_hf_cache) if model_name_hf_cache not in os.listdir(LOCAL_HF_CACHE_DIR): # Try fetching from S3 if available if model_name_hf_cache in available_models_s3: print(f"Fetching model {model_name} from S3.") - cmd = [ - "mc", - "cp", - "-r", - f"s3/{dir_model_s3}", - f"{LOCAL_HF_CACHE_DIR}/" - ] + cmd = ["mc", "cp", "-r", f"s3/{dir_model_s3}", f"{LOCAL_HF_CACHE_DIR}/"] with open("/dev/null", "w") as devnull: subprocess.run(cmd, check=True, stdout=devnull, stderr=devnull) # Else, fetch from HF Hub and push to S3 else: print(f"Model {model_name} not found on S3, fetching from HF hub.") - AutoModelForCausalLM.from_pretrained( - model_name, - torch_dtype="auto", - token=hf_token - ) + AutoModelForCausalLM.from_pretrained(model_name, torch_dtype="auto", token=hf_token) print(f"Putting model {model_name} on S3.") cmd = [ "mc", "cp", "-r", f"{dir_model_local}/", - f"s3/{dir_model_s3}", + f"s3/{dir_model_s3}", ] with open("/dev/null", "w") as devnull: subprocess.run(cmd, check=True, stdout=devnull, stderr=devnull) diff --git a/src/reranking/reranking_functions.py b/src/reranking/reranking_functions.py index 5c3165d..0d88cd3 100644 --- a/src/reranking/reranking_functions.py +++ b/src/reranking/reranking_functions.py @@ -1,257 +1,13 @@ from collections.abc import Any, Sequence -import numpy as np -import torch -import torch.nn.functional as F from langchain.schema import Document from langchain_community.retrievers import BM25Retriever # Define the compression function -def compress_documents_lambda( - documents: Sequence[Document], query: str, k: int = 5, **kwargs: dict[str, Any] -) -> Sequence[Document]: +def compress_documents_lambda(documents: Sequence[Document], query: str, k: int = 5, **kwargs: dict[str, Any]) -> Sequence[Document]: """Compress retrieved documents given the query context.""" # Initialize the retriever with the documents retriever = BM25Retriever.from_documents(documents, k=k, **kwargs) return retriever.get_relevant_documents(query) - - -###### LLM Reraner functions ###### - - -def expected_relevance_values(logits, grades_token_ids, list_grades): - next_token_logits = logits[:, -1, :] - next_token_logits = next_token_logits.cpu()[0] - probabilities = F.softmax(next_token_logits[grades_token_ids], dim=-1).numpy() - return np.dot(np.array(list_grades), probabilities) - - -def peak_relevance_likelihood(logits, grades_token_ids, list_grades): - index_max_grade = np.array(list_grades).argmax() - next_token_logits = logits[:, -1, :] - probabilities = F.softmax(next_token_logits, dim=-1).cpu().numpy()[0] - return probabilities[grades_token_ids[index_max_grade]] - - -def compute_sequence_log_probs(tokenizer, model, sequence): - # Tokenize the input sequence - inputs = tokenizer(sequence, return_tensors="pt") - input_ids = inputs["input_ids"] - - # Move input tensors to the same device as the model - inputs = inputs.to(model.device) - - # Get the logits from the model - with torch.no_grad(): - outputs = model(**inputs) - logits = outputs.logits - - # Calculate the probabilities - log_probs = 
F.log_softmax(logits, dim=-1) - - # Compute the probability of the sequence - sequence_probability = 0.0 - for i in range(len(input_ids[0]) - 1): - token_id = input_ids[0][i + 1] - sequence_probability += log_probs[0, i, token_id].item() - - return sequence_probability - - -def RG_S(tokenizer, model, query, document, aggregating_method, k=5): - - list_grades = list(range(k)) - grades_token_ids = [tokenizer(str(grade))["input_ids"][1] for grade in list_grades] - - RG_S_template = """ - Sur une échelle de 0 à {k}, jugez la pertinence entre la requête et le document. - Requête : {query} - Document : {document} - Réponse : """ - - messages = [ - { - "role": "system", - "content": "Tu es un assistant chatbot expert en Statistique Publique.", - }, - { - "role": "user", - "content": RG_S_template.format(query=query, document=document, k=k), - }, - ] - - input_text = tokenizer.apply_chat_template( - messages, add_generation_prompt=True, tokenize=False - ) - - inputs = tokenizer(input_text, return_tensors="pt").to(model.device) - - with torch.no_grad(): - outputs = model(**inputs) - logits = outputs.logits - - return aggregating_method(logits, grades_token_ids, list_grades) - - -def RG_4L(tokenizer, model, query, document, args): - possible_judgements = [ - " Parfaitement Pertinent", - " Très Pertinent", - " Assez Pertinent", - " Non Pertinent", - ] - list_grades = np.array([3, 2, 1, 0]) - RG_4L_template = """ - Evaluez la pertinence du document donné par rapport à la question posée. - Répondez uniquement parmi : Parfaitement Pertinent, Très Pertinent, Assez Pertinent ou Non Pertinent. - Requête : {query} - Document : {document} - Réponse : {judgement}""" - - messages = [ - { - "role": "system", - "content": "Tu es un assistant chatbot expert en Statistique Publique.", - }, - {"role": "user", "content": RG_4L_template}, - ] - - log_probs = [] - for judgement in possible_judgements: - input_text = tokenizer.apply_chat_template( - messages, add_generation_prompt=False, tokenize=False - ).format(query=query, document=document, judgement=judgement) - log_probs.append(compute_sequence_log_probs(sequence=input_text)) - - probs = F.softmax(torch.tensor(log_probs), dim=-1).numpy() - return np.dot(probs, list_grades) - - -def RG_3L(tokenizer, model, query, document, args): - possible_judgements = [" Très Pertinent", " Assez Pertinent", " Non Pertinent"] - list_grades = np.array([2, 1, 0]) - RG_3L_template = """ - Evaluez la pertinence du document donné par rapport à la question posée. - Répondez uniquement parmi : Très Pertinent, Assez Pertinent ou Non Pertinent. - Requête : {query} - Document : {document} - Réponse : {judgement}""" - - messages = [ - { - "role": "system", - "content": "Tu es un assistant chatbot expert en Statistique Publique.", - }, - {"role": "user", "content": RG_3L_template}, - ] - - log_probs = [] - for judgement in possible_judgements: - input_text = tokenizer.apply_chat_template( - messages, add_generation_prompt=False, tokenize=False - ).format(query=query, document=document, judgement=judgement) - log_probs.append(compute_sequence_log_probs(sequence=input_text)) - - probs = F.softmax(torch.tensor(log_probs), dim=-1).numpy() - return np.dot(probs, list_grades) - - -def RG_YN(tokenizer, model, query, document, aggregating_method): - list_judgements = [" Oui", " Non"] - grades_token_ids = [tokenizer(j)["input_ids"][1] for j in list_judgements] - list_grades = [1, 0] - - RG_YN_template = """ - Pour la requête et le document suivants, jugez s'ils sont pertinents. 
Répondez UNIQUEMENT par Oui ou Non. - Requête : {query} - Document : {document} - Réponse : """ - - messages = [ - { - "role": "system", - "content": "Tu es un assistant chatbot expert en Statistique Publique.", - }, - { - "role": "user", - "content": RG_YN_template.format(query=query, document=document), - }, - ] - - input_text = tokenizer.apply_chat_template( - messages, add_generation_prompt=True, tokenize=False - ) - - inputs = tokenizer(input_text, return_tensors="pt").to(model.device) - - with torch.no_grad(): - outputs = model(**inputs) - logits = outputs.logits - - return aggregating_method(logits, grades_token_ids, list_grades) - - -def llm_reranking( - tokenizer, model, query, retrieved_documents, assessing_method, aggregating_method -): - docs_content = ( - retrieved_documents.copy() - ) # [doc.page_content for doc in retrieved_documents] - - scores = [] - for document in docs_content: - score = assessing_method(query, document, aggregating_method) - scores.append(score) - - docs_with_scores = list(zip(retrieved_documents, scores, strict=False)) - docs_with_scores.sort(key=lambda x: x[1], reverse=True) - sorted_documents = [doc for doc, score in docs_with_scores] # docs_with_scores - return sorted_documents - - -""" -def compute_proba_judgement(sequence, judgement): - # Tokenize the input sequence and judgement - inputs = tokenizer(sequence, return_tensors='pt') - input_ids = inputs['input_ids'] - judgement_ids = tokenizer(judgement, return_tensors='pt')['input_ids'][0][1:] - - # print("Input IDs:", input_ids) - # print("Judgement IDs:", judgement_ids) - - # Move input tensors to the same device as the model - input_ids = input_ids.to(model.device) - - # Get the logits from the model - with torch.no_grad(): - outputs = model(**inputs) - logits = outputs.logits - - # Calculate the probabilities - log_probs = F.log_softmax(logits, dim=-1) - probs = F.softmax(logits, dim=-1) - - # Convert input IDs tensor to list for indexing - input_ids_list = input_ids[0].tolist()[1:] - judgement_ids_list = judgement_ids.tolist() - - start_index = find_sublist_indices(main_list=input_ids_list, sublist=judgement_ids_list) - #print("Start index:", start_index, "End index:", end_index) - - if start_index == -1 or end_index == -1: - raise ValueError("Judgement sublist not found in the input sequence") - - # Compute the probability of the sequence - sequence_probability = 0.0 - list_indexes = list(range(0, start_index + len(judgement_ids_list), 1)) - for i in list_indexes: - token_id = input_ids_list[i] - token_log_prob = log_probs[0, i, token_id].item() - token_prob = probs[0, i, token_id].item() - print(" Token:", tokenizer.decode([token_id]),"/ Probability:", token_prob * 100, "%") - sequence_probability += token_log_prob - - return sequence_probability/len(list_indexes) -""" diff --git a/src/results_logging/log_conversations.py b/src/results_logging/log_conversations.py index 17ce46a..b039688 100644 --- a/src/results_logging/log_conversations.py +++ b/src/results_logging/log_conversations.py @@ -56,9 +56,7 @@ def log_qa_to_s3( json.dump(log_entry, file_out, indent=4) -def log_feedback_to_s3( - thread_id: str, message_id: str, feedback_value: int, feedback_comment: str -): +def log_feedback_to_s3(thread_id: str, message_id: str, feedback_value: int, feedback_comment: str): today_date = datetime.now().strftime("%Y-%m-%d") target_path_s3 = os.path.join(DIR_LOGS_S3, today_date, f"{thread_id}.json") diff --git a/src/utils/formatting_utilities.py b/src/utils/formatting_utilities.py index 
83a005a..7e8e47b 100644 --- a/src/utils/formatting_utilities.py +++ b/src/utils/formatting_utilities.py @@ -19,11 +19,7 @@ def add_sources_to_messages(message: str, sources: list, titles: list, topk: int - topk (int) : number of displayed sources. """ if len(sources) == len(titles): - sources_titles = [ - f"{i+1}. {title} ({source})" - for i, (source, title) in enumerate(zip(sources, titles, strict=False)) - if i < topk - ] + sources_titles = [f"{i+1}. {title} ({source})" for i, (source, title) in enumerate(zip(sources, titles, strict=False)) if i < topk] formatted_sources = f"\n\nSources (Top {topk}):\n" + "\n".join(sources_titles) message += formatted_sources else: