From 72f7c6c123fc1c0be7ed7d8869f41e2d1490acc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simen=20S=2E=20R=C3=B8stad?= Date: Wed, 14 Aug 2024 10:53:38 +0200 Subject: [PATCH] on_target: Add test case for network loss MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add test case for network loss Since we are explicitly setting the mode in offline mode, also update calling of lte lc coneval API to not send fatal error if called with an unsupported functional mode. Also, refactor the test a bit to make it easier to add new testcases. and update wait_for_str test function to expect the order to the input list. Signed-off-by: Simen S. Røstad --- app/src/modules/network/network.c | 8 ++- tests/on_target/tests/test_uart_output.py | 78 +++++++++++------------ tests/on_target/utils/uart.py | 19 +++++- 3 files changed, 58 insertions(+), 47 deletions(-) diff --git a/app/src/modules/network/network.c b/app/src/modules/network/network.c index b0ea0c3a..1f8d2718 100644 --- a/app/src/modules/network/network.c +++ b/app/src/modules/network/network.c @@ -160,13 +160,15 @@ static void sample_network_quality(void) struct lte_lc_conn_eval_params conn_eval_params; ret = lte_lc_conn_eval_params_get(&conn_eval_params); - if (ret < 0) { + if (ret == -EOPNOTSUPP) { + LOG_WRN("Connection evaluation not supported in current functional mode"); + return; + } else if (ret < 0) { LOG_ERR("lte_lc_conn_eval_params_get, error: %d", ret); SEND_FATAL_ERROR(); return; } else if (ret > 0) { - LOG_WRN("Connection evaluation failed due to a network/modem related reason: %d", - ret); + LOG_WRN("Connection evaluation failed due to a network related reason: %d", ret); return; } diff --git a/tests/on_target/tests/test_uart_output.py b/tests/on_target/tests/test_uart_output.py index 8c58324d..8b5c198e 100644 --- a/tests/on_target/tests/test_uart_output.py +++ b/tests/on_target/tests/test_uart_output.py @@ -14,56 +14,52 @@ logger = get_logger() -TEST_TIMEOUT = 1 * 60 - - -def check_uart_data(uart, expected_patterns): - start = time.time() - while True: - all_patterns_found = True - missing_pattern = None - for pattern in expected_patterns: - if not pattern.search(uart.log): - all_patterns_found = False - missing_pattern = pattern.pattern - break - - if all_patterns_found or (time.time() > start + TEST_TIMEOUT): - break - - time.sleep(5) - - assert all_patterns_found, f"Timeout Expired, missing log line pattern: {missing_pattern}" - @pytest.mark.dut1 def test_program_board_and_check_uart(t91x_board, hex_file): - t91x_board.uart.flush() flash_device(os.path.abspath(hex_file)) time.sleep(5) t91x_board.uart.xfactoryreset() - reset_device() - expected_lines = ["Network connectivity established", - "Connected to Cloud", - "trigger: trigger_work_fn: Sending data sample trigger" - ] - t91x_board.uart.wait_for_str(expected_lines, timeout=TEST_TIMEOUT) - patterns_boot = [ - re.compile(r"battery: sample: State of charge: ([\d.]+)"), - re.compile(r"environmental_module: sample: temp: ([\d.]+); press: ([\d.]+); humidity: ([\d.]+); iaq: ([\d.]+); CO2: ([\d.]+); VOC: ([\d.]+)"), - re.compile(r"transport: state_connected_ready_run: Payload\s+([0-9a-fA-F ]+)") - ] - check_uart_data(t91x_board.uart, patterns_boot) + "Network connectivity established", + "Connected to Cloud", + "trigger: trigger_work_fn: Sending data sample trigger", + "battery: sample: State of charge:", + "environmental_module: sample: temp:", + "transport: state_connected_ready_run: Payload", + "blocked_run: Location search done", + "trigger: frequent_poll_entry: frequent_poll_entry" + ] + patterns_button_press = [ + "trigger: frequent_poll_run: Button 1 pressed in frequent poll state, restarting duration timer", + "location_module: location_event_handler: Got location: lat:", + "location: location_utils_event_dispatch: Done", + "blocked_run: Location search done", + "trigger: frequent_poll_entry: frequent_poll_entry" + ] + patterns_lte_offline = [ + "network: Network connectivity lost", + ] + patterns_lte_normal = [ + "network: Network connectivity established", + "transport: Connected to Cloud" + ] + # Boot + t91x_board.uart.flush() + reset_device() + t91x_board.uart.wait_for_str(patterns_boot, timeout=120) + # Button press t91x_board.uart.flush() - # Sleep for 5 seconds to allow any location update to complete before button press - time.sleep(5) - # Simulate button press t91x_board.uart.write("zbus button_press\r\n") + t91x_board.uart.wait_for_str(patterns_button_press, timeout=120) - patterns_button_press = patterns_boot.copy() - patterns_button_press.insert(0, re.compile(r"trigger: frequent_poll_run: Button 1 pressed in frequent poll state, restarting duration timer")) - patterns_button_press.append(re.compile(r"location_module: location_event_handler: Got location: lat: ([\d.]+), lon: ([\d.]+), acc: ([\d.]+)")) + # LTE disconnect + t91x_board.uart.flush() + t91x_board.uart.write("lte offline\r\n") + t91x_board.uart.wait_for_str(patterns_lte_offline, timeout=20) - check_uart_data(t91x_board.uart, patterns_button_press) + # LTE reconnect + t91x_board.uart.flush() + t91x_board.uart.write("lte normal\r\n") + t91x_board.uart.wait_for_str(patterns_lte_normal, timeout=120) diff --git a/tests/on_target/utils/uart.py b/tests/on_target/utils/uart.py index 80b1fe9c..3d35b868 100644 --- a/tests/on_target/utils/uart.py +++ b/tests/on_target/utils/uart.py @@ -157,21 +157,34 @@ def wait_for_str( ) -> None: start = time.time() msgs = msgs if isinstance(msgs, (list, tuple)) else [msgs] + while True: time.sleep(1) - if all(msg in self.log for msg in msgs): + + # Check if all msgs appear in the log in the correct order + log_str = ''.join(self.log) + pos = 0 + all_found = True + + for msg in msgs: + pos = log_str.find(msg, pos) + if pos == -1: + all_found = False + break + pos += len(msg) + + if all_found: break if start + timeout < time.time(): not_found = [x for x in msgs if x not in self.log] raise AssertionError( - f"{not_found} missing in UART log. {error_msg}\nUart log:\n{self.log}" + f"{not_found} missing in UART log in the expected order. {error_msg}\nUart log:\n{self.log}" ) if self._evt.is_set(): raise RuntimeError(f"Uart thread stopped, log:\n{self.log}") - class UartBinary(Uart): def __init__( self,