+"""Python implementation of the gRPC API Eigen example server."""
+
+from concurrent import futures
+import logging
+
+import click
+import demo_eigen_wrapper
+import grpc
+import numpy as np
+
+import ansys.eigen.python.grpc.constants as constants
+import ansys.eigen.python.grpc.generated.grpcdemo_pb2 as grpcdemo_pb2
+import ansys.eigen.python.grpc.generated.grpcdemo_pb2_grpc as grpcdemo_pb2_grpc
+
+# =================================================================================================
+# AUXILIARY METHODS for Server operations
+# =================================================================================================
+
+
+
+
def check_data_type(dtype, new_dtype):
    """Check if the new data type is the same as the previous data type.

    Parameters
    ----------
    dtype : numpy.type
        Type of the numpy array before processing.
    new_dtype : numpy.type
        Type of the numpy array to be processed.

    Returns
    -------
    numpy.type
        Type of the numpy array.

    Raises
    ------
    RuntimeError
        In case there is already a type and it does not match that of the ``new_dtype`` argument.
    """
    if dtype is None:
        return new_dtype
    elif dtype != new_dtype:
        raise RuntimeError(
            "Error while processing data types... Input arguments are of different nature (such as int32, float64)."
        )
    else:
        return dtype


def check_size(size, new_size):
    """Check if the new parsed size is the same as the previous size.

    Parameters
    ----------
    size : tuple
        Size of the numpy array before processing.
    new_size : tuple
        Size of the numpy array to process.

    Returns
    -------
    tuple
        Size of the numpy array.

    Raises
    ------
    RuntimeError
        In case there is already a size and it does not match that of the ``new_size`` argument.
    """
    if size is None:
        return new_size
    elif size != new_size:
        raise RuntimeError(
            "Error while processing data types... Input arguments are of different sizes."
        )
    else:
        return size


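# For example (a usage sketch, not exercised by the server itself): a first call such as
# check_data_type(None, np.int32) simply returns np.int32, and check_size(None, (3,)) returns
# (3,). A later call with a different value, such as check_data_type(np.int32, np.float64),
# raises a RuntimeError because the incoming message does not match the data already processed.

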
class GRPCDemoServicer(grpcdemo_pb2_grpc.GRPCDemoServicer):
    """Provides methods that implement functionality of the API Eigen Example server."""

    def __init__(self) -> None:
        """No special init is required for the server... unless data is to be stored in a DB. This is to be determined."""
        # TODO : is it required to store the input vectors in a DB?
        super().__init__()

    # =================================================================================================
    # PUBLIC METHODS for Server operations
    # =================================================================================================

    def SayHello(self, request, context):
        """Test the greeter method to see if the server is up and running correctly.

        Parameters
        ----------
        request : HelloRequest
            Greeting request sent by the client.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        grpcdemo_pb2.HelloReply
            Reply to greeting by the server.
        """
        click.echo("Greeting requested! Requested by: " + request.name)

        # Inform about the size of the message content
        click.echo("Size of message: " + constants.human_size(request))

        return grpcdemo_pb2.HelloReply(message="Hello, %s!" % request.name)

    def FlipVector(self, request_iterator, context):
        """Flip a given vector.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the stream of vector messages provided.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        grpcdemo_pb2.Vector
            Flipped vector message.
        """
        click.echo("Vector flip requested.")

        # Process the metadata
        md = self._read_client_metadata(context)

        # Process the input messages
        dtype, size, vector_list = self._get_vectors(request_iterator, md)

        # Flip it --> assuming that only one vector is passed
        nparray_flipped = np.flip(vector_list[0])

        # Send the response
        return self._send_vectors(context, nparray_flipped)

    def AddVectors(self, request_iterator, context):
        """Add vectors.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the stream of vector messages provided.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        grpcdemo_pb2.Vector
            Vector message.
        """
        click.echo("Vector addition requested.")

        # Process the metadata
        md = self._read_client_metadata(context)

        # Process the input messages
        dtype, size, vector_list = self._get_vectors(request_iterator, md)

        # Create an empty array with the characteristics of the input arguments (dtype, size)
        result = np.zeros(size, dtype=dtype)

        # Add all provided vectors using the Eigen library
        for vector in vector_list:
            # Casting is needed due to the interface with the Eigen library. This is not the
            # desired approach, but it works. Ideally, the vector would be passed directly,
            # but errors appear otherwise.
            cast_vector = np.array(vector, dtype=dtype)
            result = demo_eigen_wrapper.add_vectors(result, cast_vector)

        # Send the response
        return self._send_vectors(context, result)

    def MultiplyVectors(self, request_iterator, context):
        """Multiply two vectors.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the stream of vector messages provided.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        grpcdemo_pb2.Vector
            Vector message.
        """
        click.echo("Vector dot product requested.")

        # Process the metadata
        md = self._read_client_metadata(context)

        # Process the input messages
        dtype, size, vector_list = self._get_vectors(request_iterator, md)

        # Check that the vector list contains exactly two vectors
        if len(vector_list) != 2:
            raise RuntimeError(
                "Unexpected number of vectors to be multiplied: "
                + str(len(vector_list))
                + ". Only two vectors can be multiplied."
            )

        # Perform the dot product of the provided vectors using the Eigen library.
        # Casting is needed due to the interface with the Eigen library. This is not the
        # desired approach, but it works. Ideally, the vectors would be passed directly,
        # but errors appear otherwise.
        vec_1 = np.array(vector_list[0], dtype=dtype)
        vec_2 = np.array(vector_list[1], dtype=dtype)
        result = demo_eigen_wrapper.multiply_vectors(vec_1, vec_2)

        # Return the result as a numpy.ndarray
        result = np.array(result, dtype=dtype, ndmin=1)

        # Finally, send the response
        return self._send_vectors(context, result)

    def AddMatrices(self, request_iterator, context):
        """Add matrices.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the stream of matrix messages provided.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        grpcdemo_pb2.Matrix
            Matrix message.
        """
        click.echo("Matrix addition requested!")

        # Process the metadata
        md = self._read_client_metadata(context)

        # Process the input messages
        dtype, size, matrix_list = self._get_matrices(request_iterator, md)

        # Create an empty array with the characteristics of the input arguments (dtype, size)
        result = np.zeros(size, dtype=dtype)

        # Add all provided matrices using the Eigen library
        for matrix in matrix_list:
            # Casting is needed due to the interface with the Eigen library. This is not the
            # desired approach, but it works. Ideally, the matrix would be passed directly,
            # but errors appear otherwise.
            cast_matrix = np.array(matrix, dtype=dtype)
            result = demo_eigen_wrapper.add_matrices(result, cast_matrix)

        # Send the response
        return self._send_matrices(context, result)

    def MultiplyMatrices(self, request_iterator, context):
        """Multiply two matrices.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the stream of Matrix messages provided.
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        grpcdemo_pb2.Matrix
            Matrix message.
        """
        click.echo("Matrix multiplication requested.")

        # Process the metadata
        md = self._read_client_metadata(context)

        # Process the input messages
        dtype, size, matrix_list = self._get_matrices(request_iterator, md)

        # Check that the matrix list contains exactly two matrices
        if len(matrix_list) != 2:
            raise RuntimeError(
                "Unexpected number of matrices to be multiplied: "
                + str(len(matrix_list))
                + ". Only two matrices can be multiplied."
            )

        # Due to the previous _get_matrices method, the size of all matrices is the same...
        # Check that they are square matrices. Otherwise, no multiplication is possible.
        if size[0] != size[1]:
            raise RuntimeError("Only square matrices are allowed for multiplication.")

        # Perform the matrix multiplication of the provided matrices using the Eigen library.
        # Casting is needed due to the interface with the Eigen library. This is not the
        # desired approach, but it works. Ideally, the matrices would be passed directly,
        # but errors appear otherwise.
        mat_1 = np.array(matrix_list[0], dtype=dtype)
        mat_2 = np.array(matrix_list[1], dtype=dtype)
        result = demo_eigen_wrapper.multiply_matrices(mat_1, mat_2)

        # Finally, send the response
        return self._send_matrices(context, result)

    # =================================================================================================
    # PRIVATE METHODS for Server operations
    # =================================================================================================

    def _get_vectors(self, request_iterator, md: dict):
        """Process a stream of vector messages.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the received request messages of type ``Vector``.
        md : dict
            Metadata provided by the client.

        Returns
        -------
        np.type, tuple, list of np.array
            Type of data, size of the vectors, and list of vectors to process.
        """
        # First, determine how many full vector messages are to be processed
        full_msgs = int(md.get("full-vectors"))

        # Initialize the output vector list and some aux vars
        vector_list = []
        dtype = None
        size = None

        # Loop over the expected full messages
        for msg in range(1, full_msgs + 1):
            # Find out how many partial vector messages constitute this full vector message
            chunks = int(md.get("vec%d-messages" % msg))

            # Initialize the output vector
            vector = None

            # Loop over the expected chunks
            for chunk_msg in range(chunks):
                # Read the vector message
                chunk_vec = next(request_iterator)

                # Inform about the size of the message content
                click.echo(
                    "Size of message: "
                    + constants.human_size(chunk_vec.vector_as_chunk)
                )

                # If processing the first chunk of the message, fill in some data
                if chunk_msg == 0:
                    # Check the data type of the incoming vector
                    if chunk_vec.data_type == grpcdemo_pb2.DataType.Value("INTEGER"):
                        dtype = check_data_type(dtype, np.int32)
                    elif chunk_vec.data_type == grpcdemo_pb2.DataType.Value("DOUBLE"):
                        dtype = check_data_type(dtype, np.float64)

                    # Check the size of the incoming vector
                    size = check_size(size, (chunk_vec.vector_size,))

                # Parse the chunk
                if vector is None:
                    vector = np.frombuffer(chunk_vec.vector_as_chunk, dtype=dtype)
                else:
                    tmp = np.frombuffer(chunk_vec.vector_as_chunk, dtype=dtype)
                    vector = np.concatenate((vector, tmp))

            # Check if the final vector has the desired size
            if vector.size != size[0]:
                raise RuntimeError("Problems reading client full vector message...")
            else:
                # If everything is fine, append to vector_list
                vector_list.append(vector)

        # Return the input vector list (as a list of numpy.ndarray)
        return dtype, size, vector_list

    def _get_matrices(self, request_iterator, md: dict):
        """Process a stream of matrix messages.

        Parameters
        ----------
        request_iterator : iterator
            Iterator to the received request messages of type ``Matrix``.
        md : dict
            Metadata provided by the client.

        Returns
        -------
        np.type, tuple, list of np.array
            Type of data, shape of the matrices, and list of matrices to process.
        """
        # Determine how many full matrix messages are to be processed
        full_msgs = int(md.get("full-matrices"))

        # Initialize the output matrix list and some aux vars
        matrix_list = []
        dtype = None
        size = None

        # Loop over the expected full messages
        for msg in range(1, full_msgs + 1):
            # Find out how many partial matrix messages constitute this full matrix message
            chunks = int(md.get("mat%d-messages" % msg))

            # Initialize the output matrix
            matrix = None

            # Loop over the expected chunks
            for chunk_msg in range(chunks):
                # Read the matrix message
                chunk_mat = next(request_iterator)

                # Inform about the size of the message content
                click.echo(
                    "Size of message: "
                    + constants.human_size(chunk_mat.matrix_as_chunk)
                )

                # If processing the first chunk of the message, fill in some data
                if chunk_msg == 0:
                    # Check the data type of the incoming matrix
                    if chunk_mat.data_type == grpcdemo_pb2.DataType.Value("INTEGER"):
                        dtype = check_data_type(dtype, np.int32)
                    elif chunk_mat.data_type == grpcdemo_pb2.DataType.Value("DOUBLE"):
                        dtype = check_data_type(dtype, np.float64)

                    # Check the size of the incoming matrix
                    size = check_size(
                        size,
                        (
                            chunk_mat.matrix_rows,
                            chunk_mat.matrix_cols,
                        ),
                    )

                # Parse the chunk
                if matrix is None:
                    matrix = np.frombuffer(chunk_mat.matrix_as_chunk, dtype=dtype)
                else:
                    tmp = np.frombuffer(chunk_mat.matrix_as_chunk, dtype=dtype)
                    matrix = np.concatenate((matrix, tmp))

            # Check if the final matrix has the desired size
            if matrix.size != size[0] * size[1]:
                raise RuntimeError("Problems reading client full Matrix message...")
            else:
                # If everything is fine, append to matrix_list
                matrix = np.reshape(matrix, size)
                matrix_list.append(matrix)

        # Return the input matrix list (as a list of numpy.ndarray)
        return dtype, size, matrix_list

    def _read_client_metadata(self, context):
        """Return the metadata as a dictionary.

        Parameters
        ----------
        context : grpc.ServicerContext
            gRPC-specific information.

        Returns
        -------
        dict
            Python-readable metadata in dictionary form.
        """
        metadata = context.invocation_metadata()
        metadata_dict = {}
        for c in metadata:
            metadata_dict[c.key] = c.value

        return metadata_dict

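    # For illustration (a sketch, not part of the server logic): for a client streaming two
    # vectors, each small enough to fit in a single chunk, the invocation metadata returned by
    # _read_client_metadata would be expected to look roughly like
    #
    #     {"full-vectors": "2", "vec1-messages": "1", "vec2-messages": "1"}
    #
    # since those are the keys read by _get_vectors (analogously, "full-matrices" and
    # "mat1-messages" for _get_matrices).
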
    def _generate_md(self, message_type: str, abbrev: str, *args: np.ndarray):
        """Generate the server metadata sent to the client and determine the number of chunks in which to decompose each message.

        Parameters
        ----------
        message_type : str
            Type of message being sent. Options are ``vectors`` and ``matrices``.
        abbrev : str
            Abbreviated form of the message being sent. Options are ``vec`` and ``mat``.
        args : np.ndarray
            Variable number of numpy arrays to transmit.

        Returns
        -------
        list[tuple], list[list[int]]
            Metadata to be sent by the server and the chunk indices for the list
            of messages to send.

        Raises
        ------
        RuntimeError
            In case of an invalid use of this function.
        """
        # Initialize the metadata and the chunks list for each full message
        md = []
        chunks = []

        # Find how many arguments are to be transmitted
        md.append(("full-" + message_type, str(len(args))))

        # Loop over all input arguments
        idx = 1
        for arg in args:
            # Check the size of the arrays.
            # If the maximum chunk size is surpassed, determine the chunks needed
            if arg.nbytes > constants.MAX_CHUNKSIZE:
                # Let us determine how many chunks we will need
                #
                # Max amount of elements per chunk
                max_elems = constants.MAX_CHUNKSIZE // arg.itemsize

                # Bulk number of chunks needed
                bulk_chunks = arg.size // max_elems

                # The remainder amount of elements (if any)
                remainder = arg.size % max_elems

                # This list provides the last index up to which to
                # process in each partial vector or matrix message
                last_idx_chunk = []
                for i in range(1, bulk_chunks + 1):
                    last_idx_chunk.append(i * max_elems)

                # Take into account that if there is a remainder,
                # include one last partial vector or matrix message.
                if remainder != 0:
                    last_idx_chunk.append(arg.size)

                # Append the results
                md.append((abbrev + str(idx) + "-messages", str(len(last_idx_chunk))))
                chunks.append(last_idx_chunk)

            else:
                # Otherwise, we are dealing with a single message. Append the results.
                md.append((abbrev + str(idx) + "-messages", str(1)))
                chunks.append([arg.size])

            # Increase idx by 1
            idx += 1

        # Return the metadata and the chunks list for each vector or matrix
        return md, chunks

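    # A worked example of the chunking logic above (assuming, purely for illustration, that
    # constants.MAX_CHUNKSIZE were 4 MB = 4194304 bytes): a float64 vector of 1,000,000 elements
    # occupies 8,000,000 bytes, so max_elems = 4194304 // 8 = 524288, bulk_chunks = 1 and
    # remainder = 475712. The resulting chunk indices are [524288, 1000000] and the metadata
    # entry becomes ("vec1-messages", "2"). The actual value of MAX_CHUNKSIZE is defined in the
    # constants module.
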
    def _send_vectors(self, context: grpc.ServicerContext, *args: np.ndarray):
        """Send the response vector messages.

        Parameters
        ----------
        context : grpc.ServicerContext
            gRPC context.
        args : np.ndarray
            Variable size of np.arrays to transmit.

        Yields
        ------
        grpcdemo_pb2.Vector
            Vector messages streamed (full or partial, depending on the metadata).
        """
        # Generate the metadata and info on the chunks
        md, chunks = self._generate_md("vectors", "vec", *args)

        # Send the initial metadata
        context.send_initial_metadata(md)

        # Loop over all input arguments
        for arg, vector_chunks in zip(args, chunks):
            # Loop over the chunk indices
            processed_idx = 0
            for last_idx_chunk in vector_chunks:
                # Use tmp_idx in the yield statement and update processed_idx afterwards
                tmp_idx = processed_idx
                processed_idx = last_idx_chunk

                # Yield!
                yield grpcdemo_pb2.Vector(
                    data_type=constants.NP_DTYPE_TO_DATATYPE[arg.dtype.type],
                    vector_size=arg.shape[0],
                    vector_as_chunk=arg[tmp_idx:last_idx_chunk].tobytes(),
                )

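    # Tying the two pieces together (illustrative only): with chunk indices [524288, 1000000] as
    # in the example above, _send_vectors yields two Vector messages carrying arg[0:524288] and
    # arg[524288:1000000] as raw bytes, while vector_size always reports the full vector length.
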
    def _send_matrices(self, context: grpc.ServicerContext, *args: np.ndarray):
        """Send the response matrix messages.

        Parameters
        ----------
        context : grpc.ServicerContext
            gRPC context.
        args : np.ndarray
            Variable size of np.arrays to transmit.

        Yields
        ------
        grpcdemo_pb2.Matrix
            Matrix messages streamed (full or partial, depending on the metadata).
        """
        # Generate the metadata and info on the chunks
        md, chunks = self._generate_md("matrices", "mat", *args)

        # Send the initial metadata
        context.send_initial_metadata(md)

        # Loop over all input arguments
        for arg, matrix_chunks in zip(args, chunks):
            # Since we are dealing with matrices, ravel the matrix to a 1D array (avoids a copy)
            arg_as_vec = arg.ravel()

            # Loop over the chunk indices
            processed_idx = 0
            for last_idx_chunk in matrix_chunks:
                # Use tmp_idx in the yield statement and update processed_idx afterwards
                tmp_idx = processed_idx
                processed_idx = last_idx_chunk

                # Yield!
                yield grpcdemo_pb2.Matrix(
                    data_type=constants.NP_DTYPE_TO_DATATYPE[arg.dtype.type],
                    matrix_rows=arg.shape[0],
                    matrix_cols=arg.shape[1],
                    matrix_as_chunk=arg_as_vec[tmp_idx:last_idx_chunk].tobytes(),
                )


# =================================================================================================
# SERVING METHODS for Server operations
# =================================================================================================


def serve():
    """Deploy the API Eigen Example server."""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    grpcdemo_pb2_grpc.add_GRPCDemoServicer_to_server(GRPCDemoServicer(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig()
    serve()
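
# A minimal client-side check of a running server might look like the sketch below. This is an
# assumption for illustration purposes: the generated stub class is presumed to be named
# GRPCDemoStub (matching the GRPCDemoServicer above), and only the SayHello RPC is exercised.
#
#     import grpc
#     import ansys.eigen.python.grpc.generated.grpcdemo_pb2 as grpcdemo_pb2
#     import ansys.eigen.python.grpc.generated.grpcdemo_pb2_grpc as grpcdemo_pb2_grpc
#
#     with grpc.insecure_channel("localhost:50051") as channel:
#         stub = grpcdemo_pb2_grpc.GRPCDemoStub(channel)
#         reply = stub.SayHello(grpcdemo_pb2.HelloRequest(name="API Eigen Example client"))
#         print(reply.message)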