Skip to content

Commit

Permalink
fix(azure-iot-device): ReconnectStage State Machine Overhaul (#844)
Browse files Browse the repository at this point in the history
* Auto-reconnect and SAS renewals should no longer cause problems if one or the other is turned off
* Operations now fail if they require a connection, and one cannot be automatically established
* All user-initiated actions should now return errors in case of failure rather than any kind of indefinite retry
* Created completely separate flows for automatic reconnection vs other connections
* Added more explicitness in conditional logic and improved documentation for clarity
  • Loading branch information
cartertinney authored Sep 24, 2021
1 parent adfe9ac commit 43c5fb6
Show file tree
Hide file tree
Showing 7 changed files with 1,477 additions and 1,652 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,6 @@ class ReauthorizeConnectionOperation(PipelineOperation):
Even though this is an base operation, it will most likely be handled by a more specific stage (such as an IoTHub or MQTT stage).
"""

def __init__(self, callback):
self.watchdog_timer = None
super(ReauthorizeConnectionOperation, self).__init__(callback)

pass


Expand All @@ -270,7 +266,9 @@ class DisconnectOperation(PipelineOperation):
Even though this is an base operation, it will most likely be handled by a more specific stage (such as an IoTHub or MQTT stage).
"""

pass
def __init__(self, callback):
self.hard = True # Indicates if this is a "hard" disconnect that kills in-flight ops
super(DisconnectOperation, self).__init__(callback)


class EnableFeatureOperation(PipelineOperation):
Expand Down
601 changes: 350 additions & 251 deletions azure-iot-device/azure/iot/device/common/pipeline/pipeline_stages_base.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,26 @@ def __init__(self):

# The transport will be instantiated upon receiving the InitializePipelineOperation
self.transport = None

# The current in-progress op that affects connection state (Connect, Disconnect, Reauthorize)
self._pending_connection_op = None
# Waitable event indicating the disconnect portion of the reauthorization is complete
self._reauth_disconnection_complete = threading.Event()

@pipeline_thread.runs_on_pipeline_thread
def _cancel_pending_connection_op(self, error=None):
"""
Cancel any running connect, disconnect or reauthorize_connection op. Since our ability to "cancel" is fairly limited,
Cancel any running connect, disconnect or reauthorize connection op. Since our ability to "cancel" is fairly limited,
all this does (for now) is to fail the operation
"""

op = self._pending_connection_op
if op:
# NOTE: This code path should NOT execute in normal flow. There should never already be a pending
# connection op when another is added, due to the SerializeConnectOps stage.
# connection op when another is added, due to the ConnectionLock stage.
# If this block does execute, there is a bug in the codebase.
if not error:
error = pipeline_exceptions.OperationCancelled(
"Cancelling because new ConnectOperation, DisconnectOperation, or ReauthorizeConnectionOperation was issued"
"Cancelling because new ConnectOperation or DisconnectOperationwas issued"
)
self._cancel_connection_watchdog(op)
op.complete(error=error)
Expand Down Expand Up @@ -211,29 +213,40 @@ def _run_op(self, op):
# already doing.

try:
self.transport.disconnect(clear_inflight=True)
# The connect after the disconnect will be triggered upon completion of the
# disconnect in the on_disconnected handler
self.transport.disconnect(clear_inflight=op.hard)
except Exception as e:
logger.info("transport.disconnect raised error while disconnecting")
logger.info(traceback.format_exc())
self._pending_connection_op = None
op.complete(error=e)

elif isinstance(op, pipeline_ops_base.ReauthorizeConnectionOperation):
logger.debug("{}({}): reauthorizing".format(self.name, op.name))
logger.debug(
"{}({}): reauthorizing. Will issue disconnect and then a connect".format(
self.name, op.name
)
)
self_weakref = weakref.ref(self)
reauth_op = op # rename for clarity

self._cancel_pending_connection_op()
self._pending_connection_op = op
# We don't need a watchdog on disconnect because there's no callback to wait for
# and we respond to a watchdog timeout by calling disconnect, which is what we're
# already doing.
def on_disconnect_complete(op, error):
this = self_weakref()
if error:
# Failing a disconnect should still get us disconnected, so can proceed anyway
logger.debug(
"Disconnect failed during reauthorization, continuing with connect"
)
connect_op = reauth_op.spawn_worker_op(pipeline_ops_base.ConnectOperation)

try:
self.transport.disconnect()
except Exception as e:
logger.info("transport.disconnect raised error while reauthorizing")
logger.info(traceback.format_exc())
self._pending_connection_op = None
op.complete(error=e)
# the problem is this doens't unset the disconnect from being the pending op before continuing
this.run_op(connect_op)

disconnect_op = pipeline_ops_base.DisconnectOperation(callback=on_disconnect_complete)
disconnect_op.hard = False

self.run_op(disconnect_op)

elif isinstance(op, pipeline_ops_mqtt.MQTTPublishOperation):
logger.debug("{}({}): publishing on {}".format(self.name, op.name, op.topic))
Expand Down Expand Up @@ -331,7 +344,7 @@ def _on_mqtt_connected(self):
self.send_event_up(pipeline_events_base.ConnectedEvent())

if isinstance(self._pending_connection_op, pipeline_ops_base.ConnectOperation):
logger.debug("completing connect op")
logger.debug("{}: completing connect op".format(self.name))
op = self._pending_connection_op
self._cancel_connection_watchdog(op)
self._pending_connection_op = None
Expand All @@ -340,7 +353,9 @@ def _on_mqtt_connected(self):
# This should indicate something odd is going on.
# If this occurs, either a connect was completed while there was no pending op,
# OR that a connect was completed while a disconnect op was pending
logger.info("Connection was unexpected")
logger.info(
"{}: Connection was unexpected (no connection op pending)".format(self.name)
)

@pipeline_thread.invoke_on_pipeline_thread_nowait
def _on_mqtt_connection_failure(self, cause):
Expand Down Expand Up @@ -378,47 +393,47 @@ def _on_mqtt_disconnected(self, cause=None):

# Send an event to tell other pipeline stages that we're disconnected. Do this before
# we do anything else (in case upper stages have any "are we connected" logic.)
# NOTE: Other stages rely on the fact that this occurs before any op that may be in
# progress is completed. Be careful with changing the order things occur here.
self.send_event_up(pipeline_events_base.DisconnectedEvent())

# Complete any pending connection ops
if self._pending_connection_op:
# on_mqtt_disconnected will cause any pending connect op to complete. This is how Paho
# behaves when there is a connection error, and it also makes sense that on_mqtt_disconnected
# would cause a pending connection op to fail.
logger.debug(
"{}: completing pending {} op".format(self.name, self._pending_connection_op.name)
)

op = self._pending_connection_op
self._cancel_connection_watchdog(op)
self._pending_connection_op = None

if isinstance(op, pipeline_ops_base.DisconnectOperation) or isinstance(
op, pipeline_ops_base.ReauthorizeConnectionOperation
):
if isinstance(op, pipeline_ops_base.DisconnectOperation):
logger.debug(
"{}: Expected disconnect - completing pending disconnect op".format(self.name)
)
# Swallow any errors if we intended to disconnect - even if something went wrong, we
# got to the state we wanted to be in!
#
# NOTE: ReauthorizeConnectionOperation currently completes on disconnect, not when
# the connection is re-established (it leaves connection retry to automatically
# re-establish). This needs to change because it is inaccurate and means that if
# a SASToken expires while connection retry is disabled, a reauthorization cannot
# complete (!!!!)
# TODO: Fix that!
if cause:
handle_exceptions.swallow_unraised_exception(
cause,
log_msg="Unexpected disconnect with error while disconnecting - swallowing error",
log_msg="Unexpected error while disconnecting - swallowing error",
)
# Disconnect complete, no longer pending
self._pending_connection_op = None
op.complete()

else:
logger.debug(
"{}: Unexpected disconnect - completing pending {} operation".format(
self.name, op.name
)
)
# Cancel any potential connection watchdog, and clear the pending op
self._cancel_connection_watchdog(op)
self._pending_connection_op = None
# Complete
if cause:
op.complete(error=cause)
else:
op.complete(
error=transport_exceptions.ConnectionDroppedError("transport disconnected")
)
else:
logger.info("{}: disconnection was unexpected".format(self.name))
logger.info("{}: Unexpected disconnect (no pending connection op)".format(self.name))

# If there is no connection retry, cancel any transport operations waiting on response
# so that they do not get stuck there.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,10 @@ def init_kwargs(self, mocker):
return kwargs


class ReauthorizeConnectionOperationInstantiationTests(ReauthorizeConnectionOperationTestConfig):
@pytest.mark.it("Initializes 'watchdog_timer' attribute to 'None'")
def test_retry_timer(self, cls_type, init_kwargs):
op = cls_type(**init_kwargs)
assert op.watchdog_timer is None


pipeline_ops_test.add_operation_tests(
test_module=this_module,
op_class_under_test=pipeline_ops_base.ReauthorizeConnectionOperation,
op_test_config_class=ReauthorizeConnectionOperationTestConfig,
extended_op_instantiation_test_class=ReauthorizeConnectionOperationInstantiationTests,
)


Expand Down
Loading

0 comments on commit 43c5fb6

Please sign in to comment.