Skip to content

Commit

Permalink
fix(websocket): Fix locking issues of `esp_websocket_client_send_with…
Browse files Browse the repository at this point in the history
…_exact_opcode` API

Extended examples to cover more cases
Added new config CONFIG_ESP_WS_CLIENT_ENABLE_DYNAMIC_BUFFER for testing
  • Loading branch information
suren-gabrielyan-espressif committed Apr 17, 2024
1 parent bd6e120 commit 659ffa9
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 43 deletions.
60 changes: 28 additions & 32 deletions components/esp_websocket_client/esp_websocket_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,29 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
int wlen = 0, widx = 0;
bool contained_fin = opcode & WS_TRANSPORT_OPCODES_FIN;

if (client == NULL || len < 0 || (data == NULL && len > 0)) {
ESP_LOGE(TAG, "Invalid arguments");
return -1;
}

if (!esp_websocket_client_is_connected(client)) {
ESP_LOGE(TAG, "Websocket client is not connected");
return -1;
}

if (client->transport == NULL) {
ESP_LOGE(TAG, "Invalid transport");
return -1;
}

if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout);
return -1;
}

if (esp_websocket_new_buf(client, true) != ESP_OK) {
ESP_LOGE(TAG, "Failed to setup tx buffer");
return -1;
goto unlock_and_return;
}

while (widx < len || opcode) { // allow for sending "current_opcode" only message with len==0
Expand All @@ -580,14 +600,18 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
esp_websocket_client_error(client, "esp_transport_write() returned %d, errno=%d", ret, errno);
}
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
return ret;
goto unlock_and_return;
}
opcode = 0;
widx += wlen;
need_write = len - widx;
}
esp_websocket_free_buf(client, true);
return widx;
ret = widx;

unlock_and_return:
xSemaphoreGiveRecursive(client->lock);
return ret;
}

esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_client_config_t *config)
Expand Down Expand Up @@ -1208,35 +1232,7 @@ int esp_websocket_client_send_fin(esp_websocket_client_handle_t client, TickType

int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t client, ws_transport_opcodes_t opcode, const uint8_t *data, int len, TickType_t timeout)
{
int ret = -1;
if (client == NULL || len < 0 || (data == NULL && len > 0)) {
ESP_LOGE(TAG, "Invalid arguments");
return -1;
}

if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %" PRIu32 " timeout", timeout);
return -1;
}

if (!esp_websocket_client_is_connected(client)) {
ESP_LOGE(TAG, "Websocket client is not connected");
goto unlock_and_return;
}

if (client->transport == NULL) {
ESP_LOGE(TAG, "Invalid transport");
goto unlock_and_return;
}

ret = esp_websocket_client_send_with_exact_opcode(client, opcode | WS_TRANSPORT_OPCODES_FIN, data, len, timeout);
if (ret < 0) {
ESP_LOGE(TAG, "Failed to send the buffer");
goto unlock_and_return;
}
unlock_and_return:
xSemaphoreGiveRecursive(client->lock);
return ret;
return esp_websocket_client_send_with_exact_opcode(client, opcode | WS_TRANSPORT_OPCODES_FIN, data, len, timeout);
}

bool esp_websocket_client_is_connected(esp_websocket_client_handle_t client)
Expand Down
16 changes: 13 additions & 3 deletions components/esp_websocket_client/examples/linux/main/main.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
* SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down Expand Up @@ -96,14 +96,24 @@ static void websocket_app_start(void)
vTaskDelay(1000 / portTICK_PERIOD_MS);
}

ESP_LOGI(TAG, "Sending fragmented message");
vTaskDelay(1000 / portTICK_PERIOD_MS);
// Sending text data
ESP_LOGI(TAG, "Sending fragmented text message");
memset(data, 'a', sizeof(data));
esp_websocket_client_send_text_partial(client, data, sizeof(data), portMAX_DELAY);
memset(data, 'b', sizeof(data));
esp_websocket_client_send_cont_msg(client, data, sizeof(data), portMAX_DELAY);
esp_websocket_client_send_fin(client, portMAX_DELAY);

