Skip to content

Commit

Permalink
fix(uart_vfs): read() now aligned to POSIX defined behavior
Browse files Browse the repository at this point in the history
- For blocking mode, block until data available
- Return with the bytes available in the file at the time,
  it should not block until reaching the requested size

And read() should not realy return on the newline character
Closes #14155
  • Loading branch information
songruo committed Dec 5, 2024
1 parent 0b2e6d6 commit 233ab81
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 43 deletions.
118 changes: 105 additions & 13 deletions components/vfs/test_apps/main/test_vfs_uart.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: 2015-2023 Espressif Systems (Shanghai) CO LTD
* SPDX-FileCopyrightText: 2015-2024 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down Expand Up @@ -75,6 +75,12 @@ TEST_CASE("CRs are removed from the stdin correctly", "[vfs]")
esp_vfs_dev_uart_port_set_tx_line_endings(CONFIG_ESP_CONSOLE_UART_NUM, ESP_LINE_ENDINGS_CRLF);

flush_stdin_stdout();

// A test case with no use of uart driver
// For non-uart-driver-involved uart vfs, all reads are non-blocking
// If no data at the moment, read() returns directly;
// If there is data available at the moment, read() also returns directly with the currently available size

const char* send_str = "1234567890\n\r123\r\n4\n";
/* with CONFIG_NEWLIB_STDOUT_ADDCR, the following will be sent on the wire.
* (last character of each part is marked with a hat)
Expand Down Expand Up @@ -134,30 +140,46 @@ struct read_task_arg_t {

struct write_task_arg_t {
const char* str;
size_t str_len;
SemaphoreHandle_t done;
};

static void read_task_fn(void* varg)
static void read_blocking_task_fn(void* varg)
{
struct read_task_arg_t* parg = (struct read_task_arg_t*) varg;
parg->out_buffer[0] = 0;
memset(parg->out_buffer, 0, parg->out_buffer_len);

fgets(parg->out_buffer, parg->out_buffer_len, stdin);
xSemaphoreGive(parg->done);
vTaskDelete(NULL);
}

static void read_non_blocking_task_fn(void* varg)
{
struct read_task_arg_t* parg = (struct read_task_arg_t*) varg;
memset(parg->out_buffer, 0, parg->out_buffer_len);
char *ptr = parg->out_buffer;

while (fgets(ptr, parg->out_buffer_len, stdin) != NULL) {
while (*ptr != 0) {
ptr++;
}
}
xSemaphoreGive(parg->done);
vTaskDelete(NULL);
}

static void write_task_fn(void* varg)
{
struct write_task_arg_t* parg = (struct write_task_arg_t*) varg;
fwrite_str_loopback(parg->str, strlen(parg->str));
fwrite_str_loopback(parg->str, parg->str_len);
xSemaphoreGive(parg->done);
vTaskDelete(NULL);
}

TEST_CASE("can write to UART while another task is reading", "[vfs]")
TEST_CASE("read with uart driver (blocking)", "[vfs]")
{
char out_buffer[32];
char out_buffer[32] = {};
size_t out_buffer_len = sizeof(out_buffer);

struct read_task_arg_t read_arg = {
Expand All @@ -166,9 +188,13 @@ TEST_CASE("can write to UART while another task is reading", "[vfs]")
.done = xSemaphoreCreateBinary()
};

// Send a string with length less than the read requested length
const char in_buffer[] = "!(@*#&(!*@&#((SDasdkjhadsl\n";
size_t in_buffer_len = sizeof(in_buffer);
struct write_task_arg_t write_arg = {
.str = "!(@*#&(!*@&#((SDasdkjhadsl\n",
.done = xSemaphoreCreateBinary()
.str = in_buffer,
.str_len = in_buffer_len,
.done = xSemaphoreCreateBinary()
};

flush_stdin_stdout();
Expand All @@ -178,15 +204,18 @@ TEST_CASE("can write to UART while another task is reading", "[vfs]")
esp_vfs_dev_uart_use_driver(CONFIG_ESP_CONSOLE_UART_NUM);


xTaskCreate(&read_task_fn, "vfs_read", 4096, &read_arg, 5, NULL);
vTaskDelay(10);
xTaskCreate(&write_task_fn, "vfs_write", 4096, &write_arg, 6, NULL);
// Start the read task first, it will block until data incoming
xTaskCreate(&read_blocking_task_fn, "vfs_read", 4096, &read_arg, 5, NULL);

int res = xSemaphoreTake(read_arg.done, 100 / portTICK_PERIOD_MS);
TEST_ASSERT_FALSE(res);

xTaskCreate(&write_task_fn, "vfs_write", 4096, &write_arg, 6, NULL);

int res = xSemaphoreTake(write_arg.done, 100 / portTICK_PERIOD_MS);
res = xSemaphoreTake(write_arg.done, 100 / portTICK_PERIOD_MS);
TEST_ASSERT(res);

res = xSemaphoreTake(read_arg.done, 100 / portTICK_PERIOD_MS);
res = xSemaphoreTake(read_arg.done, 100 / portTICK_PERIOD_MS); // read() returns with currently available size
TEST_ASSERT(res);

TEST_ASSERT_EQUAL(0, strcmp(write_arg.str, read_arg.out_buffer));
Expand All @@ -197,6 +226,69 @@ TEST_CASE("can write to UART while another task is reading", "[vfs]")
vSemaphoreDelete(write_arg.done);
}

TEST_CASE("read with uart driver (non-blocking)", "[vfs]")
{
char out_buffer[32] = {};
size_t out_buffer_len = sizeof(out_buffer);

struct read_task_arg_t read_arg = {
.out_buffer = out_buffer,
.out_buffer_len = out_buffer_len,
.done = xSemaphoreCreateBinary()
};

// Send a string with length less than the read requested length
const char in_buffer[] = "!(@*#&(!*@&#((SDasdkjhad\nce"; // read should not early return on \n
size_t in_buffer_len = sizeof(in_buffer);
struct write_task_arg_t write_arg = {
.str = in_buffer,
.str_len = in_buffer_len,
.done = xSemaphoreCreateBinary()
};

flush_stdin_stdout();

ESP_ERROR_CHECK(uart_driver_install(CONFIG_ESP_CONSOLE_UART_NUM,
256, 0, 0, NULL, 0));
esp_vfs_dev_uart_use_driver(CONFIG_ESP_CONSOLE_UART_NUM);

esp_vfs_dev_uart_port_set_rx_line_endings(CONFIG_ESP_CONSOLE_UART_NUM, ESP_LINE_ENDINGS_LF);
esp_vfs_dev_uart_port_set_tx_line_endings(CONFIG_ESP_CONSOLE_UART_NUM, ESP_LINE_ENDINGS_LF);

int flags = fcntl(STDIN_FILENO, F_GETFL, 0);
fcntl(STDIN_FILENO, F_SETFL, flags | O_NONBLOCK);

// If start the read task first, it will return immediately
xTaskCreate(&read_non_blocking_task_fn, "vfs_read", 4096, &read_arg, 5, NULL);

int res = xSemaphoreTake(read_arg.done, 100 / portTICK_PERIOD_MS);
TEST_ASSERT(res);

xTaskCreate(&write_task_fn, "vfs_write", 4096, &write_arg, 6, NULL);
vTaskDelay(10);
xTaskCreate(&read_non_blocking_task_fn, "vfs_read", 4096, &read_arg, 5, NULL);

res = xSemaphoreTake(write_arg.done, 100 / portTICK_PERIOD_MS);
TEST_ASSERT(res);

res = xSemaphoreTake(read_arg.done, 1000 / portTICK_PERIOD_MS); // read() returns with currently available size
TEST_ASSERT(res);

// string compare
for (int i = 0; i < in_buffer_len; i++) {
TEST_ASSERT_EQUAL(in_buffer[i], out_buffer[i]);
}

esp_vfs_dev_uart_use_nonblocking(CONFIG_ESP_CONSOLE_UART_NUM);
fcntl(STDIN_FILENO, F_SETFL, flags);
esp_vfs_dev_uart_port_set_rx_line_endings(CONFIG_ESP_CONSOLE_UART_NUM, ESP_LINE_ENDINGS_CRLF);
esp_vfs_dev_uart_port_set_tx_line_endings(CONFIG_ESP_CONSOLE_UART_NUM, ESP_LINE_ENDINGS_CRLF);
uart_driver_delete(CONFIG_ESP_CONSOLE_UART_NUM);
vSemaphoreDelete(read_arg.done);
vSemaphoreDelete(write_arg.done);
vTaskDelay(2); // wait for tasks to exit
}

TEST_CASE("fcntl supported in UART VFS", "[vfs]")
{
int flags = fcntl(STDIN_FILENO, F_GETFL, 0);
Expand Down
109 changes: 79 additions & 30 deletions components/vfs/vfs_uart.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@
typedef void (*tx_func_t)(int, int);
// UART read bytes function type
typedef int (*rx_func_t)(int);
// UART get available received bytes function type
typedef size_t (*get_available_data_len_func_t)(int);

// Basic functions for sending and receiving bytes over UART
// Basic functions for sending, receiving bytes, and get available data length over UART
static void uart_tx_char(int fd, int c);
static int uart_rx_char(int fd);
static size_t uart_get_avail_data_len(int fd);

// Functions for sending and receiving bytes which use UART driver
// Functions for sending, receiving bytes, and get available data length which use UART driver
static void uart_tx_char_via_driver(int fd, int c);
static int uart_rx_char_via_driver(int fd);
static size_t uart_get_avail_data_len_via_driver(int fd);

typedef struct {
// Pointers to UART peripherals
Expand All @@ -77,6 +81,8 @@ typedef struct {
tx_func_t tx_func;
// Functions used to read bytes from UART. Default to "basic" functions.
rx_func_t rx_func;
// Function used to get available data bytes from UART. Default to "basic" functions.
get_available_data_len_func_t get_avail_data_len_func;
} vfs_uart_context_t;

#define VFS_CTX_DEFAULT_VAL(uart_dev) (vfs_uart_context_t) {\
Expand All @@ -86,6 +92,7 @@ typedef struct {
.rx_mode = DEFAULT_RX_MODE,\
.tx_func = uart_tx_char,\
.rx_func = uart_rx_char,\
.get_avail_data_len_func = uart_get_avail_data_len,\
}

//If the context should be dynamically initialized, remove this structure
Expand Down Expand Up @@ -157,6 +164,19 @@ static int uart_open(const char *path, int flags, int mode)
return fd;
}

size_t uart_get_avail_data_len(int fd)
{
uart_dev_t* uart = s_ctx[fd]->uart;
return uart_ll_get_rxfifo_len(uart);
}

size_t uart_get_avail_data_len_via_driver(int fd)
{
size_t buffered_size = 0;
uart_get_buffered_data_len(fd, &buffered_size);
return buffered_size;
}

static void uart_tx_char(int fd, int c)
{
uart_dev_t* uart = s_ctx[fd]->uart;
Expand Down Expand Up @@ -248,38 +268,65 @@ static ssize_t uart_read(int fd, void* data, size_t size)
assert(fd >=0 && fd < 3);
char *data_c = (char *) data;
size_t received = 0;
size_t available_size = 0;
int c = NONE; // store the read char
_lock_acquire_recursive(&s_ctx[fd]->read_lock);
while (received < size) {
int c = uart_read_char(fd);
if (c == '\r') {
if (s_ctx[fd]->rx_mode == ESP_LINE_ENDINGS_CR) {
c = '\n';
} else if (s_ctx[fd]->rx_mode == ESP_LINE_ENDINGS_CRLF) {
/* look ahead */
int c2 = uart_read_char(fd);
if (c2 == NONE) {
/* could not look ahead, put the current character back */
uart_return_char(fd, c);
break;
}
if (c2 == '\n') {
/* this was \r\n sequence. discard \r, return \n */

if (!s_ctx[fd]->non_blocking) {
c = uart_read_char(fd); // blocking until data available for non-O_NONBLOCK mode
}

// find the actual fetch size
available_size += s_ctx[fd]->get_avail_data_len_func(fd);
if (c != NONE) {
available_size++;
}
if (s_ctx[fd]->peek_char != NONE) {
available_size++;
}
size_t fetch_size = MIN(available_size, size);

if (fetch_size > 0) {
do {
if (c == NONE) { // for non-O_NONBLOCK mode, there is already a pre-fetched char
c = uart_read_char(fd);
}
assert(c != NONE);

if (c == '\r') {
if (s_ctx[fd]->rx_mode == ESP_LINE_ENDINGS_CR) {
c = '\n';
} else {
/* \r followed by something else. put the second char back,
* it will be processed on next iteration. return \r now.
*/
uart_return_char(fd, c2);
} else if (s_ctx[fd]->rx_mode == ESP_LINE_ENDINGS_CRLF) {
/* look ahead */
int c2 = uart_read_char(fd);
fetch_size--;
if (c2 == NONE) {
/* could not look ahead, put the current character back */
uart_return_char(fd, c);
c = NONE;
break;
}
if (c2 == '\n') {
/* this was \r\n sequence. discard \r, return \n */
c = '\n';
} else {
/* \r followed by something else. put the second char back,
* it will be processed on next iteration. return \r now.
*/
uart_return_char(fd, c2);
fetch_size++;
}
}
}
} else if (c == NONE) {
break;
}
data_c[received] = (char) c;
++received;
if (c == '\n') {
break;
}

data_c[received] = (char) c;
++received;
c = NONE;
} while (received < fetch_size);
}

