From debae3d4f4cf19f21e5af359b9bdc8c436fb0cc8 Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Wed, 4 Jul 2018 17:51:12 -0400 Subject: [PATCH] version 0.17.3 released --- py25/bacpypes/__init__.py | 2 +- py25/bacpypes/appservice.py | 2 +- py25/bacpypes/bvllservice.py | 303 +++++++++ py25/bacpypes/comm.py | 104 +++ py25/bacpypes/iocb.py | 6 +- py25/bacpypes/netservice.py | 18 +- py25/bacpypes/service/cov.py | 118 ++-- py27/bacpypes/__init__.py | 2 +- py27/bacpypes/analysis.py | 9 +- py27/bacpypes/appservice.py | 2 +- py27/bacpypes/bvllservice.py | 302 +++++++++ py27/bacpypes/comm.py | 103 +++ py27/bacpypes/iocb.py | 6 +- py27/bacpypes/netservice.py | 21 +- py27/bacpypes/service/cov.py | 115 ++-- py34/bacpypes/__init__.py | 2 +- py34/bacpypes/analysis.py | 9 +- py34/bacpypes/appservice.py | 2 +- py34/bacpypes/bvllservice.py | 302 +++++++++ py34/bacpypes/comm.py | 103 +++ py34/bacpypes/iocb.py | 6 +- py34/bacpypes/netservice.py | 21 +- py34/bacpypes/service/cov.py | 115 ++-- samples/DeviceCommunicationControl.py | 6 +- samples/DeviceDiscovery.py | 6 +- samples/DeviceDiscoveryForeign.py | 6 +- samples/EventNotifications.py | 265 ++++++++ samples/IP2VLANRouter.py | 12 +- samples/NATRouter.py | 0 samples/ReadProperty.py | 6 +- samples/ReadProperty25.py | 6 +- samples/ReadWriteEventMessageTexts.py | 6 +- samples/ReadWriteProperty.py | 6 +- samples/Tutorial/WhoIsIAm.py | 6 +- samples/WhoIsIAm.py | 6 +- samples/WhoIsIAmForeign.py | 6 +- samples/switch_demo.py | 137 ++++ sandbox/threading_1.py | 50 ++ sandbox/threading_2.py | 68 ++ sandbox/threading_3.py | 68 ++ sandbox/threading_4.py | 75 ++ sandbox/threading_5.py | 75 ++ test_script.sh | 68 ++ tests/state_machine.py | 42 +- tests/test_service/helpers.py | 134 +++- tests/test_service/test_cov.py | 753 ++++++++++++++++++++- tests/test_service/test_device.py | 54 +- tests/test_utilities/test_state_machine.py | 26 +- 48 files changed, 3275 insertions(+), 285 deletions(-) create mode 100755 samples/EventNotifications.py mode change 100644 => 100755 samples/NATRouter.py create mode 100644 samples/switch_demo.py create mode 100644 sandbox/threading_1.py create mode 100644 sandbox/threading_2.py create mode 100644 sandbox/threading_3.py create mode 100644 sandbox/threading_4.py create mode 100644 sandbox/threading_5.py create mode 100755 test_script.sh diff --git a/py25/bacpypes/__init__.py b/py25/bacpypes/__init__.py index 8e201b5b..dbb45643 100755 --- a/py25/bacpypes/__init__.py +++ b/py25/bacpypes/__init__.py @@ -18,7 +18,7 @@ # Project Metadata # -__version__ = '0.17.2' +__version__ = '0.17.3' __author__ = 'Joel Bender' __email__ = 'joel@carrickbender.com' diff --git a/py25/bacpypes/appservice.py b/py25/bacpypes/appservice.py index b30a7e8a..b411c02a 100755 --- a/py25/bacpypes/appservice.py +++ b/py25/bacpypes/appservice.py @@ -1525,7 +1525,7 @@ def confirmation(self, apdu): xpdu = atype() xpdu.decode(apdu) except Exception, err: - ApplicationServiceAccessPoint._exception("unconfirmed request decoding error: %r", err) + ApplicationServiceAccessPoint._exception("complex ack decoding error: %r", err) return elif isinstance(apdu, ErrorPDU): diff --git a/py25/bacpypes/bvllservice.py b/py25/bacpypes/bvllservice.py index 2588f4c0..e90f53b8 100755 --- a/py25/bacpypes/bvllservice.py +++ b/py25/bacpypes/bvllservice.py @@ -1007,6 +1007,309 @@ def delete_peer(self, addr): bacpypes_debugging(BIPBBMD) +# +# BIPNAT +# + +class BIPNAT(BIPSAP, Client, Server, RecurringTask, DebugContents): + + _debug_contents = ('bbmdAddress', 'bbmdBDT+', 'bbmdFDT+') + + def __init__(self, addr, sapID=None, cid=None, sid=None): + """A BBMD node that is the destination for NATed traffic.""" + if _debug: BIPNAT._debug("__init__ %r sapID=%r cid=%r sid=%r", addr, sapID, cid, sid) + BIPSAP.__init__(self, sapID) + Client.__init__(self, cid) + Server.__init__(self, sid) + RecurringTask.__init__(self, 1000.0) + + self.bbmdAddress = addr + self.bbmdBDT = [] + self.bbmdFDT = [] + + # install so process_task runs + self.install_task() + + def indication(self, pdu): + if _debug: BIPNAT._debug("indication %r", pdu) + + # check for local stations + if pdu.pduDestination.addrType == Address.localStationAddr: + ###TODO the destination should be a peer or a registered foreign device + + # make an original unicast PDU + xpdu = OriginalUnicastNPDU(pdu, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduDestination + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + # check for broadcasts + elif pdu.pduDestination.addrType == Address.localBroadcastAddr: + # make a forwarded PDU + xpdu = ForwardedNPDU(self.bbmdAddress, pdu, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - forwarded xpdu: %r", xpdu) + + # send it to the peers, all of them have all F's mask + for bdte in self.bbmdBDT: + if bdte != self.bbmdAddress: + xpdu.pduDestination = Address((bdte.addrIP, bdte.addrPort)) + BIPNAT._debug(" - sending to peer: %r", xpdu.pduDestination) + self.request(xpdu) + + # send it to the registered foreign devices + for fdte in self.bbmdFDT: + xpdu.pduDestination = fdte.fdAddress + if _debug: BIPNAT._debug(" - sending to foreign device: %r", xpdu.pduDestination) + self.request(xpdu) + + else: + BIPNAT._warning("invalid destination address: %r", pdu.pduDestination) + + def confirmation(self, pdu): + if _debug: BIPNAT._debug("confirmation %r", pdu) + + # some kind of response to a request + if isinstance(pdu, Result): + # send this to the service access point + self.sap_response(pdu) + + elif isinstance(pdu, WriteBroadcastDistributionTable): + ###TODO verify this is from a management network/address + + # build a response + xpdu = Result(code=99, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduSource + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadBroadcastDistributionTable): + ###TODO verify this is from a management network/address + + # build a response + xpdu = ReadBroadcastDistributionTableAck(self.bbmdBDT, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduSource + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadBroadcastDistributionTableAck): + # send this to the service access point + self.sap_response(pdu) + + elif isinstance(pdu, ForwardedNPDU): + ###TODO verify this is from a peer + + # build a PDU with the source from the real source + xpdu = PDU(pdu.pduData, source=pdu.bvlciAddress, destination=LocalBroadcast(), user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - upstream xpdu: %r", xpdu) + + # send it upstream + self.response(xpdu) + + # build a forwarded NPDU to send out + xpdu = ForwardedNPDU(pdu.bvlciAddress, pdu, destination=None, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - forwarded xpdu: %r", xpdu) + + # send it to the registered foreign devices + for fdte in self.bbmdFDT: + xpdu.pduDestination = fdte.fdAddress + if _debug: BIPNAT._debug(" - sending to foreign device: %r", xpdu.pduDestination) + self.request(xpdu) + + elif isinstance(pdu, RegisterForeignDevice): + ###TODO verify this is from an acceptable address + + # process the request + stat = self.register_foreign_device(pdu.pduSource, pdu.bvlciTimeToLive) + + # build a response + xpdu = Result(code=stat, destination=pdu.pduSource, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadForeignDeviceTable): + ###TODO verify this is from a management network/address + + # build a response + xpdu = ReadForeignDeviceTableAck(self.bbmdFDT, destination=pdu.pduSource, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadForeignDeviceTableAck): + # send this to the service access point + self.sap_response(pdu) + + elif isinstance(pdu, DeleteForeignDeviceTableEntry): + ###TODO verify this is from a management network/address + + # process the request + stat = self.delete_foreign_device_table_entry(pdu.bvlciAddress) + + # build a response + xpdu = Result(code=stat, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduSource + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, DistributeBroadcastToNetwork): + ###TODO verify this is from a registered foreign device + + # build a PDU with a local broadcast address + xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=LocalBroadcast(), user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - upstream xpdu: %r", xpdu) + + # send it upstream + self.response(xpdu) + + # build a forwarded NPDU to send out + xpdu = ForwardedNPDU(pdu.pduSource, pdu, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - forwarded xpdu: %r", xpdu) + + # send it to the peers + for bdte in self.bbmdBDT: + if bdte == self.bbmdAddress: + if _debug: BIPNAT._debug(" - no local broadcast") + else: + xpdu.pduDestination = Address((bdte.addrIP, bdte.addrPort)) + if _debug: BIPNAT._debug(" - sending to peer: %r", xpdu.pduDestination) + self.request(xpdu) + + # send it to the other registered foreign devices + for fdte in self.bbmdFDT: + if fdte.fdAddress != pdu.pduSource: + xpdu.pduDestination = fdte.fdAddress + if _debug: BIPNAT._debug(" - sending to foreign device: %r", xpdu.pduDestination) + self.request(xpdu) + + elif isinstance(pdu, OriginalUnicastNPDU): + ###TODO verify this is from a peer + + # build a vanilla PDU + xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=pdu.pduDestination, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - upstream xpdu: %r", xpdu) + + # send it upstream + self.response(xpdu) + + elif isinstance(pdu, OriginalBroadcastNPDU): + if _debug: BIPNAT._debug(" - original broadcast dropped") + + else: + BIPNAT._warning("invalid pdu type: %s", type(pdu)) + + def register_foreign_device(self, addr, ttl): + """Add a foreign device to the FDT.""" + if _debug: BIPNAT._debug("register_foreign_device %r %r", addr, ttl) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation( addr ) + else: + raise TypeError("addr must be a string or an Address") + + for fdte in self.bbmdFDT: + if addr == fdte.fdAddress: + break + else: + fdte = FDTEntry() + fdte.fdAddress = addr + self.bbmdFDT.append( fdte ) + + fdte.fdTTL = ttl + fdte.fdRemain = ttl + 5 + + # return success + return 0 + + def delete_foreign_device_table_entry(self, addr): + if _debug: BIPNAT._debug("delete_foreign_device_table_entry %r", addr) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation( addr ) + else: + raise TypeError("addr must be a string or an Address") + + # find it and delete it + stat = 0 + for i in range(len(self.bbmdFDT)-1, -1, -1): + if addr == self.bbmdFDT[i].fdAddress: + del self.bbmdFDT[i] + break + else: + stat = 99 ### entry not found + + # return status + return stat + + def process_task(self): + # look for foreign device registrations that have expired + for i in range(len(self.bbmdFDT)-1, -1, -1): + fdte = self.bbmdFDT[i] + fdte.fdRemain -= 1 + + # delete it if it expired + if fdte.fdRemain <= 0: + if _debug: BIPNAT._debug("foreign device expired: %r", fdte.fdAddress) + del self.bbmdFDT[i] + + def add_peer(self, addr): + if _debug: BIPNAT._debug("add_peer %r", addr) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation(addr) + else: + raise TypeError("addr must be a string or an Address") + + # if it's this BBMD, make it the first one + if self.bbmdBDT and (addr == self.bbmdAddress): + raise RuntimeError("add self to BDT as first address") + + # see if it's already there + for bdte in self.bbmdBDT: + if addr == bdte: + break + else: + self.bbmdBDT.append(addr) + + def delete_peer(self, addr): + if _debug: BIPNAT._debug("delete_peer %r", addr) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation(addr) + else: + raise TypeError("addr must be a string or an Address") + + # look for the peer address + for i in range(len(self.bbmdBDT)-1, -1, -1): + if addr == self.bbmdBDT[i]: + del self.bbmdBDT[i] + break + else: + pass + +bacpypes_debugging(BIPNAT) + # # BVLLServiceElement # diff --git a/py25/bacpypes/comm.py b/py25/bacpypes/comm.py index bea40a64..27b7f4e7 100755 --- a/py25/bacpypes/comm.py +++ b/py25/bacpypes/comm.py @@ -383,6 +383,110 @@ def indication(self, *args, **kwargs): bacpypes_debugging(Echo) +# +# Switch +# + +class Switch(Client, Server): + + """ + A Switch is a client and server that wraps around clients and/or servers + and provides a way to switch between them without unbinding and rebinding + the stack. + """ + + class TerminalWrapper(Client, Server): + + def __init__(self, switch, terminal): + self.switch = switch + self.terminal = terminal + + if isinstance(terminal, Server): + bind(self, terminal) + if isinstance(terminal, Client): + bind(terminal, self) + + def indication(self, *args, **kwargs): + self.switch.request(*args, **kwargs) + + def confirmation(self, *args, **kwargs): + self.switch.response(*args, **kwargs) + + def __init__(self, **terminals): + if _debug: Switch._debug("__init__ %r", terminals) + + Client.__init__(self) + Server.__init__(self) + + # wrap the terminals + self.terminals = {k:Switch.TerminalWrapper(self, v) for k, v in terminals.items()} + self.current_terminal = None + + def __getitem__(self, key): + if key not in self.terminals: + raise KeyError("%r not a terminal" % (key,)) + + # return the terminal, not the wrapper + return self.terminals[key].terminal + + def __setitem__(self, key, term): + if key in self.terminals: + raise KeyError("%r already a terminal" % (key,)) + + # build a wrapper and map it + self.terminals[key] = Switch.TerminalWrapper(self, term) + + def __delitem__(self, key): + if key not in self.terminals: + raise KeyError("%r not a terminal" % (key,)) + + # if deleting current terminal, deactivate it + term = self.terminals[key].terminal + if term is self.current_terminal: + terminal_deactivate = getattr(self.current_terminal, 'deactivate', None) + if terminal_deactivate: + terminal_deactivate() + self.current_terminal = None + + del self.terminals[key] + + def switch_terminal(self, key): + if key not in self.terminals: + raise KeyError("%r not a terminal" % (key,)) + + if self.current_terminal: + terminal_deactivate = getattr(self.current_terminal, 'deactivate', None) + if terminal_deactivate: + terminal_deactivate() + + if key is None: + self.current_terminal = None + else: + self.current_terminal = self.terminals[key].terminal + terminal_activate = getattr(self.current_terminal, 'activate', None) + if terminal_activate: + terminal_activate() + + def indication(self, *args, **kwargs): + """Downstream packet, send to current terminal.""" + if not self.current_terminal: + raise RuntimeError("no active terminal") + if not isinstance(self.current_terminal, Server): + raise RuntimeError("current terminal not a server") + + self.current_terminal.indication(*args, **kwargs) + + def confirmation(self, *args, **kwargs): + """Upstream packet, send to current terminal.""" + if not self.current_terminal: + raise RuntimeError("no active terminal") + if not isinstance(self.current_terminal, Client): + raise RuntimeError("current terminal not a client") + + self.current_terminal.confirmation(*args, **kwargs) + +bacpypes_debugging(Switch) + # # ServiceAccessPoint # diff --git a/py25/bacpypes/iocb.py b/py25/bacpypes/iocb.py index 6906d3c4..fadee885 100644 --- a/py25/bacpypes/iocb.py +++ b/py25/bacpypes/iocb.py @@ -140,12 +140,12 @@ def add_callback(self, fn, *args, **kwargs): if self.ioComplete.isSet(): self.trigger() - def wait(self, *args): + def wait(self, *args, **kwargs): """Wait for the completion event to be set.""" - if _debug: IOCB._debug("wait(%d) %r", self.ioID, args) + if _debug: IOCB._debug("wait(%d) %r %r", self.ioID, args, kwargs) # waiting from a non-daemon thread could be trouble - self.ioComplete.wait(*args) + return self.ioComplete.wait(*args, **kwargs) def trigger(self): """Set the completion event and make the callback(s).""" diff --git a/py25/bacpypes/netservice.py b/py25/bacpypes/netservice.py index 9cbf5313..97278ff4 100755 --- a/py25/bacpypes/netservice.py +++ b/py25/bacpypes/netservice.py @@ -776,7 +776,7 @@ def WhoIsRouterToNetwork(self, adapter, npdu): self.response(adapter, iamrtn) else: - if _debug: NetworkServiceElement._debug(" - forwarding request to other adapters") + if _debug: NetworkServiceElement._debug(" - forwarding to other adapters") # build a request whoisrtn = WhoIsRouterToNetwork(dnet, user_data=npdu.pduUserData) @@ -806,19 +806,21 @@ def IAmRouterToNetwork(self, adapter, npdu): sap.add_router_references(adapter.adapterNet, npdu.pduSource, npdu.iartnNetworkList) # skip if this is not a router - if len(sap.adapters) > 1: + if len(sap.adapters) == 1: + if _debug: NetworkServiceElement._debug(" - not a router") + + else: + if _debug: NetworkServiceElement._debug(" - forwarding to other adapters") + # build a broadcast annoucement iamrtn = IAmRouterToNetwork(npdu.iartnNetworkList, user_data=npdu.pduUserData) iamrtn.pduDestination = LocalBroadcast() # send it to all of the connected adapters for xadapter in sap.adapters.values(): - # skip the horse it rode in on - if (xadapter is adapter): - continue - - # request this - self.request(xadapter, iamrtn) + if xadapter is not adapter: + if _debug: NetworkServiceElement._debug(" - sending on adapter: %r", xadapter) + self.request(xadapter, iamrtn) # look for pending NPDUs for the networks for dnet in npdu.iartnNetworkList: diff --git a/py25/bacpypes/service/cov.py b/py25/bacpypes/service/cov.py index 9e179ab7..3e79c9cb 100644 --- a/py25/bacpypes/service/cov.py +++ b/py25/bacpypes/service/cov.py @@ -7,6 +7,7 @@ from ..debugging import bacpypes_debugging, DebugContents, ModuleLogger from ..capability import Capability +from ..core import deferred from ..task import OneShotTask, TaskManager from ..iocb import IOCB @@ -172,8 +173,8 @@ def execute(self): # something changed, send out the notifications self.send_cov_notifications() - def send_cov_notifications(self): - if _debug: COVDetection._debug("send_cov_notifications") + def send_cov_notifications(self, subscription=None): + if _debug: COVDetection._debug("send_cov_notifications %r", subscription) # check for subscriptions if not len(self.cov_subscriptions): @@ -206,8 +207,15 @@ def send_cov_notifications(self): list_of_values.append(property_value) if _debug: COVDetection._debug(" - list_of_values: %r", list_of_values) + # if the specific subscription was provided, that is the notification + # list, otherwise send it to all of them + if subscription is not None: + notification_list = [subscription] + else: + notification_list = self.cov_subscriptions + # loop through the subscriptions and send out notifications - for cov in self.cov_subscriptions: + for cov in notification_list: if _debug: COVDetection._debug(" - cov: %s", repr(cov)) # calculate time remaining @@ -436,49 +444,48 @@ def ReadProperty(self, obj, arrayIndex=None): # start with an empty sequence cov_subscriptions = ListOf(COVSubscription)() - # loop through the object and detection list - for obj, cov_detection in obj._app.cov_detections.items(): - for cov in cov_detection.cov_subscriptions: - # calculate time remaining - if not cov.lifetime: - time_remaining = 0 - else: - time_remaining = int(cov.taskTime - current_time) - - # make sure it is at least one second - if not time_remaining: - time_remaining = 1 - - recipient = Recipient( - address=DeviceAddress( - networkNumber=cov.client_addr.addrNet or 0, - macAddress=cov.client_addr.addrAddr, - ), - ) - if _debug: ActiveCOVSubscriptions._debug(" - recipient: %r", recipient) - if _debug: ActiveCOVSubscriptions._debug(" - client MAC address: %r", cov.client_addr.addrAddr) - - recipient_process = RecipientProcess( - recipient=recipient, - processIdentifier=cov.proc_id, - ) - if _debug: ActiveCOVSubscriptions._debug(" - recipient_process: %r", recipient_process) - - cov_subscription = COVSubscription( - recipient=recipient_process, - monitoredPropertyReference=ObjectPropertyReference( - objectIdentifier=cov.obj_id, - propertyIdentifier=cov_detection.monitored_property_reference, - ), - issueConfirmedNotifications=cov.confirmed, - timeRemaining=time_remaining, - ) - if hasattr(cov_detection, 'covIncrement'): - cov_subscription.covIncrement = cov_detection.covIncrement - if _debug: ActiveCOVSubscriptions._debug(" - cov_subscription: %r", cov_subscription) - - # add the list - cov_subscriptions.append(cov_subscription) + # loop through the subscriptions + for cov in obj._app.subscriptions(): + # calculate time remaining + if not cov.lifetime: + time_remaining = 0 + else: + time_remaining = int(cov.taskTime - current_time) + + # make sure it is at least one second + if not time_remaining: + time_remaining = 1 + + recipient = Recipient( + address=DeviceAddress( + networkNumber=cov.client_addr.addrNet or 0, + macAddress=cov.client_addr.addrAddr, + ), + ) + if _debug: ActiveCOVSubscriptions._debug(" - recipient: %r", recipient) + if _debug: ActiveCOVSubscriptions._debug(" - client MAC address: %r", cov.client_addr.addrAddr) + + recipient_process = RecipientProcess( + recipient=recipient, + processIdentifier=cov.proc_id, + ) + if _debug: ActiveCOVSubscriptions._debug(" - recipient_process: %r", recipient_process) + + cov_subscription = COVSubscription( + recipient=recipient_process, + monitoredPropertyReference=ObjectPropertyReference( + objectIdentifier=cov.obj_id, + propertyIdentifier=cov_detection.monitored_property_reference, + ), + issueConfirmedNotifications=cov.confirmed, + timeRemaining=time_remaining, + ) + if hasattr(cov_detection, 'covIncrement'): + cov_subscription.covIncrement = cov_detection.covIncrement + if _debug: ActiveCOVSubscriptions._debug(" - cov_subscription: %r", cov_subscription) + + # add the list + cov_subscriptions.append(cov_subscription) return cov_subscriptions @@ -536,6 +543,19 @@ def cancel_subscription(self, cov): # delete it from the object map del self.cov_detections[cov.obj_ref] + def subscriptions(self): + """Generator for the active subscriptions.""" + if _debug: ChangeOfValueServices._debug("subscriptions") + + subscription_list = [] + + # loop through the object and detection list + for obj, cov_detection in self.cov_detections.items(): + for cov in cov_detection.cov_subscriptions: + subscription_list.append(cov) + + return subscription_list + def cov_notification(self, cov, request): if _debug: ChangeOfValueServices._debug("cov_notification %s %s", str(cov), str(request)) @@ -651,5 +671,11 @@ def do_SubscribeCOVRequest(self, apdu): # return the result self.response(response) + # if the subscription is not being canceled, it is new or renewed, + # so send it a notification when you get a chance. + if not cancel_subscription: + if _debug: ChangeOfValueServices._debug(" - send a notification") + deferred(cov_detection.send_cov_notifications, cov) + bacpypes_debugging(ChangeOfValueServices) diff --git a/py27/bacpypes/__init__.py b/py27/bacpypes/__init__.py index 8e201b5b..dbb45643 100755 --- a/py27/bacpypes/__init__.py +++ b/py27/bacpypes/__init__.py @@ -18,7 +18,7 @@ # Project Metadata # -__version__ = '0.17.2' +__version__ = '0.17.3' __author__ = 'Joel Bender' __email__ = 'joel@carrickbender.com' diff --git a/py27/bacpypes/analysis.py b/py27/bacpypes/analysis.py index 0840c6ac..96fbd19a 100755 --- a/py27/bacpypes/analysis.py +++ b/py27/bacpypes/analysis.py @@ -355,16 +355,17 @@ def decode_file(fname): if not pcap: raise RuntimeError("failed to import pcap") - # create a pcap object + # create a pcap object, reading from the file p = pcap.pcap(fname) - for timestamp, data in p: + # loop through the packets + for i, (timestamp, data) in enumerate(p): pkt = decode_packet(data) if not pkt: continue - # save the index and timestamp in the packet - # pkt._index = i + # save the packet number (as viewed in Wireshark) and timestamp + pkt._number = i + 1 pkt._timestamp = timestamp yield pkt diff --git a/py27/bacpypes/appservice.py b/py27/bacpypes/appservice.py index 641e6554..7adace61 100755 --- a/py27/bacpypes/appservice.py +++ b/py27/bacpypes/appservice.py @@ -1522,7 +1522,7 @@ def confirmation(self, apdu): xpdu = atype() xpdu.decode(apdu) except Exception as err: - ApplicationServiceAccessPoint._exception("unconfirmed request decoding error: %r", err) + ApplicationServiceAccessPoint._exception("complex ack decoding error: %r", err) return elif isinstance(apdu, ErrorPDU): diff --git a/py27/bacpypes/bvllservice.py b/py27/bacpypes/bvllservice.py index a1147149..8cc4ab88 100755 --- a/py27/bacpypes/bvllservice.py +++ b/py27/bacpypes/bvllservice.py @@ -1000,6 +1000,308 @@ def delete_peer(self, addr): else: pass +# +# BIPNAT +# + +@bacpypes_debugging +class BIPNAT(BIPSAP, Client, Server, RecurringTask, DebugContents): + + _debug_contents = ('bbmdAddress', 'bbmdBDT+', 'bbmdFDT+') + + def __init__(self, addr, sapID=None, cid=None, sid=None): + """A BBMD node that is the destination for NATed traffic.""" + if _debug: BIPNAT._debug("__init__ %r sapID=%r cid=%r sid=%r", addr, sapID, cid, sid) + BIPSAP.__init__(self, sapID) + Client.__init__(self, cid) + Server.__init__(self, sid) + RecurringTask.__init__(self, 1000.0) + + self.bbmdAddress = addr + self.bbmdBDT = [] + self.bbmdFDT = [] + + # install so process_task runs + self.install_task() + + def indication(self, pdu): + if _debug: BIPNAT._debug("indication %r", pdu) + + # check for local stations + if pdu.pduDestination.addrType == Address.localStationAddr: + ###TODO the destination should be a peer or a registered foreign device + + # make an original unicast PDU + xpdu = OriginalUnicastNPDU(pdu, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduDestination + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + # check for broadcasts + elif pdu.pduDestination.addrType == Address.localBroadcastAddr: + # make a forwarded PDU + xpdu = ForwardedNPDU(self.bbmdAddress, pdu, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - forwarded xpdu: %r", xpdu) + + # send it to the peers, all of them have all F's mask + for bdte in self.bbmdBDT: + if bdte != self.bbmdAddress: + xpdu.pduDestination = Address((bdte.addrIP, bdte.addrPort)) + BIPNAT._debug(" - sending to peer: %r", xpdu.pduDestination) + self.request(xpdu) + + # send it to the registered foreign devices + for fdte in self.bbmdFDT: + xpdu.pduDestination = fdte.fdAddress + if _debug: BIPNAT._debug(" - sending to foreign device: %r", xpdu.pduDestination) + self.request(xpdu) + + else: + BIPNAT._warning("invalid destination address: %r", pdu.pduDestination) + + def confirmation(self, pdu): + if _debug: BIPNAT._debug("confirmation %r", pdu) + + # some kind of response to a request + if isinstance(pdu, Result): + # send this to the service access point + self.sap_response(pdu) + + elif isinstance(pdu, WriteBroadcastDistributionTable): + ###TODO verify this is from a management network/address + + # build a response + xpdu = Result(code=99, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduSource + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadBroadcastDistributionTable): + ###TODO verify this is from a management network/address + + # build a response + xpdu = ReadBroadcastDistributionTableAck(self.bbmdBDT, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduSource + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadBroadcastDistributionTableAck): + # send this to the service access point + self.sap_response(pdu) + + elif isinstance(pdu, ForwardedNPDU): + ###TODO verify this is from a peer + + # build a PDU with the source from the real source + xpdu = PDU(pdu.pduData, source=pdu.bvlciAddress, destination=LocalBroadcast(), user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - upstream xpdu: %r", xpdu) + + # send it upstream + self.response(xpdu) + + # build a forwarded NPDU to send out + xpdu = ForwardedNPDU(pdu.bvlciAddress, pdu, destination=None, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - forwarded xpdu: %r", xpdu) + + # send it to the registered foreign devices + for fdte in self.bbmdFDT: + xpdu.pduDestination = fdte.fdAddress + if _debug: BIPNAT._debug(" - sending to foreign device: %r", xpdu.pduDestination) + self.request(xpdu) + + elif isinstance(pdu, RegisterForeignDevice): + ###TODO verify this is from an acceptable address + + # process the request + stat = self.register_foreign_device(pdu.pduSource, pdu.bvlciTimeToLive) + + # build a response + xpdu = Result(code=stat, destination=pdu.pduSource, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadForeignDeviceTable): + ###TODO verify this is from a management network/address + + # build a response + xpdu = ReadForeignDeviceTableAck(self.bbmdFDT, destination=pdu.pduSource, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadForeignDeviceTableAck): + # send this to the service access point + self.sap_response(pdu) + + elif isinstance(pdu, DeleteForeignDeviceTableEntry): + ###TODO verify this is from a management network/address + + # process the request + stat = self.delete_foreign_device_table_entry(pdu.bvlciAddress) + + # build a response + xpdu = Result(code=stat, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduSource + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, DistributeBroadcastToNetwork): + ###TODO verify this is from a registered foreign device + + # build a PDU with a local broadcast address + xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=LocalBroadcast(), user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - upstream xpdu: %r", xpdu) + + # send it upstream + self.response(xpdu) + + # build a forwarded NPDU to send out + xpdu = ForwardedNPDU(pdu.pduSource, pdu, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - forwarded xpdu: %r", xpdu) + + # send it to the peers + for bdte in self.bbmdBDT: + if bdte == self.bbmdAddress: + if _debug: BIPNAT._debug(" - no local broadcast") + else: + xpdu.pduDestination = Address((bdte.addrIP, bdte.addrPort)) + if _debug: BIPNAT._debug(" - sending to peer: %r", xpdu.pduDestination) + self.request(xpdu) + + # send it to the other registered foreign devices + for fdte in self.bbmdFDT: + if fdte.fdAddress != pdu.pduSource: + xpdu.pduDestination = fdte.fdAddress + if _debug: BIPNAT._debug(" - sending to foreign device: %r", xpdu.pduDestination) + self.request(xpdu) + + elif isinstance(pdu, OriginalUnicastNPDU): + ###TODO verify this is from a peer + + # build a vanilla PDU + xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=pdu.pduDestination, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - upstream xpdu: %r", xpdu) + + # send it upstream + self.response(xpdu) + + elif isinstance(pdu, OriginalBroadcastNPDU): + if _debug: BIPNAT._debug(" - original broadcast dropped") + + else: + BIPNAT._warning("invalid pdu type: %s", type(pdu)) + + def register_foreign_device(self, addr, ttl): + """Add a foreign device to the FDT.""" + if _debug: BIPNAT._debug("register_foreign_device %r %r", addr, ttl) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation( addr ) + else: + raise TypeError("addr must be a string or an Address") + + for fdte in self.bbmdFDT: + if addr == fdte.fdAddress: + break + else: + fdte = FDTEntry() + fdte.fdAddress = addr + self.bbmdFDT.append( fdte ) + + fdte.fdTTL = ttl + fdte.fdRemain = ttl + 5 + + # return success + return 0 + + def delete_foreign_device_table_entry(self, addr): + if _debug: BIPNAT._debug("delete_foreign_device_table_entry %r", addr) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation( addr ) + else: + raise TypeError("addr must be a string or an Address") + + # find it and delete it + stat = 0 + for i in range(len(self.bbmdFDT)-1, -1, -1): + if addr == self.bbmdFDT[i].fdAddress: + del self.bbmdFDT[i] + break + else: + stat = 99 ### entry not found + + # return status + return stat + + def process_task(self): + # look for foreign device registrations that have expired + for i in range(len(self.bbmdFDT)-1, -1, -1): + fdte = self.bbmdFDT[i] + fdte.fdRemain -= 1 + + # delete it if it expired + if fdte.fdRemain <= 0: + if _debug: BIPNAT._debug("foreign device expired: %r", fdte.fdAddress) + del self.bbmdFDT[i] + + def add_peer(self, addr): + if _debug: BIPNAT._debug("add_peer %r", addr) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation(addr) + else: + raise TypeError("addr must be a string or an Address") + + # if it's this BBMD, make it the first one + if self.bbmdBDT and (addr == self.bbmdAddress): + raise RuntimeError("add self to BDT as first address") + + # see if it's already there + for bdte in self.bbmdBDT: + if addr == bdte: + break + else: + self.bbmdBDT.append(addr) + + def delete_peer(self, addr): + if _debug: BIPNAT._debug("delete_peer %r", addr) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation(addr) + else: + raise TypeError("addr must be a string or an Address") + + # look for the peer address + for i in range(len(self.bbmdBDT)-1, -1, -1): + if addr == self.bbmdBDT[i]: + del self.bbmdBDT[i] + break + else: + pass + # # BVLLServiceElement # diff --git a/py27/bacpypes/comm.py b/py27/bacpypes/comm.py index dfef7aae..dcec5472 100755 --- a/py27/bacpypes/comm.py +++ b/py27/bacpypes/comm.py @@ -376,6 +376,109 @@ def indication(self, *args, **kwargs): self.response(*args, **kwargs) +# +# Switch +# + +@bacpypes_debugging +class Switch(Client, Server): + + """ + A Switch is a client and server that wraps around clients and/or servers + and provides a way to switch between them without unbinding and rebinding + the stack. + """ + + class TerminalWrapper(Client, Server): + + def __init__(self, switch, terminal): + self.switch = switch + self.terminal = terminal + + if isinstance(terminal, Server): + bind(self, terminal) + if isinstance(terminal, Client): + bind(terminal, self) + + def indication(self, *args, **kwargs): + self.switch.request(*args, **kwargs) + + def confirmation(self, *args, **kwargs): + self.switch.response(*args, **kwargs) + + def __init__(self, **terminals): + if _debug: Switch._debug("__init__ %r", terminals) + + Client.__init__(self) + Server.__init__(self) + + # wrap the terminals + self.terminals = {k:Switch.TerminalWrapper(self, v) for k, v in terminals.items()} + self.current_terminal = None + + def __getitem__(self, key): + if key not in self.terminals: + raise KeyError("%r not a terminal" % (key,)) + + # return the terminal, not the wrapper + return self.terminals[key].terminal + + def __setitem__(self, key, term): + if key in self.terminals: + raise KeyError("%r already a terminal" % (key,)) + + # build a wrapper and map it + self.terminals[key] = Switch.TerminalWrapper(self, term) + + def __delitem__(self, key): + if key not in self.terminals: + raise KeyError("%r not a terminal" % (key,)) + + # if deleting current terminal, deactivate it + term = self.terminals[key].terminal + if term is self.current_terminal: + terminal_deactivate = getattr(self.current_terminal, 'deactivate', None) + if terminal_deactivate: + terminal_deactivate() + self.current_terminal = None + + del self.terminals[key] + + def switch_terminal(self, key): + if key not in self.terminals: + raise KeyError("%r not a terminal" % (key,)) + + if self.current_terminal: + terminal_deactivate = getattr(self.current_terminal, 'deactivate', None) + if terminal_deactivate: + terminal_deactivate() + + if key is None: + self.current_terminal = None + else: + self.current_terminal = self.terminals[key].terminal + terminal_activate = getattr(self.current_terminal, 'activate', None) + if terminal_activate: + terminal_activate() + + def indication(self, *args, **kwargs): + """Downstream packet, send to current terminal.""" + if not self.current_terminal: + raise RuntimeError("no active terminal") + if not isinstance(self.current_terminal, Server): + raise RuntimeError("current terminal not a server") + + self.current_terminal.indication(*args, **kwargs) + + def confirmation(self, *args, **kwargs): + """Upstream packet, send to current terminal.""" + if not self.current_terminal: + raise RuntimeError("no active terminal") + if not isinstance(self.current_terminal, Client): + raise RuntimeError("current terminal not a client") + + self.current_terminal.confirmation(*args, **kwargs) + # # ServiceAccessPoint # diff --git a/py27/bacpypes/iocb.py b/py27/bacpypes/iocb.py index db96cba9..730d383d 100644 --- a/py27/bacpypes/iocb.py +++ b/py27/bacpypes/iocb.py @@ -141,12 +141,12 @@ def add_callback(self, fn, *args, **kwargs): if self.ioComplete.isSet(): self.trigger() - def wait(self, *args): + def wait(self, *args, **kwargs): """Wait for the completion event to be set.""" - if _debug: IOCB._debug("wait(%d) %r", self.ioID, args) + if _debug: IOCB._debug("wait(%d) %r %r", self.ioID, args, kwargs) # waiting from a non-daemon thread could be trouble - self.ioComplete.wait(*args) + return self.ioComplete.wait(*args, **kwargs) def trigger(self): """Set the completion event and make the callback(s).""" diff --git a/py27/bacpypes/netservice.py b/py27/bacpypes/netservice.py index 835bbf0f..64c5bfb4 100755 --- a/py27/bacpypes/netservice.py +++ b/py27/bacpypes/netservice.py @@ -762,6 +762,9 @@ def WhoIsRouterToNetwork(self, adapter, npdu): if router_net not in sap.adapters: if _debug: NetworkServiceElement._debug(" - path error (6)") return + if sap.adapters[router_net] is adapter: + if _debug: NetworkServiceElement._debug(" - same network") + return # build a response iamrtn = IAmRouterToNetwork([dnet], user_data=npdu.pduUserData) @@ -771,7 +774,7 @@ def WhoIsRouterToNetwork(self, adapter, npdu): self.response(adapter, iamrtn) else: - if _debug: NetworkServiceElement._debug(" - forwarding request to other adapters") + if _debug: NetworkServiceElement._debug(" - forwarding to other adapters") # build a request whoisrtn = WhoIsRouterToNetwork(dnet, user_data=npdu.pduUserData) @@ -801,19 +804,21 @@ def IAmRouterToNetwork(self, adapter, npdu): sap.add_router_references(adapter.adapterNet, npdu.pduSource, npdu.iartnNetworkList) # skip if this is not a router - if len(sap.adapters) > 1: + if len(sap.adapters) == 1: + if _debug: NetworkServiceElement._debug(" - not a router") + + else: + if _debug: NetworkServiceElement._debug(" - forwarding to other adapters") + # build a broadcast annoucement iamrtn = IAmRouterToNetwork(npdu.iartnNetworkList, user_data=npdu.pduUserData) iamrtn.pduDestination = LocalBroadcast() # send it to all of the connected adapters for xadapter in sap.adapters.values(): - # skip the horse it rode in on - if (xadapter is adapter): - continue - - # request this - self.request(xadapter, iamrtn) + if xadapter is not adapter: + if _debug: NetworkServiceElement._debug(" - sending on adapter: %r", xadapter) + self.request(xadapter, iamrtn) # look for pending NPDUs for the networks for dnet in npdu.iartnNetworkList: diff --git a/py27/bacpypes/service/cov.py b/py27/bacpypes/service/cov.py index 7e14cb70..0bb07d5a 100644 --- a/py27/bacpypes/service/cov.py +++ b/py27/bacpypes/service/cov.py @@ -7,6 +7,7 @@ from ..debugging import bacpypes_debugging, DebugContents, ModuleLogger from ..capability import Capability +from ..core import deferred from ..task import OneShotTask, TaskManager from ..iocb import IOCB @@ -172,8 +173,8 @@ def execute(self): # something changed, send out the notifications self.send_cov_notifications() - def send_cov_notifications(self): - if _debug: COVDetection._debug("send_cov_notifications") + def send_cov_notifications(self, subscription=None): + if _debug: COVDetection._debug("send_cov_notifications %r", subscription) # check for subscriptions if not len(self.cov_subscriptions): @@ -206,8 +207,15 @@ def send_cov_notifications(self): list_of_values.append(property_value) if _debug: COVDetection._debug(" - list_of_values: %r", list_of_values) + # if the specific subscription was provided, that is the notification + # list, otherwise send it to all of them + if subscription is not None: + notification_list = [subscription] + else: + notification_list = self.cov_subscriptions + # loop through the subscriptions and send out notifications - for cov in self.cov_subscriptions: + for cov in notification_list: if _debug: COVDetection._debug(" - cov: %s", repr(cov)) # calculate time remaining @@ -434,49 +442,48 @@ def ReadProperty(self, obj, arrayIndex=None): # start with an empty sequence cov_subscriptions = ListOf(COVSubscription)() - # loop through the object and detection list - for obj, cov_detection in obj._app.cov_detections.items(): - for cov in cov_detection.cov_subscriptions: - # calculate time remaining - if not cov.lifetime: - time_remaining = 0 - else: - time_remaining = int(cov.taskTime - current_time) - - # make sure it is at least one second - if not time_remaining: - time_remaining = 1 - - recipient = Recipient( - address=DeviceAddress( - networkNumber=cov.client_addr.addrNet or 0, - macAddress=cov.client_addr.addrAddr, - ), - ) - if _debug: ActiveCOVSubscriptions._debug(" - recipient: %r", recipient) - if _debug: ActiveCOVSubscriptions._debug(" - client MAC address: %r", cov.client_addr.addrAddr) - - recipient_process = RecipientProcess( - recipient=recipient, - processIdentifier=cov.proc_id, - ) - if _debug: ActiveCOVSubscriptions._debug(" - recipient_process: %r", recipient_process) - - cov_subscription = COVSubscription( - recipient=recipient_process, - monitoredPropertyReference=ObjectPropertyReference( - objectIdentifier=cov.obj_id, - propertyIdentifier=cov_detection.monitored_property_reference, - ), - issueConfirmedNotifications=cov.confirmed, - timeRemaining=time_remaining, - ) - if hasattr(cov_detection, 'covIncrement'): - cov_subscription.covIncrement = cov_detection.covIncrement - if _debug: ActiveCOVSubscriptions._debug(" - cov_subscription: %r", cov_subscription) - - # add the list - cov_subscriptions.append(cov_subscription) + # loop through the subscriptions + for cov in obj._app.subscriptions(): + # calculate time remaining + if not cov.lifetime: + time_remaining = 0 + else: + time_remaining = int(cov.taskTime - current_time) + + # make sure it is at least one second + if not time_remaining: + time_remaining = 1 + + recipient = Recipient( + address=DeviceAddress( + networkNumber=cov.client_addr.addrNet or 0, + macAddress=cov.client_addr.addrAddr, + ), + ) + if _debug: ActiveCOVSubscriptions._debug(" - recipient: %r", recipient) + if _debug: ActiveCOVSubscriptions._debug(" - client MAC address: %r", cov.client_addr.addrAddr) + + recipient_process = RecipientProcess( + recipient=recipient, + processIdentifier=cov.proc_id, + ) + if _debug: ActiveCOVSubscriptions._debug(" - recipient_process: %r", recipient_process) + + cov_subscription = COVSubscription( + recipient=recipient_process, + monitoredPropertyReference=ObjectPropertyReference( + objectIdentifier=cov.obj_id, + propertyIdentifier=cov_detection.monitored_property_reference, + ), + issueConfirmedNotifications=cov.confirmed, + timeRemaining=time_remaining, + ) + if hasattr(cov_detection, 'covIncrement'): + cov_subscription.covIncrement = cov_detection.covIncrement + if _debug: ActiveCOVSubscriptions._debug(" - cov_subscription: %r", cov_subscription) + + # add the list + cov_subscriptions.append(cov_subscription) return cov_subscriptions @@ -533,6 +540,15 @@ def cancel_subscription(self, cov): # delete it from the object map del self.cov_detections[cov.obj_ref] + def subscriptions(self): + """Generator for the active subscriptions.""" + if _debug: ChangeOfValueServices._debug("subscriptions") + + # loop through the object and detection list + for obj, cov_detection in self.cov_detections.items(): + for cov in cov_detection.cov_subscriptions: + yield cov + def cov_notification(self, cov, request): if _debug: ChangeOfValueServices._debug("cov_notification %s %s", str(cov), str(request)) @@ -647,3 +663,10 @@ def do_SubscribeCOVRequest(self, apdu): # return the result self.response(response) + + # if the subscription is not being canceled, it is new or renewed, + # so send it a notification when you get a chance. + if not cancel_subscription: + if _debug: ChangeOfValueServices._debug(" - send a notification") + deferred(cov_detection.send_cov_notifications, cov) + diff --git a/py34/bacpypes/__init__.py b/py34/bacpypes/__init__.py index f556be27..bddb1342 100755 --- a/py34/bacpypes/__init__.py +++ b/py34/bacpypes/__init__.py @@ -18,7 +18,7 @@ # Project Metadata # -__version__ = '0.17.2' +__version__ = '0.17.3' __author__ = 'Joel Bender' __email__ = 'joel@carrickbender.com' diff --git a/py34/bacpypes/analysis.py b/py34/bacpypes/analysis.py index 7b64a3f0..2fe2aec4 100755 --- a/py34/bacpypes/analysis.py +++ b/py34/bacpypes/analysis.py @@ -355,16 +355,17 @@ def decode_file(fname): if not pcap: raise RuntimeError("failed to import pcap") - # create a pcap object + # create a pcap object, reading from the file p = pcap.pcap(fname) - for timestamp, data in p: + # loop through the packets + for i, (timestamp, data) in enumerate(p): pkt = decode_packet(data) if not pkt: continue - # save the index and timestamp in the packet - # pkt._index = i + # save the packet number (as viewed in Wireshark) and timestamp + pkt._number = i + 1 pkt._timestamp = timestamp yield pkt diff --git a/py34/bacpypes/appservice.py b/py34/bacpypes/appservice.py index 43316f13..75591f79 100755 --- a/py34/bacpypes/appservice.py +++ b/py34/bacpypes/appservice.py @@ -1522,7 +1522,7 @@ def confirmation(self, apdu): xpdu = atype() xpdu.decode(apdu) except Exception as err: - ApplicationServiceAccessPoint._exception("unconfirmed request decoding error: %r", err) + ApplicationServiceAccessPoint._exception("complex ack decoding error: %r", err) return elif isinstance(apdu, ErrorPDU): diff --git a/py34/bacpypes/bvllservice.py b/py34/bacpypes/bvllservice.py index 577deb72..a2a2248b 100755 --- a/py34/bacpypes/bvllservice.py +++ b/py34/bacpypes/bvllservice.py @@ -999,6 +999,308 @@ def delete_peer(self, addr): else: pass +# +# BIPNAT +# + +@bacpypes_debugging +class BIPNAT(BIPSAP, Client, Server, RecurringTask, DebugContents): + + _debug_contents = ('bbmdAddress', 'bbmdBDT+', 'bbmdFDT+') + + def __init__(self, addr, sapID=None, cid=None, sid=None): + """A BBMD node that is the destination for NATed traffic.""" + if _debug: BIPNAT._debug("__init__ %r sapID=%r cid=%r sid=%r", addr, sapID, cid, sid) + BIPSAP.__init__(self, sapID) + Client.__init__(self, cid) + Server.__init__(self, sid) + RecurringTask.__init__(self, 1000.0) + + self.bbmdAddress = addr + self.bbmdBDT = [] + self.bbmdFDT = [] + + # install so process_task runs + self.install_task() + + def indication(self, pdu): + if _debug: BIPNAT._debug("indication %r", pdu) + + # check for local stations + if pdu.pduDestination.addrType == Address.localStationAddr: + ###TODO the destination should be a peer or a registered foreign device + + # make an original unicast PDU + xpdu = OriginalUnicastNPDU(pdu, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduDestination + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + # check for broadcasts + elif pdu.pduDestination.addrType == Address.localBroadcastAddr: + # make a forwarded PDU + xpdu = ForwardedNPDU(self.bbmdAddress, pdu, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - forwarded xpdu: %r", xpdu) + + # send it to the peers, all of them have all F's mask + for bdte in self.bbmdBDT: + if bdte != self.bbmdAddress: + xpdu.pduDestination = Address((bdte.addrIP, bdte.addrPort)) + BIPNAT._debug(" - sending to peer: %r", xpdu.pduDestination) + self.request(xpdu) + + # send it to the registered foreign devices + for fdte in self.bbmdFDT: + xpdu.pduDestination = fdte.fdAddress + if _debug: BIPNAT._debug(" - sending to foreign device: %r", xpdu.pduDestination) + self.request(xpdu) + + else: + BIPNAT._warning("invalid destination address: %r", pdu.pduDestination) + + def confirmation(self, pdu): + if _debug: BIPNAT._debug("confirmation %r", pdu) + + # some kind of response to a request + if isinstance(pdu, Result): + # send this to the service access point + self.sap_response(pdu) + + elif isinstance(pdu, WriteBroadcastDistributionTable): + ###TODO verify this is from a management network/address + + # build a response + xpdu = Result(code=99, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduSource + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadBroadcastDistributionTable): + ###TODO verify this is from a management network/address + + # build a response + xpdu = ReadBroadcastDistributionTableAck(self.bbmdBDT, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduSource + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadBroadcastDistributionTableAck): + # send this to the service access point + self.sap_response(pdu) + + elif isinstance(pdu, ForwardedNPDU): + ###TODO verify this is from a peer + + # build a PDU with the source from the real source + xpdu = PDU(pdu.pduData, source=pdu.bvlciAddress, destination=LocalBroadcast(), user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - upstream xpdu: %r", xpdu) + + # send it upstream + self.response(xpdu) + + # build a forwarded NPDU to send out + xpdu = ForwardedNPDU(pdu.bvlciAddress, pdu, destination=None, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - forwarded xpdu: %r", xpdu) + + # send it to the registered foreign devices + for fdte in self.bbmdFDT: + xpdu.pduDestination = fdte.fdAddress + if _debug: BIPNAT._debug(" - sending to foreign device: %r", xpdu.pduDestination) + self.request(xpdu) + + elif isinstance(pdu, RegisterForeignDevice): + ###TODO verify this is from an acceptable address + + # process the request + stat = self.register_foreign_device(pdu.pduSource, pdu.bvlciTimeToLive) + + # build a response + xpdu = Result(code=stat, destination=pdu.pduSource, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadForeignDeviceTable): + ###TODO verify this is from a management network/address + + # build a response + xpdu = ReadForeignDeviceTableAck(self.bbmdFDT, destination=pdu.pduSource, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, ReadForeignDeviceTableAck): + # send this to the service access point + self.sap_response(pdu) + + elif isinstance(pdu, DeleteForeignDeviceTableEntry): + ###TODO verify this is from a management network/address + + # process the request + stat = self.delete_foreign_device_table_entry(pdu.bvlciAddress) + + # build a response + xpdu = Result(code=stat, user_data=pdu.pduUserData) + xpdu.pduDestination = pdu.pduSource + if _debug: BIPNAT._debug(" - xpdu: %r", xpdu) + + # send it downstream + self.request(xpdu) + + elif isinstance(pdu, DistributeBroadcastToNetwork): + ###TODO verify this is from a registered foreign device + + # build a PDU with a local broadcast address + xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=LocalBroadcast(), user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - upstream xpdu: %r", xpdu) + + # send it upstream + self.response(xpdu) + + # build a forwarded NPDU to send out + xpdu = ForwardedNPDU(pdu.pduSource, pdu, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - forwarded xpdu: %r", xpdu) + + # send it to the peers + for bdte in self.bbmdBDT: + if bdte == self.bbmdAddress: + if _debug: BIPNAT._debug(" - no local broadcast") + else: + xpdu.pduDestination = Address((bdte.addrIP, bdte.addrPort)) + if _debug: BIPNAT._debug(" - sending to peer: %r", xpdu.pduDestination) + self.request(xpdu) + + # send it to the other registered foreign devices + for fdte in self.bbmdFDT: + if fdte.fdAddress != pdu.pduSource: + xpdu.pduDestination = fdte.fdAddress + if _debug: BIPNAT._debug(" - sending to foreign device: %r", xpdu.pduDestination) + self.request(xpdu) + + elif isinstance(pdu, OriginalUnicastNPDU): + ###TODO verify this is from a peer + + # build a vanilla PDU + xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=pdu.pduDestination, user_data=pdu.pduUserData) + if _debug: BIPNAT._debug(" - upstream xpdu: %r", xpdu) + + # send it upstream + self.response(xpdu) + + elif isinstance(pdu, OriginalBroadcastNPDU): + if _debug: BIPNAT._debug(" - original broadcast dropped") + + else: + BIPNAT._warning("invalid pdu type: %s", type(pdu)) + + def register_foreign_device(self, addr, ttl): + """Add a foreign device to the FDT.""" + if _debug: BIPNAT._debug("register_foreign_device %r %r", addr, ttl) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation( addr ) + else: + raise TypeError("addr must be a string or an Address") + + for fdte in self.bbmdFDT: + if addr == fdte.fdAddress: + break + else: + fdte = FDTEntry() + fdte.fdAddress = addr + self.bbmdFDT.append( fdte ) + + fdte.fdTTL = ttl + fdte.fdRemain = ttl + 5 + + # return success + return 0 + + def delete_foreign_device_table_entry(self, addr): + if _debug: BIPNAT._debug("delete_foreign_device_table_entry %r", addr) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation( addr ) + else: + raise TypeError("addr must be a string or an Address") + + # find it and delete it + stat = 0 + for i in range(len(self.bbmdFDT)-1, -1, -1): + if addr == self.bbmdFDT[i].fdAddress: + del self.bbmdFDT[i] + break + else: + stat = 99 ### entry not found + + # return status + return stat + + def process_task(self): + # look for foreign device registrations that have expired + for i in range(len(self.bbmdFDT)-1, -1, -1): + fdte = self.bbmdFDT[i] + fdte.fdRemain -= 1 + + # delete it if it expired + if fdte.fdRemain <= 0: + if _debug: BIPNAT._debug("foreign device expired: %r", fdte.fdAddress) + del self.bbmdFDT[i] + + def add_peer(self, addr): + if _debug: BIPNAT._debug("add_peer %r", addr) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation(addr) + else: + raise TypeError("addr must be a string or an Address") + + # if it's this BBMD, make it the first one + if self.bbmdBDT and (addr == self.bbmdAddress): + raise RuntimeError("add self to BDT as first address") + + # see if it's already there + for bdte in self.bbmdBDT: + if addr == bdte: + break + else: + self.bbmdBDT.append(addr) + + def delete_peer(self, addr): + if _debug: BIPNAT._debug("delete_peer %r", addr) + + # see if it is an address or make it one + if isinstance(addr, Address): + pass + elif isinstance(addr, str): + addr = LocalStation(addr) + else: + raise TypeError("addr must be a string or an Address") + + # look for the peer address + for i in range(len(self.bbmdBDT)-1, -1, -1): + if addr == self.bbmdBDT[i]: + del self.bbmdBDT[i] + break + else: + pass + # # BVLLServiceElement # diff --git a/py34/bacpypes/comm.py b/py34/bacpypes/comm.py index 82a10127..6e6ec9ec 100755 --- a/py34/bacpypes/comm.py +++ b/py34/bacpypes/comm.py @@ -386,6 +386,109 @@ def indication(self, *args, **kwargs): self.response(*args, **kwargs) +# +# Switch +# + +@bacpypes_debugging +class Switch(Client, Server): + + """ + A Switch is a client and server that wraps around clients and/or servers + and provides a way to switch between them without unbinding and rebinding + the stack. + """ + + class TerminalWrapper(Client, Server): + + def __init__(self, switch, terminal): + self.switch = switch + self.terminal = terminal + + if isinstance(terminal, Server): + bind(self, terminal) + if isinstance(terminal, Client): + bind(terminal, self) + + def indication(self, *args, **kwargs): + self.switch.request(*args, **kwargs) + + def confirmation(self, *args, **kwargs): + self.switch.response(*args, **kwargs) + + def __init__(self, **terminals): + if _debug: Switch._debug("__init__ %r", terminals) + + Client.__init__(self) + Server.__init__(self) + + # wrap the terminals + self.terminals = {k:Switch.TerminalWrapper(self, v) for k, v in terminals.items()} + self.current_terminal = None + + def __getitem__(self, key): + if key not in self.terminals: + raise KeyError("%r not a terminal" % (key,)) + + # return the terminal, not the wrapper + return self.terminals[key].terminal + + def __setitem__(self, key, term): + if key in self.terminals: + raise KeyError("%r already a terminal" % (key,)) + + # build a wrapper and map it + self.terminals[key] = Switch.TerminalWrapper(self, term) + + def __delitem__(self, key): + if key not in self.terminals: + raise KeyError("%r not a terminal" % (key,)) + + # if deleting current terminal, deactivate it + term = self.terminals[key].terminal + if term is self.current_terminal: + terminal_deactivate = getattr(self.current_terminal, 'deactivate', None) + if terminal_deactivate: + terminal_deactivate() + self.current_terminal = None + + del self.terminals[key] + + def switch_terminal(self, key): + if key not in self.terminals: + raise KeyError("%r not a terminal" % (key,)) + + if self.current_terminal: + terminal_deactivate = getattr(self.current_terminal, 'deactivate', None) + if terminal_deactivate: + terminal_deactivate() + + if key is None: + self.current_terminal = None + else: + self.current_terminal = self.terminals[key].terminal + terminal_activate = getattr(self.current_terminal, 'activate', None) + if terminal_activate: + terminal_activate() + + def indication(self, *args, **kwargs): + """Downstream packet, send to current terminal.""" + if not self.current_terminal: + raise RuntimeError("no active terminal") + if not isinstance(self.current_terminal, Server): + raise RuntimeError("current terminal not a server") + + self.current_terminal.indication(*args, **kwargs) + + def confirmation(self, *args, **kwargs): + """Upstream packet, send to current terminal.""" + if not self.current_terminal: + raise RuntimeError("no active terminal") + if not isinstance(self.current_terminal, Client): + raise RuntimeError("current terminal not a client") + + self.current_terminal.confirmation(*args, **kwargs) + # # ServiceAccessPoint # diff --git a/py34/bacpypes/iocb.py b/py34/bacpypes/iocb.py index db96cba9..730d383d 100644 --- a/py34/bacpypes/iocb.py +++ b/py34/bacpypes/iocb.py @@ -141,12 +141,12 @@ def add_callback(self, fn, *args, **kwargs): if self.ioComplete.isSet(): self.trigger() - def wait(self, *args): + def wait(self, *args, **kwargs): """Wait for the completion event to be set.""" - if _debug: IOCB._debug("wait(%d) %r", self.ioID, args) + if _debug: IOCB._debug("wait(%d) %r %r", self.ioID, args, kwargs) # waiting from a non-daemon thread could be trouble - self.ioComplete.wait(*args) + return self.ioComplete.wait(*args, **kwargs) def trigger(self): """Set the completion event and make the callback(s).""" diff --git a/py34/bacpypes/netservice.py b/py34/bacpypes/netservice.py index 835bbf0f..64c5bfb4 100755 --- a/py34/bacpypes/netservice.py +++ b/py34/bacpypes/netservice.py @@ -762,6 +762,9 @@ def WhoIsRouterToNetwork(self, adapter, npdu): if router_net not in sap.adapters: if _debug: NetworkServiceElement._debug(" - path error (6)") return + if sap.adapters[router_net] is adapter: + if _debug: NetworkServiceElement._debug(" - same network") + return # build a response iamrtn = IAmRouterToNetwork([dnet], user_data=npdu.pduUserData) @@ -771,7 +774,7 @@ def WhoIsRouterToNetwork(self, adapter, npdu): self.response(adapter, iamrtn) else: - if _debug: NetworkServiceElement._debug(" - forwarding request to other adapters") + if _debug: NetworkServiceElement._debug(" - forwarding to other adapters") # build a request whoisrtn = WhoIsRouterToNetwork(dnet, user_data=npdu.pduUserData) @@ -801,19 +804,21 @@ def IAmRouterToNetwork(self, adapter, npdu): sap.add_router_references(adapter.adapterNet, npdu.pduSource, npdu.iartnNetworkList) # skip if this is not a router - if len(sap.adapters) > 1: + if len(sap.adapters) == 1: + if _debug: NetworkServiceElement._debug(" - not a router") + + else: + if _debug: NetworkServiceElement._debug(" - forwarding to other adapters") + # build a broadcast annoucement iamrtn = IAmRouterToNetwork(npdu.iartnNetworkList, user_data=npdu.pduUserData) iamrtn.pduDestination = LocalBroadcast() # send it to all of the connected adapters for xadapter in sap.adapters.values(): - # skip the horse it rode in on - if (xadapter is adapter): - continue - - # request this - self.request(xadapter, iamrtn) + if xadapter is not adapter: + if _debug: NetworkServiceElement._debug(" - sending on adapter: %r", xadapter) + self.request(xadapter, iamrtn) # look for pending NPDUs for the networks for dnet in npdu.iartnNetworkList: diff --git a/py34/bacpypes/service/cov.py b/py34/bacpypes/service/cov.py index 7e14cb70..0bb07d5a 100644 --- a/py34/bacpypes/service/cov.py +++ b/py34/bacpypes/service/cov.py @@ -7,6 +7,7 @@ from ..debugging import bacpypes_debugging, DebugContents, ModuleLogger from ..capability import Capability +from ..core import deferred from ..task import OneShotTask, TaskManager from ..iocb import IOCB @@ -172,8 +173,8 @@ def execute(self): # something changed, send out the notifications self.send_cov_notifications() - def send_cov_notifications(self): - if _debug: COVDetection._debug("send_cov_notifications") + def send_cov_notifications(self, subscription=None): + if _debug: COVDetection._debug("send_cov_notifications %r", subscription) # check for subscriptions if not len(self.cov_subscriptions): @@ -206,8 +207,15 @@ def send_cov_notifications(self): list_of_values.append(property_value) if _debug: COVDetection._debug(" - list_of_values: %r", list_of_values) + # if the specific subscription was provided, that is the notification + # list, otherwise send it to all of them + if subscription is not None: + notification_list = [subscription] + else: + notification_list = self.cov_subscriptions + # loop through the subscriptions and send out notifications - for cov in self.cov_subscriptions: + for cov in notification_list: if _debug: COVDetection._debug(" - cov: %s", repr(cov)) # calculate time remaining @@ -434,49 +442,48 @@ def ReadProperty(self, obj, arrayIndex=None): # start with an empty sequence cov_subscriptions = ListOf(COVSubscription)() - # loop through the object and detection list - for obj, cov_detection in obj._app.cov_detections.items(): - for cov in cov_detection.cov_subscriptions: - # calculate time remaining - if not cov.lifetime: - time_remaining = 0 - else: - time_remaining = int(cov.taskTime - current_time) - - # make sure it is at least one second - if not time_remaining: - time_remaining = 1 - - recipient = Recipient( - address=DeviceAddress( - networkNumber=cov.client_addr.addrNet or 0, - macAddress=cov.client_addr.addrAddr, - ), - ) - if _debug: ActiveCOVSubscriptions._debug(" - recipient: %r", recipient) - if _debug: ActiveCOVSubscriptions._debug(" - client MAC address: %r", cov.client_addr.addrAddr) - - recipient_process = RecipientProcess( - recipient=recipient, - processIdentifier=cov.proc_id, - ) - if _debug: ActiveCOVSubscriptions._debug(" - recipient_process: %r", recipient_process) - - cov_subscription = COVSubscription( - recipient=recipient_process, - monitoredPropertyReference=ObjectPropertyReference( - objectIdentifier=cov.obj_id, - propertyIdentifier=cov_detection.monitored_property_reference, - ), - issueConfirmedNotifications=cov.confirmed, - timeRemaining=time_remaining, - ) - if hasattr(cov_detection, 'covIncrement'): - cov_subscription.covIncrement = cov_detection.covIncrement - if _debug: ActiveCOVSubscriptions._debug(" - cov_subscription: %r", cov_subscription) - - # add the list - cov_subscriptions.append(cov_subscription) + # loop through the subscriptions + for cov in obj._app.subscriptions(): + # calculate time remaining + if not cov.lifetime: + time_remaining = 0 + else: + time_remaining = int(cov.taskTime - current_time) + + # make sure it is at least one second + if not time_remaining: + time_remaining = 1 + + recipient = Recipient( + address=DeviceAddress( + networkNumber=cov.client_addr.addrNet or 0, + macAddress=cov.client_addr.addrAddr, + ), + ) + if _debug: ActiveCOVSubscriptions._debug(" - recipient: %r", recipient) + if _debug: ActiveCOVSubscriptions._debug(" - client MAC address: %r", cov.client_addr.addrAddr) + + recipient_process = RecipientProcess( + recipient=recipient, + processIdentifier=cov.proc_id, + ) + if _debug: ActiveCOVSubscriptions._debug(" - recipient_process: %r", recipient_process) + + cov_subscription = COVSubscription( + recipient=recipient_process, + monitoredPropertyReference=ObjectPropertyReference( + objectIdentifier=cov.obj_id, + propertyIdentifier=cov_detection.monitored_property_reference, + ), + issueConfirmedNotifications=cov.confirmed, + timeRemaining=time_remaining, + ) + if hasattr(cov_detection, 'covIncrement'): + cov_subscription.covIncrement = cov_detection.covIncrement + if _debug: ActiveCOVSubscriptions._debug(" - cov_subscription: %r", cov_subscription) + + # add the list + cov_subscriptions.append(cov_subscription) return cov_subscriptions @@ -533,6 +540,15 @@ def cancel_subscription(self, cov): # delete it from the object map del self.cov_detections[cov.obj_ref] + def subscriptions(self): + """Generator for the active subscriptions.""" + if _debug: ChangeOfValueServices._debug("subscriptions") + + # loop through the object and detection list + for obj, cov_detection in self.cov_detections.items(): + for cov in cov_detection.cov_subscriptions: + yield cov + def cov_notification(self, cov, request): if _debug: ChangeOfValueServices._debug("cov_notification %s %s", str(cov), str(request)) @@ -647,3 +663,10 @@ def do_SubscribeCOVRequest(self, apdu): # return the result self.response(response) + + # if the subscription is not being canceled, it is new or renewed, + # so send it a notification when you get a chance. + if not cancel_subscription: + if _debug: ChangeOfValueServices._debug(" - send a notification") + deferred(cov_detection.send_cov_notifications, cov) + diff --git a/samples/DeviceCommunicationControl.py b/samples/DeviceCommunicationControl.py index d679a097..3f5b4a11 100755 --- a/samples/DeviceCommunicationControl.py +++ b/samples/DeviceCommunicationControl.py @@ -97,16 +97,12 @@ def do_rtn(self, args): args = args.split() if _debug: DCCConsoleCmd._debug("do_rtn %r", args) - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: DCCConsoleCmd._debug(" - adapter: %r", adapter) - # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) + this_application.nsap.add_router_references(None, router_address, network_list) # diff --git a/samples/DeviceDiscovery.py b/samples/DeviceDiscovery.py index 11b0c1af..09c641b0 100755 --- a/samples/DeviceDiscovery.py +++ b/samples/DeviceDiscovery.py @@ -171,16 +171,12 @@ def do_rtn(self, args): args = args.split() if _debug: DiscoveryConsoleCmd._debug("do_rtn %r", args) - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: DiscoveryConsoleCmd._debug(" - adapter: %r", adapter) - # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) + this_application.nsap.add_router_references(None, router_address, network_list) # diff --git a/samples/DeviceDiscoveryForeign.py b/samples/DeviceDiscoveryForeign.py index 8634552b..53493f42 100755 --- a/samples/DeviceDiscoveryForeign.py +++ b/samples/DeviceDiscoveryForeign.py @@ -171,16 +171,12 @@ def do_rtn(self, args): args = args.split() if _debug: DiscoveryConsoleCmd._debug("do_rtn %r", args) - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: DiscoveryConsoleCmd._debug(" - adapter: %r", adapter) - # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) + this_application.nsap.add_router_references(None, router_address, network_list) # diff --git a/samples/EventNotifications.py b/samples/EventNotifications.py new file mode 100755 index 00000000..de5309ed --- /dev/null +++ b/samples/EventNotifications.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python + +""" +This application presents a 'console' prompt to the user asking to save and +restore a recipient list from a notification class object, or re-write one to +the local device. It also listens and acknowledges incoming event notifications. +""" + +import os +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run, deferred, enable_sleeping +from bacpypes.iocb import IOCB + +from bacpypes.pdu import Address +from bacpypes.apdu import SimpleAckPDU, \ + ReadPropertyRequest, ReadPropertyACK, WritePropertyRequest +from bacpypes.basetypes import Destination, Recipient +from bacpypes.constructeddata import ListOf, Any + +from bacpypes.app import BIPSimpleApplication +from bacpypes.local.device import LocalDeviceObject + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +saved_recipent_list = None + +# shortcut type +ListOfDestination = ListOf(Destination) + +# +# EventNotificationApplication +# + +@bacpypes_debugging +class EventNotificationApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: EventNotificationApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + def do_ConfirmedEventNotificationRequest(self, apdu): + if _debug: EventNotificationApplication._debug("do_ConfirmedEventNotificationRequest %r", apdu) + + # dump the APDU contents + apdu.debug_contents(file=sys.stdout) + + # double check the process identifier + if apdu.processIdentifier != os.getpid(): + print("note: not for this process") + + # success + self.response(SimpleAckPDU(context=apdu)) + + def do_UnconfirmedEventNotificationRequest(self, apdu): + if _debug: EventNotificationApplication._debug("do_UnconfirmedEventNotificationRequest %r %r", apdu) + + # dump the APDU contents + apdu.debug_contents(file=sys.stdout) + + # double check the process identifier + if apdu.processIdentifier != os.getpid(): + print("note: not for this process") + +# +# EventNotificationConsoleCmd +# + +@bacpypes_debugging +class EventNotificationConsoleCmd(ConsoleCmd): + + def do_saverl(self, args): + """saverl """ + args = args.split() + if _debug: EventNotificationConsoleCmd._debug("do_saverl %r", args) + global saved_recipent_list + + try: + addr, obj_inst = args + obj_inst = int(obj_inst) + + # build a request + request = ReadPropertyRequest( + objectIdentifier=('notificationClass', obj_inst), + propertyIdentifier='recipientList', + ) + request.pduDestination = Address(addr) + + # make an IOCB + iocb = IOCB(request) + if _debug: EventNotificationConsoleCmd._debug(" - iocb: %r", iocb) + + # give it to the application + deferred(this_application.request_io, iocb) + + # wait for it to complete + iocb.wait() + + # do something for error/reject/abort + if iocb.ioError: + sys.stdout.write(str(iocb.ioError) + '\n') + + # do something for success + elif iocb.ioResponse: + apdu = iocb.ioResponse + + # should be an ack + if not isinstance(apdu, ReadPropertyACK): + if _debug: EventNotificationConsoleCmd._debug(" - not an ack") + return + + # turn the property tag list blob into a list of destinations + saved_recipent_list = apdu.propertyValue.cast_out(ListOfDestination) + + for destination in saved_recipent_list: + destination.debug_contents(file=sys.stdout) + + # do something with nothing? + else: + if _debug: EventNotificationConsoleCmd._debug(" - ioError or ioResponse expected") + + except Exception as error: + EventNotificationConsoleCmd._exception("exception: %r", error) + + def do_restorerl(self, args): + """restorerl """ + args = args.split() + if _debug: EventNotificationConsoleCmd._debug("do_restorerl %r", args) + global saved_recipent_list + + # make sure there is one to restore + if not saved_recipent_list: + print("no saved recipient list") + + addr, obj_inst = args + obj_inst = int(obj_inst) + + # pass along to the shared function + self.write_recipient_list(addr, obj_inst, saved_recipent_list) + + def do_writerl(self, args): + """writerl """ + args = args.split() + if _debug: EventNotificationConsoleCmd._debug("do_writerl %r", args) + + addr, obj_inst = args + obj_inst = int(obj_inst) + + # make a destination for the device identifier + destination = Destination( + validDays=[1, 1, 1, 1, 1, 1, 1], # all days + fromTime=[0, 0, 0, 0], # midnight + toTime=[23, 59, 59, 99], # all day + recipient=Recipient(device=this_device.objectIdentifier), # this device + processIdentifier=os.getpid(), # this process + issueConfirmedNotifications=True, # confirmed service please + transitions=[1, 1, 1], # all transitions + ) + + # pass along to the shared function + self.write_recipient_list(addr, obj_inst, [destination]) + + def write_recipient_list(addr, obj_inst, recipent_list): + if _debug: EventNotificationConsoleCmd._debug("write_recipient_list %r %r %r", addr, obj_inst, recipent_list) + + # the new list has just us + recipient_list = ListOfDestination(recipent_list) + if _debug: EventNotificationConsoleCmd._debug(" - recipient_list: %r", recipient_list) + + # build a request + request = WritePropertyRequest( + objectIdentifier=('notificationClass', obj_inst), + propertyIdentifier='recipientList', + propertyValue=Any(), + ) + request.pduDestination = Address(addr) + + # save the value + request.propertyValue.cast_in(recipient_list) + if _debug: EventNotificationConsoleCmd._debug(" - request: %r", request) + + # make an IOCB + iocb = IOCB(request) + if _debug: EventNotificationConsoleCmd._debug(" - iocb: %r", iocb) + + # give it to the application + deferred(this_application.request_io, iocb) + + # wait for it to complete + iocb.wait() + + # do something for success + if iocb.ioResponse: + # should be an ack + if not isinstance(iocb.ioResponse, SimpleAckPDU): + if _debug: EventNotificationConsoleCmd._debug(" - not an ack") + return + + sys.stdout.write("ack\n") + + # do something for error/reject/abort + if iocb.ioError: + sys.stdout.write(str(iocb.ioError) + '\n') + + def do_rtn(self, args): + """rtn ... """ + args = args.split() + if _debug: EventNotificationConsoleCmd._debug("do_rtn %r", args) + + # provide the address and a list of network numbers + router_address = Address(args[0]) + network_list = [int(arg) for arg in args[1:]] + + # pass along to the service access point + this_application.nsap.add_router_references(None, router_address, network_list) + + +# +# main +# + +def main(): + global this_device, this_application, saved_recipent_list + + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject(ini=args.ini) + if _debug: _log.debug(" - this_device: %r", this_device) + + # make a simple application + this_application = EventNotificationApplication( + this_device, args.ini.address, + ) + if _debug: _log.debug(" - this_application: %r", this_application) + + # make a console + this_console = EventNotificationConsoleCmd() + if _debug: _log.debug(" - this_console: %r", this_console) + + # enable sleeping will help with threads + enable_sleeping() + + _log.debug("running") + + run() + + _log.debug("fini") + + +if __name__ == "__main__": + main() diff --git a/samples/IP2VLANRouter.py b/samples/IP2VLANRouter.py index 87f36870..c3000b25 100755 --- a/samples/IP2VLANRouter.py +++ b/samples/IP2VLANRouter.py @@ -24,7 +24,8 @@ from bacpypes.app import Application from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint -from bacpypes.service.device import LocalDeviceObject, WhoIsIAmServices +from bacpypes.local.device import LocalDeviceObject +from bacpypes.service.device import WhoIsIAmServices from bacpypes.service.object import ReadWritePropertyServices from bacpypes.primitivedata import Real @@ -235,17 +236,20 @@ def main(): ) _log.debug(" - vlan_device: %r", vlan_device) + vlan_address = Address(device_number) + _log.debug(" - vlan_address: %r", vlan_address) + # make the application, add it to the network - vlan_app = VLANApplication(vlan_device, Address(device_number)) + vlan_app = VLANApplication(vlan_device, vlan_address) vlan.add_node(vlan_app.vlan_node) _log.debug(" - vlan_app: %r", vlan_app) # make a random value object ravo = RandomAnalogValueObject( objectIdentifier=('analogValue', 1), - objectName='Random1' % (device_instance,), + objectName='Random-1-%d' % (device_instance,), ) - _log.debug(" - ravo1: %r", ravo) + _log.debug(" - ravo: %r", ravo) # add it to the device vlan_app.add_object(ravo) diff --git a/samples/NATRouter.py b/samples/NATRouter.py old mode 100644 new mode 100755 diff --git a/samples/ReadProperty.py b/samples/ReadProperty.py index d8551e4c..3eff2bfa 100755 --- a/samples/ReadProperty.py +++ b/samples/ReadProperty.py @@ -125,16 +125,12 @@ def do_rtn(self, args): args = args.split() if _debug: ReadPropertyConsoleCmd._debug("do_rtn %r", args) - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: ReadPropertyConsoleCmd._debug(" - adapter: %r", adapter) - # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) + this_application.nsap.add_router_references(None, router_address, network_list) # diff --git a/samples/ReadProperty25.py b/samples/ReadProperty25.py index 9885b53a..da74b1e5 100755 --- a/samples/ReadProperty25.py +++ b/samples/ReadProperty25.py @@ -120,16 +120,12 @@ def do_rtn(self, args): args = args.split() if _debug: ReadPropertyConsoleCmd._debug("do_rtn %r", args) - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: ReadPropertyConsoleCmd._debug(" - adapter: %r", adapter) - # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) + this_application.nsap.add_router_references(None, router_address, network_list) bacpypes_debugging(ReadPropertyConsoleCmd) diff --git a/samples/ReadWriteEventMessageTexts.py b/samples/ReadWriteEventMessageTexts.py index 5effd7f9..ac89da19 100644 --- a/samples/ReadWriteEventMessageTexts.py +++ b/samples/ReadWriteEventMessageTexts.py @@ -186,16 +186,12 @@ def do_rtn(self, args): args = args.split() if _debug: ReadWritePropertyConsoleCmd._debug("do_rtn %r", args) - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: ReadWritePropertyConsoleCmd._debug(" - adapter: %r", adapter) - # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) + this_application.nsap.add_router_references(None, router_address, network_list) # diff --git a/samples/ReadWriteProperty.py b/samples/ReadWriteProperty.py index d77ff09a..9458f9cc 100755 --- a/samples/ReadWriteProperty.py +++ b/samples/ReadWriteProperty.py @@ -245,16 +245,12 @@ def do_rtn(self, args): args = args.split() if _debug: ReadWritePropertyConsoleCmd._debug("do_rtn %r", args) - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: ReadWritePropertyConsoleCmd._debug(" - adapter: %r", adapter) - # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) + this_application.nsap.add_router_references(None, router_address, network_list) # diff --git a/samples/Tutorial/WhoIsIAm.py b/samples/Tutorial/WhoIsIAm.py index 67fe1203..9f35cd48 100644 --- a/samples/Tutorial/WhoIsIAm.py +++ b/samples/Tutorial/WhoIsIAm.py @@ -132,16 +132,12 @@ def do_rtn(self, args): args = args.split() if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args) - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: WhoIsIAmConsoleCmd._debug(" - adapter: %r", adapter) - # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) + this_application.nsap.add_router_references(None, router_address, network_list) # diff --git a/samples/WhoIsIAm.py b/samples/WhoIsIAm.py index 331cf265..5accef8a 100755 --- a/samples/WhoIsIAm.py +++ b/samples/WhoIsIAm.py @@ -153,16 +153,12 @@ def do_rtn(self, args): args = args.split() if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args) - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: WhoIsIAmConsoleCmd._debug(" - adapter: %r", adapter) - # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) + this_application.nsap.add_router_references(None, router_address, network_list) # diff --git a/samples/WhoIsIAmForeign.py b/samples/WhoIsIAmForeign.py index eeaa88e2..ad43a210 100755 --- a/samples/WhoIsIAmForeign.py +++ b/samples/WhoIsIAmForeign.py @@ -164,16 +164,12 @@ def do_rtn(self, args): args = args.split() if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args) - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: WhoIsIAmConsoleCmd._debug(" - adapter: %r", adapter) - # provide the address and a list of network numbers router_address = Address(args[0]) network_list = [int(arg) for arg in args[1:]] # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) + this_application.nsap.add_router_references(None, router_address, network_list) # diff --git a/samples/switch_demo.py b/samples/switch_demo.py new file mode 100644 index 00000000..44de18fe --- /dev/null +++ b/samples/switch_demo.py @@ -0,0 +1,137 @@ +#!/usr/bin/python + +""" +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.comm import Client, Server, Switch, Debug, bind +from bacpypes.core import run, enable_sleeping + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_console = None +this_switch = None + +# +# DebugTerm +# + +class DebugTerm(Debug): + + """ + A simple wrapper around the Debug class that prints out when it has been + activated and deactivated by the switch. + """ + + def activate(self): + print(self.label + " activated") + + def deactivate(self): + print(self.label + " deactivated") + +# +# TestConsoleCmd +# + +@bacpypes_debugging +class TestConsoleCmd(Client, Server, ConsoleCmd): + + def __init__(self): + Client.__init__(self) + Server.__init__(self) + ConsoleCmd.__init__(self) + + def do_request(self, args): + """request """ + args = args.split() + if _debug: TestConsoleCmd._debug("do_request %r", args) + + # send the request down the stack + self.request(args[0]) + + def do_response(self, args): + """response """ + args = args.split() + if _debug: TestConsoleCmd._debug("do_response %r", args) + + # send the response up the stack + self.response(args[0]) + + def do_switch(self, args): + """switch """ + args = args.split() + if _debug: TestConsoleCmd._debug("do_switch %r", args) + global this_switch + + this_switch.switch_terminal(args[0]) + print("switched") + + def do_add(self, args): + args = args.split() + if _debug: TestConsoleCmd._debug("do_add %r", args) + global this_switch + + # make a new terminal + this_switch[args[0]] = DebugTerm(args[0]) + + def do_del(self, args): + args = args.split() + if _debug: TestConsoleCmd._debug("do_del %r", args) + global this_switch + + # delete the terminal + del this_switch[args[0]] + + def indication(self, arg): + """Got a request, echo it back up the stack.""" + print("indication: {}".format(arg)) + + def confirmation(self, arg): + print("confirmation: {}".format(arg)) + +# +# main +# + +@bacpypes_debugging +def main(): + # parse the command line arguments + args = ArgumentParser(description=__doc__).parse_args() + global this_switch, this_console + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make some debugging terminals + debug1 = DebugTerm("a") + debug2 = DebugTerm("b") + + # make a switch with them + this_switch = Switch(a=debug1, b=debug2) + if _debug: _log.debug(" this_switch: %r", this_switch) + + # make a test console + this_console = TestConsoleCmd() + if _debug: _log.debug(" this_console: %r", this_console) + + # bind the console to the top and bottom of the switch + bind(this_console, this_switch, this_console) + + # enable sleeping will help with threads + enable_sleeping() + + _log.debug("running") + + run() + + _log.debug("fini") + +if __name__ == "__main__": + main() + diff --git a/sandbox/threading_1.py b/sandbox/threading_1.py new file mode 100644 index 00000000..73b38346 --- /dev/null +++ b/sandbox/threading_1.py @@ -0,0 +1,50 @@ +#!/usr/bin/python + +""" +This application demonstrates doing something at a regular interval. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ArgumentParser + +from bacpypes.core import run +from bacpypes.task import recurring_function + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +def write_flush(text): + """Print the text, flush immediately.""" + sys.stdout.write(text) + sys.stdout.flush() + + +@recurring_function(3000.0) +def ding(): + """Do something in the BACpypes run loop.""" + write_flush(".") + + +def main(): + # parse the command line arguments + parser = ArgumentParser(description=__doc__) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + _log.debug("running") + + run() + + _log.debug("fini") + + +if __name__ == "__main__": + main() diff --git a/sandbox/threading_2.py b/sandbox/threading_2.py new file mode 100644 index 00000000..bed315e6 --- /dev/null +++ b/sandbox/threading_2.py @@ -0,0 +1,68 @@ +#!/usr/bin/python + +""" +This application demonstrates doing something at a regular interval. +""" + +import sys +import time +from threading import Thread + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ArgumentParser + +from bacpypes.core import run +from bacpypes.task import recurring_function + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +def write_flush(text): + """Print the text, flush immediately.""" + sys.stdout.write(text) + sys.stdout.flush() + + +@recurring_function(3000.0) +def ding(): + """Do something in the BACpypes run loop.""" + write_flush(".") + + +class ProcessThread(Thread): + + def __init__(self): + Thread.__init__(self) + self.daemon = True + + def run(self): + while True: + write_flush("#") + time.sleep(2) + + +def main(): + # parse the command line arguments + parser = ArgumentParser(description=__doc__) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make the thread object and start it + process_thread = ProcessThread() + process_thread.start() + + _log.debug("running") + + run() + + _log.debug("fini") + + +if __name__ == "__main__": + main() diff --git a/sandbox/threading_3.py b/sandbox/threading_3.py new file mode 100644 index 00000000..6a94f38e --- /dev/null +++ b/sandbox/threading_3.py @@ -0,0 +1,68 @@ +#!/usr/bin/python + +""" +This application demonstrates doing something at a regular interval. +""" + +import sys +import time +from threading import Thread + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ArgumentParser + +from bacpypes.core import run +from bacpypes.task import recurring_function + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +def write_flush(text): + """Print the text, flush immediately.""" + sys.stdout.write(text) + sys.stdout.flush() + + +@recurring_function(3000.0) +def ding(): + """Do something in the BACpypes run loop.""" + write_flush(".") + + +class BACpypesThread(Thread): + + def __init__(self): + Thread.__init__(self) + + def run(self): + _log.debug("running") + + run() + + _log.debug("fini") + + +def main(): + # parse the command line arguments + parser = ArgumentParser(description=__doc__) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make the thread object and start it + bacpypes_thread = BACpypesThread() + bacpypes_thread.start() + + # main thread + while True: + write_flush("#") + time.sleep(2) + + +if __name__ == "__main__": + main() diff --git a/sandbox/threading_4.py b/sandbox/threading_4.py new file mode 100644 index 00000000..c96ae75c --- /dev/null +++ b/sandbox/threading_4.py @@ -0,0 +1,75 @@ +#!/usr/bin/python + +""" +This application demonstrates doing something at a regular interval. +""" + +import sys +import time +from threading import Thread, Event + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ArgumentParser + +from bacpypes.core import run +from bacpypes.task import recurring_function + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +def write_flush(text): + """Print the text, flush immediately.""" + sys.stdout.write(text) + sys.stdout.flush() + + +@recurring_function(3000.0) +def ding(): + """Do something in the BACpypes run loop.""" + write_flush(".") + + +class ProcessThread(Thread): + + def __init__(self): + Thread.__init__(self) + self._stop_event = Event() + + def run(self): + while not self._stop_event.isSet(): + write_flush("#") + time.sleep(2) + + def stop(self): + self._stop_event.set() + self.join() + + +def main(): + # parse the command line arguments + parser = ArgumentParser(description=__doc__) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make the thread object and start it + process_thread = ProcessThread() + process_thread.start() + + _log.debug("running") + + run() + + _log.debug("fini") + + # tell the thread to stop + process_thread.stop() + + +if __name__ == "__main__": + main() diff --git a/sandbox/threading_5.py b/sandbox/threading_5.py new file mode 100644 index 00000000..736c805f --- /dev/null +++ b/sandbox/threading_5.py @@ -0,0 +1,75 @@ +#!/usr/bin/python + +""" +This application demonstrates doing something at a regular interval. +""" + +import sys +import time +from threading import Thread + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ArgumentParser + +from bacpypes.core import run, stop +from bacpypes.task import recurring_function + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +def write_flush(text): + """Print the text, flush immediately.""" + sys.stdout.write(text) + sys.stdout.flush() + + +@recurring_function(3000.0) +def ding(): + """Do something in the BACpypes run loop.""" + write_flush(".") + + +class BACpypesThread(Thread): + + def __init__(self): + Thread.__init__(self) + + def run(self): + _log.debug("running") + + run() + + _log.debug("fini") + + def stop(self): + stop() + self.join() + + +def main(): + # parse the command line arguments + parser = ArgumentParser(description=__doc__) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make the thread object and start it + bacpypes_thread = BACpypesThread() + bacpypes_thread.start() + + # main thread + while True: + write_flush("#") + time.sleep(2) + + # tell the thread to stop + bacpypes_thread.stop() + + +if __name__ == "__main__": + main() diff --git a/test_script.sh b/test_script.sh new file mode 100755 index 00000000..1304cec5 --- /dev/null +++ b/test_script.sh @@ -0,0 +1,68 @@ +#!/bin/bash + +# +# BACpypes Test Script +# +# This is an example script for running specific tests +# and capturing the debugging output where it can be +# searched for exceptions, errors, failed tests, etc. +# + +version=3 +options="" +keep=0 + +while getopts kv:o: OPTION +do + case $OPTION in + k) + # keep the test results, even if it passes + keep=1 + ;; + v) + # which python version + version=$OPTARG + ;; + o) + options=$OPTARG + ;; + esac +done +shift $((OPTIND-1)) + +# this is where debugging output should go, the name of the +# file matches the name of the script +bugfile=$(basename $0) +bugfile=${bugfile/.sh/.txt} + +# debugging file can rotate, set the file size large to keep +# it from rotating a lot +export BACPYPES_MAXBYTES=10485760 + +# add the modules or classes that need debugging and redirect +# the output to the file +export BACPYPES_DEBUG=" \ + tests.test_service.helpers.ApplicationNetwork:$bugfile \ + tests.test_service.helpers.SnifferStateMachine:$bugfile \ + tests.state_machine.match_pdu:$bugfile \ + " + +# debugging output will open the file 'append' which is +# not very helpful in most cases, remove the existing debugging file +rm -vf $bugfile + +# run the tests for a specific file, the additional options +# are passed to pytest +python$version setup.py test --addopts "tests/test_service/test_cov.py $options" + +# if all the tests pass, remove the debugging output, otherwise +# display for your enjoyment +if [ $? -eq 0 ] +then + if [ $keep -eq 0 ] + then + rm -vf $bugfile + fi +else + less $bugfile +fi diff --git a/tests/state_machine.py b/tests/state_machine.py index 9e4e2ddc..c70e2dd3 100755 --- a/tests/state_machine.py +++ b/tests/state_machine.py @@ -346,6 +346,24 @@ def after_receive(self, pdu): """Called with PDU received after match.""" self.state_machine.after_receive(pdu) + def ignore(self, pdu_type, **pdu_attrs): + """Create a ReceiveTransition from this state to itself, if match + is successful the effect is to ignore the PDU. + + :param criteria: PDU to match + """ + if _debug: State._debug("ignore(%s) %r %r", self.doc_string, pdu_type, pdu_attrs) + + # create a bundle of the match criteria + criteria = (pdu_type, pdu_attrs) + if _debug: State._debug(" - criteria: %r", criteria) + + # add this to the list of transitions + self.receive_transitions.append(ReceiveTransition(criteria, self)) + + # return this state, no new state is created + return self + def unexpected_receive(self, pdu): """Called with PDU that did not match. Unless this is trapped by the state, the default behaviour is to fail.""" @@ -759,12 +777,26 @@ def goto_state(self, state): # pull apart the pieces and call it fn, args, kwargs = current_state.call_transition.fnargs - fn( *args, **kwargs) - if _debug: StateMachine._debug(" - called") + try: + fn( *args, **kwargs) + if _debug: StateMachine._debug(" - called, no exception") + + # check for a transition + next_state = current_state.call_transition.next_state + if _debug: StateMachine._debug(" - next_state: %r", next_state) + + except AssertionError as err: + if _debug: StateMachine._debug(" - called, exception: %r", err) + self.state_transitioning -= 1 + + self.halt() + self.fail() + + # if it is part of a group, let the group know + if self.machine_group and not self._startup_flag: + self.machine_group.stopped(self) - # check for a transition - next_state = current_state.call_transition.next_state - if _debug: StateMachine._debug(" - next_state: %r", next_state) + return else: if _debug: StateMachine._debug(" - no calls") diff --git a/tests/test_service/helpers.py b/tests/test_service/helpers.py index 5aca62d6..c8a6fc1b 100644 --- a/tests/test_service/helpers.py +++ b/tests/test_service/helpers.py @@ -8,14 +8,17 @@ from bacpypes.comm import Client, bind from bacpypes.pdu import Address, LocalBroadcast +from bacpypes.npdu import NPDU +from bacpypes.apdu import APDU, apdu_types + from bacpypes.vlan import Network, Node -from bacpypes.app import Application +from bacpypes.app import ApplicationIOController from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement from bacpypes.local.device import LocalDeviceObject -from ..state_machine import StateMachine, StateMachineGroup +from ..state_machine import StateMachine, StateMachineGroup, TrafficLog from ..time_machine import reset_time_machine, run_time_machine @@ -31,16 +34,20 @@ @bacpypes_debugging class ApplicationNetwork(StateMachineGroup): - def __init__(self): - if _debug: ApplicationNetwork._debug("__init__") + def __init__(self, test_name): + if _debug: ApplicationNetwork._debug("__init__ %r", test_name) StateMachineGroup.__init__(self) # reset the time machine reset_time_machine() if _debug: ApplicationNetwork._debug(" - time machine reset") + # create a traffic log + self.traffic_log = TrafficLog() + # make a little LAN self.vlan = Network(broadcast_address=LocalBroadcast()) + self.vlan.traffic_log = self.traffic_log # test device object self.td_device_object = LocalDeviceObject( @@ -84,6 +91,9 @@ def run(self, time_limit=60.0): for direction, pdu in state_machine.transaction_log: ApplicationNetwork._debug(" %s %s", direction, str(pdu)) + # traffic log has what was processed on each vlan + self.traffic_log.dump(ApplicationNetwork._debug) + # check for success all_success, some_failed = super(ApplicationNetwork, self).check_for_success() ApplicationNetwork._debug(" - all_success, some_failed: %r, %r", all_success, some_failed) @@ -94,8 +104,7 @@ def run(self, time_limit=60.0): # SnifferNode # -@bacpypes_debugging -class SnifferNode(Client, StateMachine): +class SnifferNode(Client): def __init__(self, vlan): if _debug: SnifferNode._debug("__init__ %r", vlan) @@ -106,7 +115,6 @@ def __init__(self, vlan): # continue with initialization Client.__init__(self) - StateMachine.__init__(self) # create a promiscuous node, added to the network self.node = Node(self.address, vlan, promiscuous=True) @@ -115,15 +123,114 @@ def __init__(self, vlan): # bind this to the node bind(self, self.node) + def request(self, pdu): + if _debug: SnifferNode._debug("request(%s) %r", self.name, pdu) + raise RuntimeError("sniffers don't request") + + def confirmation(self, pdu): + if _debug: SnifferNode._debug("confirmation(%s) %r", self.name, pdu) + + # it's an NPDU + npdu = NPDU() + npdu.decode(pdu) + + # filter out network layer traffic if there is any, probably not + if npdu.npduNetMessage is not None: + if _debug: SnifferNode._debug(" - network message: %r", npdu.npduNetMessage) + return + + # decode as a generic APDU + apdu = APDU() + apdu.decode(npdu) + + # "lift" the source and destination address + if npdu.npduSADR: + apdu.pduSource = npdu.npduSADR + else: + apdu.pduSource = npdu.pduSource + if npdu.npduDADR: + apdu.pduDestination = npdu.npduDADR + else: + apdu.pduDestination = npdu.pduDestination + + # make a more focused interpretation + atype = apdu_types.get(apdu.apduType) + if _debug: SnifferNode._debug(" - atype: %r", atype) + + xpdu = apdu + apdu = atype() + apdu.decode(xpdu) + + print(repr(apdu)) + apdu.debug_contents() + print("") + +# +# SnifferStateMachine +# + +@bacpypes_debugging +class SnifferStateMachine(Client, StateMachine): + + def __init__(self, vlan): + if _debug: SnifferStateMachine._debug("__init__ %r", vlan) + + # save the name and give it a blank address + self.name = "sniffer" + self.address = Address() + + # continue with initialization + Client.__init__(self) + StateMachine.__init__(self) + + # create a promiscuous node, added to the network + self.node = Node(self.address, vlan, promiscuous=True) + if _debug: SnifferStateMachine._debug(" - node: %r", self.node) + + # bind this to the node + bind(self, self.node) + def send(self, pdu): - if _debug: SnifferNode._debug("send(%s) %r", self.name, pdu) + if _debug: SnifferStateMachine._debug("send(%s) %r", self.name, pdu) raise RuntimeError("sniffers don't send") def confirmation(self, pdu): - if _debug: SnifferNode._debug("confirmation(%s) %r", self.name, pdu) + if _debug: SnifferStateMachine._debug("confirmation(%s) %r", self.name, pdu) + + # it's an NPDU + npdu = NPDU() + npdu.decode(pdu) + + # filter out network layer traffic if there is any, probably not + if npdu.npduNetMessage is not None: + if _debug: SnifferStateMachine._debug(" - network message: %r", npdu.npduNetMessage) + return + + # decode as a generic APDU + apdu = APDU() + apdu.decode(npdu) + + # "lift" the source and destination address + if npdu.npduSADR: + apdu.pduSource = npdu.npduSADR + else: + apdu.pduSource = npdu.pduSource + if npdu.npduDADR: + apdu.pduDestination = npdu.npduDADR + else: + apdu.pduDestination = npdu.pduDestination + + # make a more focused interpretation + atype = apdu_types.get(apdu.apduType) + if _debug: SnifferStateMachine._debug(" - atype: %r", atype) + + xpdu = apdu + apdu = atype() + apdu.decode(xpdu) + if _debug: SnifferStateMachine._debug(" - apdu: %r", apdu) # pass to the state machine - self.receive(pdu) + self.receive(apdu) # @@ -131,7 +238,7 @@ def confirmation(self, pdu): # @bacpypes_debugging -class ApplicationStateMachine(Application, StateMachine): +class ApplicationStateMachine(ApplicationIOController, StateMachine): def __init__(self, localDevice, vlan): if _debug: ApplicationStateMachine._debug("__init__ %r %r", localDevice, vlan) @@ -141,7 +248,7 @@ def __init__(self, localDevice, vlan): if _debug: ApplicationStateMachine._debug(" - address: %r", self.address) # continue with initialization - Application.__init__(self, localDevice, self.address) + ApplicationIOController.__init__(self, localDevice, self.address) StateMachine.__init__(self, name=localDevice.objectName) # include a application decoder @@ -192,3 +299,6 @@ def confirmation(self, apdu): # forward the confirmation to the state machine self.receive(apdu) + # allow the application to process it + super(ApplicationStateMachine, self).confirmation(apdu) + diff --git a/tests/test_service/test_cov.py b/tests/test_service/test_cov.py index fdffa2a0..5aae576d 100644 --- a/tests/test_service/test_cov.py +++ b/tests/test_service/test_cov.py @@ -1 +1,752 @@ -# placeholder +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Test Device Services +-------------------- +""" + +import unittest + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.capability import Capability + +from bacpypes.pdu import Address, PDU +from bacpypes.basetypes import ( + DeviceAddress, COVSubscription, PropertyValue, + Recipient, RecipientProcess, ObjectPropertyReference, + ) +from bacpypes.apdu import ( + SubscribeCOVRequest, + ReadPropertyRequest, ReadPropertyACK, + ConfirmedCOVNotificationRequest, UnconfirmedCOVNotificationRequest, + SimpleAckPDU, Error, RejectPDU, AbortPDU, + ) + +from bacpypes.service.object import ( + ReadWritePropertyServices, + ) + +from bacpypes.service.cov import ( + ChangeOfValueServices, + ) +from bacpypes.local.device import LocalDeviceObject +from bacpypes.object import ( + BinaryValueObject, + ) + +from .helpers import ApplicationNetwork, ApplicationStateMachine + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +# +# COVTestClientServices +# + +@bacpypes_debugging +class COVTestClientServices(Capability): + + def do_ConfirmedCOVNotificationRequest(self, apdu): + if _debug: COVTestClientServices._debug("do_ConfirmedCOVNotificationRequest %r", apdu) + + # the test device needs to set these + assert hasattr(self, 'test_ack') + assert hasattr(self, 'test_reject') + assert hasattr(self, 'test_abort') + + if self.test_ack: + # success + response = SimpleAckPDU(context=apdu) + if _debug: COVTestClientServices._debug(" - simple_ack: %r", response) + + elif self.test_reject: + # reject + response = RejectPDU(reason=self.test_reject, context=apdu) + if _debug: COVTestClientServices._debug(" - reject: %r", response) + + elif self.test_abort: + # abort + response = AbortPDU(reason=self.test_abort, context=apdu) + if _debug: COVTestClientServices._debug(" - abort: %r", response) + + # return the result + self.response(response) + + def do_UnconfirmedCOVNotificationRequest(self, apdu): + if _debug: COVTestClientServices._debug("do_UnconfirmedCOVNotificationRequest %r", apdu) + + +@bacpypes_debugging +class TestBasic(unittest.TestCase): + + def test_basic(self): + """Test basic configuration of a network.""" + if _debug: TestBasic._debug("test_basic") + + # create a network + anet = ApplicationNetwork("test_basic") + + # add the service capability to the IUT + anet.iut.add_capability(ChangeOfValueServices) + + # all start states are successful + anet.td.start_state.success() + anet.iut.start_state.success() + + # run the group + anet.run() + + +@bacpypes_debugging +class TestBinaryValue(unittest.TestCase): + + def test_8_10_1(self): + """Confirmed Notifications Subscription""" + if _debug: TestBinaryValue._debug("test_8_10_1") + + # create a network + anet = ApplicationNetwork("test_8_10_1") + + # add the service capability to the IUT + anet.td.add_capability(COVTestClientServices) + anet.iut.add_capability(ChangeOfValueServices) + anet.iut.add_capability(ReadWritePropertyServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # an easy way to change the present value + write_test_bv = lambda v: setattr(test_bv, 'presentValue', v) + + # add it to the implementation + anet.iut.add_object(test_bv) + + # tell the TD how to respond to confirmed notifications + anet.td.test_ack = True + anet.td.test_reject = None + anet.td.test_abort = None + + # wait for the subscription + anet.iut.start_state.doc("8.10.1-1-0") \ + .receive(SubscribeCOVRequest).doc("8.10.1-1-1") \ + .success() + + # send the subscription, wait for the ack + anet.td.start_state.doc("8.10.1-2-0") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=True, + lifetime=30, + )).doc("8.10.1-2-1") \ + .receive(SimpleAckPDU).doc("8.10.1-2-2") \ + .success() + + # run the group + anet.run() + + def test_8_10_2(self): + """Unconfirmed Notifications Subscription""" + if _debug: TestBinaryValue._debug("test_8_10_2") + + # create a network + anet = ApplicationNetwork("test_8_10_2") + + # add the service capability to the IUT + anet.td.add_capability(COVTestClientServices) + anet.iut.add_capability(ChangeOfValueServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # an easy way to change the present value + write_test_bv = lambda v: setattr(test_bv, 'presentValue', v) + + # add it to the implementation + anet.iut.add_object(test_bv) + + # tell the TD how to respond to confirmed notifications + anet.td.test_ack = True + anet.td.test_reject = None + anet.td.test_abort = None + + # wait for the subscription + anet.iut.start_state.doc("8.10.2-1-0") \ + .receive(SubscribeCOVRequest).doc("8.10.2-1-1") \ + .success() + + # send the subscription, wait for the ack + anet.td.start_state.doc("8.10.2-2-0") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=False, + lifetime=30, + )).doc("8.10.2-2-1") \ + .receive(SimpleAckPDU).doc("8.10.2-2-2") \ + .success() + + # run the group, cut the time limit short + anet.run(time_limit=5.0) + + # check that the IUT still has the detection + if _debug: TestBinaryValue._debug(" - detections: %r", anet.iut.cov_detections) + assert len(anet.iut.cov_detections) == 1 + + # pop out the subscription list and criteria + obj_ref, criteria = anet.iut.cov_detections.popitem() + if _debug: TestBinaryValue._debug(" - criteria: %r", criteria) + + # get the list of subscriptions from the criteria + subscriptions = criteria.cov_subscriptions.cov_subscriptions + if _debug: TestBinaryValue._debug(" - subscriptions: %r", subscriptions) + assert len(subscriptions) == 1 + + def test_8_10_3(self): + """Canceling a Subscription""" + if _debug: TestBinaryValue._debug("test_8_10_3") + + # create a network + anet = ApplicationNetwork("test_8_10_3") + + # add the service capability to the IUT + anet.td.add_capability(COVTestClientServices) + anet.iut.add_capability(ChangeOfValueServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # an easy way to change the present value + write_test_bv = lambda v: setattr(test_bv, 'presentValue', v) + + # add it to the implementation + anet.iut.add_object(test_bv) + + # tell the TD how to respond to confirmed notifications + anet.td.test_ack = True + anet.td.test_reject = None + anet.td.test_abort = None + + # wait for the subscription, then for the cancelation + anet.iut.start_state.doc("8.10.3-1-0") \ + .receive(SubscribeCOVRequest).doc("8.10.3-1-1") \ + .receive(SubscribeCOVRequest).doc("8.10.3-1-2") \ + .success() + + # send the subscription, wait for the ack, then send the cancelation + # and wait for the ack. Ignore the notification that is sent when + # after the subscription + subscription_acked = anet.td.start_state.doc("8.10.3-2-0") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=False, + lifetime=30, + )).doc("8.10.3-2-1") \ + .ignore(UnconfirmedCOVNotificationRequest) \ + .receive(SimpleAckPDU).doc("8.10.3-2-2") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + )).doc("8.10.3-2-1") \ + .ignore(UnconfirmedCOVNotificationRequest) \ + .receive(SimpleAckPDU).doc("8.10.3-2-2") \ + .success() + + # run the group + anet.run() + + def test_8_10_4(self): + """Requests 8 Hour Lifetimes""" + if _debug: TestBinaryValue._debug("test_8_10_4") + + # create a network + anet = ApplicationNetwork("test_8_10_4") + + # add the service capability to the IUT + anet.td.add_capability(COVTestClientServices) + anet.iut.add_capability(ChangeOfValueServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # add it to the implementation + anet.iut.add_object(test_bv) + + # tell the TD how to respond to confirmed notifications + anet.td.test_ack = True + anet.td.test_reject = None + anet.td.test_abort = None + + # wait for the subscription + anet.iut.start_state.doc("8.10.4-1-0") \ + .receive(SubscribeCOVRequest).doc("8.10.4-1-1") \ + .success() + + # send the subscription, wait for the ack + anet.td.start_state.doc("8.10.4-2-0") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=True, + lifetime=28800, + )).doc("8.10.4-2-1") \ + .receive(SimpleAckPDU).doc("8.10.4-2-2") \ + .success() + + # run the group + anet.run() + + def test_9_10_1_1(self): + if _debug: TestBinaryValue._debug("test_9_10_1_1") + + notification_fail_time = 0.5 + + # create a network + anet = ApplicationNetwork("test_9_10_1_1") + + # add the service capability to the IUT + anet.td.add_capability(COVTestClientServices) + anet.iut.add_capability(ChangeOfValueServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # add it to the implementation + anet.iut.add_object(test_bv) + + # tell the TD how to respond to confirmed notifications + anet.td.test_ack = True + anet.td.test_reject = None + anet.td.test_abort = None + + # wait for the subscription, wait for the notification ack + anet.iut.start_state.doc("9.10.1.1-1-0") \ + .receive(SubscribeCOVRequest).doc("9.10.1.1-1-1") \ + .receive(SimpleAckPDU).doc("9.10.1.1-1-2") \ + .timeout(10).doc("9.10.1.1-1-3") \ + .success() + + # test device is quiet + wait_for_notification = \ + anet.td.start_state.doc("9.10.1.1-2-0") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=True, + lifetime=30, + )).doc("9.10.1.1-2-1") \ + .receive(SimpleAckPDU).doc("9.10.1.1-2-2") + + # after the ack, don't wait too long for the notification + wait_for_notification \ + .timeout(notification_fail_time).doc("9.10.1.1-2-3").fail() + + # if the notification is received, success + wait_for_notification \ + .receive(ConfirmedCOVNotificationRequest).doc("9.10.1.1-2-4") \ + .timeout(10).doc("9.10.1.1-2-5") \ + .success() + + # run the group + anet.run() + + def test_no_traffic(self): + """Test basic configuration of a network.""" + if _debug: TestBinaryValue._debug("test_no_traffic") + + # create a network + anet = ApplicationNetwork("test_no_traffic") + + # add the service capability to the IUT + anet.iut.add_capability(ChangeOfValueServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # an easy way to change the present value + write_test_bv = lambda v: setattr(test_bv, 'presentValue', v) + + # add it to the implementation + anet.iut.add_object(test_bv) + + # make some transitions + anet.iut.start_state.doc("1-1-0") \ + .call(write_test_bv, 'active').doc("1-1-1") \ + .timeout(1).doc("1-1-2") \ + .call(write_test_bv, 'inactive').doc("1-1-3") \ + .timeout(1).doc("1-1-4") \ + .success() + + # test device is quiet + anet.td.start_state.timeout(5).success() + + # run the group + anet.run() + + def test_simple_transition_confirmed(self): + if _debug: TestBinaryValue._debug("test_simple_transition_confirmed") + + # create a network + anet = ApplicationNetwork("test_simple_transition_confirmed") + + # add the service capability to the IUT + anet.td.add_capability(COVTestClientServices) + anet.iut.add_capability(ChangeOfValueServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # an easy way to change the present value + write_test_bv = lambda v: setattr(test_bv, 'presentValue', v) + + # add it to the implementation + anet.iut.add_object(test_bv) + + # tell the TD how to respond to confirmed notifications + anet.td.test_ack = True + anet.td.test_reject = None + anet.td.test_abort = None + + # receive the subscription request, wait until the client has + # received the ack and the 'instant' notification. Then change the + # value and wait for the ack. + anet.iut.start_state.doc("2-1-0") \ + .receive(SubscribeCOVRequest).doc("2-1-1") \ + .receive(SimpleAckPDU).doc("2-1-2") \ + .wait_event("e1").doc("2-1-3") \ + .call(write_test_bv, 'active').doc("2-1-4") \ + .receive(SimpleAckPDU).doc("2-1-5") \ + .timeout(10).doc("2-2-6") \ + .success() + + # send the subscription request, wait for the ack and the 'instant' + # notification, set the event so the IUT can continue, then wait + # for the next notification + anet.td.start_state.doc("2-2-0") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=True, + lifetime=30, + )).doc("2-2-1") \ + .receive(SimpleAckPDU).doc("2-2-2") \ + .receive(ConfirmedCOVNotificationRequest).doc("2-2-4") \ + .set_event("e1").doc("2-2-3") \ + .receive(ConfirmedCOVNotificationRequest).doc("2-2-4") \ + .timeout(10).doc("2-2-5") \ + .success() + + # run the group + anet.run() + + def test_simple_transition_unconfirmed(self): + if _debug: TestBinaryValue._debug("test_simple_transition_unconfirmed") + + # create a network + anet = ApplicationNetwork("test_simple_transition_unconfirmed") + + # add the service capability to the IUT + anet.td.add_capability(COVTestClientServices) + anet.iut.add_capability(ChangeOfValueServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # an easy way to change the present value + write_test_bv = lambda v: setattr(test_bv, 'presentValue', v) + + # add it to the implementation + anet.iut.add_object(test_bv) + + # tell the TD how to respond to confirmed notifications + anet.td.test_ack = True + anet.td.test_reject = None + anet.td.test_abort = None + + # receive the subscription request, wait until the client has + # received the ack and the 'instant' notification. Then change the + # value, no ack coming back + anet.iut.start_state.doc("3-1-0") \ + .receive(SubscribeCOVRequest).doc("3-1-1") \ + .wait_event("e1").doc("3-1-2") \ + .call(write_test_bv, 'active').doc("3-1-3") \ + .timeout(10).doc("3-2-4") \ + .success() + + # test device is quiet + anet.td.start_state.doc("3-2-0") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=False, + lifetime=30, + )).doc("3-2-1") \ + .receive(SimpleAckPDU).doc("3-2-2") \ + .receive(UnconfirmedCOVNotificationRequest).doc("3-2-3") \ + .set_event("e1").doc("3-2-4") \ + .receive(UnconfirmedCOVNotificationRequest).doc("3-2-5") \ + .timeout(10).doc("3-2-6") \ + .success() + + # run the group + anet.run() + + def test_changing_status_flags(self): + """This test changes the status flags of binary value point to verify + that the detection picks up other changes, most tests just change the + present value.""" + if _debug: TestBinaryValue._debug("test_changing_status_flags") + + # create a network + anet = ApplicationNetwork("test_changing_status_flags") + + # add the service capability to the IUT + anet.td.add_capability(COVTestClientServices) + anet.iut.add_capability(ChangeOfValueServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # an easy way to change the present value + def test_bv_fault(): + if _debug: TestBinaryValue._debug("test_bv_fault") + test_bv.statusFlags = [0, 1, 0, 0] + + # add it to the implementation + anet.iut.add_object(test_bv) + + # receive the subscription request, wait until the client has + # received the ack and the 'instant' notification. Then change the + # value, no ack coming back + anet.iut.start_state.doc("4-1-0") \ + .receive(SubscribeCOVRequest).doc("4-1-1") \ + .wait_event("e1").doc("4-1-2") \ + .call(test_bv_fault).doc("4-1-3") \ + .timeout(10).doc("4-2-4") \ + .success() + + # test device is quiet + anet.td.start_state.doc("4-2-0") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=False, + lifetime=30, + )).doc("4-2-1") \ + .receive(SimpleAckPDU).doc("4-2-2") \ + .receive(UnconfirmedCOVNotificationRequest).doc("4-2-3") \ + .set_event("e1").doc("4-2-4") \ + .receive(UnconfirmedCOVNotificationRequest).doc("4-2-5") \ + .timeout(10).doc("4-2-6") \ + .success() + + # run the group + anet.run() + + def test_changing_properties(self): + """This test changes the value of multiple properties to verify that + only one COV notification is sent.""" + if _debug: TestBinaryValue._debug("test_changing_properties") + + # create a network + anet = ApplicationNetwork("test_changing_properties") + + # add the service capability to the IUT + anet.td.add_capability(COVTestClientServices) + anet.iut.add_capability(ChangeOfValueServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # an easy way to change the present value + def test_bv_fault(): + if _debug: TestBinaryValue._debug("test_bv_fault") + test_bv.presentValue = 'active' + test_bv.statusFlags = [0, 0, 1, 0] + + # add it to the implementation + anet.iut.add_object(test_bv) + + # receive the subscription request, wait until the client has + # received the ack and the 'instant' notification. Then change the + # value, no ack coming back + anet.iut.start_state.doc("5-1-0") \ + .receive(SubscribeCOVRequest).doc("5-1-1") \ + .wait_event("e1").doc("5-1-2") \ + .call(test_bv_fault).doc("5-1-3") \ + .timeout(10).doc("5-2-4") \ + .success() + + # test device is quiet + anet.td.start_state.doc("5-2-0") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=False, + lifetime=30, + )).doc("5-2-1") \ + .receive(SimpleAckPDU).doc("5-2-2") \ + .receive(UnconfirmedCOVNotificationRequest).doc("5-2-3") \ + .set_event("e1").doc("5-2-4") \ + .receive(UnconfirmedCOVNotificationRequest).doc("5-2-5") \ + .timeout(10).doc("5-2-6") \ + .success() + + # run the group + anet.run() + + def test_multiple_subscribers(self): + """This has more than one subscriber for the object.""" + if _debug: TestBinaryValue._debug("test_multiple_subscribers") + + # create a network + anet = ApplicationNetwork("test_multiple_subscribers") + + # add the service capability to the IUT + anet.td.add_capability(COVTestClientServices) + anet.iut.add_capability(ChangeOfValueServices) + + # make a binary value object + test_bv = BinaryValueObject( + objectIdentifier=('binaryValue', 1), + objectName='bv', + presentValue='inactive', + statusFlags=[0, 0, 0, 0], + ) + + # an easy way to change both the present value and status flags + # which should trigger only one notification + def test_bv_fault(): + if _debug: TestBinaryValue._debug("test_bv_fault") + test_bv.presentValue = 'active' + test_bv.statusFlags = [0, 0, 1, 0] + + # add it to the implementation + anet.iut.add_object(test_bv) + + # add another test device object + anet.td2_device_object = LocalDeviceObject( + objectName="td2", + objectIdentifier=("device", 30), + maxApduLengthAccepted=1024, + segmentationSupported='noSegmentation', + vendorIdentifier=999, + ) + + # another test device + anet.td2 = ApplicationStateMachine(anet.td2_device_object, anet.vlan) + anet.td2.add_capability(COVTestClientServices) + anet.append(anet.td2) + + # receive the subscription requests, wait until both clients have + # received the ack and the 'instant' notification. Then change the + # value, no ack coming back + anet.iut.start_state.doc("6-1-0") \ + .receive(SubscribeCOVRequest, pduSource=anet.td.address).doc("6-1-1") \ + .receive(SubscribeCOVRequest, pduSource=anet.td2.address).doc("6-1-2") \ + .wait_event("e2").doc("6-1-3") \ + .call(test_bv_fault).doc("6-1-4") \ + .timeout(10).doc("6-2-5") \ + .success() + + # first test device; send the subscription request, get an ack + # followed by the 'instant' notification + anet.td.start_state.doc("6-2-0") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=False, + lifetime=30, + )).doc("6-2-1") \ + .receive(SimpleAckPDU).doc("6-2-2") \ + .receive(UnconfirmedCOVNotificationRequest).doc("6-2-3") \ + .set_event("e1").doc("6-2-4") \ + .receive(UnconfirmedCOVNotificationRequest).doc("6-2-5") \ + .timeout(10).doc("6-2-6") \ + .success() + + # same pa + anet.td2.start_state.doc("6-3-0") \ + .wait_event("e1").doc("6-3-1") \ + .send(SubscribeCOVRequest( + destination=anet.iut.address, + subscriberProcessIdentifier=1, + monitoredObjectIdentifier=('binaryValue', 1), + issueConfirmedNotifications=False, + lifetime=30, + )).doc("6-3-2") \ + .receive(SimpleAckPDU).doc("6-3-3") \ + .receive(UnconfirmedCOVNotificationRequest).doc("6-3-4") \ + .set_event("e2").doc("6-3-5") \ + .receive(UnconfirmedCOVNotificationRequest).doc("6-3-6") \ + .timeout(10).doc("6-3-7") \ + .success() + + # run the group + anet.run() + diff --git a/tests/test_service/test_device.py b/tests/test_service/test_device.py index 995b9e8d..20af8b43 100644 --- a/tests/test_service/test_device.py +++ b/tests/test_service/test_device.py @@ -17,7 +17,7 @@ WhoIsRequest, IAmRequest, WhoHasRequest, WhoHasObject, IHaveRequest, DeviceCommunicationControlRequest, ReadPropertyRequest, - SimpleAckPDU, Error, RejectPDU, AbortPDU, + ConfirmedRequestPDU, SimpleAckPDU, Error, RejectPDU, AbortPDU, ) from bacpypes.service.device import ( @@ -25,7 +25,7 @@ DeviceCommunicationControlServices, ) -from .helpers import ApplicationNetwork, SnifferNode +from .helpers import ApplicationNetwork, SnifferStateMachine # some debugging _debug = 0 @@ -40,7 +40,7 @@ def test_basic(self): if _debug: TestBasic._debug("test_basic") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_basic") # all start states are successful anet.td.start_state.success() @@ -58,7 +58,7 @@ def test_whois_unconstrained(self): if _debug: TestWhoIsIAm._debug("test_whois_unconstrained") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_whois_unconstrained") # add the service capability to the IUT anet.iut.add_capability(WhoIsIAmServices) @@ -80,7 +80,7 @@ def test_whois_range_below(self): if _debug: TestWhoIsIAm._debug("test_whois_range_below") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_whois_range_below") # add the service capability to the iut anet.iut.add_capability(WhoIsIAmServices) @@ -105,7 +105,7 @@ def test_whois_range_above(self): if _debug: TestWhoIsIAm._debug("test_whois_range_above") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_whois_range_above") # add the service capability to the iut anet.iut.add_capability(WhoIsIAmServices) @@ -130,7 +130,7 @@ def test_whois_range(self): if _debug: TestWhoIsIAm._debug("test_whois_range") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_whois_range") # add the service capability to the IUT anet.iut.add_capability(WhoIsIAmServices) @@ -160,7 +160,7 @@ def test_who_has_object_by_name(self): if _debug: TestWhoIsIAm._debug("test_who_has_object_by_name") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_who_has_object_by_name") # add the service capability to the IUT anet.iut.add_capability(WhoHasIHaveServices) @@ -185,7 +185,7 @@ def test_who_has_object_by_id(self): if _debug: TestWhoIsIAm._debug("test_who_has_object_by_id") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_who_has_object_by_id") # add the service capability to the IUT anet.iut.add_capability(WhoHasIHaveServices) @@ -213,7 +213,7 @@ def test_default_behavior(self): if _debug: TestDeviceCommunicationControl._debug("test_default_behavior") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_default_behavior") # add the service capability to the IUT anet.iut.add_capability(WhoIsIAmServices) @@ -236,7 +236,7 @@ def test_disable(self): if _debug: TestDeviceCommunicationControl._debug("test_disable") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_disable") # add the service capability to the IUT anet.iut.add_capability(WhoIsIAmServices) @@ -263,10 +263,10 @@ def test_disable_initiation(self): """Test disabling initiation. After the DCC request send the IUT a WhoIsRequest and verify that the IAmRequest makes it back. """ - if _debug: TestDeviceCommunicationControl._debug("test_disable") + if _debug: TestDeviceCommunicationControl._debug("test_disable_initiation") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_disable_initiation") # add the service capability to the IUT anet.iut.add_capability(WhoIsIAmServices) @@ -297,7 +297,7 @@ def test_disable_time_duration(self): if _debug: TestDeviceCommunicationControl._debug("test_disable_time_duration") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_disable_time_duration") # add the service capability to the IUT anet.iut.add_capability(WhoIsIAmServices) @@ -333,7 +333,7 @@ def test_correct_password(self): if _debug: TestDeviceCommunicationControl._debug("test_correct_password") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_correct_password") # add the service capability to the IUT anet.iut.add_capability(WhoIsIAmServices) @@ -364,7 +364,7 @@ def test_incorrect_password(self): if _debug: TestDeviceCommunicationControl._debug("test_incorrect_password") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_incorrect_password") # add the service capability to the IUT anet.iut.add_capability(WhoIsIAmServices) @@ -409,7 +409,7 @@ def test_9_39_1(self): if _debug: TestUnrecognizedService._debug("test_9_39_1") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_9_39_1") # send the request, get it rejected anet.td.start_state.doc("7-6-0") \ @@ -435,13 +435,13 @@ def test_apdu_retry_default(self): if _debug: TestAPDURetryTimeout._debug("test_apdu_retry") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_apdu_retry") # adjust test if default retries changes assert anet.iut_device_object.numberOfApduRetries == 3 # add a sniffer to see requests without doing anything - sniffer = SnifferNode(anet.vlan) + sniffer = SnifferStateMachine(anet.vlan) anet.append(sniffer) # no TD application layer matching @@ -459,10 +459,10 @@ def test_apdu_retry_default(self): # see the attempts and nothing else sniffer.start_state.doc("7-8-0") \ - .receive(PDU).doc("7-8-1") \ - .receive(PDU).doc("7-8-2") \ - .receive(PDU).doc("7-8-3") \ - .receive(PDU).doc("7-8-4") \ + .receive(ConfirmedRequestPDU).doc("7-8-1") \ + .receive(ConfirmedRequestPDU).doc("7-8-2") \ + .receive(ConfirmedRequestPDU).doc("7-8-3") \ + .receive(ConfirmedRequestPDU).doc("7-8-4") \ .timeout(10).doc("7-8-5") \ .success() @@ -474,13 +474,13 @@ def test_apdu_retry_1(self): if _debug: TestAPDURetryTimeout._debug("test_apdu_retry_1") # create a network - anet = ApplicationNetwork() + anet = ApplicationNetwork("test_apdu_retry_1") # change the retry count in the device properties anet.iut_device_object.numberOfApduRetries = 1 # add a sniffer to see requests without doing anything - sniffer = SnifferNode(anet.vlan) + sniffer = SnifferStateMachine(anet.vlan) anet.append(sniffer) # no TD application layer matching @@ -498,8 +498,8 @@ def test_apdu_retry_1(self): # see the attempts and nothing else sniffer.start_state.doc("7-10-0") \ - .receive(PDU).doc("7-10-1") \ - .receive(PDU).doc("7-10-2") \ + .receive(ConfirmedRequestPDU).doc("7-10-1") \ + .receive(ConfirmedRequestPDU).doc("7-10-2") \ .timeout(10).doc("7-10-3") \ .success() diff --git a/tests/test_utilities/test_state_machine.py b/tests/test_utilities/test_state_machine.py index f9df30cb..9ee3827d 100644 --- a/tests/test_utilities/test_state_machine.py +++ b/tests/test_utilities/test_state_machine.py @@ -277,7 +277,31 @@ def test_state_machine_call(self): # check for success assert not tsm.running - assert tsm.current_state.is_success_state + assert tsm.is_success_state + + # check for the call + assert self._called + + def test_state_machine_call_exception(self): + if _debug: TestStateMachine._debug("test_state_machine_call_exception") + + # simple hook + self._called = False + + def fn(): + self._called = True + raise AssertionError("error") + + # create a trapped state machine + tsm = TrappedStateMachine() + + # make a send transition from start to success, run the machine + tsm.start_state.call(fn).success() + tsm.run() + + # check for failed call + assert not tsm.running + assert tsm.is_fail_state # check for the call assert self._called