vTaskDelay(1000 / portTICK_PERIOD_MS);
// Sending binary data
ESP_LOGI(TAG, "Sending fragmented binary message");
char binary_data[128];
memset(binary_data, 0, sizeof(binary_data));
esp_websocket_client_send_bin_partial(client, binary_data, sizeof(binary_data), portMAX_DELAY);
memset(binary_data, 1, sizeof(binary_data));
esp_websocket_client_send_cont_msg(client, binary_data, sizeof(binary_data), portMAX_DELAY);
esp_websocket_client_send_fin(client, portMAX_DELAY);

esp_websocket_client_destroy(client);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,31 @@ static void websocket_app_start(void)
vTaskDelay(1000 / portTICK_PERIOD_MS);
}

ESP_LOGI(TAG, "Sending fragmented message");
vTaskDelay(1000 / portTICK_PERIOD_MS);
// Sending text data
ESP_LOGI(TAG, "Sending fragmented text message");
memset(data, 'a', sizeof(data));
esp_websocket_client_send_text_partial(client, data, sizeof(data), portMAX_DELAY);
memset(data, 'b', sizeof(data));
esp_websocket_client_send_cont_msg(client, data, sizeof(data), portMAX_DELAY);
esp_websocket_client_send_fin(client, portMAX_DELAY);
vTaskDelay(1000 / portTICK_PERIOD_MS);

// Sending binary data
ESP_LOGI(TAG, "Sending fragmented binary message");
char binary_data[128];
memset(binary_data, 0, sizeof(binary_data));
esp_websocket_client_send_bin_partial(client, binary_data, sizeof(binary_data), portMAX_DELAY);
memset(binary_data, 1, sizeof(binary_data));
esp_websocket_client_send_cont_msg(client, binary_data, sizeof(binary_data), portMAX_DELAY);
esp_websocket_client_send_fin(client, portMAX_DELAY);
vTaskDelay(1000 / portTICK_PERIOD_MS);

// Sending text data longer than ws buffer (default 1024)
ESP_LOGI(TAG, "Sending text longer than ws buffer (default 1024)");
const int size = 2000;
char *long_data = malloc(size);
memset(long_data, 'a', size);
esp_websocket_client_send_text(client, long_data, size, portMAX_DELAY);

xSemaphoreTake(shutdown_sema, portMAX_DELAY);
esp_websocket_client_close(client, portMAX_DELAY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@ def get_my_ip():
return IP


class WebsocketTestEcho(WebSocket):
def is_binary_strict(data):
return all(x in (0, 1) for x in data)


class WebsocketTestEcho(WebSocket):
def handleMessage(self):
self.sendMessage(self.data)
print('\n Server sent: {}\n'.format(self.data))
if is_binary_strict(self.data):
self.sendMessage(self.data.hex())
print('\n Server sent: {}\n'.format(self.data.hex()))
else:
self.sendMessage(self.data)
print('\n Server sent: {}\n'.format(self.data))

def handleConnected(self):
print('Connection from: {}'.format(self.address))
Expand Down Expand Up @@ -86,19 +93,26 @@ def test_examples_protocol_websocket(dut):
3. send and receive data
"""

# Test for echo functionality:
# Sends a series of simple "hello" messages to the WebSocket server and verifies that each one is echoed back correctly.
# This tests the basic responsiveness and correctness of the WebSocket connection.
def test_echo(dut):
dut.expect('WEBSOCKET_EVENT_CONNECTED')
for i in range(0, 5):
dut.expect(re.compile(b'Received=hello (\\d)'))
print('All echos received')
sys.stdout.flush()

# Test for clean closure of the WebSocket connection:
# Ensures that the WebSocket can correctly receive a close frame and terminate the connection without issues.
def test_close(dut):
code = dut.expect(
re.compile(
b'websocket: Received closed message with code=(\\d*)'))[0]
print('Received close frame with code {}'.format(code))

# Test for JSON message handling:
# Sends a JSON formatted string and verifies that the received message matches the expected JSON structure.
def test_json(dut, websocket):
json_string = """
[
Expand Down Expand Up @@ -126,6 +140,8 @@ def test_json(dut, websocket):
\nreceived: {}\nwith length {}'.format(
data[0], len(data[0]), match, len(match)))

# Test for receiving long messages:
# This sends a message with a specified length (2000 characters) to ensure the WebSocket can handle large data payloads. Repeated 3 times for reliability.
def test_recv_long_msg(dut, websocket, msg_len, repeats):

send_msg = ''.join(
Expand All @@ -150,10 +166,30 @@ def test_recv_long_msg(dut, websocket, msg_len, repeats):
\nreceived: {}\nwith length {}'.format(
send_msg, len(send_msg), recv_msg, len(recv_msg)))

def test_fragmented_msg(dut):
# Test for receiving the first fragment of a large message:
# Verifies the WebSocket's ability to correctly process the initial segment of a fragmented message.
def test_recv_fragmented_msg1(dut):
dut.expect('websocket: Total payload length=2000, data_len=1024, current payload offset=0')

# Test for receiving the second fragment of a large message:
# Confirms that the WebSocket can correctly handle and process the subsequent segment of a fragmented message.
def test_recv_fragmented_msg2(dut):
dut.expect('websocket: Total payload length=2000, data_len=976, current payload offset=1024')

# Test for receiving fragmented text messages:
# Checks if the WebSocket can accurately reconstruct a message sent in several smaller parts.
def test_fragmented_txt_msg(dut):
dut.expect('Received=' + 32 * 'a' + 32 * 'b')
print('Fragmented data received')

# Test for receiving fragmented binary messages:
# Ensures the WebSocket can handle and reconstruct binary data sent in fragments correctly.
def test_fragmented_binary_msg(dut):
# Creating the bytearray with specified pattern
data = bytearray([0] * 128 + [1] * 128).hex()
dut.expect('Received=' + data)
print('Fragmented data received')

# Starting of the test
try:
if dut.app.sdkconfig.get('WEBSOCKET_URI_FROM_STDIN') is True:
Expand Down Expand Up @@ -184,10 +220,12 @@ def test_fragmented_msg(dut):
dut.expect('Please enter uri of websocket endpoint', timeout=30)
dut.write(uri)
test_echo(dut)
# Message length should exceed DUT's buffer size to test fragmentation, default is 1024 byte
test_recv_long_msg(dut, ws, 2000, 3)
test_json(dut, ws)
test_fragmented_msg(dut)
test_fragmented_txt_msg(dut)
test_fragmented_binary_msg(dut)
test_recv_fragmented_msg1(dut)
test_recv_fragmented_msg2(dut)
test_close(dut)
else:
print('DUT connecting to {}'.format(uri))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CONFIG_IDF_TARGET="esp32"
CONFIG_IDF_TARGET_LINUX=n
CONFIG_WEBSOCKET_URI_FROM_STDIN=n
CONFIG_WEBSOCKET_URI_FROM_STRING=y
CONFIG_EXAMPLE_CONNECT_ETHERNET=y
CONFIG_EXAMPLE_CONNECT_WIFI=n
CONFIG_EXAMPLE_USE_INTERNAL_ETHERNET=y
CONFIG_EXAMPLE_ETH_PHY_IP101=y
CONFIG_EXAMPLE_ETH_MDC_GPIO=23
CONFIG_EXAMPLE_ETH_MDIO_GPIO=18
CONFIG_EXAMPLE_ETH_PHY_RST_GPIO=5
CONFIG_EXAMPLE_ETH_PHY_ADDR=1
CONFIG_EXAMPLE_CONNECT_IPV6=y
CONFIG_ESP_WS_CLIENT_ENABLE_DYNAMIC_BUFFER=y

0 comments on commit 659ffa9

Please sign in to comment.