Skip to content

Commit

Permalink
Commit Elastic Vector Store
Browse files Browse the repository at this point in the history
Commit Elastic Vector Store
  • Loading branch information
gopi0518 committed Sep 23, 2024
1 parent 6a64171 commit a0e5220
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 389 deletions.
Binary file modified .DS_Store
Binary file not shown.
Binary file modified img/.DS_Store
Binary file not shown.
Binary file modified img/DataAugmentation.jpeg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified img/DataInference.jpeg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
34 changes: 12 additions & 22 deletions services/asyngenaichat.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import socketio
import pymongo
import confluent_kafka
from langchain_elasticsearch import ElasticsearchStore
from confluent_kafka import DeserializingConsumer
from langchain.chains.conversation.memory import ConversationBufferMemory
from confluent_kafka import SerializingProducer
Expand All @@ -25,10 +26,6 @@
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from linkedin import scrape_linkedin_profile
from linkedin_lookup_agent import lookup as linkedin_lookup_agent
#from tools.linkedin import scrape_linkedin_profile
#from tools.linkedin_lookup_agent import lookup as linkedin_lookup_agent
# General
import json
import os
Expand All @@ -38,15 +35,6 @@
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
OPENAIKEY = os.environ["OPENAI_API_KEY"]
# mongo connection
MONGO_URI = os.environ["MONGO_URI"]
client = pymongo.MongoClient(MONGO_URI)
db = client.genai
docscollection = db.docs_embeddings_v1
chatcollection = db.chatbotreq
DB_NAME = "genai"
COLLECTION_NAME = "doc_embeddings"
ATLAS_VECTOR_SEARCH_INDEX_NAME = "vector_index"
llm = ChatOpenAI(
api_key=os.environ["OPENAI_API_KEY"],
model="gpt-4o",
Expand All @@ -64,7 +52,7 @@
)
# initialize socketio client
# sio = socketio.Client(logger=True, engineio_logger=True)
# sio.connect('http://localhost:5001')
# sio.connect('http://ec2-3-87-26-1.compute-1.amazonaws.com:5001')
args = ccloud_lib.parse_args()
config_file = args.config_file
chatbotreqtopic = args.chatbotreqtopic
Expand All @@ -82,18 +70,20 @@
chatbotresfinal_producer_conf = ccloud_lib.pop_schema_registry_params_from_config(confproducer)
delivered_records = 0
fundnames = ""
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
# Mongo RAG
def create_vector_search():
"""
Creates a MongoDBAtlasVectorSearch object using the connection string, database, and collection names, along with the OpenAI embeddings and index configuration.
"""
vector_search = MongoDBAtlasVectorSearch.from_connection_string(
MONGO_URI,
f"{DB_NAME}.{COLLECTION_NAME}",
OpenAIEmbeddings(),
index_name=ATLAS_VECTOR_SEARCH_INDEX_NAME
elastic_vector_search = ElasticsearchStore(
es_cloud_id="<<ELASTIC_CLOUD>>",
index_name="doc_embeddings_elastic_v2",
embedding=embeddings,
es_user="elastic",
es_password="<<ELASTIC_PASSWORD>>",
)
return vector_search
return elastic_vector_search
message_count = 0
waiting_count = 0
# produce response
Expand Down Expand Up @@ -139,7 +129,7 @@ def acked(err, msg):
else:
fundnames = "None"
print(fundnames)
if not (fundnames=="None") and (len(fundnames.strip())>0):
if (fundnames.find("None")==-1) and (len(fundnames.strip())>0):
print("reached this loop")
print(fundnames)
chatbotres_object.query = query
Expand Down Expand Up @@ -176,7 +166,7 @@ def perform_question_answering(query):
)
qa_retriever = vector_search.as_retriever(
search_type="similarity",
search_kwargs={"k": 100}
search_kwargs={"k": 10}
)
template = """
Use the following context (delimited by <ctx></ctx>) and the chat history (delimited by <hs></hs>) to answer the user question.If you don't know the answer, just say that you don't know, don't try to make up an answer. If the answer is valid, from the answer, extract stock symbols:
Expand Down
189 changes: 1 addition & 188 deletions services/asyngenaichatres.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,47 +25,15 @@
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from linkedin import scrape_linkedin_profile
from linkedin_lookup_agent import lookup as linkedin_lookup_agent
#from tools.linkedin import scrape_linkedin_profile
#from tools.linkedin_lookup_agent import lookup as linkedin_lookup_agent
# General
import json
import os
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.vectorstores import MongoDBAtlasVectorSearch
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
OPENAIKEY = os.environ["OPENAI_API_KEY"]
# mongo connection
MONGO_URI = os.environ["MONGO_URI"]
client = pymongo.MongoClient(MONGO_URI)
db = client.genai
docscollection = db.docs_embeddings_v1
chatcollection = db.chatbotreq
DB_NAME = "genai"
COLLECTION_NAME = "doc_embeddings"
ATLAS_VECTOR_SEARCH_INDEX_NAME = "vector_index"
llm = ChatOpenAI(
api_key=os.environ["OPENAI_API_KEY"],
model="gpt-4o",
temperature=0,
max_tokens=None,
timeout=None,
max_retries=2
)
# memory
memory = ConversationBufferMemory(
memory_key="chat_history",
input_key="question",
output_key='answer',
return_messages=True
)
# initialize socketio client
sio = socketio.Client(logger=True, engineio_logger=True)
sio.sleep(0)
sio.connect('http://localhost:5001')
sio.connect('http://ec2-54-167-196-238.compute-1.amazonaws.com:5001')
args = ccloud_lib.parse_args()
config_file = args.config_file
chatbotreqtopic = args.chatbotreqtopic
Expand All @@ -82,163 +50,8 @@
producer_conf = ""
producer_conf = ccloud_lib.pop_schema_registry_params_from_config(confproducer)
delivered_records = 0
# Mongo RAG
def create_vector_search():
    """Build the MongoDB Atlas vector store used for document retrieval.

    Uses the module-level connection string, database/collection names,
    OpenAI embeddings, and Atlas search index configuration.
    """
    # The helper expects a single "database.collection" namespace string.
    namespace = f"{DB_NAME}.{COLLECTION_NAME}"
    store = MongoDBAtlasVectorSearch.from_connection_string(
        MONGO_URI,
        namespace,
        OpenAIEmbeddings(),
        index_name=ATLAS_VECTOR_SEARCH_INDEX_NAME,
    )
    return store
message_count = 0
waiting_count = 0
# produce response
def publish_chatbotres(query, userid, context, session_id, answer):
    """Serialize a chatbot response as Avro and publish it to the response topic.

    Args:
        query: Original user question.
        userid: Login name of the requesting user.
        context: Retrieval context that accompanied the answer.
        session_id: Conversation/session identifier.
        answer: LLM answer text; the fund list is extracted from its
            "Fund Names and Stock Symbols:" section when present.
    """
    chatbotres_avro_serializer = AvroSerializer(
        schema_registry_client = schema_registry_client,
        schema_str = ccloud_lib.chatbotres_schema,
        to_dict = ccloud_lib.Chatbotres.chatbotres_to_dict)
    try:
        def acked(err, msg):
            """Delivery report handler called on successful or failed delivery of message."""
            global delivered_records
            if err is not None:
                print("Failed to deliver message: {}".format(err))
            else:
                delivered_records += 1
                print("Produced record to topic {} partition [{}] @ offset {}"
                      .format(msg.topic(), msg.partition(), msg.offset()))
        # NOTE(review): this mutates the shared module-level producer_conf dict;
        # harmless while only this serializer is ever installed, but fragile.
        producer_conf["value.serializer"] = chatbotres_avro_serializer
        producer = SerializingProducer(producer_conf)
        chatbotres_object = ccloud_lib.Chatbotres()
        chatbotres_object.query = query
        chatbotres_object.loginname = userid
        chatbotres_object.context = context
        chatbotres_object.session_id = session_id
        chatbotres_object.answer = answer
        # Everything after the marker is the fund list; str.partition yields ""
        # for the tail when the marker is absent, so this is safe either way.
        fundnames = answer.partition("Fund Names and Stock Symbols:")[2]
        print(fundnames)
        chatbotres_object.fundnames = fundnames
        producer.produce(topic=chatbotrestopic, value=chatbotres_object, on_delivery=acked)
        producer.poll(0)
        producer.flush()
    except Exception as e:
        # Bug fix: corrected misspelled "occured" in the error message.
        print("An error occurred:", e)
def process_chatbot_res(msg):
    """Handle one consumed chatbot record: run RAG QA and emit the answer via socket.io.

    Args:
        msg: Deserialized Kafka message whose value exposes a ``.query``
            attribute; records with a None value or None query are skipped.
    """
    # Bug fix: the counter was a local re-initialized to 0 on every call (and
    # incremented twice), so "Total processed rows" always printed 1 even
    # though a module-level message_count exists. Use the global and count
    # each processed question exactly once.
    global message_count
    chatbotreq_object = msg.value()
    if chatbotreq_object is not None:
        question = chatbotreq_object.query
        if question is not None:
            message_count = message_count + 1
            print(
                "Consumed record with value {}, Total processed rows {}".format(
                    question, message_count
                )
            )
            # Here start with genAI
            print("Hello LangChain!")
            try:
                answer, sources = perform_question_answering(question)
                print(f"Question: {question}")
                print("Answer:", answer)
                # Push the answer to connected web clients over socket.io.
                sio.emit("data", answer)
                # print("Source Documents:", sources)
                # produce data back
            except Exception as e:
                # Bug fix: corrected misspelled "occured" in the error message.
                print("An error occurred:", e)
def process_chatbot_req(msg):
    """Handle one consumed chatbot request: run RAG QA and emit the answer via socket.io.

    Args:
        msg: Deserialized Kafka message whose value exposes a ``.query``
            attribute; records with a None value or None query are skipped.

    NOTE(review): this is a byte-for-byte duplicate of process_chatbot_res;
    the two could share one implementation — confirm intent before merging.
    """
    # Bug fix: the counter was a local re-initialized to 0 on every call (and
    # incremented twice), so "Total processed rows" always printed 1 even
    # though a module-level message_count exists. Use the global and count
    # each processed question exactly once.
    global message_count
    chatbotreq_object = msg.value()
    if chatbotreq_object is not None:
        question = chatbotreq_object.query
        if question is not None:
            message_count = message_count + 1
            print(
                "Consumed record with value {}, Total processed rows {}".format(
                    question, message_count
                )
            )
            # Here start with genAI
            print("Hello LangChain!")
            try:
                answer, sources = perform_question_answering(question)
                print(f"Question: {question}")
                print("Answer:", answer)
                # Push the answer to connected web clients over socket.io.
                sio.emit("data", answer)
                # print("Source Documents:", sources)
                # produce data back
            except Exception as e:
                # Bug fix: corrected misspelled "occured" in the error message.
                print("An error occurred:", e)
def perform_question_answering(query):
    """Answer *query* with RetrievalQA over the vector store, using chat memory.

    Args:
        query: The user question to answer.

    Returns:
        Tuple of (answer text, list of source documents used as context).
    """
    vector_search = create_vector_search()

    # Retrieve the 100 most similar documents as context for the LLM.
    # (Removed the unused qa_retriever_old variant with a $limit post-filter.)
    qa_retriever = vector_search.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 100}
    )
    template = """
Use the following context (delimited by <ctx></ctx>) and the chat history (delimited by <hs></hs>) to answer the question and also list the fund names and stock symbols mentioned in the answer delimited by comma(,):
------
<ctx>
{context}
</ctx>
------
<hs>
{history}
</hs>
------
{question}
Answer:
"""
    # Bug fix: input_variables declared "chat_history", but the template uses
    # {history} and the memory below is keyed "history"; the mismatch breaks
    # PromptTemplate validation / variable substitution.
    PROMPT = PromptTemplate(
        template=template, input_variables=["context", "history", "question"]
    )

    qa = RetrievalQA.from_chain_type(
        # llm=OpenAI(max_tokens=100),
        llm=llm,
        chain_type="stuff",
        retriever=qa_retriever,
        return_source_documents=True,
        chain_type_kwargs={
            "verbose": False,
            "prompt": PROMPT,
            # Per-call memory: history does not persist across invocations.
            "memory": ConversationBufferMemory(
                memory_key="history",
                input_key="question"),
        }
    )

    docs = qa({"query": query})

    return docs["result"], docs['source_documents']
if __name__ == "__main__":
# Read arguments and configurations and initialize
args = ccloud_lib.parse_args()
Expand Down
Loading

0 comments on commit a0e5220

Please sign in to comment.