if (c != NONE) { // fetched, but not used
uart_return_char(fd, c);
}
_lock_release_recursive(&s_ctx[fd]->read_lock);
if (received > 0) {
Expand Down Expand Up @@ -1054,6 +1101,7 @@ void esp_vfs_dev_uart_use_nonblocking(int uart_num)
_lock_acquire_recursive(&s_ctx[uart_num]->write_lock);
s_ctx[uart_num]->tx_func = uart_tx_char;
s_ctx[uart_num]->rx_func = uart_rx_char;
s_ctx[uart_num]->get_avail_data_len_func = uart_get_avail_data_len;
_lock_release_recursive(&s_ctx[uart_num]->write_lock);
_lock_release_recursive(&s_ctx[uart_num]->read_lock);
}
Expand All @@ -1064,6 +1112,7 @@ void esp_vfs_dev_uart_use_driver(int uart_num)
_lock_acquire_recursive(&s_ctx[uart_num]->write_lock);
s_ctx[uart_num]->tx_func = uart_tx_char_via_driver;
s_ctx[uart_num]->rx_func = uart_rx_char_via_driver;
s_ctx[uart_num]->get_avail_data_len_func = uart_get_avail_data_len_via_driver;
_lock_release_recursive(&s_ctx[uart_num]->write_lock);
_lock_release_recursive(&s_ctx[uart_num]->read_lock);
}

0 comments on commit 233ab81

Please sign in to comment.