Skip to content

Commit

Permalink
refactor (azure-iot-device): Remove ConnectionLockStage (#1017)
Browse files Browse the repository at this point in the history
* All remaining `ConnectionLockStage` functionality that was not already implemented by the `ConnectionStateStage` has now been moved there
  • Loading branch information
cartertinney authored Jun 23, 2022
1 parent 0397876 commit 268d4b3
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 813 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -615,124 +615,6 @@ def on_connect_op_complete(op, error):
self.send_op_down(pipeline_ops_base.ConnectOperation(callback=on_connect_op_complete))


class ConnectionLockStage(PipelineStage):
"""
This stage is responsible for serializing connect, disconnect, and reauthorize ops on
the pipeline, such that only a single one of these ops can go past this stage at a
time. This way, we don't have to worry about cases like "what happens if we try to
disconnect if we're in the middle of reauthorizing." This stage will wait for the
reauthorize to complete before letting the disconnect past.
"""

def __init__(self):
super().__init__()
self.queue = queue.Queue()
self.blocked = False

@pipeline_thread.runs_on_pipeline_thread
def _run_op(self, op):

# If this stage is currently blocked (because we're waiting for a connection, etc,
# to complete), we queue up all operations until after the connect completes.
if self.blocked:
logger.debug(
"{}({}): pipeline is blocked waiting for a prior connect/disconnect/reauthorize to complete. queueing.".format(
self.name, op.name
)
)
self.queue.put_nowait(op)

elif isinstance(op, pipeline_ops_base.ConnectOperation) and self.nucleus.connected:
logger.info(
"{}({}): Transport is already connected. Completing.".format(self.name, op.name)
)
op.complete()

# NOTE: We ought not to be checking specific ConnectionStates if it can be helped.
# Unfortunately, here, it can't be - it's okay, this stage will be deleted very soon
# TODO: Move auto-completion logic to ConnectionStateStage and delete this stage.
elif (
isinstance(op, pipeline_ops_base.DisconnectOperation)
and self.nucleus.connection_state is ConnectionState.DISCONNECTED
):
logger.info(
"{}({}): Transport is already disconnected. Completing.".format(self.name, op.name)
)
op.complete()

elif (
isinstance(op, pipeline_ops_base.DisconnectOperation)
or isinstance(op, pipeline_ops_base.ConnectOperation)
or isinstance(op, pipeline_ops_base.ReauthorizeConnectionOperation)
):
self._block(op)

@pipeline_thread.runs_on_pipeline_thread
def on_operation_complete(op, error):
if error:
logger.debug(
"{}({}): op failed. Unblocking queue with error: {}".format(
self.name, op.name, error
)
)
else:
logger.debug(
"{}({}): op succeeded. Unblocking queue".format(self.name, op.name)
)

self._unblock(op, error)

op.add_callback(on_operation_complete)
self.send_op_down(op)

else:
self.send_op_down(op)

@pipeline_thread.runs_on_pipeline_thread
def _block(self, op):
"""
block this stage while we're waiting for the connect/disconnect/reauthorize operation to complete.
"""
logger.debug("{}({}): blocking".format(self.name, op.name))
self.blocked = True

@pipeline_thread.runs_on_pipeline_thread
def _unblock(self, op, error):
"""
Unblock this stage after the connect/disconnect/reauthorize operation is complete. This also means
releasing all the operations that were queued up.
"""
logger.debug("{}({}): unblocking and releasing queued ops.".format(self.name, op.name))
self.blocked = False
logger.debug(
"{}({}): processing {} items in queue for error={}".format(
self.name, op.name, self.queue.qsize(), error
)
)
# Loop through our queue and release all the blocked operations
# Put a new Queue in self.queue because releasing ops might put them back in the
# queue, especially if there's a ConnectOperation in the list of ops to release
old_queue = self.queue
self.queue = queue.Queue()
while not old_queue.empty():
op_to_release = old_queue.get_nowait()
if error:
# if we're unblocking the queue because something (like a connect operation) failed,
# then we fail all of the blocked operations with the same error.
logger.debug(
"{}({}): failing {} op because of error".format(
self.name, op.name, op_to_release.name
)
)
op_to_release.complete(error=error)
else:
logger.debug(
"{}({}): releasing {} op.".format(self.name, op.name, op_to_release.name)
)
# call run_op directly here so operations go through this stage again (especially connect/disconnect ops)
self.run_op(op_to_release)


class CoordinateRequestAndResponseStage(PipelineStage):
"""
Pipeline stage which is responsible for coordinating RequestAndResponseOperation operations. For each
Expand Down Expand Up @@ -1065,11 +947,6 @@ class ConnectionStateStage(PipelineStage):
ConnectionState.DISCONNECTING,
ConnectionState.REAUTHORIZING,
]
connection_ops = [
pipeline_ops_base.ConnectOperation,
pipeline_ops_base.DisconnectOperation,
pipeline_ops_base.ReauthorizeConnectionOperation,
]
transient_connect_errors = [
pipeline_exceptions.OperationCancelled,
pipeline_exceptions.OperationTimeout,
Expand All @@ -1092,17 +969,9 @@ def __init__(self):
@pipeline_thread.runs_on_pipeline_thread
def _run_op(self, op):

# If receiving a connection op while one is already in progress, wait for the current
# one to finish. This is kind of like a ConnectionLockStage, but inside this one.
# It has to happen here because just relying on a ConnectionLockStage before or after
# in the pipeline is insufficient, given that operations can spawn in this stage.
# We need a way to wait ops without letting them pass through and affect the connection
# state in order to address edge cases e.g. a user-initiated connect and a automatic
# reconnect connect happen at approximately the same time.
if (
self.nucleus.connection_state in self.intermediate_states
and type(op) in self.connection_ops
):
# If receiving an operation while the connection state is changing, wait for the
# connection state to reach a stable state before continuing.
if self.nucleus.connection_state in self.intermediate_states:
logger.debug(
"{}({}): State is {} - waiting for in-progress operation to finish".format(
self.name, op.name, self.nucleus.connection_state
Expand All @@ -1114,14 +983,11 @@ def _run_op(self, op):
if isinstance(op, pipeline_ops_base.ConnectOperation):
if self.nucleus.connection_state is ConnectionState.CONNECTED:
logger.debug(
"{}({}): State is already CONNECTED. Sending op down".format(
"{}({}): State is already CONNECTED. Completing operation".format(
self.name, op.name
)
)
self._add_connection_op_callback(op)
# NOTE: This is the safest thing to do while the ConnectionLockStage is
# doing auto-completes based on connection status. When it is revisited,
# this logic may need to be updated.
op.complete()
elif self.nucleus.connection_state is ConnectionState.DISCONNECTED:
logger.debug(
"{}({}): State changes DISCONNECTED -> CONNECTING. Sending op down".format(
Expand All @@ -1130,6 +996,7 @@ def _run_op(self, op):
)
self.nucleus.connection_state = ConnectionState.CONNECTING
self._add_connection_op_callback(op)
self.send_op_down(op)
else:
# This should be impossible to reach. If the state were intermediate, it
# would have been added to the waiting ops queue above.
Expand All @@ -1138,6 +1005,7 @@ def _run_op(self, op):
self.name, op.name, self.nucleus.connection_state
)
)
self.send_op_down(op)

elif isinstance(op, pipeline_ops_base.DisconnectOperation):
# First, always clear any reconnect timer. Because a manual disconnection is
Expand All @@ -1152,16 +1020,14 @@ def _run_op(self, op):
)
self.nucleus.connection_state = ConnectionState.DISCONNECTING
self._add_connection_op_callback(op)
self.send_op_down(op)
elif self.nucleus.connection_state is ConnectionState.DISCONNECTED:
logger.debug(
"{}({}): State is already DISCONNECTED. Sending op down".format(
"{}({}): State is already DISCONNECTED. Completing operation".format(
self.name, op.name
)
)
self._add_connection_op_callback(op)
# NOTE: This is the safest thing to do while the ConnectionLockStage is
# doing auto-completes based on connection status. When it is revisited,
# this logic may need to be updated.
op.complete()
else:
# This should be impossible to reach. If the state were intermediate, it
# would have been added to the waiting ops queue above.
Expand All @@ -1170,6 +1036,7 @@ def _run_op(self, op):
self.name, op.name, self.nucleus.connection_state
)
)
self.send_op_down(op)

elif isinstance(op, pipeline_ops_base.ReauthorizeConnectionOperation):
if self.nucleus.connection_state is ConnectionState.CONNECTED:
Expand All @@ -1180,6 +1047,7 @@ def _run_op(self, op):
)
self.nucleus.connection_state = ConnectionState.REAUTHORIZING
self._add_connection_op_callback(op)
self.send_op_down(op)
elif self.nucleus.connection_state is ConnectionState.DISCONNECTED:
logger.debug(
"{}({}): State changes DISCONNECTED -> REAUTHORIZING. Sending op down".format(
Expand All @@ -1188,6 +1056,7 @@ def _run_op(self, op):
)
self.nucleus.connection_state = ConnectionState.REAUTHORIZING
self._add_connection_op_callback(op)
self.send_op_down(op)
else:
# This should be impossible to reach. If the state were intermediate, it
# would have been added to the waiting ops queue above.
Expand All @@ -1196,6 +1065,7 @@ def _run_op(self, op):
self.name, op.name, self.nucleus.connection_state
)
)
self.send_op_down(op)

elif isinstance(op, pipeline_ops_base.ShutdownPipelineOperation):
self._clear_reconnect_timer()
Expand All @@ -1206,9 +1076,10 @@ def _run_op(self, op):
"Operation waiting in ConnectionStateStage cancelled by shutdown"
)
waiting_op.complete(error=cancel_error)
self.send_op_down(op)

# In all cases the op gets sent down
self.send_op_down(op)
else:
self.send_op_down(op)

@pipeline_thread.runs_on_pipeline_thread
def _handle_pipeline_event(self, event):
Expand Down Expand Up @@ -1327,10 +1198,6 @@ def _add_connection_op_callback(self, op):
"""Adds callback to a connection op passing through to do necessary stage upkeep"""
self_weakref = weakref.ref(self)

# NOTE: we are currently protected from connect failing due to being already connected
# by the ConnectionLockStage. If the ConnectionLockStage changes functionality,
# we may need some logic changes to address an op that can fail while leaving us CONNECTED

@pipeline_thread.runs_on_pipeline_thread
def on_complete(op, error):
this = self_weakref()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ def __init__(self, pipeline_configuration):
#
.append_stage(pipeline_stages_base.ConnectionStateStage())
#
# ConnectionLockStage needs to be after ConnectionStateStage because we want any ops that
# ConnectionStateStage creates to go through the ConnectionLockStage gate
#
.append_stage(pipeline_stages_base.ConnectionLockStage())
#
# RetryStage needs to be near the end because it's retrying low-level MQTT operations.
#
.append_stage(pipeline_stages_base.RetryStage())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ def __init__(self, pipeline_configuration):
#
.append_stage(pipeline_stages_base.ConnectionStateStage())
#
# ConnectionLockStage needs to be after ConnectionStateStage because we want any ops that
# ConnectionStateStage creates to go through the ConnectionLockStage gate
#
.append_stage(pipeline_stages_base.ConnectionLockStage())
#
# RetryStage needs to be near the end because it's retrying low-level MQTT operations.
#
.append_stage(pipeline_stages_base.RetryStage())
Expand Down
Loading

0 comments on commit 268d4b3

Please sign in to comment.