From 6393fcd79a5f2862c5909078bd79a9652eccb0cb Mon Sep 17 00:00:00 2001 From: Suren Gabrielyan Date: Fri, 12 Apr 2024 13:14:22 +0400 Subject: [PATCH] fix(websocket): Fix locking issues of `esp_websocket_client_send_with_exact_opcode` API Extended examples to cover more cases Added new config CONFIG_ESP_WS_CLIENT_ENABLE_DYNAMIC_BUFFER for testing --- .../esp_websocket_client.c | 60 +++++++-------- .../examples/linux/main/main.c | 16 +++- .../examples/target/main/websocket_example.c | 25 ++++++- .../examples/target/pytest_websocket.py | 73 ++++++++++++++++--- .../target/sdkconfig.ci.dynamic_buffer | 14 ++++ 5 files changed, 142 insertions(+), 46 deletions(-) create mode 100644 components/esp_websocket_client/examples/target/sdkconfig.ci.dynamic_buffer diff --git a/components/esp_websocket_client/esp_websocket_client.c b/components/esp_websocket_client/esp_websocket_client.c index b07e020d43..83d7e45d5f 100644 --- a/components/esp_websocket_client/esp_websocket_client.c +++ b/components/esp_websocket_client/esp_websocket_client.c @@ -555,9 +555,29 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand int wlen = 0, widx = 0; bool contained_fin = opcode & WS_TRANSPORT_OPCODES_FIN; + if (client == NULL || len < 0 || (data == NULL && len > 0)) { + ESP_LOGE(TAG, "Invalid arguments"); + return -1; + } + + if (!esp_websocket_client_is_connected(client)) { + ESP_LOGE(TAG, "Websocket client is not connected"); + return -1; + } + + if (client->transport == NULL) { + ESP_LOGE(TAG, "Invalid transport"); + return -1; + } + + if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) { + ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout); + return -1; + } + if (esp_websocket_new_buf(client, true) != ESP_OK) { ESP_LOGE(TAG, "Failed to setup tx buffer"); - return -1; + goto unlock_and_return; } while (widx < len || opcode) { // allow for sending "current_opcode" only message with len==0 @@ -583,14 +603,18 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand esp_websocket_client_error(client, "esp_transport_write() returned %d, errno=%d", ret, errno); } esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT); - return ret; + goto unlock_and_return; } opcode = 0; widx += wlen; need_write = len - widx; } esp_websocket_free_buf(client, true); - return widx; + ret = widx; + +unlock_and_return: + xSemaphoreGiveRecursive(client->lock); + return ret; } esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_client_config_t *config) @@ -1211,35 +1235,7 @@ int esp_websocket_client_send_fin(esp_websocket_client_handle_t client, TickType int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t client, ws_transport_opcodes_t opcode, const uint8_t *data, int len, TickType_t timeout) { - int ret = -1; - if (client == NULL || len < 0 || (data == NULL && len > 0)) { - ESP_LOGE(TAG, "Invalid arguments"); - return -1; - } - - if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) { - ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout); - return -1; - } - - if (!esp_websocket_client_is_connected(client)) { - ESP_LOGE(TAG, "Websocket client is not connected"); - goto unlock_and_return; - } - - if (client->transport == NULL) { - ESP_LOGE(TAG, "Invalid transport"); - goto unlock_and_return; - } - - ret = esp_websocket_client_send_with_exact_opcode(client, opcode | WS_TRANSPORT_OPCODES_FIN, data, len, timeout); - if (ret < 0) { - ESP_LOGE(TAG, "Failed to send the buffer"); - goto unlock_and_return; - } -unlock_and_return: - xSemaphoreGiveRecursive(client->lock); - return ret; + return esp_websocket_client_send_with_exact_opcode(client, opcode | WS_TRANSPORT_OPCODES_FIN, data, len, timeout); } bool esp_websocket_client_is_connected(esp_websocket_client_handle_t client) diff --git a/components/esp_websocket_client/examples/linux/main/main.c b/components/esp_websocket_client/examples/linux/main/main.c index e52dd42fd3..6356fd9c75 100644 --- a/components/esp_websocket_client/examples/linux/main/main.c +++ b/components/esp_websocket_client/examples/linux/main/main.c @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD + * SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD * * SPDX-License-Identifier: Apache-2.0 */ @@ -96,14 +96,24 @@ static void websocket_app_start(void) vTaskDelay(1000 / portTICK_PERIOD_MS); } - ESP_LOGI(TAG, "Sending fragmented message"); - vTaskDelay(1000 / portTICK_PERIOD_MS); + // Sending text data + ESP_LOGI(TAG, "Sending fragmented text message"); memset(data, 'a', sizeof(data)); esp_websocket_client_send_text_partial(client, data, sizeof(data), portMAX_DELAY); memset(data, 'b', sizeof(data)); esp_websocket_client_send_cont_msg(client, data, sizeof(data), portMAX_DELAY); esp_websocket_client_send_fin(client, portMAX_DELAY); + vTaskDelay(1000 / portTICK_PERIOD_MS); + // Sending binary data + ESP_LOGI(TAG, "Sending fragmented binary message"); + char binary_data[128]; + memset(binary_data, 0, sizeof(binary_data)); + esp_websocket_client_send_bin_partial(client, binary_data, sizeof(binary_data), portMAX_DELAY); + memset(binary_data, 1, sizeof(binary_data)); + esp_websocket_client_send_cont_msg(client, binary_data, sizeof(binary_data), portMAX_DELAY); + esp_websocket_client_send_fin(client, portMAX_DELAY); + esp_websocket_client_destroy(client); } diff --git a/components/esp_websocket_client/examples/target/main/websocket_example.c b/components/esp_websocket_client/examples/target/main/websocket_example.c index 4024264535..112e9285c1 100644 --- a/components/esp_websocket_client/examples/target/main/websocket_example.c +++ b/components/esp_websocket_client/examples/target/main/websocket_example.c @@ -88,7 +88,9 @@ static void websocket_event_handler(void *handler_args, esp_event_base_t base, i case WEBSOCKET_EVENT_DATA: ESP_LOGI(TAG, "WEBSOCKET_EVENT_DATA"); ESP_LOGI(TAG, "Received opcode=%d", data->op_code); - if (data->op_code == 0x08 && data->data_len == 2) { + if (data->op_code == 0x2) { // Opcode 0x2 indicates binary data + ESP_LOG_BUFFER_HEX("Received binary data", data->data_ptr, data->data_len); + } else if (data->op_code == 0x08 && data->data_len == 2) { ESP_LOGW(TAG, "Received closed message with code=%d", 256 * data->data_ptr[0] + data->data_ptr[1]); } else { ESP_LOGW(TAG, "Received=%.*s\n\n", data->data_len, (char *)data->data_ptr); @@ -183,13 +185,32 @@ static void websocket_app_start(void) vTaskDelay(1000 / portTICK_PERIOD_MS); } - ESP_LOGI(TAG, "Sending fragmented message"); vTaskDelay(1000 / portTICK_PERIOD_MS); + // Sending text data + ESP_LOGI(TAG, "Sending fragmented text message"); memset(data, 'a', sizeof(data)); esp_websocket_client_send_text_partial(client, data, sizeof(data), portMAX_DELAY); memset(data, 'b', sizeof(data)); esp_websocket_client_send_cont_msg(client, data, sizeof(data), portMAX_DELAY); esp_websocket_client_send_fin(client, portMAX_DELAY); + vTaskDelay(1000 / portTICK_PERIOD_MS); + + // Sending binary data + ESP_LOGI(TAG, "Sending fragmented binary message"); + char binary_data[5]; + memset(binary_data, 0, sizeof(binary_data)); + esp_websocket_client_send_bin_partial(client, binary_data, sizeof(binary_data), portMAX_DELAY); + memset(binary_data, 1, sizeof(binary_data)); + esp_websocket_client_send_cont_msg(client, binary_data, sizeof(binary_data), portMAX_DELAY); + esp_websocket_client_send_fin(client, portMAX_DELAY); + vTaskDelay(1000 / portTICK_PERIOD_MS); + + // Sending text data longer than ws buffer (default 1024) + ESP_LOGI(TAG, "Sending text longer than ws buffer (default 1024)"); + const int size = 2000; + char *long_data = malloc(size); + memset(long_data, 'a', size); + esp_websocket_client_send_text(client, long_data, size, portMAX_DELAY); xSemaphoreTake(shutdown_sema, portMAX_DELAY); esp_websocket_client_close(client, portMAX_DELAY); diff --git a/components/esp_websocket_client/examples/target/pytest_websocket.py b/components/esp_websocket_client/examples/target/pytest_websocket.py index 8f7cff79ce..ac4c7fbddf 100644 --- a/components/esp_websocket_client/examples/target/pytest_websocket.py +++ b/components/esp_websocket_client/examples/target/pytest_websocket.py @@ -27,10 +27,13 @@ def get_my_ip(): class WebsocketTestEcho(WebSocket): - def handleMessage(self): - self.sendMessage(self.data) - print('\n Server sent: {}\n'.format(self.data)) + if isinstance(self.data, bytes): + print(f'\n Server received binary data: {self.data.hex()}\n') + self.sendMessage(self.data, binary=True) + else: + print(f'\n Server received: {self.data}\n') + self.sendMessage(self.data) def handleConnected(self): print('Connection from: {}'.format(self.address)) @@ -86,6 +89,9 @@ def test_examples_protocol_websocket(dut): 3. send and receive data """ + # Test for echo functionality: + # Sends a series of simple "hello" messages to the WebSocket server and verifies that each one is echoed back correctly. + # This tests the basic responsiveness and correctness of the WebSocket connection. def test_echo(dut): dut.expect('WEBSOCKET_EVENT_CONNECTED') for i in range(0, 5): @@ -93,12 +99,16 @@ def test_echo(dut): print('All echos received') sys.stdout.flush() + # Test for clean closure of the WebSocket connection: + # Ensures that the WebSocket can correctly receive a close frame and terminate the connection without issues. def test_close(dut): code = dut.expect( re.compile( b'websocket: Received closed message with code=(\\d*)'))[0] print('Received close frame with code {}'.format(code)) + # Test for JSON message handling: + # Sends a JSON formatted string and verifies that the received message matches the expected JSON structure. def test_json(dut, websocket): json_string = """ [ @@ -118,7 +128,7 @@ def test_json(dut, websocket): match = dut.expect( re.compile(b'Json=({[a-zA-Z0-9]*).*}')).group(0).decode()[5:] if match == str(data[0]): - print('Sent message and received message are equal \n') + print('\n Sent message and received message are equal \n') sys.stdout.flush() else: raise ValueError( @@ -126,6 +136,8 @@ def test_json(dut, websocket): \nreceived: {}\nwith length {}'.format( data[0], len(data[0]), match, len(match))) + # Test for receiving long messages: + # This sends a message with a specified length (2000 characters) to ensure the WebSocket can handle large data payloads. Repeated 3 times for reliability. def test_recv_long_msg(dut, websocket, msg_len, repeats): send_msg = ''.join( @@ -142,7 +154,7 @@ def test_recv_long_msg(dut, websocket, msg_len, repeats): recv_msg += match if recv_msg == send_msg: - print('Sent message and received message are equal \n') + print('\n Sent message and received message are equal \n') sys.stdout.flush() else: raise ValueError( @@ -150,9 +162,50 @@ def test_recv_long_msg(dut, websocket, msg_len, repeats): \nreceived: {}\nwith length {}'.format( send_msg, len(send_msg), recv_msg, len(recv_msg))) - def test_fragmented_msg(dut): + # Test for receiving the first fragment of a large message: + # Verifies the WebSocket's ability to correctly process the initial segment of a fragmented message. + def test_recv_fragmented_msg1(dut): + dut.expect('websocket: Total payload length=2000, data_len=1024, current payload offset=0') + + # Test for receiving the second fragment of a large message: + # Confirms that the WebSocket can correctly handle and process the subsequent segment of a fragmented message. + def test_recv_fragmented_msg2(dut): + dut.expect('websocket: Total payload length=2000, data_len=976, current payload offset=1024') + + # Test for receiving fragmented text messages: + # Checks if the WebSocket can accurately reconstruct a message sent in several smaller parts. + def test_fragmented_txt_msg(dut): dut.expect('Received=' + 32 * 'a' + 32 * 'b') - print('Fragmented data received') + print('\nFragmented data received\n') + + # Extract the hexdump portion of the log line + def parse_hexdump(line): + match = re.search(r'\(.*\) Received binary data: ([0-9A-Fa-f ]+)', line) + if match: + hexdump = match.group(1).strip().replace(' ', '') + # Convert the hexdump string to a bytearray + return bytearray.fromhex(hexdump) + return bytearray() + + # Capture the binary log output from the DUT + def test_fragmented_binary_msg(dut): + match = dut.expect(r'\(.*\) Received binary data: .*') + if match: + line = match.group(0).strip() + if isinstance(line, bytes): + line = line.decode('utf-8') + + # Parse the hexdump from the log line + received_data = parse_hexdump(line) + + # Create the expected bytearray with the specified pattern + expected_data = bytearray([0, 0, 0, 0, 0, 1, 1, 1, 1, 1]) + + # Validate the received data + assert received_data == expected_data, f'Received data does not match expected data. Received: {received_data}, Expected: {expected_data}' + print('\nFragmented data received\n') + else: + assert False, 'Log line with binary data not found' # Starting of the test try: @@ -184,10 +237,12 @@ def test_fragmented_msg(dut): dut.expect('Please enter uri of websocket endpoint', timeout=30) dut.write(uri) test_echo(dut) - # Message length should exceed DUT's buffer size to test fragmentation, default is 1024 byte test_recv_long_msg(dut, ws, 2000, 3) test_json(dut, ws) - test_fragmented_msg(dut) + test_fragmented_txt_msg(dut) + test_fragmented_binary_msg(dut) + test_recv_fragmented_msg1(dut) + test_recv_fragmented_msg2(dut) test_close(dut) else: print('DUT connecting to {}'.format(uri)) diff --git a/components/esp_websocket_client/examples/target/sdkconfig.ci.dynamic_buffer b/components/esp_websocket_client/examples/target/sdkconfig.ci.dynamic_buffer new file mode 100644 index 0000000000..f74b16dac9 --- /dev/null +++ b/components/esp_websocket_client/examples/target/sdkconfig.ci.dynamic_buffer @@ -0,0 +1,14 @@ +CONFIG_IDF_TARGET="esp32" +CONFIG_IDF_TARGET_LINUX=n +CONFIG_WEBSOCKET_URI_FROM_STDIN=n +CONFIG_WEBSOCKET_URI_FROM_STRING=y +CONFIG_EXAMPLE_CONNECT_ETHERNET=y +CONFIG_EXAMPLE_CONNECT_WIFI=n +CONFIG_EXAMPLE_USE_INTERNAL_ETHERNET=y +CONFIG_EXAMPLE_ETH_PHY_IP101=y +CONFIG_EXAMPLE_ETH_MDC_GPIO=23 +CONFIG_EXAMPLE_ETH_MDIO_GPIO=18 +CONFIG_EXAMPLE_ETH_PHY_RST_GPIO=5 +CONFIG_EXAMPLE_ETH_PHY_ADDR=1 +CONFIG_EXAMPLE_CONNECT_IPV6=y +CONFIG_ESP_WS_CLIENT_ENABLE_DYNAMIC_BUFFER=y