From 8ccc9e7ebecb1729375c5bc919706f98284eb1e2 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 7 Aug 2023 10:51:19 -0500 Subject: [PATCH 01/34] Redesign communication client types Redesigning and renaming abstraction, making it more focused on just the transport layer communication handling, rather than general service client behavior; also extending new type with a subtype that adds SSL context security. Making RequestClient concrete but generic. Add ConnectionContextClient as an abstract transport client than maintains a connection via an async managed context, and (re)implementing WebSocketClient using the new (secure) transport client with a context managed connection. Update specialized clients to extend RequestClient Fix ExternalClient with changes. --- .../dmod/communication/client.py | 741 +++++++++--------- 1 file changed, 353 insertions(+), 388 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 571f0f97d..e8059c40d 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -12,8 +12,7 @@ import websockets -from .maas_request import ExternalRequest, ExternalRequestResponse, ModelExecRequest, ModelExecRequestResponse, NWMRequest, \ - NGENRequest +from .maas_request import ExternalRequest, ExternalRequestResponse from .message import AbstractInitRequest, Message, Response, InitRequestResponseReason from .partition_request import PartitionRequest, PartitionResponse from .dataset_management_message import DatasetManagementMessage, DatasetManagementResponse @@ -34,9 +33,7 @@ EXTERN_REQ_M = TypeVar("EXTERN_REQ_M", bound=ExternalRequest) EXTERN_REQ_R = TypeVar("EXTERN_REQ_R", bound=ExternalRequestResponse) -MOD_EX_M = TypeVar("MOD_EX_M", bound=ModelExecRequest) -MOD_EX_R = TypeVar("MOD_EX_R", bound=ModelExecRequestResponse) - +CONN = TypeVar("CONN") def get_or_create_eventloop() -> AbstractEventLoop: """ @@ -58,13 +55,14 @@ def get_or_create_eventloop() -> AbstractEventLoop: raise -class AbstractClient(ABC): +class TransportLayerClient(ABC): """ - Abstract client capable of securely communicating with a server at some endpoint. + Abstract client capable of communicating with a server at some endpoint. - Abstract client with an interface for securely sending data to a server at some endpoint. The interface function - for this behavior supports optionally waiting for and returning a raw response. Alternatively, the type provides an - interface for receiving a response from the server independently. + Abstract client for interacting with a service at the OSI transport layer. It provides an interface for sending + data to accept data and send this data to a server at some endpoint. The interface function for this behavior + supports optionally waiting for and returning a raw data response. Alternatively, the type provides a function for + receiving a response from the server independently. """ def __init__(self, endpoint_uri: str, *args, **kwargs): """ @@ -115,23 +113,44 @@ async def async_recv(self) -> str: """ pass + +class SSLSecuredTransportLayerClient(TransportLayerClient, ABC): + """ + Abstract ::class:`TransportLayerClient` capable securing its communications using an ::class:`SSLContext`. + """ + def __init__(self, ssl_directory: Path, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._ssl_directory = ssl_directory + """Path: The parent directory of the cert PEM file used for the client SSL context.""" + + # Setup this as a property to allow more private means to override the actual filename of the cert PEM file + self._client_ssl_context = None + """ssl.SSLContext: The private field for the client SSL context property.""" + + self._cert_pem_file_basename: str = 'certificate.pem' + """str: The basename of the certificate PEM file to use.""" + @property - @abstractmethod def client_ssl_context(self) -> ssl.SSLContext: """ - Get the client SSL context property. + Get the client SSL context property, lazily instantiating if necessary. Returns ------- ssl.SSLContext The client SSL context for secure connections. """ - pass + if self._client_ssl_context is None: + self._client_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + endpoint_pem = self._ssl_directory.joinpath(self._cert_pem_file_basename) + self.client_ssl_context.load_verify_locations(endpoint_pem) + return self._client_ssl_context -class ExternalClient(AbstractClient, ABC): +class ExternalClient(ABC): """ - Abstract client encapsulating the logic for using external connections secured using sessions. + Abstraction encapsulating the logic for using external connections secured using sessions. Abstract client type that requires connections that work using secure sessions. It is able to serialize session details to a file and, by default, load them from this file if appropriate. @@ -290,15 +309,8 @@ def _check_if_new_session_needed(self, use_current_values: bool = True) -> bool: else: return True - # TODO: ... async def authenticate(self, cached_session_file: Optional[Path] = None): - #async with websockets.connect(self.endpoint_uri, ssl=self.client_ssl_context) as websocket: - #async with websockets.connect(self.maas_endpoint_uri) as websocket: - # return await EditView._authenticate_over_websocket(websocket) - # Right now, it doesn't matter as long as it is valid - # TODO: Fix this to not be ... fixed ... - json_as_dict = {'username': 'someone', 'user_secret': 'something'} - response_txt = await self.async_send(data=json.dumps(json_as_dict), await_response=True) + response_txt = await self.request_auth_from_service() try: if cached_session_file is not None and not cached_session_file.is_dir() \ and cached_session_file.parent.is_dir(): @@ -321,6 +333,18 @@ def parse_session_auth_text(self, auth_text: str): maas_session_created = auth_response['data']['created'] return maas_session_id, maas_session_secret, maas_session_created + @abstractmethod + async def request_auth_from_service(self) -> str: + """ + Prepare and send authentication to service, and return the raw response. + + Returns + ------- + str + The raw response to sending a request for authentication. + """ + pass + @property def session_created(self): return self._session_created @@ -334,65 +358,210 @@ def session_secret(self): return self._session_secret -class WebSocketClient(AbstractClient, ABC): +class RequestClient(Generic[M, R]): """ - Abstract subtype of ::class:`AbstractClient` that specifically works over websocket connections. + Simple, generic DMOD service client, dealing with some type of DMOD request message and response objects. - An abstract websocket-based implementation of ::class:`AbstractClient`. Instances are also async context managers - for runtime contexts that handle websocket connections, with the manager function returning the instance itself. + Generic client type for interaction with a DMOD service. Its primary function accepts some DMOD request message + object, makes submits a request to the service by relaying the aforementioned object, and receives/returns the + response as an object of the corresponding type. The underlying communication is handled by way of a + ::class:`TransportLayerClient` supplied during initialization. - A new runtime context will check whether there is an open websocket connection already and open a connection if not. - In all cases, it maintains an instance attribute that is a counter of the number of active usages of the connection - (i.e., the number of separate, active contexts). When the context is exited, the instance's active usage counter is - reduced by one and, if that context represents the last active use of the connection, the connection object is - closed and then has its reference removed. - - The ::method:`async_send` and ::method:`async_recv` functions can be used without already being in an active context - (i.e., they will enter a new context for the scope of the function). However, within in an already open context, - calls to ::method:`async_send` and ::method:`async_recv` can be used as needed to support arbitrarily communication - over the websocket. + This type is relatively simple in that a particular instance deals with a strict request/response pair. Its + functions are thus implemented to sanity check the types received - both as argument and in communication from the + service - and raise exceptions if they are of unexpected types. """ - @classmethod - def build_endpoint_uri(cls, host: str, port: Union[int, str], path: Optional[str] = None, is_secure: bool = True): - proto = 'wss' if is_secure else 'ws' - if path is None: - path = '' - else: - path = path.strip() - if path[0] != '/': - path = '/' + path - return '{}://{}:{}{}'.format(proto, host.strip(), str(port).strip(), path) - - def __init__(self, ssl_directory: Path, *args, **kwargs): + def __init__(self, transport_client: TransportLayerClient, request_type: Type[M], response_type: Type[R], *args, + **kwargs): """ - Initialize this instance. + Initialize. Parameters ---------- - ssl_directory + transport_client : TransportLayerClient + The client for handling the underlying raw OSI transport layer communications with the service. args kwargs + """ + self._transport_client = transport_client + self._request_type: Type[M] = request_type + self._response_type: Type[R] = response_type - Other Parameters + @property + def response_type(self) -> Type[R]: + """ + The response subtype class appropriate for this client implementation. + + Returns + ------- + Type[R] + The response subtype class appropriate for this client implementation. + """ + return self._response_type + + @property + def request_type(self) -> Type[M]: + """ + Return the request message subtype class appropriate for this client implementation. + + Returns + ------- + Type[M] + The request message subtype class appropriate for this client implementation. + """ + return self._request_type + + def _process_request_response(self, response_str: str) -> R: + """ + Process the serial form of a response returned by ::method:`async_send` into a response object. + + Parameters ---------- - endpoint_uri : str - The endpoint for the client to connect to when opening a connection, for superclass init. + response_str : str + The string returned by a request made via ::method:`async_send`. + + Returns + ------- + R + The inflated response object. + + See Also + ------- + async_send """ - super().__init__(*args, **kwargs) + response_type = self.response_type + my_class_name = self.__class__.__name__ + response_json = {} + try: + # Consume the response confirmation by deserializing first to JSON, then from this to a response object + response_json = json.loads(response_str) + try: + response_object = response_type.factory_init_from_deserialized_json(response_json) + if response_object is None: + msg = '********** {} could not deserialize {} from raw websocket response: `{}`'.format( + my_class_name, response_type.__name__, str(response_str)) + reason = '{} Could Not Deserialize To {}'.format(my_class_name, response_type.__name__) + response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) + except Exception as e2: + msg = '********** While deserializing {}, {} encountered {}: {}'.format( + response_type.__name__, my_class_name, e2.__class__.__name__, str(e2)) + reason = '{} {} Deserializing {}'.format(my_class_name, e2.__class__.__name__, response_type.__name__) + response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) + except Exception as e: + reason = 'Invalid JSON Response' + msg = 'Encountered {} loading response to JSON: {}'.format(e.__class__.__name__, str(e)) + response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) - self._ssl_directory = ssl_directory - """Path: The parent directory of the cert PEM file used for the client SSL context.""" + if not response_object.success: + logging.error(response_object.message) + logging.debug('************* {} returning {} object {}'.format(self.__class__.__name__, response_type.__name__, + response_object.to_json())) + return response_object - # Setup this as a property to allow more private means to override the actual filename of the cert PEM file - self._client_ssl_context = None - """ssl.SSLContext: The private field for the client SSL context property.""" + async def async_make_request(self, message: M) -> R: + """ + Async send a request message object and return the received response. - self._cert_pem_file_basename: str = 'certificate.pem' - """str: The basename of the certificate PEM file to use.""" + Send (within Python's async functionality) the appropriate type of request :class:`Message` for this client + implementation type and return the response as a corresponding, appropriate :class:`Response` instance. + + Parameters + ---------- + message : M + the request message object + + Returns + ------- + R + the request response object + """ + if not isinstance(message, self.request_type): + reason = f'{self.__class__.__name__} Received Unexpected Type {message.__class__.__name__}' + msg = f'{self.__class__.__name__} received unexpected {message.__class__.__name__} ' \ + f'instance as request, rather than a {self.request_type.__name__} instance; not submitting' + logger.error(msg) + return self.build_response(success=False, reason=reason, message=msg) + + response_json = {} + try: + # Send the request and get the service response + serialized_response = await self._transport_client.async_send(data=str(message), await_response=True) + if serialized_response is None: + raise ValueError(f'Serialized response from {self.__class__.__name__} async message was `None`') + except Exception as e: + reason = f'{self.__class__.__name__} Send {message.__class__.__name__} Failure ({e.__class__.__name__})' + msg = f'{self.__class__.__name__} raised {e.__class__.__name__} sending {message.__class__.__name__}: ' \ + f'{str(e)}' + logger.error(msg) + return self.build_response(success=False, reason=reason, message=msg, data=response_json) - self.connection: typing.Optional[websockets.WebSocketClientProtocol] = None - """Optional[websockets.client.Connect]: The open websocket connection, if set, for this client's context.""" + assert isinstance(serialized_response, str) + return self._process_request_response(serialized_response) + + def build_response(self, success: bool, reason: str, message: str = '', data: Optional[dict] = None, + **kwargs) -> R: + """ + Build a response of the appropriate subtype from the given response details. + + Build a response of the appropriate subtype for this particular implementation, using the given parameters for + this function as the initialization params for the response. Per the design of ::class:`Response`, the primary + attributes are ::attribute:`Response.success`, ::attribute:`Response.reason`, ::attribute:`Response.message`, + and ::attribute:`Response.data`. However, implementations may permit or require additional param values, which + can be supplied via keyword args. + + As with the init of ::class:`Request`, defaults of ``''`` (empty string) and ``None`` are in place for for + ``message`` and ``data`` respectively. + + A default implementation is provided that initializes an instance of the type return by + ::method:`get_response_subtype`. Keyword args are not used in this default implementation. + + Parameters + ---------- + success : bool + The value for ::attribute:`Response.success` to use when initializing the response object. + reason : str + The value for ::attribute:`Response.reason` to use when initializing the response object. + message : str + The value for ::attribute:`Response.message` to use when initializing the response object (default: ``''``). + data : dict + The value for ::attribute:`Response.data` to use when initializing the response object (default: ``None``). + kwargs : dict + A dict for any additional implementation specific init params for the response object. + + Returns + ------- + R + A response object of the appropriate subtype. + """ + return self.response_type(success=success, reason=reason, message=message, data=data) + + +class ConnectionContextClient(Generic[CONN], TransportLayerClient, ABC): + """ + Transport client subtype that maintains connections via an async managed contexts. + + Instances of this type will increment an active connections counter upon entering the context. If the counter was + at ``0``, a new connection will be opened using ::method:`_establish_connection` and assigned to + ::attribute:`connection`. The reverse happens on context close, with ::method:`_close_connection` being used to + close the connection once the counter is ``0`` again. + + Subtypes should provide implementations for ::method:`_establish_connection` and ::method:`_close_connection`. + + Implementations of ::method:`async_send` and ::method:`async_recv` functions are provided. They can be used without + already being in an active context (i.e., they will enter a new context for the scope of the function). However, + within in an already open context, calls to ::method:`async_send` and ::method:`async_recv` can be used as needed to + support arbitrarily communication over the websocket. + + The ::method:`async_send` and ::method:`async_recv` implementations depend on ::method:`_connection_send` and + ::method:`_connection_recv`, which must be provided by subtypes. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._connection: typing.Optional[CONN] = None + """Optional[CONN]: The open connection, if set, for this client's context.""" self._opening_connection = False """bool: Whether some task is in the process of opening a new connection in the context, but is awaiting.""" @@ -415,12 +584,12 @@ async def __aenter__(self): # Safely conclude at this point that nothing else (worth paying attention to) is in the middle of opening a # connection, so check whether there already is one ... - if self.connection is None: + if self._connection is None: # If not, mark that this exec is opening a connection, before giving up control during the await self._opening_connection = True # Then asynchronously open the connection ... try: - self.connection = await websockets.connect(self.endpoint_uri, ssl=self.client_ssl_context) + self._connection = await self._establish_connection() except Exception as e: raise e # And now, note that we are no longer in the middle of an attempt to open a connection @@ -435,221 +604,176 @@ async def __aexit__(self, *exc_info): """ self.active_connections -= 1 if self.active_connections < 1: - await self.connection.close() - self.connection = None + await self._close_connection() + self._connection = None self.active_connections = 0 - async def async_send(self, data: Union[str, bytearray], await_response: bool = False): + @abstractmethod + async def _connection_recv(self) -> Optional[str]: """ - Send data to websocket, by default returning immediately, but optionally receiving and returning response. + Perform operations to receive data over already opened ::attribute:`connection`. - The function will cause the runtime context to be entered, opening a connection if needed. In such cases, - the connection will also be closed at the conclusion of this function. + Returns + ------- + Optional[str] + Data received over already opened ::attribute:`connection`. + """ + pass + + @abstractmethod + async def _connection_send(self, data: Union[str, bytearray]): + """ + Perform operations to send data over already opened ::attribute:`connection`. Parameters ---------- - data: Optional[str] + data The data to send. - await_response - Whether the method should also await a response on the websocket connection and return it. + """ + pass - Returns - ------- - Optional[str] - The response to the sent data, if one should be awaited; otherwise ``None``. + @abstractmethod + async def _close_connection(self): """ - async with self as websocket: - #TODO ensure correct type for data??? - await websocket.connection.send(data) - return await websocket.connection.recv() if await_response else None + Close the managed context's established connection. + """ + pass - async def listen(self) -> typing.Union[str, bytes]: + @abstractmethod + async def _establish_connection(self) -> CONN: """ - Waits for a message through the websocket connection + Establish a connection for the managed context. - Returns: - A string for data sent through the socket as a string and bytes for data sent as binary + Returns + ------- + CONN + A newly established connection. """ - async with self as websocket: - return await websocket.connection.recv() + pass - @abstractmethod - async def async_make_request(self, message: Message) -> Response: + async def async_send(self, data: Union[str, bytearray], await_response: bool = False): """ - Send (within Python's async functionality) the appropriate type of request :class:`Message` for this client - implementation type and return the response as a corresponding, appropriate :class:`Response` instance. + Send data over connection, by default returning immediately, but optionally receiving and returning response. + + The function will cause the runtime context to be entered, opening a connection if needed. In such cases, + the connection will also be closed at the conclusion of this function. Parameters ---------- - message - the request message object + data: Optional[str] + The data to send. + await_response + Whether the method should also await a response on the connection and return it. Returns ------- - response - the request response object + Optional[str] + The response to the sent data, if one should be awaited; otherwise ``None``. """ - pass + async with self as connection_owner: + await connection_owner._connection_send(data) + return await connection_owner._connection_recv() if await_response else None async def async_recv(self) -> Union[str, bytes]: """ - Receive data over the websocket connection. + Receive data over the connection. Returns ------- Union[str, bytes] The data received over the connection. """ - with self as websocket: - return await websocket.connection.recv() + with self as connection_owner: + return await connection_owner._connection_recv() @property - def client_ssl_context(self) -> ssl.SSLContext: - """ - Get the client SSL context property, lazily instantiating if necessary. - - Returns - ------- - ssl.SSLContext - the client SSL context for secure connections - """ - if self._client_ssl_context is None: - self._client_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - endpoint_pem = self._ssl_directory.joinpath(self._cert_pem_file_basename) - self.client_ssl_context.load_verify_locations(endpoint_pem) - return self._client_ssl_context + def connection(self) -> Optional[CONN]: + return self._connection -class InternalServiceClient(WebSocketClient, Generic[M, R], ABC): +class WebSocketClient(SSLSecuredTransportLayerClient, ConnectionContextClient[websockets.WebSocketClientProtocol]): """ - Abstraction for a client that interacts with some internal, non-public-facing DMOD service. + Subtype of ::class:`SSLSecuredTransportLayerClient` that specifically works over SSL-secured websocket connections. + + A websocket-based implementation of ::class:`SSLSecuredTransportLayerClient`. Instances are also async context + managers for runtime contexts that handle websocket connections, with the manager function returning the instance + itself. + + A new runtime context will check whether there is an open websocket connection already and open a connection if not. + In all cases, it maintains an instance attribute that is a counter of the number of active usages of the connection + (i.e., the number of separate, active contexts). When the context is exited, the instance's active usage counter is + reduced by one and, if that context represents the last active use of the connection, the connection object is + closed and then has its reference removed. + + The ::method:`async_send` and ::method:`async_recv` functions can be used without already being in an active context + (i.e., they will enter a new context for the scope of the function). However, within in an already open context, + calls to ::method:`async_send` and ::method:`async_recv` can be used as needed to support arbitrarily communication + over the websocket. """ @classmethod - @abstractmethod - def get_response_subtype(cls) -> Type[R]: + def build_endpoint_uri(cls, host: str, port: Union[int, str], path: Optional[str] = None, is_secure: bool = True): + proto = 'wss' if is_secure else 'ws' + if path is None: + path = '' + else: + path = path.strip() + if path[0] != '/': + path = '/' + path + return '{}://{}:{}{}'.format(proto, host.strip(), str(port).strip(), path) + + async def _connection_recv(self) -> Optional[str]: """ - Return the response subtype class appropriate for this client implementation. + Perform operations to receive data over already opened ::attribute:`connection`. Returns ------- - Type[R] - The response subtype class appropriate for this client implementation. + Optional[str] + Data received over already opened ::attribute:`connection`. """ - pass + return await self.connection.recv() - def build_response(self, success: bool, reason: str, message: str = '', data: Optional[dict] = None, - **kwargs) -> R: + @abstractmethod + async def _connection_send(self, data: Union[str, bytearray]): """ - Build a response of the appropriate subtype from the given response details. - - Build a response of the appropriate subtype for this particular implementation, using the given parameters for - this function as the initialization params for the response. Per the design of ::class:`Response`, the primary - attributes are ::attribute:`Response.success`, ::attribute:`Response.reason`, ::attribute:`Response.message`, - and ::attribute:`Response.data`. However, implementations may permit or require additional param values, which - can be supplied via keyword args. - - As with the init of ::class:`Request`, defaults of ``''`` (empty string) and ``None`` are in place for for - ``message`` and ``data`` respectively. - - A default implementation is provided that initializes an instance of the type return by - ::method:`get_response_subtype`. Keyword args are not used in this default implementation. + Perform operations to send data over already opened ::attribute:`connection`. Parameters ---------- - success : bool - The value for ::attribute:`Response.success` to use when initializing the response object. - reason : str - The value for ::attribute:`Response.reason` to use when initializing the response object. - message : str - The value for ::attribute:`Response.message` to use when initializing the response object (default: ``''``). - data : dict - The value for ::attribute:`Response.data` to use when initializing the response object (default: ``None``). - kwargs : dict - A dict for any additional implementation specific init params for the response object. - - Returns - ------- - R - A response object of the appropriate subtype. + data + The data to send. """ - return self.get_response_subtype()(success=success, reason=reason, message=message, data=data) - - def _process_request_response(self, response_str: str): - response_type = self.get_response_subtype() - my_class_name = self.__class__.__name__ - response_json = {} - try: - # Consume the response confirmation by deserializing first to JSON, then from this to a response object - response_json = json.loads(response_str) - try: - response_object = response_type.factory_init_from_deserialized_json(response_json) - if response_object is None: - msg = '********** {} could not deserialize {} from raw websocket response: `{}`'.format( - my_class_name, response_type.__name__, str(response_str)) - reason = '{} Could Not Deserialize To {}'.format(my_class_name, response_type.__name__) - response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) - except Exception as e2: - msg = '********** While deserializing {}, {} encountered {}: {}'.format( - response_type.__name__, my_class_name, e2.__class__.__name__, str(e2)) - reason = '{} {} Deserializing {}'.format(my_class_name, e2.__class__.__name__, response_type.__name__) - response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) - except Exception as e: - reason = 'Invalid JSON Response' - msg = 'Encountered {} loading response to JSON: {}'.format(e.__class__.__name__, str(e)) - response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) + await self.connection.send(data) - if not response_object.success: - logging.error(response_object.message) - logging.debug('************* {} returning {} object {}'.format(self.__class__.__name__, response_type.__name__, - response_object.to_json())) - return response_object - - async def async_make_request(self, message: M) -> R: + async def _close_connection(self): """ - Async send the given request and return the corresponding response. - - Send (within Python's async functionality) the appropriate type of request :class:`Message` for this client - implementation type and return the response as a corresponding, appropriate :class:`Response` instance. + Close the managed context's established connection. + """ + await self.connection.close() - Parameters - ---------- - message : M - The request message object. + async def _establish_connection(self) -> CONN: + """ + Establish a connection for the managed context. Returns ------- - response : R - The request response object. + CONN + A newly established connection. """ - response_type = self.get_response_subtype() - expected_req_type = response_type.get_response_to_type() - my_class_name = self.__class__.__name__ - req_class_name = message.__class__.__name__ + return await websockets.connect(self.endpoint_uri, ssl=self.client_ssl_context) - if not isinstance(message, expected_req_type): - reason = '{} Received Unexpected Type {}'.format(my_class_name, req_class_name) - msg = '{} received unexpected {} instance as request, rather than a {} instance; not submitting'.format( - my_class_name, req_class_name, expected_req_type.__name__) - logger.error(msg) - return self.build_response(success=False, reason=reason, message=msg) - - response_json = {} - try: - # Send the request and get the service response - serialized_response = await self.async_send(data=str(message), await_response=True) - if serialized_response is None: - raise ValueError('Serialized response from {} async message was `None`'.format(my_class_name)) - except Exception as e: - reason = '{} Send {} Failure ({})'.format(my_class_name, req_class_name, e.__class__.__name__) - msg = '{} encountered {} sending {}: {}'.format(my_class_name, e.__class__.__name__, req_class_name, str(e)) - logger.error(msg) - return self.build_response(success=False, reason=reason, message=msg, data=response_json) + async def listen(self) -> typing.Union[str, bytes]: + """ + Waits for a message through the websocket connection - return self._process_request_response(serialized_response) + Returns: + A string for data sent through the socket as a string and bytes for data sent as binary + """ + async with self as websocket: + return await websocket.connection.recv() -class SchedulerClient(InternalServiceClient[SchedulerRequestMessage, SchedulerRequestResponse]): +class SchedulerClient(RequestClient[SchedulerRequestMessage, SchedulerRequestResponse]): @classmethod def get_response_subtype(cls) -> Type[SchedulerRequestResponse]: @@ -678,7 +802,7 @@ async def async_send_update(self, message: UpdateMessage) -> UpdateMessageRespon response_json = {} serialized_response = None try: - serialized_response = await self.async_send(data=str(message), await_response=True) + serialized_response = await self._transport_client.async_send(data=str(message), await_response=True) if serialized_response is None: raise ValueError('Response from {} async update message was `None`'.format(self.__class__.__name__)) response_object = UpdateMessageResponse.factory_init_from_deserialized_json(json.loads(serialized_response)) @@ -692,70 +816,8 @@ async def async_send_update(self, message: UpdateMessage) -> UpdateMessageRespon return UpdateMessageResponse(digest=message.digest, object_found=False, success=False, reason=reason, response_text='None' if serialized_response is None else serialized_response) - async def get_results(self): - logging.debug('************* {} preparing to yield results'.format(self.__class__.__name__)) - async for message in self.connection: - logging.debug('************* {} yielding result: {}'.format(self.__class__.__name__, str(message))) - yield message - - -class ExternalRequestClient(ExternalClient, WebSocketClient, Generic[EXTERN_REQ_M, EXTERN_REQ_R], ABC): - - @staticmethod - def _request_failed_due_to_expired_session(response_obj: EXTERN_REQ_R): - """ - Test if request failed due to an expired session. - - Test if the response to a websocket-sent request failed specifically because the utilized session is consider to - be expired, either because the session is explicitly expired or there is no longer a record of the session with - the session secret in the init request (i.e., it is implicitly expired). - - Parameters - ---------- - response_obj - Returns - ------- - bool - whether a failure occur and it specifically was due to a lack of authorization over the used session - """ - is_expired = response_obj.reason_enum == InitRequestResponseReason.UNRECOGNIZED_SESSION_SECRET - is_expired = is_expired or response_obj.reason_enum == InitRequestResponseReason.EXPIRED_SESSION - return response_obj is not None and not response_obj.success and is_expired - - @classmethod - def _run_validation(cls, message: Union[EXTERN_REQ_M, EXTERN_REQ_R]): - """ - Run validation for the given message object using the appropriate validator subtype. - - Parameters - ---------- - message - The message to validate, which will be either a ``ExternalRequest`` or a ``ExternalRequestResponse`` subtype. - - Returns - ------- - tuple - A tuple with the first item being whether or not the message was valid, and the second being either None or - the particular error that caused the message to be identified as invalid - - Raises - ------- - RuntimeError - Raised if the message is of a particular type for which there is not a supported validator type configured. - """ - if message is None: - return False, None - elif isinstance(message, NWMRequest): - is_valid, error = NWMRequestJsonValidator().validate(message.to_dict()) - return is_valid, error - elif isinstance(message, NGENRequest): - is_valid, error = NWMRequestJsonValidator().validate(message.to_dict()) - return is_valid, error - elif isinstance(message, Serializable): - return message.__class__.factory_init_from_deserialized_json(message.to_dict()) == message, None - else: - raise RuntimeError('Unsupported ExternalRequest subtype: ' + str(message.__class__)) +class ExternalRequestClient(RequestClient[EXTERN_REQ_M, EXTERN_REQ_R], ExternalClient, ABC): def __init__(self, *args, **kwargs): """ @@ -781,29 +843,11 @@ def __init__(self, *args, **kwargs): self._warnings = None self._info = None - @abstractmethod - def _update_after_valid_response(self, response: EXTERN_REQ_R): - """ - Perform any required internal updates immediately after a request gets back a successful, valid response. - - This provides a way of extending the behavior of this type specifically regarding the ::method:make_maas_request - function. Any updates specific to the type, which should be performed after a request receives back a valid, - successful response object, can be implemented here. - - In the base implementation, no further action is taken. - - See Also - ------- - ::method:make_maas_request - """ - pass - - # TODO: this can probably be taken out, as the superclass implementation should suffice - async def async_make_request(self, request: EXTERN_REQ_M) -> EXTERN_REQ_R: - async with websockets.connect(self.endpoint_uri, ssl=self.client_ssl_context) as websocket: - await websocket.send(request.to_json()) - response = await websocket.recv() - return request.__class__.factory_init_correct_response_subtype(json_obj=json.loads(response)) + async def request_auth_from_service(self) -> str: + # Right now, it doesn't matter as long as it is valid + # TODO: Fix this to not be ... fixed ... + json_as_dict = {'username': 'someone', 'user_secret': 'something'} + return await self._transport_client.async_send(data=json.dumps(json_as_dict), await_response=True) @property def errors(self): @@ -817,63 +861,12 @@ def info(self): def is_new_session(self): return self._is_new_session - def make_maas_request(self, maas_request: EXTERN_REQ_M, force_new_session: bool = False): - request_type_str = maas_request.__class__.__name__ - logger.debug("client Making {} type request".format(request_type_str)) - self._acquire_session_info(force_new=force_new_session) - # Make sure to set if empty or reset if a new session was forced and just acquired - if force_new_session or maas_request.session_secret is None: - maas_request.session_secret = self._session_secret - # If able to get session details, proceed with making a job request - if self._session_secret is not None: - print("******************* Request: " + maas_request.to_json()) - try: - is_request_valid, request_validation_error = self._run_validation(message=maas_request) - if is_request_valid: - try: - response_obj: EXTERN_REQ_R = get_or_create_eventloop().run_until_complete( - self.async_make_request(maas_request)) - print('***************** Response: ' + str(response_obj)) - # Try to get a new session if session is expired (and we hadn't already gotten a new session) - if self._request_failed_due_to_expired_session(response_obj) and not force_new_session: - return self.make_maas_request(maas_request=maas_request, force_new_session=True) - elif not self.validate_maas_request_response(response_obj): - raise RuntimeError('Invalid response received for requested job: ' + str(response_obj)) - elif not response_obj.success: - template = 'Request failed (reason: {}): {}' - raise RuntimeError(template.format(response_obj.reason, response_obj.message)) - else: - self._update_after_valid_response(response_obj) - return response_obj - except Exception as e: - # TODO: log error instead of print - msg_template = 'Encountered error submitting {} over session {} : \n{}: {}' - msg = msg_template.format(request_type_str, str(self._session_id), str(type(e)), str(e)) - print(msg) - traceback.print_exc() - self.errors.append(msg) - else: - msg_template = 'Could not submit invalid MaaS request over session {} ({})' - msg = msg_template.format(str(self._session_id), str(request_validation_error)) - print(msg) - self.errors.append(msg) - except RuntimeError as e: - print(str(e)) - self.errors.append(str(e)) - else: - logger.info("client Unable to aquire session details") - self.errors.append("Unable to acquire session details or authenticate new session for request") - return None - - def validate_maas_request_response(self, maas_request_response: EXTERN_REQ_R): - return self._run_validation(message=maas_request_response)[0] - @property def warnings(self): return self._warnings -class DataServiceClient(InternalServiceClient[DatasetManagementMessage, DatasetManagementResponse]): +class DataServiceClient(RequestClient[DatasetManagementMessage, DatasetManagementResponse]): """ Client for data service communication between internal DMOD services. """ @@ -882,35 +875,7 @@ def get_response_subtype(cls) -> Type[DatasetManagementResponse]: return DatasetManagementResponse -class ModelExecRequestClient(ExternalRequestClient[MOD_EX_M, MOD_EX_R], ABC): - - def __init__(self, endpoint_uri: str, ssl_directory: Path): - super().__init__(endpoint_uri=endpoint_uri, ssl_directory=ssl_directory) - - def _update_after_valid_response(self, response: MOD_EX_R): - """ - Perform any required internal updates immediately after a request gets back a successful, valid response. - - This provides a way of extending the behavior of this type specifically regarding the ::method:make_maas_request - function. Any updates specific to the type, which should be performed after a request receives back a valid, - successful response object, can be implemented here. - - In this implementation, the ::attribute:`info` property is appended to, noting that the job of the given id has - just been started by the scheduler. - - See Also - ------- - ::method:make_maas_request - """ - #self.job_id = self.resp_as_json['data']['job_id'] - #results = self.resp_as_json['data']['results'] - #jobs = self.resp_as_json['data']['all_jobs'] - #self.info.append("Scheduler started job, id {}, results: {}".format(self.job_id, results)) - #self.info.append("All user jobs: {}".format(jobs)) - self.info.append("Scheduler started job, id {}".format(response.data['job_id'])) - - -class PartitionerServiceClient(InternalServiceClient[PartitionRequest, PartitionResponse]): +class PartitionerServiceClient(RequestClient[PartitionRequest, PartitionResponse]): """ A client for interacting with the partitioner service. @@ -931,7 +896,7 @@ def get_response_subtype(cls) -> Type[PartitionResponse]: return PartitionResponse -class EvaluationServiceClient(InternalServiceClient[EvaluationConnectionRequest, EvaluationConnectionRequestResponse]): +class EvaluationServiceClient(RequestClient[EvaluationConnectionRequest, EvaluationConnectionRequestResponse]): """ A client for interacting with the evaluation service """ From 733ab9da7345f19ca79327bbedd56e51e81071ef Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 8 Aug 2023 21:10:45 -0500 Subject: [PATCH 02/34] Add AuthClient and CachedAuthClient. --- .../dmod/communication/client.py | 363 +++++++++++++++++- 1 file changed, 343 insertions(+), 20 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index e8059c40d..263beabc9 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -358,6 +358,338 @@ def session_secret(self): return self._session_secret +class AuthClient: + """ + Simple client object responsible for handling acquiring and applying authenticated session details to requests. + """ + def __init__(self, transport_client: TransportLayerClient, *args, **kwargs): + self._transport_client: TransportLayerClient = transport_client + # TODO: get full session implementation if possible + self._session_id, self._session_secret, self._session_created = None, None, None + self._force_reauth = False + + def _acquire_session(self) -> bool: + """ + Synchronous function to acquire an authenticated session. + + Wrapper convenience function for use outside of the async event loop. + + Returns + ------- + bool + Whether acquiring an authenticated session was successful. + + See Also + ------- + _async_acquire_session + """ + try: + return get_or_create_eventloop().run_until_complete(self._async_acquire_session()) + except Exception as e: + msg = f"{self.__class__.__name__} failed to acquire auth credential due to {e.__class__.__name__}: {str(e)}" + logger.error(msg) + return False + + async def _async_acquire_session(self) -> bool: + """ + Acquire an authenticated session. + + Returns + ------- + bool + Whether acquiring an authenticated session was successful. + """ + # Clear anything previously set when forced reauth + if self.force_reauth: + self._session_id, self._session_secret, self._session_created = None, None, None + self.force_reauth = False + # Otherwise, if we have the session details already, just return True + elif all([self._session_id, self._session_secret, self._session_created]): + return True + + try: + auth_resp = await self._transport_client.async_send(data=json.dumps(self._prepare_auth_request_payload()), + await_response=True) + return self._parse_auth_data(auth_resp) + # In the future, consider whether we should treat ConnectionResetError separately + except Exception as e: + msg = f"{self.__class__.__name__} failed to acquire auth credential due to {e.__class__.__name__}: {str(e)}" + logger.error(msg) + return False + + def _parse_auth_data(self, auth_data_str: str): + """ + Parse serialized authentication data and update instance state accordingly. + + Parse the given serialized authentication data and update the state of the instance accordingly to represent the + successful authentication (assuming the data parses appropriately). This method must support, at minimum, + parsing the text data returned from the service as the response to the authentication payload, + + Note that a return value of ``True`` indicates the instance holds valid authentication details that can be + applied to requests. + + Parameters + ---------- + auth_data_str : str + The data to be parsed, such as that returned in the service response to an authentication payload. + + Returns + ---------- + bool + Whether parsing was successful. + """ + try: + auth_response = json.loads(auth_data_str) + # TODO: consider making sure this parses to a SessionInitResponse + session_id = auth_response['data']['session_id'] + session_secret = auth_response['data']['session_secret'] + session_created = auth_response['data']['created'] + if all((session_id, session_secret, session_created)): + self._session_id, self._session_secret, self._session_created = session_id, session_secret, session_created + return True + else: + return False + except Exception as e: + return False + + def _prepare_auth_request_payload(self) -> dict: + """ + Generate JSON payload to be transmitted by ::method:`async_acquire_session` to service when requesting auth. + + Returns + ------- + dict + The JSON payload to be transmitted by ::method:`async_acquire_session` to the service when requesting auth. + """ + # Right now, it doesn't matter as long as it is valid + # TODO: Fix this to not be ... fixed ... + return {'username': 'someone', 'user_secret': 'something'} + + async def apply_auth(self, external_request: ExternalRequest) -> bool: + """ + Apply appropriate authentication details to this request object, acquiring them first if needed. + + Parameters + ---------- + external_request : ExternalRequest + A request that needs the appropriate session secret applied. + + Returns + ---------- + bool + Whether the secret was obtained and applied successfully. + """ + if await self._async_acquire_session(): + external_request.session_secret = self._session_secret + return True + else: + return False + + @property + def force_reauth(self) -> bool: + """ + Whether the client should be forced to reacquire a new authenticated session from the service. + + Returns + ------- + bool + Whether the client should be forced to re-authenticate and get a new session from the auth service. + """ + return self._force_reauth + + @force_reauth.setter + def force_reauth(self, should_force_new: bool): + self._force_reauth = should_force_new + + @property + def session_created(self) -> str: + return self._session_created + + @property + def session_id(self) -> str: + return self._session_id + + +class CachedAuthClient(AuthClient): + """ + Extension of ::class:`AuthClient` that supports caching the session to a file. + """ + + def __init__(self, session_file: Optional[Path] = None, *args, **kwargs): + """ + Initialize this instance, including creating empty session-related attributes. + + If a ``session_file`` is not given, a default path in the home directory with a timestamp-based name will be + used. If ``session_file`` is a directory, similiarly a timestamp-based default basename will be used for a file + in this directory. + + Parameters + ---------- + session_file : Optional[Path] + Optional specified path to file for a serialized session, both for loading from and saving to. + args + kwargs + + Keyword Args + ---------- + endpoint_uri : str + The endpoint for the client to connect to when opening a connection, for ::class:`RequestClient` + superclass init. + """ + super().__init__(*args, **kwargs) + + self._is_new_session = None + self._force_reload = False + + default_basename = '.{}_session'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S%s')) + + if session_file is None: + self._cached_session_file = Path.home().joinpath(default_basename) + elif session_file.is_dir(): + self._cached_session_file = session_file.joinpath(default_basename) + else: + self._cached_session_file = session_file + + assert isinstance(self._cached_session_file, Path) + assert self._cached_session_file.is_file() or not self._cached_session_file.exists() + + async def _async_acquire_session(self) -> bool: + """ + Acquire an authenticated session. + + Returns + ------- + bool + Whether acquiring an authenticated session was successful. + """ + if not self._check_if_new_session_needed(): + return True + + try: + auth_resp = await self._transport_client.async_send(data=json.dumps(self._prepare_auth_request_payload()), + await_response=True) + # Execute the call to the parsing function before attempting to write, but don't set the attributes yet + session_attribute_vals_tuple = self._parse_auth_data(auth_resp) + + # Need a nested try block here to control what happens with a failure to cache the session + try: + self._cached_session_file.write_text(auth_resp) + except Exception as inner_e: + # TODO: consider having parameters/attributes to control exactly how this is handled ... + # ... for now just catch and pass so a bad save file doesn't tank us + msg = f"{self.__class__.__name__} successfully authenticated but failed to cache details to file " \ + f"'{str(self._cached_session_file)}' due to {inner_e.__class__.__name__}: {str(inner_e)}" + logger.warning(msg) + pass + + # Wait until after the cache file write section to modify any instance state + self._session_id, self._session_secret, self._session_created = session_attribute_vals_tuple + self.force_reauth = False + self._is_new_session = True + return True + # In the future, consider whether we should treat ConnectionResetError separately + except Exception as e: + msg = f"{self.__class__.__name__} failed to acquire auth credential due to {e.__class__.__name__}: {str(e)}" + logger.error(msg) + return False + + def _check_if_new_session_needed(self) -> bool: + """ + Check if a new session is required, potentially loading a cached session from an implementation-specific source. + + Check whether a new session must be acquired. As a side effect, potentially load a cached session from a source + specific to this type as an alternative to acquiring a new session. + + For the default implementation of this function, the source for a cached session is a serialized session file. + + For a new session to be needed, there must be no other **acceptable** source of authenticated session data. + + If ::attribute:`force_reauth` is set to ``True``, any currently stored session attributes are cleared and the + function returns ``True``. Nothing is loaded from a cached session file. + + If ::attribute:`force_reload` is set to ``True``, any currently stored session attributes are cleared. However, + the function does not return at this point, and instead proceeds with remaining logic. + + The session attributes of this instance subsequently checked for acceptable session data. If at this point they + are all properly set (i.e., non-``None`` and non-empty) and ::attribute:`force_reload` is ``False``, then the + function returns ``False``. + + If any session attributes are not properly set or ::attribute:`force_reload` is ``True``, the function attempts + to load a session from the cached session file. If valid session attributes can be loaded, the function then + returns ``False``. If they could not be loaded, the function will return ``True``, indicating a new session + needs to be acquired. + + The function will return ``False`` IFF all session attributes are non-``None`` and non-empty at the end of the + function's execution. + + Returns + ------- + bool + Whether a new session must be acquired. + """ + # If we need to re-auth, clear any old session data and immediately return True (i.e., new session is needed) + if self.force_reauth: + self._session_id, self._session_secret, self._session_created = None, None, None + return True + + # If we need to reload, also clear any old session data, but this time proceed with the rest of the function + if self.force_reload: + self._session_id, self._session_secret, self._session_created = None, None, None + # Once we force clearing these to ensure a reload is attempted, reset the attribute + self.force_reload = False + # If not set to force a reload, we may already have valid session attributes; short here if so + elif all([self._session_id, self._session_secret, self._session_created]): + return False + + # If there is a cached session file, we will try to load from it + if self._cached_session_file.exists(): + try: + session_id, secret, created = self._parse_auth_data(self._cached_session_file.read_text()) + # Only set if all three read properties are valid + if all([session_id, secret, created]): + self._session_id = session_id + self._session_secret = secret + self._session_created = created + self._is_new_session = False + except Exception as e: + pass + # Return opposite of whether session properties are now set correctly (that would mean don't need a session) + return not all([self._session_id, self._session_secret, self._session_created]) + else: + return True + + @property + def force_reload(self) -> bool: + """ + Whether client should be forced to reload cached auth data on the next call to ::method:`async_acquire_session`. + + Note that this property will be (re)set to ``False`` after the next call to ::method:`async_acquire_session`. + + Returns + ------- + bool + Whether to force reloading cached auth data on the next called to ::method:`async_acquire_session`. + """ + return self._force_reload + + @force_reload.setter + def force_reload(self, should_force_reload: bool): + self._force_reload = should_force_reload + + @property + def is_new_session(self) -> Optional[bool]: + """ + Whether the current session was obtained newly from the service, as opposed to read from cache. + + Returns + ------- + Optional[bool] + Whether the current session was obtained newly from the service, as opposed to read from cache; ``None`` if + no session is yet acquired/loaded. + """ + return self._is_new_session + + class RequestClient(Generic[M, R]): """ Simple, generic DMOD service client, dealing with some type of DMOD request message and response objects. @@ -412,7 +744,7 @@ def request_type(self) -> Type[M]: """ return self._request_type - def _process_request_response(self, response_str: str) -> R: + def _process_request_response(self, response_str: str, response_type: Type[R]) -> R: """ Process the serial form of a response returned by ::method:`async_send` into a response object. @@ -430,8 +762,6 @@ def _process_request_response(self, response_str: str) -> R: ------- async_send """ - response_type = self.response_type - my_class_name = self.__class__.__name__ response_json = {} try: # Consume the response confirmation by deserializing first to JSON, then from this to a response object @@ -439,18 +769,18 @@ def _process_request_response(self, response_str: str) -> R: try: response_object = response_type.factory_init_from_deserialized_json(response_json) if response_object is None: - msg = '********** {} could not deserialize {} from raw websocket response: `{}`'.format( - my_class_name, response_type.__name__, str(response_str)) - reason = '{} Could Not Deserialize To {}'.format(my_class_name, response_type.__name__) + msg = f'********** {self.__class__.__name__} could not deserialize {response_type.__name__} from ' \ + f'raw websocket response: `{str(response_str)}`' + reason = f'{self.__class__.__name__} Could Not Deserialize To {response_type.__name__}' response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) except Exception as e2: - msg = '********** While deserializing {}, {} encountered {}: {}'.format( - response_type.__name__, my_class_name, e2.__class__.__name__, str(e2)) - reason = '{} {} Deserializing {}'.format(my_class_name, e2.__class__.__name__, response_type.__name__) + msg = f'********** While deserializing {response_type.__name__}, {self.__class__.__name__} ' \ + f'encountered {e2.__class__.__name__}: {str(e2)}' + reason = f'{self.__class__.__name__} {e2.__class__.__name__} Deserializing {response_type.__name__}' response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) except Exception as e: reason = 'Invalid JSON Response' - msg = 'Encountered {} loading response to JSON: {}'.format(e.__class__.__name__, str(e)) + msg = f'Encountered {e.__class__.__name__} loading response to JSON: {str(e)}' response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) if not response_object.success: @@ -459,7 +789,7 @@ def _process_request_response(self, response_str: str) -> R: response_object.to_json())) return response_object - async def async_make_request(self, message: M) -> R: + async def async_make_request(self, message: M, expected_response_type: Type[R]) -> R: """ Async send a request message object and return the received response. @@ -476,13 +806,6 @@ async def async_make_request(self, message: M) -> R: R the request response object """ - if not isinstance(message, self.request_type): - reason = f'{self.__class__.__name__} Received Unexpected Type {message.__class__.__name__}' - msg = f'{self.__class__.__name__} received unexpected {message.__class__.__name__} ' \ - f'instance as request, rather than a {self.request_type.__name__} instance; not submitting' - logger.error(msg) - return self.build_response(success=False, reason=reason, message=msg) - response_json = {} try: # Send the request and get the service response @@ -494,10 +817,10 @@ async def async_make_request(self, message: M) -> R: msg = f'{self.__class__.__name__} raised {e.__class__.__name__} sending {message.__class__.__name__}: ' \ f'{str(e)}' logger.error(msg) - return self.build_response(success=False, reason=reason, message=msg, data=response_json) + return expected_response_type(success=False, reason=reason, message=msg, data=response_json) assert isinstance(serialized_response, str) - return self._process_request_response(serialized_response) + return self._process_request_response(serialized_response, expected_response_type) def build_response(self, success: bool, reason: str, message: str = '', data: Optional[dict] = None, **kwargs) -> R: From 3ef01cf2152cda37097e984869e729c1f323cb95 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 8 Aug 2023 21:28:05 -0500 Subject: [PATCH 03/34] Modify ExternalRequestClient to use AuthClient --- .../dmod/communication/client.py | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 263beabc9..d4a851cae 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -1140,9 +1140,9 @@ async def async_send_update(self, message: UpdateMessage) -> UpdateMessageRespon response_text='None' if serialized_response is None else serialized_response) -class ExternalRequestClient(RequestClient[EXTERN_REQ_M, EXTERN_REQ_R], ExternalClient, ABC): +class ExternalRequestClient(RequestClient[EXTERN_REQ_M, EXTERN_REQ_R]): - def __init__(self, *args, **kwargs): + def __init__(self, auth_client: AuthClient, *args, **kwargs): """ Initialize instance. @@ -1153,24 +1153,42 @@ def __init__(self, *args, **kwargs): Other Parameters ---------- - endpoint_uri : str - The client connection endpoint for opening new websocket connections, required for superclass init. - ssl_directory : Path - The directory of the SSL certificate files for the client SSL context. - session_file : Optional[Path] - Optional path to file for a serialized session, both for loading from and saving to. + transport_client: TransportLayerClient + request_type: Type[EXTERN_REQ_M] + response_type: Type[EXTERN_REQ_R] """ super().__init__(*args, **kwargs) + self._auth_client: AuthClient = auth_client + self._errors = None self._warnings = None self._info = None - async def request_auth_from_service(self) -> str: - # Right now, it doesn't matter as long as it is valid - # TODO: Fix this to not be ... fixed ... - json_as_dict = {'username': 'someone', 'user_secret': 'something'} - return await self._transport_client.async_send(data=json.dumps(json_as_dict), await_response=True) + async def async_make_request(self, message: EXTERN_REQ_M, expected_response_type: Type[EXTERN_REQ_R]) -> EXTERN_REQ_R: + """ + Async send a request message object and return the received response. + + Send (within Python's async functionality) the appropriate type of request :class:`Message` for this client + implementation type and return the response as a corresponding, appropriate :class:`Response` instance. + + Parameters + ---------- + message : EXTERN_REQ_R + the request message object + + Returns + ------- + EXTERN_REQ_R + the request response object + """ + if await self._auth_client.apply_auth(message): + return await super().async_make_request(message, expected_response_type) + else: + reason = f'{self.__class__.__name__} Request Auth Failure' + msg = f'{self.__class__.__name__} async_make_request could not apply auth to {message.__class__.__name__}' + logger.error(msg) + return expected_response_type(success=False, reason=reason, message=msg) @property def errors(self): @@ -1180,10 +1198,6 @@ def errors(self): def info(self): return self._info - @property - def is_new_session(self): - return self._is_new_session - @property def warnings(self): return self._warnings From 6b7c6f128a5e63c40008ea3f30a2b09404072d17 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 10 Aug 2023 13:29:59 -0500 Subject: [PATCH 04/34] Remove obsolete ExternalClient. --- .../dmod/communication/client.py | 210 ------------------ 1 file changed, 210 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index d4a851cae..7683f59b2 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -148,216 +148,6 @@ def client_ssl_context(self) -> ssl.SSLContext: return self._client_ssl_context -class ExternalClient(ABC): - """ - Abstraction encapsulating the logic for using external connections secured using sessions. - - Abstract client type that requires connections that work using secure sessions. It is able to serialize session - details to a file and, by default, load them from this file if appropriate. - """ - - def __init__(self, session_file: Optional[Path] = None, *args, **kwargs): - """ - Initialize this instance, including creating empty session-related attributes. - - If a ``session_file`` is not given, a default path in the home directory with a timestamp-based name will be - used. - - Parameters - ---------- - session_file : Optional[Path] - Optional path to file for a serialized session, both for loading from and saving to. - args - kwargs - - Keyword Args - ---------- - endpoint_uri : str - The endpoint for the client to connect to when opening a connection, for ::class:`RequestClient` - superclass init. - """ - super().__init__(*args, **kwargs) - # TODO: get full session implementation if possible - self._session_id, self._session_secret, self._session_created, self._is_new_session = None, None, None, None - if session_file is None: - self._cached_session_file = Path.home().joinpath( - '.{}_session'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S%s'))) - else: - self._cached_session_file = session_file - - def _acquire_new_session(self): - try: - return get_or_create_eventloop().run_until_complete(self._async_acquire_new_session()) - except Exception as e: - logger.info("Expecting exception to follow") - logger.exception("Failed _acquire_session_info") - return False - - def _acquire_session_info(self, use_current_values: bool = True, force_new: bool = False) -> bool: - """ - Attempt to set the session information properties needed for a secure connection. - - Parameters - ---------- - use_current_values : bool - Whether to use currently held attribute values for session details, if already not None (disregarded if - ``force_new`` is ``True``). - force_new : bool - Whether to force acquiring a new session, regardless of data available is available on an existing session. - - Returns - ------- - bool - Whether session details were acquired and set successfully. - """ - logger.debug("{}._acquire_session_info: getting session info".format(self.__class__.__name__)) - if not force_new and not self._check_if_new_session_needed(use_current_values=use_current_values): - logger.debug('Using previously acquired session details (new session not forced)') - return True - else: - logger.debug("Session from {}}: force_new={}".format(self.__class__.__name__, force_new)) - tmp = self._acquire_new_session() - logger.debug("Session Info Return: {}".format(tmp)) - return tmp - - async def _async_acquire_session_info(self, use_current_values: bool = True, force_new: bool = False) -> bool: - """ - Async attempt to set the session information properties needed for a secure connection. - - Parameters - ---------- - use_current_values : bool - Whether to use currently held attribute values for session details, if already not None (disregarded if - ``force_new`` is ``True``). - force_new : bool - Whether to force acquiring a new session, regardless of data available is available on an existing session. - - Returns - ------- - bool - Whether session details were acquired and set successfully. - """ - if not force_new and not self._check_if_new_session_needed(use_current_values=use_current_values): - logger.debug('Using previously acquired session details (new session not forced)') - return True - else: - tmp = await self._async_acquire_new_session(cached_session_file=self._cached_session_file) - logger.debug("Session Info Return: {}".format(tmp)) - return tmp - - async def _async_acquire_new_session(self, cached_session_file: Optional[Path] = None): - try: - logger.info("Connection to request handler web socket") - auth_details = await self.authenticate(cached_session_file=cached_session_file) - logger.info("auth_details returned") - self._session_id, self._session_secret, self._session_created = auth_details - self._is_new_session = True - return True - except ConnectionResetError as e: - logger.info("Expecting exception to follow") - logger.exception("Failed _acquire_session_info") - return False - except Exception as e: - logger.info("Expecting exception to follow") - logger.exception("Failed _acquire_session_info") - return False - - def _check_if_new_session_needed(self, use_current_values: bool = True) -> bool: - """ - Check if a new session is required, potentially loading a cached session from an implementation-specific source. - - Check whether a new session must be acquired. As a side effect, potentially load a cached session from a source - specific to this type as an alternative to acquiring a new session. - - For the default implementation of this function, the source for a cached session is a serialized session file. - - Loading of a cached session will not be done if ``use_current_values`` is ``True`` and session attributes are - properly set (i.e., non-``None`` and non-empty). Further, loaded cached session details will not be used if any - is empty or ``None``. - - The function will return ``False`` IFF all session attributes are non-``None`` and non-empty at the end of the - function's execution. - - Parameters - ---------- - use_current_values : bool - Whether it is acceptable to use the current values of the instance's session-related attributes, if all such - attributes already have values set. - - Returns - ------- - bool - Whether a new session must be acquired. - """ - # If we should use current values, and current values constitute a valid session, then we do not need a new one - if use_current_values and all([self._session_id, self._session_secret, self._session_created]): - return False - # If there is a cached session file, we will try to load from it - if self._cached_session_file.exists(): - try: - session_id, secret, created = self.parse_session_auth_text(self._cached_session_file.read_text()) - # Only set if all three read properties are valid - if all([session_id, secret, created]): - self._session_id = session_id - self._session_secret = secret - self._session_created = created - except Exception as e: - pass - # Return opposite of whether session properties are now set correctly (that would mean don't need a session) - return not all([self._session_id, self._session_secret, self._session_created]) - # Otherwise (i.e., don't/can't use current session details + no cached file to load), need a new session - else: - return True - - async def authenticate(self, cached_session_file: Optional[Path] = None): - response_txt = await self.request_auth_from_service() - try: - if cached_session_file is not None and not cached_session_file.is_dir() \ - and cached_session_file.parent.is_dir(): - cached_session_file.write_text(response_txt) - except Exception as e: - # TODO: consider logging something here, but for now just handle so a bad save file doesn't tank us - pass - #print('*************** Auth response: ' + json.dumps(response_txt)) - return self.parse_session_auth_text(response_txt) - - @property - def is_new_session(self): - return self._is_new_session - - def parse_session_auth_text(self, auth_text: str): - auth_response = json.loads(auth_text) - # TODO: consider making sure this parses to a SessionInitResponse - maas_session_id = auth_response['data']['session_id'] - maas_session_secret = auth_response['data']['session_secret'] - maas_session_created = auth_response['data']['created'] - return maas_session_id, maas_session_secret, maas_session_created - - @abstractmethod - async def request_auth_from_service(self) -> str: - """ - Prepare and send authentication to service, and return the raw response. - - Returns - ------- - str - The raw response to sending a request for authentication. - """ - pass - - @property - def session_created(self): - return self._session_created - - @property - def session_id(self): - return self._session_id - - @property - def session_secret(self): - return self._session_secret - - class AuthClient: """ Simple client object responsible for handling acquiring and applying authenticated session details to requests. From 52dfa9f75af97f2586382c2dcb08f566d7a547e6 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 10 Aug 2023 13:39:15 -0500 Subject: [PATCH 05/34] Un-generifying RequestClient and subtypes. --- .../dmod/communication/client.py | 209 ++++++------------ 1 file changed, 73 insertions(+), 136 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 7683f59b2..dba8a20ed 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -2,24 +2,21 @@ import datetime import json import ssl -import traceback import typing from abc import ABC, abstractmethod from asyncio import AbstractEventLoop from pathlib import Path from typing import Generic, Optional, Type, TypeVar, Union -from dmod.core.serializable import Serializable import websockets from .maas_request import ExternalRequest, ExternalRequestResponse -from .message import AbstractInitRequest, Message, Response, InitRequestResponseReason +from .message import AbstractInitRequest, Response from .partition_request import PartitionRequest, PartitionResponse from .dataset_management_message import DatasetManagementMessage, DatasetManagementResponse from .scheduler_request import SchedulerRequestMessage, SchedulerRequestResponse from .evaluation_request import EvaluationConnectionRequest from .evaluation_request import EvaluationConnectionRequestResponse -from .validator import NWMRequestJsonValidator from .update_message import UpdateMessage, UpdateMessageResponse import logging @@ -480,22 +477,22 @@ def is_new_session(self) -> Optional[bool]: return self._is_new_session -class RequestClient(Generic[M, R]): +class RequestClient: """ - Simple, generic DMOD service client, dealing with some type of DMOD request message and response objects. + Simple DMOD service client, dealing with DMOD request message and response objects. - Generic client type for interaction with a DMOD service. Its primary function accepts some DMOD request message - object, makes submits a request to the service by relaying the aforementioned object, and receives/returns the - response as an object of the corresponding type. The underlying communication is handled by way of a - ::class:`TransportLayerClient` supplied during initialization. + Basic client type for interaction with a DMOD service. Its primary function, ::method:`async_make_request`, accepts + some DMOD ::class:`AbstractInitRequest` object, uses a ::class:`TransportLayerClient` to submit the request object + to a service, and receives/returns the service's response. - This type is relatively simple in that a particular instance deals with a strict request/response pair. Its - functions are thus implemented to sanity check the types received - both as argument and in communication from the - service - and raise exceptions if they are of unexpected types. + To parse responses, instances must know the appropriate class type for a response. This can be provided as an + optional parameter to ::method:`async_make_request`. A default response class type can also be supplied to an + instance during init, which is used by ::method:`async_make_request` if a class type is not provided. One of the + two must be set for ::method:`async_make_request` to function. """ - def __init__(self, transport_client: TransportLayerClient, request_type: Type[M], response_type: Type[R], *args, - **kwargs): + def __init__(self, transport_client: TransportLayerClient, default_response_type: Optional[Type[Response]] = None, + *args, **kwargs): """ Initialize. @@ -503,38 +500,15 @@ def __init__(self, transport_client: TransportLayerClient, request_type: Type[M] ---------- transport_client : TransportLayerClient The client for handling the underlying raw OSI transport layer communications with the service. + default_response_type: Optional[Type[Response]] + Optional class type for responses, to use when no response class param is given when making a request. args kwargs """ self._transport_client = transport_client - self._request_type: Type[M] = request_type - self._response_type: Type[R] = response_type + self._default_response_type: Optional[Type[Response]] = default_response_type - @property - def response_type(self) -> Type[R]: - """ - The response subtype class appropriate for this client implementation. - - Returns - ------- - Type[R] - The response subtype class appropriate for this client implementation. - """ - return self._response_type - - @property - def request_type(self) -> Type[M]: - """ - Return the request message subtype class appropriate for this client implementation. - - Returns - ------- - Type[M] - The request message subtype class appropriate for this client implementation. - """ - return self._request_type - - def _process_request_response(self, response_str: str, response_type: Type[R]) -> R: + def _process_request_response(self, response_str: str, response_type: Optional[Type[Response]] = None) -> Response: """ Process the serial form of a response returned by ::method:`async_send` into a response object. @@ -542,16 +516,22 @@ def _process_request_response(self, response_str: str, response_type: Type[R]) - ---------- response_str : str The string returned by a request made via ::method:`async_send`. + response_type: Optional[Type[Response]] + An optional class type for the response that, if ``None`` (the default) is replaced with the default + provided at initialization. Returns ------- - R + Response The inflated response object. See Also ------- async_send """ + if response_type is None: + response_type = self._default_response_type + response_json = {} try: # Consume the response confirmation by deserializing first to JSON, then from this to a response object @@ -559,27 +539,26 @@ def _process_request_response(self, response_str: str, response_type: Type[R]) - try: response_object = response_type.factory_init_from_deserialized_json(response_json) if response_object is None: - msg = f'********** {self.__class__.__name__} could not deserialize {response_type.__name__} from ' \ - f'raw websocket response: `{str(response_str)}`' + msg = f'********** {self.__class__.__name__} could not deserialize {response_type.__name__} ' \ + f'from raw websocket response: `{str(response_str)}`' reason = f'{self.__class__.__name__} Could Not Deserialize To {response_type.__name__}' - response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) + response_object = response_type(success=False, reason=reason, message=msg, data=response_json) except Exception as e2: msg = f'********** While deserializing {response_type.__name__}, {self.__class__.__name__} ' \ f'encountered {e2.__class__.__name__}: {str(e2)}' - reason = f'{self.__class__.__name__} {e2.__class__.__name__} Deserializing {response_type.__name__}' - response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) + reason = f'{self.__class__.__name__} {e2.__class__.__name__} Deserialize {response_type.__name__}' + response_object = response_type(success=False, reason=reason, message=msg, data=response_json) except Exception as e: reason = 'Invalid JSON Response' msg = f'Encountered {e.__class__.__name__} loading response to JSON: {str(e)}' - response_object = self.build_response(success=False, reason=reason, message=msg, data=response_json) + response_object = response_type(success=False, reason=reason, message=msg, data=response_json) if not response_object.success: logging.error(response_object.message) - logging.debug('************* {} returning {} object {}'.format(self.__class__.__name__, response_type.__name__, - response_object.to_json())) + logging.debug(f'{self.__class__.__name__} returning {str(response_type)} {response_str}') return response_object - async def async_make_request(self, message: M, expected_response_type: Type[R]) -> R: + async def async_make_request(self, message: AbstractInitRequest, response_type: Optional[Type[Response]] = None) -> Response: """ Async send a request message object and return the received response. @@ -588,14 +567,24 @@ async def async_make_request(self, message: M, expected_response_type: Type[R]) Parameters ---------- - message : M - the request message object + message : AbstractInitRequest + The request message object. + response_type: Optional[Type[Response]] + An optional class type for the response that, if ``None`` (the default) is replaced with the default + provided at initialization. Returns ------- - R + Response the request response object """ + if response_type is None: + if self._default_response_type is None: + msg = f"{self.__class__.__name__} can't make request with neither response type parameter or default" + raise RuntimeError(msg) + else: + response_type = self._default_response_type + response_json = {} try: # Send the request and get the service response @@ -604,50 +593,12 @@ async def async_make_request(self, message: M, expected_response_type: Type[R]) raise ValueError(f'Serialized response from {self.__class__.__name__} async message was `None`') except Exception as e: reason = f'{self.__class__.__name__} Send {message.__class__.__name__} Failure ({e.__class__.__name__})' - msg = f'{self.__class__.__name__} raised {e.__class__.__name__} sending {message.__class__.__name__}: ' \ - f'{str(e)}' + msg = f'Sending {message.__class__.__name__} raised {e.__class__.__name__}: {str(e)}' logger.error(msg) - return expected_response_type(success=False, reason=reason, message=msg, data=response_json) + return response_type(success=False, reason=reason, message=msg, data=response_json) assert isinstance(serialized_response, str) - return self._process_request_response(serialized_response, expected_response_type) - - def build_response(self, success: bool, reason: str, message: str = '', data: Optional[dict] = None, - **kwargs) -> R: - """ - Build a response of the appropriate subtype from the given response details. - - Build a response of the appropriate subtype for this particular implementation, using the given parameters for - this function as the initialization params for the response. Per the design of ::class:`Response`, the primary - attributes are ::attribute:`Response.success`, ::attribute:`Response.reason`, ::attribute:`Response.message`, - and ::attribute:`Response.data`. However, implementations may permit or require additional param values, which - can be supplied via keyword args. - - As with the init of ::class:`Request`, defaults of ``''`` (empty string) and ``None`` are in place for for - ``message`` and ``data`` respectively. - - A default implementation is provided that initializes an instance of the type return by - ::method:`get_response_subtype`. Keyword args are not used in this default implementation. - - Parameters - ---------- - success : bool - The value for ::attribute:`Response.success` to use when initializing the response object. - reason : str - The value for ::attribute:`Response.reason` to use when initializing the response object. - message : str - The value for ::attribute:`Response.message` to use when initializing the response object (default: ``''``). - data : dict - The value for ::attribute:`Response.data` to use when initializing the response object (default: ``None``). - kwargs : dict - A dict for any additional implementation specific init params for the response object. - - Returns - ------- - R - A response object of the appropriate subtype. - """ - return self.response_type(success=success, reason=reason, message=message, data=data) + return self._process_request_response(serialized_response) class ConnectionContextClient(Generic[CONN], TransportLayerClient, ABC): @@ -886,11 +837,10 @@ async def listen(self) -> typing.Union[str, bytes]: return await websocket.connection.recv() -class SchedulerClient(RequestClient[SchedulerRequestMessage, SchedulerRequestResponse]): +class SchedulerClient(RequestClient): - @classmethod - def get_response_subtype(cls) -> Type[SchedulerRequestResponse]: - return SchedulerRequestResponse + def __init__(self, *args, **kwargs): + super().__init__(default_response_type=SchedulerRequestResponse, *args, **kwargs) async def async_send_update(self, message: UpdateMessage) -> UpdateMessageResponse: """ @@ -930,7 +880,7 @@ async def async_send_update(self, message: UpdateMessage) -> UpdateMessageRespon response_text='None' if serialized_response is None else serialized_response) -class ExternalRequestClient(RequestClient[EXTERN_REQ_M, EXTERN_REQ_R]): +class ExternalRequestClient(RequestClient): def __init__(self, auth_client: AuthClient, *args, **kwargs): """ @@ -944,8 +894,6 @@ def __init__(self, auth_client: AuthClient, *args, **kwargs): Other Parameters ---------- transport_client: TransportLayerClient - request_type: Type[EXTERN_REQ_M] - response_type: Type[EXTERN_REQ_R] """ super().__init__(*args, **kwargs) @@ -955,7 +903,8 @@ def __init__(self, auth_client: AuthClient, *args, **kwargs): self._warnings = None self._info = None - async def async_make_request(self, message: EXTERN_REQ_M, expected_response_type: Type[EXTERN_REQ_R]) -> EXTERN_REQ_R: + async def async_make_request(self, message: ExternalRequest, + response_type: Optional[Type[ExternalRequestResponse]] = None) -> ExternalRequestResponse: """ Async send a request message object and return the received response. @@ -964,21 +913,27 @@ async def async_make_request(self, message: EXTERN_REQ_M, expected_response_type Parameters ---------- - message : EXTERN_REQ_R - the request message object + message : ExternalRequest + The request message object. + response_type: Optional[Type[ExternalRequestResponse]] + An optional class type for the response that, if ``None`` (the default) is replaced with the default + provided at initialization. Returns ------- EXTERN_REQ_R the request response object """ + if response_type is None: + response_type = self._default_response_type + if await self._auth_client.apply_auth(message): - return await super().async_make_request(message, expected_response_type) + return await super().async_make_request(message, response_type=response_type) else: reason = f'{self.__class__.__name__} Request Auth Failure' msg = f'{self.__class__.__name__} async_make_request could not apply auth to {message.__class__.__name__}' logger.error(msg) - return expected_response_type(success=False, reason=reason, message=msg) + return response_type(success=False, reason=reason, message=msg) @property def errors(self): @@ -993,16 +948,16 @@ def warnings(self): return self._warnings -class DataServiceClient(RequestClient[DatasetManagementMessage, DatasetManagementResponse]): +class DataServiceClient(RequestClient): """ Client for data service communication between internal DMOD services. """ - @classmethod - def get_response_subtype(cls) -> Type[DatasetManagementResponse]: - return DatasetManagementResponse + + def __init__(self, *args, **kwargs): + super().__init__(default_response_type=DatasetManagementResponse, *args, **kwargs) -class PartitionerServiceClient(RequestClient[PartitionRequest, PartitionResponse]): +class PartitionerServiceClient(RequestClient): """ A client for interacting with the partitioner service. @@ -1010,31 +965,13 @@ class PartitionerServiceClient(RequestClient[PartitionRequest, PartitionResponse does not need to be a (public) ::class:`ExternalRequestClient` based type. """ - @classmethod - def get_response_subtype(cls) -> Type[PartitionResponse]: - """ - Return the response subtype class appropriate for this client implementation. - - Returns - ------- - Type[PartitionResponse] - The response subtype class appropriate for this client implementation. - """ - return PartitionResponse + def __init__(self, *args, **kwargs): + super().__init__(default_response_type=PartitionResponse, *args, **kwargs) -class EvaluationServiceClient(RequestClient[EvaluationConnectionRequest, EvaluationConnectionRequestResponse]): +class EvaluationServiceClient(RequestClient): """ A client for interacting with the evaluation service """ - - @classmethod - def get_response_subtype(cls) -> Type[EvaluationConnectionRequestResponse]: - """ - Return the response subtype class appropriate for this client implementation - - Returns: - Type[EvaluationConnectionRequestResponse] - The response subtype class appropriate for this client implementation - """ - return EvaluationConnectionRequestResponse + def __init__(self, *args, **kwargs): + super().__init__(default_response_type=EvaluationConnectionRequestResponse, *args, **kwargs) From 072d08684d550a207923a70a57c37b9c2b04600a Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 10 Aug 2023 13:41:23 -0500 Subject: [PATCH 06/34] Updating tests for client changes. --- .../dmod/test/test_scheduler_client.py | 76 ++++++++++--------- 1 file changed, 42 insertions(+), 34 deletions(-) diff --git a/python/lib/communication/dmod/test/test_scheduler_client.py b/python/lib/communication/dmod/test/test_scheduler_client.py index f2142a1b1..025691576 100644 --- a/python/lib/communication/dmod/test/test_scheduler_client.py +++ b/python/lib/communication/dmod/test/test_scheduler_client.py @@ -1,20 +1,16 @@ import asyncio -import json import logging +import ssl import unittest -from pathlib import Path from typing import Optional, Union -from ..communication import NWMRequest, SchedulerClient, SchedulerRequestMessage, SchedulerRequestResponse +from ..communication import NWMRequest, SchedulerClient, SchedulerRequestMessage, SchedulerRequestResponse, \ + TransportLayerClient -class MockSendTestingSchedulerClient(SchedulerClient): - """ - Customized extension of ``SchedulerClient`` for testing purposes, where the :meth:`async_send` method has been - overridden with a mock implementation to allow for testing without actually needing a real websocket connection. - """ +class MockTransportLayerClient(TransportLayerClient): def __init__(self): - super().__init__(endpoint_uri='', ssl_directory=Path('.')) + super().__init__(endpoint_uri='') self.test_responses = dict() @@ -52,20 +48,52 @@ async def async_send(self, data: Union[str, bytearray], await_response: bool = F else: return str(response) - def set_scheduler_response_none(self): + async def async_recv(self) -> str: + pass + + @property + def client_ssl_context(self) -> ssl.SSLContext: + pass + + def set_client_response_none(self): self.test_response_selection = 0 - def set_scheduler_response_non_json_string(self): + def set_client_response_non_json_string(self): self.test_response_selection = 1 - def set_scheduler_response_unrecognized_json(self): + def set_client_response_unrecognized_json(self): self.test_response_selection = 2 - def set_scheduler_response_valid_obj_for_failure(self): + def set_client_response_valid_obj_for_failure(self): self.test_response_selection = 3 - def set_scheduler_response_valid_obj_for_success(self): + def set_client_response_valid_obj_for_success(self): self.test_response_selection = 4 + + +class MockSendTestingSchedulerClient(SchedulerClient): + """ + Customized extension of ``SchedulerClient`` for testing purposes, where the :meth:`async_send` method has been + overridden with a mock implementation to allow for testing without actually needing a real websocket connection. + """ + + def __init__(self): + super().__init__(transport_client=MockTransportLayerClient()) + + def set_scheduler_response_none(self): + self._transport_client.test_response_selection = 0 + + def set_scheduler_response_non_json_string(self): + self._transport_client.test_response_selection = 1 + + def set_scheduler_response_unrecognized_json(self): + self._transport_client.test_response_selection = 2 + + def set_scheduler_response_valid_obj_for_failure(self): + self._transport_client.test_response_selection = 3 + + def set_scheduler_response_valid_obj_for_success(self): + self._transport_client.test_response_selection = 4 class TestSchedulerClient(unittest.TestCase): @@ -103,26 +131,6 @@ def tearDown(self) -> None: self.loop.stop() self.loop.close() - def test_get_response_subtype_1_a(self): - """ - Test that ``get_response_subtype`` returns the right type. - """ - self.assertEqual(SchedulerRequestResponse, self.client.get_response_subtype()) - - def test_build_response_1_a(self): - """ - Basic test to ensure this function operates correctly. - """ - response = self.client.build_response(success=True, reason='Test Good', message='Test worked correctly') - self.assertTrue(isinstance(response, SchedulerRequestResponse)) - - def test_build_response_1_b(self): - """ - Basic test to ensure this response has the expected ``success`` value. - """ - response = self.client.build_response(success=True, reason='Test Good', message='Test worked correctly') - self.assertTrue(response.success) - def test_async_make_request_1_a(self): """ Test when function gets ``None`` returned over websocket that response object ``success`` is ``False``. From 7362f99ab455b49815b295c8c7582fa18c3ab8b8 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 10 Aug 2023 13:45:41 -0500 Subject: [PATCH 07/34] Updating comms package init. --- python/lib/communication/dmod/communication/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/lib/communication/dmod/communication/__init__.py b/python/lib/communication/dmod/communication/__init__.py index 5dadb9b75..e01e99be5 100644 --- a/python/lib/communication/dmod/communication/__init__.py +++ b/python/lib/communication/dmod/communication/__init__.py @@ -1,6 +1,6 @@ from ._version import __version__ -from .client import DataServiceClient, InternalServiceClient, ModelExecRequestClient, ExternalRequestClient, \ - PartitionerServiceClient, SchedulerClient +from .client import AuthClient, CachedAuthClient, DataServiceClient, ExternalRequestClient, PartitionerServiceClient, \ + RequestClient, SchedulerClient, SSLSecuredTransportLayerClient, TransportLayerClient, WebSocketClient from .maas_request import get_available_models, get_available_outputs, get_distribution_types, get_parameters, \ get_request, AbstractNgenRequest, Distribution, DmodJobRequest, ExternalRequest, ExternalRequestResponse,\ ModelExecRequest, ModelExecRequestResponse, NWMRequest, NWMRequestResponse, Scalar, NGENRequest, \ From 7e802783e105c4611ad4910a71bf1e061e46a2ff Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 10 Aug 2023 13:47:40 -0500 Subject: [PATCH 08/34] Bump comms package to 1.0.0. --- python/lib/communication/dmod/communication/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/_version.py b/python/lib/communication/dmod/communication/_version.py index ef9199407..1f356cc57 100644 --- a/python/lib/communication/dmod/communication/_version.py +++ b/python/lib/communication/dmod/communication/_version.py @@ -1 +1 @@ -__version__ = '0.14.0' +__version__ = '1.0.0' From 7050ff654d83d000e384b889c0e28bb0e877a018 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 10 Aug 2023 14:00:06 -0500 Subject: [PATCH 09/34] Deprecating some RequestClient subclasses. Deprecating to help move away from service-specific types in a general library. --- .../lib/communication/dmod/communication/client.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index dba8a20ed..19ea72980 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -5,6 +5,7 @@ import typing from abc import ABC, abstractmethod from asyncio import AbstractEventLoop +from deprecated import deprecated from pathlib import Path from typing import Generic, Optional, Type, TypeVar, Union @@ -12,10 +13,9 @@ from .maas_request import ExternalRequest, ExternalRequestResponse from .message import AbstractInitRequest, Response -from .partition_request import PartitionRequest, PartitionResponse -from .dataset_management_message import DatasetManagementMessage, DatasetManagementResponse -from .scheduler_request import SchedulerRequestMessage, SchedulerRequestResponse -from .evaluation_request import EvaluationConnectionRequest +from .partition_request import PartitionResponse +from .dataset_management_message import DatasetManagementResponse +from .scheduler_request import SchedulerRequestResponse from .evaluation_request import EvaluationConnectionRequestResponse from .update_message import UpdateMessage, UpdateMessageResponse @@ -837,6 +837,7 @@ async def listen(self) -> typing.Union[str, bytes]: return await websocket.connection.recv() +@deprecated("Use RequestClient or ExternalRequestClient instead") class SchedulerClient(RequestClient): def __init__(self, *args, **kwargs): @@ -948,6 +949,7 @@ def warnings(self): return self._warnings +@deprecated("Use RequestClient or ExternalRequestClient instead") class DataServiceClient(RequestClient): """ Client for data service communication between internal DMOD services. @@ -957,6 +959,7 @@ def __init__(self, *args, **kwargs): super().__init__(default_response_type=DatasetManagementResponse, *args, **kwargs) +@deprecated("Use RequestClient or ExternalRequestClient instead") class PartitionerServiceClient(RequestClient): """ A client for interacting with the partitioner service. @@ -969,6 +972,7 @@ def __init__(self, *args, **kwargs): super().__init__(default_response_type=PartitionResponse, *args, **kwargs) +@deprecated("Use RequestClient or ExternalRequestClient instead") class EvaluationServiceClient(RequestClient): """ A client for interacting with the evaluation service From f56bc988d7cab3bd2c18f901cd7bbf411580f54c Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 10 Aug 2023 14:08:35 -0500 Subject: [PATCH 10/34] Remove obsoleted generic type vars. --- python/lib/communication/dmod/communication/client.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 19ea72980..dd69a9b01 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -24,12 +24,6 @@ # TODO: refactor this to allow for implementation-specific overriding more easily logger = logging.getLogger("gui_log") -M = TypeVar("M", bound=AbstractInitRequest) -R = TypeVar("R", bound=Response) - -EXTERN_REQ_M = TypeVar("EXTERN_REQ_M", bound=ExternalRequest) -EXTERN_REQ_R = TypeVar("EXTERN_REQ_R", bound=ExternalRequestResponse) - CONN = TypeVar("CONN") def get_or_create_eventloop() -> AbstractEventLoop: From 5b6d015908dc70fb61dfb9cfcbbab2590927ed7a Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 10 Aug 2023 14:38:55 -0500 Subject: [PATCH 11/34] Update externalrequests to latest comms dep. --- python/lib/externalrequests/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/externalrequests/setup.py b/python/lib/externalrequests/setup.py index 8a304ea9f..4187ea9af 100644 --- a/python/lib/externalrequests/setup.py +++ b/python/lib/externalrequests/setup.py @@ -20,6 +20,6 @@ author_email='', url='', license='', - install_requires=['websockets', 'dmod-core>=0.1.0', 'dmod-communication>=0.4.2', 'dmod-access>=0.1.1'], + install_requires=['websockets', 'dmod-core>=0.1.0', 'dmod-communication>=1.0.0', 'dmod-access>=0.1.1'], packages=find_namespace_packages(exclude=['dmod.test', 'schemas', 'ssl', 'src']) ) From fbf6fa0d62e1c944a9fb071d626764d24909f5e3 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 10 Aug 2023 14:40:32 -0500 Subject: [PATCH 12/34] Update external request handlers for comms. Updating handlers to account for changes to communication package. --- .../externalrequests/maas_request_handlers.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py b/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py index 23283d9bb..ee384ce70 100644 --- a/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py +++ b/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py @@ -6,8 +6,8 @@ from dmod.access import Authorizer from dmod.communication import AbstractRequestHandler, DataServiceClient, FullAuthSession, ExternalRequest, \ - InitRequestResponseReason, InternalServiceClient, PartitionRequest, PartitionResponse, PartitionerServiceClient, \ - Session, SessionManager + InitRequestResponseReason, RequestClient, PartitionRequest, PartitionResponse, PartitionerServiceClient, \ + TransportLayerClient, Session, SessionManager, WebSocketClient from dmod.communication.dataset_management_message import MaaSDatasetManagementMessage, MaaSDatasetManagementResponse, \ ManagementAction from dmod.communication.data_transmit_message import DataTransmitMessage, DataTransmitResponse @@ -38,6 +38,7 @@ def __init__(self, session_manager: SessionManager, authorizer: Authorizer, serv self._service_port = service_port self._service_ssl_dir = service_ssl_dir self._service_url = None + self._transport_client = None async def _is_authorized(self, request: ExternalRequest, session: FullAuthSession) -> bool: """ @@ -129,15 +130,22 @@ async def get_authorized_session(self, request: ExternalRequest) -> Tuple[ msg = None return session, is_authorized, reason, msg + @property + def transport_client(self) -> TransportLayerClient: + if self._transport_client is None: + # TODO: parameterize whether to, e.g., use websocket uri/protocol, as opposed to something else + self._transport_client = WebSocketClient(endpoint_uri=self.service_url, ssl_directory=self.service_ssl_dir) + return self._transport_client + @property @abstractmethod - def service_client(self) -> InternalServiceClient: + def service_client(self) -> RequestClient: """ - Get the client for interacting with the service, which also is a context manager for connections. + Get the client for interacting with the service. Returns ------- - InternalServiceClient + RequestClient The client for interacting with the service. """ pass @@ -148,6 +156,7 @@ def service_ssl_dir(self) -> Path: @property def service_url(self) -> str: + # TODO: parameterize whether to, e.g., use websocket uri/protocol, as opposed to something else if self._service_url is None: self._service_url = 'wss://{}:{}'.format(str(self._service_host), str(self._service_port)) return self._service_url @@ -202,7 +211,7 @@ async def determine_required_access_types(self, request: PartitionRequest, user) @property def service_client(self) -> PartitionerServiceClient: if self._service_client is None: - self._service_client = PartitionerServiceClient(self.service_url, self.service_ssl_dir) + self._service_client = PartitionerServiceClient(transport_client=self.transport_client) return self._service_client async def handle_request(self, request: PartitionRequest, **kwargs) -> PartitionResponse: @@ -332,5 +341,5 @@ async def handle_request(self, request: MaaSDatasetManagementMessage, **kwargs) @property def service_client(self) -> DataServiceClient: if self._service_client is None: - self._service_client = DataServiceClient(endpoint_uri=self.service_url, ssl_directory=self.service_ssl_dir) + self._service_client = DataServiceClient(transport_client=self.transport_client) return self._service_client From 3f421ab27228bd1c44481b9ccc434a8d74db8701 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 10 Aug 2023 14:40:49 -0500 Subject: [PATCH 13/34] Bump externalrequests package version to 0.5.0. --- python/lib/externalrequests/dmod/externalrequests/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/externalrequests/dmod/externalrequests/_version.py b/python/lib/externalrequests/dmod/externalrequests/_version.py index b703f5c96..9bdd4d277 100644 --- a/python/lib/externalrequests/dmod/externalrequests/_version.py +++ b/python/lib/externalrequests/dmod/externalrequests/_version.py @@ -1 +1 @@ -__version__ = '0.4.1' \ No newline at end of file +__version__ = '0.5.0' \ No newline at end of file From 2bc8fc30d595903607a4ed110ea4540dbf48b22a Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 21 Aug 2023 14:54:37 -0400 Subject: [PATCH 14/34] Adjusting comms version to not be 1.0.0 yet. --- python/lib/communication/dmod/communication/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/_version.py b/python/lib/communication/dmod/communication/_version.py index 1f356cc57..a842d05a7 100644 --- a/python/lib/communication/dmod/communication/_version.py +++ b/python/lib/communication/dmod/communication/_version.py @@ -1 +1 @@ -__version__ = '1.0.0' +__version__ = '0.15.0' From f05a09ceaac973b5136ef9692d926b1e3c7e5232 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 21 Aug 2023 14:54:50 -0400 Subject: [PATCH 15/34] Adjusting comms dep to not be 1.0.0 yet. --- python/lib/externalrequests/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/externalrequests/setup.py b/python/lib/externalrequests/setup.py index 4187ea9af..f5ac1b936 100644 --- a/python/lib/externalrequests/setup.py +++ b/python/lib/externalrequests/setup.py @@ -20,6 +20,6 @@ author_email='', url='', license='', - install_requires=['websockets', 'dmod-core>=0.1.0', 'dmod-communication>=1.0.0', 'dmod-access>=0.1.1'], + install_requires=['websockets', 'dmod-core>=0.1.0', 'dmod-communication>=0.15.0', 'dmod-access>=0.1.1'], packages=find_namespace_packages(exclude=['dmod.test', 'schemas', 'ssl', 'src']) ) From 97548fad24e56f1b4d8385d67fda328edfc59dc3 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 21 Aug 2023 15:00:51 -0400 Subject: [PATCH 16/34] Fix optional client session property type hints. --- python/lib/communication/dmod/communication/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index dd69a9b01..a1c6ad255 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -283,11 +283,11 @@ def force_reauth(self, should_force_new: bool): self._force_reauth = should_force_new @property - def session_created(self) -> str: + def session_created(self) -> Optional[str]: return self._session_created @property - def session_id(self) -> str: + def session_id(self) -> Optional[str]: return self._session_id From b7ac988cccb11fb056a9884296e7678b9317ba46 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 21 Aug 2023 15:31:17 -0400 Subject: [PATCH 17/34] For cached auth, use fix, common default basename. --- python/lib/communication/dmod/communication/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index a1c6ad255..f689a5af9 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -322,7 +322,7 @@ def __init__(self, session_file: Optional[Path] = None, *args, **kwargs): self._is_new_session = None self._force_reload = False - default_basename = '.{}_session'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S%s')) + default_basename = '.dmod_session' if session_file is None: self._cached_session_file = Path.home().joinpath(default_basename) From 4be3cb1276f9c46506d0f73945ec0ebf48e6a237 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 21 Aug 2023 16:18:39 -0400 Subject: [PATCH 18/34] Make WebSocketClient.build_endpoint_uri static. Making this a static method rather than a class method, as it doesn't need any class details. --- python/lib/communication/dmod/communication/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index f689a5af9..acc1eb969 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -769,8 +769,8 @@ class WebSocketClient(SSLSecuredTransportLayerClient, ConnectionContextClient[we over the websocket. """ - @classmethod - def build_endpoint_uri(cls, host: str, port: Union[int, str], path: Optional[str] = None, is_secure: bool = True): + @staticmethod + def build_endpoint_uri(host: str, port: Union[int, str], path: Optional[str] = None, is_secure: bool = True): proto = 'wss' if is_secure else 'ws' if path is None: path = '' From 9e60aa0833a87fc83f946f6cfa8d699f7abecac6 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 24 Aug 2023 13:34:33 -0400 Subject: [PATCH 19/34] Put SSLContext directly in TransportLayerClient. Moving to top-level interface rather than having subtype for this. --- .../dmod/communication/client.py | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index acc1eb969..7afc7c9b9 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -54,15 +54,34 @@ class TransportLayerClient(ABC): data to accept data and send this data to a server at some endpoint. The interface function for this behavior supports optionally waiting for and returning a raw data response. Alternatively, the type provides a function for receiving a response from the server independently. + + Instances are capable of securing communications using an ::class:`SSLContext`. A customized context or default + context can be created, depending on the parameters passed during init. """ - def __init__(self, endpoint_uri: str, *args, **kwargs): + def __init__(self, endpoint_uri: str, cafile: Optional[Path] = None, capath: Optional[Path] = None, + use_default_context: bool = False, *args, **kwargs): """ Initialize this instance. + Initialization may or may not include creation of an ::class:`SSLContext`, according to these rules: + - If ``cafile`` is ``None``, ``capath`` is ``None``, and ``use_default_context`` is ``False`` (which are the + default values for each), then no ::class:`SSLContext` is created. + - If ``use_default_context`` is ``True``, ::function:`ssl.create_default_context` is used to create a + context object, with ``cafile`` and ``capath`` passed as kwargs. + - If either ``cafile`` or ``capath`` is not ``None``, and ``use_default_context`` is ``False``, a customized + context object is created, with certificates loaded from locations at ``cafile`` and/or ``capath``. + Parameters ---------- endpoint_uri: str The endpoint for the client to connect to when opening a connection. + cafile: Optional[Path] + Optional path to CA certificates PEM file. + capath: Optional[Path] + Optional path to directory containing CA certificates PEM files, following an OpenSSL specific layout (see + ::function:`ssl.SSLContext.load_verify_locations`). + use_default_context: bool + Whether to use ::function:`ssl.create_default_context` to create a ::class:`SSLContext` (default ``False``). args Other unused positional parameters. kwargs @@ -73,6 +92,14 @@ def __init__(self, endpoint_uri: str, *args, **kwargs): self.endpoint_uri = endpoint_uri """str: The endpoint for the client to connect to when opening a connection.""" + if use_default_context: + self._client_ssl_context = ssl.create_default_context(cafile=cafile, capath=capath) + elif cafile is not None or capath is not None: + self._client_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + self._client_ssl_context.load_verify_locations(cafile=cafile, capath=capath) + else: + self._client_ssl_context = None + @abstractmethod async def async_send(self, data: Union[str, bytearray, bytes], await_response: bool = False) -> Optional[str]: """ @@ -104,6 +131,18 @@ async def async_recv(self) -> str: """ pass + @property + def client_ssl_context(self) -> Optional[ssl.SSLContext]: + """ + The client SSL context for securing connections, if one was created. + + Returns + ------- + Optional[ssl.SSLContext] + The client SSL context for securing connections, if one was created; otherwise ``None``. + """ + return self._client_ssl_context + class SSLSecuredTransportLayerClient(TransportLayerClient, ABC): """ From abd58db0bce3f81350c36b9f81adc0db12a06316 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 24 Aug 2023 13:35:08 -0400 Subject: [PATCH 20/34] Remove SSLSecuredTransportLayerClient. Removing redundant SSLSecuredTransportLayerClient subtype. --- .../dmod/communication/client.py | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 7afc7c9b9..2a83f9f62 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -144,40 +144,6 @@ def client_ssl_context(self) -> Optional[ssl.SSLContext]: return self._client_ssl_context -class SSLSecuredTransportLayerClient(TransportLayerClient, ABC): - """ - Abstract ::class:`TransportLayerClient` capable securing its communications using an ::class:`SSLContext`. - """ - def __init__(self, ssl_directory: Path, *args, **kwargs): - super().__init__(*args, **kwargs) - - self._ssl_directory = ssl_directory - """Path: The parent directory of the cert PEM file used for the client SSL context.""" - - # Setup this as a property to allow more private means to override the actual filename of the cert PEM file - self._client_ssl_context = None - """ssl.SSLContext: The private field for the client SSL context property.""" - - self._cert_pem_file_basename: str = 'certificate.pem' - """str: The basename of the certificate PEM file to use.""" - - @property - def client_ssl_context(self) -> ssl.SSLContext: - """ - Get the client SSL context property, lazily instantiating if necessary. - - Returns - ------- - ssl.SSLContext - The client SSL context for secure connections. - """ - if self._client_ssl_context is None: - self._client_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - endpoint_pem = self._ssl_directory.joinpath(self._cert_pem_file_basename) - self.client_ssl_context.load_verify_locations(endpoint_pem) - return self._client_ssl_context - - class AuthClient: """ Simple client object responsible for handling acquiring and applying authenticated session details to requests. From da0c407ca3ea3149d2ba914cfed2ccd418460b1c Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 25 Aug 2023 14:59:23 -0400 Subject: [PATCH 21/34] Remove SSLSecuredTransportLayerClient from init. Removing redundant SSLSecuredTransportLayerClient from communication package __init__.py. --- python/lib/communication/dmod/communication/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/__init__.py b/python/lib/communication/dmod/communication/__init__.py index e01e99be5..8178b3a70 100644 --- a/python/lib/communication/dmod/communication/__init__.py +++ b/python/lib/communication/dmod/communication/__init__.py @@ -1,6 +1,6 @@ from ._version import __version__ from .client import AuthClient, CachedAuthClient, DataServiceClient, ExternalRequestClient, PartitionerServiceClient, \ - RequestClient, SchedulerClient, SSLSecuredTransportLayerClient, TransportLayerClient, WebSocketClient + RequestClient, SchedulerClient, TransportLayerClient, WebSocketClient from .maas_request import get_available_models, get_available_outputs, get_distribution_types, get_parameters, \ get_request, AbstractNgenRequest, Distribution, DmodJobRequest, ExternalRequest, ExternalRequestResponse,\ ModelExecRequest, ModelExecRequestResponse, NWMRequest, NWMRequestResponse, Scalar, NGENRequest, \ From 5b6160d3ba24779a09d3fb0e99a90229f0d89191 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 25 Aug 2023 15:02:37 -0400 Subject: [PATCH 22/34] Refactor WebSocketClient after SSL client removal. --- python/lib/communication/dmod/communication/client.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 2a83f9f62..a726ff0a7 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -754,13 +754,12 @@ def connection(self) -> Optional[CONN]: return self._connection -class WebSocketClient(SSLSecuredTransportLayerClient, ConnectionContextClient[websockets.WebSocketClientProtocol]): +class WebSocketClient(ConnectionContextClient[websockets.WebSocketClientProtocol]): """ - Subtype of ::class:`SSLSecuredTransportLayerClient` that specifically works over SSL-secured websocket connections. + Subtype of ::class:`ConnectionContextClient` that specifically works over SSL-secured websocket connections. - A websocket-based implementation of ::class:`SSLSecuredTransportLayerClient`. Instances are also async context - managers for runtime contexts that handle websocket connections, with the manager function returning the instance - itself. + A websocket-based implementation of ::class:`ConnectionContextClient`. Instances are also async context managers for + runtime contexts that handle websocket connections, with the manager function returning the instance itself. A new runtime context will check whether there is an open websocket connection already and open a connection if not. In all cases, it maintains an instance attribute that is a counter of the number of active usages of the connection From 549045f84241c3517b21c5820d0f1a981460702a Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 25 Aug 2023 15:05:07 -0400 Subject: [PATCH 23/34] Modify transport client to take URI components. Modifying TransportLayerClient to expect init params for (most of) the components of a URI, rather than the endpoint URI directly, and adding abstract property so that concrete types can implement this themselves (e.g., to control the specific protocol component used in the URI). --- .../dmod/communication/client.py | 64 ++++++++++++++----- 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index a726ff0a7..05dbb6e09 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -58,8 +58,10 @@ class TransportLayerClient(ABC): Instances are capable of securing communications using an ::class:`SSLContext`. A customized context or default context can be created, depending on the parameters passed during init. """ - def __init__(self, endpoint_uri: str, cafile: Optional[Path] = None, capath: Optional[Path] = None, - use_default_context: bool = False, *args, **kwargs): + + def __init__(self, endpoint_host: str, endpoint_port: Union[int, str], endpoint_path: Optional[str] = None, + cafile: Optional[Path] = None, capath: Optional[Path] = None, use_default_context: bool = False, + *args, **kwargs): """ Initialize this instance. @@ -73,8 +75,13 @@ def __init__(self, endpoint_uri: str, cafile: Optional[Path] = None, capath: Opt Parameters ---------- - endpoint_uri: str + endpoint_host: str + The host component for building this client's endpoint URI for opening a connection. The endpoint for the client to connect to when opening a connection. + endpoint_port: Union[int, str] + The host port component for building this client's endpoint URI for opening a connection. + endpoint_path: Optional[str] + The optional path component for building this client's endpoint URI for opening a connection. cafile: Optional[Path] Optional path to CA certificates PEM file. capath: Optional[Path] @@ -89,8 +96,16 @@ def __init__(self, endpoint_uri: str, cafile: Optional[Path] = None, capath: Opt """ super().__init__(*args, **kwargs) - self.endpoint_uri = endpoint_uri - """str: The endpoint for the client to connect to when opening a connection.""" + self._endpoint_host: str = endpoint_host.strip() + self._endpoint_port: str = str(endpoint_port).strip() + if endpoint_path is None: + self._endpoint_path: str = '' + else: + self._endpoint_path: str = endpoint_path.strip() + if self._endpoint_path[0] != '/': + self._endpoint_path = '/' + self._endpoint_path + + self._endpoint_uri = None if use_default_context: self._client_ssl_context = ssl.create_default_context(cafile=cafile, capath=capath) @@ -131,6 +146,19 @@ async def async_recv(self) -> str: """ pass + @property + @abstractmethod + def endpoint_uri(self) -> str: + """ + The endpoint for the client to connect to when opening a connection. + + Returns + ------- + str + The endpoint for the client to connect to when opening a connection. + """ + pass + @property def client_ssl_context(self) -> Optional[ssl.SSLContext]: """ @@ -773,17 +801,6 @@ class WebSocketClient(ConnectionContextClient[websockets.WebSocketClientProtocol over the websocket. """ - @staticmethod - def build_endpoint_uri(host: str, port: Union[int, str], path: Optional[str] = None, is_secure: bool = True): - proto = 'wss' if is_secure else 'ws' - if path is None: - path = '' - else: - path = path.strip() - if path[0] != '/': - path = '/' + path - return '{}://{}:{}{}'.format(proto, host.strip(), str(port).strip(), path) - async def _connection_recv(self) -> Optional[str]: """ Perform operations to receive data over already opened ::attribute:`connection`. @@ -824,6 +841,21 @@ async def _establish_connection(self) -> CONN: """ return await websockets.connect(self.endpoint_uri, ssl=self.client_ssl_context) + @property + def endpoint_uri(self) -> str: + """ + The endpoint for the client to connect to when opening a connection. + + Returns + ------- + str + The endpoint for the client to connect to when opening a connection. + """ + if self._endpoint_uri is None: + proto = 'ws' if self.client_ssl_context is None else 'wss' + self._endpoint_uri = f"{proto}://{self._endpoint_host}:{self._endpoint_port}{self._endpoint_path}" + return self._endpoint_uri + async def listen(self) -> typing.Union[str, bytes]: """ Waits for a message through the websocket connection From 2815193b66719ac9428f65af92947673e2488759 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 25 Aug 2023 15:05:57 -0400 Subject: [PATCH 24/34] Update request handlers for client class changes. --- .../dmod/externalrequests/maas_request_handlers.py | 12 ++++-------- .../externalrequests/model_exec_request_handler.py | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py b/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py index ee384ce70..913d9a45b 100644 --- a/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py +++ b/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py @@ -134,7 +134,10 @@ async def get_authorized_session(self, request: ExternalRequest) -> Tuple[ def transport_client(self) -> TransportLayerClient: if self._transport_client is None: # TODO: parameterize whether to, e.g., use websocket uri/protocol, as opposed to something else - self._transport_client = WebSocketClient(endpoint_uri=self.service_url, ssl_directory=self.service_ssl_dir) + # TODO: subsequent PR that removes this from these types (receive a service client on init) or at least has + # it supplied on init. + self._transport_client = WebSocketClient(endpoint_host=self._service_host, endpoint_port=self._service_port, + cafile=self.service_ssl_dir.joinpath("certificate.pem")) return self._transport_client @property @@ -154,13 +157,6 @@ def service_client(self) -> RequestClient: def service_ssl_dir(self) -> Path: return self._service_ssl_dir - @property - def service_url(self) -> str: - # TODO: parameterize whether to, e.g., use websocket uri/protocol, as opposed to something else - if self._service_url is None: - self._service_url = 'wss://{}:{}'.format(str(self._service_host), str(self._service_port)) - return self._service_url - class PartitionRequestHandler(MaaSRequestHandler): diff --git a/python/lib/externalrequests/dmod/externalrequests/model_exec_request_handler.py b/python/lib/externalrequests/dmod/externalrequests/model_exec_request_handler.py index bc0a641b3..d7effd285 100644 --- a/python/lib/externalrequests/dmod/externalrequests/model_exec_request_handler.py +++ b/python/lib/externalrequests/dmod/externalrequests/model_exec_request_handler.py @@ -183,7 +183,7 @@ async def handle_request(self, request: ModelExecRequest, **kwargs) -> ModelExec @property def service_client(self) -> SchedulerClient: if self._scheduler_client is None: - self._scheduler_client = SchedulerClient(ssl_directory=self.service_ssl_dir, endpoint_uri=self.service_url) + self._scheduler_client = SchedulerClient(transport_client=self.transport_client) return self._scheduler_client From b3ec89e9b25d3cdaa5296b3dcaf85bae8c31d34a Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 25 Aug 2023 15:06:43 -0400 Subject: [PATCH 25/34] Update comms mock test class for client changes. --- python/lib/communication/dmod/test/test_scheduler_client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/test/test_scheduler_client.py b/python/lib/communication/dmod/test/test_scheduler_client.py index 025691576..b640d2eec 100644 --- a/python/lib/communication/dmod/test/test_scheduler_client.py +++ b/python/lib/communication/dmod/test/test_scheduler_client.py @@ -10,7 +10,7 @@ class MockTransportLayerClient(TransportLayerClient): def __init__(self): - super().__init__(endpoint_uri='') + super().__init__(endpoint_host='', endpoint_port=8888) self.test_responses = dict() @@ -55,6 +55,10 @@ async def async_recv(self) -> str: def client_ssl_context(self) -> ssl.SSLContext: pass + @property + def endpoint_uri(self) -> str: + return '' + def set_client_response_none(self): self.test_response_selection = 0 From 5d834f4213f5447006d04f3b0b5f41d93407c94e Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 28 Aug 2023 14:37:49 -0400 Subject: [PATCH 26/34] Tweak transport client endpoint uri handling. --- .../dmod/communication/client.py | 54 +++++++++++++++---- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 05dbb6e09..6b850d3e3 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -59,6 +59,24 @@ class TransportLayerClient(ABC): context can be created, depending on the parameters passed during init. """ + @classmethod + @abstractmethod + def get_endpoint_protocol_str(cls, use_secure_connection: bool = True) -> str: + """ + Get the protocol substring portion for valid connection URI strings for an instance of this class. + + Parameters + ---------- + use_secure_connection : bool + Whether to get the protocol substring applicable for secure connections (``True`` by default). + + Returns + ------- + str + The protocol substring portion for valid connection URI strings for an instance of this class. + """ + pass + def __init__(self, endpoint_host: str, endpoint_port: Union[int, str], endpoint_path: Optional[str] = None, cafile: Optional[Path] = None, capath: Optional[Path] = None, use_default_context: bool = False, *args, **kwargs): @@ -97,13 +115,8 @@ def __init__(self, endpoint_host: str, endpoint_port: Union[int, str], endpoint_ super().__init__(*args, **kwargs) self._endpoint_host: str = endpoint_host.strip() - self._endpoint_port: str = str(endpoint_port).strip() - if endpoint_path is None: - self._endpoint_path: str = '' - else: - self._endpoint_path: str = endpoint_path.strip() - if self._endpoint_path[0] != '/': - self._endpoint_path = '/' + self._endpoint_path + self._endpoint_port = endpoint_port.strip() if isinstance(endpoint_port, str) else endpoint_port + self._endpoint_path: str = '' if endpoint_path is None else endpoint_path.strip() self._endpoint_uri = None @@ -801,6 +814,23 @@ class WebSocketClient(ConnectionContextClient[websockets.WebSocketClientProtocol over the websocket. """ + @classmethod + def get_endpoint_protocol_str(cls, use_secure_connection: bool = True) -> str: + """ + Get the protocol substring portion for valid connection URI strings for an instance of this class. + + Parameters + ---------- + use_secure_connection : bool + Whether to get the protocol substring applicable for secure connections (``True`` by default). + + Returns + ------- + str + The protocol substring portion for valid connection URI strings for an instance of this class. + """ + return 'wss' if use_secure_connection else 'ws' + async def _connection_recv(self) -> Optional[str]: """ Perform operations to receive data over already opened ::attribute:`connection`. @@ -852,8 +882,14 @@ def endpoint_uri(self) -> str: The endpoint for the client to connect to when opening a connection. """ if self._endpoint_uri is None: - proto = 'ws' if self.client_ssl_context is None else 'wss' - self._endpoint_uri = f"{proto}://{self._endpoint_host}:{self._endpoint_port}{self._endpoint_path}" + proto = self.get_endpoint_protocol_str(use_secure_connection=self.client_ssl_context is not None) + + if self._endpoint_path and self._endpoint_path[0] != '/': + path_str = '/' + self._endpoint_path + else: + path_str = self._endpoint_path + + self._endpoint_uri = f"{proto}://{self._endpoint_host}:{self._endpoint_port!s}{path_str}" return self._endpoint_uri async def listen(self) -> typing.Union[str, bytes]: From 3bc4cc685b5bd29a9b444515cddaaefa836ffe62 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 8 Sep 2023 15:37:23 -0400 Subject: [PATCH 27/34] Fix issues in client.py. Cleaning up TransportLayerClient __init__ to not pass var/keyword args to super (ABC), and removing abstract annotation from implemented method in WebSocketClient. --- python/lib/communication/dmod/communication/client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 6b850d3e3..89c88580e 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -112,7 +112,7 @@ def __init__(self, endpoint_host: str, endpoint_port: Union[int, str], endpoint_ kwargs Other unused keyword parameters. """ - super().__init__(*args, **kwargs) + super().__init__() self._endpoint_host: str = endpoint_host.strip() self._endpoint_port = endpoint_port.strip() if isinstance(endpoint_port, str) else endpoint_port @@ -842,7 +842,6 @@ async def _connection_recv(self) -> Optional[str]: """ return await self.connection.recv() - @abstractmethod async def _connection_send(self, data: Union[str, bytearray]): """ Perform operations to send data over already opened ::attribute:`connection`. From 828a1eabc729bee5c709fcda356120f66921c53b Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 8 Sep 2023 15:38:48 -0400 Subject: [PATCH 28/34] Fix MockTransportLayerClient for unit tests. Fixing setup of scheduler client unit tests by adding implementation of this classmethod, declared as abstract in upstream superclass. --- python/lib/communication/dmod/test/test_scheduler_client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/lib/communication/dmod/test/test_scheduler_client.py b/python/lib/communication/dmod/test/test_scheduler_client.py index b640d2eec..1fbd20aaf 100644 --- a/python/lib/communication/dmod/test/test_scheduler_client.py +++ b/python/lib/communication/dmod/test/test_scheduler_client.py @@ -9,6 +9,10 @@ class MockTransportLayerClient(TransportLayerClient): + @classmethod + def get_endpoint_protocol_str(cls, use_secure_connection: bool = True) -> str: + return "mock" + def __init__(self): super().__init__(endpoint_host='', endpoint_port=8888) From 4dce76361ecf29be1be30f73e73da3e85184a737 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 8 Sep 2023 15:55:59 -0400 Subject: [PATCH 29/34] Have transport client init only use keyword args. --- python/lib/communication/dmod/communication/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 89c88580e..7f936b845 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -77,9 +77,9 @@ def get_endpoint_protocol_str(cls, use_secure_connection: bool = True) -> str: """ pass - def __init__(self, endpoint_host: str, endpoint_port: Union[int, str], endpoint_path: Optional[str] = None, + def __init__(self, *, endpoint_host: str, endpoint_port: Union[int, str], endpoint_path: Optional[str] = None, cafile: Optional[Path] = None, capath: Optional[Path] = None, use_default_context: bool = False, - *args, **kwargs): + **kwargs): """ Initialize this instance. From 6d63be367176a40c05d9cc5bc6c437f66e56ecbb Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 8 Sep 2023 16:06:26 -0400 Subject: [PATCH 30/34] Remove public client_ssl_context property. Removing public property in TransportLayerClient and replacing with use of private attribute. --- .../communication/dmod/communication/client.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 7f936b845..f2a03045f 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -172,18 +172,6 @@ def endpoint_uri(self) -> str: """ pass - @property - def client_ssl_context(self) -> Optional[ssl.SSLContext]: - """ - The client SSL context for securing connections, if one was created. - - Returns - ------- - Optional[ssl.SSLContext] - The client SSL context for securing connections, if one was created; otherwise ``None``. - """ - return self._client_ssl_context - class AuthClient: """ @@ -868,7 +856,7 @@ async def _establish_connection(self) -> CONN: CONN A newly established connection. """ - return await websockets.connect(self.endpoint_uri, ssl=self.client_ssl_context) + return await websockets.connect(self.endpoint_uri, ssl=self._client_ssl_context) @property def endpoint_uri(self) -> str: @@ -881,7 +869,7 @@ def endpoint_uri(self) -> str: The endpoint for the client to connect to when opening a connection. """ if self._endpoint_uri is None: - proto = self.get_endpoint_protocol_str(use_secure_connection=self.client_ssl_context is not None) + proto = self.get_endpoint_protocol_str(use_secure_connection=self._client_ssl_context is not None) if self._endpoint_path and self._endpoint_path[0] != '/': path_str = '/' + self._endpoint_path From 583dbf32ebe1be6db89c6b6e275c143ecad4c005 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 8 Sep 2023 16:16:46 -0400 Subject: [PATCH 31/34] Refactor endpoint_uri property to private func. --- .../dmod/communication/client.py | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index f2a03045f..8e63c66b1 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -128,6 +128,18 @@ def __init__(self, *, endpoint_host: str, endpoint_port: Union[int, str], endpoi else: self._client_ssl_context = None + @abstractmethod + def _get_endpoint_uri(self) -> str: + """ + Get the endpoint for the client to connect to when opening a connection. + + Returns + ------- + str + The endpoint for the client to connect to when opening a connection. + """ + pass + @abstractmethod async def async_send(self, data: Union[str, bytearray, bytes], await_response: bool = False) -> Optional[str]: """ @@ -159,19 +171,6 @@ async def async_recv(self) -> str: """ pass - @property - @abstractmethod - def endpoint_uri(self) -> str: - """ - The endpoint for the client to connect to when opening a connection. - - Returns - ------- - str - The endpoint for the client to connect to when opening a connection. - """ - pass - class AuthClient: """ @@ -856,10 +855,9 @@ async def _establish_connection(self) -> CONN: CONN A newly established connection. """ - return await websockets.connect(self.endpoint_uri, ssl=self._client_ssl_context) + return await websockets.connect(self._get_endpoint_uri(), ssl=self._client_ssl_context) - @property - def endpoint_uri(self) -> str: + def _get_endpoint_uri(self) -> str: """ The endpoint for the client to connect to when opening a connection. From 0aaf94a2a43656b892d5784cae4aafa8b9b32fd0 Mon Sep 17 00:00:00 2001 From: Robert Bartel <37884615+robertbartel@users.noreply.github.com> Date: Fri, 8 Sep 2023 16:29:40 -0400 Subject: [PATCH 32/34] Clarify that endpoint_host does not include protocol. Clarify that endpoint_host does not include protocol in docstring. Co-authored-by: Austin Raney --- python/lib/communication/dmod/communication/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 8e63c66b1..42c7547f3 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -94,7 +94,7 @@ def __init__(self, *, endpoint_host: str, endpoint_port: Union[int, str], endpoi Parameters ---------- endpoint_host: str - The host component for building this client's endpoint URI for opening a connection. + The host component for building this client's endpoint URI for opening a connection. Does not include the protocol. The endpoint for the client to connect to when opening a connection. endpoint_port: Union[int, str] The host port component for building this client's endpoint URI for opening a connection. From a1bbb5ea2cdb4c087366b636476b97473efcc3cb Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 8 Sep 2023 16:32:55 -0400 Subject: [PATCH 33/34] Update RequestClient init to keyword args only. --- python/lib/communication/dmod/communication/client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/lib/communication/dmod/communication/client.py b/python/lib/communication/dmod/communication/client.py index 42c7547f3..dd05b1cec 100644 --- a/python/lib/communication/dmod/communication/client.py +++ b/python/lib/communication/dmod/communication/client.py @@ -518,8 +518,10 @@ class RequestClient: two must be set for ::method:`async_make_request` to function. """ - def __init__(self, transport_client: TransportLayerClient, default_response_type: Optional[Type[Response]] = None, - *args, **kwargs): + def __init__(self, *, + transport_client: TransportLayerClient, + default_response_type: Optional[Type[Response]] = None, + **kwargs): """ Initialize. @@ -529,7 +531,6 @@ def __init__(self, transport_client: TransportLayerClient, default_response_type The client for handling the underlying raw OSI transport layer communications with the service. default_response_type: Optional[Type[Response]] Optional class type for responses, to use when no response class param is given when making a request. - args kwargs """ self._transport_client = transport_client From 92e58e81c91cce22a6c9820381a89a28bc6e897b Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 8 Sep 2023 16:33:12 -0400 Subject: [PATCH 34/34] Update scheduler client tests for endpoint uri. --- python/lib/communication/dmod/test/test_scheduler_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/test/test_scheduler_client.py b/python/lib/communication/dmod/test/test_scheduler_client.py index 1fbd20aaf..2e834345c 100644 --- a/python/lib/communication/dmod/test/test_scheduler_client.py +++ b/python/lib/communication/dmod/test/test_scheduler_client.py @@ -60,7 +60,7 @@ def client_ssl_context(self) -> ssl.SSLContext: pass @property - def endpoint_uri(self) -> str: + def _get_endpoint_uri(self) -> str: return '' def set_client_response_none(self):