From ec01d22eae90df34f81ce17785ee1be8f31059b1 Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Fri, 10 Mar 2017 00:39:35 -0500 Subject: [PATCH] first crack at supporting interface names --- py25/bacpypes/pdu.py | 54 +++++++++++++++++++++++++++++++++++++++++++ py27/bacpypes/pdu.py | 49 +++++++++++++++++++++++++++++++++++++++ py34/bacpypes/pdu.py | 55 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+) diff --git a/py25/bacpypes/pdu.py b/py25/bacpypes/pdu.py index a25ceeca..4ed3df52 100755 --- a/py25/bacpypes/pdu.py +++ b/py25/bacpypes/pdu.py @@ -8,6 +8,11 @@ import socket import struct +try: + import netifaces +except ImportError: + netifaces = None + from .debugging import ModuleLogger, bacpypes_debugging, btox, xtob from .comm import PCI as _PCI, PDUData @@ -202,6 +207,55 @@ def decode_address(self, addr): self.addrAddr = xtob(addr[2:-1]) self.addrLen = len(self.addrAddr) + elif netifaces and interface_re.match(addr): + if _debug: Address._debug(" - interface name with optional port") + + interface, port = interface_re.match(addr).groups() + if port is not None: + self.addrPort = int(port) + else: + self.addrPort = 47808 + + interfaces = netifaces.interfaces() + if interface not in interfaces: + raise ValueError("not an interface: %s" % (interface,)) + if _debug: Address._debug(" - interfaces: %r", interfaces) + + ifaddresses = netifaces.ifaddresses(interface) + if netifaces.AF_INET not in ifaddresses: + raise ValueError("interface does not support IPv4: %s" % (interface,)) + + ipv4addresses = ifaddresses[netifaces.AF_INET] + if len(ipv4addresses) > 1: + raise ValueError("interface supports multiple IPv4 addresses: %s" % (interface,)) + ifaddress = ipv4addresses[0] + if _debug: Address._debug(" - ifaddress: %r", ifaddress) + + addr = ifaddress['addr'] + self.addrTuple = (addr, self.addrPort) + if _debug: Address._debug(" - addrTuple: %r", self.addrTuple) + + addrstr = socket.inet_aton(addr) + self.addrIP = struct.unpack('!L', addrstr)[0] + + if 'netmask' in ifaddress: + maskstr = socket.inet_aton(ifaddress['netmask']) + self.addrMask = struct.unpack('!L', maskstr)[0] + else: + self.addrMask = _long_mask + + self.addrHost = (self.addrIP & ~self.addrMask) + self.addrSubnet = (self.addrIP & self.addrMask) + + if 'broadcast' in ifaddress: + self.addrBroadcastTuple = (ifaddress['broadcast'], self.addrPort) + else: + self.addrBroadcastTuple = None + if _debug: Address._debug(" - addrBroadcastTuple: %r", self.addrBroadcastTuple) + + self.addrAddr = addrstr + struct.pack('!H', self.addrPort & _short_mask) + self.addrLen = 6 + else: raise ValueError("unrecognized format") diff --git a/py27/bacpypes/pdu.py b/py27/bacpypes/pdu.py index cf427463..55053617 100755 --- a/py27/bacpypes/pdu.py +++ b/py27/bacpypes/pdu.py @@ -8,6 +8,11 @@ import socket import struct +try: + import netifaces +except ImportError: + netifaces = None + from .debugging import ModuleLogger, bacpypes_debugging, btox, xtob from .comm import PCI as _PCI, PDUData @@ -25,6 +30,7 @@ ip_address_mask_port_re = re.compile(r'^(?:(\d+):)?(\d+\.\d+\.\d+\.\d+)(?:/(\d+))?(?::(\d+))?$') ethernet_re = re.compile(r'^([0-9A-Fa-f][0-9A-Fa-f][:]){5}([0-9A-Fa-f][0-9A-Fa-f])$' ) +interface_re = re.compile(r'^(?:([\w]+))(?::(\d+))?$') @bacpypes_debugging class Address: @@ -203,6 +209,49 @@ def decode_address(self, addr): self.addrAddr = xtob(addr[2:-1]) self.addrLen = len(self.addrAddr) + elif netifaces and interface_re.match(addr): + interface, port = interface_re.match(addr).groups() + if port is not None: + self.addrPort = int(port) + else: + self.addrPort = 47808 + + interfaces = netifaces.interfaces() + if interface not in interfaces: + raise ValueError("not an interface: %s" % (interface,)) + + ifaddresses = netifaces.ifaddresses(interface) + if netifaces.AF_INET not in ifaddresses: + raise ValueError("interface does not support IPv4: %s" % (interface,)) + + ipv4addresses = ifaddresses[netifaces.AF_INET] + if len(ipv4addresses) > 1: + raise ValueError("interface supports multiple IPv4 addresses: %s" % (interface,)) + ifaddress = ipv4addresses[0] + + addr = ifaddress['addr'] + self.addrTuple = (addr, self.addrPort) + + addrstr = socket.inet_aton(addr) + self.addrIP = struct.unpack('!L', addrstr)[0] + + if 'netmask' in ifaddress: + maskstr = socket.inet_aton(ifaddress['netmask']) + self.addrMask = struct.unpack('!L', maskstr)[0] + else: + self.addrMask = _long_mask + + self.addrHost = (self.addrIP & ~self.addrMask) + self.addrSubnet = (self.addrIP & self.addrMask) + + if 'broadcast' in ifaddress: + self.addrBroadcastTuple = (ifaddress['broadcast'], self.addrPort) + else: + self.addrBroadcastTuple = None + + self.addrAddr = addrstr + struct.pack('!H', self.addrPort & _short_mask) + self.addrLen = 6 + else: raise ValueError("unrecognized format") diff --git a/py34/bacpypes/pdu.py b/py34/bacpypes/pdu.py index 152519ef..0454e69c 100755 --- a/py34/bacpypes/pdu.py +++ b/py34/bacpypes/pdu.py @@ -8,6 +8,11 @@ import socket import struct +try: + import netifaces +except ImportError: + netifaces = None + from .debugging import ModuleLogger, bacpypes_debugging, btox, xtob from .comm import PCI as _PCI, PDUData @@ -25,6 +30,7 @@ ip_address_mask_port_re = re.compile(r'^(?:(\d+):)?(\d+\.\d+\.\d+\.\d+)(?:/(\d+))?(?::(\d+))?$') ethernet_re = re.compile(r'^([0-9A-Fa-f][0-9A-Fa-f][:]){5}([0-9A-Fa-f][0-9A-Fa-f])$' ) +interface_re = re.compile(r'^(?:([\w]+))(?::(\d+))?$') @bacpypes_debugging class Address: @@ -219,6 +225,55 @@ def decode_address(self, addr): self.addrAddr = xtob(addr[2:-1]) self.addrLen = len(self.addrAddr) + elif netifaces and interface_re.match(addr): + if _debug: Address._debug(" - interface name with optional port") + + interface, port = interface_re.match(addr).groups() + if port is not None: + self.addrPort = int(port) + else: + self.addrPort = 47808 + + interfaces = netifaces.interfaces() + if interface not in interfaces: + raise ValueError("not an interface: %s" % (interface,)) + if _debug: Address._debug(" - interfaces: %r", interfaces) + + ifaddresses = netifaces.ifaddresses(interface) + if netifaces.AF_INET not in ifaddresses: + raise ValueError("interface does not support IPv4: %s" % (interface,)) + + ipv4addresses = ifaddresses[netifaces.AF_INET] + if len(ipv4addresses) > 1: + raise ValueError("interface supports multiple IPv4 addresses: %s" % (interface,)) + ifaddress = ipv4addresses[0] + if _debug: Address._debug(" - ifaddress: %r", ifaddress) + + addr = ifaddress['addr'] + self.addrTuple = (addr, self.addrPort) + if _debug: Address._debug(" - addrTuple: %r", self.addrTuple) + + addrstr = socket.inet_aton(addr) + self.addrIP = struct.unpack('!L', addrstr)[0] + + if 'netmask' in ifaddress: + maskstr = socket.inet_aton(ifaddress['netmask']) + self.addrMask = struct.unpack('!L', maskstr)[0] + else: + self.addrMask = _long_mask + + self.addrHost = (self.addrIP & ~self.addrMask) + self.addrSubnet = (self.addrIP & self.addrMask) + + if 'broadcast' in ifaddress: + self.addrBroadcastTuple = (ifaddress['broadcast'], self.addrPort) + else: + self.addrBroadcastTuple = None + if _debug: Address._debug(" - addrBroadcastTuple: %r", self.addrBroadcastTuple) + + self.addrAddr = addrstr + struct.pack('!H', self.addrPort & _short_mask) + self.addrLen = 6 + else: raise ValueError("unrecognized format")