Skip to content

Commit

Permalink
allow any core message even if no session has been established (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
valentin-gauthier-geosiris authored Dec 15, 2023
1 parent 02e9062 commit 992a691
Show file tree
Hide file tree
Showing 6 changed files with 491 additions and 137 deletions.
1 change: 1 addition & 0 deletions etpproto/client_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ClientInfo:
)
login: str = field(default="anonymousUser")
ip: str = field(default="0.0.0.0")
authenticated: bool = field(default=False)

def __post_init__(self):
self._id = self.count_instance
Expand Down
299 changes: 166 additions & 133 deletions etpproto/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
from etptypes.energistics.etp.v12.protocol.core.close_session import (
CloseSession,
)
from etptypes.energistics.etp.v12.protocol.core.authorize_response import (
AuthorizeResponse,
)
from etptypes.energistics.etp.v12.protocol.core.authorize import Authorize
from etptypes.energistics.etp.v12.protocol.core.open_session import OpenSession
from etptypes.energistics.etp.v12.protocol.core.request_session import (
RequestSession,
Expand All @@ -41,6 +45,8 @@
InvalidMessageError,
UnsupportedProtocolError,
NotSupportedError,
InvalidStateError,
AuthorizationRequired,
)
from etpproto.messages import Message
from etpproto.utils import ProtocolDict, get_all_etp_protocol_classes
Expand Down Expand Up @@ -136,6 +142,8 @@ class ETPConnection:

is_connected: bool = field(default=False)

auth_required: bool = field(default=False)

message_id: int = field(default=1)

# ______ __ __ ________ __
Expand Down Expand Up @@ -164,6 +172,7 @@ def __post_init__(self):
self.message_id = 1
elif self.connection_type == ConnectionType.CLIENT:
self.message_id = 2
self.auth_required = False # auth is only required on server side

