Skip to content

Commit

Permalink
subtle changes to connection logic and renamed timeout pieces
Browse files Browse the repository at this point in the history
  • Loading branch information
Joel Bender committed Apr 26, 2017
1 parent 32db81e commit 2e28d3a
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 56 deletions.
142 changes: 92 additions & 50 deletions py27/bacpypes/tcp.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import asyncore
import socket
import errno

import cPickle as pickle
from time import time as _time, sleep as _sleep
Expand Down Expand Up @@ -92,23 +93,40 @@ def response(self, pdu):
@bacpypes_debugging
class TCPClient(asyncore.dispatcher):

_connect_timeout = None

def __init__(self, peer):
if _debug: TCPClient._debug("__init__ %r", peer)
asyncore.dispatcher.__init__(self)

# ask the dispatcher for a socket
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)

# set the timeout
if self._connect_timeout is not None:
self.settimeout(self._connect_timeout)
if _debug: TCPClient._debug(" - timeout: %r", self._connect_timeout)

# save the peer
self.peer = peer
self.connected = False

# create a request buffer
self.request = b''

# try to connect
try:
if _debug: TCPClient._debug(" - initiate connection")
self.connect(peer)
rslt = self.connect_ex(peer)
if (rslt == 0):
if _debug: TCPClient._debug(" - connected")
self.connected = True
if (rslt == errno.EINPROGRESS):
if _debug: TCPClient._debug(" - in progress")
elif (rslt in (errno.ECONNREFUSED, 111)):
if _debug: TCPClient._debug(" - connection refused")
self.handle_error(rslt)
else:
if _debug: TCPClient._debug(" - connect_ex: %r", rslt)
except socket.error as err:
if _debug: TCPClient._debug(" - connect socket error: %r", err)

Expand All @@ -131,9 +149,10 @@ def handle_connect_event(self):
# check for connection refused
if (err == 0):
if _debug: TCPClient._debug(" - no error")
elif (err == 111):
self.connected = True
elif (err in (errno.ECONNREFUSED, 111)):
if _debug: TCPClient._debug(" - connection to %r refused", self.peer)
self.handle_error(socket.error(111, "connection refused"))
self.handle_error(socket.error(errno.ECONNREFUSED, "connection refused"))
return

# pass along
Expand All @@ -157,7 +176,7 @@ def handle_read(self):
deferred(self.response, PDU(msg))

except socket.error as err:
if (err.args[0] in (61, 111)):
if (err.args[0] in (errno.ECONNREFUSED, 111)):
if _debug: TCPClient._debug(" - connection to %r refused", self.peer)
else:
if _debug: TCPClient._debug(" - recv socket error: %r", err)
Expand All @@ -181,7 +200,7 @@ def handle_write(self):
if (err.args[0] == 32):
if _debug: TCPClient._debug(" - broken pipe to %r", self.peer)
return
elif (err.args[0] in (61, 111)):
elif (err.args[0] in (errno.ECONNREFUSED, 111)):
if _debug: TCPClient._debug(" - connection to %r refused", self.peer)
else:
if _debug: TCPClient._debug(" - send socket error: %s", err)
Expand All @@ -197,7 +216,7 @@ def handle_write_event(self):
if _debug: TCPClient._debug(" - err: %r", err)

# check for connection refused
if (err in (61, 111)):
if (err in (errno.ECONNREFUSED, 111)):
if _debug: TCPClient._debug(" - connection to %r refused", self.peer)
self.handle_error(socket.error(err, "connection refused"))
self.handle_close()
Expand Down Expand Up @@ -247,45 +266,64 @@ class TCPClientActor(TCPClient):

def __init__(self, director, peer):
if _debug: TCPClientActor._debug("__init__ %r %r", director, peer)

# no director yet, no connection error
self.director = None
self._connection_error = None

# pass along the connect timeout from the director
if director.connect_timeout is not None:
self._connect_timeout = director.connect_timeout

# continue with initialization
TCPClient.__init__(self, peer)

# keep track of the director
self.director = director

# add a timer
self.timeout = director.timeout
if self.timeout > 0:
self.timer = FunctionTask(self.idle_timeout)
self.timer.install_task(_time() + self.timeout)
self._idle_timeout = director.idle_timeout
if self._idle_timeout > 0:
self.idle_timeout_task = FunctionTask(self.idle_timeout)
self.idle_timeout_task.install_task(_time() + self._idle_timeout)
else:
self.timer = None
self.idle_timeout_task = None

# this may have a flush state
self.flushTask = None
self.flush_task = None

# tell the director this is a new actor
self.director.add_actor(self)

# if there was a connection error, pass it to the director
if self._connection_error:
if _debug: TCPClientActor._debug(" - had connection error")
self.director.actor_error(self, self._connection_error)

def handle_error(self, error=None):
"""Trap for TCPClient errors, otherwise continue."""
if _debug: TCPClientActor._debug("handle_error %r", error)

# pass along to the director
if error is not None:
self.director.actor_error(self, error)
# this error may be during startup
if not self.director:
self._connection_error = error
else:
self.director.actor_error(self, error)
else:
TCPClient.handle_error(self)

def handle_close(self):
if _debug: TCPClientActor._debug("handle_close")

# if there's a flush task, cancel it
if self.flushTask:
self.flushTask.suspend_task()
if self.flush_task:
self.flush_task.suspend_task()

# cancel the timer
if self.timer:
self.timer.suspend_task()
if self.idle_timeout_task:
self.idle_timeout_task.suspend_task()

# tell the director this is gone
self.director.del_actor(self)
Expand All @@ -303,13 +341,13 @@ def indication(self, pdu):
if _debug: TCPClientActor._debug("indication %r", pdu)

# additional downstream data is tossed while flushing
if self.flushTask:
if self.flush_task:
if _debug: TCPServerActor._debug(" - flushing")
return

# reschedule the timer
if self.timer:
self.timer.install_task(_time() + self.timeout)
if self.idle_timeout_task:
self.idle_timeout_task.install_task(_time() + self._idle_timeout)

# continue as usual
TCPClient.indication(self, pdu)
Expand All @@ -321,8 +359,8 @@ def response(self, pdu):
pdu.pduSource = self.peer

# reschedule the timer
if self.timer:
self.timer.install_task(_time() + self.timeout)
if self.idle_timeout_task:
self.idle_timeout_task.install_task(_time() + self._idle_timeout)

# process this as a response from the director
self.director.response(pdu)
Expand All @@ -331,11 +369,11 @@ def flush(self):
if _debug: TCPClientActor._debug("flush")

# clear out the old task
self.flushTask = None
self.flush_task = None

# if the outgoing buffer has data, re-schedule another attempt
if self.request:
self.flushTask = OneShotFunction(self.flush)
self.flush_task = OneShotFunction(self.flush)
return

# close up shop, all done
Expand All @@ -361,10 +399,13 @@ class TCPPickleClientActor(PickleActorMixIn, TCPClientActor):
@bacpypes_debugging
class TCPClientDirector(Server, ServiceAccessPoint, DebugContents):

_debug_contents = ('timeout', 'actorClass', 'clients', 'reconnect')
_debug_contents = ('connect_timeout', 'idle_timeout', 'actorClass', 'clients', 'reconnect')

def __init__(self, timeout=0, actorClass=TCPClientActor, sid=None, sapID=None):
if _debug: TCPClientDirector._debug("__init__ timeout=%r actorClass=%r sid=%r sapID=%r", timeout, actorClass, sid, sapID)
def __init__(self, connect_timeout=None, idle_timeout=0, actorClass=TCPClientActor, sid=None, sapID=None):
if _debug:
TCPClientDirector._debug("__init__ connect_timeout=%r idle_timeout=%r actorClass=%r sid=%r sapID=%r",
connect_timeout, idle_timeout, actorClass, sid, sapID,
)
Server.__init__(self, sid)
ServiceAccessPoint.__init__(self, sapID)

Expand All @@ -374,7 +415,8 @@ def __init__(self, timeout=0, actorClass=TCPClientActor, sid=None, sapID=None):
self.actorClass = actorClass

# save the timeout for actors
self.timeout = timeout
self.connect_timeout = connect_timeout
self.idle_timeout = idle_timeout

# start with an empty client pool
self.clients = {}
Expand Down Expand Up @@ -567,15 +609,15 @@ def __init__(self, director, sock, peer):
self.director = director

# add a timer
self.timeout = director.timeout
if self.timeout > 0:
self.timer = FunctionTask(self.idle_timeout)
self.timer.install_task(_time() + self.timeout)
self._idle_timeout = director.idle_timeout
if self._idle_timeout > 0:
self.idle_timeout_task = FunctionTask(self.idle_timeout)
self.idle_timeout_task.install_task(_time() + self._idle_timeout)
else:
self.timer = None
self.idle_timeout_task = None

# this may have a flush state
self.flushTask = None
self.flush_task = None

# tell the director this is a new actor
self.director.add_actor(self)
Expand All @@ -594,8 +636,8 @@ def handle_close(self):
if _debug: TCPServerActor._debug("handle_close")

# if there's a flush task, cancel it
if self.flushTask:
self.flushTask.suspend_task()
if self.flush_task:
self.flush_task.suspend_task()

# tell the director this is gone
self.director.del_actor(self)
Expand All @@ -613,13 +655,13 @@ def indication(self, pdu):
if _debug: TCPServerActor._debug("indication %r", pdu)

# additional downstream data is tossed while flushing
if self.flushTask:
if self.flush_task:
if _debug: TCPServerActor._debug(" - flushing")
return

# reschedule the timer
if self.timer:
self.timer.install_task(_time() + self.timeout)
if self.idle_timeout_task:
self.idle_timeout_task.install_task(_time() + self._idle_timeout)

# continue as usual
TCPServer.indication(self, pdu)
Expand All @@ -628,16 +670,16 @@ def response(self, pdu):
if _debug: TCPServerActor._debug("response %r", pdu)

# upstream data is tossed while flushing
if self.flushTask:
if self.flush_task:
if _debug: TCPServerActor._debug(" - flushing")
return

# save the source
pdu.pduSource = self.peer

# reschedule the timer
if self.timer:
self.timer.install_task(_time() + self.timeout)
if self.idle_timeout_task:
self.idle_timeout_task.install_task(_time() + self._idle_timeout)

# process this as a response from the director
self.director.response(pdu)
Expand All @@ -646,11 +688,11 @@ def flush(self):
if _debug: TCPServerActor._debug("flush")

# clear out the old task
self.flushTask = None
self.flush_task = None

# if the outgoing buffer has data, re-schedule another attempt
if self.request:
self.flushTask = OneShotFunction(self.flush)
self.flush_task = OneShotFunction(self.flush)
return

# close up shop, all done
Expand All @@ -670,19 +712,19 @@ class TCPPickleServerActor(PickleActorMixIn, TCPServerActor):
@bacpypes_debugging
class TCPServerDirector(asyncore.dispatcher, Server, ServiceAccessPoint, DebugContents):

_debug_contents = ('port', 'timeout', 'actorClass', 'servers')
_debug_contents = ('port', 'idle_timeout', 'actorClass', 'servers')

def __init__(self, address, listeners=5, timeout=0, reuse=False, actorClass=TCPServerActor, cid=None, sapID=None):
def __init__(self, address, listeners=5, idle_timeout=0, reuse=False, actorClass=TCPServerActor, cid=None, sapID=None):
if _debug:
TCPServerDirector._debug("__init__ %r listeners=%r timeout=%r reuse=%r actorClass=%r cid=%r sapID=%r"
, address, listeners, timeout, reuse, actorClass, cid, sapID
TCPServerDirector._debug("__init__ %r listeners=%r idle_timeout=%r reuse=%r actorClass=%r cid=%r sapID=%r"
, address, listeners, idle_timeout, reuse, actorClass, cid, sapID
)
Server.__init__(self, cid)
ServiceAccessPoint.__init__(self, sapID)

# save the address and timeout
self.port = address
self.timeout = timeout
self.idle_timeout = idle_timeout

# check the actor class
if not issubclass(actorClass, TCPServerActor):
Expand Down
Loading

0 comments on commit 2e28d3a

Please sign in to comment.