diff --git a/README.md b/README.md
index 39cd4586..bf4afd93 100644
--- a/README.md
+++ b/README.md
@@ -220,7 +220,7 @@ This will open a similar chat interface window, but will show both the RAG and n
## Considerations

-* Canopy currently only supports OpenAI as the backend for both the embedding model and the LLM. Rate limits and pricing set by OpenAI will apply.
+* Rate limits and pricing set by model providers apply to Canopy usage. Canopy currently works with OpenAI, Azure OpenAI, Anyscale, and Cohere models.
* More integrations will be supported in the near future.

## Contributing
@@ -248,19 +248,18 @@ client = OpenAI(base_url="http://localhost:8000/v1/my-namespace")

### Running Canopy server in production

-Canopy is using FastAPI as the web framework and Uvicorn as the ASGI server. It is recommended to use Gunicorn as the production server, mainly because it supports multiple worker processes and can handle multiple requests in parallel, more details can be found [here](https://www.uvicorn.org/deployment/#using-a-process-manager).
-
-To run the canopy server for production, please run:
+Canopy uses FastAPI as the web framework and Uvicorn as the ASGI server.
+To run Canopy in production, it is recommended to use Canopy's Docker image, available on [GitHub Packages](https://github.com/pinecone-io/canopy/pkgs/container/canopy),
+for your production needs.
+For guidance on deploying Canopy on the Google Cloud Platform (GCP), refer to the example provided in the
+[Deployment to GCP](docs/deployment-gcp.md) documentation.
+Alternatively, you can use Gunicorn as a production-grade process manager for the Uvicorn workers; more details [here](https://www.uvicorn.org/deployment/#using-a-process-manager).
+Set your desired `PORT` and `WORKER_COUNT` environment variables, and start the server with:

```bash
-gunicorn canopy_server.app:app --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:PORT --workers WORKER_COUNT
+gunicorn canopy_server.app:app --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:$PORT --workers $WORKER_COUNT
```

-Alternatively, consider utilizing the Canopy Docker image available on [GitHub Packages](https://github.com/pinecone-io/canopy/pkgs/container/canopy)
-for your production needs. For guidance on deploying Canopy on the Google Cloud Platform (GCP), refer to the example provided in the
-[Deployment to GCP](docs/deployment-gcp.md) documentation.
-
-
> [!IMPORTANT]
> The server interacts with services like Pinecone and OpenAI using your own authentication credentials.
When deploying the server on a public web hosting provider, it is recommended to enable an authentication mechanism, diff --git a/pyproject.toml b/pyproject.toml index 9f333aac..5636c238 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ openai = "^1.2.3" tiktoken = "^0.3.3" pydantic = "^1.10.7" pandas-stubs = "^2.0.3.230814" -fastapi = ">=0.92.0, <1.0.0" +fastapi = ">=0.93.0, <1.0.0" uvicorn = ">=0.20.0, <1.0.0" tenacity = "^8.2.1" sse-starlette = "^1.6.5" @@ -77,8 +77,10 @@ module = [ 'pinecone_datasets', 'pinecone', 'transformers.*', + 'tokenizers.*', 'cohere.*', - 'pinecone.grpc' + 'pinecone.grpc', + 'huggingface_hub.utils' ] ignore_missing_imports = true diff --git a/src/canopy/chat_engine/query_generator/__init__.py b/src/canopy/chat_engine/query_generator/__init__.py index e91a19e0..332094cc 100644 --- a/src/canopy/chat_engine/query_generator/__init__.py +++ b/src/canopy/chat_engine/query_generator/__init__.py @@ -2,3 +2,4 @@ from .function_calling import FunctionCallingQueryGenerator from .last_message import LastMessageQueryGenerator from .instruction import InstructionQueryGenerator +from .cohere import CohereQueryGenerator diff --git a/src/canopy/chat_engine/query_generator/cohere.py b/src/canopy/chat_engine/query_generator/cohere.py new file mode 100644 index 00000000..450872ac --- /dev/null +++ b/src/canopy/chat_engine/query_generator/cohere.py @@ -0,0 +1,42 @@ +from typing import List, Optional, cast + +from canopy.chat_engine.query_generator import QueryGenerator +from canopy.chat_engine.history_pruner.raising import RaisingHistoryPruner +from canopy.llm import BaseLLM, CohereLLM +from canopy.models.data_models import Messages, Query + + +class CohereQueryGenerator(QueryGenerator): + """ + Query generator for LLM clients that have a built-in feature to + generate search queries from chat messages. + """ + _DEFAULT_COMPONENTS = { + "llm": CohereLLM, + } + + def __init__(self, + *, + llm: Optional[BaseLLM] = None): + self._llm = llm or self._DEFAULT_COMPONENTS["llm"]() + + if not isinstance(self._llm, CohereLLM): + raise NotImplementedError( + "CohereQueryGenerator only compatible with CohereLLM" + ) + + self._history_pruner = RaisingHistoryPruner() + + def generate(self, + messages: Messages, + max_prompt_tokens: int) -> List[Query]: + messages = self._history_pruner.build(chat_history=messages, + max_tokens=max_prompt_tokens) + llm = cast(CohereLLM, self._llm) + queries = llm.generate_search_queries(messages) + return [Query(text=query) for query in queries] + + async def agenerate(self, + messages: Messages, + max_prompt_tokens: int) -> List[Query]: + raise NotImplementedError diff --git a/src/canopy/config/cohere.yaml b/src/canopy/config/cohere.yaml new file mode 100644 index 00000000..0d0bfaea --- /dev/null +++ b/src/canopy/config/cohere.yaml @@ -0,0 +1,83 @@ +# ================================================================== +# Configuration file for Canopy Server with Cohere. +# ================================================================== + +# --------------------------------------------------------------------------------- +system_prompt: &system_prompt | + Use the documents to answer the user question at the next messages. The documents are retrieved from a knowledge + database and you should use only the facts from the documents to answer. Always remember to include the source to + the documents you used from their 'source' field in the format 'Source: $SOURCE_HERE'. 
+ If you don't know the answer, just say that you don't know, don't try to make up an answer, use the documents. + Don't address the documents directly, but use them to answer the user question like it's your own knowledge. + + +# ------------------------------------------------------------------------------------------- +# Tokenizer configuration +# ------------------------------------------------------------------------------------------- +tokenizer: + type: CohereHFTokenizer + params: + model_name: Cohere/Command-nightly + + +# ------------------------------------------------------------------------------------------------------------- +# Chat engine configuration +# ------------------------------------------------------------------------------------------------------------- +chat_engine: + params: + system_prompt: *system_prompt + + # ------------------------------------------------------------------------------------------------------------- + # LLM configuration + # ------------------------------------------------------------------------------------------------------------- + llm: &llm + type: CohereLLM + params: + model_name: command + # You can add any additional parameters which are supported by the Cohere Co.Chat API. The values set + # here will be used in every Co.Chat API call. For example: + # prompt_truncation: "AUTO" + # citation_quality: "accurate" + # temperature: 0.85 + # Specifying connectors is contrary to Canopy's purpose of searching the Pinecone knowledge base only, + # but technically can still be passed like this: + # connectors: + # - "web-search" + # Uncomment to suppress errors when unrecognized or unsupported model params are sent to CohereLLM. + # ignore_unrecognized_params: true + + # -------------------------------------------------------------------- + # Configuration for the QueryBuilder subcomponent of the chat engine. + # -------------------------------------------------------------------- + query_builder: + type: CohereQueryGenerator + params: {} + llm: + <<: *llm + + + # ------------------------------------------------------------------------------------------------------------- + # ContextEngine configuration + # ------------------------------------------------------------------------------------------------------------- + context_engine: + # ----------------------------------------------------------------------------------------------------------- + # KnowledgeBase configuration + # ----------------------------------------------------------------------------------------------------------- + knowledge_base: + params: + default_top_k: 100 + + # -------------------------------------------------------------------------- + # Configuration for the RecordEncoder subcomponent of the knowledge base. 
+ # -------------------------------------------------------------------------- + record_encoder: + type: CohereRecordEncoder + params: + model_name: # The name of the model to use for encoding + "embed-english-v3.0" + batch_size: 100 # The number of document chunks to encode in each call to the encoding model + + reranker: + type: CohereReranker + params: + top_n: 5 \ No newline at end of file diff --git a/src/canopy/knowledge_base/record_encoder/__init__.py b/src/canopy/knowledge_base/record_encoder/__init__.py index d3cd86f8..260953fb 100644 --- a/src/canopy/knowledge_base/record_encoder/__init__.py +++ b/src/canopy/knowledge_base/record_encoder/__init__.py @@ -5,4 +5,5 @@ from .anyscale import AnyscaleRecordEncoder from .azure_openai import AzureOpenAIRecordEncoder from .jina import JinaRecordEncoder +from .sentence_transformers import SentenceTransformerRecordEncoder from .hybrid import HybridRecordEncoder diff --git a/src/canopy/knowledge_base/record_encoder/dense.py b/src/canopy/knowledge_base/record_encoder/dense.py index c693368e..7e605a25 100644 --- a/src/canopy/knowledge_base/record_encoder/dense.py +++ b/src/canopy/knowledge_base/record_encoder/dense.py @@ -10,7 +10,7 @@ class DenseRecordEncoder(RecordEncoder): """ DenseRecordEncoder is a subclass of RecordEncoder that generates dense vector representation of documents chunks and textual queries. - The dense represntation generated by the `DenseRecordEncoder` is a list of floats in a given dimension. + The dense representation generated by the `DenseRecordEncoder` is a list of floats in a given dimension. DenseRecordEncoder wraps a BaseDenseEncoder from the `pinecone-text` library to encode the text itself. for more information about the BaseDenseEncoder see: https://github.com/pinecone-io/pinecone-text """ # noqa: E501 diff --git a/src/canopy/knowledge_base/record_encoder/sentence_transformers.py b/src/canopy/knowledge_base/record_encoder/sentence_transformers.py new file mode 100644 index 00000000..b15fcdb8 --- /dev/null +++ b/src/canopy/knowledge_base/record_encoder/sentence_transformers.py @@ -0,0 +1,57 @@ +from typing import Optional +from pinecone_text.dense import SentenceTransformerEncoder +from canopy.knowledge_base.record_encoder.dense import DenseRecordEncoder +from huggingface_hub.utils import RepositoryNotFoundError + + +class SentenceTransformerRecordEncoder(DenseRecordEncoder): + """ + SentenceTransformerRecordEncoder is a type of DenseRecordEncoder that uses a Sentence Transformer model. + The implementation uses the `SentenceTransformerEncoder` class from the `pinecone-text` library. + For more information about see: https://github.com/pinecone-io/pinecone-text + + """ # noqa: E501 + + def __init__(self, + *, + model_name: str = "sentence-transformers/all-MiniLM-L6-v2", + query_encoder_name: Optional[str] = None, + batch_size: int = 400, + device: Optional[str] = None, + **kwargs) -> None: + """ + Initialize the SentenceTransformerRecordEncoder + + Args: + model_name: The name of the embedding model to use for encoding documents. + See https://huggingface.co/models?library=sentence-transformers + for all possible Sentence Transformer models. + query_encoder_name: The name of the embedding model to use for encoding queries. + See https://huggingface.co/models?library=sentence-transformers + for all possible Sentence Transformer models. + Defaults to `model_name`. + batch_size: The number of documents or queries to encode at once. + Defaults to 400. 
+ device: The local device to use for encoding, for example "cpu", "cuda" or "mps". + Defaults to "cuda" if cuda is available, otherwise to "cpu". + **kwargs: Additional arguments to pass to the underlying `pinecone-text.SentenceTransformerEncoder`. + """ # noqa: E501 + try: + encoder = SentenceTransformerEncoder( + document_encoder_name=model_name, + query_encoder_name=query_encoder_name, + device=device, + **kwargs, + ) + except RepositoryNotFoundError as e: + raise RuntimeError( + "Your chosen Sentence Transformer model(s) could not be found. " + f"Details: {str(e)}" + ) from e + except ImportError: + raise ImportError( + f"{self.__class__.__name__} requires the `torch` and `transformers` " + f"extra dependencies. Please install them using " + f"`pip install canopy-sdk[torch,transformers]`." + ) + super().__init__(dense_encoder=encoder, batch_size=batch_size) diff --git a/src/canopy/llm/__init__.py b/src/canopy/llm/__init__.py index 96d363a5..0e34502d 100644 --- a/src/canopy/llm/__init__.py +++ b/src/canopy/llm/__init__.py @@ -2,3 +2,4 @@ from .openai import OpenAILLM from .anyscale import AnyscaleLLM from .azure_openai_llm import AzureOpenAILLM +from .cohere import CohereLLM diff --git a/src/canopy/llm/cohere.py b/src/canopy/llm/cohere.py new file mode 100644 index 00000000..f16f9fd1 --- /dev/null +++ b/src/canopy/llm/cohere.py @@ -0,0 +1,403 @@ +import time +from copy import deepcopy +from typing import Union, Iterable, Optional, Any, Dict, List + +from tenacity import retry, stop_after_attempt + +try: + import cohere +except (OSError, ImportError, ModuleNotFoundError): + _cohere_installed = False +else: + _cohere_installed = True + +from canopy.llm import BaseLLM +from canopy.llm.models import Function +from canopy.models.api_models import ( + _Choice, + _StreamChoice, + ChatResponse, + StreamingChatChunk, + TokenCounts, +) +from canopy.models.data_models import Context, MessageBase, Messages, Role, Query +from canopy.context_engine.context_builder.stuffing import StuffingContextContent + + +COMMON_PARAMS = { + "model", + "frequency_penalty", + "logit_bias", + "max_tokens", + "presence_penalty", + "stream", + "temperature", +} + + +EQUIVALENT_PARAMS = { + "top_p": "p", + "user": "user_name", +} + + +class CohereLLM(BaseLLM): + """ + Cohere LLM wrapper built on top of the Cohere Python client. + + Note: Cohere requires a valid API key to use this class. + You can set the "CO_API_KEY" environment variable to your API key. + """ + def __init__(self, + model_name: str = "command", + *, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + ignore_unrecognized_params: Optional[bool] = False, + **kwargs: Any, + ): + """ + Initialize the Cohere LLM. + + Args: + model_name: The name of the model to use. See https://docs.cohere.com/docs/models + api_key: Your Cohere API key. Defaults to None (uses the "CO_API_KEY" environment variable). + base_url: The base URL to use for the Cohere API. Defaults to None (uses the "CO_API_URL" environment variable if set, otherwise use default Cohere API URL). + ignore_unrecognized_params: Flag to suppress errors when unrecognized model params (from other LLMs) are passed to Cohere. + **kwargs: Generation default parameters to use for each request. See https://platform.openai.com/docs/api-reference/chat/create + For example, you can set the temperature, p, etc + These params can be overridden by passing a `model_params` argument to the `chat_completion` methods. 
+ """ # noqa: E501 + super().__init__(model_name) + + if not _cohere_installed: + raise ImportError( + "Failed to import cohere. Make sure you install cohere extra " + "dependencies by running: " + "pip install canopy-sdk[cohere]" + ) + + try: + self._client = cohere.Client(api_key, api_url=base_url) + except cohere.error.CohereError as e: + raise RuntimeError( + "Failed to connect to Cohere, please make sure that the CO_API_KEY " + "environment variable is set correctly.\n" + f"Error: {e.message}" + ) + + self.ignore_unrecognized_params = ignore_unrecognized_params + self.default_model_params = kwargs + + def chat_completion(self, + system_prompt: str, + chat_history: Messages, + context: Optional[Context] = None, + *, + stream: bool = False, + max_tokens: Optional[int] = None, + model_params: Optional[dict] = None, + ) -> Union[ChatResponse, Iterable[StreamingChatChunk]]: + """ + Chat completion using the Cohere API. + + Note: this function is wrapped in a retry decorator to handle transient errors. + + Args: + system_prompt: The system prompt to use for the chat completion (preamble). + chat_history: Messages (chat history) to send to the model. + context: Knowledge base context to use for the chat completion. Defaults to None (no context). + stream: Whether to stream the response or not. + max_tokens: Maximum number of tokens to generate. Defaults to None (generates until stop sequence or until hitting max context size). + model_params: Model parameters to use for this request. Defaults to None (uses the default model parameters). + Dictonary of parametrs to override the default model parameters if set on initialization. + For example, you can pass: {"temperature": 0.9, "top_p": 1.0} to override the default temperature and top_p. + see: https://platform.openai.com/docs/api-reference/chat/create + Returns: + ChatResponse or StreamingChatChunk + + Usage: + >>> from canopy.llm import OpenAILLM + >>> from canopy.models.data_models import UserMessage + >>> llm = CohereLLM() + >>> messages = [UserMessage(content="Hello! How are you?")] + >>> result = llm.chat_completion(messages) + >>> print(result.choices[0].message.content) + "I'm good, how are you?" + """ # noqa: E501 + model_params_dict: Dict[str, Any] = deepcopy(self.default_model_params) + model_params_dict.update( + model_params or {} + ) + model_params_dict["max_tokens"] = max_tokens + + model_params_dict = self._convert_model_params(model_params_dict) + + connectors = model_params_dict.pop('connectors', None) + messages: List[Dict[str, Any]] = self._map_messages(chat_history) + model_name = model_params_dict.pop('model', None) or self.model_name + + if not messages: + raise RuntimeError("No message provided") + + if system_prompt: + messages = self._prepend_system_prompt_to_messages(system_prompt, messages) + + try: + response = self._client.chat( + model=model_name, + message=messages.pop()['message'], + chat_history=messages, + documents=self.generate_documents_from_context(context), + preamble_override=None, + stream=stream, + connectors=[ + {"id": connector} for connector in connectors + ] if connectors else None, + **model_params_dict + ) + except cohere.error.CohereAPIError as e: + raise RuntimeError( + f"Failed to use Cohere's {model_name} model for chat " + f"completion. 
" + f"Underlying Error:\n{e.message}" + ) + + def streaming_iterator(res): + for chunk in res: + if chunk.event_type != "text-generation": + continue + + choice = _StreamChoice( + index=0, + delta={ + "content": chunk.text, + "function_call": None, + "role": Role.ASSISTANT, + "tool_calls": None + }, + finish_reason=None, + ) + streaming_chat_chunk = StreamingChatChunk( + id='', + object="chat.completion.chunk", + created=int(time.time()), + model=self.model_name, + choices=[choice], + ) + streaming_chat_chunk.id = chunk.id + + yield streaming_chat_chunk + + if stream: + return streaming_iterator(response) + + return ChatResponse( + id=response.id, + created=int(time.time()), + choices=[_Choice( + index=0, + message=MessageBase( + role=Role.ASSISTANT, + content=response.text, + ), + finish_reason="stop", + )], + object="chat.completion", + model=self.model_name, + usage=TokenCounts( + prompt_tokens=response.token_count["prompt_tokens"], + completion_tokens=response.token_count["response_tokens"], + total_tokens=response.token_count["billed_tokens"], + ), + ) + + @retry( + reraise=True, + stop=stop_after_attempt(3), + ) + def generate_search_queries(self, messages): + messages = self._map_messages(messages) + response = self._client.chat( + model=self.model_name, + message=messages[-1]['message'], + chat_history=messages[:-1], + stream=False, + search_queries_only=True, + ) + return [search_query['text'] for search_query in response.search_queries] + + def enforced_function_call(self, + system_prompt: str, + chat_history: Messages, + function: Function, + *, + max_tokens: Optional[int] = None, + model_params: Optional[dict] = None + ) -> dict: + raise NotImplementedError("Cohere LLM doesn't support function calling") + + async def aenforced_function_call(self, + system_prompt: str, + chat_history: Messages, + function: Function, *, + max_tokens: Optional[int] = None, + model_params: Optional[dict] = None): + raise NotImplementedError("Cohere LLM doesn't support function calling") + + async def achat_completion(self, + system_prompt: str, + chat_history: Messages, + context: Optional[Context] = None, + *, + stream: bool = False, + max_generated_tokens: Optional[int] = None, + model_params: Optional[dict] = None, + ) -> Union[ChatResponse, + Iterable[StreamingChatChunk]]: + raise NotImplementedError("Cohere LLM doesn't support async chat completion") + + async def agenerate_queries(self, + messages: Messages, + *, + max_generated_tokens: Optional[int] = None, + model_params: Optional[dict] = None, + ) -> List[Query]: + raise NotImplementedError("Cohere LLM doesn't support async query generation") + + def _convert_model_params(self, openai_model_params: dict) -> dict: + """ + Convert Open AI model params to Cohere equivalents. + + Args: + openai_model_params: model params passed from client to Canopy API in OpenAI format. + + Returns: + Model params used with Cohere Chat API. + """ # noqa: E501 + converted_model_params = {} + + for param in list(openai_model_params.keys()): + if param in COMMON_PARAMS: + converted_model_params[param] = openai_model_params.pop(param) + elif param in EQUIVALENT_PARAMS: + converted_model_params[EQUIVALENT_PARAMS[param]] = \ + openai_model_params.pop(param) + + # Scale is -2.0 to 2.0 with OpenAI, but -1.0 to 1.0 with Cohere. 
+        if presence_penalty := converted_model_params.get("presence_penalty"):
+            converted_model_params["presence_penalty"] = presence_penalty * 0.5
+
+        unrecognized_keys = set(openai_model_params.keys())
+        default_keys = set(self.default_model_params.keys())
+
+        if unrecognized_keys.difference(default_keys) \
+                and not self.ignore_unrecognized_params:
+            raise NotImplementedError(
+                f"{','.join(unrecognized_keys)} not supported by Cohere Chat API."
+            )
+
+        return converted_model_params
+
+    def _map_messages(self, messages: Messages) -> List[dict[str, Any]]:
+        """
+        Map the messages to the format expected by Cohere.
+
+        Cohere Chat API expects message history to be in the format:
+        {
+            "role": "USER",
+            "message": "message text"
+        }
+
+        System messages will be passed as user messages.
+
+        Args:
+            messages: Messages (chat history) to send to the model.
+
+        Returns:
+            A list of dicts in the format expected by the Cohere Chat API.
+        """
+        mapped_messages = []
+
+        for message in messages:
+            if not message.content:
+                continue
+
+            mapped_messages.append({
+                "role": "CHATBOT" if message.role == Role.ASSISTANT else "USER",
+                "message": message.content,
+            })
+
+        return mapped_messages
+
+    def _prepend_system_prompt_to_messages(self,
+                                           system_prompt: str,
+                                           messages: List[dict[str, Any]]
+                                           ) -> List[dict[str, Any]]:
+        """
+        Prepend the value passed as the system prompt to the messages.
+
+        Cohere does not have a direct equivalent to the system prompt, and when passing
+        documents it's preferred to send the system prompt as the first message instead.
+        """
+        first_message = messages[0]
+
+        if (first_message["message"] == system_prompt
+                and first_message["role"] == "USER"):
+            return messages
+
+        system_prompt_messages = [
+            {
+                "role": "USER",
+                "message": system_prompt,
+            },
+            {
+                "role": "CHATBOT",
+                "message": "Ok."
+            }
+        ]
+
+        return system_prompt_messages + messages
+
+    def generate_documents_from_context(
+            self, context: Optional[Context]) -> List[Dict[str, Any]]:
+        """
+        Generate document data to pass to Cohere Chat API from provided context data.
+
+        Args:
+            context: Knowledge base context to use for the chat completion.
+
+        Returns:
+            documents: list of document objects for Cohere API.
+        """
+        if not context:
+            return []
+
+        if isinstance(context.content, StuffingContextContent):
+            return (
+                self.generate_documents_from_stuffing_context_content(context.content)
+            )
+
+        raise NotImplementedError(
+            "Cohere LLM is currently supported only with StuffingContextBuilder."
+        )
+
+    def generate_documents_from_stuffing_context_content(
+            self,
+            content: StuffingContextContent) -> List[Dict[str, Any]]:
+        """
+        Generate document data to pass to Cohere Chat API from StuffingContextContent.
+
+        Args:
+            content: Stuffing context content from the context.
+
+        Returns:
+            documents: list of document objects for Cohere API.
+ """ + documents = [] + + for result in content.__root__: + for snippet in result.snippets: + documents.append(snippet.dict()) + + return documents diff --git a/src/canopy/tokenizer/__init__.py b/src/canopy/tokenizer/__init__.py index 9c699052..18d9006c 100644 --- a/src/canopy/tokenizer/__init__.py +++ b/src/canopy/tokenizer/__init__.py @@ -1,3 +1,4 @@ -from .openai import OpenAITokenizer +from .cohere import CohereAPITokenizer, CohereHFTokenizer from .llama import LlamaTokenizer +from .openai import OpenAITokenizer from .tokenizer import Tokenizer diff --git a/src/canopy/tokenizer/cohere.py b/src/canopy/tokenizer/cohere.py new file mode 100644 index 00000000..172bbac0 --- /dev/null +++ b/src/canopy/tokenizer/cohere.py @@ -0,0 +1,197 @@ +from typing import List, Optional + +from tokenizers import Tokenizer as HfTokenizer +try: + import cohere +except (OSError, ImportError, ModuleNotFoundError): + _cohere_installed = False +else: + _cohere_installed = True + +from .base import BaseTokenizer +from ..models.data_models import Messages + + +class CohereHFTokenizer(BaseTokenizer): + """ + Tokenizer for Cohere models, based on the Hugging Face tokenizers library. + + Usage: + Initialize the singleton tokenizer with the LlamaTokenizer class: + >>> from canopy.tokenizer import Tokenizer + >>> Tokenizer.initialize(tokenizer_class=CohereHFTokenizer, + model_name="Cohere/Command-nightly") + + You can then use the tokenizer instance from anywhere in the code: + >>> from canopy.tokenizer import Tokenizer + >>> tokenizer = Tokenizer() + >>> tokenizer.tokenize("Hello World!") + ['▁Hello', '▁World', '!'] + """ + + MESSAGE_TOKENS_OVERHEAD = 3 + FIXED_PREFIX_TOKENS = 3 + + def __init__( + self, + model_name: str = "Cohere/Command-nightly", + ): + """ + Initialize the tokenizer. + + Args: + model_name: The name of the Hugging Face model to use. Defaults to "Cohere/Command-nightly". + """ # noqa: E501 + if not _cohere_installed: + raise ImportError( + "Failed to import cohere. Make sure you install cohere extra " + "dependencies by running: " + "pip install canopy-sdk[cohere]" + ) + + self._encoder = HfTokenizer.from_pretrained(model_name) + + def tokenize(self, text: str) -> List[str]: + """ + Tokenize a text using HuggingFace's tokenizers library. + + Args: + text: The text to tokenize. + + Returns: + The list of tokens. + """ + return self._encoder.encode(text, add_special_tokens=False).tokens + + def detokenize(self, tokens: List[str]) -> str: + """ + Detokenize a list of tokens that were previously tokenized using this tokenizer. + + Args: + tokens: The list of tokens to detokenize. + + Returns: + The detokenized text as a string. + """ + if not isinstance(tokens, List): + raise TypeError(f"detokenize expect List[str], got f{type(tokens)}") + + ids = [self._encoder.token_to_id(token) for token in tokens] + return self._encoder.decode(ids) + + def token_count(self, text: str) -> int: + """ + Count the number of tokens in a text. + + Args: + text: The text to count the tokens of. + + Returns: + The number of tokens in the text. + """ + return len(self._encoder.encode(text, add_special_tokens=False).ids) + + def messages_token_count(self, messages: Messages) -> int: + """ + Count the number of tokens in a list of messages, as expected to be + counted by Cohere models. + + Args: + messages: The list of messages to count the tokens of. + + Returns: + The number of tokens in the messages, as expected to be counted by Cohere models. 
+ """ # noqa: E501 + num_tokens = 0 + for message in messages: + num_tokens += self.MESSAGE_TOKENS_OVERHEAD + for key, value in message.dict().items(): + num_tokens += self.token_count(value) + num_tokens += self.FIXED_PREFIX_TOKENS + return num_tokens + + +class CohereAPITokenizer(BaseTokenizer): + """ + Tokenizer for Cohere models, based on the Cohere Tokenize API. + + Usage: + Initialize the singleton tokenizer with the CohereAPITokenizer class: + >>> from canopy.tokenizer import Tokenizer + >>> Tokenizer.initialize(tokenizer_class=CohereAPITokenizer, model_name="embed-multilingual-v3.0") + + You can then use the tokenizer instance from anywhere in the code: + >>> from canopy.tokenizer import Tokenizer + >>> tokenizer = Tokenizer() + >>> tokenizer.tokenize("Hello world!") + ['Hello', ' world', '!'] + """ # noqa: E501 + + MESSAGE_TOKENS_OVERHEAD = 3 + FIXED_PREFIX_TOKENS = 3 + + def __init__(self, + model_name: Optional[str] = None, + *, + api_key: Optional[str] = None, + api_url: Optional[str] = None): + """ + Initialize the tokenizer. + + Args: + model_name: The name of the model to use. + api_key: Your Cohere API key. Defaults to None (uses the "CO_API_KEY" environment variable). + api_url: The base URL to use for the Cohere API. Defaults to None (uses the "CO_API_URL" environment variable if set, otherwise use default Cohere API URL). + """ # noqa: E501 + self.model_name = model_name + self._client = cohere.Client(api_key, api_url=api_url) + + def tokenize(self, text: str) -> List[str]: + """ + Tokenize a text using Cohere Tokenize API. + + Args: + text: The text to tokenize. + + Returns: + The list of tokens. + """ + if not text: + return [] + + tokens = self._client.tokenize(text, model=self.model_name) + return tokens.token_strings + + def detokenize(self, tokens: List[str]) -> str: + """ + Detokenize a list of tokens that were previously tokenized using this tokenizer. + + Args: + tokens: The list of tokens to detokenize. + + Returns: + The detokenized text as a string. + """ + if not isinstance(tokens, List): + raise TypeError(f"detokenize expects List[str], got f{type(tokens)}") + return "".join(tokens) + + def messages_token_count(self, messages: Messages) -> int: + """ + Count the number of tokens in a list of messages as expected to be counted by Cohere models. + Account for the overhead of the messages structure. + Taken from: https://github.com/openai/openai-cookbook/.../How_to_format_inputs_to_ChatGPT_models.ipynb + + Args: + messages: The list of messages to count the tokens of. + + Returns: + The number of tokens in the messages, as expected to be counted by OpenAI models. 
+ """ # noqa: E501 + num_tokens = 0 + for message in messages: + num_tokens += self.MESSAGE_TOKENS_OVERHEAD + for key, value in message.dict().items(): + num_tokens += self.token_count(value) + num_tokens += self.FIXED_PREFIX_TOKENS + return num_tokens diff --git a/tests/system/llm/test_cohere.py b/tests/system/llm/test_cohere.py new file mode 100644 index 00000000..69a66571 --- /dev/null +++ b/tests/system/llm/test_cohere.py @@ -0,0 +1,302 @@ +from unittest.mock import MagicMock + +import pytest +from cohere.error import CohereAPIError + +from canopy.models.data_models import Context, ContextContent, Role, MessageBase +from canopy.context_engine.context_builder.stuffing import ( + StuffingContextContent, ContextQueryResult, ContextSnippet +) +from canopy.models.api_models import ChatResponse, StreamingChatChunk +from canopy.llm.cohere import CohereLLM + + +def assert_chat_completion(response): + assert len(response.choices) == 1 # Cohere API does not return multiple choices. + + assert isinstance(response.choices[0].message, MessageBase) + assert isinstance(response.choices[0].message.content, str) + assert len(response.choices[0].message.content) > 0 + assert isinstance(response.choices[0].message.role, Role) + + +@pytest.fixture +def model_name(): + return "command" + + +@pytest.fixture +def system_prompt(): + return "Use only the provided documents to answer." + + +@pytest.fixture +def expected_chat_kwargs(system_prompt): + return { + "model": "command", + "message": "Just checking in. Be concise.", + "chat_history": [ + {"role": "USER", "message": "Use only the provided documents to answer."}, + {"role": "CHATBOT", "message": "Ok."}, + {'role': 'USER', 'message': 'Hello, assistant.'}, + {"role": "CHATBOT", "message": "Hello, user. How can I assist you?"} + ], + "connectors": None, + "documents": [], + "preamble_override": None, + "stream": False, + "max_tokens": None, + } + + +@pytest.fixture +def model_params_high_temperature(): + return {"temperature": 0.9} + + +@pytest.fixture +def model_params_low_temperature(): + return {"temperature": 0.2} + + +@pytest.fixture +def cohere_llm(): + return CohereLLM() + + +@pytest.fixture +def unsupported_context(): + class UnsupportedContextContent(ContextContent): + def to_text(self, **kwargs): + return '' + + return Context(content=UnsupportedContextContent(), num_tokens=123) + + +def test_init_with_custom_params(): + llm = CohereLLM(model_name="test_model_name", + api_key="test_api_key", + temperature=0.9) + + assert llm.model_name == "test_model_name" + assert llm.default_model_params["temperature"] == 0.9 + assert llm._client.api_key == "test_api_key" + + +def test_chat_completion(cohere_llm, messages, system_prompt, expected_chat_kwargs): + cohere_llm._client = MagicMock(wraps=cohere_llm._client) + response = cohere_llm.chat_completion( + chat_history=messages, system_prompt=system_prompt) + cohere_llm._client.chat.assert_called_once_with(**expected_chat_kwargs) + assert_chat_completion(response) + + +def test_chat_completion_high_temperature(cohere_llm, + messages, + model_params_high_temperature): + response = cohere_llm.chat_completion( + chat_history=messages, + model_params=model_params_high_temperature, + system_prompt='', + ) + assert_chat_completion(response) + + +def test_chat_completion_low_temperature(cohere_llm, + messages, + model_params_low_temperature): + response = cohere_llm.chat_completion(chat_history=messages, + model_params=model_params_low_temperature, + system_prompt='') + assert_chat_completion(response) + + +def 
test_chat_completion_without_system_prompt(cohere_llm, + messages, + expected_chat_kwargs): + expected_chat_kwargs["chat_history"] = expected_chat_kwargs["chat_history"][2:] + cohere_llm._client = MagicMock(wraps=cohere_llm._client) + response = cohere_llm.chat_completion( + chat_history=messages, system_prompt="") + cohere_llm._client.chat.assert_called_once_with(**expected_chat_kwargs) + assert_chat_completion(response) + + +def test_chat_streaming(cohere_llm, messages): + stream = True + response = cohere_llm.chat_completion(chat_history=messages, + stream=stream, + system_prompt='') + messages_received = [message for message in response] + assert len(messages_received) > 0 + + for message in messages_received: + assert isinstance(message, StreamingChatChunk) + assert message.object == "chat.completion.chunk" + + +def test_max_tokens(cohere_llm, messages): + max_tokens = 2 + response = cohere_llm.chat_completion(chat_history=messages, + max_tokens=max_tokens, + system_prompt='') + assert isinstance(response, ChatResponse) + assert len(response.choices[0].message.content.split()) <= max_tokens + + +def test_missing_messages(cohere_llm): + with pytest.raises(RuntimeError, match="No message provided"): + cohere_llm.chat_completion(chat_history=[], system_prompt='') + + +def test_negative_max_tokens(cohere_llm, messages): + with pytest.raises(RuntimeError, match="max_tokens cannot be less than 0"): + cohere_llm.chat_completion( + chat_history=messages, max_tokens=-5, system_prompt='') + + +def test_chat_completion_api_failure_propagates(cohere_llm, + messages): + cohere_llm._client = MagicMock() + cohere_llm._client.chat.side_effect = CohereAPIError("API call failed") + + with pytest.raises(RuntimeError, match="API call failed"): + cohere_llm.chat_completion(chat_history=messages, system_prompt="") + + +def test_chat_completion_with_unsupported_context_engine(cohere_llm, + messages, + unsupported_context): + cohere_llm._client = MagicMock() + + with pytest.raises(NotImplementedError): + cohere_llm.chat_completion(chat_history=messages, + system_prompt="", + context=unsupported_context) + + +def test_chat_completion_with_unrecognized_param_raises_error(cohere_llm, messages): + with pytest.raises(NotImplementedError): + cohere_llm.chat_completion(chat_history=messages, + system_prompt="", + model_params={ + "functions": {}, + }) + + +def test_chat_completion_ignores_unrecognized_model_params_with_init_kwarg(messages): + cohere_llm = CohereLLM(ignore_unrecognized_params=True) + response = cohere_llm.chat_completion(chat_history=messages, + system_prompt="", + model_params={ + "functions": {}, + }) + assert response.object == "chat.completion" + + +def test_chat_completion_with_equivalent_model_params(cohere_llm, + messages, + system_prompt, + expected_chat_kwargs): + cohere_llm._client = MagicMock(wraps=cohere_llm._client) + response = cohere_llm.chat_completion( + chat_history=messages, + system_prompt=system_prompt, + model_params={ + "top_p": 0.9, + "user": "admin", + } + ) + expected_chat_kwargs_with_equivalents = { + **expected_chat_kwargs, + "p": 0.9, + "user_name": "admin", + } + cohere_llm._client.chat.assert_called_once_with( + **expected_chat_kwargs_with_equivalents + ) + assert response.object == "chat.completion" + + +def test_chat_completion_with_stuffing_context_snippets(cohere_llm, + messages, + expected_chat_kwargs, + system_prompt): + cohere_llm._client = MagicMock(wraps=cohere_llm._client) + content = StuffingContextContent(__root__=[ + ContextQueryResult(query="", snippets=[ 
+ ContextSnippet( + source="https://www.example.com/document", + text="Document text", + ), + ContextSnippet( + source="https://www.example.com/second_document", + text="Second document text", + ) + ]) + ]) + stuffing_context = Context( + content=content, + num_tokens=123) + + response = cohere_llm.chat_completion( + chat_history=messages, + system_prompt=system_prompt, + context=stuffing_context) + + # Check that we got a valid chat response - details tested in other tests + assert isinstance(response, ChatResponse) + assert response.object == "chat.completion" + + # Check that Cohere client was called with the snippets + expected_chat_kwargs["documents"] = [ + { + "source": "https://www.example.com/document", + "text": "Document text", + }, + { + "source": "https://www.example.com/second_document", + "text": "Second document text", + }, + ] + cohere_llm._client.chat.assert_called_once_with(**expected_chat_kwargs) + + +def test_token_counts_mapped_in_chat_response(cohere_llm, messages, system_prompt): + response = cohere_llm.chat_completion(chat_history=messages, + system_prompt=system_prompt) + assert response.usage.prompt_tokens == 107 + assert response.usage.completion_tokens + assert response.usage.total_tokens == ( + response.usage.prompt_tokens + response.usage.completion_tokens + ) + + +def test_api_errors_caught_and_raised_as_runtime_errors(cohere_llm, + messages, + system_prompt): + expected_message = ( + "Failed to use Cohere's unknown_model model for chat completion." + " Underlying Error:\n" + ".+" + ) + + with pytest.raises(RuntimeError, match=expected_message): + cohere_llm.chat_completion(chat_history=messages, + system_prompt=system_prompt, + model_params={ + "model": "unknown_model", + }) + + +def test_bad_api_key(monkeypatch): + monkeypatch.setenv("CO_API_KEY", "") + + expected_message = ( + "Failed to connect to Cohere, please make sure that the CO_API_KEY" + " environment variable is set correctly.\n" + ".*API key" + ) + + with pytest.raises(RuntimeError, match=expected_message): + CohereLLM() diff --git a/tests/system/query_generator/test_cohere_query_generator.py b/tests/system/query_generator/test_cohere_query_generator.py new file mode 100644 index 00000000..163826fd --- /dev/null +++ b/tests/system/query_generator/test_cohere_query_generator.py @@ -0,0 +1,30 @@ +import pytest + +from canopy.chat_engine.query_generator.cohere import CohereQueryGenerator +from canopy.models.data_models import MessageBase, Role + + +@pytest.fixture +def messages(): + return [ + MessageBase( + role=Role.USER, content="Hello, assistant."), + MessageBase( + role=Role.ASSISTANT, content="Hello, user. 
How can I assist you?"), + MessageBase( + role=Role.USER, content="How do I init a pinecone client?.") + ] + + +def test_generate_queries(messages): + query_generator = CohereQueryGenerator() + queries = query_generator.generate(messages, max_prompt_tokens=100) + assert queries + assert queries[0].text + + +def test_max_tokens_exceeded_raises_error(messages): + query_generator = CohereQueryGenerator() + + with pytest.raises(ValueError): + query_generator.generate(messages, max_prompt_tokens=10) diff --git a/tests/system/record_encoder/test_sentence_transformers_encoder.py b/tests/system/record_encoder/test_sentence_transformers_encoder.py new file mode 100644 index 00000000..10ed6f6c --- /dev/null +++ b/tests/system/record_encoder/test_sentence_transformers_encoder.py @@ -0,0 +1,61 @@ +import pytest + +from canopy.knowledge_base.models import KBDocChunk +from canopy.knowledge_base.record_encoder.sentence_transformers import ( + SentenceTransformerRecordEncoder +) +from canopy.models.data_models import Query + +documents = [KBDocChunk( + id=f"doc_1_{i}", + text=f"Sample document {i}", + document_id=f"doc_{i}", + metadata={"test": i}, + source="doc_1", +) + for i in range(4) +] + +queries = [Query(text="Sample query 1"), + Query(text="Sample query 2"), + Query(text="Sample query 3"), + Query(text="Sample query 4")] + + +@pytest.fixture +def encoder(): + try: + encoder = SentenceTransformerRecordEncoder(batch_size=2) + except ImportError: + pytest.skip( + "`transformers` extra not installed. Skipping SentenceTransformer system " + "tests" + ) + return encoder + + +def test_dimension(encoder): + assert encoder.dimension == 384 + + +@pytest.mark.parametrize("items,function", + [(documents, "encode_documents"), + (queries, "encode_queries"), + ([], "encode_documents"), + ([], "encode_queries")]) +def test_encode_documents(encoder, items, function): + + encoded_documents = getattr(encoder, function)(items) + + assert len(encoded_documents) == len(items) + assert all(len(encoded.values) == encoder.dimension + for encoded in encoded_documents) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("items,function", + [("aencode_documents", documents), + ("aencode_queries", queries)]) +async def test_aencode_not_implemented(encoder, function, items): + with pytest.raises(NotImplementedError): + await encoder.aencode_queries(items) diff --git a/tests/system/tokenizer/__init__.py b/tests/system/tokenizer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/system/tokenizer/test_cohere_api_tokenizer.py b/tests/system/tokenizer/test_cohere_api_tokenizer.py new file mode 100644 index 00000000..fcdb6576 --- /dev/null +++ b/tests/system/tokenizer/test_cohere_api_tokenizer.py @@ -0,0 +1,46 @@ +import os + +import pytest + +from canopy.models.data_models import MessageBase, Role +from canopy.tokenizer import CohereAPITokenizer +from ...unit.tokenizer.base_test_tokenizer import BaseTestTokenizer + + +class TestCohereAPITokenizer(BaseTestTokenizer): + @staticmethod + @pytest.fixture(scope="class") + def tokenizer(): + if not os.getenv("CO_API_KEY"): + pytest.skip("Skipping Cohere API tokenizer tests because " + "COHERE_API_KEY environment variable is not set.") + return CohereAPITokenizer(model_name="command") + + @staticmethod + @pytest.fixture + def text(): + return "string with special characters like !@#$%^&*()_+日本 " \ + "spaces \n \n\n CASE cAse " + + @staticmethod + @pytest.fixture + def expected_tokens(text): + return ['string', ' with', ' special', ' characters', ' like', + ' !', 
'@', '#', '$', '%', '^', '&', '*', '()', '_', '+', '日', + '本', ' spaces', ' ', '\n ', '\n\n', ' CASE', ' c', 'A', + 'se', " "] + + @staticmethod + def test_messages_token_count(tokenizer): + messages = [MessageBase(role=Role.USER, content="Hello, assistant.")] + assert tokenizer.messages_token_count(messages) == 11 + + messages = [MessageBase(role=Role.USER, + content="Hello, assistant."), + MessageBase(role=Role.ASSISTANT, + content="Hello, user. How can I assist you?")] + assert tokenizer.messages_token_count(messages) == 25 + + @staticmethod + def test_messages_token_count_empty_messages(tokenizer): + assert tokenizer.messages_token_count([]) == 3 diff --git a/tests/unit/record_encoder/test_sentence_transformers_encoder.py b/tests/unit/record_encoder/test_sentence_transformers_encoder.py new file mode 100644 index 00000000..cccc5812 --- /dev/null +++ b/tests/unit/record_encoder/test_sentence_transformers_encoder.py @@ -0,0 +1,74 @@ +import pytest + +from canopy.knowledge_base.models import KBDocChunk +from canopy.knowledge_base.record_encoder.sentence_transformers import ( + SentenceTransformerRecordEncoder +) +from canopy.models.data_models import Query + +from unittest.mock import patch + +documents = [KBDocChunk( + id=f"doc_1_{i}", + text=f"Sample document {i}", + document_id=f"doc_{i}", + metadata={"test": i}, + source="doc_1", +) + for i in range(4) +] + +queries = [Query(text="Sample query 1"), + Query(text="Sample query 2"), + Query(text="Sample query 3"), + Query(text="Sample query 4")] + + +@pytest.fixture +def encoder(): + try: + encoder = SentenceTransformerRecordEncoder(batch_size=2) + except ImportError: + pytest.skip( + "`transformers` extra not installed. Skipping SentenceTransformer unit " + "tests" + ) + return encoder + + +def test_dimension(encoder): + with patch('pinecone_text.dense.SentenceTransformerEncoder.encode_documents') \ + as mock_encode_documents: + mock_encode_documents.return_value = [[0.1, 0.2, 0.3]] + assert encoder.dimension == 3 + + +def custom_encode(*args, **kwargs): + input_to_encode = args[0] + return [[0.1, 0.2, 0.3] for _ in input_to_encode] + + +@pytest.mark.parametrize("items,function", + [(documents, "encode_documents"), + (queries, "encode_queries"), + ([], "encode_documents"), + ([], "encode_queries")]) +def test_encode_documents(encoder, items, function): + with patch('pinecone_text.dense.SentenceTransformerEncoder.encode_documents', + side_effect=custom_encode): + with patch('pinecone_text.dense.SentenceTransformerEncoder.encode_queries', + side_effect=custom_encode): + encoded_documents = getattr(encoder, function)(items) + + assert len(encoded_documents) == len(items) + assert all(len(encoded.values) == encoder.dimension + for encoded in encoded_documents) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("items,function", + [("aencode_documents", documents), + ("aencode_queries", queries)]) +async def test_aencode_not_implemented(encoder, function, items): + with pytest.raises(NotImplementedError): + await encoder.aencode_queries(items) diff --git a/tests/unit/tokenizer/test_cohere_hf_tokenizer.py b/tests/unit/tokenizer/test_cohere_hf_tokenizer.py new file mode 100644 index 00000000..79c5961c --- /dev/null +++ b/tests/unit/tokenizer/test_cohere_hf_tokenizer.py @@ -0,0 +1,59 @@ +import pytest +from canopy.tokenizer import CohereHFTokenizer +from canopy.models.data_models import MessageBase, Role +from .base_test_tokenizer import BaseTestTokenizer + + +class TestCohereHFTokenizer(BaseTestTokenizer): + @staticmethod + 
@pytest.fixture(scope="class") + def tokenizer(): + return CohereHFTokenizer() + + @staticmethod + @pytest.fixture + def expected_tokens(text): + return [ + 'string', + 'Ġwith', + 'Ġspecial', + 'Ġcharacters', + 'Ġlike', + 'Ġ!', + '@', + '#', + '$', + '%', + '^', + '&', + '*', + '()', + '_', + '+', + 'Ġæ', + 'Ĺ', + '¥', + 'æľ¬', + 'Ġspaces', + 'ĠĠĠ', + 'ĊĠ', + 'ĊĊ', + 'ĠCASE', + 'Ġc', + 'A', + 'se', + 'Ġ', + ] + + @staticmethod + def test_messages_token_count(tokenizer): + messages = [MessageBase(role=Role.USER, content="Hello, assistant.")] + assert tokenizer.messages_token_count(messages) == 11 + + messages = [ + MessageBase(role=Role.USER, content="Hello, assistant."), + MessageBase( + role=Role.ASSISTANT, content="Hello, user. How can I assist you?" + ), + ] + assert tokenizer.messages_token_count(messages) == 25