-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathquerying.py
67 lines (53 loc) · 2.44 KB
/
querying.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from haystack import Pipeline
from haystack.components.embedders import OpenAITextEmbedder
from haystack.utils import Secret
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.readers import ExtractiveReader
from haystack.document_stores.in_memory import InMemoryDocumentStore
from dotenv import load_dotenv
import os
import logging
# Root logging is configured at INFO so the pipeline's progress messages are shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load variables from the local .env file, then read the OpenAI key from the
# environment; the result is None when OPENAI_API_KEY is not set.
load_dotenv(dotenv_path=".env")
open_ai_key = os.getenv("OPENAI_API_KEY")
from haystack import Pipeline
from haystack.components.embedders import OpenAITextEmbedder
from haystack.utils import Secret
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
class RetrieveDocuments:
    """Extractive question answering over an embedding-indexed document store.

    Three components are chained into a single Haystack pipeline:

    * ``embedder``  - ``OpenAITextEmbedder`` turns the query text into a vector;
    * ``retriever`` - ``InMemoryEmbeddingRetriever`` looks up documents in
      ``doc_store`` using that vector;
    * ``reader``    - ``ExtractiveReader`` receives the retrieved documents
      together with the original query.
    """

    # File the pipeline diagram is written to on the first call to ``run``.
    _DIAGRAM_PATH = "query_pipeline.png"

    # Class-level default so the flag exists on every instance before ``run``
    # flips it.
    _diagram_attempted = False

    def __init__(self, doc_store, open_ai_key):
        """Build and connect the query pipeline.

        Args:
            doc_store: Document store handed to ``InMemoryEmbeddingRetriever``.
            open_ai_key: OpenAI API key used by the text embedder.

        Raises:
            ValueError: If ``open_ai_key`` is ``None`` or empty.
        """
        # A missing key (e.g. OPENAI_API_KEY not set) should fail right here
        # with an actionable message, before any component is created or the
        # reader is warmed up.
        if not open_ai_key:
            raise ValueError(
                "open_ai_key is missing or empty; set the OPENAI_API_KEY "
                "environment variable or pass the key explicitly."
            )

        # Initialize components
        text_embedder = OpenAITextEmbedder(api_key=Secret.from_token(open_ai_key))
        retriever = InMemoryEmbeddingRetriever(document_store=doc_store)
        reader = ExtractiveReader()
        reader.warm_up()

        # Build the pipeline
        self.query_pipeline = Pipeline()
        self.query_pipeline.add_component("embedder", text_embedder)
        self.query_pipeline.add_component("retriever", retriever)
        self.query_pipeline.add_component("reader", reader)

        # Connect components: query embedding -> retriever -> reader
        self.query_pipeline.connect("embedder.embedding", "retriever.query_embedding")
        self.query_pipeline.connect("retriever.documents", "reader.documents")

    def run(self, query, symbols, retriever_top_k=3, reader_top_k=2):
        """Run ``query`` through the pipeline and return the raw response.

        Args:
            query: Question text; sent to both the embedder and the reader.
            symbols: Currently unused; kept so existing callers keep working.
            retriever_top_k: ``top_k`` passed to the retriever.
            reader_top_k: ``top_k`` passed to the reader.

        Returns:
            Whatever ``Pipeline.run`` returns, unmodified.
            NOTE(review): presumably a dict keyed by component name (e.g.
            ``response["reader"]``) - confirm against the installed Haystack.
        """
        logger.info("Running query pipeline with query: %s", query)
        self._draw_diagram_once()

        # Pass query through the pipeline
        response = self.query_pipeline.run(
            data={
                "embedder": {"text": query},
                "retriever": {"top_k": retriever_top_k},
                "reader": {"query": query, "top_k": reader_top_k},
            }
        )
        logger.info("Response: %s", response)
        return response

    def _draw_diagram_once(self):
        """Write the pipeline diagram at most once per instance, best-effort."""
        if self._diagram_attempted:
            return
        # Flip the flag before trying: the pipeline graph does not change
        # between queries, and a failing renderer should not be retried (and
        # re-logged) on every single query.
        self._diagram_attempted = True
        try:
            self.query_pipeline.draw(self._DIAGRAM_PATH)
        except Exception:
            # The diagram is a debugging aid only; a rendering or file-write
            # failure must never abort the actual query.
            logger.warning(
                "Could not render pipeline diagram to %s",
                self._DIAGRAM_PATH,
                exc_info=True,
            )
# Example usage (commented out):
# load_dotenv(".env")
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# document_store = InMemoryDocumentStore()
# query_pipeline = RetrieveDocuments(doc_store=document_store, open_ai_key=OPENAI_API_KEY)
# Run the pipeline
# question = "Tell me about what you know"
# response = query_pipeline.run(question, "AAPL")