From 2981ab4efa53af8c8e52fe8176c2f9c6cdaaff82 Mon Sep 17 00:00:00 2001 From: Jennifer Hamon Date: Fri, 20 Oct 2023 10:52:47 -0400 Subject: [PATCH] Config / init refactor for state encapsulation (#220) ## Problem Singleton config limits how people can use this SDK and makes testing more difficult. Also, we no longer should fetch projectId and construct data url out of configuration parts, to keep the client agnostic to the format of the data plane url. ## Solution - **State encapsulation using classes** - We want to encapsulate all config-related state into a new class `Config` and a subclass `PineconeConfig`. - `Config` is a vessel for api_key, host, and openapi configuration. It is host-agnostic because it is used by both the control plane (`Pinecone`) and the data plane (`Index`) - `PineconeConfig` is a convenience class to set the controller host for `Pinecone`. - New singleton store `IndexHostStore` means we won't make unnecessary and repetitious calls to describe_index if a user creates multiple `Index` instances referring to the same index. In the old paradigm, this was accomplished with a piece of global state storing off projectId. - **Modified class init for Index** - `Index` class constructor now requires host url instead of an index name. If you use the helper method on a `Pinecone` class instance to instantiate the object, it will manage fetching the host url for you. E.g. `p = Pinecone(api_key='foo'); index = p.Index('my-index')`. ```python # Old way import pinecone pinecone.init(api_key='foo') index = pinecone.Index('my-index') # New way from pinecone import Pinecone client = Pinecone(api_key='foo') index = client.Index('my-index') # New way (alternate, for data-plane only use case) from pinecone import Index index = Index(api_key='foo', host='https://data-url') ``` - **Moving things around** - `manage.py` got moved and renamed to `pinecone/control/pinecone.py`. 
The functions such as `create_index`, `describe_index`, etc. that previously were top-level package exports relying on global state to access config information have been turned into class methods on a new `Pinecone` class defined in this file. - `pinecone/index.py` got moved to `pinecone/data/index.py`. The `pinecone/data` package is a home for all classes used to access the data plane. - Everything `grpc` related got shoved into `pinecone/data/grpc`. - GRPC mess in one place only. Utils that were grpc-related got pulled out of `pinecone/utils.py` and moved to `pinecone/data/grpc/utils.py`. - Leftover util junk moved into a `deprecated` package. Probably will be deleted soon. - **Breaking down big files into small files** - The file previously at `pinecone/index_grpc.py` got split up into many smaller files inside the `pinecone/data/grpc/` package. - Remaining util functions in `pinecone/utils.py` got broken up into many smaller files in `pinecone/utils/`. - **Fix existing tests** - Needed to adjust a lot of import statements and sometimes setup functions to get pre-existing tests running. 
## Using it ```python # We're going to use a new Pinecone class instead of interacting directly with the package # namespace as in the past from pinecone import Pinecone # No more init() method p = Pinecone(api_key='foo') # Target an index index = p.Index('my-index') # Use data plane methods index.describe_index_stats() ``` **Mostly working**: - `describe_index` (call succeeds but need to follow up on source_collection field, would ideally return different object for different capacity_modes) **Somewhat working**: - `create_index` (API call succeeds, but client blows up when response is not the expected shape) - `delete_index` (only when timeout=-1, since otherwise it has dependency on `list_indexes`) **Broken**: - `list_indexes` (response not the expected shape) ## Type of Change - [x] New feature (non-breaking change which adds functionality) - [x] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [x] This change requires a documentation update ## Test Plan Automated tests will be expanded in a future PR. A bunch of code moved around to different files or got split from big files into small files and I want to merge this part of the change to unlock parallel work by Austin on various follow-ups. 
--- pinecone/__init__.py | 18 +- pinecone/config.py | 260 -------------- pinecone/config/__init__.py | 2 + pinecone/config/config.py | 69 ++++ pinecone/config/logging.py | 8 + pinecone/config/openapi.py | 78 +++++ pinecone/config/pinecone_config.py | 10 + pinecone/control/index_host_store.py | 39 +++ .../control/models/collection_description.py | 7 + pinecone/control/models/index_description.py | 13 + pinecone/control/pinecone.py | 277 +++++++++++++++ .../core/client/api/index_operations_api.py | 18 +- pinecone/core/client/models/create_request.py | 2 +- .../core/client/models/index_meta_database.py | 2 +- pinecone/data/__init__.py | 2 + pinecone/{ => data}/grpc/__init__.py | 0 pinecone/data/grpc/base.py | 161 +++++++++ pinecone/data/grpc/config.py | 33 ++ pinecone/data/grpc/future.py | 34 ++ pinecone/{ => data}/grpc/index_grpc.py | 319 +----------------- .../grpc/protos/vector_column_service_pb2.py | 2 +- .../grpc/protos/vector_column_service_pb2.pyi | 2 +- .../protos/vector_column_service_pb2_grpc.py | 2 +- pinecone/{ => data}/grpc/retry.py | 2 +- pinecone/data/grpc/utils.py | 107 ++++++ pinecone/{ => data}/index.py | 48 +-- pinecone/deprecated/__init__.py | 1 + pinecone/deprecated/legacy_utils.py | 88 +++++ pinecone/exceptions.py | 4 + pinecone/manage.py | 300 ---------------- pinecone/utils/__init__.py | 141 +------- pinecone/utils/check_kwargs.py | 8 + pinecone/utils/constants.py | 11 +- pinecone/utils/deprecation_notice.py | 5 + pinecone/utils/environment.py | 8 + pinecone/utils/error_handling.py | 9 +- pinecone/utils/fix_tuple_length.py | 3 + pinecone/utils/user_agent.py | 10 + tests/unit/test_config.py | 237 +++---------- tests/unit/test_control.py | 66 ++++ tests/unit/test_grpc_index.py | 9 +- tests/unit/test_index.py | 9 +- tests/unit/test_manage.py | 54 --- 43 files changed, 1162 insertions(+), 1316 deletions(-) delete mode 100644 pinecone/config.py create mode 100644 pinecone/config/__init__.py create mode 100644 pinecone/config/config.py create mode 
100644 pinecone/config/logging.py create mode 100644 pinecone/config/openapi.py create mode 100644 pinecone/config/pinecone_config.py create mode 100644 pinecone/control/index_host_store.py create mode 100644 pinecone/control/models/collection_description.py create mode 100644 pinecone/control/models/index_description.py create mode 100644 pinecone/control/pinecone.py create mode 100644 pinecone/data/__init__.py rename pinecone/{ => data}/grpc/__init__.py (100%) create mode 100644 pinecone/data/grpc/base.py create mode 100644 pinecone/data/grpc/config.py create mode 100644 pinecone/data/grpc/future.py rename pinecone/{ => data}/grpc/index_grpc.py (71%) rename pinecone/{ => data}/grpc/protos/vector_column_service_pb2.py (99%) rename pinecone/{ => data}/grpc/protos/vector_column_service_pb2.pyi (99%) rename pinecone/{ => data}/grpc/protos/vector_column_service_pb2_grpc.py (99%) rename pinecone/{ => data}/grpc/retry.py (99%) create mode 100644 pinecone/data/grpc/utils.py rename pinecone/{ => data}/index.py (95%) create mode 100644 pinecone/deprecated/__init__.py create mode 100644 pinecone/deprecated/legacy_utils.py delete mode 100644 pinecone/manage.py create mode 100644 pinecone/utils/check_kwargs.py create mode 100644 pinecone/utils/deprecation_notice.py create mode 100644 pinecone/utils/environment.py create mode 100644 pinecone/utils/fix_tuple_length.py create mode 100644 pinecone/utils/user_agent.py create mode 100644 tests/unit/test_control.py delete mode 100644 tests/unit/test_manage.py diff --git a/pinecone/__init__.py b/pinecone/__init__.py index 8ade68bc..a614ff28 100644 --- a/pinecone/__init__.py +++ b/pinecone/__init__.py @@ -3,22 +3,10 @@ """ from .config import * from .exceptions import * -from .manage import * -from .index import * +from .control.pinecone import Pinecone +from .data.index import * try: - from .grpc.index_grpc import * + from .data.grpc.index_grpc import * except ImportError: pass # ignore for non-[grpc] installations - -# Kept for 
backwards-compatibility -UpsertResult = None -"""@private""" -DeleteResult = None -"""@private""" -QueryResult = None -"""@private""" -FetchResult = None -"""@private""" -InfoResult = None -"""@private""" diff --git a/pinecone/config.py b/pinecone/config.py deleted file mode 100644 index f4bc276c..00000000 --- a/pinecone/config.py +++ /dev/null @@ -1,260 +0,0 @@ -import logging -import sys -from typing import NamedTuple, List -import os - -import certifi -import requests -import configparser -import socket - -from urllib3.connection import HTTPConnection - -from pinecone.core.client.exceptions import ApiKeyError -from pinecone.utils import warn_deprecated, check_kwargs -from pinecone.utils.constants import ( - CLIENT_VERSION, - PARENT_LOGGER_NAME, - DEFAULT_PARENT_LOGGER_LEVEL, - TCP_KEEPIDLE, - TCP_KEEPINTVL, - TCP_KEEPCNT, -) -from pinecone.core.client.configuration import Configuration as OpenApiConfiguration - -__all__ = ["Config", "init"] - -_logger = logging.getLogger(__name__) -_parent_logger = logging.getLogger(PARENT_LOGGER_NAME) -_parent_logger.setLevel(DEFAULT_PARENT_LOGGER_LEVEL) - - -class ConfigBase(NamedTuple): - environment: str = "" - api_key: str = "" - project_name: str = "" - controller_host: str = "" - openapi_config: OpenApiConfiguration = None - - -class _CONFIG: - """ - - Order of configs to load: - - - configs specified explicitly in reset - - environment variables - - configs specified in the INI file - - default configs - """ - - def __init__(self): - self.reset() - - def validate(self): - if not self._config.api_key: # None or empty string invalid - raise ApiKeyError("You haven't specified an Api-Key.") - - def reset(self, config_file=None, **kwargs): - config = ConfigBase() - - # Load config from file - file_config = self._load_config_file(config_file) - - # Get the environment first. Make sure that it is not overwritten in subsequent config objects. 
- environment = ( - kwargs.pop("environment", None) - or os.getenv("PINECONE_ENVIRONMENT") - or file_config.pop("environment", None) - or "us-west1-gcp" - ) - config = config._replace(environment=environment) - - # Set default config - default_config = ConfigBase( - controller_host="https://controller.{0}.pinecone.io".format(config.environment), - ) - config = config._replace(**self._preprocess_and_validate_config(default_config._asdict())) - - # Set INI file config - config = config._replace(**self._preprocess_and_validate_config(file_config)) - - # Set environment config - env_config = ConfigBase( - project_name=os.getenv("PINECONE_PROJECT_NAME"), - api_key=os.getenv("PINECONE_API_KEY"), - controller_host=os.getenv("PINECONE_CONTROLLER_HOST"), - ) - config = config._replace(**self._preprocess_and_validate_config(env_config._asdict())) - - # Set explicit config - config = config._replace(**self._preprocess_and_validate_config(kwargs)) - - self._config = config - - # Set OpenAPI client config - default_openapi_config = OpenApiConfiguration.get_default_copy() - default_openapi_config.ssl_ca_cert = certifi.where() - openapi_config = kwargs.pop("openapi_config", None) or default_openapi_config - - openapi_config.socket_options = self._get_socket_options() - - config = config._replace(openapi_config=openapi_config) - self._config = config - - def _preprocess_and_validate_config(self, config: dict) -> dict: - """Normalize, filter, and validate config keys/values. - - Trims whitespace, removes invalid keys (and the "environment" key), - and raises ValueError in case an invalid value was specified. - """ - # general preprocessing and filtering - result = {k: v for k, v in config.items() if k in ConfigBase._fields if v is not None} - result.pop("environment", None) - # validate api key - api_key = result.get("api_key") - # if api_key: - # try: - # uuid.UUID(api_key) - # except ValueError as e: - # raise ValueError(f"Pinecone API key \"{api_key}\" appears invalid. 
" - # f"Did you specify it correctly?") from e - return result - - def _load_config_file(self, config_file: str) -> dict: - """Load from INI config file.""" - config_obj = {} - if config_file: - full_path = os.path.expanduser(config_file) - if os.path.isfile(full_path): - parser = configparser.ConfigParser() - parser.read(full_path) - if "default" in parser.sections(): - config_obj = {**parser["default"]} - return config_obj - - @staticmethod - def _get_socket_options( - do_keep_alive: bool = True, - keep_alive_idle_sec: int = TCP_KEEPIDLE, - keep_alive_interval_sec: int = TCP_KEEPINTVL, - keep_alive_tries: int = TCP_KEEPCNT, - ) -> List[tuple]: - """ - Returns the socket options to pass to OpenAPI's Rest client - Args: - do_keep_alive: Whether to enable TCP keep alive mechanism - keep_alive_idle_sec: Time in seconds of connection idleness before starting to send keep alive probes - keep_alive_interval_sec: Interval time in seconds between keep alive probe messages - keep_alive_tries: Number of failed keep alive tries (unanswered KA messages) before terminating the connection - - Returns: - A list of socket options for the Rest client's connection pool - """ - # Source: https://www.finbourne.com/blog/the-mysterious-hanging-client-tcp-keep-alives - - socket_params = HTTPConnection.default_socket_options - if not do_keep_alive: - return socket_params - - socket_params += [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)] - - # TCP Keep Alive Probes for different platforms - platform = sys.platform - # TCP Keep Alive Probes for Linux - if ( - platform == "linux" - and hasattr(socket, "TCP_KEEPIDLE") - and hasattr(socket, "TCP_KEEPINTVL") - and hasattr(socket, "TCP_KEEPCNT") - ): - socket_params += [(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, keep_alive_idle_sec)] - socket_params += [(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, keep_alive_interval_sec)] - socket_params += [(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, keep_alive_tries)] - - # TCP Keep Alive Probes for Windows OS - # 
NOTE: Changing TCP KA params on windows is done via a different mechanism which OpenAPI's Rest client doesn't expose. - # Since the default values work well, it seems setting `(socket.SO_KEEPALIVE, 1)` is sufficient. - # Leaving this code here for future reference. - # elif platform == 'win32' and hasattr(socket, "SIO_KEEPALIVE_VALS"): - # socket.ioctl((socket.SIO_KEEPALIVE_VALS, (1, keep_alive_idle_sec * 1000, keep_alive_interval_sec * 1000))) - - # TCP Keep Alive Probes for Mac OS - elif platform == "darwin": - TCP_KEEPALIVE = 0x10 - socket_params += [(socket.IPPROTO_TCP, TCP_KEEPALIVE, keep_alive_interval_sec)] - - return socket_params - - @property - def ENVIRONMENT(self): - return self._config.environment - - @property - def API_KEY(self): - return self._config.api_key - - @property - def PROJECT_NAME(self): - return self._config.project_name - - @property - def CONTROLLER_HOST(self): - return self._config.controller_host - - @property - def OPENAPI_CONFIG(self): - return self._config.openapi_config - - @property - def LOG_LEVEL(self): - """ - Deprecated since v2.0.2 [Will be removed in v3.0.0]; use the standard logging module logger "pinecone" instead. - """ - warn_deprecated( - description='LOG_LEVEL is deprecated. Use the standard logging module logger "pinecone" instead.', - deprecated_in="2.0.2", - removal_in="3.0.0", - ) - return logging.getLevelName(logging.getLogger("pinecone").level) - - -def init( - api_key: str = None, - host: str = None, - environment: str = None, - log_level: str = None, - openapi_config: OpenApiConfiguration = None, - config: str = "~/.pinecone", - **kwargs -): - """Initializes the Pinecone client. - - :param api_key: Required if not set in config file or by environment variable ``PINECONE_API_KEY``. - :param host: Optional. Controller host. - :param environment: Optional. Deployment environment. - :param openapi_config: Optional. Set OpenAPI client configuration. - :param config: Optional. An INI configuration file. 
- :param log_level: Deprecated since v2.0.2 [Will be removed in v3.0.0]; use the standard logging module to manage logger "pinecone" instead. - """ - check_kwargs(init, kwargs) - Config.reset( - api_key=api_key, - controller_host=host, - environment=environment, - openapi_config=openapi_config, - config_file=config, - **kwargs - ) - if log_level: - warn_deprecated( - description='log_level is deprecated. Use the standard logging module to manage logger "pinecone" instead.', - deprecated_in="2.0.2", - removal_in="3.0.0", - ) - - -Config = _CONFIG() - -# Init -init() diff --git a/pinecone/config/__init__.py b/pinecone/config/__init__.py new file mode 100644 index 00000000..42784d1e --- /dev/null +++ b/pinecone/config/__init__.py @@ -0,0 +1,2 @@ +from .config import Config +from .pinecone_config import PineconeConfig diff --git a/pinecone/config/config.py b/pinecone/config/config.py new file mode 100644 index 00000000..886d01b5 --- /dev/null +++ b/pinecone/config/config.py @@ -0,0 +1,69 @@ +from typing import NamedTuple +import os + +from pinecone.utils import check_kwargs +from pinecone.exceptions import PineconeConfigurationError +from pinecone.core.client.exceptions import ApiKeyError +from pinecone.config.openapi import OpenApiConfigFactory +from pinecone.core.client.configuration import Configuration as OpenApiConfiguration + + +class ConfigBase(NamedTuple): + api_key: str = "" + host: str = "" + openapi_config: OpenApiConfiguration = None + + +class Config: + """ + + Configurations are resolved in the following order: + + - configs passed as keyword parameters + - configs specified in environment variables + - default values (if applicable) + """ + + """Initializes the Pinecone client. + + :param api_key: Required if not set in config file or by environment variable ``PINECONE_API_KEY``. + :param host: Optional. Controller host. + :param openapi_config: Optional. Set OpenAPI client configuration. 
+ """ + + def __init__( + self, + api_key: str = None, + host: str = None, + openapi_config: OpenApiConfiguration = None, + **kwargs, + ): + api_key = api_key or kwargs.pop("api_key", None) or os.getenv("PINECONE_API_KEY") + host = host or kwargs.pop("host", None) + openapi_config = ( + openapi_config + or kwargs.pop("openapi_config", None) + or OpenApiConfigFactory.build(api_key=api_key, host=host) + ) + + check_kwargs(self.__init__, kwargs) + self._config = ConfigBase(api_key, host, openapi_config) + self.validate() + + def validate(self): + if not self._config.api_key: + raise PineconeConfigurationError("You haven't specified an Api-Key.") + if not self._config.host: + raise PineconeConfigurationError("You haven't specified a host.") + + @property + def API_KEY(self): + return self._config.api_key + + @property + def HOST(self): + return self._config.host + + @property + def OPENAPI_CONFIG(self): + return self._config.openapi_config diff --git a/pinecone/config/logging.py b/pinecone/config/logging.py new file mode 100644 index 00000000..51c78855 --- /dev/null +++ b/pinecone/config/logging.py @@ -0,0 +1,8 @@ +import logging + +PARENT_LOGGER_NAME = "pinecone" +DEFAULT_PARENT_LOGGER_LEVEL = "ERROR" + +_logger = logging.getLogger(__name__) +_parent_logger = logging.getLogger(PARENT_LOGGER_NAME) +_parent_logger.setLevel(DEFAULT_PARENT_LOGGER_LEVEL) diff --git a/pinecone/config/openapi.py b/pinecone/config/openapi.py new file mode 100644 index 00000000..952a672d --- /dev/null +++ b/pinecone/config/openapi.py @@ -0,0 +1,78 @@ +import sys +from typing import List + +import certifi +import requests +import socket + +from urllib3.connection import HTTPConnection + +from pinecone.core.client.configuration import Configuration as OpenApiConfiguration + +TCP_KEEPINTVL = 60 # Sec +TCP_KEEPIDLE = 300 # Sec +TCP_KEEPCNT = 4 + + +class OpenApiConfigFactory: + @classmethod + def build(cls, api_key: str, host: str = None, **kwargs): + openapi_config = 
OpenApiConfiguration.get_default_copy() + openapi_config.host = host + openapi_config.ssl_ca_cert = certifi.where() + openapi_config.socket_options = cls._get_socket_options() + openapi_config.api_key = {"ApiKeyAuth": api_key} + return openapi_config + + @classmethod + def _get_socket_options( + cls, do_keep_alive: bool = True, + keep_alive_idle_sec: int = TCP_KEEPIDLE, + keep_alive_interval_sec: int = TCP_KEEPINTVL, + keep_alive_tries: int = TCP_KEEPCNT, + ) -> List[tuple]: + """ + Returns the socket options to pass to OpenAPI's Rest client + Args: + do_keep_alive: Whether to enable TCP keep alive mechanism + keep_alive_idle_sec: Time in seconds of connection idleness before starting to send keep alive probes + keep_alive_interval_sec: Interval time in seconds between keep alive probe messages + keep_alive_tries: Number of failed keep alive tries (unanswered KA messages) before terminating the connection + + Returns: + A list of socket options for the Rest client's connection pool + """ + # Source: https://www.finbourne.com/blog/the-mysterious-hanging-client-tcp-keep-alives + + socket_params = HTTPConnection.default_socket_options + if not do_keep_alive: + return socket_params + + socket_params += [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)] + + # TCP Keep Alive Probes for different platforms + platform = sys.platform + # TCP Keep Alive Probes for Linux + if ( + platform == "linux" + and hasattr(socket, "TCP_KEEPIDLE") + and hasattr(socket, "TCP_KEEPINTVL") + and hasattr(socket, "TCP_KEEPCNT") + ): + socket_params += [(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, keep_alive_idle_sec)] + socket_params += [(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, keep_alive_interval_sec)] + socket_params += [(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, keep_alive_tries)] + + # TCP Keep Alive Probes for Windows OS + # NOTE: Changing TCP KA params on windows is done via a different mechanism which OpenAPI's Rest client doesn't expose. 
+ # Since the default values work well, it seems setting `(socket.SO_KEEPALIVE, 1)` is sufficient. + # Leaving this code here for future reference. + # elif platform == 'win32' and hasattr(socket, "SIO_KEEPALIVE_VALS"): + # socket.ioctl((socket.SIO_KEEPALIVE_VALS, (1, keep_alive_idle_sec * 1000, keep_alive_interval_sec * 1000))) + + # TCP Keep Alive Probes for Mac OS + elif platform == "darwin": + TCP_KEEPALIVE = 0x10 + socket_params += [(socket.IPPROTO_TCP, TCP_KEEPALIVE, keep_alive_interval_sec)] + + return socket_params diff --git a/pinecone/config/pinecone_config.py b/pinecone/config/pinecone_config.py new file mode 100644 index 00000000..64f879a8 --- /dev/null +++ b/pinecone/config/pinecone_config.py @@ -0,0 +1,10 @@ +import os +from .config import Config + +DEFAULT_CONTROLLER_HOST = "https://api.pinecone.io" + + +class PineconeConfig(Config): + def __init__(self, api_key: str = None, host: str = None, **kwargs): + host = host or kwargs.get("host") or os.getenv("PINECONE_CONTROLLER_HOST") or DEFAULT_CONTROLLER_HOST + super().__init__(api_key=api_key, host=host, **kwargs) diff --git a/pinecone/control/index_host_store.py b/pinecone/control/index_host_store.py new file mode 100644 index 00000000..efe9ea5d --- /dev/null +++ b/pinecone/control/index_host_store.py @@ -0,0 +1,39 @@ +from pinecone.config import Config +from pinecone.core.client.api.index_operations_api import IndexOperationsApi + + +class SingletonMeta(type): + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + instance = super().__call__(*args, **kwargs) + cls._instances[cls] = instance + return cls._instances[cls] + + +class IndexHostStore(metaclass=SingletonMeta): + def __init__(self): + self._indexHosts = {} + + def _key(self, config: Config, index_name: str) -> str: + return ":".join([config.API_KEY, index_name]) + + def delete_host(self, config: Config, index_name: str): + key = self._key(config, index_name) + if key in self._indexHosts: + del 
self._indexHosts[key] + + def set_host(self, config: Config, index_name: str, host: str): + if host: + key = self._key(config, index_name) + self._indexHosts[key] = "https://" + host + + def get_host(self, api: IndexOperationsApi, config: Config, index_name: str) -> str: + key = self._key(config, index_name) + if key in self._indexHosts: + return self._indexHosts[key] + else: + description = api.describe_index(index_name) + self.set_host(config, index_name, description.status.host) + return self._indexHosts[key] diff --git a/pinecone/control/models/collection_description.py b/pinecone/control/models/collection_description.py new file mode 100644 index 00000000..ff3f4b41 --- /dev/null +++ b/pinecone/control/models/collection_description.py @@ -0,0 +1,7 @@ +class CollectionDescription(object): + def __init__(self, keys, values): + for k, v in zip(keys, values): + self.__dict__[k] = v + + def __str__(self): + return str(self.__dict__) diff --git a/pinecone/control/models/index_description.py b/pinecone/control/models/index_description.py new file mode 100644 index 00000000..e86ffbd5 --- /dev/null +++ b/pinecone/control/models/index_description.py @@ -0,0 +1,13 @@ +from typing import NamedTuple + + +class IndexDescription(NamedTuple): + name: str + metric: str + replicas: int + dimension: int + shards: int + pods: int + pod_type: str + status: None + metadata_config: None diff --git a/pinecone/control/pinecone.py b/pinecone/control/pinecone.py new file mode 100644 index 00000000..c6d3037b --- /dev/null +++ b/pinecone/control/pinecone.py @@ -0,0 +1,277 @@ +import time +from typing import Optional + +from .index_host_store import IndexHostStore + +from pinecone.config import PineconeConfig, Config + +from pinecone.control.models.index_description import IndexDescription +from pinecone.control.models.collection_description import CollectionDescription + +from pinecone.core.client.api.index_operations_api import IndexOperationsApi +from pinecone.core.client.api_client 
import ApiClient +from pinecone.core.client.models.create_request import CreateRequest +from pinecone.core.client.models.patch_request import PatchRequest +from pinecone.core.client.models.create_collection_request import CreateCollectionRequest +from pinecone.utils import get_user_agent + +from pinecone.data import Index +from pinecone.data import GRPCIndex + + +class Pinecone: + def __init__( + self, + api_key: str = None, + host: str = None, + config: Config = None, + index_api: IndexOperationsApi = None, + **kwargs, + ): + if config or kwargs.get("config"): + self.config = config or kwargs.get("config") + else: + self.config = PineconeConfig(api_key=api_key, host=host, **kwargs) + + if index_api: + self.index_api = index_api + else: + api_client = ApiClient(configuration=self.config.OPENAPI_CONFIG) + api_client.user_agent = get_user_agent() + self.index_api = IndexOperationsApi(api_client) + + self.index_host_store = IndexHostStore() + + def create_index( + self, + name: str, + dimension: int, + cloud: str, + region: str, + capacity_mode: str, + timeout: int = None, + index_type: str = "approximated", + metric: str = "cosine", + replicas: int = 1, + shards: int = 1, + pods: int = 1, + pod_type: str = "p1", + index_config: dict = None, + metadata_config: dict = None, + source_collection: str = "", + ): + """Creates a Pinecone index. + + :param name: the name of the index. + :type name: str + :param dimension: the dimension of vectors that would be inserted in the index + :param cloud: The cloud where you would like your index hosted. One of `{"aws", "gcp"}`. + :param region: The region where you would like your index hosted. + :param capacity_mode: The capacity mode for the index. One of `{"pod"}`. + :param index_type: type of index, one of `{"approximated", "exact"}`, defaults to "approximated". + The "approximated" index uses fast approximate search algorithms developed by Pinecone. + The "exact" index uses accurate exact search algorithms. 
+ It performs exhaustive searches and thus it is usually slower than the "approximated" index. + :type index_type: str, optional + :param metric: type of metric used in the vector index, one of `{"cosine", "dotproduct", "euclidean"}`, defaults to "cosine". + Use "cosine" for cosine similarity, + "dotproduct" for dot-product, + and "euclidean" for euclidean distance. + :type metric: str, optional + :param replicas: the number of replicas, defaults to 1. + Use at least 2 replicas if you need high availability (99.99% uptime) for querying. + For additional throughput (QPS) your index needs to support, provision additional replicas. + :type replicas: int, optional + :param shards: the number of shards per index, defaults to 1. + Use 1 shard per 1GB of vectors + :type shards: int,optional + :param pods: Total number of pods to be used by the index. pods = shard*replicas + :type pods: int,optional + :param pod_type: the pod type to be used for the index. can be one of p1 or s1. + :type pod_type: str,optional + :param index_config: Advanced configuration options for the index + :param metadata_config: Configuration related to the metadata index + :type metadata_config: dict, optional + :param source_collection: Collection name to create the index from + :type metadata_config: str, optional + :type timeout: int, optional + :param timeout: Timeout for wait until index gets ready. If None, wait indefinitely; if >=0, time out after this many seconds; + if -1, return immediately and do not wait. 
Default: None + """ + api_instance = self.index_api + + api_instance.create_index( + create_request=CreateRequest( + name=name, + dimension=dimension, + cloud=cloud, + region=region, + capacity_mode=capacity_mode, + index_type=index_type, + metric=metric, + replicas=replicas, + shards=shards, + pods=pods, + pod_type=pod_type, + index_config=index_config or {}, + metadata_config=metadata_config, + source_collection=source_collection, + ) + ) + + def is_ready(): + status = self._get_status(name) + ready = status["ready"] + return ready + + if timeout == -1: + return + if timeout is None: + while not is_ready(): + time.sleep(5) + else: + while (not is_ready()) and timeout >= 0: + time.sleep(5) + timeout -= 5 + if timeout and timeout < 0: + raise ( + TimeoutError( + "Please call the describe_index API ({}) to confirm index status.".format( + "https://www.pinecone.io/docs/api/operation/describe_index/" + ) + ) + ) + + def delete_index(self, name: str, timeout: int = None): + """Deletes a Pinecone index. + + :param name: the name of the index. + :type name: str + :param timeout: Timeout for wait until index gets ready. If None, wait indefinitely; if >=0, time out after this many seconds; + if -1, return immediately and do not wait. 
Default: None + :type timeout: int, optional + """ + api_instance = self.index_api + api_instance.delete_index(name) + self.index_host_store.delete_host(self.config, name) + + def get_remaining(): + return name in api_instance.list_indexes() + + if timeout == -1: + return + + if timeout is None: + while get_remaining(): + time.sleep(5) + else: + while get_remaining() and timeout >= 0: + time.sleep(5) + timeout -= 5 + if timeout and timeout < 0: + raise ( + TimeoutError( + "Please call the list_indexes API ({}) to confirm if index is deleted".format( + "https://www.pinecone.io/docs/api/operation/list_indexes/" + ) + ) + ) + + def list_indexes(self): + """Lists all indexes.""" + response = self.index_api.list_indexes() + return response + + def describe_index(self, name: str): + """Describes a Pinecone index. + + :param name: the name of the index to describe. + :return: Returns an `IndexDescription` object + """ + api_instance = self.index_api + response = api_instance.describe_index(name) + db = response.database + host = response.status.host + + self.index_host_store.set_host(self.config, name, host) + + return IndexDescription( + name=db.name, + metric=db.metric, + replicas=db.replicas, + dimension=db.dimension, + shards=db.shards, + pods=db.pods, + pod_type=db.pod_type, + status=response.status, + metadata_config=db.metadata_config, + ) + + def configure_index(self, name: str, replicas: Optional[int] = None, pod_type: Optional[str] = ""): + """Changes current configuration of the index. + :param: name: the name of the Index + :param: replicas: the desired number of replicas, lowest value is 0. + :param: pod_type: the new pod_type for the index. 
+ """ + api_instance = self.index_api + config_args = {} + if pod_type != "": + config_args.update(pod_type=pod_type) + if replicas: + config_args.update(replicas=replicas) + patch_request = PatchRequest(**config_args) + api_instance.configure_index(name, patch_request=patch_request) + + def scale_index(self, name: str, replicas: int): + """Increases number of replicas for the index. + + :param name: the name of the Index + :type name: str + :param replicas: the number of replicas in the index now, lowest value is 0. + :type replicas: int + """ + api_instance = self.index_api + api_instance.configure_index(name, patch_request=PatchRequest(replicas=replicas, pod_type="")) + + def create_collection(self, name: str, source: str): + """Create a collection + :param name: Name of the collection + :param source: Name of the source index + """ + api_instance = self.index_api + api_instance.create_collection(create_collection_request=CreateCollectionRequest(name=name, source=source)) + + def list_collections(self): + """List all collections""" + api_instance = self.index_api + response = api_instance.list_collections() + return response + + def delete_collection(self, name: str): + """Deletes a collection. + :param: name: The name of the collection + """ + api_instance = self.index_api + api_instance.delete_collection(name) + + def describe_collection(self, name: str): + """Describes a collection. 
+ :param: The name of the collection + :return: Description of the collection + """ + api_instance = self.index_api + response = api_instance.describe_collection(name).to_dict() + response_object = CollectionDescription(response.keys(), response.values()) + return response_object + + def _get_status(self, name: str): + api_instance = self.index_api + response = api_instance.describe_index(name) + return response["status"] + + def Index(self, name: str): + index_host = self.index_host_store.get_host(self.index_api, self.config, name) + return Index(api_key=self.config.API_KEY, host=index_host) + + def GRPCIndex(self, name: str): + return GRPCIndex(self.config, name) diff --git a/pinecone/core/client/api/index_operations_api.py b/pinecone/core/client/api/index_operations_api.py index 5c33010f..52f58178 100644 --- a/pinecone/core/client/api/index_operations_api.py +++ b/pinecone/core/client/api/index_operations_api.py @@ -134,7 +134,7 @@ def configure_index_with_http_info( :rtype: tuple(IndexMeta, status_code(int), headers(HTTPHeaderDict)) """ - _hosts = ["https://controller.{environment}.pinecone.io"] + _hosts = ["https://api.pinecone.io"] _host = _hosts[0] if kwargs.get("_host_index"): _host_index = int(kwargs.get("_host_index")) @@ -294,7 +294,7 @@ def create_collection_with_http_info( :rtype: tuple(str, status_code(int), headers(HTTPHeaderDict)) """ - _hosts = ["https://controller.{environment}.pinecone.io"] + _hosts = ["https://api.pinecone.io"] _host = _hosts[0] if kwargs.get("_host_index"): _host_index = int(kwargs.get("_host_index")) @@ -450,7 +450,7 @@ def create_index_with_http_info( :rtype: tuple(IndexMeta, status_code(int), headers(HTTPHeaderDict)) """ - _hosts = ["https://controller.{environment}.pinecone.io"] + _hosts = ["https://api.pinecone.io"] _host = _hosts[0] if kwargs.get("_host_index"): _host_index = int(kwargs.get("_host_index")) @@ -608,7 +608,7 @@ def delete_collection_with_http_info( :rtype: tuple(str, status_code(int), 
headers(HTTPHeaderDict)) """ - _hosts = ["https://controller.{environment}.pinecone.io"] + _hosts = ["https://api.pinecone.io"] _host = _hosts[0] if kwargs.get("_host_index"): _host_index = int(kwargs.get("_host_index")) @@ -757,7 +757,7 @@ def delete_index_with_http_info( :rtype: tuple(str, status_code(int), headers(HTTPHeaderDict)) """ - _hosts = ["https://controller.{environment}.pinecone.io"] + _hosts = ["https://api.pinecone.io"] _host = _hosts[0] if kwargs.get("_host_index"): _host_index = int(kwargs.get("_host_index")) @@ -906,7 +906,7 @@ def describe_collection_with_http_info( :rtype: tuple(CollectionMeta, status_code(int), headers(HTTPHeaderDict)) """ - _hosts = ["https://controller.{environment}.pinecone.io"] + _hosts = ["https://api.pinecone.io"] _host = _hosts[0] if kwargs.get("_host_index"): _host_index = int(kwargs.get("_host_index")) @@ -1055,7 +1055,7 @@ def describe_index_with_http_info( :rtype: tuple(IndexMeta, status_code(int), headers(HTTPHeaderDict)) """ - _hosts = ["https://controller.{environment}.pinecone.io"] + _hosts = ["https://api.pinecone.io"] _host = _hosts[0] if kwargs.get("_host_index"): _host_index = int(kwargs.get("_host_index")) @@ -1196,7 +1196,7 @@ def list_collections_with_http_info(self, **kwargs) -> ApiResponse: # noqa: E50 :rtype: tuple(List[str], status_code(int), headers(HTTPHeaderDict)) """ - _hosts = ["https://controller.{environment}.pinecone.io"] + _hosts = ["https://api.pinecone.io"] _host = _hosts[0] if kwargs.get("_host_index"): _host_index = int(kwargs.get("_host_index")) @@ -1335,7 +1335,7 @@ def list_indexes_with_http_info(self, **kwargs) -> ApiResponse: # noqa: E501 :rtype: tuple(ListIndexes200Response, status_code(int), headers(HTTPHeaderDict)) """ - _hosts = ["https://controller.{environment}.pinecone.io"] + _hosts = ["https://api.pinecone.io"] _host = _hosts[0] if kwargs.get("_host_index"): _host_index = int(kwargs.get("_host_index")) diff --git a/pinecone/core/client/models/create_request.py 
b/pinecone/core/client/models/create_request.py index 9c6c9517..87bedfd3 100644 --- a/pinecone/core/client/models/create_request.py +++ b/pinecone/core/client/models/create_request.py @@ -35,7 +35,7 @@ class CreateRequest(BaseModel): dimension: StrictInt = Field(..., description="The dimensions of the vectors to be inserted in the index") region: StrictStr = Field(..., description="The region where you would like your index to be created") cloud: StrictStr = Field(..., description="The public cloud where you would like your index hosted") - capacity_mode: StrictStr = Field(..., description="The capacity mode for the index") + capacity_mode: StrictStr = Field(..., description="The capacity mode for the index.") index_type: Optional[StrictStr] = Field( "approximated", description="The type of vector index. Pinecone supports 'approximated'." ) diff --git a/pinecone/core/client/models/index_meta_database.py b/pinecone/core/client/models/index_meta_database.py index ba1af18e..7d0970e1 100644 --- a/pinecone/core/client/models/index_meta_database.py +++ b/pinecone/core/client/models/index_meta_database.py @@ -30,7 +30,7 @@ class IndexMetaDatabase(BaseModel): """ name: StrictStr = Field(...) - dimension: StrictStr = Field(...) + dimension: StrictInt = Field(...) capacity_mode: StrictStr = Field(...) index_type: Optional[StrictStr] = None metric: StrictStr = Field(...) 
diff --git a/pinecone/data/__init__.py b/pinecone/data/__init__.py new file mode 100644 index 00000000..7d4f066e --- /dev/null +++ b/pinecone/data/__init__.py @@ -0,0 +1,2 @@ +from .index import Index +from .grpc.index_grpc import GRPCIndex \ No newline at end of file diff --git a/pinecone/grpc/__init__.py b/pinecone/data/grpc/__init__.py similarity index 100% rename from pinecone/grpc/__init__.py rename to pinecone/data/grpc/__init__.py diff --git a/pinecone/data/grpc/base.py b/pinecone/data/grpc/base.py new file mode 100644 index 00000000..3452f596 --- /dev/null +++ b/pinecone/data/grpc/base.py @@ -0,0 +1,161 @@ +import logging +from abc import ABC, abstractmethod +from functools import wraps +from typing import NamedTuple, Optional, Dict + +import certifi +import grpc +from grpc._channel import _InactiveRpcError +import json + +from .retry import RetryConfig + +from pinecone import Config +from .utils import _generate_request_id +from .config import GRPCClientConfig +from pinecone.utils.constants import MAX_MSG_SIZE, REQUEST_ID, CLIENT_VERSION +from pinecone.exceptions import PineconeException + +_logger = logging.getLogger(__name__) + +class GRPCIndexBase(ABC): + """ + Base class for grpc-based interaction with Pinecone indexes + """ + + _pool = None + + def __init__( + self, + index_name: str, + config: Config, + channel=None, + grpc_config: GRPCClientConfig = None, + _endpoint_override: str = None, + ): + self.name = index_name + + self.grpc_client_config = grpc_config or GRPCClientConfig() + self.retry_config = self.grpc_client_config.retry_config or RetryConfig() + self.fixed_metadata = {"api-key": config.API_KEY, "service-name": index_name, "client-version": CLIENT_VERSION} + self._endpoint_override = _endpoint_override + + self.method_config = json.dumps( + { + "methodConfig": [ + { + "name": [{"service": "VectorService.Upsert"}], + "retryPolicy": { + "maxAttempts": 5, + "initialBackoff": "0.1s", + "maxBackoff": "1s", + "backoffMultiplier": 2, + 
"retryableStatusCodes": ["UNAVAILABLE"], + }, + }, + { + "name": [{"service": "VectorService"}], + "retryPolicy": { + "maxAttempts": 5, + "initialBackoff": "0.1s", + "maxBackoff": "1s", + "backoffMultiplier": 2, + "retryableStatusCodes": ["UNAVAILABLE"], + }, + }, + ] + } + ) + + self._channel = channel or self._gen_channel() + self.stub = self.stub_class(self._channel) + + @property + @abstractmethod + def stub_class(self): + pass + + def _endpoint(self): + return ( + self._endpoint_override + if self._endpoint_override + else f"{self.name}-{Config.PROJECT_NAME}.svc.{Config.ENVIRONMENT}.pinecone.io:443" + ) + + def _gen_channel(self, options=None): + target = self._endpoint() + default_options = { + "grpc.max_send_message_length": MAX_MSG_SIZE, + "grpc.max_receive_message_length": MAX_MSG_SIZE, + "grpc.service_config": self.method_config, + "grpc.enable_retries": True, + } + if self.grpc_client_config.secure: + default_options["grpc.ssl_target_name_override"] = target.split(":")[0] + user_provided_options = options or {} + _options = tuple((k, v) for k, v in {**default_options, **user_provided_options}.items()) + _logger.debug( + "creating new channel with endpoint %s options %s and config %s", target, _options, self.grpc_client_config + ) + if not self.grpc_client_config.secure: + channel = grpc.insecure_channel(target, options=_options) + else: + root_cas = open(certifi.where(), "rb").read() + tls = grpc.ssl_channel_credentials(root_certificates=root_cas) + channel = grpc.secure_channel(target, tls, options=_options) + + return channel + + @property + def channel(self): + """Creates GRPC channel.""" + if self.grpc_client_config.reuse_channel and self._channel and self.grpc_server_on(): + return self._channel + self._channel = self._gen_channel() + return self._channel + + def grpc_server_on(self) -> bool: + try: + grpc.channel_ready_future(self._channel).result(timeout=self.grpc_client_config.conn_timeout) + return True + except grpc.FutureTimeoutError: + return 
False + + def close(self): + """Closes the connection to the index.""" + try: + self._channel.close() + except TypeError: + pass + + def _wrap_grpc_call( + self, func, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None + ): + @wraps(func) + def wrapped(): + user_provided_metadata = metadata or {} + _metadata = tuple( + (k, v) for k, v in {**self.fixed_metadata, **self._request_metadata(), **user_provided_metadata}.items() + ) + try: + return func( + request, + timeout=timeout, + metadata=_metadata, + credentials=credentials, + wait_for_ready=wait_for_ready, + compression=compression, + ) + except _InactiveRpcError as e: + raise PineconeException(e._state.debug_error_string) from e + + return wrapped() + + def _request_metadata(self) -> Dict[str, str]: + return {REQUEST_ID: _generate_request_id()} + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() diff --git a/pinecone/data/grpc/config.py b/pinecone/data/grpc/config.py new file mode 100644 index 00000000..13ab7dae --- /dev/null +++ b/pinecone/data/grpc/config.py @@ -0,0 +1,33 @@ +from .retry import RetryConfig +from typing import NamedTuple, Optional, Dict + + +class GRPCClientConfig(NamedTuple): + """ + GRPC client configuration options. + + :param secure: Whether to use encrypted protocol (SSL). defaults to True. + :type secure: bool, optional + :param timeout: defaults to 20 seconds. Fail if gateway doesn't receive response within timeout. + :type timeout: int, optional + :param conn_timeout: defaults to 1. Timeout to retry connection if gRPC is unavailable. 0 is no retry. 
+ :type conn_timeout: int, optional + :param reuse_channel: Whether to reuse the same grpc channel for multiple requests + :type reuse_channel: bool, optional + :param retry_config: RetryConfig indicating how requests should be retried + :type retry_config: RetryConfig, optional + :param grpc_channel_options: A dict of gRPC channel arguments + :type grpc_channel_options: Dict[str, str] + """ + + secure: bool = True + timeout: int = 20 + conn_timeout: int = 1 + reuse_channel: bool = True + retry_config: Optional[RetryConfig] = None + grpc_channel_options: Dict[str, str] = None + + @classmethod + def _from_dict(cls, kwargs: dict): + cls_kwargs = {kk: vv for kk, vv in kwargs.items() if kk in cls._fields} + return cls(**cls_kwargs) diff --git a/pinecone/data/grpc/future.py b/pinecone/data/grpc/future.py new file mode 100644 index 00000000..160169d6 --- /dev/null +++ b/pinecone/data/grpc/future.py @@ -0,0 +1,34 @@ +from grpc._channel import _MultiThreadedRendezvous +from pinecone.exceptions import PineconeException + + +class PineconeGrpcFuture: + def __init__(self, delegate): + self._delegate = delegate + + def cancel(self): + return self._delegate.cancel() + + def cancelled(self): + return self._delegate.cancelled() + + def running(self): + return self._delegate.running() + + def done(self): + return self._delegate.done() + + def add_done_callback(self, fun): + return self._delegate.add_done_callback(fun) + + def result(self, timeout=None): + try: + return self._delegate.result(timeout=timeout) + except _MultiThreadedRendezvous as e: + raise PineconeException(e._state.debug_error_string) from e + + def exception(self, timeout=None): + return self._delegate.exception(timeout=timeout) + + def traceback(self, timeout=None): + return self._delegate.traceback(timeout=timeout) diff --git a/pinecone/grpc/index_grpc.py b/pinecone/data/grpc/index_grpc.py similarity index 71% rename from pinecone/grpc/index_grpc.py rename to pinecone/data/grpc/index_grpc.py index 
1e35300b..ef6e23db 100644 --- a/pinecone/grpc/index_grpc.py +++ b/pinecone/data/grpc/index_grpc.py @@ -1,22 +1,19 @@ import logging import numbers -from abc import ABC, abstractmethod -from functools import wraps -from importlib.util import find_spec -from typing import NamedTuple, Optional, Dict, Iterable, Union, List, Tuple, Any +from typing import Optional, Dict, Iterable, Union, List, Tuple, Any from collections.abc import Mapping -import certifi -import grpc from google.protobuf import json_format -from grpc._channel import _InactiveRpcError, _MultiThreadedRendezvous + from tqdm.autonotebook import tqdm -import json -from pinecone import FetchResponse, QueryResponse, ScoredVector, SingleQueryResults, DescribeIndexStatsResponse -from pinecone.config import Config -from pinecone.core.client.models.namespace_summary import NamespaceSummary -from pinecone.core.client.models.vector import Vector as _Vector +from .utils import dict_to_proto_struct, parse_fetch_response, parse_query_response, parse_stats_response + +from pinecone.core.client.models import ( + FetchResponse, + QueryResponse, + DescribeIndexStatsResponse, +) from pinecone.core.grpc.protos.vector_service_pb2 import ( Vector as GRPCVector, QueryVector as GRPCQueryVector, @@ -31,311 +28,21 @@ UpdateResponse, SparseValues as GRPCSparseValues, ) -from pinecone.core.client.models.sparse_values import SparseValues from pinecone.core.grpc.protos.vector_service_pb2_grpc import VectorServiceStub -from pinecone.grpc.retry import RetryOnRpcErrorClientInterceptor, RetryConfig -from pinecone.utils import _generate_request_id, dict_to_proto_struct, fix_tuple_length +from pinecone.utils import fix_tuple_length from pinecone.utils.constants import ( - MAX_MSG_SIZE, - REQUEST_ID, - CLIENT_VERSION, REQUIRED_VECTOR_FIELDS, OPTIONAL_VECTOR_FIELDS, ) -from pinecone.exceptions import PineconeException +from pinecone.data.grpc.base import GRPCIndexBase +from pinecone.data.grpc.future import PineconeGrpcFuture __all__ = 
["GRPCIndex", "GRPCVector", "GRPCQueryVector", "GRPCSparseValues"] _logger = logging.getLogger(__name__) -class GRPCClientConfig(NamedTuple): - """ - GRPC client configuration options. - - :param secure: Whether to use encrypted protocol (SSL). defaults to True. - :type traceroute: bool, optional - :param timeout: defaults to 2 seconds. Fail if gateway doesn't receive response within timeout. - :type timeout: int, optional - :param conn_timeout: defaults to 1. Timeout to retry connection if gRPC is unavailable. 0 is no retry. - :type conn_timeout: int, optional - :param reuse_channel: Whether to reuse the same grpc channel for multiple requests - :type reuse_channel: bool, optional - :param retry_config: RetryConfig indicating how requests should be retried - :type retry_config: RetryConfig, optional - :param grpc_channel_options: A dict of gRPC channel arguments - :type grpc_channel_options: Dict[str, str] - """ - - secure: bool = True - timeout: int = 20 - conn_timeout: int = 1 - reuse_channel: bool = True - retry_config: Optional[RetryConfig] = None - grpc_channel_options: Dict[str, str] = None - - @classmethod - def _from_dict(cls, kwargs: dict): - cls_kwargs = {kk: vv for kk, vv in kwargs.items() if kk in cls._fields} - return cls(**cls_kwargs) - - -class GRPCIndexBase(ABC): - """ - Base class for grpc-based interaction with Pinecone indexes - """ - - _pool = None - - def __init__( - self, index_name: str, channel=None, grpc_config: GRPCClientConfig = None, _endpoint_override: str = None - ): - self.name = index_name - - self.grpc_client_config = grpc_config or GRPCClientConfig() - self.retry_config = self.grpc_client_config.retry_config or RetryConfig() - self.fixed_metadata = {"api-key": Config.API_KEY, "service-name": index_name, "client-version": CLIENT_VERSION} - self._endpoint_override = _endpoint_override - - self.method_config = json.dumps( - { - "methodConfig": [ - { - "name": [{"service": "VectorService.Upsert"}], - "retryPolicy": { - "maxAttempts": 
5, - "initialBackoff": "0.1s", - "maxBackoff": "1s", - "backoffMultiplier": 2, - "retryableStatusCodes": ["UNAVAILABLE"], - }, - }, - { - "name": [{"service": "VectorService"}], - "retryPolicy": { - "maxAttempts": 5, - "initialBackoff": "0.1s", - "maxBackoff": "1s", - "backoffMultiplier": 2, - "retryableStatusCodes": ["UNAVAILABLE"], - }, - }, - ] - } - ) - - self._channel = channel or self._gen_channel() - self.stub = self.stub_class(self._channel) - - @property - @abstractmethod - def stub_class(self): - pass - - def _endpoint(self): - return ( - self._endpoint_override - if self._endpoint_override - else f"{self.name}-{Config.PROJECT_NAME}.svc.{Config.ENVIRONMENT}.pinecone.io:443" - ) - - def _gen_channel(self, options=None): - target = self._endpoint() - default_options = { - "grpc.max_send_message_length": MAX_MSG_SIZE, - "grpc.max_receive_message_length": MAX_MSG_SIZE, - "grpc.service_config": self.method_config, - "grpc.enable_retries": True, - } - if self.grpc_client_config.secure: - default_options["grpc.ssl_target_name_override"] = target.split(":")[0] - user_provided_options = options or {} - _options = tuple((k, v) for k, v in {**default_options, **user_provided_options}.items()) - _logger.debug( - "creating new channel with endpoint %s options %s and config %s", target, _options, self.grpc_client_config - ) - if not self.grpc_client_config.secure: - channel = grpc.insecure_channel(target, options=_options) - else: - root_cas = open(certifi.where(), "rb").read() - tls = grpc.ssl_channel_credentials(root_certificates=root_cas) - channel = grpc.secure_channel(target, tls, options=_options) - - return channel - - @property - def channel(self): - """Creates GRPC channel.""" - if self.grpc_client_config.reuse_channel and self._channel and self.grpc_server_on(): - return self._channel - self._channel = self._gen_channel() - return self._channel - - def grpc_server_on(self) -> bool: - try: - 
grpc.channel_ready_future(self._channel).result(timeout=self.grpc_client_config.conn_timeout) - return True - except grpc.FutureTimeoutError: - return False - - def close(self): - """Closes the connection to the index.""" - try: - self._channel.close() - except TypeError: - pass - - def _wrap_grpc_call( - self, func, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None - ): - @wraps(func) - def wrapped(): - user_provided_metadata = metadata or {} - _metadata = tuple( - (k, v) for k, v in {**self.fixed_metadata, **self._request_metadata(), **user_provided_metadata}.items() - ) - try: - return func( - request, - timeout=timeout, - metadata=_metadata, - credentials=credentials, - wait_for_ready=wait_for_ready, - compression=compression, - ) - except _InactiveRpcError as e: - raise PineconeException(e._state.debug_error_string) from e - - return wrapped() - - def _request_metadata(self) -> Dict[str, str]: - return {REQUEST_ID: _generate_request_id()} - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - -def parse_sparse_values(sparse_values: dict): - return ( - SparseValues(indices=sparse_values["indices"], values=sparse_values["values"]) - if sparse_values - else SparseValues(indices=[], values=[]) - ) - - -def parse_fetch_response(response: dict): - vd = {} - vectors = response.get("vectors") - if not vectors: - return None - for id, vec in vectors.items(): - v_obj = _Vector( - id=vec["id"], - values=vec["values"], - sparse_values=parse_sparse_values(vec.get("sparseValues")), - metadata=vec.get("metadata", None), - _check_type=False, - ) - vd[id] = v_obj - namespace = response.get("namespace", "") - return FetchResponse(vectors=vd, namespace=namespace, _check_type=False) - - -def parse_query_response(response: dict, unary_query: bool, _check_type: bool = False): - res = [] - - # TODO: consider deleting this deprecated case - for match in response.get("results", []): 
- namespace = match.get("namespace", "") - m = [] - if "matches" in match: - for item in match["matches"]: - sc = ScoredVector( - id=item["id"], - score=item.get("score", 0.0), - values=item.get("values", []), - sparse_values=parse_sparse_values(item.get("sparseValues")), - metadata=item.get("metadata", {}), - ) - m.append(sc) - res.append(SingleQueryResults(matches=m, namespace=namespace)) - - m = [] - for item in response.get("matches", []): - sc = ScoredVector( - id=item["id"], - score=item.get("score", 0.0), - values=item.get("values", []), - sparse_values=parse_sparse_values(item.get("sparseValues")), - metadata=item.get("metadata", {}), - _check_type=_check_type, - ) - m.append(sc) - - kwargs = {"_check_type": _check_type} - if unary_query: - kwargs["namespace"] = response.get("namespace", "") - kwargs["matches"] = m - else: - kwargs["results"] = res - return QueryResponse(**kwargs) - - -def parse_stats_response(response: dict): - fullness = response.get("indexFullness", 0.0) - total_vector_count = response.get("totalVectorCount", 0) - dimension = response.get("dimension", 0) - summaries = response.get("namespaces", {}) - namespace_summaries = {} - for key in summaries: - vc = summaries[key].get("vectorCount", 0) - namespace_summaries[key] = NamespaceSummary(vector_count=vc) - return DescribeIndexStatsResponse( - namespaces=namespace_summaries, - dimension=dimension, - index_fullness=fullness, - total_vector_count=total_vector_count, - _check_type=False, - ) - - -class PineconeGrpcFuture: - def __init__(self, delegate): - self._delegate = delegate - - def cancel(self): - return self._delegate.cancel() - - def cancelled(self): - return self._delegate.cancelled() - - def running(self): - return self._delegate.running() - - def done(self): - return self._delegate.done() - - def add_done_callback(self, fun): - return self._delegate.add_done_callback(fun) - - def result(self, timeout=None): - try: - return self._delegate.result(timeout=timeout) - except 
_MultiThreadedRendezvous as e: - raise PineconeException(e._state.debug_error_string) from e - - def exception(self, timeout=None): - return self._delegate.exception(timeout=timeout) - - def traceback(self, timeout=None): - return self._delegate.traceback(timeout=timeout) - - class GRPCIndex(GRPCIndexBase): - """A client for interacting with a Pinecone index via GRPC API.""" @property @@ -840,4 +547,4 @@ def _parse_sparse_values_arg( f"Received: {sparse_values}" ) - return GRPCSparseValues(indices=sparse_values["indices"], values=sparse_values["values"]) \ No newline at end of file + return GRPCSparseValues(indices=sparse_values["indices"], values=sparse_values["values"]) diff --git a/pinecone/grpc/protos/vector_column_service_pb2.py b/pinecone/data/grpc/protos/vector_column_service_pb2.py similarity index 99% rename from pinecone/grpc/protos/vector_column_service_pb2.py rename to pinecone/data/grpc/protos/vector_column_service_pb2.py index cab350a6..1b4ebadc 100644 --- a/pinecone/grpc/protos/vector_column_service_pb2.py +++ b/pinecone/data/grpc/protos/vector_column_service_pb2.py @@ -1290,4 +1290,4 @@ DESCRIPTOR.services_by_name["VectorColumnService"] = _VECTORCOLUMNSERVICE -# @@protoc_insertion_point(module_scope) \ No newline at end of file +# @@protoc_insertion_point(module_scope) diff --git a/pinecone/grpc/protos/vector_column_service_pb2.pyi b/pinecone/data/grpc/protos/vector_column_service_pb2.pyi similarity index 99% rename from pinecone/grpc/protos/vector_column_service_pb2.pyi rename to pinecone/data/grpc/protos/vector_column_service_pb2.pyi index 3b0176f6..3b85ddb1 100644 --- a/pinecone/grpc/protos/vector_column_service_pb2.pyi +++ b/pinecone/data/grpc/protos/vector_column_service_pb2.pyi @@ -347,4 +347,4 @@ class DescribeIndexStatsResponse(google___protobuf___message___Message): self, field_name: typing_extensions___Literal["dimension", b"dimension", "namespaces", b"namespaces"] ) -> None: ... 
-type___DescribeIndexStatsResponse = DescribeIndexStatsResponse \ No newline at end of file +type___DescribeIndexStatsResponse = DescribeIndexStatsResponse diff --git a/pinecone/grpc/protos/vector_column_service_pb2_grpc.py b/pinecone/data/grpc/protos/vector_column_service_pb2_grpc.py similarity index 99% rename from pinecone/grpc/protos/vector_column_service_pb2_grpc.py rename to pinecone/data/grpc/protos/vector_column_service_pb2_grpc.py index 93db5020..677e12b4 100644 --- a/pinecone/grpc/protos/vector_column_service_pb2_grpc.py +++ b/pinecone/data/grpc/protos/vector_column_service_pb2_grpc.py @@ -264,4 +264,4 @@ def DescribeIndexStats( wait_for_ready, timeout, metadata, - ) \ No newline at end of file + ) diff --git a/pinecone/grpc/retry.py b/pinecone/data/grpc/retry.py similarity index 99% rename from pinecone/grpc/retry.py rename to pinecone/data/grpc/retry.py index e1ec306a..b2718288 100644 --- a/pinecone/grpc/retry.py +++ b/pinecone/data/grpc/retry.py @@ -84,4 +84,4 @@ class RetryConfig(NamedTuple): max_attempts: int = 4 sleep_policy: SleepPolicy = ExponentialBackoff(init_backoff_ms=100, max_backoff_ms=1600, multiplier=2) - retryable_status: Optional[Tuple[grpc.StatusCode, ...]] = (grpc.StatusCode.UNAVAILABLE,) \ No newline at end of file + retryable_status: Optional[Tuple[grpc.StatusCode, ...]] = (grpc.StatusCode.UNAVAILABLE,) diff --git a/pinecone/data/grpc/utils.py b/pinecone/data/grpc/utils.py new file mode 100644 index 00000000..fc69c83e --- /dev/null +++ b/pinecone/data/grpc/utils.py @@ -0,0 +1,107 @@ +import uuid + +from google.protobuf.struct_pb2 import Struct + +from pinecone.core.client.models import ( + Vector as _Vector, + ScoredVector, + SparseValues, + FetchResponse, + SingleQueryResults, + QueryResponse, + DescribeIndexStatsResponse, + NamespaceSummary, +) + +def _generate_request_id() -> str: + return str(uuid.uuid4()) + +def dict_to_proto_struct(d: dict) -> "Struct": + if not d: + d = {} + s = Struct() + s.update(d) + return s + +def 
parse_sparse_values(sparse_values: dict): + return ( + SparseValues(indices=sparse_values["indices"], values=sparse_values["values"]) + if sparse_values + else SparseValues(indices=[], values=[]) + ) + + +def parse_fetch_response(response: dict): + vd = {} + vectors = response.get("vectors") + if not vectors: + return None + for id, vec in vectors.items(): + v_obj = _Vector( + id=vec["id"], + values=vec["values"], + sparse_values=parse_sparse_values(vec.get("sparseValues")), + metadata=vec.get("metadata", None), + _check_type=False, + ) + vd[id] = v_obj + namespace = response.get("namespace", "") + return FetchResponse(vectors=vd, namespace=namespace, _check_type=False) + + +def parse_query_response(response: dict, unary_query: bool, _check_type: bool = False): + res = [] + + # TODO: consider deleting this deprecated case + for match in response.get("results", []): + namespace = match.get("namespace", "") + m = [] + if "matches" in match: + for item in match["matches"]: + sc = ScoredVector( + id=item["id"], + score=item.get("score", 0.0), + values=item.get("values", []), + sparse_values=parse_sparse_values(item.get("sparseValues")), + metadata=item.get("metadata", {}), + ) + m.append(sc) + res.append(SingleQueryResults(matches=m, namespace=namespace)) + + m = [] + for item in response.get("matches", []): + sc = ScoredVector( + id=item["id"], + score=item.get("score", 0.0), + values=item.get("values", []), + sparse_values=parse_sparse_values(item.get("sparseValues")), + metadata=item.get("metadata", {}), + _check_type=_check_type, + ) + m.append(sc) + + kwargs = {"_check_type": _check_type} + if unary_query: + kwargs["namespace"] = response.get("namespace", "") + kwargs["matches"] = m + else: + kwargs["results"] = res + return QueryResponse(**kwargs) + + +def parse_stats_response(response: dict): + fullness = response.get("indexFullness", 0.0) + total_vector_count = response.get("totalVectorCount", 0) + dimension = response.get("dimension", 0) + summaries = 
response.get("namespaces", {}) + namespace_summaries = {} + for key in summaries: + vc = summaries[key].get("vectorCount", 0) + namespace_summaries[key] = NamespaceSummary(vector_count=vc) + return DescribeIndexStatsResponse( + namespaces=namespace_summaries, + dimension=dimension, + index_fullness=fullness, + total_vector_count=total_vector_count, + _check_type=False, + ) diff --git a/pinecone/index.py b/pinecone/data/index.py similarity index 95% rename from pinecone/index.py rename to pinecone/data/index.py index 663645d6..c2250352 100644 --- a/pinecone/index.py +++ b/pinecone/data/index.py @@ -1,3 +1,5 @@ +import os + from tqdm.autonotebook import tqdm from importlib.util import find_spec import numbers @@ -6,12 +8,12 @@ from collections.abc import Iterable, Mapping from typing import Union, List, Tuple, Optional, Dict, Any -from .core.client.models.sparse_values import SparseValues -from pinecone import Config +from pinecone.config import Config + +from pinecone.core.client.models.sparse_values import SparseValues from pinecone.core.client import ApiClient -from .core.client.models import ( +from pinecone.core.client.models import ( FetchResponse, - ProtobufAny, QueryRequest, QueryResponse, QueryVector, @@ -28,13 +30,12 @@ DescribeIndexStatsRequest, ) from pinecone.core.client.api.vector_operations_api import VectorOperationsApi -from .utils import fix_tuple_length, get_user_agent, warn_deprecated +from ..utils import fix_tuple_length, get_user_agent, warn_deprecated import copy __all__ = [ "Index", "FetchResponse", - "ProtobufAny", "QueryRequest", "QueryResponse", "QueryVector", @@ -52,8 +53,8 @@ "SparseValues", ] -from .utils.constants import REQUIRED_VECTOR_FIELDS, OPTIONAL_VECTOR_FIELDS -from .utils.error_handling import validate_and_convert_errors +from ..utils.constants import REQUIRED_VECTOR_FIELDS, OPTIONAL_VECTOR_FIELDS +from ..utils.error_handling import validate_and_convert_errors _OPENAPI_ENDPOINT_PARAMS = ( "_return_http_data_only", @@ -81,25 
+82,30 @@ def upsert_numpy_deprecation_notice(context): warn_deprecated(message, deprecated_in="2.2.1", removal_in="3.0.0") -class Index(ApiClient): +class Index(): """ A client for interacting with a Pinecone index via REST API. For improved performance, use the Pinecone GRPC index client. """ - def __init__(self, index_name: str, pool_threads=1): - openapi_client_config = copy.deepcopy(Config.OPENAPI_CONFIG) - openapi_client_config.api_key = openapi_client_config.api_key or {} - openapi_client_config.api_key["ApiKeyAuth"] = openapi_client_config.api_key.get("ApiKeyAuth", Config.API_KEY) - openapi_client_config.server_variables = openapi_client_config.server_variables or {} - openapi_client_config.server_variables = { - **{"environment": Config.ENVIRONMENT, "index_name": index_name, "project_name": Config.PROJECT_NAME}, - **openapi_client_config.server_variables, - } - super().__init__(configuration=openapi_client_config, pool_threads=pool_threads) - self.user_agent = get_user_agent() - self._vector_api = VectorOperationsApi(self) + def __init__(self, api_key: str, host: str, pool_threads=1, **kwargs): + api_key = api_key or kwargs.get("api_key") + host = host or kwargs.get('host') + pool_threads = pool_threads or kwargs.get("pool_threads") + + self._config = Config(api_key=api_key, host=host, **kwargs) + + api_client = ApiClient(configuration=self._config.OPENAPI_CONFIG, pool_threads=pool_threads) + api_client.user_agent = get_user_agent() + self._api_client = api_client + self._vector_api = VectorOperationsApi(api_client=api_client) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._api_client.close() @validate_and_convert_errors def upsert( diff --git a/pinecone/deprecated/__init__.py b/pinecone/deprecated/__init__.py new file mode 100644 index 00000000..f67d7226 --- /dev/null +++ b/pinecone/deprecated/__init__.py @@ -0,0 +1 @@ +from .legacy_utils import * \ No newline at end of file diff --git 
a/pinecone/deprecated/legacy_utils.py b/pinecone/deprecated/legacy_utils.py new file mode 100644 index 00000000..19b5f58e --- /dev/null +++ b/pinecone/deprecated/legacy_utils.py @@ -0,0 +1,88 @@ +import re +import uuid +from typing import List + +from pinecone.utils import warn_deprecated + +try: + from pinecone.data.grpc.protos import vector_column_service_pb2 + from google.protobuf.struct_pb2 import Struct + from google.protobuf import json_format + import numpy as np + import lz4.frame +except Exception: + pass # ignore for non-[grpc] installations + +DNS_COMPATIBLE_REGEX = re.compile("^[a-z0-9]([a-z0-9]|[-])+[a-z0-9]$") + + +def dump_numpy_public(np_array: "np.ndarray", compressed: bool = False) -> "vector_column_service_pb2.NdArray": + """ + Dump numpy array to vector_column_service_pb2.NdArray + """ + warn_deprecated( + "dump_numpy_public and all numpy-related features will be removed in a future version", + deprecated_in="2.2.1", + removal_in="3.0.0", + ) + protobuf_arr = vector_column_service_pb2.NdArray() + protobuf_arr.dtype = str(np_array.dtype) + protobuf_arr.shape.extend(np_array.shape) + if compressed: + protobuf_arr.buffer = lz4.frame.compress(np_array.tobytes()) + protobuf_arr.compressed = True + else: + protobuf_arr.buffer = np_array.tobytes() + return protobuf_arr + + +def dump_strings_public(strs: List[str], compressed: bool = False) -> "vector_column_service_pb2.NdArray": + warn_deprecated( + "dump_strings_public and all numpy-related features will be removed in a future version", + deprecated_in="2.2.1", + removal_in="3.0.0", + ) + return dump_numpy_public(np.array(strs, dtype="S"), compressed=compressed) + + + +def validate_dns_name(name): + if not DNS_COMPATIBLE_REGEX.match(name): + raise ValueError( + "{} is invalid - service names and node names must consist of lower case " + "alphanumeric characters or '-', start with an alphabetic character, and end with an " + "alphanumeric character (e.g. 
'my-name', or 'abc-123')".format(name) + ) + + +def proto_struct_to_dict(s: "Struct") -> dict: + return json_format.MessageToDict(s) + + +def load_numpy_public(proto_arr: "vector_column_service_pb2.NdArray") -> "np.ndarray": + """ + Load numpy array from protobuf + :param proto_arr: + :return: + """ + warn_deprecated( + "load_numpy_public and all numpy-related features will be removed in a future version", + deprecated_in="2.2.1", + removal_in="3.0.0", + ) + if len(proto_arr.shape) == 0: + return np.array([]) + if proto_arr.compressed: + numpy_arr = np.frombuffer(lz4.frame.decompress(proto_arr.buffer), dtype=proto_arr.dtype) + else: + numpy_arr = np.frombuffer(proto_arr.buffer, dtype=proto_arr.dtype) + return numpy_arr.reshape(proto_arr.shape) + + +def load_strings_public(proto_arr: "vector_column_service_pb2.NdArray") -> List[str]: + warn_deprecated( + "load_strings_public and all numpy-related features will be removed in a future version", + deprecated_in="2.2.1", + removal_in="3.0.0", + ) + return [str(item, "utf-8") for item in load_numpy_public(proto_arr)] diff --git a/pinecone/exceptions.py b/pinecone/exceptions.py index 62da708f..98610be3 100644 --- a/pinecone/exceptions.py +++ b/pinecone/exceptions.py @@ -17,7 +17,11 @@ class PineconeException(Exception): class PineconeProtocolError(PineconeException): """Raised when something unexpected happens mid-request/response.""" +class PineconeConfigurationError(PineconeException): + """Raised when a configuration error occurs.""" + __all__ = [ + "PineconeConfigurationError", "PineconeException", "PineconeProtocolError", "OpenApiException", diff --git a/pinecone/manage.py b/pinecone/manage.py deleted file mode 100644 index 5f3227a8..00000000 --- a/pinecone/manage.py +++ /dev/null @@ -1,300 +0,0 @@ -import time -from typing import NamedTuple, Optional - -import pinecone -from pinecone.config import Config -from pinecone.core.client.api.index_operations_api import IndexOperationsApi -from 
pinecone.core.client.api_client import ApiClient -from pinecone.core.client.models.create_request import CreateRequest -from pinecone.core.client.models.patch_request import PatchRequest -from pinecone.core.client.models.create_collection_request import CreateCollectionRequest -from pinecone.utils import get_user_agent - -__all__ = [ - "create_index", - "delete_index", - "describe_index", - "list_indexes", - "scale_index", - "create_collection", - "describe_collection", - "list_collections", - "delete_collection", - "configure_index", - "CollectionDescription", - "IndexDescription", -] - - -class IndexDescription(NamedTuple): - name: str - metric: str - replicas: int - dimension: int - shards: int - pods: int - pod_type: str - status: None - metadata_config: None - source_collection: None - - -class CollectionDescription(object): - def __init__(self, keys, values): - for k, v in zip(keys, values): - self.__dict__[k] = v - - def __str__(self): - return str(self.__dict__) - - -def _get_api_instance(): - client_config = Config.OPENAPI_CONFIG - client_config.api_key = client_config.api_key or {} - client_config.api_key["ApiKeyAuth"] = client_config.api_key.get("ApiKeyAuth", Config.API_KEY) - client_config.server_variables = {**{"environment": Config.ENVIRONMENT}, **client_config.server_variables} - - # If a custom host has been passed with initialization pass it to the client_config - if (Config.CONTROLLER_HOST): - client_config.host = Config.CONTROLLER_HOST - - api_client = ApiClient(configuration=client_config) - api_client.user_agent = get_user_agent() - api_instance = IndexOperationsApi(api_client) - return api_instance - - -def _get_status(name: str): - api_instance = _get_api_instance() - response = api_instance.describe_index(name) - return response["status"] - - -def create_index( - name: str, - dimension: int, - cloud: str, - region: str, - capacity_mode: str, - timeout: int = None, - index_type: str = "approximated", - metric: str = "cosine", - replicas: int 
= 1, - shards: int = 1, - pods: int = 1, - pod_type: str = "p1", - index_config: dict = None, - metadata_config: dict = None, - source_collection: str = "", -): - """Creates a Pinecone index. - - :param name: the name of the index. - :type name: str - :param dimension: the dimension of vectors that would be inserted in the index - :param cloud: The cloud where you would like your index hosted. One of `{"aws", "gcp"}`. - :param region: The region where you would like your index hosted. - :param capacity_mode: The capacity mode for the index. One of `{"pod"}`. - :param index_type: type of index, one of `{"approximated", "exact"}`, defaults to "approximated". - The "approximated" index uses fast approximate search algorithms developed by Pinecone. - The "exact" index uses accurate exact search algorithms. - It performs exhaustive searches and thus it is usually slower than the "approximated" index. - :type index_type: str, optional - :param metric: type of metric used in the vector index, one of `{"cosine", "dotproduct", "euclidean"}`, defaults to "cosine". - Use "cosine" for cosine similarity, - "dotproduct" for dot-product, - and "euclidean" for euclidean distance. - :type metric: str, optional - :param replicas: the number of replicas, defaults to 1. - Use at least 2 replicas if you need high availability (99.99% uptime) for querying. - For additional throughput (QPS) your index needs to support, provision additional replicas. - :type replicas: int, optional - :param shards: the number of shards per index, defaults to 1. - Use 1 shard per 1GB of vectors - :type shards: int,optional - :param pods: Total number of pods to be used by the index. pods = shard*replicas - :type pods: int,optional - :param pod_type: the pod type to be used for the index. can be one of p1 or s1. 
- :type pod_type: str,optional - :param index_config: Advanced configuration options for the index - :param metadata_config: Configuration related to the metadata index - :type metadata_config: dict, optional - :param source_collection: Collection name to create the index from - :type metadata_config: str, optional - :type timeout: int, optional - :param timeout: Timeout for wait until index gets ready. If None, wait indefinitely; if >=0, time out after this many seconds; - if -1, return immediately and do not wait. Default: None - """ - api_instance = _get_api_instance() - - api_instance.create_index( - create_request=CreateRequest( - name=name, - dimension=dimension, - cloud=cloud, - region=region, - capacity_mode=capacity_mode, - index_type=index_type, - metric=metric, - replicas=replicas, - shards=shards, - pods=pods, - pod_type=pod_type, - index_config=index_config or {}, - metadata_config=metadata_config, - source_collection=source_collection, - ) - ) - - def is_ready(): - status = _get_status(name) - ready = status["ready"] - return ready - - if timeout == -1: - return - if timeout is None: - while not is_ready(): - time.sleep(5) - else: - while (not is_ready()) and timeout >= 0: - time.sleep(5) - timeout -= 5 - if timeout and timeout < 0: - raise ( - TimeoutError( - "Please call the describe_index API ({}) to confirm index status.".format( - "https://www.pinecone.io/docs/api/operation/describe_index/" - ) - ) - ) - - -def delete_index(name: str, timeout: int = None): - """Deletes a Pinecone index. - - :param name: the name of the index. - :type name: str - :param timeout: Timeout for wait until index gets ready. If None, wait indefinitely; if >=0, time out after this many seconds; - if -1, return immediately and do not wait. 
Default: None - :type timeout: int, optional - """ - api_instance = _get_api_instance() - api_instance.delete_index(name) - - def get_remaining(): - return name in api_instance.list_indexes() - - if timeout == -1: - return - - if timeout is None: - while get_remaining(): - time.sleep(5) - else: - while get_remaining() and timeout >= 0: - time.sleep(5) - timeout -= 5 - if timeout and timeout < 0: - raise ( - TimeoutError( - "Please call the list_indexes API ({}) to confirm if index is deleted".format( - "https://www.pinecone.io/docs/api/operation/list_indexes/" - ) - ) - ) - - -def list_indexes(): - """Lists all indexes.""" - api_instance = _get_api_instance() - response = api_instance.list_indexes() - return response - - -def describe_index(name: str): - """Describes a Pinecone index. - - :param name: the name of the index to describe. - :return: Returns an `IndexDescription` object - """ - api_instance = _get_api_instance() - response = api_instance.describe_index(name) - db = response["database"] - ready = response["status"]["ready"] - state = response["status"]["state"] - return IndexDescription( - name=db["name"], - metric=db["metric"], - replicas=db["replicas"], - dimension=db["dimension"], - shards=db["shards"], - pods=db.get("pods", db["shards"] * db["replicas"]), - pod_type=db.get("pod_type", "p1"), - status={"ready": ready, "state": state}, - metadata_config=db.get("metadata_config"), - source_collection=db.get("source_collection", ""), - ) - - -def scale_index(name: str, replicas: int): - """Increases number of replicas for the index. - - :param name: the name of the Index - :type name: str - :param replicas: the number of replicas in the index now, lowest value is 0. 
- :type replicas: int - """ - api_instance = _get_api_instance() - api_instance.configure_index(name, patch_request=PatchRequest(replicas=replicas, pod_type="")) - - -def create_collection(name: str, source: str): - """Create a collection - :param name: Name of the collection - :param source: Name of the source index - """ - api_instance = _get_api_instance() - api_instance.create_collection(create_collection_request=CreateCollectionRequest(name=name, source=source)) - - -def list_collections(): - """List all collections""" - api_instance = _get_api_instance() - response = api_instance.list_collections() - return response - - -def delete_collection(name: str): - """Deletes a collection. - :param: name: The name of the collection - """ - api_instance = _get_api_instance() - api_instance.delete_collection(name) - - -def describe_collection(name: str): - """Describes a collection. - :param: The name of the collection - :return: Description of the collection - """ - api_instance = _get_api_instance() - response = api_instance.describe_collection(name).to_dict() - response_object = CollectionDescription(response.keys(), response.values()) - return response_object - - -def configure_index(name: str, replicas: Optional[int] = None, pod_type: Optional[str] = ""): - """Changes current configuration of the index. - :param: name: the name of the Index - :param: replicas: the desired number of replicas, lowest value is 0. - :param: pod_type: the new pod_type for the index. 
- """ - api_instance = _get_api_instance() - config_args = {} - if pod_type != "": - config_args.update(pod_type=pod_type) - if replicas: - config_args.update(replicas=replicas) - patch_request = PatchRequest(**config_args) - api_instance.configure_index(name, patch_request=patch_request) diff --git a/pinecone/utils/__init__.py b/pinecone/utils/__init__.py index 08f885eb..c826e0ff 100644 --- a/pinecone/utils/__init__.py +++ b/pinecone/utils/__init__.py @@ -1,136 +1,5 @@ -import inspect -import logging -import re -import uuid -import warnings -from pathlib import Path -from typing import List - -import requests -import urllib3 - -try: - from pinecone.grpc.protos import vector_column_service_pb2 - from google.protobuf.struct_pb2 import Struct - from google.protobuf import json_format - import numpy as np - import lz4.frame -except Exception: - pass # ignore for non-[grpc] installations - -DNS_COMPATIBLE_REGEX = re.compile("^[a-z0-9]([a-z0-9]|[-])+[a-z0-9]$") - - -def dump_numpy_public(np_array: "np.ndarray", compressed: bool = False) -> "vector_column_service_pb2.NdArray": - """ - Dump numpy array to vector_column_service_pb2.NdArray - """ - warn_deprecated( - "dump_numpy_public and all numpy-related features will be removed in a future version", - deprecated_in="2.2.1", - removal_in="3.0.0", - ) - protobuf_arr = vector_column_service_pb2.NdArray() - protobuf_arr.dtype = str(np_array.dtype) - protobuf_arr.shape.extend(np_array.shape) - if compressed: - protobuf_arr.buffer = lz4.frame.compress(np_array.tobytes()) - protobuf_arr.compressed = True - else: - protobuf_arr.buffer = np_array.tobytes() - return protobuf_arr - - -def dump_strings_public(strs: List[str], compressed: bool = False) -> "vector_column_service_pb2.NdArray": - warn_deprecated( - "dump_strings_public and all numpy-related features will be removed in a future version", - deprecated_in="2.2.1", - removal_in="3.0.0", - ) - return dump_numpy_public(np.array(strs, dtype="S"), compressed=compressed) - - 
-def get_version(): - return Path(__file__).parent.parent.joinpath("__version__").read_text().strip() - - -def get_environment(): - return Path(__file__).parent.parent.joinpath("__environment__").read_text().strip() - - -def validate_dns_name(name): - if not DNS_COMPATIBLE_REGEX.match(name): - raise ValueError( - "{} is invalid - service names and node names must consist of lower case " - "alphanumeric characters or '-', start with an alphabetic character, and end with an " - "alphanumeric character (e.g. 'my-name', or 'abc-123')".format(name) - ) - - -def _generate_request_id() -> str: - return str(uuid.uuid4()) - - -def fix_tuple_length(t, n): - """Extend tuple t to length n by adding None items at the end of the tuple. Return the new tuple.""" - return t + ((None,) * (n - len(t))) if len(t) < n else t - - -def get_user_agent(): - client_id = f"python-client-{get_version()}" - user_agent_details = {"requests": requests.__version__, "urllib3": urllib3.__version__} - user_agent = "{} ({})".format(client_id, ", ".join([f"{k}:{v}" for k, v in user_agent_details.items()])) - return user_agent - - -def dict_to_proto_struct(d: dict) -> "Struct": - if not d: - d = {} - s = Struct() - s.update(d) - return s - - -def proto_struct_to_dict(s: "Struct") -> dict: - return json_format.MessageToDict(s) - - -def load_numpy_public(proto_arr: "vector_column_service_pb2.NdArray") -> "np.ndarray": - """ - Load numpy array from protobuf - :param proto_arr: - :return: - """ - warn_deprecated( - "load_numpy_public and all numpy-related features will be removed in a future version", - deprecated_in="2.2.1", - removal_in="3.0.0", - ) - if len(proto_arr.shape) == 0: - return np.array([]) - if proto_arr.compressed: - numpy_arr = np.frombuffer(lz4.frame.decompress(proto_arr.buffer), dtype=proto_arr.dtype) - else: - numpy_arr = np.frombuffer(proto_arr.buffer, dtype=proto_arr.dtype) - return numpy_arr.reshape(proto_arr.shape) - - -def load_strings_public(proto_arr: 
"vector_column_service_pb2.NdArray") -> List[str]: - warn_deprecated( - "load_strings_public and all numpy-related features will be removed in a future version", - deprecated_in="2.2.1", - removal_in="3.0.0", - ) - return [str(item, "utf-8") for item in load_numpy_public(proto_arr)] - - -def warn_deprecated(description: str = "", deprecated_in: str = None, removal_in: str = None): - message = f"DEPRECATED since v{deprecated_in} [Will be removed in v{removal_in}]: {description}" - warnings.warn(message, FutureWarning) - - -def check_kwargs(caller, given): - argspec = inspect.getfullargspec(caller) - diff = set(given).difference(argspec.args) - if diff: - logging.exception(caller.__name__ + " had unexpected keyword argument(s): " + ", ".join(diff), exc_info=False) \ No newline at end of file +from .check_kwargs import check_kwargs +from .environment import get_version, get_environment +from .user_agent import get_user_agent +from .deprecation_notice import warn_deprecated +from .fix_tuple_length import fix_tuple_length \ No newline at end of file diff --git a/pinecone/utils/check_kwargs.py b/pinecone/utils/check_kwargs.py new file mode 100644 index 00000000..f03bc7b7 --- /dev/null +++ b/pinecone/utils/check_kwargs.py @@ -0,0 +1,8 @@ +import inspect +import logging + +def check_kwargs(caller, given): + argspec = inspect.getfullargspec(caller) + diff = set(given).difference(argspec.args) + if diff: + logging.exception(caller.__name__ + " had unexpected keyword argument(s): " + ", ".join(diff), exc_info=False) diff --git a/pinecone/utils/constants.py b/pinecone/utils/constants.py index c415a36a..4b3090a7 100644 --- a/pinecone/utils/constants.py +++ b/pinecone/utils/constants.py @@ -1,10 +1,7 @@ import os import enum -from pinecone.utils import get_environment, get_version - -PARENT_LOGGER_NAME = "pinecone" -DEFAULT_PARENT_LOGGER_LEVEL = "ERROR" +from .environment import get_environment, get_version MAX_MSG_SIZE = 128 * 1024 * 1024 @@ -30,9 +27,5 @@ class NodeType(str, 
enum.Enum): CLIENT_VERSION = get_version() CLIENT_ID = f"python-client-{CLIENT_VERSION}" -TCP_KEEPINTVL = 60 # Sec -TCP_KEEPIDLE = 300 # Sec -TCP_KEEPCNT = 4 - REQUIRED_VECTOR_FIELDS = {"id", "values"} -OPTIONAL_VECTOR_FIELDS = {"sparse_values", "metadata"} \ No newline at end of file +OPTIONAL_VECTOR_FIELDS = {"sparse_values", "metadata"} diff --git a/pinecone/utils/deprecation_notice.py b/pinecone/utils/deprecation_notice.py new file mode 100644 index 00000000..a27fe9c5 --- /dev/null +++ b/pinecone/utils/deprecation_notice.py @@ -0,0 +1,5 @@ +import warnings + +def warn_deprecated(description: str = "", deprecated_in: str = None, removal_in: str = None): + message = f"DEPRECATED since v{deprecated_in} [Will be removed in v{removal_in}]: {description}" + warnings.warn(message, FutureWarning) \ No newline at end of file diff --git a/pinecone/utils/environment.py b/pinecone/utils/environment.py new file mode 100644 index 00000000..d80e868e --- /dev/null +++ b/pinecone/utils/environment.py @@ -0,0 +1,8 @@ +from pathlib import Path + +def get_version(): + return Path(__file__).parent.parent.joinpath("__version__").read_text().strip() + + +def get_environment(): + return Path(__file__).parent.parent.joinpath("__environment__").read_text().strip() \ No newline at end of file diff --git a/pinecone/utils/error_handling.py b/pinecone/utils/error_handling.py index f64b1a59..733e6091 100644 --- a/pinecone/utils/error_handling.py +++ b/pinecone/utils/error_handling.py @@ -3,19 +3,14 @@ from urllib3.exceptions import MaxRetryError, ProtocolError -from pinecone import Config - def validate_and_convert_errors(func): @wraps(func) def inner_func(*args, **kwargs): - Config.validate() # raises exceptions in case of invalid config try: return func(*args, **kwargs) except MaxRetryError as e: if isinstance(e.reason, ProtocolError): - raise ProtocolError( - f"Failed to connect to {e.url}; did you specify the correct index name?" 
- ) from e + raise ProtocolError(f"Failed to connect to {e.url}; did you specify the correct index name?") from e else: raise except ProtocolError as e: @@ -24,4 +19,4 @@ def inner_func(*args, **kwargs): # Override signature sig = inspect.signature(func) inner_func.__signature__ = sig - return inner_func \ No newline at end of file + return inner_func diff --git a/pinecone/utils/fix_tuple_length.py b/pinecone/utils/fix_tuple_length.py new file mode 100644 index 00000000..fa4fc5eb --- /dev/null +++ b/pinecone/utils/fix_tuple_length.py @@ -0,0 +1,3 @@ +def fix_tuple_length(t, n): + """Extend tuple t to length n by adding None items at the end of the tuple. Return the new tuple.""" + return t + ((None,) * (n - len(t))) if len(t) < n else t \ No newline at end of file diff --git a/pinecone/utils/user_agent.py b/pinecone/utils/user_agent.py new file mode 100644 index 00000000..439a7f5d --- /dev/null +++ b/pinecone/utils/user_agent.py @@ -0,0 +1,10 @@ +import requests +import urllib3 + +from pinecone.utils import get_version + +def get_user_agent(): + client_id = f"python-client-{get_version()}" + user_agent_details = {"requests": requests.__version__, "urllib3": urllib3.__version__} + user_agent = "{} ({})".format(client_id, ", ".join([f"{k}:{v}" for k, v in user_agent_details.items()])) + return user_agent \ No newline at end of file diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 7fec4826..bc65abe0 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -1,209 +1,80 @@ import pinecone -from pinecone.config import Config +from pinecone.exceptions import ApiKeyError, PineconeConfigurationError +from pinecone.config import PineconeConfig from pinecone.core.client.configuration import Configuration as OpenApiConfiguration import pytest -import tempfile import os +class TestConfig: + @pytest.fixture(autouse=True) + def run_before_and_after_tests(tmpdir): + """Fixture to execute asserts before and after a test is run""" 
-@pytest.fixture(autouse=True) -def run_before_and_after_tests(tmpdir): - """Fixture to execute asserts before and after a test is run""" + # Defend against unexpected env vars. Since we clear these variables below + # after each test execution, these should only be raised if there is + # test pollution in the environment coming from some other test file/setup. + known_env_vars = ["PINECONE_API_KEY", "PINECONE_ENVIRONMENT", "PINECONE_CONTROLLER_HOST"] + for var in known_env_vars: + if os.getenv(var): + raise ValueError(f"Unexpected env var {var} found in environment. Check for test pollution.") - # Defend against unexpected env vars. Since we clear these variables below - # after each test execution, these should only be raised if there is - # test pollution in the environment coming from some other test file/setup. - known_env_vars = ["PINECONE_API_KEY", "PINECONE_ENVIRONMENT", "PINECONE_CONTROLLER_HOST"] - for var in known_env_vars: - if os.getenv(var): - raise ValueError(f"Unexpected env var {var} found in environment. Check for test pollution.") + yield # this is where the testing happens - # Unfortunately since config is a singleton, we need to reset it manually between tests - pinecone.init() + # Teardown : Unset any env vars created during test execution + for var in known_env_vars: + if os.getenv(var): + del os.environ[var] - yield # this is where the testing happens + def test_init_with_environment_vars(self): + os.environ["PINECONE_API_KEY"] = "test-api-key" + os.environ["PINECONE_CONTROLLER_HOST"] = "test-controller-host" - # Teardown : Unset any env vars created during test execution - for var in known_env_vars: - if os.getenv(var): - del os.environ[var] + config = PineconeConfig() + assert config.API_KEY == "test-api-key" + assert config.HOST == "test-controller-host" -def test_default_config(): - """ - Test that default config is loaded when no config is specified. 
This not really a valid config that can be used, - but adding this test just to document the legacy behavior. - """ - pinecone.init() - assert Config.API_KEY == "" - assert Config.ENVIRONMENT == "us-west1-gcp" - assert Config.CONTROLLER_HOST == "https://controller.us-west1-gcp.pinecone.io" - assert Config.LOG_LEVEL == "ERROR" + def test_init_with_positional_args(self): + api_key = "my-api-key" + host = "my-controller-host" + config = PineconeConfig(api_key, host) -def test_init_with_environment_vars(): - os.environ["PINECONE_ENVIRONMENT"] = "test-env" - os.environ["PINECONE_API_KEY"] = "test-api-key" - os.environ["PINECONE_CONTROLLER_HOST"] = "test-controller-host" + assert config.API_KEY == api_key + assert config.HOST == host - pinecone.init() + def test_init_with_kwargs(self): + api_key = "my-api-key" + controller_host = "my-controller-host" + openapi_config = OpenApiConfiguration(api_key="openapi-api-key") - assert Config.API_KEY == "test-api-key" - assert Config.ENVIRONMENT == "test-env" - assert Config.CONTROLLER_HOST == "test-controller-host" + config = PineconeConfig(api_key=api_key, host=controller_host, openapi_config=openapi_config) + assert config.API_KEY == api_key + assert config.HOST == controller_host + assert config.OPENAPI_CONFIG == openapi_config -def test_init_with_positional_args(): - api_key = "my-api-key" - environment = "test-env" - host = "my-controller-host" - log_level = None # deprecated property but still in positional list - openapi_config = OpenApiConfiguration(api_key="openapi-api-key") + def test_init_with_mispelled_kwargs(self, caplog): + PineconeConfig(api_key='my-api-key', unknown_kwarg='bogus') + assert "__init__ had unexpected keyword argument(s): unknown_kwarg" in caplog.text - pinecone.init(api_key, host, environment, log_level, openapi_config) - - assert Config.API_KEY == api_key - assert Config.ENVIRONMENT == environment - assert Config.CONTROLLER_HOST == host - assert Config.OPENAPI_CONFIG == openapi_config - - -def 
test_init_with_kwargs(): - env = "test-env" - api_key = "my-api-key" - controller_host = "my-controller-host" - openapi_config = OpenApiConfiguration(api_key="openapi-api-key") - - pinecone.init(api_key=api_key, environment=env, host=controller_host, openapi_config=openapi_config) - - assert Config.API_KEY == api_key - assert Config.ENVIRONMENT == env - assert Config.CONTROLLER_HOST == controller_host - assert Config.OPENAPI_CONFIG == openapi_config - - -def test_init_with_mispelled_kwargs(caplog): - pinecone.init(invalid_kwarg="value") - assert "init had unexpected keyword argument(s): invalid_kwarg" in caplog.text - - -def test_init_with_file_based_configuration(): - """Test that config can be loaded from a file""" - env = "ini-test-env" - api_key = "ini-api-key" - controller_host = "ini-controller-host" - - with tempfile.NamedTemporaryFile(mode="w") as f: - f.write( - f""" - [default] - environment: {env} - api_key: {api_key} - controller_host: {controller_host} + def test_resolution_order_kwargs_over_env_vars(self): """ - ) - f.flush() - - pinecone.init(config=f.name) - - assert Config.API_KEY == api_key - assert Config.ENVIRONMENT == env - assert Config.CONTROLLER_HOST == controller_host - - -def test_resolution_order_kwargs_over_env_vars(): - """ - Test that when config is present from multiple sources, - the order of precedence is kwargs > env vars - """ - os.environ["PINECONE_ENVIRONMENT"] = "env-var-env" - os.environ["PINECONE_API_KEY"] = "env-var-api-key" - os.environ["PINECONE_CONTROLLER_HOST"] = "env-var-controller-host" - - env = "kwargs-env" - api_key = "kwargs-api-key" - controller_host = "kwargs-controller-host" - - pinecone.init(environment=env, api_key=api_key, host=controller_host) - - assert Config.API_KEY == api_key - assert Config.ENVIRONMENT == env - assert Config.CONTROLLER_HOST == controller_host - - -def test_resolution_order_kwargs_over_config_file(): - """ - Test that when config is present from multiple sources, the order of - 
precedence is kwargs > config file - """ - env = "ini-test-env" - api_key = "ini-api-key" - controller_host = "ini-controller-host" - - kwargs_api_key = "kwargs-api-key" - - with tempfile.NamedTemporaryFile(mode="w") as f: - f.write( - f""" - [default] - environment: {env} - api_key: {api_key} - controller_host: {controller_host} + Test that when config is present from multiple sources, + the order of precedence is kwargs > env vars """ - ) - f.flush() - - pinecone.init(api_key=kwargs_api_key, config=f.name) - - # Properties passed as kwargs take precedence over config file - assert Config.API_KEY == kwargs_api_key - - # Properties not passed as kwargs loaded from config file - assert Config.ENVIRONMENT == env - assert Config.CONTROLLER_HOST == controller_host - - -def test_resolution_order_env_vars_over_config_file(): - """ - Test that when config is present from multiple sources, the order of precedence is - env vars > config file - """ - - os.environ["PINECONE_ENVIRONMENT"] = "env-var-env" - os.environ["PINECONE_API_KEY"] = "env-var-api-key" - os.environ["PINECONE_CONTROLLER_HOST"] = "env-var-controller-host" - - with tempfile.NamedTemporaryFile(mode="w") as f: - f.write( - f""" - [default] - environment: ini-test-env - api_key: ini-api-key - controller_host: ini-controller-host - """ - ) - f.flush() - - pinecone.init(config=f.name) - - assert Config.API_KEY == "env-var-api-key" - assert Config.ENVIRONMENT == "env-var-env" - assert Config.CONTROLLER_HOST == "env-var-controller-host" - + os.environ["PINECONE_API_KEY"] = "env-var-api-key" + os.environ["PINECONE_CONTROLLER_HOST"] = "env-var-controller-host" -def test_init_from_mixed_sources(): - """ - Test that even when some vars are found in a higher precedence source, the rest - are still loaded from lower precedence sources - """ + api_key = "kwargs-api-key" + controller_host = "kwargs-controller-host" - os.environ["PINECONE_ENVIRONMENT"] = "env-var-env" - os.environ["PINECONE_API_KEY"] = "env-var-api-key" - 
controller_host = "kwargs-controller-host" + config = PineconeConfig(api_key=api_key, host=controller_host) - pinecone.init(host=controller_host) + assert config.API_KEY == api_key + assert config.HOST == controller_host - assert Config.API_KEY == "env-var-api-key" - assert Config.ENVIRONMENT == "env-var-env" - assert Config.CONTROLLER_HOST == controller_host + def test_errors_when_no_api_key_is_present(self): + with pytest.raises(PineconeConfigurationError): + PineconeConfig() \ No newline at end of file diff --git a/tests/unit/test_control.py b/tests/unit/test_control.py new file mode 100644 index 00000000..b15c5acd --- /dev/null +++ b/tests/unit/test_control.py @@ -0,0 +1,66 @@ +import pytest +from pinecone import Pinecone +from pinecone.core.client.api import IndexOperationsApi +import time + +class TestControl: + def test_default_host(self): + p = Pinecone(api_key="123-456-789") + assert p.index_api.api_client.configuration.host == "https://api.pinecone.io" + + def test_passing_host(self): + p = Pinecone(api_key="123-456-789", host="my-host") + assert p.index_api.api_client.configuration.host == "my-host" + + + @pytest.mark.parametrize("timeout_value, describe_index_responses, expected_describe_index_calls, expected_sleep_calls", [ + # When timeout=None, describe_index is called until ready + (None, [{ "status": {"ready": False}}, {"status": {"ready": True}}], 2, 1), + + # Timeout of 10 seconds, describe_index called 3 times, sleep twice + (10, [{"status": {"ready": False}}, {"status": {"ready": False}}, {"status": {"ready": True}}], 3, 2), + + # When timeout=-1, create_index returns immediately without calling describe_index or sleep + (-1, [{"status": {"ready": False}}], 0, 0), + ]) + def test_create_index_with_timeout(self, mocker, timeout_value, describe_index_responses, expected_describe_index_calls, expected_sleep_calls): + mocker.patch.object(IndexOperationsApi, 'describe_index', side_effect=describe_index_responses) + 
mocker.patch.object(IndexOperationsApi, 'create_index') + mocker.patch('time.sleep') + + p = Pinecone(api_key="123-456-789") + p.create_index("my-index", 10, timeout=timeout_value, cloud="aws", region="us-west1", capacity_mode="pod") + + assert IndexOperationsApi.create_index.call_count == 1 + assert IndexOperationsApi.describe_index.call_count == expected_describe_index_calls + assert time.sleep.call_count == expected_sleep_calls + + def test_create_index_when_timeout_exceeded(self, mocker): + with pytest.raises(TimeoutError): + get_status_responses = [{"status": {"ready": False}}] * 5 + mocker.patch.object(IndexOperationsApi, 'describe_index', side_effect=get_status_responses) + mocker.patch.object(IndexOperationsApi, 'create_index') + mocker.patch('time.sleep') + + p = Pinecone(api_key="123-456-789") + p.create_index("my-index", 10, timeout=10, cloud="aws", region="us-west1", capacity_mode="pod") + + # @pytest.mark.parametrize("timeout_value, list_indexes_calls, time_sleep_calls, list_indexes_responses", [ + # # No timeout, list_indexes called twice, sleep called once + # (None, 2, 1, [["my-index", "index-1"], ["index-1"]]), + # # Timeout of 10 seconds, list_indexes called 3 times, sleep twice + # (10, 3, 2, [["my-index", "index-1"], ["my-index", "index-1"], ["index-1"]]), + # # Timeout of -1 seconds, list_indexes not called, no sleep + # (-1, 0, 0, [["my-index", "index-1"]]), + # ]) + # def test_delete_index_with_timeout(self, mocker, timeout_value, list_indexes_calls, time_sleep_calls, list_indexes_responses): + # api_instance_mock = mocker.Mock() + # api_instance_mock.list_indexes = mocker.Mock(side_effect=list_indexes_responses) + # mocker.patch('pinecone.manage._get_api_instance', return_value=api_instance_mock) + # mocker.patch('time.sleep') + + # pinecone.manage.delete_index("my-index", timeout=timeout_value) + + # pinecone.manage._get_api_instance.assert_called_once() + # assert api_instance_mock.list_indexes.call_count == list_indexes_calls + # assert 
time.sleep.call_count == time_sleep_calls \ No newline at end of file diff --git a/tests/unit/test_grpc_index.py b/tests/unit/test_grpc_index.py index 036ac126..b98ce768 100644 --- a/tests/unit/test_grpc_index.py +++ b/tests/unit/test_grpc_index.py @@ -4,7 +4,7 @@ import pandas as pd import pytest -import pinecone +from pinecone import Config, GRPCIndex from pinecone import DescribeIndexStatsRequest from pinecone.core.grpc.protos.vector_service_pb2 import ( Vector, @@ -18,8 +18,7 @@ UpsertResponse, SparseValues, ) -from pinecone.utils import dict_to_proto_struct - +from pinecone.data.grpc.utils import dict_to_proto_struct class TestGrpcIndex: def setup_method(self): @@ -35,8 +34,8 @@ def setup_method(self): self.filter1 = {"genre": {"$in": ["action"]}} self.filter2 = {"year": {"$eq": 2020}} - pinecone.init(api_key="example-key") - self.index = pinecone.GRPCIndex("example-name") + self.config = Config(api_key='test-api-key', host='foo') + self.index = GRPCIndex(config=self.config, index_name="example-name", _endpoint_override="test-endpoint") self.expected_vec1 = Vector(id="vec1", values=self.vals1, metadata={}) self.expected_vec2 = Vector(id="vec2", values=self.vals2, metadata={}) diff --git a/tests/unit/test_index.py b/tests/unit/test_index.py index 28d502b3..de90bad2 100644 --- a/tests/unit/test_index.py +++ b/tests/unit/test_index.py @@ -1,13 +1,13 @@ import pandas as pd import numpy as np import pytest -import warnings import pinecone +from pinecone.config import PineconeConfig +from pinecone import Index from pinecone import UpsertRequest, Vector from pinecone import DescribeIndexStatsRequest, ScoredVector, QueryResponse, UpsertResponse, SparseValues - class TestRestIndex: def setup_method(self): self.vector_dim = 8 @@ -29,8 +29,7 @@ def setup_method(self): self.svv2 = [0.1, 0.2, 0.3] self.sv2 = {"indices": self.svi2, "values": self.svv2} - pinecone.init(api_key='example-key') - self.index = pinecone.Index('example-name') + self.index = Index(api_key='asdf', 
host='https://test.pinecone.io') # region: upsert tests @@ -133,7 +132,7 @@ def test_upsert_parallelUpsert_callUpsertParallel(self, mocker): [Vector(id="vec1", values=self.vals1, metadata=self.md1)], [Vector(id="vec2", values=self.vals2, metadata=self.md2)], ] - with pinecone.Index("example-index", pool_threads=30) as index: + with Index(api_key="asdf", host="https://test.pinecone.io", pool_threads=30) as index: mocker.patch.object(index._vector_api, "upsert", autospec=True) # Send requests in parallel diff --git a/tests/unit/test_manage.py b/tests/unit/test_manage.py deleted file mode 100644 index 0a65d67c..00000000 --- a/tests/unit/test_manage.py +++ /dev/null @@ -1,54 +0,0 @@ -import pytest -import pinecone -import time - -class TestManage: - - def test_get_api_instance_without_host(self): - pinecone.init(api_key="123-456-789", environment="my-environment") - api_instance = pinecone.manage._get_api_instance() - assert api_instance.api_client.configuration.host == "https://controller.my-environment.pinecone.io" - - def test_get_api_instance_with_host(self): - pinecone.init(api_key="123-456-789", environment="my-environment", host="my-host") - api_instance = pinecone.manage._get_api_instance() - assert api_instance.api_client.configuration.host == "my-host" - - @pytest.mark.parametrize("timeout_value, get_status_calls, time_sleep_calls, get_status_responses", [ - # No timeout, _get_status called twice, sleep called once - (None, 2, 1, [{"ready": False}, {"ready": True}]), - # Timeout of 10 seconds, _get_status called 3 times, sleep twice - (10, 3, 2, [{"ready": False}, {"ready": False}, {"ready": True}]), - # Timeout of -1 seconds, _get_status not called, no sleep - (-1, 0, 0, [{"ready": False}]), - ]) - def test_create_index_with_timeout(self, mocker, timeout_value, get_status_calls, time_sleep_calls, get_status_responses): - mocker.patch('pinecone.manage._get_api_instance', return_value=mocker.Mock()) - mocker.patch('pinecone.manage._get_status', 
side_effect=get_status_responses) - mocker.patch('time.sleep') - - pinecone.manage.create_index("my-index", 10, timeout=timeout_value, cloud="aws", region="us-west1", capacity_mode="pod") - - pinecone.manage._get_api_instance.assert_called_once() - assert pinecone.manage._get_status.call_count == get_status_calls - assert time.sleep.call_count == time_sleep_calls - - @pytest.mark.parametrize("timeout_value, list_indexes_calls, time_sleep_calls, list_indexes_responses", [ - # No timeout, list_indexes called twice, sleep called once - (None, 2, 1, [["my-index", "index-1"], ["index-1"]]), - # Timeout of 10 seconds, list_indexes called 3 times, sleep twice - (10, 3, 2, [["my-index", "index-1"], ["my-index", "index-1"], ["index-1"]]), - # Timeout of -1 seconds, list_indexes not called, no sleep - (-1, 0, 0, [["my-index", "index-1"]]), - ]) - def test_delete_index_with_timeout(self, mocker, timeout_value, list_indexes_calls, time_sleep_calls, list_indexes_responses): - api_instance_mock = mocker.Mock() - api_instance_mock.list_indexes = mocker.Mock(side_effect=list_indexes_responses) - mocker.patch('pinecone.manage._get_api_instance', return_value=api_instance_mock) - mocker.patch('time.sleep') - - pinecone.manage.delete_index("my-index", timeout=timeout_value) - - pinecone.manage._get_api_instance.assert_called_once() - assert api_instance_mock.list_indexes.call_count == list_indexes_calls - assert time.sleep.call_count == time_sleep_calls \ No newline at end of file