Skip to content

Commit

Permalink
cloudvision/Connector: Add support to set Custom Index Schema in GRPC…
Browse files Browse the repository at this point in the history
…Client

Why:
This changes add ability to set the schema on Aeris
for given path, which enables us to run search queries on the Aeris.

fixes=BUG830390

Change-Id: Ief52d6df8a64051eb303182b53122810f52ae93a
  • Loading branch information
BarsopiaPinkesh committed Aug 24, 2023
1 parent 3c2cee7 commit b5d40aa
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
49 changes: 45 additions & 4 deletions cloudvision/Connector/grpc_client/grpcClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import grpc
from google.protobuf import timestamp_pb2 as pbts
from google.protobuf.empty_pb2 import Empty

from cloudvision import __version__ as version
from cloudvision.Connector import codec
Expand All @@ -22,6 +23,8 @@
TIME_TYPE = Union[pbts.Timestamp, datetime]
UPDATE_TYPE = Tuple[Any, Any]
UPDATES_TYPE = List[UPDATE_TYPE]
DEFAULT_DELETE_AFTER_DAYS = 34
DATASET_TYPE_DEVICE = "device"


def to_pbts(ts: TIME_TYPE) -> pbts.Timestamp:
Expand All @@ -35,7 +38,7 @@ def to_pbts(ts: TIME_TYPE) -> pbts.Timestamp:
raise TypeError("timestamp must be a datetime or protobuf timestamp")


def create_query(pathKeys: List[Any], dId: str, dtype: str = "device") -> rtr.Query:
def create_query(pathKeys: List[Any], dId: str, dtype: str = DATASET_TYPE_DEVICE) -> rtr.Query:
"""
create_query creates a protobuf query message with dataset ID dId
and dataset type dtype.
Expand Down Expand Up @@ -271,7 +274,7 @@ def publish(
self,
dId,
notifs: List[ntf.Notification],
dtype: str = "device",
dtype: str = DATASET_TYPE_DEVICE,
sync: bool = True,
compare: Optional[UPDATE_TYPE] = None,
) -> None:
Expand All @@ -291,7 +294,7 @@ def publish(

req = rtr.PublishRequest(
batch=ntf.NotificationBatch(
d="device",
d=DATASET_TYPE_DEVICE,
dataset=ntf.Dataset(type=dtype, name=dId),
notifications=notifs,
),
Expand Down Expand Up @@ -341,7 +344,7 @@ def decode_notification(self, notif):
def search(
self,
search_type=rtr.SearchRequest.CUSTOM,
d_type: str = "device",
d_type: str = DATASET_TYPE_DEVICE,
d_name: str = "",
result_size: int = 1,
start: Optional[TIME_TYPE] = None,
Expand Down Expand Up @@ -382,6 +385,44 @@ def search(
res = self.__search_client.Search(req)
return (self.decode_batch(nb) for nb in res)

def set_custom_schema(
self,
d_name: str,
path_elements: Iterable[str],
schema: Iterable[rtr.IndexField],
delete_after_days: int = DEFAULT_DELETE_AFTER_DAYS,
d_type: str = DATASET_TYPE_DEVICE,
) -> Empty:
"""Set custom index schema for given path.
:param d_name: Dataset name on aeris
:param path_elements: Path elements for which schema needs to be set
:param schema: Schema to be set
:param delete_after_days: Number of days after which data would be deleted
:param d_type: Type of the dataset
"""
req = self.create_custom_schema_index_request(
d_name, path_elements, schema, delete_after_days, d_type)
return self.__search_client.SetCustomSchema(req)

def create_custom_schema_index_request(
self,
d_name,
path_elements, schema,
delete_after_days, d_type
) -> rtr.CustomIndexSchema:
encoded_path_elements = [self.encoder.encode(x) for x in path_elements]
req = rtr.CustomIndexSchema(
query=rtr.Query
(
dataset=ntf.Dataset(type=d_type, name=d_name),
paths=[rtr.Path(path_elements=encoded_path_elements)],
),
schema=schema,
option=rtr.CustomIndexOptions(delete_after_days=delete_after_days)
)
return req

def reenroll(self, cert_path: str, key_path: str) -> bytes:
"""Reenroll the existing certificate.
Expand Down
22 changes: 22 additions & 0 deletions test/connector/grpc_client/test_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from cloudvision import __version__ as version
from cloudvision.Connector.grpc_client import GRPCClient
from cloudvision.Connector.gen import router_pb2 as rtr


class TestGRPCClient:
Expand Down Expand Up @@ -68,3 +69,24 @@ def test_channel_options_overrides(self, given, want):
assert hasattr(client, "channel_options")
got = client.channel_options
assert sorted(got) == sorted(want)

def test_create_custom_schema_index_request(self):
client = GRPCClient("localhost:443")
d_name = "dataset_name"
path_elements = ["path", "element"]
schema = [rtr.IndexField(name="FieldName1", type=rtr.INTEGER),
rtr.IndexField(name="FieldName1", type=rtr.FLOAT)]
d_type = "device"
delete_after_days = 50
request = client.create_custom_schema_index_request(
d_name, path_elements, schema, delete_after_days, d_type)
assert len(request.schema) == len(schema)
for idx, fieldSchema in enumerate(request.schema):
assert fieldSchema == schema[idx]
assert request.option.delete_after_days == delete_after_days
assert request.query.dataset.name == d_name
assert request.query.dataset.type == d_type
assert len(request.query.paths) == 1
path = request.query.paths[0]
for idx, path_element in enumerate([client.encoder.encode(x) for x in path_elements]):
assert path_element == path.path_elements[idx]

0 comments on commit b5d40aa

Please sign in to comment.