Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

on_target: Add test case for network loss #287

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions app/src/modules/network/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,15 @@ static void sample_network_quality(void)
struct lte_lc_conn_eval_params conn_eval_params;

ret = lte_lc_conn_eval_params_get(&conn_eval_params);
if (ret < 0) {
if (ret == -EOPNOTSUPP) {
LOG_WRN("Connection evaluation not supported in current functional mode");
return;
} else if (ret < 0) {
LOG_ERR("lte_lc_conn_eval_params_get, error: %d", ret);
SEND_FATAL_ERROR();
return;
} else if (ret > 0) {
LOG_WRN("Connection evaluation failed due to a network/modem related reason: %d",
ret);
LOG_WRN("Connection evaluation failed due to a network related reason: %d", ret);
return;
}

Expand Down
78 changes: 37 additions & 41 deletions tests/on_target/tests/test_uart_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,52 @@

logger = get_logger()

TEST_TIMEOUT = 1 * 60


def check_uart_data(uart, expected_patterns):
start = time.time()
while True:
all_patterns_found = True
missing_pattern = None
for pattern in expected_patterns:
if not pattern.search(uart.log):
all_patterns_found = False
missing_pattern = pattern.pattern
break

if all_patterns_found or (time.time() > start + TEST_TIMEOUT):
break

time.sleep(5)

assert all_patterns_found, f"Timeout Expired, missing log line pattern: {missing_pattern}"

@pytest.mark.dut1
def test_program_board_and_check_uart(t91x_board, hex_file):
t91x_board.uart.flush()
flash_device(os.path.abspath(hex_file))
time.sleep(5)
t91x_board.uart.xfactoryreset()
reset_device()
expected_lines = ["Network connectivity established",
"Connected to Cloud",
"trigger: trigger_work_fn: Sending data sample trigger"
]
t91x_board.uart.wait_for_str(expected_lines, timeout=TEST_TIMEOUT)

patterns_boot = [
re.compile(r"battery: sample: State of charge: ([\d.]+)"),
re.compile(r"environmental_module: sample: temp: ([\d.]+); press: ([\d.]+); humidity: ([\d.]+); iaq: ([\d.]+); CO2: ([\d.]+); VOC: ([\d.]+)"),
re.compile(r"transport: state_connected_ready_run: Payload\s+([0-9a-fA-F ]+)")
]
check_uart_data(t91x_board.uart, patterns_boot)
"Network connectivity established",
"Connected to Cloud",
"trigger: trigger_work_fn: Sending data sample trigger",
"battery: sample: State of charge:",
"environmental_module: sample: temp:",
"transport: state_connected_ready_run: Payload",
"blocked_run: Location search done",
"trigger: frequent_poll_entry: frequent_poll_entry"
]
patterns_button_press = [
"trigger: frequent_poll_run: Button 1 pressed in frequent poll state, restarting duration timer",
"location_module: location_event_handler: Got location: lat:",
"location: location_utils_event_dispatch: Done",
"blocked_run: Location search done",
"trigger: frequent_poll_entry: frequent_poll_entry"
]
patterns_lte_offline = [
"network: Network connectivity lost",
]
patterns_lte_normal = [
"network: Network connectivity established",
"transport: Connected to Cloud"
]

# Boot
t91x_board.uart.flush()
reset_device()
t91x_board.uart.wait_for_str(patterns_boot, timeout=120)

# Button press
t91x_board.uart.flush()
# Sleep for 5 seconds to allow any location update to complete before button press
time.sleep(5)
# Simulate button press
t91x_board.uart.write("zbus button_press\r\n")
t91x_board.uart.wait_for_str(patterns_button_press, timeout=120)

patterns_button_press = patterns_boot.copy()
patterns_button_press.insert(0, re.compile(r"trigger: frequent_poll_run: Button 1 pressed in frequent poll state, restarting duration timer"))
patterns_button_press.append(re.compile(r"location_module: location_event_handler: Got location: lat: ([\d.]+), lon: ([\d.]+), acc: ([\d.]+)"))
# LTE disconnect
t91x_board.uart.flush()
t91x_board.uart.write("lte offline\r\n")
t91x_board.uart.wait_for_str(patterns_lte_offline, timeout=20)

check_uart_data(t91x_board.uart, patterns_button_press)
# LTE reconnect
t91x_board.uart.flush()
t91x_board.uart.write("lte normal\r\n")
t91x_board.uart.wait_for_str(patterns_lte_normal, timeout=120)
19 changes: 16 additions & 3 deletions tests/on_target/utils/uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,34 @@ def wait_for_str(
) -> None:
start = time.time()
msgs = msgs if isinstance(msgs, (list, tuple)) else [msgs]

while True:
time.sleep(1)
if all(msg in self.log for msg in msgs):

# Check if all msgs appear in the log in the correct order
log_str = ''.join(self.log)
pos = 0
all_found = True

for msg in msgs:
pos = log_str.find(msg, pos)
if pos == -1:
all_found = False
break
pos += len(msg)

if all_found:
break

if start + timeout < time.time():
not_found = [x for x in msgs if x not in self.log]
raise AssertionError(
f"{not_found} missing in UART log. {error_msg}\nUart log:\n{self.log}"
f"{not_found} missing in UART log in the expected order. {error_msg}\nUart log:\n{self.log}"
)

if self._evt.is_set():
raise RuntimeError(f"Uart thread stopped, log:\n{self.log}")


class UartBinary(Uart):
def __init__(
self,
Expand Down
Loading