diff --git a/.gitignore b/.gitignore
index 9116245a..29753e14 100644
--- a/.gitignore
+++ b/.gitignore
@@ -162,4 +162,5 @@ cython_debug/
# Mac OS
**/.DS_Store
-datafiles/*
\ No newline at end of file
+datafiles/*
+canopy-api-docs.html
\ No newline at end of file
diff --git a/src/canopy/models/api_models.py b/src/canopy/models/api_models.py
index 964aa48d..53a93585 100644
--- a/src/canopy/models/api_models.py
+++ b/src/canopy/models/api_models.py
@@ -28,7 +28,7 @@ def calc_total_tokens(cls, v, values, **kwargs):
class ChatResponse(BaseModel):
- id: str
+ id: str = Field(description="Canopy session Id.")
object: str
created: int
model: str
diff --git a/src/canopy/models/data_models.py b/src/canopy/models/data_models.py
index 7d0281a2..05a941e5 100644
--- a/src/canopy/models/data_models.py
+++ b/src/canopy/models/data_models.py
@@ -11,35 +11,52 @@
class Query(BaseModel):
- text: str
- namespace: str = ""
- metadata_filter: Optional[dict] = None
- top_k: Optional[int] = None
- query_params: dict = Field(default_factory=dict)
+ text: str = Field(description="The query text.")
+ namespace: str = Field(
+ default="",
+ description="The namespace of the query, to learn more about namespaces, see https://docs.pinecone.io/docs/namespaces", # noqa: E501
+ )
+ metadata_filter: Optional[dict] = Field(
+ default=None,
+ description="A pinecone metadata filter, to learn more about metadata filters, see https://docs.pinecone.io/docs/metadata-filtering", # noqa: E501
+ )
+ top_k: Optional[int] = Field(
+ default=None,
+ description="[soon deprecated] The number of results to return."
+ )
+ query_params: dict = Field(
+ default_factory=dict,
+ description="Pinecone Client additional query parameters."
+ )
class Document(BaseModel):
- id: str
- text: str
- source: str = ""
- metadata: Metadata = Field(default_factory=dict)
+ id: str = Field(description="The document id.")
+ text: str = Field(description="The document text.")
+ source: str = Field(
+ default="",
+ description="The source of the document: a URL, a file path, etc."
+ )
+ metadata: Metadata = Field(
+ default_factory=dict,
+ description="The document metadata, to learn more about metadata, see https://docs.pinecone.io/docs/manage-data", # noqa: E501
+ )
class Config:
extra = Extra.forbid
- @validator('metadata')
+ @validator("metadata")
def metadata_reseved_fields(cls, v):
- if 'text' in v:
+ if "text" in v:
raise ValueError('Metadata cannot contain reserved field "text"')
- if 'document_id' in v:
+ if "document_id" in v:
raise ValueError('Metadata cannot contain reserved field "document_id"')
- if 'source' in v:
+ if "source" in v:
raise ValueError('Metadata cannot contain reserved field "source"')
return v
class ContextContent(BaseModel, ABC):
-
# Any context should be able to be represented as well formatted text.
# In the most minimal case, that could simply be a call to `.json()`.
@abstractmethod
@@ -59,6 +76,8 @@ def to_text(self, **kwargs) -> str:
return "\n".join([c.to_text(**kwargs) for c in self.content])
+ContextContentResponse = Union[ContextContent, Sequence[ContextContent]]
+
# --------------------- LLM models ------------------------
@@ -69,12 +88,12 @@ class Role(Enum):
class MessageBase(BaseModel):
- role: Role
- content: str
+ role: Role = Field(description="The role of the messages author.")
+ content: str = Field(description="The contents of the message.")
def dict(self, *args, **kwargs):
d = super().dict(*args, **kwargs)
- d['role'] = d['role'].value
+ d["role"] = d["role"].value
return d
diff --git a/src/canopy_cli/__init__.py b/src/canopy_cli/__init__.py
index e69de29b..a4791643 100644
--- a/src/canopy_cli/__init__.py
+++ b/src/canopy_cli/__init__.py
@@ -0,0 +1,27 @@
+HTML_TEMPLATE = """
+
+
+
+ Canopy API Spec
+
+
+
+
+
+
+
+
+ Redoc
+
+
+
+
+""" # noqa: E501
diff --git a/src/canopy_cli/cli.py b/src/canopy_cli/cli.py
index d0e9000d..d0b5c827 100644
--- a/src/canopy_cli/cli.py
+++ b/src/canopy_cli/cli.py
@@ -551,5 +551,29 @@ def stop(url):
raise CLIError(msg)
+@cli.command(
+ help=(
+ """
+ \b
+ Open the Canopy Server docs
+ """
+ )
+)
+def docs():
+ import json
+ from canopy_cli import HTML_TEMPLATE
+ from canopy_server.app import app
+ # generate docs
+
+ filename = "canopy-api-docs.html"
+
+ with open(filename, "w") as fd:
+ print(HTML_TEMPLATE % json.dumps(app.openapi()), file=fd)
+
+ import webbrowser
+
+ webbrowser.open('file://' + os.path.realpath(filename))
+
+
if __name__ == "__main__":
cli()
diff --git a/src/canopy_server/__init__.py b/src/canopy_server/__init__.py
index e69de29b..85a89cf2 100644
--- a/src/canopy_server/__init__.py
+++ b/src/canopy_server/__init__.py
@@ -0,0 +1,13 @@
+description = """
+Canopy is an open-source Retrieval Augmented Generation (RAG) framework and context engine built on top of the Pinecone vector database. Canopy enables you to quickly and easily experiment with and build applications using RAG. Start chatting with your documents or text data with a few simple commands.
+
+Canopy provides a configurable built-in server so you can effortlessly deploy a RAG-powered chat application to your existing chat UI or interface. Or you can build your own, custom RAG application using the Canopy lirbary.
+
+## Prerequisites
+
+### Pinecone API key
+To get Pinecone free trial API key and environment register or log into your Pinecone account in the console (https://app.pinecone.io/). You can access your API key from the "API Keys" section in the sidebar of your dashboard, and find the environment name next to it.
+
+### OpenAI API key
+You can find your free trial OpenAI API key https://platform.openai.com/account/api-keys. You might need to login or register to OpenAI services.
+""" # noqa: E501
diff --git a/src/canopy_server/api_models.py b/src/canopy_server/api_models.py
index dae422f5..c80d542f 100644
--- a/src/canopy_server/api_models.py
+++ b/src/canopy_server/api_models.py
@@ -1,15 +1,26 @@
from typing import Optional, List
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
from canopy.models.data_models import Messages, Query, Document
class ChatRequest(BaseModel):
- model: str = ""
- messages: Messages
- stream: bool = False
- user: Optional[str] = None
+ model: str = Field(
+ default="",
+ description="ID of the model to use. If empty, the default model will be used.", # noqa: E501
+ )
+ messages: Messages = Field(
+ description="A list of messages comprising the conversation so far."
+ )
+ stream: bool = Field(
+ default=False,
+ description="""If set, partial message deltas will be sent, like in ChatGPT. Tokens will be sent as data-only server-sent events as they become available, with the stream terminated by a data: [DONE] message.""", # noqa: E501
+ )
+ user: Optional[str] = Field(
+ default=None,
+ description="A unique identifier representing your end-user, which can help OpenAI to monitor and detect abuse.", # noqa: E501
+ )
class ContextQueryRequest(BaseModel):
@@ -19,11 +30,13 @@ class ContextQueryRequest(BaseModel):
class ContextUpsertRequest(BaseModel):
documents: List[Document]
- batch_size: int = 200
+ batch_size: int = Field(
+ default=200, description="Batch size for upserting documents to Pinecone."
+ )
class ContextDeleteRequest(BaseModel):
- document_ids: List[str]
+ document_ids: List[str] = Field(description="List of document ids to delete.")
class HealthStatus(BaseModel):
@@ -38,5 +51,28 @@ class ChatDebugInfo(BaseModel):
prompt_tokens: Optional[int] = None
generated_tokens: Optional[int] = None
- def to_text(self,):
+ def to_text(
+ self,
+ ):
return self.json()
+
+
+class ShutdownResponse(BaseModel):
+ message: str = Field(
+ default="Shutting down",
+ description="Message indicating the server is shutting down.",
+ )
+
+
+class SuccessUpsertResponse(BaseModel):
+ message: str = Field(
+ default="Success",
+ description="Message indicating the upsert was successful.",
+ )
+
+
+class SuccessDeleteResponse(BaseModel):
+ message: str = Field(
+ default="Success",
+ description="Message indicating the delete was successful.",
+ )
diff --git a/src/canopy_server/app.py b/src/canopy_server/app.py
index bf34cb83..97b70881 100644
--- a/src/canopy_server/app.py
+++ b/src/canopy_server/app.py
@@ -22,19 +22,39 @@
import uvicorn
from typing import cast
-from canopy.models.api_models import StreamingChatResponse, ChatResponse
-from canopy.models.data_models import Context, UserMessage
-from .api_models import \
- ChatRequest, ContextQueryRequest, \
- ContextUpsertRequest, HealthStatus, ContextDeleteRequest
+from canopy.models.api_models import (
+ StreamingChatResponse,
+ ChatResponse,
+)
+from canopy.models.data_models import Context, UserMessage, ContextContentResponse
+from .api_models import (
+ ChatRequest,
+ ContextQueryRequest,
+ ContextUpsertRequest,
+ HealthStatus,
+ ContextDeleteRequest,
+ ShutdownResponse,
+ SuccessUpsertResponse,
+ SuccessDeleteResponse,
+)
from canopy.llm.openai import OpenAILLM
from canopy_cli.errors import ConfigError
+from canopy_server import description
+from canopy import __version__
load_dotenv() # load env vars before import of openai
openai.api_key = os.getenv("OPENAI_API_KEY")
-app = FastAPI()
+app = FastAPI(
+ title="Canopy API",
+ description=description,
+ version=__version__,
+ license_info={
+ "name": "Apache 2.0",
+ "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
+ },
+)
context_engine: ContextEngine
chat_engine: ChatEngine
@@ -45,19 +65,29 @@
@app.post(
"/context/chat/completions",
+ response_model=ChatResponse,
+ responses={500: {"description": "Failed to chat with Canopy"}}, # noqa: E501
)
async def chat(
request: ChatRequest = Body(...),
-):
+) -> ChatResponse:
+ """
+ Chat with Canopy, using the LLM and context engine, and return a response.
+
+ The request schema is following OpenAI's chat completion API schema, but removes the need to configure
+ anything, other than the messages field: for more imformation see: https://platform.openai.com/docs/api-reference/chat/create
+
+ """ # noqa: E501
try:
session_id = request.user or "None" # noqa: F841
question_id = str(uuid.uuid4())
logger.debug(f"Received chat request: {request.messages[-1].content}")
- answer = await run_in_threadpool(chat_engine.chat,
- messages=request.messages,
- stream=request.stream)
+ answer = await run_in_threadpool(
+ chat_engine.chat, messages=request.messages, stream=request.stream
+ )
if request.stream:
+
def stringify_content(response: StreamingChatResponse):
for chunk in response.chunks:
chunk.id = question_id
@@ -65,7 +95,7 @@ def stringify_content(response: StreamingChatResponse):
yield data
content_stream = stringify_content(cast(StreamingChatResponse, answer))
- return EventSourceResponse(content_stream, media_type='text/event-stream')
+ return EventSourceResponse(content_stream, media_type="text/event-stream")
else:
chat_response = cast(ChatResponse, answer)
@@ -74,105 +104,134 @@ def stringify_content(response: StreamingChatResponse):
except Exception as e:
logger.exception(f"Chat with question_id {question_id} failed")
- raise HTTPException(
- status_code=500, detail=f"Internal Service Error: {str(e)}")
+ raise HTTPException(status_code=500, detail=f"Internal Service Error: {str(e)}")
@app.post(
"/context/query",
+ response_model=ContextContentResponse,
+ responses={
+ 500: {"description": "Failed to query the knowledgebase or Build the context"}
+ },
)
async def query(
request: ContextQueryRequest = Body(...),
-):
+) -> ContextContentResponse:
+ """
+ Query the knowledgebase and return a context. Context is a collections of text snippets, each with a source.
+ Query enables tuning the context length (in tokens) such that you can cap the cost of the generation.
+ This method can be used with or without a LLM.
+ """ # noqa: E501
try:
context: Context = await run_in_threadpool(
context_engine.query,
queries=request.queries,
- max_context_tokens=request.max_tokens)
+ max_context_tokens=request.max_tokens,
+ )
return context.content
except Exception as e:
logger.exception(e)
- raise HTTPException(
- status_code=500, detail=f"Internal Service Error: {str(e)}")
+ raise HTTPException(status_code=500, detail=f"Internal Service Error: {str(e)}")
@app.post(
"/context/upsert",
+ response_model=SuccessUpsertResponse,
+ responses={500: {"description": "Failed to upsert documents"}},
)
async def upsert(
request: ContextUpsertRequest = Body(...),
-):
+) -> SuccessUpsertResponse:
+ """
+ Upsert documents into the knowledgebase. Upserting is a way to add new documents or update existing ones.
+ Each document has a unique ID. If a document with the same ID already exists, it will be updated.
+
+ This method will run the processing, chunking and endocing of the data in parallel, and then send the
+ encoded data to the Pinecone Index in batches.
+ """ # noqa: E501
try:
logger.info(f"Upserting {len(request.documents)} documents")
- upsert_results = await run_in_threadpool(
- kb.upsert,
- documents=request.documents,
- batch_size=request.batch_size)
+ await run_in_threadpool(
+ kb.upsert, documents=request.documents, batch_size=request.batch_size
+ )
- return upsert_results
+ return SuccessUpsertResponse()
except Exception as e:
logger.exception(e)
- raise HTTPException(
- status_code=500, detail=f"Internal Service Error: {str(e)}")
+ raise HTTPException(status_code=500, detail=f"Internal Service Error: {str(e)}")
@app.post(
"/context/delete",
+ response_model=SuccessDeleteResponse,
+ responses={500: {"description": "Failed to delete documents"}},
)
async def delete(
request: ContextDeleteRequest = Body(...),
-):
+) -> SuccessDeleteResponse:
+ """
+ Delete documents from the knowledgebase. Deleting documents is done by their unique ID.
+ """ # noqa: E501
try:
logger.info(f"Delete {len(request.document_ids)} documents")
- await run_in_threadpool(
- kb.delete,
- document_ids=request.document_ids)
- return {"message": "success"}
+ await run_in_threadpool(kb.delete, document_ids=request.document_ids)
+ return SuccessDeleteResponse()
except Exception as e:
logger.exception(e)
- raise HTTPException(
- status_code=500, detail=f"Internal Service Error: {str(e)}")
+ raise HTTPException(status_code=500, detail=f"Internal Service Error: {str(e)}")
@app.get(
"/health",
+ response_model=HealthStatus,
+ responses={500: {"description": "Failed to connect to Pinecone or LLM"}},
)
-async def health_check():
+@app.exception_handler(Exception)
+async def health_check() -> HealthStatus:
+ """
+ Health check for the Canopy server. This endpoint checks the connection to Pinecone and the LLM.
+ """ # noqa: E501
try:
await run_in_threadpool(kb.verify_index_connection)
except Exception as e:
err_msg = f"Failed connecting to Pinecone Index {kb._index_name}"
logger.exception(err_msg)
raise HTTPException(
- status_code=500, detail=f"{err_msg}. Error: {str(e)}") from e
+ status_code=500, detail=f"{err_msg}. Error: {str(e)}"
+ ) from e
try:
msg = UserMessage(content="This is a health check. Are you alive? Be concise")
- await run_in_threadpool(llm.chat_completion,
- messages=[msg],
- max_tokens=50)
+ await run_in_threadpool(llm.chat_completion, messages=[msg], max_tokens=50)
except Exception as e:
err_msg = f"Failed to communicate with {llm.__class__.__name__}"
logger.exception(err_msg)
raise HTTPException(
- status_code=500, detail=f"{err_msg}. Error: {str(e)}") from e
+ status_code=500, detail=f"{err_msg}. Error: {str(e)}"
+ ) from e
return HealthStatus(pinecone_status="OK", llm_status="OK")
-@app.get(
- "/shutdown"
-)
-async def shutdown():
+@app.get("/shutdown")
+async def shutdown() -> ShutdownResponse:
+ """
+ __WARNING__: Experimental method.
+
+
+ This method will shutdown the server. It is used for testing purposes, and not recommended to be used
+ in production.
+ This method will locate the parent process and send a SIGINT signal to it.
+ """ # noqa: E501
logger.info("Shutting down")
proc = current_process()
pid = proc._parent_pid if "SpawnProcess" in proc.name else proc.pid
os.kill(pid, signal.SIGINT)
- return {"message": "Shutting down"}
+ return ShutdownResponse()
@app.on_event("startup")
@@ -190,11 +249,11 @@ def _init_logging():
stdout_handler = logging.StreamHandler(stream=sys.stdout)
handlers = [file_handler, stdout_handler]
logging.basicConfig(
- format='%(asctime)s - %(processName)s - %(name)-10s [%(levelname)-8s]: '
- '%(message)s',
+ format="%(asctime)s - %(processName)s - %(name)-10s [%(levelname)-8s]: "
+ "%(message)s",
level=os.getenv("CE_LOG_LEVEL", "INFO").upper(),
handlers=handlers,
- force=True
+ force=True,
)
logger = logging.getLogger(__name__)
@@ -211,8 +270,10 @@ def _init_engines():
_load_config(config_file)
else:
- logger.info("Did not find config file. Initializing engines with default "
- "configuration")
+ logger.info(
+ "Did not find config file. Initializing engines with default "
+ "configuration"
+ )
Tokenizer.initialize()
kb = KnowledgeBase(index_name=index_name)
context_engine = ContextEngine(knowledge_base=kb)
@@ -230,9 +291,7 @@ def _load_config(config_file):
config = yaml.safe_load(f)
except Exception as e:
logger.exception(f"Failed to load config file {config_file}")
- raise ConfigError(
- f"Failed to load config file {config_file}. Error: {str(e)}"
- )
+ raise ConfigError(f"Failed to load config file {config_file}. Error: {str(e)}")
tokenizer_config = config.get("tokenizer", {})
Tokenizer.initialize_from_config(tokenizer_config)
if "chat_engine" not in config: