From d74b22268245e3a170c6d958f8c06c3eeca9e986 Mon Sep 17 00:00:00 2001 From: olivakar Date: Wed, 15 Mar 2023 09:45:07 -0700 Subject: [PATCH] sample(azure-iot-device): dps re-provision sample (#1109) --- ...on_symmetric_key_failover_device_delete.md | 74 ++++ ...on_symmetric_key_failover_device_delete.py | 374 ++++++++++++++++++ 2 files changed, 448 insertions(+) create mode 100644 samples/solutions/provision_symmetric_key_failover_device_delete.md create mode 100644 samples/solutions/provision_symmetric_key_failover_device_delete.py diff --git a/samples/solutions/provision_symmetric_key_failover_device_delete.md b/samples/solutions/provision_symmetric_key_failover_device_delete.md new file mode 100644 index 000000000..d055af846 --- /dev/null +++ b/samples/solutions/provision_symmetric_key_failover_device_delete.md @@ -0,0 +1,74 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +## CUSTOMER PERSONA +This application illustrates that a provisioning request can be sent again and multiple times to the Device Provisioning Service. +On first loading of the application the device registers for the first time. The application waits for the registration +process to be completely successful. Once an IotHub assignment is done the device connects to the assigned IoTHub and +starts sending telemetry at a constant rate. Currently, messages are sent at some interval consistently as long as +network connection remains. In case of disconnection the customer wants to retry the connection. Currently, the time at +which messages are sent is at TELEMETRY_INTERVAL secs. All connection failed attempts are retried starting with an +initial value of INITIAL_SLEEP_TIME_BETWEEN_CONNECTION_ATTEMPTS after which the interval between each retry attempt +increases geometrically. Once the sleep time reaches an upper threshold set by THRESHOLD_FOR_RETRY_CONNECTION the +application retries the provisioning process again. It is to be noted that the application does not retry provisioning +in case there is an error from provisioning itself. All values are configurable and customizable as per the scenario needs. + +## TESTING +The device was deliberately deleted from the assigned hub to force a disconnection. And after retrying connections +couple of times based on the sleep time , once it reached a certain threshold the application went +back into provisioning the device again. + +## WORKING APP + +The application should work seamlessly and continuously as long as the customer does not exit the application. +The application can also raise an unrecoverable exception and exit itself. +In case of recoverable error where the network connection drops, the application should try to establish connection again. + +As long as interval time between connection attempts does not go beyond a certain threshold the application will retry connection. +Once above a certain threshold the application starts the provisioning process again. + +In case the provisioning process raises an error the application will exit. So the application does not +retry provisioning in case of error from provisioning itself. Some errors from provisioning are related due to wrong +configuration of the enrollment. During these times it is best to start over. + +The application has significant logging as well to check on progress and troubleshoot issues. + +## APP SPECIFIC LOGS + +Several log files will be generated as the application runs. The DEBUG and INFO logs are generated +on a timed rotating logging handler. So multiple of DEBUG and INFO files based on time-stamp will be generated. +The debug log files will be named like `debug.log.2023-01-04_11-28-49` and info log files will be named as +`info.log.2023-01-04_11-28-49` with the date and timestamp. The next debug and log files will be generated with names +like `debug.log.2023-01-04_12-28-49` and `info.log.2023-01-04_12-28-49` with a rotation interval of 1 hour set by LOG_ROTATION_INTERVAL. + +The `sample.log` file will contain logging output only from the solution. The solution also prints similar texts onto the console for visual purposes. +Customer can modify the current logging and set it to a different level by changing one of the loggers. + +## ADD LIBRARY SPECIFIC LOGGING + +Customer can also add logging for example say into the MQTT Library Paho by doing +```python +paho_log_handler = logging.handlers.TimedRotatingFileHandler( + filename="{}/paho.log".format(LOG_DIRECTORY), + when="S", + interval=LOG_ROTATION_INTERVAL, + backupCount=LOG_BACKUP_COUNT, +) +paho_log_handler.setLevel(level=logging.DEBUG) +paho_log_handler.setFormatter(log_formatter) +paho_logger = logging.getLogger("paho") +paho_logger.addHandler(paho_log_handler) +``` + +## TROUBLESHOOTING TIPS +Currently, whenever connection drops due it is considered to be recoverable, and it is retried for a fixed set of times. + +In the event the application has stopped working for any of the above errors, it will establish connection on its own +and resume the application whenever the network is back. Such intermittent disruptions are temporary and this is +a correct process of operation. + +In case the application has stopped and exited it could be either the provisioning process has run into an error out +or there is some other unrecoverable error that has caused the exit. The cause of such a thing can be found out from the logs. \ No newline at end of file diff --git a/samples/solutions/provision_symmetric_key_failover_device_delete.py b/samples/solutions/provision_symmetric_key_failover_device_delete.py new file mode 100644 index 000000000..a15836c4a --- /dev/null +++ b/samples/solutions/provision_symmetric_key_failover_device_delete.py @@ -0,0 +1,374 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +import asyncio +from azure.iot.device.iothub.aio import IoTHubDeviceClient +from azure.iot.device.aio import ProvisioningDeviceClient + +import logging.handlers +import glob +import os +import traceback + +# Current DPS time out configured in the python SDK is itself 30 secs , +# so call times need to be configured keeping that in mind. +# The interval at which to check for registrations after a registration has been successful +MONITOR_TIME_BETWEEN_SUCCESS_ASSIGNMENTS = 45 +# The interval at which to send telemetry +TELEMETRY_INTERVAL = 10 +# Interval in seconds between to check if device was provisioned to some hub +# This is used when registration was underway and other processes are checking if it is completed. +SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION = 10 +# Initial interval in seconds between consecutive connection attempts in case of error +INITIAL_SLEEP_TIME_BETWEEN_CONNECTION_ATTEMPTS = 3 +# Threshold for retrying connection attempts after which the app will error +THRESHOLD_FOR_RETRY_CONNECTION = 90 +# Interval for rotating logs, in seconds +LOG_ROTATION_INTERVAL = 3600 +# How many logs to keep before recycling +LOG_BACKUP_COUNT = 6 +# Directory for storing log files +LOG_DIRECTORY = "./logs/event_loop_dpsfailover/device-delete-new-client-3" +messages_to_send = 10 + +# logger = logging.getLogger() +# logger.setLevel(level=logging.DEBUG) + +# Prepare the log directory +os.makedirs(LOG_DIRECTORY, exist_ok=True) +for filename in glob.glob("{}/*.log".format(LOG_DIRECTORY)): + os.remove(filename) + +log_formatter = logging.Formatter( + "%(asctime)s %(levelname)-5s (%(threadName)s) %(filename)s:%(funcName)s():%(message)s" +) + +info_log_handler = logging.handlers.TimedRotatingFileHandler( + filename="{}/info.log".format(LOG_DIRECTORY), + when="S", + interval=LOG_ROTATION_INTERVAL, + backupCount=LOG_BACKUP_COUNT, +) +info_log_handler.setLevel(level=logging.INFO) +info_log_handler.setFormatter(log_formatter) + +debug_log_handler = logging.handlers.TimedRotatingFileHandler( + filename="{}/debug.log".format(LOG_DIRECTORY), + when="S", + interval=LOG_ROTATION_INTERVAL, + backupCount=LOG_BACKUP_COUNT, +) +debug_log_handler.setLevel(level=logging.DEBUG) +debug_log_handler.setFormatter(log_formatter) + +root_logger = logging.getLogger() +root_logger.setLevel(level=logging.DEBUG) +root_logger.addHandler(info_log_handler) +root_logger.addHandler(debug_log_handler) + +sample_log_handler = logging.FileHandler(filename="{}/sample.log".format(LOG_DIRECTORY)) +sample_log_handler.setLevel(level=logging.DEBUG) +sample_log_handler.setFormatter(log_formatter) +logger = logging.getLogger(__name__) +logger.addHandler(sample_log_handler) + + +def get_type_name(e): + return type(e).__name__ + + +class Application(object): + async def initiate(self): + self.connected_event = asyncio.Event() + self.disconnected_event = asyncio.Event() + self.exit_app_event = asyncio.Event() + self.iothub_assignment_fail_event = asyncio.Event() + self.iothub_assignment_sucess_event = asyncio.Event() + # Baseline that device has not been assigned + self.iothub_assignment_fail_event.set() + # Baseline that device has not been connected + self.disconnected_event.set() + + self.iothub_client = None + self.first_connect = True + self.registration_attempt_on = True + # Power factor for increasing interval between consecutive connection attempts. + # This will increase with iteration + self.retry_increase_factor = 1 + # The nth number for attempting connection + self.sleep_time_between_conns = INITIAL_SLEEP_TIME_BETWEEN_CONNECTION_ATTEMPTS + self.try_number = 1 + + async def create_dps_client(self, symmetric_key): + self.log_info_and_print("Will create provisioning device client to provision device...") + provisioning_host = os.getenv("PROVISIONING_HOST") + id_scope = os.getenv("PROVISIONING_IDSCOPE") + registration_id = os.getenv("PROVISIONING_REGISTRATION_ID") + provisioning_client = ProvisioningDeviceClient.create_from_symmetric_key( + provisioning_host=provisioning_host, + registration_id=registration_id, + id_scope=id_scope, + symmetric_key=symmetric_key, + ) + return provisioning_client + + async def create_hub_client(self, registration_result, symmetric_key): + try: + # Create a Device Client + self.iothub_client = IoTHubDeviceClient.create_from_symmetric_key( + symmetric_key=symmetric_key, + hostname=registration_result.registration_state.assigned_hub, + device_id=registration_result.registration_state.device_id, + ) + # Attach the connection state handler + self.iothub_client.on_connection_state_change = self.handle_on_connection_state_change + except Exception as e: + self.log_error_and_print( + "Caught exception while trying to attach handler : {}".format(get_type_name(e)) + ) + raise Exception( + "Caught exception while trying to attach handler. Will exit application..." + ) + + async def handle_on_connection_state_change(self): + self.log_info_and_print( + "handle_on_connection_state_change fired. Connected status : {}".format( + self.iothub_client.connected + ) + ) + if self.iothub_client.connected: + self.log_info_and_print("Connected connected_event is set...") + self.disconnected_event.clear() + main_event_loop.call_soon_threadsafe(self.connected_event.set) + # Reset the power factor, sleep time and the try number to what it was originally + # on every successful connection. + self.retry_increase_factor = 1 + self.sleep_time_between_conns = INITIAL_SLEEP_TIME_BETWEEN_CONNECTION_ATTEMPTS + self.try_number = 1 + else: + self.log_info_and_print("Disconnected connected_event is set...") + main_event_loop.call_soon_threadsafe(self.disconnected_event.set) + self.connected_event.clear() + + async def register_loop(self): + while True: + provisioning_client = None + self.log_info_and_print("Entry register_loop") + if self.iothub_assignment_sucess_event.is_set(): + self.log_info_and_print("Device has been successfully provisioned already...") + await asyncio.sleep(MONITOR_TIME_BETWEEN_SUCCESS_ASSIGNMENTS) + elif self.iothub_assignment_fail_event.is_set(): + try: + symmetric_key = os.getenv("PROVISIONING_SYMMETRIC_KEY") + provisioning_client = await self.create_dps_client(symmetric_key) + self.log_info_and_print("Registering the device...") + registration_result = await provisioning_client.register() + print("The complete registration result is") + print(registration_result.registration_state) + if registration_result.status == "assigned": + self.log_info_and_print( + "Will create hub client to send telemetry from the provisioned device" + ) + await self.create_hub_client(registration_result, symmetric_key) + self.iothub_assignment_sucess_event.set() + self.iothub_assignment_fail_event.clear() + self.log_error_and_print( + "Registration was done and device was assigned correctly to an " + "IoTHub via the registration process.Will check if all is right " + "after some time..." + ) + await asyncio.sleep(MONITOR_TIME_BETWEEN_SUCCESS_ASSIGNMENTS) + else: + self.log_error_and_print( + "Registration was done but device was not assigned correctly to an " + "IoTHub via the registration process.Will try registration " + "again after some time..." + ) + await asyncio.sleep(SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION) + self.iothub_assignment_fail_event.set() + self.iothub_assignment_sucess_event.clear() + except Exception as e: + self.log_error_and_print( + "Registration process failed because of error {}".format(get_type_name(e)) + ) + raise Exception( + "Caught an unrecoverable error that needs to be " + "fixed from user end while registering. Will exit application..." + ) + finally: + await provisioning_client.shutdown() + + if self.exit_app_event.is_set(): + return + + async def wait_for_connect_and_send_telemetry(self): + id = 1 + while True: + if not self.iothub_client: + # Time to check if device has been provisioned + self.log_info_and_print( + "IoTHub client is not assigned. Telemetry operation failed. " + "Will check after {} secs...".format(SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION) + ) + await asyncio.sleep(SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION) + elif not self.iothub_client.connected: + self.log_info_and_print("IoTHub client is existent. But waiting for connection ...") + await self.connected_event.wait() + else: + self.log_info_and_print("sending message with id {}....".format(id)) + await self.iothub_client.send_message("message number {}".format(id)) + id += 1 + self.log_info_and_print("sent message.....") + self.log_info_and_print("sleeping for {} secs...".format(TELEMETRY_INTERVAL)) + await asyncio.sleep(TELEMETRY_INTERVAL) + if self.exit_app_event.is_set(): + return + + async def if_disconnected_then_connect_with_retry(self): + while True: + self.log_info_and_print("Entry for retry after disconnection") + done, pending = await asyncio.wait( + [ + self.disconnected_event.wait(), + self.exit_app_event.wait(), + ], + return_when=asyncio.FIRST_COMPLETED, + ) + self.log_info_and_print("Exit for retry after disconnection") + await asyncio.gather(*done) + [x.cancel() for x in pending] + if self.exit_app_event.is_set(): + self.log_info_and_print("Exiting while connected") + return + if not self.iothub_client or self.iothub_assignment_fail_event.is_set(): + self.log_info_and_print( + "IoTHub client is invalid, re-provisioning is required to proceed." + "Will check after {} secs...".format(SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION) + ) + await asyncio.sleep(SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION) + elif not self.iothub_assignment_fail_event.is_set(): + try: + self.log_info_and_print( + "Attempting to connect the device client try number {}....".format( + self.try_number + ) + ) + await self.iothub_client.connect() + if self.first_connect: + self.first_connect = False + self.log_info_and_print("Successfully connected the device client...") + except Exception as e: + self.log_error_and_print( + "Caught exception while trying to connect: {}".format(get_type_name(e)) + ) + self.log_error_and_print("Exception details...") + self.log_error_and_print( + "Detailed exception: {}".format(traceback.format_exception_only(type(e), e)) + ) + if self.first_connect: + self.log_info_and_print( + "Very first connection never occurred so will retry immediately..." + ) + self.first_connect = False + sleep_time = 0 + else: + self.log_info_and_print( + "Retry attempt interval is {} and increase power factor is {}".format( + self.sleep_time_between_conns, self.retry_increase_factor + ) + ) + sleep_time = pow(self.sleep_time_between_conns, self.retry_increase_factor) + + if sleep_time > THRESHOLD_FOR_RETRY_CONNECTION: + self.log_error_and_print( + "Failed to connect the device client couple of times." + "Retry time is greater than upper limit set. Will be reprovisioning the device again." + ) + self.try_number = 1 + self.retry_increase_factor = 1 + self.iothub_assignment_fail_event.set() + self.iothub_assignment_sucess_event.clear() + sleep_time = SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION + self.log_error_and_print( + "Will not retry connection as trying re-provisioing again. " + "Will try connection again after some time {}...".format(sleep_time) + ) + await asyncio.sleep(sleep_time) + else: + self.log_error_and_print( + "Failed to connect the device client due to error :{}.Sleeping and retrying after {} seconds".format( + get_type_name(e), sleep_time + ) + ) + self.retry_increase_factor += 1 + self.try_number += 1 + await asyncio.sleep(sleep_time) + + def log_error_and_print(self, s): + logger.error(s) + + print(s) + + def log_info_and_print(self, s): + logger.info(s) + print(s) + + async def run_sample(self): + await self.initiate() + + self.log_error_and_print( + "asyncio debug is set to {}".format(os.getenv("PYTHONASYNCIODEBUG")) + ) + + tasks = [ + asyncio.create_task(self.register_loop()), + asyncio.create_task(self.wait_for_connect_and_send_telemetry()), + asyncio.create_task(self.if_disconnected_then_connect_with_retry()), + ] + + pending = [] + + try: + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) + await asyncio.gather(*done) + except KeyboardInterrupt: + self.log_error_and_print("IoTHubClient sample stopped by user") + except Exception as e: + self.log_error_and_print("Exception in run sample loop: {}".format(get_type_name(e))) + finally: + self.log_info_and_print("Exiting app") + self.exit_app_event.set() + self.log_info_and_print("Waiting for all coroutines to exit") + if pending: + await asyncio.wait_for( + asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED), timeout=5 + ) + self.log_info_and_print( + "Shutting down both ProvisioningClient and IoTHubClient and exiting Application" + ) + await self.iothub_client.shutdown() + + def main(self): + global main_event_loop + print("IoT Hub Sample #1 - Constant Connection With Telemetry") + print("Press Ctrl-C to exit") + + main_event_loop = asyncio.get_event_loop() + + try: + main_event_loop.run_until_complete(Application().run_sample()) + except Exception as e: + self.log_error_and_print( + "Any other exception in the main calling: {}".format(get_type_name(e)) + ) + except KeyboardInterrupt: + print("IoTHubClient sample stopped by user") + finally: + main_event_loop.close() + + +if __name__ == "__main__": + Application().main()