From 2e28d3af4d58a7d82e7de85c9329652ceb93af9e Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Wed, 26 Apr 2017 11:45:54 -0400 Subject: [PATCH] subtle changes to connection logic and renamed timeout pieces --- py27/bacpypes/tcp.py | 142 ++++++++++++++++++++++++++++--------------- samples/TCPClient.py | 29 +++++++-- samples/TCPServer.py | 8 ++- 3 files changed, 123 insertions(+), 56 deletions(-) mode change 100755 => 100644 py27/bacpypes/tcp.py mode change 100755 => 100644 samples/TCPClient.py diff --git a/py27/bacpypes/tcp.py b/py27/bacpypes/tcp.py old mode 100755 new mode 100644 index 6131916b..39442a1d --- a/py27/bacpypes/tcp.py +++ b/py27/bacpypes/tcp.py @@ -6,6 +6,7 @@ import asyncore import socket +import errno import cPickle as pickle from time import time as _time, sleep as _sleep @@ -92,6 +93,8 @@ def response(self, pdu): @bacpypes_debugging class TCPClient(asyncore.dispatcher): + _connect_timeout = None + def __init__(self, peer): if _debug: TCPClient._debug("__init__ %r", peer) asyncore.dispatcher.__init__(self) @@ -99,16 +102,31 @@ def __init__(self, peer): # ask the dispatcher for a socket self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + # set the timeout + if self._connect_timeout is not None: + self.settimeout(self._connect_timeout) + if _debug: TCPClient._debug(" - timeout: %r", self._connect_timeout) + # save the peer self.peer = peer + self.connected = False # create a request buffer self.request = b'' # try to connect try: - if _debug: TCPClient._debug(" - initiate connection") - self.connect(peer) + rslt = self.connect_ex(peer) + if (rslt == 0): + if _debug: TCPClient._debug(" - connected") + self.connected = True + if (rslt == errno.EINPROGRESS): + if _debug: TCPClient._debug(" - in progress") + elif (rslt in (errno.ECONNREFUSED, 111)): + if _debug: TCPClient._debug(" - connection refused") + self.handle_error(rslt) + else: + if _debug: TCPClient._debug(" - connect_ex: %r", rslt) except socket.error as err: if _debug: TCPClient._debug(" - connect socket error: %r", err) @@ -131,9 +149,10 @@ def handle_connect_event(self): # check for connection refused if (err == 0): if _debug: TCPClient._debug(" - no error") - elif (err == 111): + self.connected = True + elif (err in (errno.ECONNREFUSED, 111)): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) - self.handle_error(socket.error(111, "connection refused")) + self.handle_error(socket.error(errno.ECONNREFUSED, "connection refused")) return # pass along @@ -157,7 +176,7 @@ def handle_read(self): deferred(self.response, PDU(msg)) except socket.error as err: - if (err.args[0] in (61, 111)): + if (err.args[0] in (errno.ECONNREFUSED, 111)): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) else: if _debug: TCPClient._debug(" - recv socket error: %r", err) @@ -181,7 +200,7 @@ def handle_write(self): if (err.args[0] == 32): if _debug: TCPClient._debug(" - broken pipe to %r", self.peer) return - elif (err.args[0] in (61, 111)): + elif (err.args[0] in (errno.ECONNREFUSED, 111)): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) else: if _debug: TCPClient._debug(" - send socket error: %s", err) @@ -197,7 +216,7 @@ def handle_write_event(self): if _debug: TCPClient._debug(" - err: %r", err) # check for connection refused - if (err in (61, 111)): + if (err in (errno.ECONNREFUSED, 111)): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) self.handle_error(socket.error(err, "connection refused")) self.handle_close() @@ -247,32 +266,51 @@ class TCPClientActor(TCPClient): def __init__(self, director, peer): if _debug: TCPClientActor._debug("__init__ %r %r", director, peer) + + # no director yet, no connection error + self.director = None + self._connection_error = None + + # pass along the connect timeout from the director + if director.connect_timeout is not None: + self._connect_timeout = director.connect_timeout + + # continue with initialization TCPClient.__init__(self, peer) # keep track of the director self.director = director # add a timer - self.timeout = director.timeout - if self.timeout > 0: - self.timer = FunctionTask(self.idle_timeout) - self.timer.install_task(_time() + self.timeout) + self._idle_timeout = director.idle_timeout + if self._idle_timeout > 0: + self.idle_timeout_task = FunctionTask(self.idle_timeout) + self.idle_timeout_task.install_task(_time() + self._idle_timeout) else: - self.timer = None + self.idle_timeout_task = None # this may have a flush state - self.flushTask = None + self.flush_task = None # tell the director this is a new actor self.director.add_actor(self) + # if there was a connection error, pass it to the director + if self._connection_error: + if _debug: TCPClientActor._debug(" - had connection error") + self.director.actor_error(self, self._connection_error) + def handle_error(self, error=None): """Trap for TCPClient errors, otherwise continue.""" if _debug: TCPClientActor._debug("handle_error %r", error) # pass along to the director if error is not None: - self.director.actor_error(self, error) + # this error may be during startup + if not self.director: + self._connection_error = error + else: + self.director.actor_error(self, error) else: TCPClient.handle_error(self) @@ -280,12 +318,12 @@ def handle_close(self): if _debug: TCPClientActor._debug("handle_close") # if there's a flush task, cancel it - if self.flushTask: - self.flushTask.suspend_task() + if self.flush_task: + self.flush_task.suspend_task() # cancel the timer - if self.timer: - self.timer.suspend_task() + if self.idle_timeout_task: + self.idle_timeout_task.suspend_task() # tell the director this is gone self.director.del_actor(self) @@ -303,13 +341,13 @@ def indication(self, pdu): if _debug: TCPClientActor._debug("indication %r", pdu) # additional downstream data is tossed while flushing - if self.flushTask: + if self.flush_task: if _debug: TCPServerActor._debug(" - flushing") return # reschedule the timer - if self.timer: - self.timer.install_task(_time() + self.timeout) + if self.idle_timeout_task: + self.idle_timeout_task.install_task(_time() + self._idle_timeout) # continue as usual TCPClient.indication(self, pdu) @@ -321,8 +359,8 @@ def response(self, pdu): pdu.pduSource = self.peer # reschedule the timer - if self.timer: - self.timer.install_task(_time() + self.timeout) + if self.idle_timeout_task: + self.idle_timeout_task.install_task(_time() + self._idle_timeout) # process this as a response from the director self.director.response(pdu) @@ -331,11 +369,11 @@ def flush(self): if _debug: TCPClientActor._debug("flush") # clear out the old task - self.flushTask = None + self.flush_task = None # if the outgoing buffer has data, re-schedule another attempt if self.request: - self.flushTask = OneShotFunction(self.flush) + self.flush_task = OneShotFunction(self.flush) return # close up shop, all done @@ -361,10 +399,13 @@ class TCPPickleClientActor(PickleActorMixIn, TCPClientActor): @bacpypes_debugging class TCPClientDirector(Server, ServiceAccessPoint, DebugContents): - _debug_contents = ('timeout', 'actorClass', 'clients', 'reconnect') + _debug_contents = ('connect_timeout', 'idle_timeout', 'actorClass', 'clients', 'reconnect') - def __init__(self, timeout=0, actorClass=TCPClientActor, sid=None, sapID=None): - if _debug: TCPClientDirector._debug("__init__ timeout=%r actorClass=%r sid=%r sapID=%r", timeout, actorClass, sid, sapID) + def __init__(self, connect_timeout=None, idle_timeout=0, actorClass=TCPClientActor, sid=None, sapID=None): + if _debug: + TCPClientDirector._debug("__init__ connect_timeout=%r idle_timeout=%r actorClass=%r sid=%r sapID=%r", + connect_timeout, idle_timeout, actorClass, sid, sapID, + ) Server.__init__(self, sid) ServiceAccessPoint.__init__(self, sapID) @@ -374,7 +415,8 @@ def __init__(self, timeout=0, actorClass=TCPClientActor, sid=None, sapID=None): self.actorClass = actorClass # save the timeout for actors - self.timeout = timeout + self.connect_timeout = connect_timeout + self.idle_timeout = idle_timeout # start with an empty client pool self.clients = {} @@ -567,15 +609,15 @@ def __init__(self, director, sock, peer): self.director = director # add a timer - self.timeout = director.timeout - if self.timeout > 0: - self.timer = FunctionTask(self.idle_timeout) - self.timer.install_task(_time() + self.timeout) + self._idle_timeout = director.idle_timeout + if self._idle_timeout > 0: + self.idle_timeout_task = FunctionTask(self.idle_timeout) + self.idle_timeout_task.install_task(_time() + self._idle_timeout) else: - self.timer = None + self.idle_timeout_task = None # this may have a flush state - self.flushTask = None + self.flush_task = None # tell the director this is a new actor self.director.add_actor(self) @@ -594,8 +636,8 @@ def handle_close(self): if _debug: TCPServerActor._debug("handle_close") # if there's a flush task, cancel it - if self.flushTask: - self.flushTask.suspend_task() + if self.flush_task: + self.flush_task.suspend_task() # tell the director this is gone self.director.del_actor(self) @@ -613,13 +655,13 @@ def indication(self, pdu): if _debug: TCPServerActor._debug("indication %r", pdu) # additional downstream data is tossed while flushing - if self.flushTask: + if self.flush_task: if _debug: TCPServerActor._debug(" - flushing") return # reschedule the timer - if self.timer: - self.timer.install_task(_time() + self.timeout) + if self.idle_timeout_task: + self.idle_timeout_task.install_task(_time() + self._idle_timeout) # continue as usual TCPServer.indication(self, pdu) @@ -628,7 +670,7 @@ def response(self, pdu): if _debug: TCPServerActor._debug("response %r", pdu) # upstream data is tossed while flushing - if self.flushTask: + if self.flush_task: if _debug: TCPServerActor._debug(" - flushing") return @@ -636,8 +678,8 @@ def response(self, pdu): pdu.pduSource = self.peer # reschedule the timer - if self.timer: - self.timer.install_task(_time() + self.timeout) + if self.idle_timeout_task: + self.idle_timeout_task.install_task(_time() + self._idle_timeout) # process this as a response from the director self.director.response(pdu) @@ -646,11 +688,11 @@ def flush(self): if _debug: TCPServerActor._debug("flush") # clear out the old task - self.flushTask = None + self.flush_task = None # if the outgoing buffer has data, re-schedule another attempt if self.request: - self.flushTask = OneShotFunction(self.flush) + self.flush_task = OneShotFunction(self.flush) return # close up shop, all done @@ -670,19 +712,19 @@ class TCPPickleServerActor(PickleActorMixIn, TCPServerActor): @bacpypes_debugging class TCPServerDirector(asyncore.dispatcher, Server, ServiceAccessPoint, DebugContents): - _debug_contents = ('port', 'timeout', 'actorClass', 'servers') + _debug_contents = ('port', 'idle_timeout', 'actorClass', 'servers') - def __init__(self, address, listeners=5, timeout=0, reuse=False, actorClass=TCPServerActor, cid=None, sapID=None): + def __init__(self, address, listeners=5, idle_timeout=0, reuse=False, actorClass=TCPServerActor, cid=None, sapID=None): if _debug: - TCPServerDirector._debug("__init__ %r listeners=%r timeout=%r reuse=%r actorClass=%r cid=%r sapID=%r" - , address, listeners, timeout, reuse, actorClass, cid, sapID + TCPServerDirector._debug("__init__ %r listeners=%r idle_timeout=%r reuse=%r actorClass=%r cid=%r sapID=%r" + , address, listeners, idle_timeout, reuse, actorClass, cid, sapID ) Server.__init__(self, cid) ServiceAccessPoint.__init__(self, sapID) # save the address and timeout self.port = address - self.timeout = timeout + self.idle_timeout = idle_timeout # check the actor class if not issubclass(actorClass, TCPServerActor): diff --git a/samples/TCPClient.py b/samples/TCPClient.py old mode 100755 new mode 100644 index b86355d8..83b5e5bf --- a/samples/TCPClient.py +++ b/samples/TCPClient.py @@ -25,8 +25,11 @@ # settings SERVER_HOST = os.getenv('SERVER_HOST', '127.0.0.1') SERVER_PORT = int(os.getenv('SERVER_PORT', 9000)) +CONNECT_TIMEOUT = int(os.getenv('CONNECT_TIMEOUT', 0)) or None +IDLE_TIMEOUT = int(os.getenv('IDLE_TIMEOUT', 0)) or None # globals +args = None server_address = None # @@ -47,7 +50,8 @@ def indication(self, pdu): # no data means EOF, stop if not pdu.pduData: - stop() + # ask the director (downstream peer) to close the connection + self.clientPeer.disconnect(server_address) return # pass it along @@ -86,10 +90,12 @@ def indication(self, add_actor=None, del_actor=None, actor_error=None, error=Non if actor_error: if _debug: MiddleManASE._debug("indication actor_error=%r error=%r", actor_error, error) + # tell the director to close + self.elementService.disconnect(actor_error.peer) # if there are no clients, quit if not self.elementService.clients: - if _debug: MiddleManASE._debug(" - quitting") + if _debug: MiddleManASE._debug(" - no clients, stopping") stop() bacpypes_debugging(MiddleManASE) @@ -102,7 +108,7 @@ def main(): """ Main function, called when run as an application. """ - global server_address + global args, server_address # parse the command line arguments parser = ArgumentParser(description=__doc__) @@ -121,6 +127,16 @@ def main(): default=False, help="send a hello message", ) + parser.add_argument( + "--connect-timeout", nargs='?', type=int, + help="idle connection timeout", + default=CONNECT_TIMEOUT, + ) + parser.add_argument( + "--idle-timeout", nargs='?', type=int, + help="idle connection timeout", + default=IDLE_TIMEOUT, + ) args = parser.parse_args() if _debug: _log.debug("initialization") @@ -139,7 +155,10 @@ def main(): this_middle_man = MiddleMan() if _debug: _log.debug(" - this_middle_man: %r", this_middle_man) - this_director = TCPClientDirector() + this_director = TCPClientDirector( + connect_timeout=args.connect_timeout, + idle_timeout=args.idle_timeout, + ) if _debug: _log.debug(" - this_director: %r", this_director) bind(this_console, this_middle_man, this_director) @@ -154,7 +173,7 @@ def main(): # send hello maybe if args.hello: - deferred(this_middle_man.indication, PDU(xtob('68656c6c6f0a'))) + deferred(this_middle_man.indication, PDU(b'Hello, world!\n')) if _debug: _log.debug("running") diff --git a/samples/TCPServer.py b/samples/TCPServer.py index baa469a5..4164d0ad 100755 --- a/samples/TCPServer.py +++ b/samples/TCPServer.py @@ -23,6 +23,7 @@ # settings SERVER_HOST = os.getenv('SERVER_HOST', 'any') SERVER_PORT = int(os.getenv('SERVER_PORT', 9000)) +IDLE_TIMEOUT = int(os.getenv('IDLE_TIMEOUT', 0)) or None # # EchoMaster @@ -78,6 +79,11 @@ def main(): help="server port (default %r)" % (SERVER_PORT,), default=SERVER_PORT, ) + parser.add_argument( + "--idle-timeout", nargs='?', type=int, + help="idle connection timeout", + default=IDLE_TIMEOUT, + ) args = parser.parse_args() if _debug: _log.debug("initialization") @@ -91,7 +97,7 @@ def main(): if _debug: _log.debug(" - server_address: %r", server_address) # create a director listening to the address - this_director = TCPServerDirector(server_address) + this_director = TCPServerDirector(server_address, idle_timeout=args.idle_timeout) if _debug: _log.debug(" - this_director: %r", this_director) # create an echo