def _handle_answer_and_error(
self,
Expand All @@ -189,158 +198,182 @@ async def _handle_message_generator(
if (
etp_input_msg is not None and etp_input_msg.header is not None
): # si pas un message none
if (
isinstance(etp_input_msg.body, RequestSession)
or isinstance(etp_input_msg.body, OpenSession)
or self.is_connected
if not self.auth_required or (
self.client_info is not None
and (
self.client_info.authenticated
or (
isinstance(etp_input_msg.body, Authorize)
or isinstance(etp_input_msg.body, AuthorizeResponse)
)
)
):
current_msg_id = etp_input_msg.header.message_id

# if requires acknowledge :
if etp_input_msg.is_asking_acknowledge() and not isinstance(
etp_input_msg.body, Acknowledge
if (
# isinstance(etp_input_msg.body, RequestSession)
# or isinstance(etp_input_msg.body, OpenSession)
etp_input_msg.header.protocol
== CommunicationProtocol.CORE.value
or self.is_connected
):
yield Message.get_object_message(
Acknowledge(),
correlation_id=current_msg_id,
msg_id=self.consume_msg_id(),
)
# time.sleep(3)

# only if the user is connected or request for an OpenSession or if the message is not the full message
current_msg_id = etp_input_msg.header.message_id

if self.is_connected and isinstance(
etp_input_msg.body, CloseSession
):
logging.debug(
f"{self.client_info.ip} : CloseSession recieved"
)
self.is_connected = False
else:
# Test if it is an Open/Request session
# if requires acknowledge :
if (
isinstance(etp_input_msg.body, RequestSession)
and self.connection_type == ConnectionType.SERVER
) or (
isinstance(etp_input_msg.body, OpenSession)
and self.connection_type == ConnectionType.CLIENT
etp_input_msg.is_asking_acknowledge()
and not isinstance(etp_input_msg.body, Acknowledge)
):
self.is_connected = True
self.client_info.negotiate(etp_input_msg.body)

# logging.debug(etp_input_msg, etp_input_msg.is_chunk_msg(), etp_input_msg.is_chunk_msg_referencer())
# On test si c'est un message de BLOB qu'il faut mettre en cache :
if etp_input_msg.is_multipart_msg() and (
etp_input_msg.is_chunk_msg()
or etp_input_msg.is_chunk_msg_referencer()
yield Message.get_object_message(
Acknowledge(),
correlation_id=current_msg_id,
msg_id=self.consume_msg_id(),
)
# time.sleep(3)

# only if the user is connected or request for an OpenSession or if the message is not the full message

if self.is_connected and isinstance(
etp_input_msg.body, CloseSession
):
cache_id = (
etp_input_msg.header.correlation_id
if etp_input_msg.header.correlation_id != 0
else etp_input_msg.header.message_id
logging.debug(
f"{self.client_info.ip} : CloseSession recieved"
)
if cache_id not in self.chunk_msg_cache:
self.chunk_msg_cache[cache_id] = []
self.chunk_msg_cache[cache_id].append(etp_input_msg)

# si final on rassemble et on handle.
if etp_input_msg.is_final_msg():
logging.debug(
f"Reassemble chunks :{self.chunk_msg_cache[cache_id]}",
self.is_connected = False
else:
# Test if it is an Open/Request session
if (
isinstance(etp_input_msg.body, RequestSession)
and self.connection_type == ConnectionType.SERVER
) or (
isinstance(etp_input_msg.body, OpenSession)
and self.connection_type == ConnectionType.CLIENT
):
self.is_connected = True
self.client_info.negotiate(etp_input_msg.body)

# logging.debug(etp_input_msg, etp_input_msg.is_chunk_msg(), etp_input_msg.is_chunk_msg_referencer())
# On test si c'est un message de BLOB qu'il faut mettre en cache :
if etp_input_msg.is_multipart_msg() and (
etp_input_msg.is_chunk_msg()
or etp_input_msg.is_chunk_msg_referencer()
):
cache_id = (
etp_input_msg.header.correlation_id
if etp_input_msg.header.correlation_id != 0
else etp_input_msg.header.message_id
)
if cache_id not in self.chunk_msg_cache:
self.chunk_msg_cache[cache_id] = []
self.chunk_msg_cache[cache_id].append(
etp_input_msg
)

# si final on rassemble et on handle.
if etp_input_msg.is_final_msg():
logging.debug(
f"Reassemble chunks :{self.chunk_msg_cache[cache_id]}",
)
try:
async for msg in self._handle_message_generator(
Message.reassemble_chunk(
self.chunk_msg_cache[cache_id]
)
):
if msg is not None:
yield msg
else:
if (
cache_id
not in self.error_msg_cache
):
self.error_msg_cache[
cache_id
] = []
self.error_msg_cache[
cache_id
].append(
InvalidMessageError().to_etp_message(
msg_id=self.consume_msg_id()
)
)

except Exception as e:
logging.error(
f"{self.client_info.ip}: _SERVER_ not handled exception",
)
raise e

if cache_id in self.error_msg_cache:
for err_msg in self.error_msg_cache[
cache_id
]:
if err_msg is not None:
yield err_msg
self.error_msg_cache.pop(cache_id)

if cache_id in self.chunk_msg_cache:
self.chunk_msg_cache.pop(cache_id)

else: # ce n'est pas un message envoye en chunks
# now try to have an answer
try:
async for msg in self._handle_message_generator(
Message.reassemble_chunk(
self.chunk_msg_cache[cache_id]
# Test si le protocol est supporte par le serveur
if (
CommunicationProtocol(
etp_input_msg.header.protocol
)
in self.transition_table
):
if msg is not None:
yield msg
else:
if (
cache_id
not in self.error_msg_cache
# demande la reponse au protocols du serveur
try:
async for handled in self.transition_table[
CommunicationProtocol(
etp_input_msg.header.protocol
)
].handle_message(
etp_object=etp_input_msg.body,
msg_header=etp_input_msg.header,
client_info=self.client_info,
):
self.error_msg_cache[cache_id] = []
self.error_msg_cache[cache_id].append(
InvalidMessageError().to_etp_message(
msg_id=self.consume_msg_id()
yield self._handle_answer_and_error(
msg=handled,
req_msg=etp_input_msg,
request_msg_id=current_msg_id,
)
except ETPError as exp_invalid_msg_type:
yield exp_invalid_msg_type.to_etp_message(
msg_id=self.consume_msg_id(),
correlation_id=current_msg_id,
)

except Exception as e:
else:
logging.debug(
f"{self.client_info.ip} : #handle_msg : unkown protocol id : {str(etp_input_msg.header.protocol)}"
)
raise UnsupportedProtocolError(
etp_input_msg.header.protocol
)
except ETPError as etp_err:
logging.error(
f"{self.client_info.ip}: _SERVER_ not handled exception",
f"{self.client_info.ip}: _SERVER_ internal error : {etp_err}"
)
raise e

if cache_id in self.error_msg_cache:
for err_msg in self.error_msg_cache[cache_id]:
if err_msg is not None:
yield err_msg
self.error_msg_cache.pop(cache_id)

if cache_id in self.chunk_msg_cache:
self.chunk_msg_cache.pop(cache_id)

else: # ce n'est pas un message envoye en chunks
# now try to have an answer
try:
# Test si le protocol est supporte par le serveur
if (
CommunicationProtocol(
etp_input_msg.header.protocol
)
in self.transition_table
):
# demande la reponse au protocols du serveur
try:
async for handled in self.transition_table[
CommunicationProtocol(
etp_input_msg.header.protocol
)
].handle_message(
etp_object=etp_input_msg.body,
msg_header=etp_input_msg.header,
client_info=self.client_info,
):
yield self._handle_answer_and_error(
msg=handled,
req_msg=etp_input_msg,
request_msg_id=current_msg_id,
)
except ETPError as exp_invalid_msg_type:
yield exp_invalid_msg_type.to_etp_message(
yield self._handle_answer_and_error(
msg=etp_err.to_etp_message(
msg_id=self.consume_msg_id(),
correlation_id=current_msg_id,
)

else:
logging.debug(
f"{self.client_info.ip} : #handle_msg : unkown protocol id : {str(etp_input_msg.header.protocol)}"
),
req_msg=etp_input_msg,
request_msg_id=current_msg_id,
)
raise UnsupportedProtocolError(
etp_input_msg.header.protocol
except Exception as e:
logging.error(
f"{self.client_info.ip}: _SERVER_ not handled exception",
)
except ETPError as etp_err:
logging.error(
f"{self.client_info.ip}: _SERVER_ internal error : {etp_err}"
)
yield self._handle_answer_and_error(
msg=etp_err.to_etp_message(
msg_id=self.consume_msg_id(),
correlation_id=current_msg_id,
),
req_msg=etp_input_msg,
request_msg_id=current_msg_id,
)
except Exception as e:
logging.error(
f"{self.client_info.ip}: _SERVER_ not handled exception",
)
raise e
else: # not connected
yield InvalidMessageError().to_etp_message(
raise e
else: # not connected
yield InvalidStateError().to_etp_message(
msg_id=self.consume_msg_id()
)
else: # not authenticated
yield AuthorizationRequired().to_etp_message(
msg_id=self.consume_msg_id()
)
else: # null message
Expand Down Expand Up @@ -445,7 +478,7 @@ def get_supported_protocol_list(
) -> List[SupportedProtocol]:
supported_protocols: List[SupportedProtocol] = []
for protocol in cls.transition_table:
if protocol.value != protocol.CORE:
if protocol.value != CommunicationProtocol.CORE:
supported_protocols.append(
SupportedProtocol(
protocol=protocol.value,
Expand Down
Loading

0 comments on commit 992a691

Please sign in to comment.