Skip to content

Commit

Permalink
first crack at supporting interface names
Browse files Browse the repository at this point in the history
  • Loading branch information
JoelBender committed Mar 10, 2017
1 parent 665ee4b commit ec01d22
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
54 changes: 54 additions & 0 deletions py25/bacpypes/pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
import socket
import struct

try:
import netifaces
except ImportError:
netifaces = None

from .debugging import ModuleLogger, bacpypes_debugging, btox, xtob
from .comm import PCI as _PCI, PDUData

Expand Down Expand Up @@ -202,6 +207,55 @@ def decode_address(self, addr):
self.addrAddr = xtob(addr[2:-1])
self.addrLen = len(self.addrAddr)

elif netifaces and interface_re.match(addr):
if _debug: Address._debug(" - interface name with optional port")

interface, port = interface_re.match(addr).groups()
if port is not None:
self.addrPort = int(port)
else:
self.addrPort = 47808

interfaces = netifaces.interfaces()
if interface not in interfaces:
raise ValueError("not an interface: %s" % (interface,))
if _debug: Address._debug(" - interfaces: %r", interfaces)

ifaddresses = netifaces.ifaddresses(interface)
if netifaces.AF_INET not in ifaddresses:
raise ValueError("interface does not support IPv4: %s" % (interface,))

ipv4addresses = ifaddresses[netifaces.AF_INET]
if len(ipv4addresses) > 1:
raise ValueError("interface supports multiple IPv4 addresses: %s" % (interface,))
ifaddress = ipv4addresses[0]
if _debug: Address._debug(" - ifaddress: %r", ifaddress)

addr = ifaddress['addr']
self.addrTuple = (addr, self.addrPort)
if _debug: Address._debug(" - addrTuple: %r", self.addrTuple)

addrstr = socket.inet_aton(addr)
self.addrIP = struct.unpack('!L', addrstr)[0]

if 'netmask' in ifaddress:
maskstr = socket.inet_aton(ifaddress['netmask'])
self.addrMask = struct.unpack('!L', maskstr)[0]
else:
self.addrMask = _long_mask

self.addrHost = (self.addrIP & ~self.addrMask)
self.addrSubnet = (self.addrIP & self.addrMask)

if 'broadcast' in ifaddress:
self.addrBroadcastTuple = (ifaddress['broadcast'], self.addrPort)
else:
self.addrBroadcastTuple = None
if _debug: Address._debug(" - addrBroadcastTuple: %r", self.addrBroadcastTuple)

self.addrAddr = addrstr + struct.pack('!H', self.addrPort & _short_mask)
self.addrLen = 6

else:
raise ValueError("unrecognized format")

Expand Down
49 changes: 49 additions & 0 deletions py27/bacpypes/pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
import socket
import struct

try:
import netifaces
except ImportError:
netifaces = None

from .debugging import ModuleLogger, bacpypes_debugging, btox, xtob
from .comm import PCI as _PCI, PDUData

Expand All @@ -25,6 +30,7 @@

ip_address_mask_port_re = re.compile(r'^(?:(\d+):)?(\d+\.\d+\.\d+\.\d+)(?:/(\d+))?(?::(\d+))?$')
ethernet_re = re.compile(r'^([0-9A-Fa-f][0-9A-Fa-f][:]){5}([0-9A-Fa-f][0-9A-Fa-f])$' )
interface_re = re.compile(r'^(?:([\w]+))(?::(\d+))?$')

@bacpypes_debugging
class Address:
Expand Down Expand Up @@ -203,6 +209,49 @@ def decode_address(self, addr):
self.addrAddr = xtob(addr[2:-1])
self.addrLen = len(self.addrAddr)

elif netifaces and interface_re.match(addr):
interface, port = interface_re.match(addr).groups()
if port is not None:
self.addrPort = int(port)
else:
self.addrPort = 47808

interfaces = netifaces.interfaces()
if interface not in interfaces:
raise ValueError("not an interface: %s" % (interface,))

ifaddresses = netifaces.ifaddresses(interface)
if netifaces.AF_INET not in ifaddresses:
raise ValueError("interface does not support IPv4: %s" % (interface,))

