Skip to content

Commit

Permalink
upgrade sample to add writing a BDT #451
Browse files Browse the repository at this point in the history
  • Loading branch information
JoelBender committed Mar 25, 2022
1 parent 7cd1a1a commit 00bd5fe
Showing 1 changed file with 45 additions and 10 deletions.
55 changes: 45 additions & 10 deletions samples/ReadBBMD.py → samples/ReadWriteBBMD.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@
from bacpypes.core import run, enable_sleeping

from bacpypes.pdu import Address
from bacpypes.bvll import ReadBroadcastDistributionTable, \
ReadBroadcastDistributionTableAck, ReadForeignDeviceTable, \
ReadForeignDeviceTableAck, Result
from bacpypes.bvll import (
ReadBroadcastDistributionTable,
ReadBroadcastDistributionTableAck,
ReadForeignDeviceTable,
ReadForeignDeviceTableAck,
WriteBroadcastDistributionTable,
Result,
)
from bacpypes.bvllservice import AnnexJCodec, UDPMultiplexer

# some debugging
Expand All @@ -27,33 +32,63 @@


@bacpypes_debugging
class ReadBBMDConsoleClient(ConsoleCmd, Client):
class ReadWriteBBMDConsoleClient(ConsoleCmd, Client):
def do_readbdt(self, args):
"""readbdt <addr>"""
args = args.split()
if _debug:
ReadBBMDConsoleClient._debug("do_readbdt %r", args)
ReadWriteBBMDConsoleClient._debug("do_readbdt %r", args)

# build a request and send it downstream
read_bdt = ReadBroadcastDistributionTable(destination=Address(args[0]))
if _debug:
ReadWriteBBMDConsoleClient._debug(" - read_bdt: %r", read_bdt)

self.request(read_bdt)

def do_readfdt(self, args):
"""readfdt <addr>"""
args = args.split()
if _debug:
ReadBBMDConsoleClient._debug("do_readfdt %r", args)
ReadWriteBBMDConsoleClient._debug("do_readfdt %r", args)

# build a request and send it downstream
read_fdt = ReadForeignDeviceTable(destination=Address(args[0]))
if _debug:
ReadWriteBBMDConsoleClient._debug(" - read_fdt: %r", read_fdt)

self.request(read_fdt)

def do_writebdt(self, args):
"""writebdt <addr> <entry> ..."""
args = args.split()
if _debug:
ReadWriteBBMDConsoleClient._debug("do_writebdt %r", args)

# build a list of broadcast distribution table entries which just so
# happen to be BACpypes IPv4 addresses
bdt = []
for addr in args[1:]:
bdte = Address(addr)
bdt.append(bdte)

# build a request and send it downstream
write_bdt = WriteBroadcastDistributionTable(
destination=Address(args[0]), bdt=bdt
)
if _debug:
ReadWriteBBMDConsoleClient._debug(" - write_bdt: %r", write_bdt)

self.request(write_bdt)

def confirmation(self, pdu):
"""Filter for the acks and errors."""
if _debug:
ReadBBMDConsoleClient._debug("confirmation %r", pdu)
ReadWriteBBMDConsoleClient._debug("confirmation %r", pdu)

if isinstance(pdu, (ReadBroadcastDistributionTableAck, ReadForeignDeviceTableAck, Result)):
if isinstance(
pdu, (ReadBroadcastDistributionTableAck, ReadForeignDeviceTableAck, Result)
):
pdu.debug_contents()


Expand All @@ -65,7 +100,7 @@ def main():
parser.add_argument(
"local_address",
help="IPv4 address",
)
)
args = parser.parse_args()

if _debug:
Expand All @@ -77,7 +112,7 @@ def main():
_log.debug(" - local_address: %r", local_address)

# make a console
this_console = ReadBBMDConsoleClient()
this_console = ReadWriteBBMDConsoleClient()
if _debug:
_log.debug(" - this_console: %r", this_console)

Expand Down

0 comments on commit 00bd5fe

Please sign in to comment.