From cf014dacbbc05a7a2ba174e3cae439a7008238ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Fri, 6 Dec 2024 23:01:13 +0100 Subject: [PATCH 1/6] Fix Mbed condition variable implementation --- src/system/mbed/system.cpp | 86 +++++++++++++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 6 deletions(-) diff --git a/src/system/mbed/system.cpp b/src/system/mbed/system.cpp index 9b8378ea0..70dd5f946 100644 --- a/src/system/mbed/system.cpp +++ b/src/system/mbed/system.cpp @@ -94,26 +94,100 @@ z_result_t _z_mutex_unlock(_z_mutex_t *m) { } /*------------------ Condvar ------------------*/ -z_result_t _z_condvar_init(_z_condvar_t *cv) { return 0; } +struct condvar { + struct waiter { + Semaphore sem; + waiter *prev = nullptr; + waiter *next = nullptr; + bool in_list = false; + }; + waiter *wait_list = nullptr; +}; + +static void add_wait_list(condvar::waiter **wait_list, condvar::waiter *waiter) { + if (nullptr == *wait_list) { + *wait_list = waiter; + waiter->next = waiter; + waiter->prev = waiter; + } else { + condvar::waiter *first = *wait_list; + condvar::waiter *last = (*wait_list)->prev; + + waiter->next = first; + waiter->prev = last; + + first->prev = waiter; + last->next = waiter; + } + waiter->in_list = true; +} + +static void remove_wait_list(condvar::waiter **wait_list, condvar::waiter *waiter) { + condvar::waiter *prev = waiter->prev; + condvar::waiter *next = waiter->next; + + prev->next = waiter->next; + next->prev = waiter->prev; + *wait_list = waiter->next; + + if (*wait_list == waiter) { + *wait_list = nullptr; + } + + waiter->next = nullptr; + waiter->prev = nullptr; + waiter->in_list = false; +} + +z_result_t _z_condvar_init(_z_condvar_t *cv) { + *cv = new condvar(); + return 0; +} z_result_t _z_condvar_drop(_z_condvar_t *cv) { - delete ((ConditionVariable *)*cv); + delete ((condvar *)*cv); return 0; } z_result_t _z_condvar_signal(_z_condvar_t *cv) { - ((ConditionVariable *)*cv)->notify_one(); + auto &cond_var = *(condvar *)*cv; + + if (cond_var.wait_list != nullptr) { + cond_var.wait_list->sem.release(); + remove_wait_list(&cond_var.wait_list, cond_var.wait_list); + } + return 0; } z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { - ((ConditionVariable *)*cv)->notify_all(); + auto &cond_var = *(condvar *)*cv; + + while (cond_var.wait_list != nullptr) { + cond_var.wait_list->sem.release(); + remove_wait_list(&cond_var.wait_list, cond_var.wait_list); + } + return 0; } z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { - *cv = new ConditionVariable(*((Mutex *)*m)); - ((ConditionVariable *)*cv)->wait(); + auto &cond_var = *(condvar *)*cv; + auto &mutex = *(Mutex *)*m; + + condvar::waiter current_thread; + add_wait_list(&cond_var.wait_list, ¤t_thread); + + mutex.unlock(); + + current_thread.sem.acquire(); + + mutex.lock(); + + if (current_thread.in_list) { + remove_wait_list(&cond_var.wait_list, ¤t_thread); + } + return 0; } #endif // Z_FEATURE_MULTI_THREAD == 1 From fa8b1ee05bca19c4a9fea264aff4446ceda815a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Fri, 6 Dec 2024 23:03:23 +0100 Subject: [PATCH 2/6] Implement condition variable in FreeRTOS-Plus-TCP port --- src/system/freertos_plus_tcp/system.c | 112 ++++++++++++++++++++++++-- 1 file changed, 106 insertions(+), 6 deletions(-) diff --git a/src/system/freertos_plus_tcp/system.c b/src/system/freertos_plus_tcp/system.c index 0876240fe..e397517e5 100644 --- a/src/system/freertos_plus_tcp/system.c +++ b/src/system/freertos_plus_tcp/system.c @@ -157,12 +157,112 @@ z_result_t _z_mutex_try_lock(_z_mutex_t *m) { return xSemaphoreTakeRecursive(*m, z_result_t _z_mutex_unlock(_z_mutex_t *m) { return xSemaphoreGiveRecursive(*m) == pdTRUE ? 0 : -1; } /*------------------ CondVar ------------------*/ -// Condition variables not supported in FreeRTOS -z_result_t _z_condvar_init(_z_condvar_t *cv) { return -1; } -z_result_t _z_condvar_drop(_z_condvar_t *cv) { return -1; } -z_result_t _z_condvar_signal(_z_condvar_t *cv) { return -1; } -z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { return -1; } -z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { return -1; } +typedef struct waiter_t { + SemaphoreHandle_t sem; + struct waiter_t *prev; + struct waiter_t *next; + bool in_list; +} waiter_t; + +typedef struct condvar_t { + waiter_t *wait_list; +} condvar_t; + +static void add_wait_list(waiter_t **wait_list, waiter_t *waiter) { + if (*wait_list == NULL) { + *wait_list = waiter; + waiter->next = waiter; + waiter->prev = waiter; + } else { + waiter_t *first = *wait_list; + waiter_t *last = (*wait_list)->prev; + + waiter->next = first; + waiter->prev = last; + + first->prev = waiter; + last->next = waiter; + } + waiter->in_list = true; +} + +static void remove_wait_list(waiter_t **wait_list, waiter_t *waiter) { + waiter_t *prev = waiter->prev; + waiter_t *next = waiter->next; + + prev->next = waiter->next; + next->prev = waiter->prev; + *wait_list = waiter->next; + + if (*wait_list == waiter) { + *wait_list = NULL; + } + + waiter->next = NULL; + waiter->prev = NULL; + waiter->in_list = false; +} + +z_result_t _z_condvar_init(_z_condvar_t *cv) { + *cv = (condvar_t *)z_malloc(sizeof(condvar_t)); + if (*cv == NULL) { + return -1; + } + ((condvar_t *)*cv)->wait_list = NULL; + return 0; +} + +z_result_t _z_condvar_drop(_z_condvar_t *cv) { + z_free(*cv); + return 0; +} + +z_result_t _z_condvar_signal(_z_condvar_t *cv) { + condvar_t *cond_var = (condvar_t *)*cv; + + if (cond_var->wait_list != NULL) { + xSemaphoreGive(cond_var->wait_list->sem); + remove_wait_list(&cond_var->wait_list, cond_var->wait_list); + } + + return 0; +} + +z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { + condvar_t *cond_var = (condvar_t *)*cv; + + while (cond_var->wait_list != NULL) { + xSemaphoreGive(cond_var->wait_list->sem); + remove_wait_list(&cond_var->wait_list, cond_var->wait_list); + } + + return 0; +} + +z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { + condvar_t *cond_var = (condvar_t *)*cv; + SemaphoreHandle_t mutex = *m; + + waiter_t current_thread; +#if (configSUPPORT_STATIC_ALLOCATION == 1) + StaticSemaphore_t current_thread_sem_buffer; + current_thread.sem = xSemaphoreCreateCountingStatic(1, 0, ¤t_thread_sem_buffer); +#else + current_thread.sem = xSemaphoreCreateCounting(1, 0); +#endif + add_wait_list(&cond_var->wait_list, ¤t_thread); + + xSemaphoreGiveRecursive(mutex); + xSemaphoreTake(current_thread.sem, portMAX_DELAY); + xSemaphoreTakeRecursive(mutex, portMAX_DELAY); + + if (current_thread.in_list) { + remove_wait_list(&cond_var->wait_list, ¤t_thread); + } + + vSemaphoreDelete(current_thread.sem); + return 0; +} #endif // Z_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ From a5e66e8b8a15110dd27037211cabde917b7da168 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Sat, 7 Dec 2024 01:35:34 +0100 Subject: [PATCH 3/6] Use counting semaphore condvar implementation in FreeRTOS-Plus-TCP port Co-Authored-By: Alexander Bushnev --- .../system/platform/freertos_plus_tcp.h | 10 +- src/system/freertos_plus_tcp/system.c | 128 +++++++----------- 2 files changed, 58 insertions(+), 80 deletions(-) diff --git a/include/zenoh-pico/system/platform/freertos_plus_tcp.h b/include/zenoh-pico/system/platform/freertos_plus_tcp.h index 31a2dfc83..97d6c1b10 100644 --- a/include/zenoh-pico/system/platform/freertos_plus_tcp.h +++ b/include/zenoh-pico/system/platform/freertos_plus_tcp.h @@ -40,7 +40,15 @@ typedef struct { } _z_task_t; typedef SemaphoreHandle_t _z_mutex_t; -typedef void *_z_condvar_t; +typedef struct { + SemaphoreHandle_t mutex; + SemaphoreHandle_t sem; + int waiters; +#if (configSUPPORT_STATIC_ALLOCATION == 1) + StaticSemaphore_t mutex_buffer; + StaticSemaphore_t sem_buffer; +#endif /* SUPPORT_STATIC_ALLOCATION */ +} _z_condvar_t; #endif // Z_MULTI_THREAD == 1 typedef TickType_t z_clock_t; diff --git a/src/system/freertos_plus_tcp/system.c b/src/system/freertos_plus_tcp/system.c index e397517e5..33adc95d1 100644 --- a/src/system/freertos_plus_tcp/system.c +++ b/src/system/freertos_plus_tcp/system.c @@ -157,111 +157,81 @@ z_result_t _z_mutex_try_lock(_z_mutex_t *m) { return xSemaphoreTakeRecursive(*m, z_result_t _z_mutex_unlock(_z_mutex_t *m) { return xSemaphoreGiveRecursive(*m) == pdTRUE ? 0 : -1; } /*------------------ CondVar ------------------*/ -typedef struct waiter_t { - SemaphoreHandle_t sem; - struct waiter_t *prev; - struct waiter_t *next; - bool in_list; -} waiter_t; - -typedef struct condvar_t { - waiter_t *wait_list; -} condvar_t; - -static void add_wait_list(waiter_t **wait_list, waiter_t *waiter) { - if (*wait_list == NULL) { - *wait_list = waiter; - waiter->next = waiter; - waiter->prev = waiter; - } else { - waiter_t *first = *wait_list; - waiter_t *last = (*wait_list)->prev; - - waiter->next = first; - waiter->prev = last; - - first->prev = waiter; - last->next = waiter; - } - waiter->in_list = true; -} - -static void remove_wait_list(waiter_t **wait_list, waiter_t *waiter) { - waiter_t *prev = waiter->prev; - waiter_t *next = waiter->next; - - prev->next = waiter->next; - next->prev = waiter->prev; - *wait_list = waiter->next; - - if (*wait_list == waiter) { - *wait_list = NULL; +z_result_t _z_condvar_init(_z_condvar_t *cv) { + if (!cv) { + return _Z_ERR_GENERIC; } - waiter->next = NULL; - waiter->prev = NULL; - waiter->in_list = false; -} +#if (configSUPPORT_STATIC_ALLOCATION == 1) + cv->mutex = xSemaphoreCreateRecursiveMutexStatic(&cv->mutex_buffer); + cv->sem = xSemaphoreCreateCountingStatic((UBaseType_t)(~0), 0, &cv->sem_buffer); +#else + cv->mutex = xSemaphoreCreateMutex(); + cv->sem = xSemaphoreCreateCounting((UBaseType_t)(~0), 0); +#endif /* SUPPORT_STATIC_ALLOCATION */ + cv->waiters = 0; -z_result_t _z_condvar_init(_z_condvar_t *cv) { - *cv = (condvar_t *)z_malloc(sizeof(condvar_t)); - if (*cv == NULL) { - return -1; + if (!cv->mutex || !cv->sem) { + return _Z_ERR_GENERIC; } - ((condvar_t *)*cv)->wait_list = NULL; - return 0; + return _Z_RES_OK; } z_result_t _z_condvar_drop(_z_condvar_t *cv) { - z_free(*cv); - return 0; + if (!cv) { + return _Z_ERR_GENERIC; + } + vSemaphoreDelete(cv->sem); + vSemaphoreDelete(cv->mutex); + return _Z_RES_OK; } z_result_t _z_condvar_signal(_z_condvar_t *cv) { - condvar_t *cond_var = (condvar_t *)*cv; + if (!cv) { + return _Z_ERR_GENERIC; + } - if (cond_var->wait_list != NULL) { - xSemaphoreGive(cond_var->wait_list->sem); - remove_wait_list(&cond_var->wait_list, cond_var->wait_list); + xSemaphoreTake(cv->mutex, portMAX_DELAY); + if (cv->waiters > 0) { + xSemaphoreGive(cv->sem); + cv->waiters--; } + xSemaphoreGive(cv->mutex); - return 0; + return _Z_RES_OK; } z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { - condvar_t *cond_var = (condvar_t *)*cv; + if (!cv) { + return _Z_ERR_GENERIC; + } - while (cond_var->wait_list != NULL) { - xSemaphoreGive(cond_var->wait_list->sem); - remove_wait_list(&cond_var->wait_list, cond_var->wait_list); + xSemaphoreTake(cv->mutex, portMAX_DELAY); + while (cv->waiters > 0) { + xSemaphoreGive(cv->sem); + cv->waiters--; } + xSemaphoreGive(cv->mutex); - return 0; + return _Z_RES_OK; } z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { - condvar_t *cond_var = (condvar_t *)*cv; - SemaphoreHandle_t mutex = *m; + if (!cv || !m) { + return _Z_ERR_GENERIC; + } - waiter_t current_thread; -#if (configSUPPORT_STATIC_ALLOCATION == 1) - StaticSemaphore_t current_thread_sem_buffer; - current_thread.sem = xSemaphoreCreateCountingStatic(1, 0, ¤t_thread_sem_buffer); -#else - current_thread.sem = xSemaphoreCreateCounting(1, 0); -#endif - add_wait_list(&cond_var->wait_list, ¤t_thread); + xSemaphoreTake(cv->mutex, portMAX_DELAY); + cv->waiters++; + xSemaphoreGive(cv->mutex); - xSemaphoreGiveRecursive(mutex); - xSemaphoreTake(current_thread.sem, portMAX_DELAY); - xSemaphoreTakeRecursive(mutex, portMAX_DELAY); + _z_mutex_unlock(m); - if (current_thread.in_list) { - remove_wait_list(&cond_var->wait_list, ¤t_thread); - } + xSemaphoreTake(cv->sem, portMAX_DELAY); - vSemaphoreDelete(current_thread.sem); - return 0; + _z_mutex_lock(m); + + return _Z_RES_OK; } #endif // Z_MULTI_THREAD == 1 From 175c9a5d5f201ba06a1c41c3d43ed722bff62734 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Sat, 7 Dec 2024 02:02:36 +0100 Subject: [PATCH 4/6] Use counting semaphore condvar implementation in Mbed port --- src/system/mbed/system.cpp | 101 +++++++++++++++---------------------- 1 file changed, 41 insertions(+), 60 deletions(-) diff --git a/src/system/mbed/system.cpp b/src/system/mbed/system.cpp index 70dd5f946..871048bfa 100644 --- a/src/system/mbed/system.cpp +++ b/src/system/mbed/system.cpp @@ -95,100 +95,81 @@ z_result_t _z_mutex_unlock(_z_mutex_t *m) { /*------------------ Condvar ------------------*/ struct condvar { - struct waiter { - Semaphore sem; - waiter *prev = nullptr; - waiter *next = nullptr; - bool in_list = false; - }; - waiter *wait_list = nullptr; + Mutex mutex; + Semaphore sem{0}; + int waiters{0}; }; -static void add_wait_list(condvar::waiter **wait_list, condvar::waiter *waiter) { - if (nullptr == *wait_list) { - *wait_list = waiter; - waiter->next = waiter; - waiter->prev = waiter; - } else { - condvar::waiter *first = *wait_list; - condvar::waiter *last = (*wait_list)->prev; - - waiter->next = first; - waiter->prev = last; - - first->prev = waiter; - last->next = waiter; - } - waiter->in_list = true; -} - -static void remove_wait_list(condvar::waiter **wait_list, condvar::waiter *waiter) { - condvar::waiter *prev = waiter->prev; - condvar::waiter *next = waiter->next; - - prev->next = waiter->next; - next->prev = waiter->prev; - *wait_list = waiter->next; - - if (*wait_list == waiter) { - *wait_list = nullptr; +z_result_t _z_condvar_init(_z_condvar_t *cv) { + if (!cv) { + return _Z_ERR_GENERIC; } - waiter->next = nullptr; - waiter->prev = nullptr; - waiter->in_list = false; -} - -z_result_t _z_condvar_init(_z_condvar_t *cv) { *cv = new condvar(); return 0; } z_result_t _z_condvar_drop(_z_condvar_t *cv) { + if (!cv) { + return _Z_ERR_GENERIC; + } + delete ((condvar *)*cv); return 0; } z_result_t _z_condvar_signal(_z_condvar_t *cv) { + if (!cv) { + return _Z_ERR_GENERIC; + } + auto &cond_var = *(condvar *)*cv; - if (cond_var.wait_list != nullptr) { - cond_var.wait_list->sem.release(); - remove_wait_list(&cond_var.wait_list, cond_var.wait_list); + cond_var.mutex.lock(); + if (cond_var.waiters > 0) { + cond_var.sem.release(); + cond_var.waiters--; } + cond_var.mutex.unlock(); - return 0; + return _Z_RES_OK; } z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { + if (!cv) { + return _Z_ERR_GENERIC; + } + auto &cond_var = *(condvar *)*cv; - while (cond_var.wait_list != nullptr) { - cond_var.wait_list->sem.release(); - remove_wait_list(&cond_var.wait_list, cond_var.wait_list); + cond_var.mutex.lock(); + while (cond_var.waiters > 0) { + cond_var.sem.release(); + cond_var.waiters--; } + cond_var.mutex.unlock(); - return 0; + return _z_RES_OK; } z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { - auto &cond_var = *(condvar *)*cv; - auto &mutex = *(Mutex *)*m; + if (!cv || !m) { + return _Z_ERR_GENERIC; + } - condvar::waiter current_thread; - add_wait_list(&cond_var.wait_list, ¤t_thread); + auto &cond_var = *(condvar *)*cv; - mutex.unlock(); + cond_var.mutex.lock(); + cond_var.waiters++; + cond_var.mutex.unlock(); - current_thread.sem.acquire(); + _z_mutex_unlock(m); - mutex.lock(); + cond_var.sem.acquire(); - if (current_thread.in_list) { - remove_wait_list(&cond_var.wait_list, ¤t_thread); - } + _z_mutex_lock(m); - return 0; + return _Z_RES_OK; } #endif // Z_FEATURE_MULTI_THREAD == 1 From ba96a810c7015df253bac3fc1fc380dbfd1ee5f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Sat, 7 Dec 2024 02:12:58 +0100 Subject: [PATCH 5/6] Use normal mutex instead of recursive --- src/system/freertos_plus_tcp/system.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/system/freertos_plus_tcp/system.c b/src/system/freertos_plus_tcp/system.c index 33adc95d1..395e6bc59 100644 --- a/src/system/freertos_plus_tcp/system.c +++ b/src/system/freertos_plus_tcp/system.c @@ -163,7 +163,7 @@ z_result_t _z_condvar_init(_z_condvar_t *cv) { } #if (configSUPPORT_STATIC_ALLOCATION == 1) - cv->mutex = xSemaphoreCreateRecursiveMutexStatic(&cv->mutex_buffer); + cv->mutex = xSemaphoreCreateMutexStatic(&cv->mutex_buffer); cv->sem = xSemaphoreCreateCountingStatic((UBaseType_t)(~0), 0, &cv->sem_buffer); #else cv->mutex = xSemaphoreCreateMutex(); From 9b2bc049b7a692f078a8a77ffdd0cdfc742b6bf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Sat, 7 Dec 2024 02:34:21 +0100 Subject: [PATCH 6/6] Fix typo in mbed port --- src/system/mbed/system.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/system/mbed/system.cpp b/src/system/mbed/system.cpp index 871048bfa..92ed064ac 100644 --- a/src/system/mbed/system.cpp +++ b/src/system/mbed/system.cpp @@ -149,7 +149,7 @@ z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { } cond_var.mutex.unlock(); - return _z_RES_OK; + return _Z_RES_OK; } z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) {