From 825e01643d894111f77daec8a04cb23c62453dfb Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Tue, 20 Oct 2020 10:40:06 -0400 Subject: [PATCH] roll up changes and release 0.18.3 --- .../gettingstarted/gettingstarted001.rst | 196 ++++----- .../gettingstarted/gettingstarted002.rst | 82 ++-- pcap_tools/AddressFilter.py | 129 ------ pcap_tools/COVNotificationSummaryFilter.py | 149 ------- pcap_tools/EventNotificationSummaryFilter.py | 215 ---------- pcap_tools/IAmRouterToNetworkSummaryFilter.py | 161 -------- pcap_tools/PDUsPerMinuteFilter.py | 108 ----- pcap_tools/ReadPropertySummaryFilter.py | 228 ----------- pcap_tools/ReadPropertyTimeoutFilter.py | 177 -------- pcap_tools/WhoIsIAmDeviceFilter.py | 158 -------- pcap_tools/WhoIsIAmSummaryFilter.py | 160 -------- .../WhoIsRouterToNetworkSummaryFilter.py | 160 -------- py25/bacpypes/__init__.py | 2 +- py25/bacpypes/basetypes.py | 4 +- py27/bacpypes/__init__.py | 2 +- py27/bacpypes/basetypes.py | 4 +- py27/bacpypes/local/object.py | 2 +- py34/bacpypes/__init__.py | 2 +- py34/bacpypes/basetypes.py | 4 +- samples/COVServer.py | 198 +++++---- samples/Tutorial/WhoIsIAm.py | 181 --------- samples/WhoIsIAm.py | 83 ++-- sandbox/cov_analog_input_object.py | 378 ++++++++++++++++++ sandbox/io.py | 26 +- 24 files changed, 693 insertions(+), 2116 deletions(-) delete mode 100755 pcap_tools/AddressFilter.py delete mode 100755 pcap_tools/COVNotificationSummaryFilter.py delete mode 100755 pcap_tools/EventNotificationSummaryFilter.py delete mode 100755 pcap_tools/IAmRouterToNetworkSummaryFilter.py delete mode 100755 pcap_tools/PDUsPerMinuteFilter.py delete mode 100755 pcap_tools/ReadPropertySummaryFilter.py delete mode 100755 pcap_tools/ReadPropertyTimeoutFilter.py delete mode 100755 pcap_tools/WhoIsIAmDeviceFilter.py delete mode 100755 pcap_tools/WhoIsIAmSummaryFilter.py delete mode 100755 pcap_tools/WhoIsRouterToNetworkSummaryFilter.py delete mode 100644 samples/Tutorial/WhoIsIAm.py mode change 100755 => 100644 samples/WhoIsIAm.py create mode 100755 sandbox/cov_analog_input_object.py diff --git a/doc/source/gettingstarted/gettingstarted001.rst b/doc/source/gettingstarted/gettingstarted001.rst index b48997ba..9dce3035 100644 --- a/doc/source/gettingstarted/gettingstarted001.rst +++ b/doc/source/gettingstarted/gettingstarted001.rst @@ -3,25 +3,25 @@ Getting Started =============== -Ah, so you are interested in getting started with BACnet and Python. Welcome -to BACpypes, I hope you enjoy your journey. This tutorial starts with -just enough of the basics of BACnet to get a workstation communicating with -another device. We will cover installing the library, downloading and +Ah, so you are interested in getting started with BACnet and Python. Welcome +to BACpypes, I hope you enjoy your journey. This tutorial starts with +just enough of the basics of BACnet to get a workstation communicating with +another device. We will cover installing the library, downloading and configuring the samples applications. Basic Assumptions ----------------- -I will assume you are a software developer and it is your job to communicate -with a device from another company that uses BACnet. Your employer has -given you a test device and purchased a copy of the BACnet standard. I will +I will assume you are a software developer and it is your job to communicate +with a device from another company that uses BACnet. Your employer has +given you a test device and purchased a copy of the BACnet standard. I will need... -- a development workstation running some flavor of Linux or Windows, complete with +- a development workstation running some flavor of Linux or Windows, complete with the latest version of Python (2.7 or >3.4) and `setup tools `_. -- a small Ethernet hub into which you can plug both your workstation and your +- a small Ethernet hub into which you can plug both your workstation and your mysterious BACnet device, so you won't be distracted by lots of other network traffic. - a BACnetIP/BACnet-MSTP Router if your mysterious device is an MSTP device (BACpypes is @@ -33,7 +33,7 @@ need... `Anaconda `_ or Enthought `Canopy `_. -Before getting this test environment set up and while you are still connected +Before getting this test environment set up and while you are still connected to the internet, install the BACpypes library:: $ sudo easy_install bacpypes @@ -42,7 +42,7 @@ or:: $ sudo pip install bacpypes -And while you are at it, get a copy of the BACpypes project from GitHub. It +And while you are at it, get a copy of the BACpypes project from GitHub. It contains the library source code, sample code, and this documentation. Install the `Git `_ software from `here `_, then make a local copy of the @@ -50,11 +50,11 @@ repository by cloning it:: $ git clone https://github.com/JoelBender/bacpypes.git -No protocol analysis workbench would be complete without an installed +No protocol analysis workbench would be complete without an installed copy of `Wireshark `_:: $ sudo apt-get install wireshark - + or if you use Windows, `download it here `_. .. caution:: @@ -67,15 +67,15 @@ or if you use Windows, `download it here `_, and sequentially - higher numbers are used in many applications (i.e. 47809, 47810,...). + The BACnet protocol has been assigned port 47808 (hex 0xBAC0) by + by the `Internet Assigned Numbers Authority `_, and sequentially + higher numbers are used in many applications (i.e. 47809, 47810,...). There are some BACnet routing and networking issues related to using these higher unoffical ports, but that is a topic for another tutorial. @@ -213,43 +213,43 @@ Starting An Application ----------------------- The simplest BACpypes sample application is the **WhoIsIAm.py** -application. It sends out Who-Is and I-Am messages and +application. It sends out Who-Is and I-Am messages and displays the results it receives. What are these things? -As mentioned before, BACnet has unique device identifiers and -most applications use these identifiers in their configuration +As mentioned before, BACnet has unique device identifiers and +most applications use these identifiers in their configuration to know who their peers are. Once these identifiers are given to a device they typically do not change, even as the network topology changes. -BACnet devices use the Who-Is request to translate device -identifiers into network addresses. This is very similar to -a decentralized DNS service, but the names are unsigned -integers. The request is broadcast on the network and the +BACnet devices use the Who-Is request to translate device +identifiers into network addresses. This is very similar to +a decentralized DNS service, but the names are unsigned +integers. The request is broadcast on the network and the client waits around to listen for I-Am messages. The source -address of the I-Am response is "bound" to the device identifier +address of the I-Am response is "bound" to the device identifier and most communications are unicast thereafter. -First, start up Wireshark on your workstation and a capture +First, start up Wireshark on your workstation and a capture session with a BACnet capture filter:: udp and port 47808 -You might start seeing BACnet traffic from your test device, -and if you wait to power it on after starting your capture -you should see at least a broadcast I-Am message. By looking -in the I-Am packet decoding you will see some of its -configuration parameters that should match what you expected +You might start seeing BACnet traffic from your test device, +and if you wait to power it on after starting your capture +you should see at least a broadcast I-Am message. By looking +in the I-Am packet decoding you will see some of its +configuration parameters that should match what you expected them to be. Now start the simplest tutorial application:: - $ python samples/Tutorial/WhoIsIAm.py + $ python samples/WhoIsIAm.py .. note:: The samples folder contains a Tutorial folder holding all the samples - that you will need too follow along this tutorial. + that you will need to follow along with this tutorial. Later, the folder `HandsOnLabs` will be used as it contains the samples that are fully explained in this document (see table of content) @@ -272,7 +272,7 @@ BACnet communications traffic. Generate the basic I-Am message:: > iam -You should see Wireshark capture your I-Am message containing your configuration +You should see Wireshark capture your I-Am message containing your configuration parameters. This is a "global broadcast" message. Your test device will see it but since your test device probably isn't looking for you, it will not respond to the message. @@ -281,27 +281,27 @@ respond to the message. Binding to the Test Device -------------------------- -Next we want to confirm that your workstation can receive the -messages the test device sends out. We do this by generating a -generic Who-Is request. The request will be "unconstrained", meaning +Next we want to confirm that your workstation can receive the +messages the test device sends out. We do this by generating a +generic Who-Is request. The request will be "unconstrained", meaning every device that hears the message will respond with their corresponding -I-Am messages. +I-Am messages. -.. caution:: +.. caution:: - Generating **unconstrained** Who-Is requests on a large network will create - a LOT of traffic, which can lead to network problems caused by the resulting + Generating **unconstrained** Who-Is requests on a large network will create + a LOT of traffic, which can lead to network problems caused by the resulting flood of messages. - + To generate the Who-Is request:: > whois -You should see the Who-Is request captured in Wireshark along with the I-Am -response from your test device, and then the details of the response displayed +You should see the Who-Is request captured in Wireshark along with the I-Am +response from your test device, and then the details of the response displayed on the workstation console.:: - > whois + > whois > pduSource = iAmDeviceIdentifier = ('device', 1000) maxAPDULengthAccepted = 480 @@ -309,13 +309,13 @@ on the workstation console.:: vendorID = 8 -There are a few different forms of the *whois* command supported by this +There are a few different forms of the *whois* command supported by this simple application. You can see these with the help command:: > help whois whois [ ] [ ] -This is like a BNF syntax, the **whois** command is optionally +This is like a BNF syntax, the **whois** command is optionally followed by a BACnet device address, and then optionally followed by a low (address) limit and high (address) limit. The most common use of the Who-Is request is to look for a specific device given its device @@ -329,9 +329,9 @@ building as a group:: > whois 203000 203099 -Every once in a while a contractor might install a BACnet +Every once in a while a contractor might install a BACnet device that hasn't been properly configured. Assuming that -it has an IP address, you can send an **unconstrained Who-Is** request +it has an IP address, you can send an **unconstrained Who-Is** request to the specific device and hope that it responds:: > whois 192.168.0.10 @@ -349,9 +349,9 @@ but that is a subject of an other tutorial. What's Next ----------- -The next tutorial describes the different ways this +The next tutorial describes the different ways this application can be run, and what the commands can tell you -about how it is working. All of the "console" applications -(i.e. those that prompt for commands) use the same basic +about how it is working. All of the "console" applications +(i.e. those that prompt for commands) use the same basic commands and work the same way. diff --git a/doc/source/gettingstarted/gettingstarted002.rst b/doc/source/gettingstarted/gettingstarted002.rst index 4d22180c..a9084b93 100644 --- a/doc/source/gettingstarted/gettingstarted002.rst +++ b/doc/source/gettingstarted/gettingstarted002.rst @@ -3,9 +3,9 @@ Running BACpypes Applications ============================= -All BACpypes sample applications have the same basic set of command line -options so it is easy to move between applications, turn debugging on and -and use different configurations. There may be additional options and +All BACpypes sample applications have the same basic set of command line +options so it is easy to move between applications, turn debugging on and +and use different configurations. There may be additional options and command parameters than just the ones described in this section. Getting Help @@ -14,7 +14,7 @@ Getting Help Whatever the command line parameters and additional options might be for an application, you can start with help:: - $ python Tutorial/WhoIsIAm.py --help + $ python samples/WhoIsIAm.py --help usage: WhoIsIAm.py [-h] [--buggers] [--debug [DEBUG [DEBUG ...]]] [--color] [--ini INI] This application presents a 'console' prompt to the user asking for Who-Is and @@ -33,22 +33,22 @@ an application, you can start with help:: Listing Debugging Loggers ------------------------- -The BACpypes library and sample applications make extensive use of the -built-in *logging* module in Python. Every module in the library, along -with every class and exported function, has a logging object associated -with it. By attaching a log handler to a logger, the log handler is given +The BACpypes library and sample applications make extensive use of the +built-in *logging* module in Python. Every module in the library, along +with every class and exported function, has a logging object associated +with it. By attaching a log handler to a logger, the log handler is given a chance to output the progress of the application. -Because BACpypes modules are deeply interconnected, dumping a complete list -of all of the logger names is a long list. Start out focusing on the +Because BACpypes modules are deeply interconnected, dumping a complete list +of all of the logger names is a long list. Start out focusing on the components of the WhoIsIAm.py application:: - $ python Tutorial/WhoIsIAm.py --buggers | grep __main__ + $ python samples/WhoIsIAm.py --buggers | grep __main__ __main__ __main__.WhoIsIAmApplication __main__.WhoIsIAmConsoleCmd -In this sample, the entire application is called __main__ and it defines +In this sample, the entire application is called __main__ and it defines two classes. Debugging a Module @@ -61,12 +61,12 @@ Telling the application to debug a module is simple:: DEBUG:__main__: - args: Namespace(buggers=False, debug=['__main__'], ini=) DEBUG:__main__.WhoIsIAmApplication:__init__ (, '128.253.109.40/24:47808') DEBUG:__main__:running - > + > -The output is the severity code of the logger (almost always DEBUG), the name -of the module, class, or function, then some message about the progress of the -application. From the output above you can see the application initializing, -setting the args variable, creating an instance of the WhoIsIAmApplication class +The output is the severity code of the logger (almost always DEBUG), the name +of the module, class, or function, then some message about the progress of the +application. From the output above you can see the application initializing, +setting the args variable, creating an instance of the WhoIsIAmApplication class (with some parameters), and then declaring itself - running. @@ -76,52 +76,52 @@ Debugging a Class Debugging all of the classes and functions can generate a lot of output, so it is useful to focus on a specific function or class:: - $ python Tutorial/WhoIsIAm.py --debug __main__.WhoIsIAmApplication + $ python samples/WhoIsIAm.py --debug __main__.WhoIsIAmApplication DEBUG:__main__.WhoIsIAmApplication:__init__ (, '128.253.109.40/24:47808') - > + > -The same method is used to debug the activity of a BACpypes module, for +The same method is used to debug the activity of a BACpypes module, for example, there is a class called UDPActor in the UDP module:: - $ python Tutorial/WhoIsIAm.py --ini BAC0.ini --debug bacpypes.udp.UDPActor + $ python samples/WhoIsIAm.py --ini BAC0.ini --debug bacpypes.udp.UDPActor > DEBUG:bacpypes.udp.UDPActor:__init__ ('128.253.109.254', 47808) DEBUG:bacpypes.udp.UDPActor:response pduSource = ('128.253.109.254', 47808) pduData = x'81.04.00.37.0A.10.6D.45.BA.C0.01.28.FF.FF.00.00.B6.01.05.FD...' -In this sample, an instance of a UDPActor is created and then its response -function is called with an instance of a PDU as a parameter. Following +In this sample, an instance of a UDPActor is created and then its response +function is called with an instance of a PDU as a parameter. Following the function invocation description, the debugging output continues with the -contents of the PDU. Notice, the protocol data is printed as a hex +contents of the PDU. Notice, the protocol data is printed as a hex encoded string (and restricted to just the first 20 bytes of the message). -You can debug a function just as easily. Specify as many different -combinations of logger names as necessary. Note, you cannot debug a +You can debug a function just as easily. Specify as many different +combinations of logger names as necessary. Note, you cannot debug a specific function within a class. Sending Debug Log to a file ---------------------------- -The current --debug command line option takes a list of named debugging access -points and attaches a StreamHandler which sends the output to sys.stderr. -There is a way to send the debugging output to a -RotatingFileHandler by providing a file name, and optionally maxBytes and -backupCount. For example, this invocation sends the main application debugging -to standard error and the debugging output of the bacpypes.udp module to the +The current --debug command line option takes a list of named debugging access +points and attaches a StreamHandler which sends the output to sys.stderr. +There is a way to send the debugging output to a +RotatingFileHandler by providing a file name, and optionally maxBytes and +backupCount. For example, this invocation sends the main application debugging +to standard error and the debugging output of the bacpypes.udp module to the traffic.txt file:: - $ python Tutorial/WhoIsIAm.py --debug __main__ bacpypes.udp:traffic.txt + $ python samples/WhoIsIAm.py --debug __main__ bacpypes.udp:traffic.txt -By default the `maxBytes` is zero so there is no rotating file, but it can be +By default the `maxBytes` is zero so there is no rotating file, but it can be provided, for example this limits the file size to 1MB:: - $ python Tutorial/WhoIsIAm.py --debug __main__ bacpypes.udp:traffic.txt:1048576 + $ python samples/WhoIsIAm.py --debug __main__ bacpypes.udp:traffic.txt:1048576 -If `maxBytes` is provided, then by default the `backupCount` is 10, but it can also +If `maxBytes` is provided, then by default the `backupCount` is 10, but it can also be specified, so this limits the output to one hundred files:: - $ python Tutorial/WhoIsIAm.py --debug __main__ bacpypes.udp:traffic.txt:1048576:100 + $ python samples/WhoIsIAm.py --debug __main__ bacpypes.udp:traffic.txt:1048576:100 .. caution:: @@ -136,18 +136,18 @@ The definition of debug:: Changing INI Files ------------------ -It is not unusual to have a variety of different INI files specifying +It is not unusual to have a variety of different INI files specifying different port numbers or other BACnet communications paramters. Rather than swapping INI files, you can simply provide the INI file on the -command line, overriding the default BACpypes.ini file. For example, I +command line, overriding the default BACpypes.ini file. For example, I have an INI file for port 47808:: - $ python Tutorial/WhoIsIAm.py --ini BAC0.ini + $ python samples/WhoIsIAm.py --ini BAC0.ini And another one for port 47809:: - $ python Tutorial/WhoIsIAm.py --ini BAC1.ini + $ python samples/WhoIsIAm.py --ini BAC1.ini And I switch back and forth between them. diff --git a/pcap_tools/AddressFilter.py b/pcap_tools/AddressFilter.py deleted file mode 100755 index 4130b71c..00000000 --- a/pcap_tools/AddressFilter.py +++ /dev/null @@ -1,129 +0,0 @@ -#!/usr/bin/python - -""" -Address Filter - Sample tool to filter by source, destination and/or host -""" - -import sys - -from bacpypes.debugging import Logging, function_debugging, ModuleLogger -from bacpypes.consolelogging import ConsoleLogHandler - -from bacpypes.pdu import Address -from bacpypes.analysis import trace, strftimestamp, Tracer - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -filterSource = None -filterDestination = None -filterHost = None - -# -# Match -# - -@function_debugging -def Match(addr1, addr2): - """Return true iff addr1 matches addr2.""" - if _debug: Match._debug("Match %r %r", addr1, addr2) - - if (addr2.addrType == Address.localBroadcastAddr): - # match any local station - return (addr1.addrType == Address.localStationAddr) or (addr1.addrType == Address.localBroadcastAddr) - elif (addr2.addrType == Address.localStationAddr): - # match a specific local station - return (addr1.addrType == Address.localStationAddr) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.remoteBroadcastAddr): - # match any remote station or remote broadcast on a matching network - return ((addr1.addrType == Address.remoteStationAddr) or (addr1.addrType == Address.remoteBroadcastAddr)) \ - and (addr1.addrNet == addr2.addrNet) - elif (addr2.addrType == Address.remoteStationAddr): - # match a specific remote station - return (addr1.addrType == Address.remoteStationAddr) and \ - (addr1.addrNet == addr2.addrNet) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.globalBroadcastAddr): - # match a global broadcast address - return (addr1.addrType == Address.globalBroadcastAddr) - else: - raise RuntimeError, "invalid match combination" - -# -# AddressFilterTracer -# - -class AddressFilterTracer(Tracer, Logging): - - def __init__(self): - if _debug: AddressFilterTracer._debug("__init__") - Tracer.__init__(self, self.Filter) - - def Filter(self, pkt): - if _debug: AddressFilterTracer._debug("Filter %r", pkt) - - # apply the filters - if filterSource: - if not Match(pkt.pduSource, filterSource): - if _debug: AddressFilterTracer._debug(" - source filter fail") - return - if filterDestination: - if not Match(pkt.pduDestination, filterDestination): - if _debug: AddressFilterTracer._debug(" - destination filter fail") - return - if filterHost: - if (not Match(pkt.pduSource, filterHost)) and (not Match(pkt.pduDestination, filterHost)): - if _debug: AddressFilterTracer._debug(" - host filter fail") - return - - # passed all the filter tests - print strftimestamp(pkt._timestamp), pkt.__class__.__name__ - pkt.debug_contents() - print - -# -# __main__ -# - -try: - if ('--debug' in sys.argv): - indx = sys.argv.index('--debug') - for i in range(indx+1, len(sys.argv)): - ConsoleLogHandler(sys.argv[i]) - del sys.argv[indx:] - - if _debug: _log.debug("initialization") - - # check for src - if ('--src' in sys.argv): - i = sys.argv.index('--src') - filterSource = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterSource: %r", filterSource) - del sys.argv[i:i+2] - - # check for dest - if ('--dest' in sys.argv): - i = sys.argv.index('--dest') - filterDestination = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterDestination: %r", filterDestination) - del sys.argv[i:i+2] - - # check for host - if ('--host' in sys.argv): - i = sys.argv.index('--host') - filterHost = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterHost: %r", filterHost) - del sys.argv[i:i+2] - - # trace the file(s) - for fname in sys.argv[1:]: - trace(fname, [AddressFilterTracer]) - -except KeyboardInterrupt: - pass -except Exception, e: - _log.exception("an error has occurred: %s", e) -finally: - if _debug: _log.debug("finally") - diff --git a/pcap_tools/COVNotificationSummaryFilter.py b/pcap_tools/COVNotificationSummaryFilter.py deleted file mode 100755 index fd8d6eb4..00000000 --- a/pcap_tools/COVNotificationSummaryFilter.py +++ /dev/null @@ -1,149 +0,0 @@ -#!/usr/bin/python - -""" -Summarize COV Notifications -""" - -import sys - -from bacpypes.debugging import Logging, function_debugging, ModuleLogger -from bacpypes.consolelogging import ConsoleLogHandler - -from bacpypes.pdu import Address -from bacpypes.analysis import trace, strftimestamp, Tracer -from bacpypes.apdu import UnconfirmedCOVNotificationRequest - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -filterSource = None -filterDestination = None -filterHost = None - -# dictionary of requests -requests = {} - -# -# Match -# - -@function_debugging -def Match(addr1, addr2): - """Return true iff addr1 matches addr2.""" - if _debug: Match._debug("Match %r %r", addr1, addr2) - - if (addr2.addrType == Address.localBroadcastAddr): - # match any local station - return (addr1.addrType == Address.localStationAddr) or (addr1.addrType == Address.localBroadcastAddr) - elif (addr2.addrType == Address.localStationAddr): - # match a specific local station - return (addr1.addrType == Address.localStationAddr) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.remoteBroadcastAddr): - # match any remote station or remote broadcast on a matching network - return ((addr1.addrType == Address.remoteStationAddr) or (addr1.addrType == Address.remoteBroadcastAddr)) \ - and (addr1.addrNet == addr2.addrNet) - elif (addr2.addrType == Address.remoteStationAddr): - # match a specific remote station - return (addr1.addrType == Address.remoteStationAddr) and \ - (addr1.addrNet == addr2.addrNet) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.globalBroadcastAddr): - # match a global broadcast address - return (addr1.addrType == Address.globalBroadcastAddr) - else: - raise RuntimeError, "invalid match combination" - -# -# COVNotificationSummary -# - -class COVNotificationSummary(Tracer, Logging): - - def __init__(self): - if _debug: COVNotificationSummary._debug("__init__") - Tracer.__init__(self, self.Filter) - - def Filter(self, pkt): - if _debug: COVNotificationSummary._debug("Filter %r", pkt) - global requests - - # apply the filters - if filterSource: - if not Match(pkt.pduSource, filterSource): - if _debug: COVNotificationSummary._debug(" - source filter fail") - return - if filterDestination: - if not Match(pkt.pduDestination, filterDestination): - if _debug: COVNotificationSummary._debug(" - destination filter fail") - return - if filterHost: - if (not Match(pkt.pduSource, filterHost)) and (not Match(pkt.pduDestination, filterHost)): - if _debug: COVNotificationSummary._debug(" - host filter fail") - return - - # check for notifications - if isinstance(pkt, UnconfirmedCOVNotificationRequest): - key = (pkt.pduSource, pkt.initiatingDeviceIdentifier[1], pkt.monitoredObjectIdentifier) - if key in requests: - requests[key] += 1 - else: - requests[key] = 1 - -# -# __main__ -# - -try: - if ('--debug' in sys.argv): - indx = sys.argv.index('--debug') - for i in range(indx+1, len(sys.argv)): - ConsoleLogHandler(sys.argv[i]) - del sys.argv[indx:] - - if _debug: _log.debug("initialization") - - # check for src - if ('--src' in sys.argv): - i = sys.argv.index('--src') - filterSource = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterSource: %r", filterSource) - del sys.argv[i:i+2] - - # check for dest - if ('--dest' in sys.argv): - i = sys.argv.index('--dest') - filterDestination = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterDestination: %r", filterDestination) - del sys.argv[i:i+2] - - # check for host - if ('--host' in sys.argv): - i = sys.argv.index('--host') - filterHost = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterHost: %r", filterHost) - del sys.argv[i:i+2] - - # start out with no requests - requests = {} - - # trace the file(s) - for fname in sys.argv[1:]: - trace(fname, [COVNotificationSummary]) - - # sort the result, descending order by count - items = requests.items() - items.sort(lambda x, y: cmp(y[1], x[1])) - - # print everything out - print "%-20s %8s %-15s %4s %5s" % ("Address", "Device", "Object", "", "Count") - for key, count in items: - print "%-20s %8s %-15s %4d %5d" % (key[0], key[1], key[2][0], key[2][1], count) - -except KeyboardInterrupt: - pass -except Exception, e: - _log.exception("an error has occurred: %s", e) -finally: - if _debug: _log.debug("finally") - diff --git a/pcap_tools/EventNotificationSummaryFilter.py b/pcap_tools/EventNotificationSummaryFilter.py deleted file mode 100755 index 093ef628..00000000 --- a/pcap_tools/EventNotificationSummaryFilter.py +++ /dev/null @@ -1,215 +0,0 @@ -#!/usr/bin/python - -""" -Event Notification Summary Filter -""" - -import sys - -from bacpypes.debugging import Logging, function_debugging, ModuleLogger -from bacpypes.consolelogging import ConsoleLogHandler - -from bacpypes.pdu import Address -from bacpypes.analysis import trace, strftimestamp, Tracer -from bacpypes.apdu import ConfirmedEventNotificationRequest, SimpleAckPDU - -try: - from CSStat import Statistics -except ImportError: - Statistics = lambda: None - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -filterSource = None -filterDestination = None -filterHost = None - -# dictionary of pending requests -requests = {} - -# all traffic -traffic = [] - -# -# Traffic -# - -class Traffic: - - def __init__(self, req): - self.req = req - self.resp = None - - self.ts = req._timestamp - self.retry = 1 - -# -# Match -# - -@function_debugging -def Match(addr1, addr2): - """Return true iff addr1 matches addr2.""" - if _debug: Match._debug("Match %r %r", addr1, addr2) - - if (addr2.addrType == Address.localBroadcastAddr): - # match any local station - return (addr1.addrType == Address.localStationAddr) or (addr1.addrType == Address.localBroadcastAddr) - elif (addr2.addrType == Address.localStationAddr): - # match a specific local station - return (addr1.addrType == Address.localStationAddr) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.remoteBroadcastAddr): - # match any remote station or remote broadcast on a matching network - return ((addr1.addrType == Address.remoteStationAddr) or (addr1.addrType == Address.remoteBroadcastAddr)) \ - and (addr1.addrNet == addr2.addrNet) - elif (addr2.addrType == Address.remoteStationAddr): - # match a specific remote station - return (addr1.addrType == Address.remoteStationAddr) and \ - (addr1.addrNet == addr2.addrNet) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.globalBroadcastAddr): - # match a global broadcast address - return (addr1.addrType == Address.globalBroadcastAddr) - else: - raise RuntimeError, "invalid match combination" - -# -# ConfirmedEventNotificationSummary -# - -class ConfirmedEventNotificationSummary(Tracer, Logging): - - def __init__(self): - if _debug: ConfirmedEventNotificationSummary._debug("__init__") - Tracer.__init__(self, self.Filter) - - def Filter(self, pkt): - if _debug: ConfirmedEventNotificationSummary._debug("Filter %r", pkt) - global requests - - # apply the filters - if filterSource: - if not Match(pkt.pduSource, filterSource): - if _debug: ConfirmedEventNotificationSummary._debug(" - source filter fail") - return - if filterDestination: - if not Match(pkt.pduDestination, filterDestination): - if _debug: ConfirmedEventNotificationSummary._debug(" - destination filter fail") - return - if filterHost: - if (not Match(pkt.pduSource, filterHost)) and (not Match(pkt.pduDestination, filterHost)): - if _debug: ConfirmedEventNotificationSummary._debug(" - host filter fail") - return - - # check for notifications - if isinstance(pkt, ConfirmedEventNotificationRequest): - key = (pkt.pduSource, pkt.pduDestination, pkt.apduInvokeID) - if key in requests: - if _debug: ConfirmedEventNotificationSummary._debug(" - retry") - requests[key].retry += 1 - else: - if _debug: ConfirmedEventNotificationSummary._debug(" - new request") - msg = Traffic(pkt) - requests[key] = msg - traffic.append(msg) - - # now check for acks - elif isinstance(pkt, SimpleAckPDU): - key = (pkt.pduDestination, pkt.pduSource, pkt.apduInvokeID) - req = requests.get(key, None) - if req: - if _debug: ConfirmedEventNotificationSummary._debug(" - matched with request") - requests[key].resp = pkt - - # delete the request, it stays in the traffic list - del requests[key] - else: - if _debug: ConfirmedEventNotificationSummary._debug(" - unmatched") - -# -# __main__ -# - -try: - if ('--debug' in sys.argv): - indx = sys.argv.index('--debug') - for i in range(indx+1, len(sys.argv)): - ConsoleLogHandler(sys.argv[i]) - del sys.argv[indx:] - - if _debug: _log.debug("initialization") - - # check for src - if ('--src' in sys.argv): - i = sys.argv.index('--src') - filterSource = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterSource: %r", filterSource) - del sys.argv[i:i+2] - - # check for dest - if ('--dest' in sys.argv): - i = sys.argv.index('--dest') - filterDestination = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterDestination: %r", filterDestination) - del sys.argv[i:i+2] - - # check for host - if ('--host' in sys.argv): - i = sys.argv.index('--host') - filterHost = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterHost: %r", filterHost) - del sys.argv[i:i+2] - - # start out with no unmatched requests - requests = {} - - # trace the file(s) - for fname in sys.argv[1:]: - trace(fname, [ConfirmedEventNotificationSummary]) - - # print some stats at the end - stats = Statistics() - - # dump everything - for msg in traffic: - req = msg.req - resp = msg.resp - - if resp: - deltatime = (resp._timestamp - req._timestamp) * 1000 - if stats: - stats.Record(deltatime, req._timestamp) - print strftimestamp(req._timestamp), '\t', req.pduSource, '\t', resp.pduSource, '\t', ("%8.2fms" % (deltatime,)), '\t', msg.retry if (msg.retry != 1) else "" - else: - print strftimestamp(req._timestamp), '\t', req.pduSource, '\t', "----------", '\t', "----------", '\t', msg.retry if (msg.retry != 1) else "" - - if stats: - smin, smax, _, _, _, _ = stats.Stats() - - xlw, lw, q1, m, q3, uw, xuw = stats.Whisker() - if m is not None: - print "\t %-8.1f" % smax - print "\t %-8.1f" % xuw - print "\t--- %-8.1f" % uw - print "\t | " - print "\t.'. %-8.1f" % q3 - print "\t| |" - print "\t|-| %-8.1f" % m - print "\t| |" - print "\t'-' %-8.1f" % q1 - print "\t | " - print "\t--- %-8.1f" % lw - print "\t %-8.1f" % xlw - print "\t %-8.1f" % smin - else: - print "No stats" - -except KeyboardInterrupt: - pass -except Exception, e: - _log.exception("an error has occurred: %s", e) -finally: - if _debug: _log.debug("finally") - diff --git a/pcap_tools/IAmRouterToNetworkSummaryFilter.py b/pcap_tools/IAmRouterToNetworkSummaryFilter.py deleted file mode 100755 index 729c7b42..00000000 --- a/pcap_tools/IAmRouterToNetworkSummaryFilter.py +++ /dev/null @@ -1,161 +0,0 @@ -#!/usr/bin/python - -""" -Summarize I-Am-Router-To-Network Notifications -""" - -import sys -from collections import defaultdict - -from bacpypes.debugging import Logging, function_debugging, ModuleLogger -from bacpypes.consolelogging import ConsoleLogHandler - -from bacpypes.pdu import Address -from bacpypes.analysis import trace, strftimestamp, Tracer -from bacpypes.npdu import IAmRouterToNetwork - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -filterSource = None -filterDestination = None -filterHost = None - -# dictionary of requests -requests = defaultdict(int) -networks = defaultdict(list) - -# -# Match -# - -@function_debugging -def Match(addr1, addr2): - """Return true iff addr1 matches addr2.""" - if _debug: Match._debug("Match %r %r", addr1, addr2) - - if (addr2.addrType == Address.localBroadcastAddr): - # match any local station - return (addr1.addrType == Address.localStationAddr) or (addr1.addrType == Address.localBroadcastAddr) - elif (addr2.addrType == Address.localStationAddr): - # match a specific local station - return (addr1.addrType == Address.localStationAddr) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.remoteBroadcastAddr): - # match any remote station or remote broadcast on a matching network - return ((addr1.addrType == Address.remoteStationAddr) or (addr1.addrType == Address.remoteBroadcastAddr)) \ - and (addr1.addrNet == addr2.addrNet) - elif (addr2.addrType == Address.remoteStationAddr): - # match a specific remote station - return (addr1.addrType == Address.remoteStationAddr) and \ - (addr1.addrNet == addr2.addrNet) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.globalBroadcastAddr): - # match a global broadcast address - return (addr1.addrType == Address.globalBroadcastAddr) - else: - raise RuntimeError, "invalid match combination" - -# -# IAmRouterToNetworkSummary -# - -class IAmRouterToNetworkSummary(Tracer, Logging): - - def __init__(self): - if _debug: IAmRouterToNetworkSummary._debug("__init__") - Tracer.__init__(self, self.Filter) - - def Filter(self, pkt): - if _debug: IAmRouterToNetworkSummary._debug("Filter %r", pkt) - global requests, networks - - # check for the packet type - if not isinstance(pkt, IAmRouterToNetwork): - return - - # apply the filters - if filterSource: - if not Match(pkt.pduSource, filterSource): - if _debug: IAmRouterToNetworkSummary._debug(" - source filter fail") - return - if filterDestination: - if not Match(pkt.pduDestination, filterDestination): - if _debug: IAmRouterToNetworkSummary._debug(" - destination filter fail") - return - if filterHost: - if (not Match(pkt.pduSource, filterHost)) and (not Match(pkt.pduDestination, filterHost)): - if _debug: IAmRouterToNetworkSummary._debug(" - host filter fail") - return - - # count it - requests[pkt.pduSource] += 1 - networks[pkt.pduSource].append((pkt.iartnNetworkList)) - -# -# __main__ -# - -try: - if ('--debug' in sys.argv): - indx = sys.argv.index('--debug') - for i in range(indx+1, len(sys.argv)): - ConsoleLogHandler(sys.argv[i]) - del sys.argv[indx:] - - if _debug: _log.debug("initialization") - - # check for src - if ('--src' in sys.argv): - i = sys.argv.index('--src') - filterSource = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterSource: %r", filterSource) - del sys.argv[i:i+2] - - # check for dest - if ('--dest' in sys.argv): - i = sys.argv.index('--dest') - filterDestination = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterDestination: %r", filterDestination) - del sys.argv[i:i+2] - - # check for host - if ('--host' in sys.argv): - i = sys.argv.index('--host') - filterHost = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterHost: %r", filterHost) - del sys.argv[i:i+2] - - # trace the file(s) - for fname in sys.argv[1:]: - trace(fname, [IAmRouterToNetworkSummary]) - - # sort the result, descending order by count - items = requests.items() - items.sort(lambda x, y: cmp(y[1], x[1])) - - # print everything out - print "%-20s %5s" % ("Address", "Count") - for key, count in items: - print "%-20s %5d" % (key, count) - - # count the number of times of each network - net_count = defaultdict(int) - for subnet_list in networks[key]: - for net in subnet_list: - net_count[net] += 1 - - # sort descending - net_count = net_count.items() - net_count.sort(lambda x, y: cmp(y[1], x[1])) - - for net, count in net_count: - print " %5d %5d" % (net, count) - -except KeyboardInterrupt: - pass -except Exception, e: - _log.exception("an error has occurred: %s", e) -finally: - if _debug: _log.debug("finally") - diff --git a/pcap_tools/PDUsPerMinuteFilter.py b/pcap_tools/PDUsPerMinuteFilter.py deleted file mode 100755 index 978d202e..00000000 --- a/pcap_tools/PDUsPerMinuteFilter.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/python - -""" -PDUs per Minute Filter - present a table of number of PDUs per minute -""" - -import sys -from collections import defaultdict - -from bacpypes.debugging import Logging, function_debugging, ModuleLogger -from bacpypes.consolelogging import ConsoleLogHandler - -from bacpypes.analysis import trace, strftimestamp, Tracer - -try: - from CSStat import Statistics -except ImportError: - Statistics = lambda: None - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -counter = defaultdict(int) - -# -# PDUsPerMinuteTracer -# - -class PDUsPerMinuteTracer(Tracer, Logging): - - def __init__(self): - if _debug: PDUsPerMinuteTracer._debug("__init__") - Tracer.__init__(self, self.Filter) - - def Filter(self, pkt): - if _debug: PDUsPerMinuteTracer._debug("Filter %r", pkt) - - slot = ((int(pkt._timestamp) / interval) * interval) - if _debug: PDUsPerMinuteTracer._debug(" - slot: %r", slot) - - # count the packets in the slot - counter[slot] += 1 - -# -# __main__ -# - -try: - if ('--debug' in sys.argv): - indx = sys.argv.index('--debug') - for i in range(indx+1, len(sys.argv)): - ConsoleLogHandler(sys.argv[i]) - del sys.argv[indx:] - - if _debug: _log.debug("initialization") - - # check for a custom interval - if ('--interval' in sys.argv): - i = sys.argv.index('--interval') - interval = int(sys.argv[i+1]) - if _debug: _log.debug(" - interval: %r", interval) - del sys.argv[i:i+2] - else: - interval = 60 - - # trace the file(s) - for fname in sys.argv[1:]: - trace(fname, [PDUsPerMinuteTracer]) - - # print some stats at the end - stats = Statistics() - - # dump the counters - for ts in range(min(counter), max(counter)+1, interval): - print strftimestamp(ts), counter[ts] - if stats: - stats.Record(counter[ts], ts) - - if stats: - smin, smax, _, _, _, _ = stats.Stats() - - xlw, lw, q1, m, q3, uw, xuw = stats.Whisker() - if m is not None: - print "\t %-8.1f" % smax - print "\t %-8.1f" % xuw - print "\t--- %-8.1f" % uw - print "\t | " - print "\t.'. %-8.1f" % q3 - print "\t| |" - print "\t|-| %-8.1f" % m - print "\t| |" - print "\t'-' %-8.1f" % q1 - print "\t | " - print "\t--- %-8.1f" % lw - print "\t %-8.1f" % xlw - print "\t %-8.1f" % smin - else: - print "No stats" - -except KeyboardInterrupt: - pass -except Exception, e: - _log.exception("an error has occurred: %s", e) -finally: - if _debug: _log.debug("finally") - diff --git a/pcap_tools/ReadPropertySummaryFilter.py b/pcap_tools/ReadPropertySummaryFilter.py deleted file mode 100755 index 5bfd8f70..00000000 --- a/pcap_tools/ReadPropertySummaryFilter.py +++ /dev/null @@ -1,228 +0,0 @@ -#!/usr/bin/python - -""" -Read Property Summary Filter - Summarize Read Property Requests/Responses -""" - -import sys - -from bacpypes.debugging import Logging, function_debugging, ModuleLogger -from bacpypes.consolelogging import ConsoleLogHandler - -from bacpypes.pdu import Address -from bacpypes.analysis import trace, strftimestamp, Tracer -from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK - -try: - from CSStat import Statistics -except ImportError: - Statistics = lambda: None - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -filterSource = None -filterDestination = None -filterHost = None -filterEval = None - -# dictionary of pending requests -requests = {} - -# all traffic -traffic = [] - -# -# Traffic -# - -class Traffic: - - def __init__(self, req): - self.req = req - self.resp = None - - self.ts = req._timestamp - self.retry = 1 - -# -# Match -# - -@function_debugging -def Match(addr1, addr2): - """Return true iff addr1 matches addr2.""" - if _debug: Match._debug("Match %r %r", addr1, addr2) - - if (addr2.addrType == Address.localBroadcastAddr): - # match any local station - return (addr1.addrType == Address.localStationAddr) or (addr1.addrType == Address.localBroadcastAddr) - elif (addr2.addrType == Address.localStationAddr): - # match a specific local station - return (addr1.addrType == Address.localStationAddr) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.remoteBroadcastAddr): - # match any remote station or remote broadcast on a matching network - return ((addr1.addrType == Address.remoteStationAddr) or (addr1.addrType == Address.remoteBroadcastAddr)) \ - and (addr1.addrNet == addr2.addrNet) - elif (addr2.addrType == Address.remoteStationAddr): - # match a specific remote station - return (addr1.addrType == Address.remoteStationAddr) and \ - (addr1.addrNet == addr2.addrNet) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.globalBroadcastAddr): - # match a global broadcast address - return (addr1.addrType == Address.globalBroadcastAddr) - else: - raise RuntimeError, "invalid match combination" - -# -# ReadPropertySummary -# - -class ReadPropertySummary(Tracer, Logging): - - def __init__(self): - if _debug: ReadPropertySummary._debug("__init__") - Tracer.__init__(self, self.Filter) - - def Filter(self, pkt): - if _debug: ReadPropertySummary._debug("Filter %r", pkt) - global requests - - # apply the filters - if filterSource: - if not Match(pkt.pduSource, filterSource): - if _debug: ReadPropertySummary._debug(" - source filter fail") - return - if filterDestination: - if not Match(pkt.pduDestination, filterDestination): - if _debug: ReadPropertySummary._debug(" - destination filter fail") - return - if filterHost: - if (not Match(pkt.pduSource, filterHost)) and (not Match(pkt.pduDestination, filterHost)): - if _debug: ReadPropertySummary._debug(" - host filter fail") - return - if filterEval: - try: - matches = eval(filterEval, {'pkt':pkt}) - if not matches: - if _debug: ReadPropertySummary._debug(" - eval filter fail") - return - except: - if _debug: ReadPropertySummary._debug(" - eval filter massive fail") - return - - # check for reads - if isinstance(pkt, ReadPropertyRequest): - key = (pkt.pduSource, pkt.pduDestination, pkt.apduInvokeID) - if key in requests: - if _debug: ReadPropertySummary._debug(" - retry") - requests[key].retry += 1 - else: - if _debug: ReadPropertySummary._debug(" - new request") - msg = Traffic(pkt) - requests[key] = msg - traffic.append(msg) - - # now check for results - elif isinstance(pkt, ReadPropertyACK): - key = (pkt.pduDestination, pkt.pduSource, pkt.apduInvokeID) - req = requests.get(key, None) - if req: - if _debug: ReadPropertySummary._debug(" - matched with request") - requests[key].resp = pkt - - # delete the request, it stays in the traffic list - del requests[key] - else: - if _debug: ReadPropertySummary._debug(" - unmatched") - -# -# __main__ -# - -try: - if ('--debug' in sys.argv): - indx = sys.argv.index('--debug') - for i in range(indx+1, len(sys.argv)): - ConsoleLogHandler(sys.argv[i]) - del sys.argv[indx:] - - if _debug: _log.debug("initialization") - - # check for src - if ('--src' in sys.argv): - i = sys.argv.index('--src') - filterSource = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterSource: %r", filterSource) - del sys.argv[i:i+2] - - # check for dest - if ('--dest' in sys.argv): - i = sys.argv.index('--dest') - filterDestination = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterDestination: %r", filterDestination) - del sys.argv[i:i+2] - - # check for host - if ('--host' in sys.argv): - i = sys.argv.index('--host') - filterHost = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterHost: %r", filterHost) - del sys.argv[i:i+2] - - # check for eval - if ('--eval' in sys.argv): - i = sys.argv.index('--eval') - filterEval = sys.argv[i+1] - if _debug: _log.debug(" - filterEval: %r", filterEval) - del sys.argv[i:i+2] - - # start out with no unmatched requests - requests = {} - - # trace the file(s) - for fname in sys.argv[1:]: - trace(fname, [ReadPropertySummary]) - - # print some stats at the end - stats = Statistics() - - # dump everything - for msg in traffic: - req = msg.req - resp = msg.resp - - if resp: - deltatime = (resp._timestamp - req._timestamp) * 1000 - if stats: - stats.Record(deltatime, req._timestamp) - print strftimestamp(req._timestamp), '\t', req.pduSource, '\t', resp.pduSource, '\t', ("%6.2fms" % (deltatime,)), '\t', msg.retry if (msg.retry != 1) else "" - else: - print strftimestamp(req._timestamp), '\t', req.pduSource, '\t', "----------", '\t', "----------", '\t', msg.retry if (msg.retry != 1) else "" - - if stats: - xlw, lw, q1, m, q3, uw, xuw = stats.Whisker() - if m is not None: - print "\t %-6.1f" % xuw - print "\t--- %-6.1f" % uw - print "\t | " - print "\t.'. %-6.1f" % q3 - print "\t| |" - print "\t|-| %-6.1f" % m - print "\t| |" - print "\t'-' %-6.1f" % q1 - print "\t | " - print "\t--- %-6.1f" % lw - print "\t %-6.1f" % xlw - else: - print "No stats" - -except KeyboardInterrupt: - pass -except Exception, e: - _log.exception("an error has occurred: %s", e) -finally: - if _debug: _log.debug("finally") - diff --git a/pcap_tools/ReadPropertyTimeoutFilter.py b/pcap_tools/ReadPropertyTimeoutFilter.py deleted file mode 100755 index f6c60ded..00000000 --- a/pcap_tools/ReadPropertyTimeoutFilter.py +++ /dev/null @@ -1,177 +0,0 @@ -#!/usr/bin/python - -""" -Read Property Timeout Filter - Look for Read Property Requests with no response -""" - -import sys - -from bacpypes.debugging import Logging, function_debugging, ModuleLogger -from bacpypes.consolelogging import ConsoleLogHandler - -from bacpypes.pdu import Address -from bacpypes.analysis import trace, strftimestamp, Tracer -from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -filterSource = None -filterDestination = None -filterHost = None - -# dictionary of pending requests -requests = {} - -# all traffic -traffic = [] - -# -# Traffic -# - -class Traffic: - - def __init__(self, req): - self.req = req - self.resp = None - - self.ts = req._timestamp - self.retry = 1 - -# -# Match -# - -@function_debugging -def Match(addr1, addr2): - """Return true iff addr1 matches addr2.""" - if _debug: Match._debug("Match %r %r", addr1, addr2) - - if (addr2.addrType == Address.localBroadcastAddr): - # match any local station - return (addr1.addrType == Address.localStationAddr) or (addr1.addrType == Address.localBroadcastAddr) - elif (addr2.addrType == Address.localStationAddr): - # match a specific local station - return (addr1.addrType == Address.localStationAddr) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.remoteBroadcastAddr): - # match any remote station or remote broadcast on a matching network - return ((addr1.addrType == Address.remoteStationAddr) or (addr1.addrType == Address.remoteBroadcastAddr)) \ - and (addr1.addrNet == addr2.addrNet) - elif (addr2.addrType == Address.remoteStationAddr): - # match a specific remote station - return (addr1.addrType == Address.remoteStationAddr) and \ - (addr1.addrNet == addr2.addrNet) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.globalBroadcastAddr): - # match a global broadcast address - return (addr1.addrType == Address.globalBroadcastAddr) - else: - raise RuntimeError, "invalid match combination" - -# -# ReadPropertySummary -# - -class ReadPropertySummary(Tracer, Logging): - - def __init__(self): - if _debug: ReadPropertySummary._debug("__init__") - Tracer.__init__(self, self.Filter) - - def Filter(self, pkt): - if _debug: ReadPropertySummary._debug("Filter %r", pkt) - global requests - - # apply the filters - if filterSource: - if not Match(pkt.pduSource, filterSource): - if _debug: ReadPropertySummary._debug(" - source filter fail") - return - if filterDestination: - if not Match(pkt.pduDestination, filterDestination): - if _debug: ReadPropertySummary._debug(" - destination filter fail") - return - if filterHost: - if (not Match(pkt.pduSource, filterHost)) and (not Match(pkt.pduDestination, filterHost)): - if _debug: ReadPropertySummary._debug(" - host filter fail") - return - - # check for reads - if isinstance(pkt, ReadPropertyRequest): - key = (pkt.pduSource, pkt.pduDestination, pkt.apduInvokeID) - if key in requests: - if _debug: ReadPropertySummary._debug(" - retry") - requests[key].retry += 1 - else: - if _debug: ReadPropertySummary._debug(" - new request") - msg = Traffic(pkt) - requests[key] = msg - traffic.append(msg) - - # now check for results - elif isinstance(pkt, ReadPropertyACK): - key = (pkt.pduDestination, pkt.pduSource, pkt.apduInvokeID) - req = requests.get(key, None) - if req: - if _debug: ReadPropertySummary._debug(" - matched with request") - requests[key].resp = pkt - - # delete the request, it stays in the traffic list - del requests[key] - else: - if _debug: ReadPropertySummary._debug(" - unmatched") - -# -# __main__ -# - -try: - if ('--debug' in sys.argv): - indx = sys.argv.index('--debug') - for i in range(indx+1, len(sys.argv)): - ConsoleLogHandler(sys.argv[i]) - del sys.argv[indx:] - - if _debug: _log.debug("initialization") - - # check for src - if ('--src' in sys.argv): - i = sys.argv.index('--src') - filterSource = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterSource: %r", filterSource) - del sys.argv[i:i+2] - - # check for dest - if ('--dest' in sys.argv): - i = sys.argv.index('--dest') - filterDestination = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterDestination: %r", filterDestination) - del sys.argv[i:i+2] - - # check for host - if ('--host' in sys.argv): - i = sys.argv.index('--host') - filterHost = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterHost: %r", filterHost) - del sys.argv[i:i+2] - - # start out with no unmatched requests - requests = {} - - # trace the file - trace(sys.argv[1], [ReadPropertySummary]) - - # dump the requests that failed - for msg in traffic: - if not msg.resp: - print strftimestamp(msg.req._timestamp), msg.req.objectIdentifier, msg.req.propertyIdentifier - -except KeyboardInterrupt: - pass -except Exception, e: - _log.exception("an error has occurred: %s", e) -finally: - if _debug: _log.debug("finally") - diff --git a/pcap_tools/WhoIsIAmDeviceFilter.py b/pcap_tools/WhoIsIAmDeviceFilter.py deleted file mode 100755 index 5b6fbc4f..00000000 --- a/pcap_tools/WhoIsIAmDeviceFilter.py +++ /dev/null @@ -1,158 +0,0 @@ -#!/usr/bin/python - -""" -Who-Is and I-Am Device Filter -""" - -import sys - -from bacpypes.debugging import Logging, function_debugging, ModuleLogger -from bacpypes.consolelogging import ConsoleLogHandler - -from bacpypes.pdu import Address -from bacpypes.analysis import trace, strftimestamp, Tracer -from bacpypes.apdu import WhoIsRequest, IAmRequest - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -filterSource = None -filterDestination = None -filterHost = None -filterDevice = None - -# -# Match -# - -@function_debugging -def Match(addr1, addr2): - """Return true iff addr1 matches addr2.""" - if _debug: Match._debug("Match %r %r", addr1, addr2) - - if (addr2.addrType == Address.localBroadcastAddr): - # match any local station - return (addr1.addrType == Address.localStationAddr) or (addr1.addrType == Address.localBroadcastAddr) - elif (addr2.addrType == Address.localStationAddr): - # match a specific local station - return (addr1.addrType == Address.localStationAddr) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.remoteBroadcastAddr): - # match any remote station or remote broadcast on a matching network - return ((addr1.addrType == Address.remoteStationAddr) or (addr1.addrType == Address.remoteBroadcastAddr)) \ - and (addr1.addrNet == addr2.addrNet) - elif (addr2.addrType == Address.remoteStationAddr): - # match a specific remote station - return (addr1.addrType == Address.remoteStationAddr) and \ - (addr1.addrNet == addr2.addrNet) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.globalBroadcastAddr): - # match a global broadcast address - return (addr1.addrType == Address.globalBroadcastAddr) - else: - raise RuntimeError, "invalid match combination" - -# -# WhoIsIAmDevice -# - -class WhoIsIAmDevice(Tracer, Logging): - - def __init__(self): - if _debug: WhoIsIAmDevice._debug("__init__") - Tracer.__init__(self, self.Filter) - - def Filter(self, pkt): - if _debug: WhoIsIAmDevice._debug("Filter %r", pkt) - global requests - - # apply the filters - if filterSource: - if not Match(pkt.pduSource, filterSource): - if _debug: WhoIsIAmDevice._debug(" - source filter fail") - return - if filterDestination: - if not Match(pkt.pduDestination, filterDestination): - if _debug: WhoIsIAmDevice._debug(" - destination filter fail") - return - if filterHost: - if (not Match(pkt.pduSource, filterHost)) and (not Match(pkt.pduDestination, filterHost)): - if _debug: WhoIsIAmDevice._debug(" - host filter fail") - return - - # check for Who-Is - if isinstance(pkt, WhoIsRequest): - match = False - if (pkt.deviceInstanceRangeLowLimit is None) or (pkt.deviceInstanceRangeHighLimit is None): - match = True - elif (pkt.deviceInstanceRangeLowLimit >= filterDevice) and (pkt.deviceInstanceRangeHighLimit <= filterDevice): - match = True - - if match: - print "[%d] %s WhoIs %-20s %-20s %8s %8s" % ( - pkt._index + 1, strftimestamp(pkt._timestamp), - pkt.pduSource, pkt.pduDestination, - pkt.deviceInstanceRangeLowLimit, pkt.deviceInstanceRangeHighLimit, - ) - - # check for I-Am - elif isinstance(pkt, IAmRequest): - - if (pkt.iAmDeviceIdentifier[1] == filterDevice): - print "[%d] %s IAm %-20s %-20s" % ( - pkt._index + 1, strftimestamp(pkt._timestamp), - pkt.pduSource, pkt.pduDestination, - ) - -# -# __main__ -# - -try: - if ('--debug' in sys.argv): - indx = sys.argv.index('--debug') - for i in range(indx+1, len(sys.argv)): - ConsoleLogHandler(sys.argv[i]) - del sys.argv[indx:] - - if _debug: _log.debug("initialization") - - # check for src - if ('--src' in sys.argv): - i = sys.argv.index('--src') - filterSource = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterSource: %r", filterSource) - del sys.argv[i:i+2] - - # check for dest - if ('--dest' in sys.argv): - i = sys.argv.index('--dest') - filterDestination = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterDestination: %r", filterDestination) - del sys.argv[i:i+2] - - # check for host - if ('--host' in sys.argv): - i = sys.argv.index('--host') - filterHost = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterHost: %r", filterHost) - del sys.argv[i:i+2] - - # pcap_file - pcap_file = sys.argv[1] - if _debug: _log.debug(" - pcap_file: %r", pcap_file) - - # which device instance to look for - filterDevice = int(sys.argv[2]) - if _debug: _log.debug(" - filterDevice: %r:", filterDevice) - - # trace the file - trace(pcap_file, [WhoIsIAmDevice]) - -except KeyboardInterrupt: - pass -except Exception, e: - _log.exception("an error has occurred: %s", e) -finally: - if _debug: _log.debug("finally") - diff --git a/pcap_tools/WhoIsIAmSummaryFilter.py b/pcap_tools/WhoIsIAmSummaryFilter.py deleted file mode 100755 index d806ba33..00000000 --- a/pcap_tools/WhoIsIAmSummaryFilter.py +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/python - -""" -Who-Is and I-Am Summary Filter -""" - -import sys -from collections import defaultdict - -from bacpypes.debugging import Logging, function_debugging, ModuleLogger -from bacpypes.consolelogging import ConsoleLogHandler - -from bacpypes.pdu import Address -from bacpypes.analysis import trace, strftimestamp, Tracer -from bacpypes.apdu import WhoIsRequest, IAmRequest - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -filterSource = None -filterDestination = None -filterHost = None - -# dictionaries of requests -whoIsTraffic = defaultdict(int) -iAmTraffic = defaultdict(int) - -# -# Match -# - -@function_debugging -def Match(addr1, addr2): - """Return true iff addr1 matches addr2.""" - if _debug: Match._debug("Match %r %r", addr1, addr2) - - if (addr2.addrType == Address.localBroadcastAddr): - # match any local station - return (addr1.addrType == Address.localStationAddr) or (addr1.addrType == Address.localBroadcastAddr) - elif (addr2.addrType == Address.localStationAddr): - # match a specific local station - return (addr1.addrType == Address.localStationAddr) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.remoteBroadcastAddr): - # match any remote station or remote broadcast on a matching network - return ((addr1.addrType == Address.remoteStationAddr) or (addr1.addrType == Address.remoteBroadcastAddr)) \ - and (addr1.addrNet == addr2.addrNet) - elif (addr2.addrType == Address.remoteStationAddr): - # match a specific remote station - return (addr1.addrType == Address.remoteStationAddr) and \ - (addr1.addrNet == addr2.addrNet) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.globalBroadcastAddr): - # match a global broadcast address - return (addr1.addrType == Address.globalBroadcastAddr) - else: - raise RuntimeError, "invalid match combination" - -# -# WhoIsIAmSummary -# - -class WhoIsIAmSummary(Tracer, Logging): - - def __init__(self): - if _debug: WhoIsIAmSummary._debug("__init__") - Tracer.__init__(self, self.Filter) - - def Filter(self, pkt): - if _debug: WhoIsIAmSummary._debug("Filter %r", pkt) - global requests - - # apply the filters - if filterSource: - if not Match(pkt.pduSource, filterSource): - if _debug: WhoIsIAmSummary._debug(" - source filter fail") - return - if filterDestination: - if not Match(pkt.pduDestination, filterDestination): - if _debug: WhoIsIAmSummary._debug(" - destination filter fail") - return - if filterHost: - if (not Match(pkt.pduSource, filterHost)) and (not Match(pkt.pduDestination, filterHost)): - if _debug: WhoIsIAmSummary._debug(" - host filter fail") - return - - # check for Who-Is - if isinstance(pkt, WhoIsRequest): - key = (pkt.pduSource, pkt.deviceInstanceRangeLowLimit, pkt.deviceInstanceRangeHighLimit) - whoIsTraffic[key] += 1 - - # check for I-Am - elif isinstance(pkt, IAmRequest): - key = (pkt.pduSource, pkt.iAmDeviceIdentifier[1]) - iAmTraffic[key] += 1 - -# -# __main__ -# - -try: - if ('--debug' in sys.argv): - indx = sys.argv.index('--debug') - for i in range(indx+1, len(sys.argv)): - ConsoleLogHandler(sys.argv[i]) - del sys.argv[indx:] - - if _debug: _log.debug("initialization") - - # check for src - if ('--src' in sys.argv): - i = sys.argv.index('--src') - filterSource = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterSource: %r", filterSource) - del sys.argv[i:i+2] - - # check for dest - if ('--dest' in sys.argv): - i = sys.argv.index('--dest') - filterDestination = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterDestination: %r", filterDestination) - del sys.argv[i:i+2] - - # check for host - if ('--host' in sys.argv): - i = sys.argv.index('--host') - filterHost = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterHost: %r", filterHost) - del sys.argv[i:i+2] - - # trace the file(s) - for fname in sys.argv[1:]: - trace(fname, [WhoIsIAmSummary]) - - # dump request counts - print "----- Top 20 Who-Is -----" - print - - items = whoIsTraffic.items() - items.sort(lambda x, y: cmp((y[1], y[0][0]), (x[1], x[0][0]))) - for item in items[:20]: - print "%-20s %8s %8s %5d" % (item[0][0], item[0][1], item[0][2], item[1]) - print - - print "----- Top 20 I-Am -----" - print - - items = iAmTraffic.items() - items.sort(lambda x, y: cmp((y[1], y[0][0]), (x[1], x[0][0]))) - for item in items[:20]: - print "%-20s %8s %5d" % (item[0][0], item[0][1], item[1]) - print - -except KeyboardInterrupt: - pass -except Exception, e: - _log.exception("an error has occurred: %s", e) -finally: - if _debug: _log.debug("finally") - diff --git a/pcap_tools/WhoIsRouterToNetworkSummaryFilter.py b/pcap_tools/WhoIsRouterToNetworkSummaryFilter.py deleted file mode 100755 index b719c08e..00000000 --- a/pcap_tools/WhoIsRouterToNetworkSummaryFilter.py +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/python - -""" -Summarize Who-Is-Router-To-Network Notifications -""" - -import sys -from collections import defaultdict - -from bacpypes.debugging import Logging, function_debugging, ModuleLogger -from bacpypes.consolelogging import ConsoleLogHandler - -from bacpypes.pdu import Address -from bacpypes.analysis import trace, strftimestamp, Tracer -from bacpypes.npdu import WhoIsRouterToNetwork - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -filterSource = None -filterDestination = None -filterHost = None - -# dictionary of requests -requests = defaultdict(int) -networks = defaultdict(list) - -# -# Match -# - -@function_debugging -def Match(addr1, addr2): - """Return true iff addr1 matches addr2.""" - if _debug: Match._debug("Match %r %r", addr1, addr2) - - if (addr2.addrType == Address.localBroadcastAddr): - # match any local station - return (addr1.addrType == Address.localStationAddr) or (addr1.addrType == Address.localBroadcastAddr) - elif (addr2.addrType == Address.localStationAddr): - # match a specific local station - return (addr1.addrType == Address.localStationAddr) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.remoteBroadcastAddr): - # match any remote station or remote broadcast on a matching network - return ((addr1.addrType == Address.remoteStationAddr) or (addr1.addrType == Address.remoteBroadcastAddr)) \ - and (addr1.addrNet == addr2.addrNet) - elif (addr2.addrType == Address.remoteStationAddr): - # match a specific remote station - return (addr1.addrType == Address.remoteStationAddr) and \ - (addr1.addrNet == addr2.addrNet) and (addr1.addrAddr == addr2.addrAddr) - elif (addr2.addrType == Address.globalBroadcastAddr): - # match a global broadcast address - return (addr1.addrType == Address.globalBroadcastAddr) - else: - raise RuntimeError, "invalid match combination" - -# -# WhoIsRouterToNetworkSummary -# - -class WhoIsRouterToNetworkSummary(Tracer, Logging): - - def __init__(self): - if _debug: IAmRouterToNetworkSummary._debug("__init__") - Tracer.__init__(self, self.Filter) - - def Filter(self, pkt): - if _debug: WhoIsRouterToNetworkSummary._debug("Filter %r", pkt) - global requests, networks - - # check for the packet type - if not isinstance(pkt, WhoIsRouterToNetwork): - return - - # apply the filters - if filterSource: - if not Match(pkt.pduSource, filterSource): - if _debug: WhoIsRouterToNetworkSummary._debug(" - source filter fail") - return - if filterDestination: - if not Match(pkt.pduDestination, filterDestination): - if _debug: WhoIsRouterToNetworkSummary._debug(" - destination filter fail") - return - if filterHost: - if (not Match(pkt.pduSource, filterHost)) and (not Match(pkt.pduDestination, filterHost)): - if _debug: WhoIsRouterToNetworkSummary._debug(" - host filter fail") - return - - # count it - requests[pkt.pduSource] += 1 - networks[pkt.pduSource].append(pkt.wirtnNetwork) - -# -# __main__ -# - -try: - if ('--debug' in sys.argv): - indx = sys.argv.index('--debug') - for i in range(indx+1, len(sys.argv)): - ConsoleLogHandler(sys.argv[i]) - del sys.argv[indx:] - - if _debug: _log.debug("initialization") - - # check for src - if ('--src' in sys.argv): - i = sys.argv.index('--src') - filterSource = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterSource: %r", filterSource) - del sys.argv[i:i+2] - - # check for dest - if ('--dest' in sys.argv): - i = sys.argv.index('--dest') - filterDestination = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterDestination: %r", filterDestination) - del sys.argv[i:i+2] - - # check for host - if ('--host' in sys.argv): - i = sys.argv.index('--host') - filterHost = Address(sys.argv[i+1]) - if _debug: _log.debug(" - filterHost: %r", filterHost) - del sys.argv[i:i+2] - - # trace the file(s) - for fname in sys.argv[1:]: - trace(fname, [WhoIsRouterToNetworkSummary]) - - # sort the result, descending order by count - items = requests.items() - items.sort(lambda x, y: cmp(y[1], x[1])) - - # print everything out - print "%-20s %5s" % ("Address", "Count") - for key, count in items: - print "%-20s %5d" % (key, count) - - # count the number of times of each network - net_count = defaultdict(int) - for net in networks[key]: - net_count[net] += 1 - - # sort descending - net_count = net_count.items() - net_count.sort(lambda x, y: cmp(y[1], x[1])) - - for net, count in net_count: - print " %5d %5d" % (net, count) - -except KeyboardInterrupt: - pass -except Exception, e: - _log.exception("an error has occurred: %s", e) -finally: - if _debug: _log.debug("finally") - diff --git a/py25/bacpypes/__init__.py b/py25/bacpypes/__init__.py index f11670ee..2090886f 100755 --- a/py25/bacpypes/__init__.py +++ b/py25/bacpypes/__init__.py @@ -18,7 +18,7 @@ # Project Metadata # -__version__ = '0.18.1' +__version__ = '0.18.3' __author__ = 'Joel Bender' __email__ = 'joel@carrickbender.com' diff --git a/py25/bacpypes/basetypes.py b/py25/bacpypes/basetypes.py index 624de8ad..9f0150a9 100755 --- a/py25/bacpypes/basetypes.py +++ b/py25/bacpypes/basetypes.py @@ -2198,9 +2198,9 @@ class EventParameterUnsignedRange(Sequence): ] class EventParameterAccessEventAccessEvent(Sequence): - sequenceevents = \ + sequenceElements = \ [ Element('listOfAccessEvents', SequenceOf(AccessEvent), 0) - , Element('accessEventTimeReference', DeviceObjectPropertyReference, 0) + , Element('accessEventTimeReference', DeviceObjectPropertyReference, 1) ] class EventParameterAccessEvent(Sequence): diff --git a/py27/bacpypes/__init__.py b/py27/bacpypes/__init__.py index f11670ee..2090886f 100755 --- a/py27/bacpypes/__init__.py +++ b/py27/bacpypes/__init__.py @@ -18,7 +18,7 @@ # Project Metadata # -__version__ = '0.18.1' +__version__ = '0.18.3' __author__ = 'Joel Bender' __email__ = 'joel@carrickbender.com' diff --git a/py27/bacpypes/basetypes.py b/py27/bacpypes/basetypes.py index 31b93733..c2dd232f 100755 --- a/py27/bacpypes/basetypes.py +++ b/py27/bacpypes/basetypes.py @@ -2197,9 +2197,9 @@ class EventParameterUnsignedRange(Sequence): ] class EventParameterAccessEventAccessEvent(Sequence): - sequenceevents = \ + sequenceElements = \ [ Element('listOfAccessEvents', SequenceOf(AccessEvent), 0) - , Element('accessEventTimeReference', DeviceObjectPropertyReference, 0) + , Element('accessEventTimeReference', DeviceObjectPropertyReference, 1) ] class EventParameterAccessEvent(Sequence): diff --git a/py27/bacpypes/local/object.py b/py27/bacpypes/local/object.py index 7932042f..e2c62eae 100644 --- a/py27/bacpypes/local/object.py +++ b/py27/bacpypes/local/object.py @@ -94,7 +94,7 @@ class CurrentPropertyListMixIn(Object): u"A-Za-z" u"\u00C0-\u00D6\u00D8-\u00F6\u00F8-\u02FF\u0370-\u037D\u037F-\u1FFF" u"\u200C-\u200D\u2070-\u218F\u2C00-\u2FEF\u3001-\uD7FF\uF900-\uFDCF" - u"\uFDF0-\uFFFD\U00010000-\U000EFFFF" + # u"\uFDF0-\uFFFD\U00010000-\U000EFFFF" - python2.7 on MacOS doesn't like it ) PN_CHARS_U = PN_CHARS_BASE + u"_" diff --git a/py34/bacpypes/__init__.py b/py34/bacpypes/__init__.py index 96e60d08..f1acb644 100755 --- a/py34/bacpypes/__init__.py +++ b/py34/bacpypes/__init__.py @@ -18,7 +18,7 @@ # Project Metadata # -__version__ = '0.18.1' +__version__ = '0.18.3' __author__ = 'Joel Bender' __email__ = 'joel@carrickbender.com' diff --git a/py34/bacpypes/basetypes.py b/py34/bacpypes/basetypes.py index 31b93733..c2dd232f 100755 --- a/py34/bacpypes/basetypes.py +++ b/py34/bacpypes/basetypes.py @@ -2197,9 +2197,9 @@ class EventParameterUnsignedRange(Sequence): ] class EventParameterAccessEventAccessEvent(Sequence): - sequenceevents = \ + sequenceElements = \ [ Element('listOfAccessEvents', SequenceOf(AccessEvent), 0) - , Element('accessEventTimeReference', DeviceObjectPropertyReference, 0) + , Element('accessEventTimeReference', DeviceObjectPropertyReference, 1) ] class EventParameterAccessEvent(Sequence): diff --git a/samples/COVServer.py b/samples/COVServer.py index 4c32dcda..3182362b 100755 --- a/samples/COVServer.py +++ b/samples/COVServer.py @@ -17,7 +17,13 @@ from bacpypes.task import RecurringTask from bacpypes.app import BIPSimpleApplication -from bacpypes.object import AnalogValueObject, BinaryValueObject +from bacpypes.primitivedata import Real +from bacpypes.object import ( + WritableProperty, + AnalogValueObject, + BinaryValueObject, + register_object_type, +) from bacpypes.local.device import LocalDeviceObject from bacpypes.service.cov import ChangeOfValueServices @@ -30,25 +36,34 @@ test_bv = None test_application = None + +@register_object_type +class WritableAnalogValueObject(AnalogValueObject): + properties = [WritableProperty("presentValue", Real)] + + # # SubscribeCOVApplication # + @bacpypes_debugging class SubscribeCOVApplication(BIPSimpleApplication, ChangeOfValueServices): pass + # # COVConsoleCmd # + @bacpypes_debugging class COVConsoleCmd(ConsoleCmd): - def do_status(self, args): """status""" args = args.split() - if _debug: COVConsoleCmd._debug("do_status %r", args) + if _debug: + COVConsoleCmd._debug("do_status %r", args) global test_application # dump from the COV detections dict @@ -56,17 +71,20 @@ def do_status(self, args): print("{} {}".format(obj_ref.objectIdentifier, obj_ref)) for cov_subscription in cov_detection.cov_subscriptions: - print(" {} proc_id={} confirmed={} lifetime={}".format( - cov_subscription.client_addr, - cov_subscription.proc_id, - cov_subscription.confirmed, - cov_subscription.lifetime, - )) + print( + " {} proc_id={} confirmed={} lifetime={}".format( + cov_subscription.client_addr, + cov_subscription.proc_id, + cov_subscription.confirmed, + cov_subscription.lifetime, + ) + ) def do_trigger(self, args): """trigger object_name""" args = args.split() - if _debug: COVConsoleCmd._debug("do_trigger %r", args) + if _debug: + COVConsoleCmd._debug("do_trigger %r", args) global test_application if not args: @@ -90,43 +108,51 @@ def do_trigger(self, args): def do_set(self, args): """set object_name [ . ] property_name [ = ] value""" args = args.split() - if _debug: COVConsoleCmd._debug("do_set %r", args) + if _debug: + COVConsoleCmd._debug("do_set %r", args) global test_application try: object_name = args.pop(0) - if '.' in object_name: - object_name, property_name = object_name.split('.') + if "." in object_name: + object_name, property_name = object_name.split(".") else: property_name = args.pop(0) - if _debug: COVConsoleCmd._debug(" - object_name: %r", object_name) - if _debug: COVConsoleCmd._debug(" - property_name: %r", property_name) + if _debug: + COVConsoleCmd._debug(" - object_name: %r", object_name) + if _debug: + COVConsoleCmd._debug(" - property_name: %r", property_name) obj = test_application.get_object_name(object_name) - if _debug: COVConsoleCmd._debug(" - obj: %r", obj) + if _debug: + COVConsoleCmd._debug(" - obj: %r", obj) if not obj: raise RuntimeError("object not found: %r" % (object_name,)) datatype = obj.get_datatype(property_name) - if _debug: COVConsoleCmd._debug(" - datatype: %r", datatype) + if _debug: + COVConsoleCmd._debug(" - datatype: %r", datatype) if not datatype: raise RuntimeError("not a property: %r" % (property_name,)) # toss the equals - if args[0] == '=': + if args[0] == "=": args.pop(0) # evaluate the value value = eval(args.pop(0)) - if _debug: COVConsoleCmd._debug(" - raw value: %r", value) + if _debug: + COVConsoleCmd._debug(" - raw value: %r", value) # see if it can be built obj_value = datatype(value) - if _debug: COVConsoleCmd._debug(" - obj_value: %r", obj_value) + if _debug: + COVConsoleCmd._debug(" - obj_value: %r", obj_value) # normalize value = obj_value.value - if _debug: COVConsoleCmd._debug(" - normalized value: %r", value) + if _debug: + COVConsoleCmd._debug(" - normalized value: %r", value) # change the value setattr(obj, property_name, value) @@ -139,43 +165,51 @@ def do_set(self, args): def do_write(self, args): """write object_name [ . ] property [ = ] value""" args = args.split() - if _debug: COVConsoleCmd._debug("do_set %r", args) + if _debug: + COVConsoleCmd._debug("do_set %r", args) global test_application try: object_name = args.pop(0) - if '.' in object_name: - object_name, property_name = object_name.split('.') + if "." in object_name: + object_name, property_name = object_name.split(".") else: property_name = args.pop(0) - if _debug: COVConsoleCmd._debug(" - object_name: %r", object_name) - if _debug: COVConsoleCmd._debug(" - property_name: %r", property_name) + if _debug: + COVConsoleCmd._debug(" - object_name: %r", object_name) + if _debug: + COVConsoleCmd._debug(" - property_name: %r", property_name) obj = test_application.get_object_name(object_name) - if _debug: COVConsoleCmd._debug(" - obj: %r", obj) + if _debug: + COVConsoleCmd._debug(" - obj: %r", obj) if not obj: raise RuntimeError("object not found: %r" % (object_name,)) datatype = obj.get_datatype(property_name) - if _debug: COVConsoleCmd._debug(" - datatype: %r", datatype) + if _debug: + COVConsoleCmd._debug(" - datatype: %r", datatype) if not datatype: raise RuntimeError("not a property: %r" % (property_name,)) # toss the equals - if args[0] == '=': + if args[0] == "=": args.pop(0) # evaluate the value value = eval(args.pop(0)) - if _debug: COVConsoleCmd._debug(" - raw value: %r", value) + if _debug: + COVConsoleCmd._debug(" - raw value: %r", value) # see if it can be built obj_value = datatype(value) - if _debug: COVConsoleCmd._debug(" - obj_value: %r", obj_value) + if _debug: + COVConsoleCmd._debug(" - obj_value: %r", obj_value) # normalize value = obj_value.value - if _debug: COVConsoleCmd._debug(" - normalized value: %r", value) + if _debug: + COVConsoleCmd._debug(" - normalized value: %r", value) # pass it along obj.WriteProperty(property_name, value) @@ -196,20 +230,23 @@ class TestAnalogValueTask(RecurringTask): """ def __init__(self, interval): - if _debug: TestAnalogValueTask._debug("__init__ %r", interval) + if _debug: + TestAnalogValueTask._debug("__init__ %r", interval) RecurringTask.__init__(self, interval * 1000) # make a list of test values self.test_values = list(float(i * 10) for i in range(10)) def process_task(self): - if _debug: TestAnalogValueTask._debug("process_task") + if _debug: + TestAnalogValueTask._debug("process_task") global test_av # pop the next value next_value = self.test_values.pop(0) self.test_values.append(next_value) - if _debug: TestAnalogValueTask._debug(" - next_value: %r", next_value) + if _debug: + TestAnalogValueTask._debug(" - next_value: %r", next_value) # change the point test_av.presentValue = next_value @@ -225,7 +262,8 @@ class TestAnalogValueThread(Thread): """ def __init__(self, interval): - if _debug: TestAnalogValueThread._debug("__init__ %r", interval) + if _debug: + TestAnalogValueThread._debug("__init__ %r", interval) Thread.__init__(self) # runs as a daemon @@ -238,14 +276,16 @@ def __init__(self, interval): self.test_values = list(100.0 + float(i * 10) for i in range(10)) def run(self): - if _debug: TestAnalogValueThread._debug("run") + if _debug: + TestAnalogValueThread._debug("run") global test_av while True: # pop the next value next_value = self.test_values.pop(0) self.test_values.append(next_value) - if _debug: TestAnalogValueThread._debug(" - next_value: %r", next_value) + if _debug: + TestAnalogValueThread._debug(" - next_value: %r", next_value) # change the point test_av.presentValue = next_value @@ -264,7 +304,8 @@ class TestBinaryValueTask(RecurringTask): """ def __init__(self, interval): - if _debug: TestBinaryValueTask._debug("__init__ %r", interval) + if _debug: + TestBinaryValueTask._debug("__init__ %r", interval) RecurringTask.__init__(self, interval * 1000) # save the interval @@ -274,13 +315,15 @@ def __init__(self, interval): self.test_values = [True, False] def process_task(self): - if _debug: TestBinaryValueTask._debug("process_task") + if _debug: + TestBinaryValueTask._debug("process_task") global test_bv # pop the next value next_value = self.test_values.pop(0) self.test_values.append(next_value) - if _debug: TestBinaryValueTask._debug(" - next_value: %r", next_value) + if _debug: + TestBinaryValueTask._debug(" - next_value: %r", next_value) # change the point test_bv.presentValue = next_value @@ -296,7 +339,8 @@ class TestBinaryValueThread(RecurringTask, Thread): """ def __init__(self, interval): - if _debug: TestBinaryValueThread._debug("__init__ %r", interval) + if _debug: + TestBinaryValueThread._debug("__init__ %r", interval) Thread.__init__(self) # runs as a daemon @@ -309,14 +353,16 @@ def __init__(self, interval): self.test_values = [True, False] def run(self): - if _debug: TestBinaryValueThread._debug("run") + if _debug: + TestBinaryValueThread._debug("run") global test_bv while True: # pop the next value next_value = self.test_values.pop(0) self.test_values.append(next_value) - if _debug: TestBinaryValueThread._debug(" - next_value: %r", next_value) + if _debug: + TestBinaryValueThread._debug(" - next_value: %r", next_value) # change the point test_bv.presentValue = next_value @@ -330,55 +376,55 @@ def main(): # make a parser parser = ConfigArgumentParser(description=__doc__) - parser.add_argument("--console", - action="store_true", - default=False, - help="create a console", - ) + parser.add_argument( + "--console", action="store_true", default=False, help="create a console", + ) # analog value task and thread - parser.add_argument("--avtask", type=float, - help="analog value recurring task", - ) - parser.add_argument("--avthread", type=float, - help="analog value thread", - ) + parser.add_argument( + "--avtask", type=float, help="analog value recurring task", + ) + parser.add_argument( + "--avthread", type=float, help="analog value thread", + ) # analog value task and thread - parser.add_argument("--bvtask", type=float, - help="binary value recurring task", - ) - parser.add_argument("--bvthread", type=float, - help="binary value thread", - ) + parser.add_argument( + "--bvtask", type=float, help="binary value recurring task", + ) + parser.add_argument( + "--bvthread", type=float, help="binary value thread", + ) # provide a different spin value - parser.add_argument("--spin", type=float, - help="spin time", - default=1.0, - ) + parser.add_argument( + "--spin", type=float, help="spin time", default=1.0, + ) # parse the command line arguments args = parser.parse_args() - if _debug: _log.debug("initialization") - if _debug: _log.debug(" - args: %r", args) + if _debug: + _log.debug("initialization") + if _debug: + _log.debug(" - args: %r", args) # make a device object this_device = LocalDeviceObject(ini=args.ini) - if _debug: _log.debug(" - this_device: %r", this_device) + if _debug: + _log.debug(" - this_device: %r", this_device) # make a sample application test_application = SubscribeCOVApplication(this_device, args.ini.address) # make an analog value object - test_av = AnalogValueObject( - objectIdentifier=('analogValue', 1), - objectName='av', + test_av = WritableAnalogValueObject( + objectIdentifier=("analogValue", 1), + objectName="av", presentValue=0.0, statusFlags=[0, 0, 0, 0], covIncrement=1.0, - ) + ) _log.debug(" - test_av: %r", test_av) # add it to the device @@ -387,11 +433,11 @@ def main(): # make a binary value object test_bv = BinaryValueObject( - objectIdentifier=('binaryValue', 1), - objectName='bv', - presentValue='inactive', + objectIdentifier=("binaryValue", 1), + objectName="bv", + presentValue="inactive", statusFlags=[0, 0, 0, 0], - ) + ) _log.debug(" - test_bv: %r", test_bv) # add it to the device diff --git a/samples/Tutorial/WhoIsIAm.py b/samples/Tutorial/WhoIsIAm.py deleted file mode 100644 index d58455c5..00000000 --- a/samples/Tutorial/WhoIsIAm.py +++ /dev/null @@ -1,181 +0,0 @@ -#!/usr/bin/env python - -""" -This application presents a 'console' prompt to the user asking for Who-Is and I-Am -commands which create the related APDUs, then lines up the corresponding I-Am -for incoming traffic and prints out the contents. -""" - -import sys - -from bacpypes.debugging import bacpypes_debugging, ModuleLogger -from bacpypes.consolelogging import ConfigArgumentParser -from bacpypes.consolecmd import ConsoleCmd - -from bacpypes.core import run, enable_sleeping - -from bacpypes.pdu import Address, GlobalBroadcast -from bacpypes.apdu import WhoIsRequest, IAmRequest -from bacpypes.errors import DecodingError - -from bacpypes.app import BIPSimpleApplication -from bacpypes.local.device import LocalDeviceObject - -# some debugging -_debug = 1 -_log = ModuleLogger(globals()) - -# globals -this_device = None -this_application = None - -# -# WhoIsIAmApplication -# - -@bacpypes_debugging -class WhoIsIAmApplication(BIPSimpleApplication): - - def __init__(self, *args): - if _debug: WhoIsIAmApplication._debug("__init__ %r", args) - BIPSimpleApplication.__init__(self, *args) - - # keep track of requests to line up responses - self._request = None - - def request(self, apdu): - if _debug: WhoIsIAmApplication._debug("request %r", apdu) - - # save a copy of the request - self._request = apdu - - # forward it along - BIPSimpleApplication.request(self, apdu) - - def confirmation(self, apdu): - if _debug: WhoIsIAmApplication._debug("confirmation %r", apdu) - - # forward it along - BIPSimpleApplication.confirmation(self, apdu) - - def indication(self, apdu): - if _debug: WhoIsIAmApplication._debug("indication %r", apdu) - - if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)): - device_type, device_instance = apdu.iAmDeviceIdentifier - if device_type != 'device': - raise DecodingError("invalid object type") - - if (self._request.deviceInstanceRangeLowLimit is not None) and \ - (device_instance < self._request.deviceInstanceRangeLowLimit): - pass - elif (self._request.deviceInstanceRangeHighLimit is not None) and \ - (device_instance > self._request.deviceInstanceRangeHighLimit): - pass - else: - # print out the contents - sys.stdout.write('pduSource = ' + repr(apdu.pduSource) + '\n') - sys.stdout.write('iAmDeviceIdentifier = ' + str(apdu.iAmDeviceIdentifier) + '\n') - sys.stdout.write('maxAPDULengthAccepted = ' + str(apdu.maxAPDULengthAccepted) + '\n') - sys.stdout.write('segmentationSupported = ' + str(apdu.segmentationSupported) + '\n') - sys.stdout.write('vendorID = ' + str(apdu.vendorID) + '\n') - sys.stdout.flush() - - # forward it along - BIPSimpleApplication.indication(self, apdu) - - -# -# WhoIsIAmConsoleCmd -# - -@bacpypes_debugging -class WhoIsIAmConsoleCmd(ConsoleCmd): - - def do_whois(self, args): - """whois [ ] [ ]""" - args = args.split() - if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args) - - try: - # gather the parameters - if (len(args) == 1) or (len(args) == 3): - addr = Address(args[0]) - del args[0] - else: - addr = GlobalBroadcast() - - if len(args) == 2: - lolimit = int(args[0]) - hilimit = int(args[1]) - else: - lolimit = hilimit = None - - # code lives in the device service - this_application.who_is(lolimit, hilimit, addr) - - except Exception as error: - WhoIsIAmConsoleCmd._exception("exception: %r", error) - - def do_iam(self, args): - """iam""" - args = args.split() - if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args) - - # code lives in the device service - this_application.i_am() - - def do_rtn(self, args): - """rtn ... """ - args = args.split() - if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args) - - # provide the address and a list of network numbers - router_address = Address(args[0]) - network_list = [int(arg) for arg in args[1:]] - - # pass along to the service access point - this_application.nsap.update_router_references(None, router_address, network_list) - - -# -# __main__ -# - -def main(): - global this_device - global this_application - - # parse the command line arguments - args = ConfigArgumentParser(description=__doc__).parse_args() - - if _debug: _log.debug("initialization") - if _debug: _log.debug(" - args: %r", args) - - # make a device object - this_device = LocalDeviceObject( - objectName=args.ini.objectname, - objectIdentifier=int(args.ini.objectidentifier), - maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), - segmentationSupported=args.ini.segmentationsupported, - vendorIdentifier=int(args.ini.vendoridentifier), - ) - - # make a simple application - this_application = WhoIsIAmApplication(this_device, args.ini.address) - - # make a console - this_console = WhoIsIAmConsoleCmd() - if _debug: _log.debug(" - this_console: %r", this_console) - - # enable sleeping will help with threads - enable_sleeping() - - _log.debug("running") - - run() - - _log.debug("fini") - -if __name__ == "__main__": - main() diff --git a/samples/WhoIsIAm.py b/samples/WhoIsIAm.py old mode 100755 new mode 100644 index 71a96bb7..6b64b0f2 --- a/samples/WhoIsIAm.py +++ b/samples/WhoIsIAm.py @@ -2,7 +2,7 @@ """ This application presents a 'console' prompt to the user asking for Who-Is and I-Am -commands which create the related APDUs, then lines up the coorresponding I-Am +commands which create the related APDUs, then lines up the corresponding I-Am for incoming traffic and prints out the contents. """ @@ -12,8 +12,7 @@ from bacpypes.consolelogging import ConfigArgumentParser from bacpypes.consolecmd import ConsoleCmd -from bacpypes.core import run, deferred, enable_sleeping -from bacpypes.iocb import IOCB +from bacpypes.core import run, enable_sleeping from bacpypes.pdu import Address, GlobalBroadcast from bacpypes.apdu import WhoIsRequest, IAmRequest @@ -23,7 +22,7 @@ from bacpypes.local.device import LocalDeviceObject # some debugging -_debug = 0 +_debug = 1 _log = ModuleLogger(globals()) # globals @@ -68,10 +67,10 @@ def indication(self, apdu): raise DecodingError("invalid object type") if (self._request.deviceInstanceRangeLowLimit is not None) and \ - (device_instance < self._request.deviceInstanceRangeLowLimit): + (device_instance < self._request.deviceInstanceRangeLowLimit): pass elif (self._request.deviceInstanceRangeHighLimit is not None) and \ - (device_instance > self._request.deviceInstanceRangeHighLimit): + (device_instance > self._request.deviceInstanceRangeHighLimit): pass else: # print out the contents @@ -85,6 +84,7 @@ def indication(self, apdu): # forward it along BIPSimpleApplication.indication(self, apdu) + # # WhoIsIAmConsoleCmd # @@ -93,60 +93,37 @@ def indication(self, apdu): class WhoIsIAmConsoleCmd(ConsoleCmd): def do_whois(self, args): - """whois [ ] [ ]""" + """whois [ ] [ ]""" args = args.split() if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args) try: - # build a request - request = WhoIsRequest() + # gather the parameters if (len(args) == 1) or (len(args) == 3): - request.pduDestination = Address(args[0]) + addr = Address(args[0]) del args[0] else: - request.pduDestination = GlobalBroadcast() + addr = GlobalBroadcast() if len(args) == 2: - request.deviceInstanceRangeLowLimit = int(args[0]) - request.deviceInstanceRangeHighLimit = int(args[1]) - if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request) - - # make an IOCB - iocb = IOCB(request) - if _debug: WhoIsIAmConsoleCmd._debug(" - iocb: %r", iocb) + lolimit = int(args[0]) + hilimit = int(args[1]) + else: + lolimit = hilimit = None - # give it to the application - this_application.request_io(iocb) + # code lives in the device service + this_application.who_is(lolimit, hilimit, addr) - except Exception as err: - WhoIsIAmConsoleCmd._exception("exception: %r", err) + except Exception as error: + WhoIsIAmConsoleCmd._exception("exception: %r", error) def do_iam(self, args): """iam""" args = args.split() if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args) - try: - # build a request - request = IAmRequest() - request.pduDestination = GlobalBroadcast() - - # set the parameters from the device object - request.iAmDeviceIdentifier = this_device.objectIdentifier - request.maxAPDULengthAccepted = this_device.maxApduLengthAccepted - request.segmentationSupported = this_device.segmentationSupported - request.vendorID = this_device.vendorIdentifier - if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request) - - # make an IOCB - iocb = IOCB(request) - if _debug: WhoIsIAmConsoleCmd._debug(" - iocb: %r", iocb) - - # give it to the application - this_application.request_io(iocb) - - except Exception as err: - WhoIsIAmConsoleCmd._exception("exception: %r", err) + # code lives in the device service + this_application.i_am() def do_rtn(self, args): """rtn ... """ @@ -162,11 +139,12 @@ def do_rtn(self, args): # -# main +# __main__ # def main(): - global this_device, this_application + global this_device + global this_application # parse the command line arguments args = ConfigArgumentParser(description=__doc__).parse_args() @@ -175,14 +153,16 @@ def main(): if _debug: _log.debug(" - args: %r", args) # make a device object - this_device = LocalDeviceObject(ini=args.ini) - if _debug: _log.debug(" - this_device: %r", this_device) + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) # make a simple application - this_application = WhoIsIAmApplication( - this_device, args.ini.address, - ) - if _debug: _log.debug(" - this_application: %r", this_application) + this_application = WhoIsIAmApplication(this_device, args.ini.address) # make a console this_console = WhoIsIAmConsoleCmd() @@ -197,6 +177,5 @@ def main(): _log.debug("fini") - if __name__ == "__main__": main() diff --git a/sandbox/cov_analog_input_object.py b/sandbox/cov_analog_input_object.py new file mode 100755 index 00000000..2cb59182 --- /dev/null +++ b/sandbox/cov_analog_input_object.py @@ -0,0 +1,378 @@ +#!/usr/bin/env python + +""" +This sample application is a server that supports COV notification services. +The console accepts commands that change the properties of an object that +triggers the notifications. +""" + +import time +from threading import Thread + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run, deferred, enable_sleeping +from bacpypes.task import RecurringTask + +from bacpypes.app import BIPSimpleApplication +from bacpypes.object import AnalogInputObject +from bacpypes.local.device import LocalDeviceObject +from bacpypes.service.cov import ChangeOfValueServices + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# test globals +test_ai = None +test_application = None + +# +# SubscribeCOVApplication +# + + +@bacpypes_debugging +class SubscribeCOVApplication(BIPSimpleApplication, ChangeOfValueServices): + pass + + +# +# COVConsoleCmd +# + + +@bacpypes_debugging +class COVConsoleCmd(ConsoleCmd): + def do_status(self, args): + """status""" + args = args.split() + if _debug: + COVConsoleCmd._debug("do_status %r", args) + global test_application + + # dump from the COV detections dict + for obj_ref, cov_detection in test_application.cov_detections.items(): + print("{} {}".format(obj_ref.objectIdentifier, obj_ref)) + + for cov_subscription in cov_detection.cov_subscriptions: + print( + " {} proc_id={} confirmed={} lifetime={}".format( + cov_subscription.client_addr, + cov_subscription.proc_id, + cov_subscription.confirmed, + cov_subscription.lifetime, + ) + ) + + def do_trigger(self, args): + """trigger object_name""" + args = args.split() + if _debug: + COVConsoleCmd._debug("do_trigger %r", args) + global test_application + + if not args: + print("object name required") + return + + obj = test_application.get_object_name(args[0]) + if not obj: + print("no such object") + return + + # get the detection algorithm object + cov_detection = test_application.cov_detections.get(obj, None) + if (not cov_detection) or (len(cov_detection.cov_subscriptions) == 0): + print("no subscriptions for that object") + return + + # tell it to send out notifications + cov_detection.send_cov_notifications() + + def do_set(self, args): + """set object_name [ . ] property_name [ = ] value""" + args = args.split() + if _debug: + COVConsoleCmd._debug("do_set %r", args) + global test_application + + try: + object_name = args.pop(0) + if "." in object_name: + object_name, property_name = object_name.split(".") + else: + property_name = args.pop(0) + if _debug: + COVConsoleCmd._debug(" - object_name: %r", object_name) + if _debug: + COVConsoleCmd._debug(" - property_name: %r", property_name) + + obj = test_application.get_object_name(object_name) + if _debug: + COVConsoleCmd._debug(" - obj: %r", obj) + if not obj: + raise RuntimeError("object not found: %r" % (object_name,)) + + datatype = obj.get_datatype(property_name) + if _debug: + COVConsoleCmd._debug(" - datatype: %r", datatype) + if not datatype: + raise RuntimeError("not a property: %r" % (property_name,)) + + # toss the equals + if args[0] == "=": + args.pop(0) + + # evaluate the value + value = eval(args.pop(0)) + if _debug: + COVConsoleCmd._debug(" - raw value: %r", value) + + # see if it can be built + obj_value = datatype(value) + if _debug: + COVConsoleCmd._debug(" - obj_value: %r", obj_value) + + # normalize + value = obj_value.value + if _debug: + COVConsoleCmd._debug(" - normalized value: %r", value) + + # change the value + setattr(obj, property_name, value) + + except IndexError: + print(COVConsoleCmd.do_set.__doc__) + except Exception as err: + print("exception: %s" % (err,)) + + def do_write(self, args): + """write object_name [ . ] property [ = ] value""" + args = args.split() + if _debug: + COVConsoleCmd._debug("do_set %r", args) + global test_application + + try: + object_name = args.pop(0) + if "." in object_name: + object_name, property_name = object_name.split(".") + else: + property_name = args.pop(0) + if _debug: + COVConsoleCmd._debug(" - object_name: %r", object_name) + if _debug: + COVConsoleCmd._debug(" - property_name: %r", property_name) + + obj = test_application.get_object_name(object_name) + if _debug: + COVConsoleCmd._debug(" - obj: %r", obj) + if not obj: + raise RuntimeError("object not found: %r" % (object_name,)) + + datatype = obj.get_datatype(property_name) + if _debug: + COVConsoleCmd._debug(" - datatype: %r", datatype) + if not datatype: + raise RuntimeError("not a property: %r" % (property_name,)) + + # toss the equals + if args[0] == "=": + args.pop(0) + + # evaluate the value + value = eval(args.pop(0)) + if _debug: + COVConsoleCmd._debug(" - raw value: %r", value) + + # see if it can be built + obj_value = datatype(value) + if _debug: + COVConsoleCmd._debug(" - obj_value: %r", obj_value) + + # normalize + value = obj_value.value + if _debug: + COVConsoleCmd._debug(" - normalized value: %r", value) + + # pass it along + obj.WriteProperty(property_name, value) + + except IndexError: + print(COVConsoleCmd.do_write.__doc__) + except Exception as err: + print("exception: %s" % (err,)) + + +@bacpypes_debugging +class TestAnalogInputTask(RecurringTask): + + """ + An instance of this class is created when '--aitask ' is + specified as a command line argument. Every seconds it + changes the value of the test_ai present value. + """ + + def __init__(self, interval): + if _debug: + TestAnalogInputTask._debug("__init__ %r", interval) + RecurringTask.__init__(self, interval * 1000) + + # make a list of test values + self.test_values = list(float(i * 10) for i in range(10)) + + def process_task(self): + if _debug: + TestAnalogInputTask._debug("process_task") + global test_ai + + # pop the next value + next_value = self.test_values.pop(0) + self.test_values.append(next_value) + if _debug: + TestAnalogInputTask._debug(" - next_value: %r", next_value) + + # change the point + test_ai.presentValue = next_value + + +@bacpypes_debugging +class TestAnalogInputThread(Thread): + + """ + An instance of this class is created when '--aithread ' is + specified as a command line argument. Every seconds it + changes the value of the test_ai present value. + """ + + def __init__(self, interval): + if _debug: + TestAnalogInputThread._debug("__init__ %r", interval) + Thread.__init__(self) + + # runs as a daemon + self.daemon = True + + # save the interval + self.interval = interval + + # make a list of test values + self.test_values = list(100.0 + float(i * 10) for i in range(10)) + + def run(self): + if _debug: + TestAnalogInputThread._debug("run") + global test_ai + + while True: + # pop the next value + next_value = self.test_values.pop(0) + self.test_values.append(next_value) + if _debug: + TestAnalogInputThread._debug(" - next_value: %r", next_value) + + # change the point + test_ai.presentValue = next_value + + # sleep + time.sleep(self.interval) + + +def main(): + global test_ai, test_application + + # make a parser + parser = ConfigArgumentParser(description=__doc__) + parser.add_argument( + "--console", action="store_true", default=False, help="create a console", + ) + + # analog value task and thread + parser.add_argument( + "--aitask", type=float, help="analog input recurring task", + ) + parser.add_argument( + "--aithread", type=float, help="analog input thread", + ) + + # analog value task and thread + parser.add_argument( + "--bvtask", type=float, help="binary value recurring task", + ) + parser.add_argument( + "--bvthread", type=float, help="binary value thread", + ) + + # provide a different spin value + parser.add_argument( + "--spin", type=float, help="spin time", default=1.0, + ) + + # parse the command line arguments + args = parser.parse_args() + + if _debug: + _log.debug("initialization") + if _debug: + _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject(ini=args.ini) + if _debug: + _log.debug(" - this_device: %r", this_device) + + # make a sample application + test_application = SubscribeCOVApplication(this_device, args.ini.address) + + # make an analog value object + test_ai = AnalogInputObject( + objectIdentifier=("analogInput", 1), + objectName="ai", + presentValue=0.0, + covIncrement=0.5, + eventDetectionEnable=True, + eventEnable=[1, 1, 1], + ackedTransitions=[1, 1, 1], + notifyType=1, + reliability=1, + outOfService=False, + eventState=0, + statusFlags=[0, 0, 0, 0], + units=19, + ) + _log.debug(" - test_ai: %r", test_ai) + + # add it to the device + test_application.add_object(test_ai) + _log.debug(" - object list: %r", this_device.objectList) + + # make a console + if args.console: + test_console = COVConsoleCmd() + _log.debug(" - test_console: %r", test_console) + + # enable sleeping will help with threads + enable_sleeping() + + # analog input task + if args.aitask: + test_ai_task = TestAnalogInputTask(args.aitask) + test_ai_task.install_task() + + # analog input thread + if args.aithread: + test_ai_thread = TestAnalogInputThread(args.aithread) + deferred(test_ai_thread.start) + + _log.debug("running") + + run(args.spin) + + _log.debug("fini") + + +if __name__ == "__main__": + main() diff --git a/sandbox/io.py b/sandbox/io.py index a6d98f6a..79133657 100644 --- a/sandbox/io.py +++ b/sandbox/io.py @@ -228,7 +228,7 @@ def set_timeout(self, delay, err=TimeoutError): def __repr__(self): xid = id(self) - if (xid < 0): xid += (1L << 32) + if (xid < 0): xid += (1 << 32) sname = self.__module__ + '.' + self.__class__.__name__ desc = "(%d)" % (self.ioID,) @@ -317,7 +317,7 @@ def abort_io(self, iocb, err): # make sure we're being notified of an abort request from # the iocb we are chained from if iocb is not self.ioChain: - raise RuntimeError, "broken chain" + raise RuntimeError("broken chain") # call my own abort(), which may forward it to a controller or # be overridden by IOGroup @@ -355,7 +355,7 @@ def decode(self): iocb.ioError = self.ioError else: - raise RuntimeError, "invalid state: %d" % (self.ioState,) + raise RuntimeError("invalid state: %d" % (self.ioState,)) # # IOChain @@ -463,7 +463,7 @@ def put(self, iocb): # requests should be pending before being queued if iocb.ioState != PENDING: - raise RuntimeError, "invalid state transition" + raise RuntimeError("invalid state transition") # save that it might have been empty wasempty = not self.notempty.isSet() @@ -566,7 +566,7 @@ def __init__(self, name=None): # register the name if name is not None: if name in _local_controllers: - raise RuntimeError, "already a local controller called '%s': %r" % (name, _local_controllers[name]) + raise RuntimeError("already a local controller called '%s': %r" % (name, _local_controllers[name])) _local_controllers[name] = self def abort(self, err): @@ -600,7 +600,7 @@ def request_io(self, iocb): def process_io(self, iocb): """Figure out how to respond to this request. This must be provided by the derived class.""" - raise NotImplementedError, "IOController must implement process_io()" + raise NotImplementedError("IOController must implement process_io()") def active_io(self, iocb): """Called by a handler to notify the controller that a request is @@ -609,7 +609,7 @@ def active_io(self, iocb): # requests should be idle or pending before coming active if (iocb.ioState != IDLE) and (iocb.ioState != PENDING): - raise RuntimeError, "invalid state transition (currently %d)" % (iocb.ioState,) + raise RuntimeError("invalid state transition (currently %d)" % (iocb.ioState,)) # change the state iocb.ioState = ACTIVE @@ -737,7 +737,7 @@ def request_io(self, iocb): def process_io(self, iocb): """Figure out how to respond to this request. This must be provided by the derived class.""" - raise NotImplementedError, "IOController must implement process_io()" + raise NotImplementedError("IOController must implement process_io()") def active_io(self, iocb): """Called by a handler to notify the controller that a request is @@ -759,7 +759,7 @@ def complete_io(self, iocb, msg): # check to see if it is completing the active one if iocb is not self.active_iocb: - raise RuntimeError, "not the current iocb" + raise RuntimeError("not the current iocb") # normal completion IOController.complete_io(self, iocb, msg) @@ -845,7 +845,7 @@ def _wait_trigger(self): # make sure we are waiting if (self.state != CTRL_WAITING): - raise RuntimeError, "not waiting" + raise RuntimeError("not waiting") # change our state self.state = CTRL_IDLE @@ -1017,7 +1017,7 @@ def callback(self, iocb): elif iocb.ioState == ABORTED: response = (2, clientID, iocb.ioError) else: - raise RuntimeError, "IOCB invalid state" + raise RuntimeError("IOCB invalid state") if _debug: _commlog.debug("<<< %s: S %s %r" % (_strftime(), clientAddr, response)) @@ -1224,7 +1224,7 @@ def abort_io(self, iocb, err): self.request(PDU(request, destination=(iocb.ioServerRef, PORT))) else: - raise RuntimeError, "no reference to aborting iocb: %r" % (iocb.ioID,) + raise RuntimeError("no reference to aborting iocb: %r" % (iocb.ioID,)) # change the state iocb.ioState = ABORTED @@ -1264,7 +1264,7 @@ def abort_iocb(self, addr, iocbid, err): if _debug: IOProxyServer._debug("abort_iocb %r %r %r", addr, iocbid, err) if not self.localIOCB.has_key(iocbid): - raise RuntimeError, "no reference to aborting iocb: %r" % (iocbid,) + raise RuntimeError("no reference to aborting iocb: %r" % (iocbid,)) # get the iocb iocb = self.localIOCB[iocbid]