diff --git a/pinecone/core/openapi/shared/api_client.py b/pinecone/core/openapi/shared/api_client.py index 7ec644c5..a35aa166 100644 --- a/pinecone/core/openapi/shared/api_client.py +++ b/pinecone/core/openapi/shared/api_client.py @@ -2,6 +2,7 @@ import atexit import mimetypes from multiprocessing.pool import ThreadPool +from concurrent.futures import ThreadPoolExecutor import io import os import re @@ -70,6 +71,7 @@ class ApiClient(object): """ _pool = None + _threadpool_executor = None def __init__(self, configuration=None, header_name=None, header_value=None, cookie=None, pool_threads=1): if configuration is None: @@ -92,6 +94,9 @@ def __exit__(self, exc_type, exc_value, traceback): self.close() def close(self): + if self._threadpool_executor: + self._threadpool_executor.shutdown() + self._threadpool_executor = None if self._pool: self._pool.close() self._pool.join() @@ -109,6 +114,12 @@ def pool(self): self._pool = ThreadPool(self.pool_threads) return self._pool + @property + def threadpool_executor(self): + if self._threadpool_executor is None: + self._threadpool_executor = ThreadPoolExecutor(max_workers=self.pool_threads) + return self._threadpool_executor + @property def user_agent(self): """User agent for this API client""" @@ -334,6 +345,7 @@ def call_api( response_type: typing.Optional[typing.Tuple[typing.Any]] = None, auth_settings: typing.Optional[typing.List[str]] = None, async_req: typing.Optional[bool] = None, + async_threadpool_executor: typing.Optional[bool] = None, _return_http_data_only: typing.Optional[bool] = None, collection_formats: typing.Optional[typing.Dict[str, str]] = None, _preload_content: bool = True, @@ -394,6 +406,27 @@ def call_api( If parameter async_req is False or missing, then the method will return the response directly. """ + if async_threadpool_executor: + return self.threadpool_executor.submit( + self.__call_api, + resource_path, + method, + path_params, + query_params, + header_params, + body, + post_params, + files, + response_type, + auth_settings, + _return_http_data_only, + collection_formats, + _preload_content, + _request_timeout, + _host, + _check_type, + ) + if not async_req: return self.__call_api( resource_path, @@ -690,6 +723,7 @@ def __init__(self, settings=None, params_map=None, root_map=None, headers_map=No self.params_map["all"].extend( [ "async_req", + "async_threadpool_executor", "_host_index", "_preload_content", "_request_timeout", @@ -704,6 +738,7 @@ def __init__(self, settings=None, params_map=None, root_map=None, headers_map=No self.openapi_types = root_map["openapi_types"] extra_types = { "async_req": (bool,), + "async_threadpool_executor": (bool, ), "_host_index": (none_type, int), "_preload_content": (bool,), "_request_timeout": (none_type, float, (float,), [float], int, (int,), [int]), @@ -853,6 +888,7 @@ def call_with_http_info(self, **kwargs): response_type=self.settings["response_type"], auth_settings=self.settings["auth"], async_req=kwargs["async_req"], + async_threadpool_executor=kwargs.get("async_threadpool_executor", None), _check_type=kwargs["_check_return_type"], _return_http_data_only=kwargs["_return_http_data_only"], _preload_content=kwargs["_preload_content"], diff --git a/pinecone/core/openapi/shared/configuration.py b/pinecone/core/openapi/shared/configuration.py index 96b9a4a3..83ddad29 100644 --- a/pinecone/core/openapi/shared/configuration.py +++ b/pinecone/core/openapi/shared/configuration.py @@ -469,3 +469,23 @@ def host(self, value): """Fix base path.""" self._base_path = value self.server_index = None + + def __repr__(self): + attrs = [ + f"host={self.host}", + f"api_key=***", + f"api_key_prefix={self.api_key_prefix}", + f"access_token={self.access_token}", + f"connection_pool_maxsize={self.connection_pool_maxsize}", + f"username={self.username}", + f"password={self.password}", + f"discard_unknown_keys={self.discard_unknown_keys}", + f"disabled_client_side_validations={self.disabled_client_side_validations}", + f"server_index={self.server_index}", + f"server_variables={self.server_variables}", + f"server_operation_index={self.server_operation_index}", + f"server_operation_variables={self.server_operation_variables}", + f"ssl_ca_cert={self.ssl_ca_cert}", + + ] + return f"Configuration({', '.join(attrs)})" diff --git a/pinecone/data/index.py b/pinecone/data/index.py index 52628511..eb406250 100644 --- a/pinecone/data/index.py +++ b/pinecone/data/index.py @@ -1,6 +1,7 @@ from tqdm.autonotebook import tqdm import logging +import json from typing import Union, List, Optional, Dict, Any from pinecone.config import ConfigBuilder @@ -34,7 +35,9 @@ from .features.bulk_import import ImportFeatureMixin from .vector_factory import VectorFactory from .query_results_aggregator import QueryResultsAggregator, QueryNamespacesResults + from multiprocessing.pool import ApplyResult +from concurrent.futures import as_completed from pinecone_plugin_interface import load_and_install as install_plugins @@ -67,6 +70,7 @@ "_check_return_type", "_host_index", "async_req", + "async_threadpool_executor", ) @@ -447,7 +451,7 @@ def query( **kwargs, ) - if kwargs.get("async_req", False): + if kwargs.get("async_req", False) or kwargs.get("async_threadpool_executor", False): return response else: return parse_query_response(response) @@ -491,6 +495,7 @@ def _query( ("sparse_vector", sparse_vector), ] ) + response = self._vector_api.query( QueryRequest( **args_dict, @@ -566,7 +571,7 @@ def query_namespaces( aggregator = QueryResultsAggregator(top_k=overall_topk) target_namespaces = set(namespaces) # dedup namespaces - async_results = [ + async_futures = [ self.query( vector=vector, namespace=ns, @@ -575,14 +580,16 @@ def query_namespaces( include_values=include_values, include_metadata=include_metadata, sparse_vector=sparse_vector, - async_req=True, + async_threadpool_executor=True, + _preload_content=False, **kwargs, ) for ns in target_namespaces ] - for result in async_results: - response = result.get() + for result in as_completed(async_futures): + raw_result = result.result() + response = json.loads(raw_result.data.decode("utf-8")) aggregator.add_results(response) final_results = aggregator.get_results() diff --git a/poetry.lock b/poetry.lock index 3e918a7e..88303c9f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1071,6 +1071,17 @@ files = [ googleapis-common-protos = "*" protobuf = ">=4.21.0" +[[package]] +name = "py-cpuinfo" +version = "9.0.0" +description = "Get CPU info with pure Python" +optional = false +python-versions = "*" +files = [ + {file = "py-cpuinfo-9.0.0.tar.gz", hash = "sha256:3cdbbf3fac90dc6f118bfd64384f309edeadd902d7c8fb17f02ffa1fc3f49690"}, + {file = "py_cpuinfo-9.0.0-py3-none-any.whl", hash = "sha256:859625bc251f64e21f077d099d4162689c762b5d6a4c3c97553d56241c9674d5"}, +] + [[package]] name = "pygments" version = "2.16.1" @@ -1124,6 +1135,26 @@ pytest = ">=5.4.0" [package.extras] testing = ["coverage", "hypothesis (>=5.7.1)"] +[[package]] +name = "pytest-benchmark" +version = "5.0.0" +description = "A ``pytest`` fixture for benchmarking code. It will group the tests into rounds that are calibrated to the chosen timer." +optional = false +python-versions = ">=3.9" +files = [ + {file = "pytest-benchmark-5.0.0.tar.gz", hash = "sha256:cd0adf68516eea7ac212b78a7eb6fc3373865507de8562bb3bfff2f2f852cc63"}, + {file = "pytest_benchmark-5.0.0-py3-none-any.whl", hash = "sha256:67fed4943aa761077345119555d7f6df09877a12a36e8128f05e19ccd5942d80"}, +] + +[package.dependencies] +py-cpuinfo = "*" +pytest = ">=3.8" + +[package.extras] +aspect = ["aspectlib"] +elasticsearch = ["elasticsearch"] +histogram = ["pygal", "pygaljs", "setuptools"] + [[package]] name = "pytest-cov" version = "2.10.1" @@ -1586,4 +1617,4 @@ grpc = ["googleapis-common-protos", "grpcio", "grpcio", "lz4", "protobuf", "prot [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "d680a8699ebcc13e2369221c4fa987d1206f1a5549393055040fa78dac4da5be" +content-hash = "0823a7b71260e2e723e281446f1995a1033a3301cbb933b605b202ea55583e4d" diff --git a/pyproject.toml b/pyproject.toml index 7c79e3d9..9d170687 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -82,6 +82,9 @@ pytest-asyncio = "0.15.1" pytest-cov = "2.10.1" pytest-mock = "3.6.1" pytest-timeout = "2.2.0" +pytest-benchmark = [ + { version = '5.0.0', python = ">=3.9,<4.0" } +] urllib3_mock = "0.3.3" responses = ">=0.8.1" ddtrace = "^2.14.4" diff --git a/tests/perf/test_query_namespaces.py b/tests/perf/test_query_namespaces.py new file mode 100644 index 00000000..22a2492a --- /dev/null +++ b/tests/perf/test_query_namespaces.py @@ -0,0 +1,45 @@ +import time +import random +import pytest +from pinecone import Pinecone +from pinecone.grpc import PineconeGRPC + +latencies = [] + + +def call_n_threads(index): + query_vec = [random.random() for i in range(1024)] + start = time.time() + combined_results = index.query_namespaces( + vector=query_vec, + namespaces=["ns1", "ns2", "ns3", "ns4"], + include_values=False, + include_metadata=True, + filter={"publication_date": {"$eq": "Last3Months"}}, + top_k=1000, + ) + finish = time.time() + # print(f"Query took {finish-start} seconds") + latencies.append(finish - start) + + return combined_results + + +class TestQueryNamespacesRest: + @pytest.mark.parametrize("n_threads", [4]) + def test_query_namespaces_grpc(self, benchmark, n_threads): + pc = PineconeGRPC() + index = pc.Index( + host="jen1024-dojoi3u.svc.apw5-4e34-81fa.pinecone.io", pool_threads=n_threads + ) + benchmark.pedantic(call_n_threads, (index,), rounds=10, warmup_rounds=1, iterations=5) + + @pytest.mark.parametrize("n_threads", [4]) + def test_query_namespaces_rest(self, benchmark, n_threads): + pc = Pinecone() + index = pc.Index( + host="jen1024-dojoi3u.svc.apw5-4e34-81fa.pinecone.io", + pool_threads=n_threads, + connection_pool_maxsize=20, + ) + benchmark.pedantic(call_n_threads, (index,), rounds=10, warmup_rounds=1, iterations=5) diff --git a/tests/perf/test_query_results_aggregator.py b/tests/perf/test_query_results_aggregator.py new file mode 100644 index 00000000..29ac4c35 --- /dev/null +++ b/tests/perf/test_query_results_aggregator.py @@ -0,0 +1,24 @@ +import random +from pinecone.data.query_results_aggregator import QueryResultsAggregator + + +def fake_results(i): + matches = [ + {"id": f"id{i}", "score": random.random(), "values": [random.random() for _ in range(768)]} + for _ in range(1000) + ] + matches.sort(key=lambda x: x["score"], reverse=True) + return {"namespace": f"ns{i}", "matches": matches} + + +def aggregate_results(responses): + ag = QueryResultsAggregator(1000) + for response in responses: + ag.add_results(response) + return ag.get_results() + + +class TestQueryResultsAggregatorPerf: + def test_my_stuff(self, benchmark): + responses = [fake_results(i) for i in range(10)] + benchmark(aggregate_results, responses)