ipv4addresses = ifaddresses[netifaces.AF_INET]
if len(ipv4addresses) > 1:
raise ValueError("interface supports multiple IPv4 addresses: %s" % (interface,))
ifaddress = ipv4addresses[0]

addr = ifaddress['addr']
self.addrTuple = (addr, self.addrPort)

addrstr = socket.inet_aton(addr)
self.addrIP = struct.unpack('!L', addrstr)[0]

if 'netmask' in ifaddress:
maskstr = socket.inet_aton(ifaddress['netmask'])
self.addrMask = struct.unpack('!L', maskstr)[0]
else:
self.addrMask = _long_mask

self.addrHost = (self.addrIP & ~self.addrMask)
self.addrSubnet = (self.addrIP & self.addrMask)

if 'broadcast' in ifaddress:
self.addrBroadcastTuple = (ifaddress['broadcast'], self.addrPort)
else:
self.addrBroadcastTuple = None

self.addrAddr = addrstr + struct.pack('!H', self.addrPort & _short_mask)
self.addrLen = 6

else:
raise ValueError("unrecognized format")

Expand Down
55 changes: 55 additions & 0 deletions py34/bacpypes/pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
import socket
import struct

try:
import netifaces
except ImportError:
netifaces = None

from .debugging import ModuleLogger, bacpypes_debugging, btox, xtob
from .comm import PCI as _PCI, PDUData

Expand All @@ -25,6 +30,7 @@

ip_address_mask_port_re = re.compile(r'^(?:(\d+):)?(\d+\.\d+\.\d+\.\d+)(?:/(\d+))?(?::(\d+))?$')
ethernet_re = re.compile(r'^([0-9A-Fa-f][0-9A-Fa-f][:]){5}([0-9A-Fa-f][0-9A-Fa-f])$' )
interface_re = re.compile(r'^(?:([\w]+))(?::(\d+))?$')

@bacpypes_debugging
class Address:
Expand Down Expand Up @@ -219,6 +225,55 @@ def decode_address(self, addr):
self.addrAddr = xtob(addr[2:-1])
self.addrLen = len(self.addrAddr)

elif netifaces and interface_re.match(addr):
if _debug: Address._debug(" - interface name with optional port")

interface, port = interface_re.match(addr).groups()
if port is not None:
self.addrPort = int(port)
else:
self.addrPort = 47808

interfaces = netifaces.interfaces()
if interface not in interfaces:
raise ValueError("not an interface: %s" % (interface,))
if _debug: Address._debug(" - interfaces: %r", interfaces)

ifaddresses = netifaces.ifaddresses(interface)
if netifaces.AF_INET not in ifaddresses:
raise ValueError("interface does not support IPv4: %s" % (interface,))

ipv4addresses = ifaddresses[netifaces.AF_INET]
if len(ipv4addresses) > 1:
raise ValueError("interface supports multiple IPv4 addresses: %s" % (interface,))
ifaddress = ipv4addresses[0]
if _debug: Address._debug(" - ifaddress: %r", ifaddress)

addr = ifaddress['addr']
self.addrTuple = (addr, self.addrPort)
if _debug: Address._debug(" - addrTuple: %r", self.addrTuple)

addrstr = socket.inet_aton(addr)
self.addrIP = struct.unpack('!L', addrstr)[0]

if 'netmask' in ifaddress:
maskstr = socket.inet_aton(ifaddress['netmask'])
self.addrMask = struct.unpack('!L', maskstr)[0]
else:
self.addrMask = _long_mask

self.addrHost = (self.addrIP & ~self.addrMask)
self.addrSubnet = (self.addrIP & self.addrMask)

if 'broadcast' in ifaddress:
self.addrBroadcastTuple = (ifaddress['broadcast'], self.addrPort)
else:
self.addrBroadcastTuple = None
if _debug: Address._debug(" - addrBroadcastTuple: %r", self.addrBroadcastTuple)

self.addrAddr = addrstr + struct.pack('!H', self.addrPort & _short_mask)
self.addrLen = 6

else:
raise ValueError("unrecognized format")

Expand Down

0 comments on commit ec01d22

Please sign in to comment.