Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/subscriptionmgr indexerror #186

Merged
merged 3 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- On consumer side: Option to configure the time to wait until the event sink is started,
this avoids invalid subscription requests [#147](https://github.com/Draegerwerk/sdc11073/issues/147)

### Changed
- Error handling for invalid subscription requests - sending faults specified in WS-Eventing [#147](https://github.com/Draegerwerk/sdc11073/issues/147)

## [1.1.26] - 2023-06-08

### Added
Expand Down
6 changes: 3 additions & 3 deletions src/sdc11073/pysoap/soapenvelope.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ def mkEndpointReference(idstring):


_LANGUAGE_ATTR = '{http://www.w3.org/XML/1998/namespace}lang'
MODE_PUSH = '{}/DeliveryModes/Push'.format(Prefix.WSE.namespace)


class WsSubscribe(object):
MODE_PUSH = '{}/DeliveryModes/Push'.format(Prefix.WSE.namespace)
__slots__ = ('delivery_mode', 'notifyTo', 'endTo', 'expires', 'filter')
def __init__(self, notifyTo,
expires,
Expand All @@ -223,9 +223,9 @@ def __init__(self, notifyTo,
@param notifyTo: a WsaEndpointReferenceType
@param expires: duration in seconds ( absolute date not supported)
@param endTo: a WsaEndpointReferenceType or None
@param delivery_mode: defaults to self.MODE_PUSH
@param delivery_mode: defaults to MODE_PUSH
"""
self.delivery_mode = delivery_mode or self.MODE_PUSH
self.delivery_mode = delivery_mode or MODE_PUSH
self.notifyTo = notifyTo
self.endTo = endTo
self.expires = expires
Expand Down
36 changes: 21 additions & 15 deletions src/sdc11073/sdcclient/sdcclientimpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def client(self, porttypename):
""" returns the client for the given port type name.
WDP and SDC use different port type names, e.g. WPF="Get", SDC="GetService".
If the port type is not found directly, it tries also with or without "Service" in name.
:param porttypename: string, e.g "Get", or "GetService", ...
@param porttypename: string, e.g "Get", or "GetService", ...
"""
client = self._serviceClients.get(porttypename)
if client is None and porttypename.endswith('Service'):
Expand Down Expand Up @@ -338,22 +338,24 @@ def subscription_mgr(self):
return self._subscriptionMgr

def startAll(self, notSubscribedActions=None, subscriptionsCheckInterval=None, async_dispatch=True,
subscribe_periodic_reports=False):
subscribe_periodic_reports=False, dispatcher_timeout=15.0):
"""
:param notSubscribedActions: a list of pmtypes.Actions elements or None. if None, everything is subscribed.
:param subscriptionsCheckInterval: an interval in seconds or None
:param async_dispatch: if True, incoming requests are queued and response is sent immediately (processing is done later).
@param notSubscribedActions: a list of pmtypes.Actions elements or None. if None, everything is subscribed.
@param subscriptionsCheckInterval: an interval in seconds or None
@param async_dispatch: if True, incoming requests are queued and response is sent immediately (processing is done later).
if False, response is sent after the complete processing is done.
:param subscribe_periodic_reports: boolean
:return: None
@param subscribe_periodic_reports: boolean
@param dispatcher_timeout: time to wait for the event sink aka. dispatcher thread to be started, if timeout is
exceeded a RuntimeError is raised
@return: None
"""
self.discoverHostedServices()
self._startEventSink(async_dispatch)
periodic_actions = set([self.sdc_definitions.Actions.PeriodicMetricReport,
self.sdc_definitions.Actions.PeriodicAlertReport,
self.sdc_definitions.Actions.PeriodicComponentReport,
self.sdc_definitions.Actions.PeriodicContextReport,
self.sdc_definitions.Actions.PeriodicOperationalStateReport])
self._startEventSink(async_dispatch, dispatcher_timeout=dispatcher_timeout)
periodic_actions = {self.sdc_definitions.Actions.PeriodicMetricReport,
self.sdc_definitions.Actions.PeriodicAlertReport,
self.sdc_definitions.Actions.PeriodicComponentReport,
self.sdc_definitions.Actions.PeriodicContextReport,
self.sdc_definitions.Actions.PeriodicOperationalStateReport}
# start subscription manager
self._subscriptionMgr = subscription.SubscriptionManager(self._notificationsDispatcherThread.base_url,
log_prefix=self.log_prefix,
Expand Down Expand Up @@ -508,7 +510,7 @@ def _mkHostedServiceClient(self, porttype, soapClient, hosted):
cls = self._servicesLookup.get(porttype, HostedServiceClient)
return cls(soapClient, hosted, porttype, self.sdc_definitions, self.log_prefix)

def _startEventSink(self, async_dispatch):
def _startEventSink(self, async_dispatch, dispatcher_timeout=15.0):
if self._sslEvents == 'auto':
sslContext = self._sslContext if self._device_uses_https else None
elif self._sslEvents: # True
Expand All @@ -527,7 +529,11 @@ def _startEventSink(self, async_dispatch):
async_dispatch=async_dispatch)

self._notificationsDispatcherThread.start()
self._notificationsDispatcherThread.started_evt.wait(timeout=5)
event_is_set = self._notificationsDispatcherThread.started_evt.wait(timeout=dispatcher_timeout)
if not event_is_set:
self._logger.error('Cannot start consumer, start event of EventSink not set.')
raise RuntimeError('Cannot start consumer, start event of EventSink not set.')

self._logger.info('serving EventSink on {}', self._notificationsDispatcherThread.base_url)

def _stopEventSink(self, closeAllConnections):
Expand Down
29 changes: 28 additions & 1 deletion src/sdc11073/sdcdevice/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from ..pysoap.soapenvelope import SoapFault, SoapFaultCode, AdressingFault
from ..pysoap.soapenvelope import AdressingFault
from ..pysoap.soapenvelope import MODE_PUSH
from ..pysoap.soapenvelope import SoapFault
from ..pysoap.soapenvelope import SoapFaultCode


class HTTPRequestHandlingError(Exception):
""" This class is used to communicate errors from http request handlers back to http server."""

def __init__(self, status, reason, soapfault):
"""
@param status: integer, e.g. 404
Expand Down Expand Up @@ -38,3 +44,24 @@ def __init__(self, request, path):
code=SoapFaultCode.SENDER,
reason='invalid path {}'.format(path))
super().__init__(400, 'Bad Request', fault.as_xml())


class InvalidMessageError(HTTPRequestHandlingError):
def __init__(self, request, detail):
fault = AdressingFault(request,
code=SoapFaultCode.SENDER,
reason='The message is not valid and cannot be processed.',
details='Detail: {} - The invalid message: {}'.format(detail,
request.rawdata.decode("utf-8")))
super().__init__(400, 'Bad Request', fault.as_xml())


class DeliveryModeRequestedUnavailableError(HTTPRequestHandlingError):
def __init__(self, request, detail=None):
if detail is None:
detail = f"The only supported mode: {MODE_PUSH}"
fault = AdressingFault(request,
code=SoapFaultCode.SENDER,
reason='The requested delivery mode is not supported.',
details=detail)
super().__init__(400, 'Bad Request', fault.as_xml())
4 changes: 2 additions & 2 deletions src/sdc11073/sdcdevice/sdcservicesimpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ..namespaces import msgTag, domTag, s12Tag, wsxTag, wseTag, dpwsTag, mdpwsTag, nsmap
from .. import pmtypes
from .. import loghelper
from .exceptions import InvalidActionError, FunctionNotImplementedError
from .exceptions import InvalidActionError, FunctionNotImplementedError, InvalidMessageError
_msg_prefix = Prefix.MSG.prefix

_wsdl_ns = Prefix.WSDL.namespace
Expand Down Expand Up @@ -151,7 +151,7 @@ def _onSubscribe(self, httpHeader, soapEnvelope):
"//wse:Filter[@Dialect='{}/Action']".format(Prefix.DPWS.namespace),
namespaces=nsmap)
if len(subscriptionFilters) != 1:
raise Exception
raise InvalidMessageError(request=soapEnvelope, detail="wse:Subscribe/wse:Filter not provided")
else:
sfilters = subscriptionFilters[0].text
for sfilter in sfilters.split():
Expand Down
51 changes: 34 additions & 17 deletions src/sdc11073/sdcdevice/subscriptionmgr.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
import uuid
import time
import copy
import http.client
import socket
import time
import traceback
from collections import deque, defaultdict
import urllib
import http.client
import uuid
from collections import defaultdict
from collections import deque

from lxml import etree as etree_
from ..namespaces import xmlTag, wseTag, wsaTag, msgTag, nsmap, DocNamespaceHelper
from ..namespaces import Prefix_Namespace as Prefix
from .. import pysoap

from .exceptions import DeliveryModeRequestedUnavailableError
from .exceptions import InvalidMessageError
from .. import isoduration
from .. import xmlparsing
from .. import observableproperties
from .. import multikey
from .. import loghelper
from .. import multikey
from .. import observableproperties
from .. import pysoap
from .. import xmlparsing
from ..compression import CompressionHandler
from ..namespaces import DocNamespaceHelper
from ..namespaces import Prefix_Namespace as Prefix
from ..namespaces import msgTag
from ..namespaces import nsmap
from ..namespaces import wsaTag
from ..namespaces import wseTag
from ..namespaces import xmlTag


WsAddress = pysoap.soapenvelope.WsAddress
Soap12Envelope = pysoap.soapenvelope.Soap12Envelope
Expand Down Expand Up @@ -258,24 +268,31 @@ def fromSoapEnvelope(cls, soapEnvelope, sslContext, acceptedEncodings, max_subsc
endToAddresses = soapEnvelope.bodyNode.xpath('wse:Subscribe/wse:EndTo', namespaces=nsmap)
if len(endToAddresses) == 1:
endToNode = endToAddresses[0]
endToAddress = endToNode.xpath('wsa:Address/text()', namespaces=nsmap)[0]
endToAddress = endToNode.xpath('wsa:Address/text()', namespaces=nsmap)
if not endToAddress:
raise InvalidMessageError(request=soapEnvelope, detail="wse:Subscribe/wse:EndTo/wsa:Address not set")
endToAddress = endToAddress[0]
endToRefNode = endToNode.find('wsa:ReferenceParameters', namespaces=nsmap)

# determine (mandatory) notification address
deliveryNode = soapEnvelope.bodyNode.xpath('wse:Subscribe/wse:Delivery', namespaces=nsmap)[0]
notifyToNode = deliveryNode.find('wse:NotifyTo', namespaces=nsmap)
if notifyToNode is None:
raise InvalidMessageError(request=soapEnvelope, detail="wse:Subscribe/wse:Delivery/wse:NotifyTo not set")
notifyToAddress = notifyToNode.xpath('wsa:Address/text()', namespaces=nsmap)[0]
notifyRefNode = notifyToNode.find('wsa:ReferenceParameters', namespaces=nsmap)

mode = deliveryNode.get('Mode') # mandatory attribute
mode = deliveryNode.get('Mode', pysoap.soapenvelope.MODE_PUSH)
if mode != pysoap.soapenvelope.MODE_PUSH:
raise DeliveryModeRequestedUnavailableError(request=soapEnvelope)

expiresNodes = soapEnvelope.bodyNode.xpath('wse:Subscribe/wse:Expires/text()', namespaces=nsmap)
if len(expiresNodes) == 0:
expires = None
else:
expires = isoduration.parse_duration(str(expiresNodes[0]))
expires = isoduration.parse_duration(str(expiresNodes[0])) if expiresNodes else None

filter_ = soapEnvelope.bodyNode.xpath('wse:Subscribe/wse:Filter/text()', namespaces=nsmap)[0]
filter_ = soapEnvelope.bodyNode.xpath('wse:Subscribe/wse:Filter/text()', namespaces=nsmap)
if not filter_:
raise InvalidMessageError(request=soapEnvelope, detail="wse:Subscribe/wse:Filter not set")
filter_ = filter_[0]

return cls(str(mode), base_urls, notifyToAddress, notifyRefNode, endToAddress, endToRefNode,
expires, max_subscription_duration, str(filter_), sslContext, acceptedEncodings)
Expand Down