Add integration tests for async_req with upsert, fetch, delete
jhamon committed Oct 31, 2024
1 parent edfe31d commit a75805d
Showing 7 changed files with 290 additions and 13 deletions.
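
The tests exercise the new `async_req=True` path on the gRPC index client, where upsert, fetch, and delete each return a `PineconeGrpcFuture` immediately. A condensed sketch of the pattern, with placeholder API key, index name, namespace, and values:

from concurrent.futures import as_completed

from pinecone import Vector
from pinecone.grpc import PineconeGRPC

pc = PineconeGRPC(api_key="YOUR_API_KEY")  # placeholder credentials
idx = pc.Index("example-index")            # placeholder index with dimension 2

upsert_future = idx.upsert(
    vectors=[Vector(id="1", values=[0.1, 0.2])], namespace="example-ns", async_req=True
)
fetch_future = idx.fetch(ids=["1"], namespace="example-ns", async_req=True)
delete_future = idx.delete(ids=["1"], namespace="example-ns", async_req=True)

# Each call returns immediately; collect results as the RPCs complete.
for future in as_completed([upsert_future, fetch_future, delete_future], timeout=10):
    print(future.result())
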
4 changes: 4 additions & 0 deletions pinecone/grpc/__init__.py
@@ -47,20 +47,24 @@
from .index_grpc import GRPCIndex
from .pinecone import PineconeGRPC
from .config import GRPCClientConfig
from .future import PineconeGrpcFuture

from pinecone.core.grpc.protos.vector_service_pb2 import (
    Vector as GRPCVector,
    SparseValues as GRPCSparseValues,
    Vector,
    SparseValues,
    DeleteResponse as GRPCDeleteResponse,
)

__all__ = [
    "GRPCIndex",
    "PineconeGRPC",
    "GRPCDeleteResponse",
    "GRPCClientConfig",
    "GRPCVector",
    "GRPCSparseValues",
    "Vector",
    "SparseValues",
    "PineconeGrpcFuture",
]
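
With these exports, downstream code can import the future wrapper and the delete response type straight from the subpackage, for example:

# All of these names are re-exported by pinecone/grpc/__init__.py above.
from pinecone.grpc import (
    PineconeGRPC,
    PineconeGrpcFuture,
    GRPCDeleteResponse,
    GRPCVector,
    GRPCSparseValues,
)
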
20 changes: 15 additions & 5 deletions pinecone/grpc/future.py
@@ -5,21 +5,26 @@


class PineconeGrpcFuture(ConcurrentFuture):
    def __init__(self, grpc_future: GrpcFuture, timeout: Optional[int] = 10):
    def __init__(
        self, grpc_future: GrpcFuture, timeout: Optional[int] = 10, result_transformer=None
    ):
        super().__init__()
        self._grpc_future = grpc_future
        self.default_timeout = timeout  # seconds
        self.result_transformer = result_transformer

        # Sync initial state, in case the gRPC future is already done
        self._sync_state(self._grpc_future)

        # Add callback to subscribe to updates from the gRPC future
        self._grpc_future.add_done_callback(self._sync_state)

        # Sync initial state, in case the gRPC future is already done
        self._sync_state(self._grpc_future)

    @property
    def grpc_future(self):
        return self._grpc_future

    def _sync_state(self, grpc_future):
        # Sync the gRPC future completion to the wrapper future
        if self.done():
            # Future already done, nothing to do
            return

        if grpc_future.cancelled():
@@ -35,6 +40,11 @@ def _sync_state(self, grpc_future):
        elif grpc_future.running():
            self.set_running_or_notify_cancel()

    def set_result(self, result):
        if self.result_transformer:
            result = self.result_transformer(result)
        return super().set_result(result)

    def cancel(self):
        self._grpc_future.cancel()
        return super().cancel()
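
The `result_transformer` hook means a caller's `.result()` yields the parsed SDK object rather than the raw protobuf reply. A minimal sketch of how the wrapper is intended to be constructed; `stub` and `request` are placeholders for the client's internals, and the real client routes the call through its retry runner:

from pinecone.grpc.future import PineconeGrpcFuture
from pinecone.grpc.utils import parse_fetch_response


def fetch_async(stub, request, timeout=None):
    # stub.Fetch.future starts the RPC without blocking and returns a grpc.Future.
    grpc_future = stub.Fetch.future(request, timeout=timeout)
    # The wrapper mirrors the gRPC future's state; when the RPC completes,
    # set_result runs the protobuf reply through parse_fetch_response first.
    return PineconeGrpcFuture(grpc_future, result_transformer=parse_fetch_response)
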
16 changes: 12 additions & 4 deletions pinecone/grpc/index_grpc.py
@@ -282,7 +282,11 @@ def delete(
        return self.runner.run(self.stub.Delete, request, timeout=timeout)

    def fetch(
        self, ids: Optional[List[str]], namespace: Optional[str] = None, **kwargs
        self,
        ids: Optional[List[str]],
        namespace: Optional[str] = None,
        async_req: Optional[bool] = False,
        **kwargs,
    ) -> FetchResponse:
        """
        The fetch operation looks up and returns vectors, by ID, from a single namespace.
@@ -304,9 +308,13 @@ def fetch(
        args_dict = self._parse_non_empty_args([("namespace", namespace)])

        request = FetchRequest(ids=ids, **args_dict, **kwargs)
        response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
        json_response = json_format.MessageToDict(response)
        return parse_fetch_response(json_response)

        if async_req:
            future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
            return PineconeGrpcFuture(future, result_transformer=parse_fetch_response)
        else:
            response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
            return parse_fetch_response(response)

    def query(
        self,
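
From the caller's side, `async_req=True` makes `fetch` non-blocking while still resolving to the same parsed `FetchResponse`. A minimal sketch; the API key, index name, and namespace are placeholders:

from concurrent.futures import wait

from pinecone.grpc import PineconeGRPC, PineconeGrpcFuture

pc = PineconeGRPC(api_key="YOUR_API_KEY")  # placeholder credentials
idx = pc.Index("example-index")            # placeholder index name

# Blocking form: returns the parsed FetchResponse directly.
sync_result = idx.fetch(ids=["1"], namespace="example-ns")

# Non-blocking form: returns a PineconeGrpcFuture whose result is the same parsed type.
future = idx.fetch(ids=["1"], namespace="example-ns", async_req=True)
assert isinstance(future, PineconeGrpcFuture)

done, _ = wait([future], timeout=10)
async_result = done.pop().result()
assert type(async_result) is type(sync_result)
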
13 changes: 9 additions & 4 deletions pinecone/grpc/utils.py
@@ -1,4 +1,7 @@
from typing import Optional
from google.protobuf import json_format
from google.protobuf.message import Message

import uuid

from pinecone.core.openapi.data.models import (
@@ -35,10 +38,12 @@ def parse_sparse_values(sparse_values: dict):
    )


def parse_fetch_response(response: dict):
def parse_fetch_response(response: Message):
    json_response = json_format.MessageToDict(response)

    vd = {}
    vectors = response.get("vectors", {})
    namespace = response.get("namespace", "")
    vectors = json_response.get("vectors", {})
    namespace = json_response.get("namespace", "")

    for id, vec in vectors.items():
        vd[id] = _Vector(
@@ -52,7 +57,7 @@ def parse_fetch_response(response: dict):
    return FetchResponse(
        vectors=vd,
        namespace=namespace,
        usage=parse_usage(response.get("usage", {})),
        usage=parse_usage(json_response.get("usage", {})),
        _check_type=False,
    )

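
Since the protobuf-to-dict conversion now happens inside the parser, call sites hand over the raw message. A minimal sketch of the change in calling convention, where `response` stands for the protobuf reply of a Fetch RPC:

from pinecone.grpc.utils import parse_fetch_response


def handle_fetch_reply(response):
    # Before this change, callers converted the message themselves:
    #   parse_fetch_response(json_format.MessageToDict(response))
    # Now the parser owns the conversion, so the protobuf message is passed directly.
    return parse_fetch_response(response)
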
30 changes: 30 additions & 0 deletions tests/integration/data/test_delete_future.py
@@ -0,0 +1,30 @@
import os
import pytest
from pinecone import Vector
from pinecone.grpc import GRPCDeleteResponse
from ..helpers import poll_stats_for_namespace


class TestDeleteFuture:
    @pytest.mark.skipif(
        os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
    )
    def test_delete_future(self, idx, namespace):
        idx.upsert(
            vectors=[
                Vector(id="id1", values=[0.1, 0.2]),
                Vector(id="id2", values=[0.1, 0.2]),
                Vector(id="id3", values=[0.1, 0.2]),
            ],
            namespace=namespace,
        )
        poll_stats_for_namespace(idx, namespace, 3)

        delete_one = idx.delete(ids=["id1"], namespace=namespace, async_req=True)
        delete_namespace = idx.delete(namespace=namespace, delete_all=True, async_req=True)

        from concurrent.futures import as_completed

        for future in as_completed([delete_one, delete_namespace], timeout=10):
            resp = future.result()
            assert isinstance(resp, GRPCDeleteResponse)
99 changes: 99 additions & 0 deletions tests/integration/data/test_fetch_future.py
@@ -0,0 +1,99 @@
import os
import pytest
from pinecone.grpc import PineconeGrpcFuture


@pytest.mark.skipif(
    os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
)
class TestFetchFuture:
    def setup_method(self):
        self.expected_dimension = 2

    def test_fetch_multiple_by_id(self, idx, namespace):
        target_namespace = namespace

        results = idx.fetch(ids=["1", "2", "4"], namespace=target_namespace, async_req=True)
        assert isinstance(results, PineconeGrpcFuture)

        from concurrent.futures import wait, FIRST_COMPLETED

        done, _ = wait([results], return_when=FIRST_COMPLETED)

        results = done.pop().result()
        assert results.usage is not None
        assert results.usage["read_units"] is not None
        assert results.usage["read_units"] > 0

        assert results.namespace == target_namespace
        assert len(results.vectors) == 3
        assert results.vectors["1"].id == "1"
        assert results.vectors["2"].id == "2"
        # Metadata included, if set
        assert results.vectors["1"].metadata is None
        assert results.vectors["2"].metadata is None
        assert results.vectors["4"].metadata is not None
        assert results.vectors["4"].metadata["genre"] == "action"
        assert results.vectors["4"].metadata["runtime"] == 120
        # Values included
        assert results.vectors["1"].values is not None
        assert len(results.vectors["1"].values) == self.expected_dimension

    def test_fetch_single_by_id(self, idx, namespace):
        target_namespace = namespace

        future = idx.fetch(ids=["1"], namespace=target_namespace, async_req=True)

        from concurrent.futures import wait, FIRST_COMPLETED

        done, _ = wait([future], return_when=FIRST_COMPLETED)
        results = done.pop().result()

        assert results.namespace == target_namespace
        assert len(results.vectors) == 1
        assert results.vectors["1"].id == "1"
        assert results.vectors["1"].metadata is None
        assert results.vectors["1"].values is not None
        assert len(results.vectors["1"].values) == self.expected_dimension

    def test_fetch_nonexistent_id(self, idx, namespace):
        target_namespace = namespace

        # Fetch id that is missing
        future = idx.fetch(ids=["100"], namespace=target_namespace, async_req=True)

        from concurrent.futures import wait, FIRST_COMPLETED

        done, _ = wait([future], return_when=FIRST_COMPLETED)
        results = done.pop().result()

        assert results.namespace == target_namespace
        assert len(results.vectors) == 0

    def test_fetch_nonexistent_namespace(self, idx):
        target_namespace = "nonexistent-namespace"

        # Fetch from namespace with no vectors
        future = idx.fetch(ids=["1"], namespace=target_namespace, async_req=True)

        from concurrent.futures import wait, FIRST_COMPLETED

        done, _ = wait([future], return_when=FIRST_COMPLETED)
        results = done.pop().result()

        assert results.namespace == target_namespace
        assert len(results.vectors) == 0

    def test_fetch_unspecified_namespace(self, idx):
        # Fetch without specifying namespace gives default namespace results
        future = idx.fetch(ids=["1", "4"], async_req=True)

        from concurrent.futures import wait, FIRST_COMPLETED

        done, _ = wait([future], return_when=FIRST_COMPLETED)
        results = done.pop().result()

        assert results.namespace == ""
        assert results.vectors["1"].id == "1"
        assert results.vectors["1"].values is not None
        assert results.vectors["4"].metadata is not None
121 changes: 121 additions & 0 deletions tests/integration/data/test_upsert_future.py
@@ -0,0 +1,121 @@
import pytest
import os
from pinecone import Vector, PineconeException
from ..helpers import poll_stats_for_namespace
from .utils import embedding_values


class TestUpsertWithAsyncReq:
    @pytest.mark.skipif(
        os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
    )
    def test_upsert_to_namespace(self, idx, namespace):
        target_namespace = namespace

        # Upsert with tuples
        upsert1 = idx.upsert(
            vectors=[
                ("1", embedding_values()),
                ("2", embedding_values()),
                ("3", embedding_values()),
            ],
            namespace=target_namespace,
            async_req=True,
        )

        # Upsert with objects
        upsert2 = idx.upsert(
            vectors=[
                Vector(id="4", values=embedding_values()),
                Vector(id="5", values=embedding_values()),
                Vector(id="6", values=embedding_values()),
            ],
            namespace=target_namespace,
            async_req=True,
        )

        # Upsert with dict
        upsert3 = idx.upsert(
            vectors=[
                {"id": "7", "values": embedding_values()},
                {"id": "8", "values": embedding_values()},
                {"id": "9", "values": embedding_values()},
            ],
            namespace=target_namespace,
            async_req=True,
        )

        poll_stats_for_namespace(idx, target_namespace, 9)

        # Check the vector count reflects some data has been upserted
        stats = idx.describe_index_stats()
        assert stats.total_vector_count >= 9
        assert stats.namespaces[target_namespace].vector_count == 9

        # Use returned futures
        from concurrent.futures import as_completed

        total_upserted = 0
        for future in as_completed([upsert1, upsert2, upsert3], timeout=10):
            total_upserted += future.result().upserted_count

        assert total_upserted == 9

    @pytest.mark.skipif(
        os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
    )
    def test_upsert_to_namespace_when_failed_req(self, idx, namespace):
        target_namespace = namespace

        # Upsert with tuples
        upsert1 = idx.upsert(
            vectors=[
                ("1", embedding_values()),
                ("2", embedding_values()),
                ("3", embedding_values()),
            ],
            namespace=target_namespace,
            async_req=True,
        )

        # Upsert with objects
        wrong_dimension = 10
        upsert2 = idx.upsert(
            vectors=[
                Vector(id="4", values=embedding_values(wrong_dimension)),
                Vector(id="5", values=embedding_values(wrong_dimension)),
                Vector(id="6", values=embedding_values(wrong_dimension)),
            ],
            namespace=target_namespace,
            async_req=True,
        )

        # Upsert with dict
        upsert3 = idx.upsert(
            vectors=[
                {"id": "7", "values": embedding_values()},
                {"id": "8", "values": embedding_values()},
                {"id": "9", "values": embedding_values()},
            ],
            namespace=target_namespace,
            async_req=True,
        )

        from concurrent.futures import wait, ALL_COMPLETED

        done, not_done = wait([upsert1, upsert2, upsert3], timeout=10, return_when=ALL_COMPLETED)

        assert len(done) == 3
        assert len(not_done) == 0

        total_upserted = 0
        for future in done:
            if future.exception():
                assert future is upsert2
                assert isinstance(future.exception(), PineconeException)
                assert "Vector dimension 10 does not match the dimension of the index 2" in str(
                    future.exception()
                )
            else:
                total_upserted += future.result().upserted_count
        assert total_upserted == 6
