From 8fa9657ba6d427980540bc235cfe4f0f4494205e Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 27 Sep 2023 16:25:57 +0100 Subject: [PATCH] working full --- application/api/answer/routes.py | 226 +++++++++++------------------ application/api/internal/routes.py | 7 +- application/api/user/routes.py | 9 +- application/api/user/tasks.py | 7 + application/app.py | 16 +- application/celery.py | 7 +- application/core/settings.py | 1 + application/vectorstore/base.py | 45 ++++++ application/vectorstore/faiss.py | 15 ++ application/worker.py | 5 + 10 files changed, 178 insertions(+), 160 deletions(-) create mode 100644 application/api/user/tasks.py create mode 100644 application/vectorstore/faiss.py diff --git a/application/api/answer/routes.py b/application/api/answer/routes.py index 3755eb5bc..8f64a8f91 100644 --- a/application/api/answer/routes.py +++ b/application/api/answer/routes.py @@ -1,43 +1,24 @@ import asyncio import os -from flask import Blueprint, request, jsonify, Response -import requests +from flask import Blueprint, request, Response import json import datetime import logging import traceback -from celery.result import AsyncResult from pymongo import MongoClient from bson.objectid import ObjectId from transformers import GPT2TokenizerFast -from langchain import FAISS -from langchain import VectorDBQA, Cohere, OpenAI -from langchain.chains import LLMChain, ConversationalRetrievalChain -from langchain.chains.conversational_retrieval.prompts import CONDENSE_QUESTION_PROMPT -from langchain.chains.question_answering import load_qa_chain -from langchain.chat_models import ChatOpenAI, AzureChatOpenAI -from langchain.embeddings import ( - OpenAIEmbeddings, - HuggingFaceHubEmbeddings, - CohereEmbeddings, - HuggingFaceInstructEmbeddings, -) -from langchain.prompts import PromptTemplate -from langchain.prompts.chat import ( - ChatPromptTemplate, - SystemMessagePromptTemplate, - HumanMessagePromptTemplate, - AIMessagePromptTemplate, -) -from langchain.schema import HumanMessage, AIMessage + from application.core.settings import settings -from application.llm.openai import OpenAILLM -from application.core.settings import settings +from application.llm.openai import OpenAILLM, AzureOpenAILLM +from application.vectorstore.faiss import FaissStore from application.error import bad_request + + logger = logging.getLogger(__name__) mongo = MongoClient(settings.MONGO_URI) @@ -125,21 +106,21 @@ def get_vectorstore(data): return vectorstore -def get_docsearch(vectorstore, embeddings_key): - if settings.EMBEDDINGS_NAME == "openai_text-embedding-ada-002": - if is_azure_configured(): - os.environ["OPENAI_API_TYPE"] = "azure" - openai_embeddings = OpenAIEmbeddings(model=settings.AZURE_EMBEDDINGS_DEPLOYMENT_NAME) - else: - openai_embeddings = OpenAIEmbeddings(openai_api_key=embeddings_key) - docsearch = FAISS.load_local(vectorstore, openai_embeddings) - elif settings.EMBEDDINGS_NAME == "huggingface_sentence-transformers/all-mpnet-base-v2": - docsearch = FAISS.load_local(vectorstore, HuggingFaceHubEmbeddings()) - elif settings.EMBEDDINGS_NAME == "huggingface_hkunlp/instructor-large": - docsearch = FAISS.load_local(vectorstore, HuggingFaceInstructEmbeddings()) - elif settings.EMBEDDINGS_NAME == "cohere_medium": - docsearch = FAISS.load_local(vectorstore, CohereEmbeddings(cohere_api_key=embeddings_key)) - return docsearch +# def get_docsearch(vectorstore, embeddings_key): +# if settings.EMBEDDINGS_NAME == "openai_text-embedding-ada-002": +# if is_azure_configured(): +# os.environ["OPENAI_API_TYPE"] = "azure" +# openai_embeddings = OpenAIEmbeddings(model=settings.AZURE_EMBEDDINGS_DEPLOYMENT_NAME) +# else: +# openai_embeddings = OpenAIEmbeddings(openai_api_key=embeddings_key) +# docsearch = FAISS.load_local(vectorstore, openai_embeddings) +# elif settings.EMBEDDINGS_NAME == "huggingface_sentence-transformers/all-mpnet-base-v2": +# docsearch = FAISS.load_local(vectorstore, HuggingFaceHubEmbeddings()) +# elif settings.EMBEDDINGS_NAME == "huggingface_hkunlp/instructor-large": +# docsearch = FAISS.load_local(vectorstore, HuggingFaceInstructEmbeddings()) +# elif settings.EMBEDDINGS_NAME == "cohere_medium": +# docsearch = FAISS.load_local(vectorstore, CohereEmbeddings(cohere_api_key=embeddings_key)) +# return docsearch def is_azure_configured(): @@ -148,7 +129,7 @@ def is_azure_configured(): def complete_stream(question, docsearch, chat_history, api_key, conversation_id): if is_azure_configured(): - llm = AzureChatOpenAI( + llm = AzureOpenAILLM( openai_api_key=api_key, openai_api_base=settings.OPENAI_API_BASE, openai_api_version=settings.OPENAI_API_VERSION, @@ -158,7 +139,7 @@ def complete_stream(question, docsearch, chat_history, api_key, conversation_id) logger.debug("plain OpenAI") llm = OpenAILLM(api_key=api_key) - docs = docsearch.similarity_search(question, k=2) + docs = docsearch.search(question, k=2) # join all page_content together with a newline docs_together = "\n".join([doc.page_content for doc in docs]) p_chat_combine = chat_combine_template.replace("{summaries}", docs_together) @@ -253,9 +234,8 @@ def stream(): vectorstore = get_vectorstore({"active_docs": data["active_docs"]}) else: vectorstore = "" - docsearch = get_docsearch(vectorstore, embeddings_key) + docsearch = FaissStore(vectorstore, embeddings_key) - # question = "Hi" return Response( complete_stream(question, docsearch, chat_history=history, api_key=api_key, @@ -288,97 +268,55 @@ def api_answer(): vectorstore = get_vectorstore(data) # loading the index and the store and the prompt template # Note if you have used other embeddings than OpenAI, you need to change the embeddings - docsearch = get_docsearch(vectorstore, embeddings_key) + docsearch = FaissStore(vectorstore, embeddings_key) - q_prompt = PromptTemplate( - input_variables=["context", "question"], template=template_quest, template_format="jinja2" - ) - if settings.LLM_NAME == "openai_chat": - if is_azure_configured(): - logger.debug("in Azure") - llm = AzureChatOpenAI( - openai_api_key=api_key, - openai_api_base=settings.OPENAI_API_BASE, - openai_api_version=settings.OPENAI_API_VERSION, - deployment_name=settings.AZURE_DEPLOYMENT_NAME, - ) - else: - logger.debug("plain OpenAI") - llm = ChatOpenAI(openai_api_key=api_key, model_name=gpt_model) # optional parameter: model_name="gpt-4" - messages_combine = [SystemMessagePromptTemplate.from_template(chat_combine_template)] - if history: - tokens_current_history = 0 - # count tokens in history - history.reverse() - for i in history: - if "prompt" in i and "response" in i: - tokens_batch = count_tokens(i["prompt"]) + count_tokens(i["response"]) - if tokens_current_history + tokens_batch < settings.TOKENS_MAX_HISTORY: - tokens_current_history += tokens_batch - messages_combine.append(HumanMessagePromptTemplate.from_template(i["prompt"])) - messages_combine.append(AIMessagePromptTemplate.from_template(i["response"])) - messages_combine.append(HumanMessagePromptTemplate.from_template("{question}")) - p_chat_combine = ChatPromptTemplate.from_messages(messages_combine) - elif settings.LLM_NAME == "openai": - llm = OpenAI(openai_api_key=api_key, temperature=0) - elif settings.SELF_HOSTED_MODEL: - llm = hf - elif settings.LLM_NAME == "cohere": - llm = Cohere(model="command-xlarge-nightly", cohere_api_key=api_key) - else: - raise ValueError("unknown LLM model") - - if settings.LLM_NAME == "openai_chat": - question_generator = LLMChain(llm=llm, prompt=CONDENSE_QUESTION_PROMPT) - doc_chain = load_qa_chain(llm, chain_type="map_reduce", combine_prompt=p_chat_combine) - chain = ConversationalRetrievalChain( - retriever=docsearch.as_retriever(k=2), - question_generator=question_generator, - combine_docs_chain=doc_chain, - ) - chat_history = [] - # result = chain({"question": question, "chat_history": chat_history}) - # generate async with async generate method - result = run_async_chain(chain, question, chat_history) - elif settings.SELF_HOSTED_MODEL: - question_generator = LLMChain(llm=llm, prompt=CONDENSE_QUESTION_PROMPT) - doc_chain = load_qa_chain(llm, chain_type="map_reduce", combine_prompt=p_chat_combine) - chain = ConversationalRetrievalChain( - retriever=docsearch.as_retriever(k=2), - question_generator=question_generator, - combine_docs_chain=doc_chain, + if is_azure_configured(): + llm = AzureOpenAILLM( + openai_api_key=api_key, + openai_api_base=settings.OPENAI_API_BASE, + openai_api_version=settings.OPENAI_API_VERSION, + deployment_name=settings.AZURE_DEPLOYMENT_NAME, ) - chat_history = [] - # result = chain({"question": question, "chat_history": chat_history}) - # generate async with async generate method - result = run_async_chain(chain, question, chat_history) - else: - qa_chain = load_qa_chain( - llm=llm, chain_type="map_reduce", combine_prompt=chat_combine_template, question_prompt=q_prompt - ) - chain = VectorDBQA(combine_documents_chain=qa_chain, vectorstore=docsearch, k=3) - result = chain({"query": question}) - - print(result) - - # some formatting for the frontend - if "result" in result: - result["answer"] = result["result"] - result["answer"] = result["answer"].replace("\\n", "\n") - try: - result["answer"] = result["answer"].split("SOURCES:")[0] - except Exception: - pass - - sources = docsearch.similarity_search(question, k=2) - sources_doc = [] - for doc in sources: + logger.debug("plain OpenAI") + llm = OpenAILLM(api_key=api_key) + + + + docs = docsearch.search(question, k=2) + # join all page_content together with a newline + docs_together = "\n".join([doc.page_content for doc in docs]) + p_chat_combine = chat_combine_template.replace("{summaries}", docs_together) + messages_combine = [{"role": "system", "content": p_chat_combine}] + source_log_docs = [] + for doc in docs: if doc.metadata: - sources_doc.append({'title': doc.metadata['title'], 'text': doc.page_content}) + source_log_docs.append({"title": doc.metadata['title'].split('/')[-1], "text": doc.page_content}) else: - sources_doc.append({'title': doc.page_content, 'text': doc.page_content}) - result['sources'] = sources_doc + source_log_docs.append({"title": doc.page_content, "text": doc.page_content}) + # join all page_content together with a newline + + + if len(history) > 1: + tokens_current_history = 0 + # count tokens in history + history.reverse() + for i in history: + if "prompt" in i and "response" in i: + tokens_batch = count_tokens(i["prompt"]) + count_tokens(i["response"]) + if tokens_current_history + tokens_batch < settings.TOKENS_MAX_HISTORY: + tokens_current_history += tokens_batch + messages_combine.append({"role": "user", "content": i["prompt"]}) + messages_combine.append({"role": "system", "content": i["response"]}) + messages_combine.append({"role": "user", "content": question}) + + + completion = llm.gen(model=gpt_model, engine=settings.AZURE_DEPLOYMENT_NAME, + messages=messages_combine) + + + result = {"answer": completion, "sources": source_log_docs} + logger.debug(result) # generate conversationId if conversation_id is not None: @@ -391,22 +329,22 @@ def api_answer(): else: # create new conversation # generate summary - messages_summary = [AIMessage(content="Summarise following conversation in no more than 3 " + - "words, respond ONLY with the summary, use the same " + - "language as the system \n\nUser: " + question + "\n\nAI: " + - result["answer"]), - HumanMessage(content="Summarise following conversation in no more than 3 words, " + - "respond ONLY with the summary, use the same language as the " + - "system")] - - # completion = openai.ChatCompletion.create(model='gpt-3.5-turbo', engine=settings.AZURE_DEPLOYMENT_NAME, - # messages=messages_summary, max_tokens=30, temperature=0) - completion = llm.predict_messages(messages_summary) + messages_summary = [{"role": "assistant", "content": "Summarise following conversation in no more than 3 " + "words, respond ONLY with the summary, use the same " + "language as the system \n\nUser: " + question + "\n\n" + + "AI: " + + result["answer"]}, + {"role": "user", "content": "Summarise following conversation in no more than 3 words, " + "respond ONLY with the summary, use the same language as the " + "system"}] + + completion = llm.gen(model=gpt_model, engine=settings.AZURE_DEPLOYMENT_NAME, + messages=messages_summary, max_tokens=30) conversation_id = conversations_collection.insert_one( {"user": "local", - "date": datetime.datetime.utcnow(), - "name": completion.content, - "queries": [{"prompt": question, "response": result["answer"], "sources": result['sources']}]} + "date": datetime.datetime.utcnow(), + "name": completion, + "queries": [{"prompt": question, "response": result["answer"], "sources": source_log_docs}]} ).inserted_id result["conversation_id"] = str(conversation_id) diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py index 5092f9bef..13c44724a 100644 --- a/application/api/internal/routes.py +++ b/application/api/internal/routes.py @@ -12,13 +12,16 @@ conversations_collection = db["conversations"] vectors_collection = db["vectors"] +current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + internal = Blueprint('internal', __name__) @internal.route("/api/download", methods=["get"]) def download_file(): user = secure_filename(request.args.get("user")) job_name = secure_filename(request.args.get("name")) filename = secure_filename(request.args.get("file")) - save_dir = os.path.join(app.config["UPLOAD_FOLDER"], user, job_name) + save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name) return send_from_directory(save_dir, filename, as_attachment=True) @@ -46,7 +49,7 @@ def upload_index_files(): return {"status": "no file name"} # saves index files - save_dir = os.path.join("indexes", user, job_name) + save_dir = os.path.join(current_dir, "indexes", user, job_name) if not os.path.exists(save_dir): os.makedirs(save_dir) file_faiss.save(os.path.join(save_dir, "index.faiss")) diff --git a/application/api/user/routes.py b/application/api/user/routes.py index d661af415..f04631d23 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -6,6 +6,9 @@ from bson.objectid import ObjectId from werkzeug.utils import secure_filename import http.client +from celery.result import AsyncResult + +from application.api.user.tasks import ingest from application.core.settings import settings mongo = MongoClient(settings.MONGO_URI) @@ -14,6 +17,8 @@ vectors_collection = db["vectors"] user = Blueprint('user', __name__) +current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + @user.route("/api/delete_conversation", methods=["POST"]) def delete_conversation(): # deletes a conversation from the database @@ -111,13 +116,13 @@ def upload_file(): if file: filename = secure_filename(file.filename) # save dir - save_dir = os.path.join(app.config["UPLOAD_FOLDER"], user, job_name) + save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name) # create dir if not exists if not os.path.exists(save_dir): os.makedirs(save_dir) file.save(os.path.join(save_dir, filename)) - task = ingest.delay("temp", [".rst", ".md", ".pdf", ".txt"], job_name, filename, user) + task = ingest.delay(settings.UPLOAD_FOLDER, [".rst", ".md", ".pdf", ".txt"], job_name, filename, user) # task id task_id = task.id return {"status": "ok", "task_id": task_id} diff --git a/application/api/user/tasks.py b/application/api/user/tasks.py new file mode 100644 index 000000000..a3474939a --- /dev/null +++ b/application/api/user/tasks.py @@ -0,0 +1,7 @@ +from application.worker import ingest_worker +from application.celery import celery + +@celery.task(bind=True) +def ingest(self, directory, formats, name_job, filename, user): + resp = ingest_worker(self, directory, formats, name_job, filename, user) + return resp diff --git a/application/app.py b/application/app.py index 55c8e5c3b..5941ed6f8 100644 --- a/application/app.py +++ b/application/app.py @@ -2,15 +2,14 @@ import dotenv -from celery import Celery +from application.celery import celery from flask import Flask, request, redirect -from pymongo import MongoClient from application.core.settings import settings -from application.worker import ingest_worker from application.api.user.routes import user from application.api.answer.routes import answer +from application.api.internal.routes import internal @@ -30,21 +29,22 @@ app = Flask(__name__) app.register_blueprint(user) app.register_blueprint(answer) +app.register_blueprint(internal) app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER = "inputs" app.config["CELERY_BROKER_URL"] = settings.CELERY_BROKER_URL app.config["CELERY_RESULT_BACKEND"] = settings.CELERY_RESULT_BACKEND app.config["MONGO_URI"] = settings.MONGO_URI -celery = Celery() +#celery = Celery() celery.config_from_object("application.celeryconfig") -@celery.task(bind=True) -def ingest(self, directory, formats, name_job, filename, user): - resp = ingest_worker(self, directory, formats, name_job, filename, user) - return resp +# @celery.task(bind=True) +# def ingest(self, directory, formats, name_job, filename, user): +# resp = ingest_worker(self, directory, formats, name_job, filename, user) +# return resp @app.route("/") diff --git a/application/celery.py b/application/celery.py index 461dc53e9..3b4819e73 100644 --- a/application/celery.py +++ b/application/celery.py @@ -1,10 +1,9 @@ from celery import Celery -from app import create_app +from application.core.settings import settings def make_celery(app_name=__name__): - app = create_app() - celery = Celery(app_name, broker=app.config['CELERY_BROKER_URL']) - celery.conf.update(app.config) + celery = Celery(app_name, broker=settings.CELERY_BROKER_URL) + celery.conf.update(settings) return celery celery = make_celery() diff --git a/application/core/settings.py b/application/core/settings.py index 08673475e..d127c293b 100644 --- a/application/core/settings.py +++ b/application/core/settings.py @@ -12,6 +12,7 @@ class Settings(BaseSettings): MODEL_PATH: str = "./models/gpt4all-model.bin" TOKENS_MAX_HISTORY: int = 150 SELF_HOSTED_MODEL: bool = False + UPLOAD_FOLDER: str = "inputs" API_URL: str = "http://localhost:7091" # backend url for celery worker diff --git a/application/vectorstore/base.py b/application/vectorstore/base.py index e69de29bb..cf3623e4c 100644 --- a/application/vectorstore/base.py +++ b/application/vectorstore/base.py @@ -0,0 +1,45 @@ +from abc import ABC, abstractmethod +import os +from langchain.embeddings import ( + OpenAIEmbeddings, + HuggingFaceHubEmbeddings, + CohereEmbeddings, + HuggingFaceInstructEmbeddings, +) +from application.core.settings import settings + +class BaseVectorStore(ABC): + def __init__(self): + pass + + @abstractmethod + def search(self, *args, **kwargs): + pass + + def is_azure_configured(self): + return settings.OPENAI_API_BASE and settings.OPENAI_API_VERSION and settings.AZURE_DEPLOYMENT_NAME + + def _get_docsearch(self, embeddings_name, embeddings_key=None): + embeddings_factory = { + "openai_text-embedding-ada-002": OpenAIEmbeddings, + "huggingface_sentence-transformers/all-mpnet-base-v2": HuggingFaceHubEmbeddings, + "huggingface_hkunlp/instructor-large": HuggingFaceInstructEmbeddings, + "cohere_medium": CohereEmbeddings + } + + if embeddings_name not in embeddings_factory: + raise ValueError(f"Invalid embeddings_name: {embeddings_name}") + + if embeddings_name == "openai_text-embedding-ada-002": + if self.is_azure_configured(): + os.environ["OPENAI_API_TYPE"] = "azure" + embedding_instance = embeddings_factory[embeddings_name](model=settings.AZURE_EMBEDDINGS_DEPLOYMENT_NAME) + else: + embedding_instance = embeddings_factory[embeddings_name](openai_api_key=embeddings_key) + elif embeddings_name == "cohere_medium": + embedding_instance = embeddings_factory[embeddings_name](cohere_api_key=embeddings_key) + else: + embedding_instance = embeddings_factory[embeddings_name]() + + return embedding_instance + diff --git a/application/vectorstore/faiss.py b/application/vectorstore/faiss.py new file mode 100644 index 000000000..9a562dce6 --- /dev/null +++ b/application/vectorstore/faiss.py @@ -0,0 +1,15 @@ +from application.vectorstore.base import BaseVectorStore +from langchain import FAISS +from application.core.settings import settings + +class FaissStore(BaseVectorStore): + + def __init__(self, path, embeddings_key): + super().__init__() + self.path = path + self.docsearch = FAISS.load_local( + self.path, self._get_docsearch(settings.EMBEDDINGS_NAME, settings.EMBEDDINGS_KEY) + ) + + def search(self, *args, **kwargs): + return self.docsearch.similarity_search(*args, **kwargs) diff --git a/application/worker.py b/application/worker.py index da955a7ec..91c19c309 100644 --- a/application/worker.py +++ b/application/worker.py @@ -27,6 +27,7 @@ def metadata_from_filename(title): def generate_random_string(length): return ''.join([string.ascii_letters[i % 52] for i in range(length)]) +current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) def ingest_worker(self, directory, formats, name_job, filename, user): # directory = 'inputs' or 'temp' @@ -43,9 +44,13 @@ def ingest_worker(self, directory, formats, name_job, filename, user): min_tokens = 150 max_tokens = 1250 full_path = directory + '/' + user + '/' + name_job + import sys + print(full_path, file=sys.stderr) # check if API_URL env variable is set file_data = {'name': name_job, 'file': filename, 'user': user} response = requests.get(urljoin(settings.API_URL, "/api/download"), params=file_data) + # check if file is in the response + print(response, file=sys.stderr) file = response.content if not os.path.exists(full_path):