diff --git a/samples/ReadBBMD.py b/samples/ReadWriteBBMD.py similarity index 58% rename from samples/ReadBBMD.py rename to samples/ReadWriteBBMD.py index b4dd8395..700ce219 100755 --- a/samples/ReadBBMD.py +++ b/samples/ReadWriteBBMD.py @@ -13,9 +13,14 @@ from bacpypes.core import run, enable_sleeping from bacpypes.pdu import Address -from bacpypes.bvll import ReadBroadcastDistributionTable, \ - ReadBroadcastDistributionTableAck, ReadForeignDeviceTable, \ - ReadForeignDeviceTableAck, Result +from bacpypes.bvll import ( + ReadBroadcastDistributionTable, + ReadBroadcastDistributionTableAck, + ReadForeignDeviceTable, + ReadForeignDeviceTableAck, + WriteBroadcastDistributionTable, + Result, +) from bacpypes.bvllservice import AnnexJCodec, UDPMultiplexer # some debugging @@ -27,33 +32,63 @@ @bacpypes_debugging -class ReadBBMDConsoleClient(ConsoleCmd, Client): +class ReadWriteBBMDConsoleClient(ConsoleCmd, Client): def do_readbdt(self, args): """readbdt """ args = args.split() if _debug: - ReadBBMDConsoleClient._debug("do_readbdt %r", args) + ReadWriteBBMDConsoleClient._debug("do_readbdt %r", args) # build a request and send it downstream read_bdt = ReadBroadcastDistributionTable(destination=Address(args[0])) + if _debug: + ReadWriteBBMDConsoleClient._debug(" - read_bdt: %r", read_bdt) + self.request(read_bdt) def do_readfdt(self, args): """readfdt """ args = args.split() if _debug: - ReadBBMDConsoleClient._debug("do_readfdt %r", args) + ReadWriteBBMDConsoleClient._debug("do_readfdt %r", args) # build a request and send it downstream read_fdt = ReadForeignDeviceTable(destination=Address(args[0])) + if _debug: + ReadWriteBBMDConsoleClient._debug(" - read_fdt: %r", read_fdt) + self.request(read_fdt) + def do_writebdt(self, args): + """writebdt ...""" + args = args.split() + if _debug: + ReadWriteBBMDConsoleClient._debug("do_writebdt %r", args) + + # build a list of broadcast distribution table entries which just so + # happen to be BACpypes IPv4 addresses + bdt = [] + for addr in args[1:]: + bdte = Address(addr) + bdt.append(bdte) + + # build a request and send it downstream + write_bdt = WriteBroadcastDistributionTable( + destination=Address(args[0]), bdt=bdt + ) + if _debug: + ReadWriteBBMDConsoleClient._debug(" - write_bdt: %r", write_bdt) + + self.request(write_bdt) + def confirmation(self, pdu): """Filter for the acks and errors.""" if _debug: - ReadBBMDConsoleClient._debug("confirmation %r", pdu) + ReadWriteBBMDConsoleClient._debug("confirmation %r", pdu) - if isinstance(pdu, (ReadBroadcastDistributionTableAck, ReadForeignDeviceTableAck, Result)): + if isinstance( + pdu, (ReadBroadcastDistributionTableAck, ReadForeignDeviceTableAck, Result) + ): pdu.debug_contents() @@ -65,7 +100,7 @@ def main(): parser.add_argument( "local_address", help="IPv4 address", - ) + ) args = parser.parse_args() if _debug: @@ -77,7 +112,7 @@ def main(): _log.debug(" - local_address: %r", local_address) # make a console - this_console = ReadBBMDConsoleClient() + this_console = ReadWriteBBMDConsoleClient() if _debug: _log.debug(" - this_console: %r", this_console)