diff --git a/py34/bacpypes/tcp.py b/py34/bacpypes/tcp.py index 226c5857..3f0654d9 100755 --- a/py34/bacpypes/tcp.py +++ b/py34/bacpypes/tcp.py @@ -6,6 +6,8 @@ import asyncore import socket +import errno + import pickle from time import time as _time, sleep as _sleep from io import StringIO @@ -23,6 +25,7 @@ # globals REBIND_SLEEP_INTERVAL = 2.0 +CONNECT_TIMEOUT = 30.0 # # PickleActorMixIn @@ -91,6 +94,8 @@ def response(self, pdu): @bacpypes_debugging class TCPClient(asyncore.dispatcher): + _connect_timeout = CONNECT_TIMEOUT + def __init__(self, peer): if _debug: TCPClient._debug("__init__ %r", peer) asyncore.dispatcher.__init__(self) @@ -98,22 +103,39 @@ def __init__(self, peer): # ask the dispatcher for a socket self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + # set the timeout + self.socket.settimeout(self._connect_timeout) + if _debug: TCPClient._debug(" - timeout: %r", self._connect_timeout) + # save the peer self.peer = peer + self.connected = False # create a request buffer self.request = b'' # try to connect try: - if _debug: TCPClient._debug(" - initiate connection") - self.connect(peer) + rslt = self.socket.connect_ex(peer) + if (rslt == 0): + if _debug: TCPClient._debug(" - connected") + self.connected = True + elif (rslt == errno.EINPROGRESS): + if _debug: TCPClient._debug(" - in progress") + elif (rslt in (errno.ECONNREFUSED, 111)): + if _debug: TCPClient._debug(" - connection refused") + self.handle_error(rslt) + else: + if _debug: TCPClient._debug(" - connect_ex: %r", rslt) except socket.error as err: if _debug: TCPClient._debug(" - connect socket error: %r", err) # pass along to a handler self.handle_error(err) + def handle_accept(self): + if _debug: TCPClient._debug("handle_accept") + def handle_connect(self): if _debug: TCPClient._debug("handle_connect") @@ -127,9 +149,10 @@ def handle_connect_event(self): # check for connection refused if (err == 0): if _debug: TCPClient._debug(" - no error") - elif (err == 111): + self.connected = True + elif (err in (errno.ECONNREFUSED, 111)): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) - self.handle_error(socket.error(111, "connection refused")) + self.handle_error(socket.error(errno.ECONNREFUSED, "connection refused")) return # pass along @@ -149,11 +172,11 @@ def handle_read(self): if not self.socket: if _debug: TCPClient._debug(" - socket was closed") else: - # sent the data upstream + # send the data upstream deferred(self.response, PDU(msg)) except socket.error as err: - if (err.args[0] in (61, 111)): + if (err.args[0] in (errno.ECONNREFUSED, 111)): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) else: if _debug: TCPClient._debug(" - recv socket error: %r", err) @@ -177,7 +200,7 @@ def handle_write(self): if (err.args[0] == 32): if _debug: TCPClient._debug(" - broken pipe to %r", self.peer) return - elif (err.args[0] in (61, 111)): + elif (err.args[0] in (errno.ECONNREFUSED, 111)): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) else: if _debug: TCPClient._debug(" - send socket error: %s", err) @@ -193,7 +216,11 @@ def handle_write_event(self): if _debug: TCPClient._debug(" - err: %r", err) # check for connection refused - if (err in (61, 111)): + if err == 0: + if not self.connected: + if _debug: TCPClient._debug(" - connected") + self.connected = True + elif (err in (errno.ECONNREFUSED, 111)): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) self.handle_error(socket.error(err, "connection refused")) self.handle_close() @@ -206,7 +233,7 @@ def handle_close(self): if _debug: TCPClient._debug("handle_close") # close the socket - self.close() + self.socket.close() # make sure other routines know the socket is closed self.socket = None @@ -215,6 +242,11 @@ def handle_error(self, error=None): """Trap for TCPClient errors, otherwise continue.""" if _debug: TCPClient._debug("handle_error %r", error) + # if there is no socket, it was closed + if not self.socket: + if _debug: TCPClient._debug(" - error already handled") + return + # core does not take parameters asyncore.dispatcher.handle_error(self) @@ -236,32 +268,51 @@ class TCPClientActor(TCPClient): def __init__(self, director, peer): if _debug: TCPClientActor._debug("__init__ %r %r", director, peer) + + # no director yet, no connection error + self.director = None + self._connection_error = None + + # pass along the connect timeout from the director + if director.connect_timeout is not None: + self._connect_timeout = director.connect_timeout + + # continue with initialization TCPClient.__init__(self, peer) # keep track of the director self.director = director # add a timer - self.timeout = director.timeout - if self.timeout > 0: - self.timer = FunctionTask(self.idle_timeout) - self.timer.install_task(_time() + self.timeout) + self._idle_timeout = director.idle_timeout + if self._idle_timeout: + self.idle_timeout_task = FunctionTask(self.idle_timeout) + self.idle_timeout_task.install_task(_time() + self._idle_timeout) else: - self.timer = None + self.idle_timeout_task = None # this may have a flush state - self.flushTask = None + self.flush_task = None # tell the director this is a new actor self.director.add_actor(self) + # if there was a connection error, pass it to the director + if self._connection_error: + if _debug: TCPClientActor._debug(" - had connection error") + self.director.actor_error(self, self._connection_error) + def handle_error(self, error=None): """Trap for TCPClient errors, otherwise continue.""" if _debug: TCPClientActor._debug("handle_error %r", error) # pass along to the director if error is not None: - self.director.actor_error(self, error) + # this error may be during startup + if not self.director: + self._connection_error = error + else: + self.director.actor_error(self, error) else: TCPClient.handle_error(self) @@ -269,12 +320,12 @@ def handle_close(self): if _debug: TCPClientActor._debug("handle_close") # if there's a flush task, cancel it - if self.flushTask: - self.flushTask.suspend_task() + if self.flush_task: + self.flush_task.suspend_task() # cancel the timer - if self.timer: - self.timer.suspend_task() + if self.idle_timeout_task: + self.idle_timeout_task.suspend_task() # tell the director this is gone self.director.del_actor(self) @@ -292,13 +343,13 @@ def indication(self, pdu): if _debug: TCPClientActor._debug("indication %r", pdu) # additional downstream data is tossed while flushing - if self.flushTask: + if self.flush_task: if _debug: TCPServerActor._debug(" - flushing") return # reschedule the timer - if self.timer: - self.timer.install_task(_time() + self.timeout) + if self.idle_timeout_task: + self.idle_timeout_task.install_task(_time() + self._idle_timeout) # continue as usual TCPClient.indication(self, pdu) @@ -310,8 +361,8 @@ def response(self, pdu): pdu.pduSource = self.peer # reschedule the timer - if self.timer: - self.timer.install_task(_time() + self.timeout) + if self.idle_timeout_task: + self.idle_timeout_task.install_task(_time() + self._idle_timeout) # process this as a response from the director self.director.response(pdu) @@ -320,11 +371,11 @@ def flush(self): if _debug: TCPClientActor._debug("flush") # clear out the old task - self.flushTask = None + self.flush_task = None # if the outgoing buffer has data, re-schedule another attempt if self.request: - self.flushTask = OneShotFunction(self.flush) + self.flush_task = OneShotFunction(self.flush) return # close up shop, all done @@ -350,10 +401,13 @@ class TCPPickleClientActor(PickleActorMixIn, TCPClientActor): @bacpypes_debugging class TCPClientDirector(Server, ServiceAccessPoint, DebugContents): - _debug_contents = ('timeout', 'actorClass', 'clients', 'reconnect') + _debug_contents = ('connect_timeout', 'idle_timeout', 'actorClass', 'clients', 'reconnect') - def __init__(self, timeout=0, actorClass=TCPClientActor, sid=None, sapID=None): - if _debug: TCPClientDirector._debug("__init__ timeout=%r actorClass=%r sid=%r sapID=%r", timeout, actorClass, sid, sapID) + def __init__(self, connect_timeout=None, idle_timeout=None, actorClass=TCPClientActor, sid=None, sapID=None): + if _debug: + TCPClientDirector._debug("__init__ connect_timeout=%r idle_timeout=%r actorClass=%r sid=%r sapID=%r", + connect_timeout, idle_timeout, actorClass, sid, sapID, + ) Server.__init__(self, sid) ServiceAccessPoint.__init__(self, sapID) @@ -363,7 +417,8 @@ def __init__(self, timeout=0, actorClass=TCPClientActor, sid=None, sapID=None): self.actorClass = actorClass # save the timeout for actors - self.timeout = timeout + self.connect_timeout = connect_timeout + self.idle_timeout = idle_timeout # start with an empty client pool self.clients = {} @@ -469,7 +524,7 @@ def handle_connect(self): if _debug: TCPServer._debug("handle_connect") def readable(self): - return 1 + return self.connected def handle_read(self): if _debug: TCPServer._debug("handle_read") @@ -512,17 +567,17 @@ def handle_write(self): else: if _debug: TCPServer._debug(" - send socket error: %s", err) - # pass along to a handler + # sent the exception upstream self.handle_error(err) def handle_close(self): if _debug: TCPServer._debug("handle_close") if not self: - if _debug: TCPServer._warning("handle_close: self is None") + if _debug: TCPServer._debug(" - self is None") return if not self.socket: - if _debug: TCPServer._warning("handle_close: socket already closed") + if _debug: TCPServer._debug(" - socket already closed") return self.close() @@ -556,15 +611,15 @@ def __init__(self, director, sock, peer): self.director = director # add a timer - self.timeout = director.timeout - if self.timeout > 0: - self.timer = FunctionTask(self.idle_timeout) - self.timer.install_task(_time() + self.timeout) + self._idle_timeout = director.idle_timeout + if self._idle_timeout: + self.idle_timeout_task = FunctionTask(self.idle_timeout) + self.idle_timeout_task.install_task(_time() + self._idle_timeout) else: - self.timer = None + self.idle_timeout_task = None # this may have a flush state - self.flushTask = None + self.flush_task = None # tell the director this is a new actor self.director.add_actor(self) @@ -583,8 +638,8 @@ def handle_close(self): if _debug: TCPServerActor._debug("handle_close") # if there's a flush task, cancel it - if self.flushTask: - self.flushTask.suspend_task() + if self.flush_task: + self.flush_task.suspend_task() # tell the director this is gone self.director.del_actor(self) @@ -602,13 +657,13 @@ def indication(self, pdu): if _debug: TCPServerActor._debug("indication %r", pdu) # additional downstream data is tossed while flushing - if self.flushTask: + if self.flush_task: if _debug: TCPServerActor._debug(" - flushing") return # reschedule the timer - if self.timer: - self.timer.install_task(_time() + self.timeout) + if self.idle_timeout_task: + self.idle_timeout_task.install_task(_time() + self._idle_timeout) # continue as usual TCPServer.indication(self, pdu) @@ -617,7 +672,7 @@ def response(self, pdu): if _debug: TCPServerActor._debug("response %r", pdu) # upstream data is tossed while flushing - if self.flushTask: + if self.flush_task: if _debug: TCPServerActor._debug(" - flushing") return @@ -625,8 +680,8 @@ def response(self, pdu): pdu.pduSource = self.peer # reschedule the timer - if self.timer: - self.timer.install_task(_time() + self.timeout) + if self.idle_timeout_task: + self.idle_timeout_task.install_task(_time() + self._idle_timeout) # process this as a response from the director self.director.response(pdu) @@ -635,11 +690,11 @@ def flush(self): if _debug: TCPServerActor._debug("flush") # clear out the old task - self.flushTask = None + self.flush_task = None # if the outgoing buffer has data, re-schedule another attempt if self.request: - self.flushTask = OneShotFunction(self.flush) + self.flush_task = OneShotFunction(self.flush) return # close up shop, all done @@ -659,19 +714,19 @@ class TCPPickleServerActor(PickleActorMixIn, TCPServerActor): @bacpypes_debugging class TCPServerDirector(asyncore.dispatcher, Server, ServiceAccessPoint, DebugContents): - _debug_contents = ('port', 'timeout', 'actorClass', 'servers') + _debug_contents = ('port', 'idle_timeout', 'actorClass', 'servers') - def __init__(self, address, listeners=5, timeout=0, reuse=False, actorClass=TCPServerActor, cid=None, sapID=None): + def __init__(self, address, listeners=5, idle_timeout=0, reuse=False, actorClass=TCPServerActor, cid=None, sapID=None): if _debug: - TCPServerDirector._debug("__init__ %r listeners=%r timeout=%r reuse=%r actorClass=%r cid=%r sapID=%r" - , address, listeners, timeout, reuse, actorClass, cid, sapID + TCPServerDirector._debug("__init__ %r listeners=%r idle_timeout=%r reuse=%r actorClass=%r cid=%r sapID=%r" + , address, listeners, idle_timeout, reuse, actorClass, cid, sapID ) Server.__init__(self, cid) ServiceAccessPoint.__init__(self, sapID) # save the address and timeout self.port = address - self.timeout = timeout + self.idle_timeout = idle_timeout # check the actor class if not issubclass(actorClass, TCPServerActor):