From 0af466117a2cd9fea48f498114d8b4169c37f886 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 16 Jan 2025 16:01:27 +0100 Subject: [PATCH 1/3] Add matching subscribers --- .github/workflows/integration.yaml | 2 + CMakeLists.txt | 10 + examples/unix/c11/z_pub.c | 31 +- include/zenoh-pico/api/macros.h | 437 +++++++++++++++----------- include/zenoh-pico/api/primitives.h | 78 ++++- include/zenoh-pico/api/types.h | 26 +- include/zenoh-pico/config.h | 1 + include/zenoh-pico/config.h.in | 1 + include/zenoh-pico/net/matching.h | 48 +++ include/zenoh-pico/net/session.h | 5 + include/zenoh-pico/session/matching.h | 66 ++++ src/api/api.c | 57 +++- src/net/matching.c | 119 +++++++ src/net/primitives.c | 4 + src/session/matching.c | 50 +++ src/session/utils.c | 10 + tests/memory_leak.py | 3 + tests/z_api_matching_test.c | 259 +++++++++++++++ tests/z_collections_test.c | 22 ++ 19 files changed, 1034 insertions(+), 195 deletions(-) create mode 100644 include/zenoh-pico/net/matching.h create mode 100644 include/zenoh-pico/session/matching.h create mode 100644 src/net/matching.c create mode 100644 src/session/matching.c create mode 100644 tests/z_api_matching_test.c diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 021674289..38deab1bf 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -36,6 +36,7 @@ jobs: BUILD_TESTING: OFF BUILD_MULTICAST: OFF BUILD_INTEGRATION: ON + Z_FEATURE_UNSTABLE_API: 1 - name: Test debug run: make test @@ -45,4 +46,5 @@ jobs: BUILD_TESTING: OFF # Workaround for Windows as it seems the previous step is being ignored BUILD_MULTICAST: OFF # Workaround for Windows as it seems the previous step is being ignored BUILD_INTEGRATION: ON # Workaround for Windows as it seems the previous step is being ignored + Z_FEATURE_UNSTABLE_API: 1 ZENOH_BRANCH: main diff --git a/CMakeLists.txt b/CMakeLists.txt index f9fd2e845..22760432f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -243,6 +243,7 @@ set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY") set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions") set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check") set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching") +set(Z_FEATURE_MATCHING 1 CACHE STRING "Toggle matching feature") set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE") set(Z_FEATURE_AUTO_RECONNECT 1 CACHE STRING "Toggle automatic reconnection") @@ -252,6 +253,11 @@ if(Z_FEATURE_LINK_SERIAL_USB AND NOT Z_FEATURE_UNSTABLE_API) set(Z_FEATURE_LINK_SERIAL_USB 0 CACHE STRING "Toggle Serial USB links" FORCE) endif() +if(Z_FEATURE_MATCHING AND NOT Z_FEATURE_UNSTABLE_API) + message(WARNING "Z_FEATURE_MATCHING can only be enabled when Z_FEATURE_UNSTABLE_API is also enabled. Disabling Z_FEATURE_MATCHING.") + set(Z_FEATURE_MATCHING 0 CACHE STRING "Toggle matching feature" FORCE) +endif() + add_compile_definitions("Z_BUILD_DEBUG=$") message(STATUS "Building with feature confing:\n\ * UNSTABLE_API: ${Z_FEATURE_UNSTABLE_API}\n\ @@ -263,6 +269,7 @@ message(STATUS "Building with feature confing:\n\ * LIVELINESS: ${Z_FEATURE_LIVELINESS}\n\ * INTEREST: ${Z_FEATURE_INTEREST}\n\ * AUTO_RECONNECT: ${Z_FEATURE_AUTO_RECONNECT}\n\ +* MATCHING: ${Z_FEATURE_MATCHING}\n\ * RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}") configure_file( @@ -572,11 +579,13 @@ if(UNIX OR MSVC) add_executable(z_api_alignment_test ${PROJECT_SOURCE_DIR}/tests/z_api_alignment_test.c) add_executable(z_session_test ${PROJECT_SOURCE_DIR}/tests/z_session_test.c) add_executable(z_api_liveliness_test ${PROJECT_SOURCE_DIR}/tests/z_api_liveliness_test.c) + add_executable(z_api_matching_test ${PROJECT_SOURCE_DIR}/tests/z_api_matching_test.c) target_link_libraries(z_client_test zenohpico::lib) target_link_libraries(z_api_alignment_test zenohpico::lib) target_link_libraries(z_session_test zenohpico::lib) target_link_libraries(z_api_liveliness_test zenohpico::lib) + target_link_libraries(z_api_matching_test zenohpico::lib) configure_file(${PROJECT_SOURCE_DIR}/tests/routed.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/routed.sh COPYONLY) configure_file(${PROJECT_SOURCE_DIR}/tests/api.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh COPYONLY) @@ -586,6 +595,7 @@ if(UNIX OR MSVC) add_test(z_api_alignment_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_alignment_test) add_test(z_session_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_session_test) add_test(z_api_liveliness_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_liveliness_test) + add_test(z_api_matching_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_matching_test) endif() endif() endif() diff --git a/examples/unix/c11/z_pub.c b/examples/unix/c11/z_pub.c index ddd1dc11d..a91e60378 100644 --- a/examples/unix/c11/z_pub.c +++ b/examples/unix/c11/z_pub.c @@ -20,9 +20,19 @@ #include #include -#include "zenoh-pico/system/platform.h" - #if Z_FEATURE_PUBLICATION == 1 + +#if Z_FEATURE_MATCHING == 1 +void matching_status_handler(const z_matching_status_t *matching_status, void *arg) { + (void)arg; + if (matching_status->matching) { + printf("Publisher has matching subscribers.\n"); + } else { + printf("Publisher has NO MORE matching subscribers.\n"); + } +} +#endif + int main(int argc, char **argv) { const char *keyexpr = "demo/example/zenoh-pico-pub"; char *const default_value = "Pub from Pico!"; @@ -31,9 +41,10 @@ int main(int argc, char **argv) { char *clocator = NULL; char *llocator = NULL; int n = 2147483647; // max int value by default + bool add_matching_listener = false; int opt; - while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) { + while ((opt = getopt(argc, argv, "k:v:e:m:l:n:a")) != -1) { switch (opt) { case 'k': keyexpr = optarg; @@ -53,6 +64,9 @@ int main(int argc, char **argv) { case 'n': n = atoi(optarg); break; + case 'a': + add_matching_listener = true; + break; case '?': if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') { @@ -104,6 +118,17 @@ int main(int argc, char **argv) { return -1; } + if (add_matching_listener) { +#if Z_FEATURE_MATCHING == 1 + z_owned_closure_matching_status_t callback; + z_closure(&callback, matching_status_handler, NULL, NULL); + z_publisher_declare_background_matching_listener(z_loan(pub), z_move(callback)); +#else + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_MATCHING but this example requires it.\n"); + return -2; +#endif + } + // Publish data printf("Press CTRL-C to quit...\n"); char buf[256]; diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index f60ba22bd..24a49599e 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -37,42 +37,44 @@ */ #define z_loan(x) _Generic((x), \ - z_owned_keyexpr_t : z_keyexpr_loan, \ - z_view_keyexpr_t : z_view_keyexpr_loan, \ - z_owned_config_t : z_config_loan, \ - z_owned_session_t : z_session_loan, \ - z_owned_subscriber_t : z_subscriber_loan, \ - z_owned_publisher_t : z_publisher_loan, \ - z_owned_queryable_t : z_queryable_loan, \ - z_owned_liveliness_token_t : z_liveliness_token_loan, \ - z_owned_reply_t : z_reply_loan, \ - z_owned_hello_t : z_hello_loan, \ - z_owned_string_t : z_string_loan, \ - z_view_string_t : z_view_string_loan, \ - z_owned_string_array_t : z_string_array_loan, \ - z_owned_sample_t : z_sample_loan, \ - z_owned_query_t : z_query_loan, \ - z_owned_slice_t : z_slice_loan, \ - z_view_slice_t : z_view_slice_loan, \ - z_owned_bytes_t : z_bytes_loan, \ - z_owned_encoding_t : z_encoding_loan, \ - z_owned_task_t : z_task_loan, \ - z_owned_mutex_t : z_mutex_loan, \ - z_owned_condvar_t : z_condvar_loan, \ - z_owned_fifo_handler_query_t : z_fifo_handler_query_loan, \ - z_owned_fifo_handler_reply_t : z_fifo_handler_reply_loan, \ - z_owned_fifo_handler_sample_t : z_fifo_handler_sample_loan, \ - z_owned_ring_handler_query_t : z_ring_handler_query_loan, \ - z_owned_ring_handler_reply_t : z_ring_handler_reply_loan, \ - z_owned_ring_handler_sample_t : z_ring_handler_sample_loan, \ - z_owned_reply_err_t : z_reply_err_loan, \ - z_owned_closure_sample_t : z_closure_sample_loan, \ - z_owned_closure_reply_t : z_closure_reply_loan, \ - z_owned_closure_query_t : z_closure_query_loan, \ - z_owned_closure_hello_t : z_closure_hello_loan, \ - z_owned_closure_zid_t : z_closure_zid_loan, \ - ze_owned_serializer_t : ze_serializer_loan, \ - z_owned_bytes_writer_t : z_bytes_writer_loan \ + z_owned_keyexpr_t : z_keyexpr_loan, \ + z_view_keyexpr_t : z_view_keyexpr_loan, \ + z_owned_config_t : z_config_loan, \ + z_owned_session_t : z_session_loan, \ + z_owned_subscriber_t : z_subscriber_loan, \ + z_owned_publisher_t : z_publisher_loan, \ + z_owned_matching_listener_t : z_matching_listener_loan, \ + z_owned_queryable_t : z_queryable_loan, \ + z_owned_liveliness_token_t : z_liveliness_token_loan, \ + z_owned_reply_t : z_reply_loan, \ + z_owned_hello_t : z_hello_loan, \ + z_owned_string_t : z_string_loan, \ + z_view_string_t : z_view_string_loan, \ + z_owned_string_array_t : z_string_array_loan, \ + z_owned_sample_t : z_sample_loan, \ + z_owned_query_t : z_query_loan, \ + z_owned_slice_t : z_slice_loan, \ + z_view_slice_t : z_view_slice_loan, \ + z_owned_bytes_t : z_bytes_loan, \ + z_owned_encoding_t : z_encoding_loan, \ + z_owned_task_t : z_task_loan, \ + z_owned_mutex_t : z_mutex_loan, \ + z_owned_condvar_t : z_condvar_loan, \ + z_owned_fifo_handler_query_t : z_fifo_handler_query_loan, \ + z_owned_fifo_handler_reply_t : z_fifo_handler_reply_loan, \ + z_owned_fifo_handler_sample_t : z_fifo_handler_sample_loan, \ + z_owned_ring_handler_query_t : z_ring_handler_query_loan, \ + z_owned_ring_handler_reply_t : z_ring_handler_reply_loan, \ + z_owned_ring_handler_sample_t : z_ring_handler_sample_loan, \ + z_owned_reply_err_t : z_reply_err_loan, \ + z_owned_closure_sample_t : z_closure_sample_loan, \ + z_owned_closure_reply_t : z_closure_reply_loan, \ + z_owned_closure_query_t : z_closure_query_loan, \ + z_owned_closure_hello_t : z_closure_hello_loan, \ + z_owned_closure_zid_t : z_closure_zid_loan, \ + z_owned_closure_matching_status_t : z_closure_matching_status_loan, \ + ze_owned_serializer_t : ze_serializer_loan, \ + z_owned_bytes_writer_t : z_bytes_writer_loan \ )(&x) #define z_loan_mut(x) _Generic((x), \ @@ -80,6 +82,7 @@ z_owned_config_t : z_config_loan_mut, \ z_owned_session_t : z_session_loan_mut, \ z_owned_publisher_t : z_publisher_loan_mut, \ + z_owned_matching_listener_t : z_matching_listener_loan_mut, \ z_owned_queryable_t : z_queryable_loan_mut, \ z_owned_liveliness_token_t : z_liveliness_token_loan_mut, \ z_owned_subscriber_t : z_subscriber_loan_mut, \ @@ -108,39 +111,41 @@ * x: The instance to drop. */ #define z_drop(x) _Generic((x), \ - z_moved_keyexpr_t* : z_keyexpr_drop, \ - z_moved_config_t* : z_config_drop, \ - z_moved_session_t* : z_session_drop, \ - z_moved_subscriber_t* : z_subscriber_drop, \ - z_moved_publisher_t* : z_publisher_drop, \ - z_moved_queryable_t* : z_queryable_drop, \ - z_moved_liveliness_token_t* : z_liveliness_token_drop, \ - z_moved_reply_t* : z_reply_drop, \ - z_moved_hello_t* : z_hello_drop, \ - z_moved_string_t* : z_string_drop, \ - z_moved_string_array_t* : z_string_array_drop, \ - z_moved_sample_t* : z_sample_drop, \ - z_moved_query_t* : z_query_drop, \ - z_moved_encoding_t* : z_encoding_drop, \ - z_moved_slice_t* : z_slice_drop, \ - z_moved_bytes_t* : z_bytes_drop, \ - z_moved_closure_sample_t* : z_closure_sample_drop, \ - z_moved_closure_query_t* : z_closure_query_drop, \ - z_moved_closure_reply_t* : z_closure_reply_drop, \ - z_moved_closure_hello_t* : z_closure_hello_drop, \ - z_moved_closure_zid_t* : z_closure_zid_drop, \ - z_moved_task_t* : z_task_join, \ - z_moved_mutex_t* : z_mutex_drop, \ - z_moved_condvar_t* : z_condvar_drop, \ - z_moved_fifo_handler_query_t* : z_fifo_handler_query_drop, \ - z_moved_fifo_handler_reply_t* : z_fifo_handler_reply_drop, \ - z_moved_fifo_handler_sample_t* : z_fifo_handler_sample_drop, \ - z_moved_ring_handler_query_t* : z_ring_handler_query_drop, \ - z_moved_ring_handler_reply_t* : z_ring_handler_reply_drop, \ - z_moved_ring_handler_sample_t* : z_ring_handler_sample_drop, \ - z_moved_reply_err_t* : z_reply_err_drop, \ - ze_moved_serializer_t* : ze_serializer_drop, \ - z_moved_bytes_writer_t* : z_bytes_writer_drop \ + z_moved_keyexpr_t* : z_keyexpr_drop, \ + z_moved_config_t* : z_config_drop, \ + z_moved_session_t* : z_session_drop, \ + z_moved_subscriber_t* : z_subscriber_drop, \ + z_moved_publisher_t* : z_publisher_drop, \ + z_moved_matching_listener_t* : z_matching_listener_drop, \ + z_moved_queryable_t* : z_queryable_drop, \ + z_moved_liveliness_token_t* : z_liveliness_token_drop, \ + z_moved_reply_t* : z_reply_drop, \ + z_moved_hello_t* : z_hello_drop, \ + z_moved_string_t* : z_string_drop, \ + z_moved_string_array_t* : z_string_array_drop, \ + z_moved_sample_t* : z_sample_drop, \ + z_moved_query_t* : z_query_drop, \ + z_moved_encoding_t* : z_encoding_drop, \ + z_moved_slice_t* : z_slice_drop, \ + z_moved_bytes_t* : z_bytes_drop, \ + z_moved_closure_sample_t* : z_closure_sample_drop, \ + z_moved_closure_query_t* : z_closure_query_drop, \ + z_moved_closure_reply_t* : z_closure_reply_drop, \ + z_moved_closure_hello_t* : z_closure_hello_drop, \ + z_moved_closure_zid_t* : z_closure_zid_drop, \ + z_moved_closure_matching_status_t* : z_closure_matching_status_drop, \ + z_moved_task_t* : z_task_join, \ + z_moved_mutex_t* : z_mutex_drop, \ + z_moved_condvar_t* : z_condvar_drop, \ + z_moved_fifo_handler_query_t* : z_fifo_handler_query_drop, \ + z_moved_fifo_handler_reply_t* : z_fifo_handler_reply_drop, \ + z_moved_fifo_handler_sample_t* : z_fifo_handler_sample_drop, \ + z_moved_ring_handler_query_t* : z_ring_handler_query_drop, \ + z_moved_ring_handler_reply_t* : z_ring_handler_reply_drop, \ + z_moved_ring_handler_sample_t* : z_ring_handler_sample_drop, \ + z_moved_reply_err_t* : z_reply_err_drop, \ + ze_moved_serializer_t* : ze_serializer_drop, \ + z_moved_bytes_writer_t* : z_bytes_writer_drop \ )(x) /** @@ -154,30 +159,32 @@ */ #define z_internal_check(x) _Generic((x), \ - z_owned_keyexpr_t : z_internal_keyexpr_check, \ - z_owned_reply_err_t : z_internal_reply_err_check, \ - z_owned_config_t : z_internal_config_check, \ - z_owned_session_t : z_internal_session_check, \ - z_owned_subscriber_t : z_internal_subscriber_check, \ - z_owned_publisher_t : z_internal_publisher_check, \ - z_owned_queryable_t : z_internal_queryable_check, \ - z_owned_liveliness_token_t : z_internal_liveliness_token_check, \ - z_owned_reply_t : z_internal_reply_check, \ - z_owned_hello_t : z_internal_hello_check, \ - z_owned_string_t : z_internal_string_check, \ - z_owned_string_array_t : z_internal_string_array_check, \ - z_owned_closure_sample_t : z_internal_closure_sample_check, \ - z_owned_closure_query_t : z_internal_closure_query_check, \ - z_owned_closure_reply_t : z_internal_closure_reply_check, \ - z_owned_closure_hello_t : z_internal_closure_hello_check, \ - z_owned_closure_zid_t : z_internal_closure_zid_check, \ - z_owned_slice_t : z_internal_slice_check, \ - z_owned_bytes_t : z_internal_bytes_check, \ - z_owned_sample_t : z_internal_sample_check, \ - z_owned_query_t : z_internal_query_check, \ - z_owned_encoding_t : z_internal_encoding_check, \ - ze_owned_serializer_t : ze_internal_serializer_check, \ - z_owned_bytes_writer_t : z_internal_bytes_writer_check \ + z_owned_keyexpr_t : z_internal_keyexpr_check, \ + z_owned_reply_err_t : z_internal_reply_err_check, \ + z_owned_config_t : z_internal_config_check, \ + z_owned_session_t : z_internal_session_check, \ + z_owned_subscriber_t : z_internal_subscriber_check, \ + z_owned_publisher_t : z_internal_publisher_check, \ + z_owned_matching_listener_t : z_internal_matching_listener_check, \ + z_owned_queryable_t : z_internal_queryable_check, \ + z_owned_liveliness_token_t : z_internal_liveliness_token_check, \ + z_owned_reply_t : z_internal_reply_check, \ + z_owned_hello_t : z_internal_hello_check, \ + z_owned_string_t : z_internal_string_check, \ + z_owned_string_array_t : z_internal_string_array_check, \ + z_owned_closure_sample_t : z_internal_closure_sample_check, \ + z_owned_closure_query_t : z_internal_closure_query_check, \ + z_owned_closure_reply_t : z_internal_closure_reply_check, \ + z_owned_closure_hello_t : z_internal_closure_hello_check, \ + z_owned_closure_zid_t : z_internal_closure_zid_check, \ + z_owned_closure_matching_status_t : z_internal_closure_matching_status_check, \ + z_owned_slice_t : z_internal_slice_check, \ + z_owned_bytes_t : z_internal_bytes_check, \ + z_owned_sample_t : z_internal_sample_check, \ + z_owned_query_t : z_internal_query_check, \ + z_owned_encoding_t : z_internal_encoding_check, \ + ze_owned_serializer_t : ze_internal_serializer_check, \ + z_owned_bytes_writer_t : z_internal_bytes_writer_check \ )(&x) /** @@ -187,11 +194,12 @@ * x: The closure to call */ #define z_call(x, ...) \ - _Generic((x), z_loaned_closure_sample_t : z_closure_sample_call, \ - z_loaned_closure_query_t : z_closure_query_call, \ - z_loaned_closure_reply_t : z_closure_reply_call, \ - z_loaned_closure_hello_t : z_closure_hello_call, \ - z_loaned_closure_zid_t : z_closure_zid_call \ + _Generic((x), z_loaned_closure_sample_t : z_closure_sample_call, \ + z_loaned_closure_query_t : z_closure_query_call, \ + z_loaned_closure_reply_t : z_closure_reply_call, \ + z_loaned_closure_hello_t : z_closure_hello_call, \ + z_loaned_closure_zid_t : z_closure_zid_call, \ + z_loaned_closure_matching_status_t : z_closure_matching_status_call \ ) (&x, __VA_ARGS__) #define z_try_recv(x, ...) \ @@ -224,39 +232,41 @@ * Returns the instance associated with `x`. */ #define z_move(x) _Generic((x), \ - z_owned_keyexpr_t : z_keyexpr_move, \ - z_owned_config_t : z_config_move, \ - z_owned_session_t : z_session_move, \ - z_owned_subscriber_t : z_subscriber_move, \ - z_owned_publisher_t : z_publisher_move, \ - z_owned_queryable_t : z_queryable_move, \ - z_owned_liveliness_token_t : z_liveliness_token_move, \ - z_owned_reply_t : z_reply_move, \ - z_owned_hello_t : z_hello_move, \ - z_owned_string_t : z_string_move, \ - z_owned_string_array_t : z_string_array_move, \ - z_owned_closure_sample_t : z_closure_sample_move, \ - z_owned_closure_query_t : z_closure_query_move, \ - z_owned_closure_reply_t : z_closure_reply_move, \ - z_owned_closure_hello_t : z_closure_hello_move, \ - z_owned_closure_zid_t : z_closure_zid_move, \ - z_owned_sample_t : z_sample_move, \ - z_owned_query_t : z_query_move, \ - z_owned_slice_t : z_slice_move, \ - z_owned_bytes_t : z_bytes_move, \ - z_owned_encoding_t : z_encoding_move, \ - z_owned_task_t : z_task_move, \ - z_owned_mutex_t : z_mutex_move, \ - z_owned_condvar_t : z_condvar_move, \ - z_owned_ring_handler_query_t : z_ring_handler_query_move, \ - z_owned_ring_handler_reply_t : z_ring_handler_reply_move, \ - z_owned_ring_handler_sample_t : z_ring_handler_sample_move, \ - z_owned_fifo_handler_query_t : z_fifo_handler_query_move, \ - z_owned_fifo_handler_reply_t : z_fifo_handler_reply_move, \ - z_owned_fifo_handler_sample_t : z_fifo_handler_sample_move, \ - z_owned_reply_err_t : z_reply_err_move, \ - ze_owned_serializer_t : ze_serializer_move, \ - z_owned_bytes_writer_t : z_bytes_writer_move \ + z_owned_keyexpr_t : z_keyexpr_move, \ + z_owned_config_t : z_config_move, \ + z_owned_session_t : z_session_move, \ + z_owned_subscriber_t : z_subscriber_move, \ + z_owned_publisher_t : z_publisher_move, \ + z_owned_matching_listener_t: z_matching_listener_move, \ + z_owned_queryable_t : z_queryable_move, \ + z_owned_liveliness_token_t : z_liveliness_token_move, \ + z_owned_reply_t : z_reply_move, \ + z_owned_hello_t : z_hello_move, \ + z_owned_string_t : z_string_move, \ + z_owned_string_array_t : z_string_array_move, \ + z_owned_closure_sample_t : z_closure_sample_move, \ + z_owned_closure_query_t : z_closure_query_move, \ + z_owned_closure_reply_t : z_closure_reply_move, \ + z_owned_closure_hello_t : z_closure_hello_move, \ + z_owned_closure_zid_t : z_closure_zid_move, \ + z_owned_closure_matching_status_t : z_closure_matching_status_move, \ + z_owned_sample_t : z_sample_move, \ + z_owned_query_t : z_query_move, \ + z_owned_slice_t : z_slice_move, \ + z_owned_bytes_t : z_bytes_move, \ + z_owned_encoding_t : z_encoding_move, \ + z_owned_task_t : z_task_move, \ + z_owned_mutex_t : z_mutex_move, \ + z_owned_condvar_t : z_condvar_move, \ + z_owned_ring_handler_query_t : z_ring_handler_query_move, \ + z_owned_ring_handler_reply_t : z_ring_handler_reply_move, \ + z_owned_ring_handler_sample_t : z_ring_handler_sample_move, \ + z_owned_fifo_handler_query_t : z_fifo_handler_query_move, \ + z_owned_fifo_handler_reply_t : z_fifo_handler_reply_move, \ + z_owned_fifo_handler_sample_t : z_fifo_handler_sample_move, \ + z_owned_reply_err_t : z_reply_err_move, \ + ze_owned_serializer_t : ze_serializer_move, \ + z_owned_bytes_writer_t : z_bytes_writer_move \ )(&x) /** @@ -269,40 +279,42 @@ * Returns: * Returns the instance associated with `x`. */ -#define z_take(this_, x) \ - _Generic((this_), \ - z_owned_bytes_t *: z_bytes_take, \ - z_owned_closure_hello_t *: z_closure_hello_take, \ - z_owned_closure_query_t *: z_closure_query_take, \ - z_owned_closure_reply_t *: z_closure_reply_take, \ - z_owned_closure_sample_t *: z_closure_sample_take, \ - z_owned_closure_zid_t * : z_closure_zid_take, \ - z_owned_condvar_t *: z_condvar_take, \ - z_owned_config_t *: z_config_take, \ - z_owned_encoding_t *: z_encoding_take, \ - z_owned_fifo_handler_query_t *: z_fifo_handler_query_take, \ - z_owned_fifo_handler_reply_t *: z_fifo_handler_reply_take, \ - z_owned_fifo_handler_sample_t *: z_fifo_handler_sample_take,\ - z_owned_hello_t *: z_hello_take, \ - z_owned_keyexpr_t *: z_keyexpr_take, \ - z_owned_mutex_t *: z_mutex_take, \ - z_owned_publisher_t *: z_publisher_take, \ - z_owned_query_t *: z_query_take, \ - z_owned_queryable_t *: z_queryable_take, \ - z_owned_liveliness_token_t *: z_liveliness_token_take, \ - z_owned_reply_t *: z_reply_take, \ - z_owned_reply_err_t *: z_reply_err_take, \ - z_owned_ring_handler_query_t *: z_ring_handler_query_take, \ - z_owned_ring_handler_reply_t *: z_ring_handler_reply_take, \ - z_owned_ring_handler_sample_t *: z_ring_handler_sample_take,\ - z_owned_sample_t *: z_sample_take, \ - z_owned_session_t *: z_session_take, \ - z_owned_slice_t *: z_slice_take, \ - z_owned_string_array_t *: z_string_array_take, \ - z_owned_string_t *: z_string_take, \ - z_owned_subscriber_t *: z_subscriber_take, \ - ze_owned_serializer_t *: ze_serializer_take, \ - z_owned_bytes_writer_t *: z_bytes_writer_take \ +#define z_take(this_, x) \ + _Generic((this_), \ + z_owned_bytes_t *: z_bytes_take, \ + z_owned_closure_hello_t *: z_closure_hello_take, \ + z_owned_closure_query_t *: z_closure_query_take, \ + z_owned_closure_reply_t *: z_closure_reply_take, \ + z_owned_closure_sample_t *: z_closure_sample_take, \ + z_owned_closure_zid_t * : z_closure_zid_take, \ + z_owned_closure_matching_status_t * : z_closure_matching_status_take, \ + z_owned_condvar_t *: z_condvar_take, \ + z_owned_config_t *: z_config_take, \ + z_owned_encoding_t *: z_encoding_take, \ + z_owned_fifo_handler_query_t *: z_fifo_handler_query_take, \ + z_owned_fifo_handler_reply_t *: z_fifo_handler_reply_take, \ + z_owned_fifo_handler_sample_t *: z_fifo_handler_sample_take, \ + z_owned_hello_t *: z_hello_take, \ + z_owned_keyexpr_t *: z_keyexpr_take, \ + z_owned_mutex_t *: z_mutex_take, \ + z_owned_publisher_t *: z_publisher_take, \ + z_owned_matching_listener_t *: z_matching_listener_take, \ + z_owned_query_t *: z_query_take, \ + z_owned_queryable_t *: z_queryable_take, \ + z_owned_liveliness_token_t *: z_liveliness_token_take, \ + z_owned_reply_t *: z_reply_take, \ + z_owned_reply_err_t *: z_reply_err_take, \ + z_owned_ring_handler_query_t *: z_ring_handler_query_take, \ + z_owned_ring_handler_reply_t *: z_ring_handler_reply_take, \ + z_owned_ring_handler_sample_t *: z_ring_handler_sample_take, \ + z_owned_sample_t *: z_sample_take, \ + z_owned_session_t *: z_session_take, \ + z_owned_slice_t *: z_slice_take, \ + z_owned_string_array_t *: z_string_array_take, \ + z_owned_string_t *: z_string_take, \ + z_owned_subscriber_t *: z_subscriber_take, \ + ze_owned_serializer_t *: ze_serializer_take, \ + z_owned_bytes_writer_t *: z_bytes_writer_take \ )(this_, x) /** @@ -337,30 +349,32 @@ * x: The instance to nullify. */ #define z_internal_null(x) _Generic((x), \ - z_owned_session_t * : z_internal_session_null, \ - z_owned_publisher_t * : z_internal_publisher_null, \ - z_owned_keyexpr_t * : z_internal_keyexpr_null, \ - z_owned_config_t * : z_internal_config_null, \ - z_owned_subscriber_t * : z_internal_subscriber_null, \ - z_owned_queryable_t * : z_internal_queryable_null, \ - z_owned_liveliness_token_t * : z_internal_liveliness_token_null, \ - z_owned_query_t * : z_internal_query_null, \ - z_owned_reply_t * : z_internal_reply_null, \ - z_owned_hello_t * : z_internal_hello_null, \ - z_owned_string_t * : z_internal_string_null, \ - z_owned_string_array_t * : z_internal_string_array_null, \ - z_owned_slice_t *: z_internal_slice_null, \ - z_owned_bytes_t *: z_internal_bytes_null, \ - z_owned_closure_sample_t * : z_internal_closure_sample_null, \ - z_owned_closure_query_t * : z_internal_closure_query_null, \ - z_owned_closure_reply_t * : z_internal_closure_reply_null, \ - z_owned_closure_hello_t * : z_internal_closure_hello_null, \ - z_owned_closure_zid_t * : z_internal_closure_zid_null, \ - z_owned_sample_t * : z_internal_sample_null, \ - z_owned_encoding_t * : z_internal_encoding_null, \ - z_owned_reply_err_t * : z_internal_reply_err_null, \ - ze_owned_serializer_t * : ze_internal_serializer_null, \ - z_owned_bytes_writer_t * : z_internal_bytes_writer_null \ + z_owned_session_t * : z_internal_session_null, \ + z_owned_publisher_t * : z_internal_publisher_null, \ + z_owned_matching_listener_t * : z_internal_matching_listener_null, \ + z_owned_keyexpr_t * : z_internal_keyexpr_null, \ + z_owned_config_t * : z_internal_config_null, \ + z_owned_subscriber_t * : z_internal_subscriber_null, \ + z_owned_queryable_t * : z_internal_queryable_null, \ + z_owned_liveliness_token_t * : z_internal_liveliness_token_null, \ + z_owned_query_t * : z_internal_query_null, \ + z_owned_reply_t * : z_internal_reply_null, \ + z_owned_hello_t * : z_internal_hello_null, \ + z_owned_string_t * : z_internal_string_null, \ + z_owned_string_array_t * : z_internal_string_array_null, \ + z_owned_slice_t *: z_internal_slice_null, \ + z_owned_bytes_t *: z_internal_bytes_null, \ + z_owned_closure_sample_t * : z_internal_closure_sample_null, \ + z_owned_closure_query_t * : z_internal_closure_query_null, \ + z_owned_closure_reply_t * : z_internal_closure_reply_null, \ + z_owned_closure_hello_t * : z_internal_closure_hello_null, \ + z_owned_closure_zid_t * : z_internal_closure_zid_null, \ + z_owned_closure_matching_status_t * : z_internal_closure_matching_status_null, \ + z_owned_sample_t * : z_internal_sample_null, \ + z_owned_encoding_t * : z_internal_encoding_null, \ + z_owned_reply_err_t * : z_internal_reply_err_null, \ + ze_owned_serializer_t * : ze_internal_serializer_null, \ + z_owned_bytes_writer_t * : z_internal_bytes_writer_null \ )(x) // clang-format on @@ -396,6 +410,7 @@ inline const z_loaned_config_t* z_loan(const z_owned_config_t& x) { return z_con inline const z_loaned_session_t* z_loan(const z_owned_session_t& x) { return z_session_loan(&x); } inline const z_loaned_subscriber_t* z_loan(const z_owned_subscriber_t& x) { return z_subscriber_loan(&x); } inline const z_loaned_publisher_t* z_loan(const z_owned_publisher_t& x) { return z_publisher_loan(&x); } +inline const z_loaned_matching_listener_t* z_loan(const z_owned_matching_listener_t& x) { return z_matching_listener_loan(&x); } inline const z_loaned_queryable_t* z_loan(const z_owned_queryable_t& x) { return z_queryable_loan(&x); } inline const z_loaned_liveliness_token_t* z_loan(const z_owned_liveliness_token_t& x) { return z_liveliness_token_loan(&x); } inline const z_loaned_reply_t* z_loan(const z_owned_reply_t& x) { return z_reply_loan(&x); } @@ -418,6 +433,7 @@ inline const z_loaned_closure_reply_t* z_loan(const z_owned_closure_reply_t& x) inline const z_loaned_closure_query_t* z_loan(const z_owned_closure_query_t& x) { return z_closure_query_loan(&x); } inline const z_loaned_closure_hello_t* z_loan(const z_owned_closure_hello_t& x) { return z_closure_hello_loan(&x); } inline const z_loaned_closure_zid_t* z_loan(const z_owned_closure_zid_t& x) { return z_closure_zid_loan(&x); } +inline const z_loaned_closure_matching_status_t* z_loan(const z_owned_closure_matching_status_t& x) { return z_closure_matching_status_loan(&x); } inline const z_loaned_fifo_handler_query_t* z_loan(const z_owned_fifo_handler_query_t& x) { return z_fifo_handler_query_loan(&x); } inline const z_loaned_fifo_handler_reply_t* z_loan(const z_owned_fifo_handler_reply_t& x) { return z_fifo_handler_reply_loan(&x); } inline const z_loaned_fifo_handler_sample_t* z_loan(const z_owned_fifo_handler_sample_t& x) { return z_fifo_handler_sample_loan(&x); } @@ -433,6 +449,7 @@ inline z_loaned_keyexpr_t* z_loan_mut(z_view_keyexpr_t& x) { return z_view_keyex inline z_loaned_config_t* z_loan_mut(z_owned_config_t& x) { return z_config_loan_mut(&x); } inline z_loaned_session_t* z_loan_mut(z_owned_session_t& x) { return z_session_loan_mut(&x); } inline z_loaned_publisher_t* z_loan_mut(z_owned_publisher_t& x) { return z_publisher_loan_mut(&x); } +inline z_loaned_matching_listener_t* z_loan_mut(z_owned_matching_listener_t& x) { return z_matching_listener_loan_mut(&x); } inline z_loaned_queryable_t* z_loan_mut(z_owned_queryable_t& x) { return z_queryable_loan_mut(&x); } inline z_loaned_liveliness_token_t* z_loan_mut(z_owned_liveliness_token_t& x) { return z_liveliness_token_loan_mut(&x); } inline z_loaned_subscriber_t* z_loan_mut(z_owned_subscriber_t& x) { return z_subscriber_loan_mut(&x); } @@ -457,6 +474,7 @@ inline ze_loaned_serializer_t* z_loan_mut(ze_owned_serializer_t& x) { return ze_ // z_drop definition inline void z_drop(z_moved_session_t* v) { z_session_drop(v); } inline void z_drop(z_moved_publisher_t* v) { z_publisher_drop(v); } +inline void z_drop(z_moved_matching_listener_t* v) { z_matching_listener_drop(v); } inline void z_drop(z_moved_keyexpr_t* v) { z_keyexpr_drop(v); } inline void z_drop(z_moved_config_t* v) { z_config_drop(v); } inline void z_drop(z_moved_subscriber_t* v) { z_subscriber_drop(v); } @@ -479,6 +497,7 @@ inline void z_drop(z_moved_closure_query_t* v) { z_closure_query_drop(v); } inline void z_drop(z_moved_closure_reply_t* v) { z_closure_reply_drop(v); } inline void z_drop(z_moved_closure_hello_t* v) { z_closure_hello_drop(v); } inline void z_drop(z_moved_closure_zid_t* v) { z_closure_zid_drop(v); } +inline void z_drop(z_moved_closure_matching_status_t* v) { z_closure_matching_status_drop(v); } inline void z_drop(z_moved_ring_handler_sample_t* v) { z_ring_handler_sample_drop(v); } inline void z_drop(z_moved_fifo_handler_sample_t* v) { z_fifo_handler_sample_drop(v); } inline void z_drop(z_moved_ring_handler_query_t* v) { z_ring_handler_query_drop(v); } @@ -491,6 +510,7 @@ inline void z_drop(ze_moved_serializer_t* v) { ze_serializer_drop(v); } // z_internal_null definition inline void z_internal_null(z_owned_session_t* v) { z_internal_session_null(v); } inline void z_internal_null(z_owned_publisher_t* v) { z_internal_publisher_null(v); } +inline void z_internal_null(z_owned_matching_listener_t* v) { z_internal_matching_listener_null(v); } inline void z_internal_null(z_owned_keyexpr_t* v) { z_internal_keyexpr_null(v); } inline void z_internal_null(z_owned_config_t* v) { z_internal_config_null(v); } inline void z_internal_null(z_owned_subscriber_t* v) { z_internal_subscriber_null(v); } @@ -509,6 +529,7 @@ inline void z_internal_null(z_owned_closure_query_t* v) { z_internal_closure_que inline void z_internal_null(z_owned_closure_reply_t* v) { z_internal_closure_reply_null(v); } inline void z_internal_null(z_owned_closure_hello_t* v) { z_internal_closure_hello_null(v); } inline void z_internal_null(z_owned_closure_zid_t* v) { z_internal_closure_zid_null(v); } +inline void z_internal_null(z_owned_closure_matching_status_t* v) { z_internal_closure_matching_status_null(v); } inline void z_internal_null(z_owned_ring_handler_query_t* v) { return z_internal_ring_handler_query_null(v); } inline void z_internal_null(z_owned_ring_handler_reply_t* v) { return z_internal_ring_handler_reply_null(v); } inline void z_internal_null(z_owned_ring_handler_sample_t* v) { return z_internal_ring_handler_sample_null(v); } @@ -521,6 +542,7 @@ inline void z_internal_null(ze_owned_serializer_t* v) { return ze_internal_seria // z_internal_check definition inline bool z_internal_check(const z_owned_session_t& v) { return z_internal_session_check(&v); } inline bool z_internal_check(const z_owned_publisher_t& v) { return z_internal_publisher_check(&v); } +inline bool z_internal_check(const z_owned_matching_listener_t& v) { return z_internal_matching_listener_check(&v); } inline bool z_internal_check(const z_owned_keyexpr_t& v) { return z_internal_keyexpr_check(&v); } inline bool z_internal_check(const z_owned_config_t& v) { return z_internal_config_check(&v); } inline bool z_internal_check(const z_owned_subscriber_t& v) { return z_internal_subscriber_check(&v); } @@ -554,6 +576,8 @@ inline void z_call(const z_loaned_closure_hello_t &closure, z_loaned_hello_t *he { z_closure_hello_call(&closure, hello); } inline void z_call(const z_loaned_closure_zid_t &closure, const z_id_t *zid) { z_closure_zid_call(&closure, zid); } +inline void z_call(const z_loaned_closure_matching_status_t &closure, const z_matching_status_t *status) + { z_closure_matching_status_call(&closure, status); } inline void z_closure( z_owned_closure_hello_t* closure, @@ -600,6 +624,15 @@ inline void z_closure( closure->_val.drop = drop; closure->_val.call = call; } +inline void z_closure( + z_owned_closure_matching_status_t* closure, + void (*call)(const z_matching_status_t*, void*), + void (*drop)(void*), + void *context) { + closure->_val.context = context; + closure->_val.drop = drop; + closure->_val.call = call; +} inline z_result_t z_try_recv(const z_loaned_fifo_handler_query_t* this_, z_owned_query_t* query) { return z_fifo_handler_query_try_recv(this_, query); @@ -648,12 +681,16 @@ inline z_moved_closure_query_t* z_move(z_owned_closure_query_t& closure) { retur inline z_moved_closure_reply_t* z_move(z_owned_closure_reply_t& closure) { return z_closure_reply_move(&closure); } inline z_moved_closure_sample_t* z_move(z_owned_closure_sample_t& closure) { return z_closure_sample_move(&closure); } inline z_moved_closure_zid_t* z_move(z_owned_closure_zid_t& closure) { return z_closure_zid_move(&closure); } +inline z_moved_closure_matching_status_t* z_move(z_owned_closure_matching_status_t& closure) { + return z_closure_matching_status_move(&closure); +} inline z_moved_config_t* z_move(z_owned_config_t& x) { return z_config_move(&x); } inline z_moved_encoding_t* z_move(z_owned_encoding_t& x) { return z_encoding_move(&x); } inline z_moved_reply_err_t* z_move(z_owned_reply_err_t& x) { return z_reply_err_move(&x); } inline z_moved_hello_t* z_move(z_owned_hello_t& x) { return z_hello_move(&x); } inline z_moved_keyexpr_t* z_move(z_owned_keyexpr_t& x) { return z_keyexpr_move(&x); } inline z_moved_publisher_t* z_move(z_owned_publisher_t& x) { return z_publisher_move(&x); } +inline z_moved_matching_listener_t* z_move(z_owned_matching_listener_t& x) { return z_matching_listener_move(&x); } inline z_moved_query_t* z_move(z_owned_query_t& x) { return z_query_move(&x); } inline z_moved_queryable_t* z_move(z_owned_queryable_t& x) { return z_queryable_move(&x); } inline z_moved_liveliness_token_t* z_move(z_owned_liveliness_token_t& x) { return z_liveliness_token_move(&x); } @@ -680,6 +717,9 @@ inline ze_moved_serializer_t* z_move(ze_owned_serializer_t& x) { return ze_seria // z_take definition inline void z_take(z_owned_session_t* this_, z_moved_session_t* v) { return z_session_take(this_, v); } inline void z_take(z_owned_publisher_t* this_, z_moved_publisher_t* v) { return z_publisher_take(this_, v); } +inline void z_take(z_owned_matching_listener_t* this_, z_moved_matching_listener_t* v) { + return z_matching_listener_take(this_, v); +} inline void z_take(z_owned_keyexpr_t* this_, z_moved_keyexpr_t* v) { z_keyexpr_take(this_, v); } inline void z_take(z_owned_config_t* this_, z_moved_config_t* v) { z_config_take(this_, v); } inline void z_take(z_owned_subscriber_t* this_, z_moved_subscriber_t* v) { return z_subscriber_take(this_, v); } @@ -704,6 +744,9 @@ inline void z_take(z_owned_closure_query_t* this_, z_moved_closure_query_t* v) { inline void z_take(z_owned_closure_reply_t* this_, z_moved_closure_reply_t* v) { z_closure_reply_take(this_, v); } inline void z_take(z_owned_closure_hello_t* this_, z_moved_closure_hello_t* v) { z_closure_hello_take(this_, v); } inline void z_take(z_owned_closure_zid_t* this_, z_moved_closure_zid_t* v) { z_closure_zid_take(this_, v); } +inline void z_take(z_owned_closure_matching_status_t* this_, z_moved_closure_matching_status_t* v) { + z_closure_matching_status_take(this_, v); +} inline void z_take(z_owned_ring_handler_sample_t* this_, z_moved_ring_handler_sample_t* v) { z_ring_handler_sample_take(this_, v); } @@ -804,6 +847,14 @@ struct z_owned_to_loaned_type_t { typedef z_loaned_publisher_t type; }; template <> +struct z_loaned_to_owned_type_t { + typedef z_owned_matching_listener_t type; +}; +template <> +struct z_owned_to_loaned_type_t { + typedef z_loaned_matching_listener_t type; +}; +template <> struct z_loaned_to_owned_type_t { typedef z_owned_query_t type; }; @@ -924,6 +975,14 @@ struct z_loaned_to_owned_type_t { typedef z_owned_closure_zid_t type; }; template <> +struct z_owned_to_loaned_type_t { + typedef z_loaned_closure_matching_status_t type; +}; +template <> +struct z_loaned_to_owned_type_t { + typedef z_owned_closure_matching_status_t type; +}; +template <> struct z_loaned_to_owned_type_t { typedef z_owned_fifo_handler_query_t type; }; diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 9cce79d99..6c84b65c0 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -1111,6 +1111,33 @@ z_result_t z_closure_zid(z_owned_closure_zid_t *closure, z_closure_zid_callback_ */ void z_closure_zid_call(const z_loaned_closure_zid_t *closure, const z_id_t *id); +/** + * Builds a new matching status closure. + * It consists on a structure that contains all the elements for stateful, memory-leak-free callbacks. + * + * Parameters: + * closure: Pointer to an uninitialized :c:type:`z_owned_closure_matching_status_t`. + * call: Pointer to the callback function. ``context`` will be passed as its last argument. + * drop: Pointer to the function that will free the callback state. ``context`` will be passed as its last argument. + * context: Pointer to an arbitrary state. + * + * Return: + * ``0`` in case of success, negative error code otherwise + */ +z_result_t z_closure_matching_status(z_owned_closure_matching_status_t *closure, + z_closure_matching_status_callback_t call, z_closure_drop_callback_t drop, + void *context); + +/** + * Calls a matching status closure. + * + * Parameters: + * closure: Pointer to the :c:type:`z_loaned_closure_matching_status_t` to call. + * status: Pointer to the :c:type:`z_matching_status_t` to pass to the closure. + */ +void z_closure_matching_status_call(const z_loaned_closure_matching_status_t *closure, + const z_matching_status_t *status); + /**************** Loans ****************/ _Z_OWNED_FUNCTIONS_DEF(string) _Z_OWNED_FUNCTIONS_DEF(keyexpr) @@ -1118,6 +1145,7 @@ _Z_OWNED_FUNCTIONS_DEF(config) _Z_OWNED_FUNCTIONS_NO_COPY_DEF(session) _Z_OWNED_FUNCTIONS_NO_COPY_DEF(subscriber) _Z_OWNED_FUNCTIONS_NO_COPY_DEF(publisher) +_Z_OWNED_FUNCTIONS_NO_COPY_DEF(matching_listener) _Z_OWNED_FUNCTIONS_NO_COPY_DEF(queryable) _Z_OWNED_FUNCTIONS_DEF(hello) _Z_OWNED_FUNCTIONS_DEF(reply) @@ -1135,6 +1163,7 @@ _Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_query) _Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_reply) _Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_hello) _Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_zid) +_Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_matching_status) _Z_VIEW_FUNCTIONS_DEF(keyexpr) _Z_VIEW_FUNCTIONS_DEF(string) @@ -1661,7 +1690,54 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher * The keyexpr wrapped as a :c:type:`z_loaned_keyexpr_t`. */ const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher); -#endif + +#if Z_FEATURE_MATCHING == 1 +/** + * Declares a matching listener, registering a callback for notifying subscribers matching with a given publisher. + * The callback will be run in the background until the corresponding publisher is dropped. + * + * Parameters: + * publisher: A publisher to associate with matching listener. + * callback: A closure that will be called every time the matching status of the publisher changes (If last subscriber + * disconnects or when the first subscriber connects). + * + * Return: + * ``0`` if execution was successful, ``negative value`` otherwise. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ +z_result_t z_publisher_declare_background_matching_listener(const z_loaned_publisher_t *publisher, + z_moved_closure_matching_status_t *callback); +/** + * Constructs matching listener, registering a callback for notifying subscribers matching with a given publisher. + * + * Parameters: + * publisher: A publisher to associate with matching listener. + * matching_listener: An uninitialized memory location where matching listener will be constructed. The matching + * listener's callback will be automatically dropped when the publisher is dropped. callback: A closure that will be + * called every time the matching status of the publisher changes (If last subscriber disconnects or when the first + * subscriber connects). + * + * Return: + * ``0`` if execution was successful, ``negative value`` otherwise. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ +z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *publisher, + z_owned_matching_listener_t *matching_listener, + z_moved_closure_matching_status_t *callback); +/** + * Gets publisher matching status - i.e. if there are any subscribers matching its key expression. + * + * Return: + * ``0`` if execution was successful, ``negative value`` otherwise. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ +z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher, z_matching_status_t *matching_status); +#endif // Z_FEATURE_MATCHING == 1 + +#endif // Z_FEATURE_PUBLICATION == 1 #if Z_FEATURE_QUERY == 1 /** diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index c5a76dbd8..25725c734 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -22,6 +22,7 @@ #include "zenoh-pico/collections/slice.h" #include "zenoh-pico/collections/string.h" #include "zenoh-pico/net/encoding.h" +#include "zenoh-pico/net/matching.h" #include "zenoh-pico/net/publish.h" #include "zenoh-pico/net/query.h" #include "zenoh-pico/net/reply.h" @@ -29,6 +30,7 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/net/subscribe.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/session/session.h" #ifdef __cplusplus extern "C" { @@ -110,6 +112,11 @@ _Z_OWNED_TYPE_VALUE(_z_subscriber_t, subscriber) */ _Z_OWNED_TYPE_VALUE(_z_publisher_t, publisher) +/** + * Represents a Zenoh Matching listener entity. + */ +_Z_OWNED_TYPE_VALUE(_z_matching_listener_t, matching_listener) + /** * Represents a Zenoh Queryable entity. */ @@ -130,6 +137,14 @@ _Z_OWNED_TYPE_VALUE(_z_encoding_t, encoding) */ _Z_OWNED_TYPE_VALUE(_z_value_t, reply_err) +#if defined(Z_FEATURE_MATCHING) +/** + * A struct that indicates if there exist Subscribers matching the Publisher's key expression or Queryables matching + * Querier's key expression and target. + */ +typedef _z_matching_status_t z_matching_status_t; +#endif + /** * Represents the configuration used to configure a subscriber upon declaration :c:func:`z_declare_subscriber`. */ @@ -410,7 +425,7 @@ _Z_OWNED_TYPE_VALUE(_z_reply_t, reply) */ _Z_OWNED_TYPE_VALUE(_z_string_svec_t, string_array) -typedef void (*z_closure_drop_callback_t)(void *arg); +typedef _z_drop_handler_t z_closure_drop_callback_t; typedef _z_closure_sample_callback_t z_closure_sample_callback_t; typedef struct { @@ -476,6 +491,15 @@ typedef struct { */ _Z_OWNED_TYPE_VALUE(_z_closure_zid_t, closure_zid) +#if defined(Z_FEATURE_MATCHING) +typedef _z_closure_matching_status_callback_t z_closure_matching_status_callback_t; +typedef _z_closure_matching_status_t z_closure_matching_status_t; +/** + * Represents the matching status callback closure. + */ +_Z_OWNED_TYPE_VALUE(_z_closure_matching_status_t, closure_matching_status) +#endif + #ifdef __cplusplus } #endif diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index 3542c7d1d..abce70efd 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -47,6 +47,7 @@ #define Z_FEATURE_LOCAL_SUBSCRIBER 0 #define Z_FEATURE_PUBLISHER_SESSION_CHECK 1 #define Z_FEATURE_BATCHING 1 +#define Z_FEATURE_MATCHING 1 #define Z_FEATURE_RX_CACHE 0 #define Z_FEATURE_AUTO_RECONNECT 1 // End of CMake generation diff --git a/include/zenoh-pico/config.h.in b/include/zenoh-pico/config.h.in index a3748c432..897526cc0 100644 --- a/include/zenoh-pico/config.h.in +++ b/include/zenoh-pico/config.h.in @@ -47,6 +47,7 @@ #define Z_FEATURE_LOCAL_SUBSCRIBER @Z_FEATURE_LOCAL_SUBSCRIBER@ #define Z_FEATURE_PUBLISHER_SESSION_CHECK @Z_FEATURE_PUBLISHER_SESSION_CHECK@ #define Z_FEATURE_BATCHING @Z_FEATURE_BATCHING@ +#define Z_FEATURE_MATCHING @Z_FEATURE_MATCHING@ #define Z_FEATURE_RX_CACHE @Z_FEATURE_RX_CACHE@ #define Z_FEATURE_AUTO_RECONNECT @Z_FEATURE_AUTO_RECONNECT@ // End of CMake generation diff --git a/include/zenoh-pico/net/matching.h b/include/zenoh-pico/net/matching.h new file mode 100644 index 000000000..6da923482 --- /dev/null +++ b/include/zenoh-pico/net/matching.h @@ -0,0 +1,48 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef INCLUDE_ZENOH_PICO_NET_MATCHING_H +#define INCLUDE_ZENOH_PICO_NET_MATCHING_H + +#include "zenoh-pico/net/session.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _z_matching_listener_t { + uint32_t _id; + uint32_t _interest_id; + _z_session_weak_t _zn; +} _z_matching_listener_t; + +#if Z_FEATURE_MATCHING == 1 +_z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id, + _z_closure_matching_status_t callback); +z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t entity_id); +z_result_t _z_matching_listener_undeclare(_z_matching_listener_t *listener); +// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes. +static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_z_matching_listener_t){0}; } +static inline bool _z_matching_listener_check(const _z_matching_listener_t *matching_listener) { + return !_Z_RC_IS_NULL(&matching_listener->_zn); +} +void _z_matching_listener_clear(_z_matching_listener_t *pub); +void _z_matching_listener_free(_z_matching_listener_t **pub); +#endif + +#ifdef __cplusplus +} +#endif + +#endif /* INCLUDE_ZENOH_PICO_NET_MATCHING_H */ diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 5a608f933..62ebb543c 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -23,6 +23,7 @@ #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/session/liveliness.h" +#include "zenoh-pico/session/matching.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/session.h" #include "zenoh-pico/session/subscription.h" @@ -93,6 +94,10 @@ typedef struct _z_session_t { _z_pending_query_list_t *_pending_queries; #endif +#if Z_FEATURE_MATCHING == 1 + _z_matching_listener_intmap_t _matching_listeners; +#endif + // Session interests #if Z_FEATURE_INTEREST == 1 _z_session_interest_rc_list_t *_local_interests; diff --git a/include/zenoh-pico/session/matching.h b/include/zenoh-pico/session/matching.h new file mode 100644 index 000000000..b9afa1372 --- /dev/null +++ b/include/zenoh-pico/session/matching.h @@ -0,0 +1,66 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef INCLUDE_ZENOH_PICO_SESSION_MATCHING_H +#define INCLUDE_ZENOH_PICO_SESSION_MATCHING_H + +#include "zenoh-pico/collections/element.h" +#include "zenoh-pico/collections/intmap.h" +#include "zenoh-pico/session/session.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + bool matching; // true if there exist matching Zenoh entities, false otherwise. +} _z_matching_status_t; + +typedef void (*_z_closure_matching_status_callback_t)(const _z_matching_status_t *status, void *arg); + +typedef struct { + void *context; + _z_closure_matching_status_callback_t call; + _z_drop_handler_t drop; +} _z_closure_matching_status_t; + +#if Z_FEATURE_MATCHING == 1 +typedef struct _z_matching_listener_ctx_t { + uint32_t decl_id; + _z_closure_matching_status_t callback; +} _z_matching_listener_ctx_t; + +typedef struct { + uint32_t interest_id; + _z_zint_t entity_id; + _z_matching_listener_ctx_t *ctx; +} _z_matching_listener_state_t; + +_z_matching_listener_ctx_t *_z_matching_listener_ctx_new(_z_closure_matching_status_t callback); +void _z_matching_listener_ctx_clear(_z_matching_listener_ctx_t *ctx); + +_z_matching_listener_state_t *_z_matching_listener_state_new(uint32_t interest_id, _z_zint_t entity_id, + _z_matching_listener_ctx_t *ctx); +void _z_matching_listener_state_clear(_z_matching_listener_state_t *state); + +_Z_ELEM_DEFINE(_z_matching_listener, _z_matching_listener_state_t, _z_noop_size, _z_matching_listener_state_clear, + _z_noop_copy, _z_noop_move) +_Z_INT_MAP_DEFINE(_z_matching_listener, _z_matching_listener_state_t) +#endif + +#ifdef __cplusplus +} +#endif + +#endif /* INCLUDE_ZENOH_PICO_SESSION_MATCHING_H */ diff --git a/src/api/api.c b/src/api/api.c index f6a209ac9..bb83f39d5 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -28,6 +28,7 @@ #include "zenoh-pico/net/config.h" #include "zenoh-pico/net/filtering.h" #include "zenoh-pico/net/logger.h" +#include "zenoh-pico/net/matching.h" #include "zenoh-pico/net/primitives.h" #include "zenoh-pico/net/sample.h" #include "zenoh-pico/net/session.h" @@ -477,6 +478,13 @@ void z_closure_zid_call(const z_loaned_closure_zid_t *closure, const z_id_t *id) } } +void z_closure_matching_status_call(const z_loaned_closure_matching_status_t *closure, + const z_matching_status_t *status) { + if (closure->call != NULL) { + (closure->call)(status, closure->context); + } +} + bool _z_config_check(const _z_config_t *config) { return !_z_str_intmap_is_empty(config); } _z_config_t _z_config_null(void) { return _z_str_intmap_make(); } z_result_t _z_config_copy(_z_config_t *dst, const _z_config_t *src) { @@ -570,6 +578,8 @@ _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_query, _z_closure_query_callback_t, z_cl _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_reply, _z_closure_reply_callback_t, z_closure_drop_callback_t) _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_hello, z_closure_hello_callback_t, z_closure_drop_callback_t) _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_zid, z_closure_zid_callback_t, z_closure_drop_callback_t) +_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_matching_status, _z_closure_matching_status_callback_t, + z_closure_drop_callback_t) /************* Primitives **************/ typedef struct __z_hello_handler_wrapper_t { @@ -1096,7 +1106,41 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher) { return (const z_loaned_keyexpr_t *)&publisher->_key; } -#endif + +#if Z_FEATURE_MATCHING == 1 +z_result_t z_publisher_declare_background_matching_listener(const z_loaned_publisher_t *publisher, + z_moved_closure_matching_status_t *callback) { + z_owned_matching_listener_t listener; + _Z_RETURN_IF_ERR(z_publisher_declare_matching_listener(publisher, &listener, callback)); + _z_matching_listener_clear(&listener._val); + return _Z_RES_OK; +} + +z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *publisher, + z_owned_matching_listener_t *matching_listener, + z_moved_closure_matching_status_t *callback) { + _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&publisher->_zn); + _z_matching_listener_t listener = + _z_matching_listener_declare(&sess_rc, &publisher->_key, publisher->_id, callback->_this._val); + _z_session_rc_drop(&sess_rc); + + z_internal_closure_matching_status_null(&callback->_this); + + matching_listener->_val = listener; + + return _z_matching_listener_check(&listener) ? _Z_RES_OK : _Z_ERR_GENERIC; +} + +z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher, + z_matching_status_t *matching_status) { + // Ideally this should be implemented as a real request to the router, but this works much faster. + // And it works as long as filtering is enabled along with interest + matching_status->matching = publisher->_filter.ctx->state != WRITE_FILTER_ACTIVE; + return _Z_RES_OK; +} +#endif // Z_FEATURE_MATCHING == 1 + +#endif // Z_FEATURE_PUBLICATION == 1 #if Z_FEATURE_QUERY == 1 bool _z_reply_check(const _z_reply_t *reply) { @@ -1519,6 +1563,17 @@ z_result_t zp_batch_stop(const z_loaned_session_t *zs) { } #endif +#if Z_FEATURE_MATCHING == 1 +void _z_matching_listener_drop(_z_matching_listener_t *listener) { + _z_matching_listener_undeclare(listener); + _z_matching_listener_clear(listener); +} + +_Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_matching_listener_t, matching_listener, _z_matching_listener_check, + _z_matching_listener_null, _z_matching_listener_drop) + +#endif + /**************** Tasks ****************/ void zp_task_read_options_default(zp_task_read_options_t *options) { #if Z_FEATURE_MULTI_THREAD == 1 diff --git a/src/net/matching.c b/src/net/matching.c new file mode 100644 index 000000000..e2c7c8ada --- /dev/null +++ b/src/net/matching.c @@ -0,0 +1,119 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/net/matching.h" + +#include + +#include "zenoh-pico/api/primitives.h" +#include "zenoh-pico/api/types.h" +#include "zenoh-pico/net/primitives.h" +#include "zenoh-pico/net/session.h" +#include "zenoh-pico/session/matching.h" +#include "zenoh-pico/session/resource.h" +#include "zenoh-pico/utils/logging.h" +#include "zenoh-pico/utils/result.h" + +#if Z_FEATURE_MATCHING == 1 +static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *arg) { + _z_matching_listener_ctx_t *ctx = (_z_matching_listener_ctx_t *)arg; + + switch (msg->type) { + case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER: { + ctx->decl_id = msg->id; + z_matching_status_t status = {.matching = true}; + z_closure_matching_status_call(&ctx->callback, &status); + break; + } + + case _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER: { + if (ctx->decl_id == msg->id) { + ctx->decl_id = 0; + z_matching_status_t status = {.matching = false}; + z_closure_matching_status_call(&ctx->callback, &status); + } + break; + } + + default: + break; + } +} + +_z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id, + _z_closure_matching_status_t callback) { + uint8_t flags = _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_FUTURE | + _Z_INTEREST_FLAG_AGGREGATE; + _z_matching_listener_t ret = _z_matching_listener_null(); + + _z_matching_listener_ctx_t *ctx = _z_matching_listener_ctx_new(callback); + if (ctx == NULL) { + return ret; + } + + ret._interest_id = _z_add_interest(_Z_RC_IN_VAL(zn), _z_keyexpr_alias_from_user_defined(*key, true), + _z_matching_listener_callback, flags, (void *)ctx); + if (ret._interest_id == 0) { + _z_matching_listener_ctx_clear(ctx); + return ret; + } + + ret._id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); + ret._zn = _z_session_rc_clone_as_weak(zn); + + _z_matching_listener_intmap_insert(&_Z_RC_IN_VAL(zn)->_matching_listeners, ret._id, + _z_matching_listener_state_new(ret._interest_id, entity_id, ctx)); + + return ret; +} + +z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t entity_id) { + _z_matching_listener_intmap_iterator_t iter = _z_matching_listener_intmap_iterator_make(&zn->_matching_listeners); + bool has_next = _z_matching_listener_intmap_iterator_next(&iter); + while (has_next) { + size_t key = _z_matching_listener_intmap_iterator_key(&iter); + _z_matching_listener_state_t *listener = _z_matching_listener_intmap_iterator_value(&iter); + has_next = _z_matching_listener_intmap_iterator_next(&iter); + if (listener->entity_id == entity_id) { + _Z_DEBUG("_z_matching_listener_entity_undeclare: entity=%i, listener=%i", (int)entity_id, (int)key); + _z_remove_interest(zn, listener->interest_id); + _z_matching_listener_intmap_remove(&zn->_matching_listeners, key); + } + } + return _Z_RES_OK; +} + +z_result_t _z_matching_listener_undeclare(_z_matching_listener_t *listener) { + _z_session_t *zn = _Z_RC_IN_VAL(&listener->_zn); + _z_matching_listener_intmap_remove(&zn->_matching_listeners, listener->_id); + return _z_remove_interest(zn, listener->_interest_id); +} + +void _z_matching_listener_clear(_z_matching_listener_t *listener) { + _z_session_weak_drop(&listener->_zn); + *listener = _z_matching_listener_null(); +} + +void _z_matching_listener_free(_z_matching_listener_t **listener) { + _z_matching_listener_t *ptr = *listener; + + if (ptr != NULL) { + _z_matching_listener_clear(ptr); + + z_free(ptr); + *listener = NULL; + } +} + +#endif diff --git a/src/net/primitives.c b/src/net/primitives.c index dcad5d6b9..c81b37d1b 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -22,6 +22,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/net/filtering.h" #include "zenoh-pico/net/logger.h" +#include "zenoh-pico/net/matching.h" #include "zenoh-pico/net/sample.h" #include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" @@ -180,6 +181,9 @@ z_result_t _z_undeclare_publisher(_z_publisher_t *pub) { if (pub == NULL || _Z_RC_IS_NULL(&pub->_zn)) { return _Z_ERR_ENTITY_UNKNOWN; } +#if Z_FEATURE_MATCHING == 1 + _z_matching_listener_entity_undeclare(_Z_RC_IN_VAL(&pub->_zn), pub->_id); +#endif // Clear publisher _z_write_filter_destroy(pub); _z_undeclare_resource(_Z_RC_IN_VAL(&pub->_zn), pub->_key._id); diff --git a/src/session/matching.c b/src/session/matching.c new file mode 100644 index 000000000..ddeb425ca --- /dev/null +++ b/src/session/matching.c @@ -0,0 +1,50 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/session/matching.h" + +#if Z_FEATURE_MATCHING == 1 +_z_matching_listener_ctx_t *_z_matching_listener_ctx_new(_z_closure_matching_status_t callback) { + _z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t)); + + ctx->decl_id = 0; + ctx->callback = callback; + + return ctx; +} + +void _z_matching_listener_ctx_clear(_z_matching_listener_ctx_t *ctx) { + if (ctx->callback.drop != NULL) { + ctx->callback.drop(ctx->callback.context); + } +} + +_z_matching_listener_state_t *_z_matching_listener_state_new(uint32_t interest_id, _z_zint_t entity_id, + _z_matching_listener_ctx_t *ctx) { + _z_matching_listener_state_t *state = z_malloc(sizeof(_z_matching_listener_state_t)); + + state->interest_id = interest_id; + state->entity_id = entity_id; + state->ctx = ctx; + + return state; +} + +void _z_matching_listener_state_clear(_z_matching_listener_state_t *state) { + if (state->ctx != NULL) { + _z_matching_listener_ctx_clear(state->ctx); + z_free(state->ctx); + } +} +#endif // Z_FEATURE_MATCHING == 1 diff --git a/src/session/utils.c b/src/session/utils.c index 2a9c3f7d5..5767cd410 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -21,6 +21,7 @@ #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/session/interest.h" #include "zenoh-pico/session/liveliness.h" +#include "zenoh-pico/session/matching.h" #include "zenoh-pico/session/query.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" @@ -95,6 +96,10 @@ z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid) { _z_liveliness_init(zn); #endif +#if Z_FEATURE_MATCHING == 1 + zn->_matching_listeners = _z_matching_listener_intmap_make(); +#endif + _z_interest_init(zn); zn->_local_zid = *zid; @@ -141,6 +146,11 @@ void _z_session_clear(_z_session_t *zn) { #if Z_FEATURE_LIVELINESS == 1 _z_liveliness_clear(zn); #endif + +#if Z_FEATURE_MATCHING == 1 + _z_matching_listener_intmap_clear(&zn->_matching_listeners); +#endif + _z_flush_interest(zn); #if Z_FEATURE_MULTI_THREAD == 1 diff --git a/tests/memory_leak.py b/tests/memory_leak.py index bc37bbf2f..89e727f0a 100644 --- a/tests/memory_leak.py +++ b/tests/memory_leak.py @@ -167,6 +167,9 @@ def query_and_queryable(query_cmd, queryable_cmd): print("*** Pub & sub attachment test ***") if pub_and_sub('z_pub_attachment -n 1', 'z_sub_attachment -n 1') == 1: EXIT_STATUS = 1 + print("*** Pub & sub listener test ***") + if pub_and_sub('z_pub -n 1 -a', 'z_sub -n 1') == 1: + EXIT_STATUS = 1 # Test query and queryable examples print("*** Query & queryable test ***") if query_and_queryable('z_get', 'z_queryable -n 1') == 1: diff --git a/tests/z_api_matching_test.c b/tests/z_api_matching_test.c new file mode 100644 index 000000000..f77a726c8 --- /dev/null +++ b/tests/z_api_matching_test.c @@ -0,0 +1,259 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include +#include + +#include "zenoh-pico.h" + +#if Z_FEATURE_MATCHING == 1 && Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1 + +#undef NDEBUG +#include + +typedef enum { NONE, MATCH, UNMATCH, DROP } context_state_t; + +typedef struct context_t { + z_owned_condvar_t cv; + z_owned_mutex_t m; + context_state_t state; +} context_t; + +static void _context_init(context_t* c) { + z_condvar_init(&c->cv); + z_mutex_init(&c->m); + c->state = NONE; +} + +static void _context_drop(context_t* c) { + z_condvar_drop(z_condvar_move(&c->cv)); + z_mutex_drop(z_mutex_move(&c->m)); +} + +static void _context_wait(context_t* c, context_state_t state, int timeout_s) { + z_mutex_lock(z_mutex_loan_mut(&c->m)); + if (c->state != state) { + printf("Waiting for state %d...\n", state); +#ifdef ZENOH_MACOS + _ZP_UNUSED(timeout_s); + z_condvar_wait(z_condvar_loan_mut(&c->cv), z_mutex_loan_mut(&c->m)); +#else + z_clock_t clock = z_clock_now(); + z_clock_advance_s(&clock, timeout_s); + z_result_t res = z_condvar_wait_until(z_condvar_loan_mut(&c->cv), z_mutex_loan_mut(&c->m), &clock); + if (res == Z_ETIMEDOUT) { + fprintf(stderr, "Timeout waiting for state %d\n", state); + assert(false); + } +#endif + if (c->state != state) { + fprintf(stderr, "Expected state %d, got %d\n", state, c->state); + assert(false); + } + } + c->state = NONE; + z_mutex_unlock(z_mutex_loan_mut(&c->m)); +} + +static void _context_notify(context_t* c, context_state_t state) { + z_mutex_lock(z_mutex_loan_mut(&c->m)); + if (c->state != NONE) { + fprintf(stderr, "State already set %d\n", c->state); + assert(false); + } + c->state = state; + fprintf(stderr, "State recieved %d\n", state); + z_condvar_signal(z_condvar_loan_mut(&c->cv)); + z_mutex_unlock(z_mutex_loan_mut(&c->m)); +} + +#define assert_ok(x) \ + { \ + int ret = (int)x; \ + if (ret != Z_OK) { \ + fprintf(stderr, "%s failed: %d\n", #x, ret); \ + assert(false); \ + } \ + } + +const char* pub_expr = "zenoh-pico/matching/test/val"; +const char* sub_expr = "zenoh-pico/matching/test/*"; +const char* sub_expr_wrong = "zenoh-pico/matching/test_wrong/*"; + +void on_receive(const z_matching_status_t* s, void* context) { + context_t* c = (context_t*)context; + _context_notify(c, s->matching ? MATCH : UNMATCH); +} + +void on_drop(void* context) { + context_t* c = (context_t*)context; + _context_notify(c, DROP); +} + +void test_matching_sub(bool background) { + printf("test_matching_sub: background=%d\n", background); + + context_t context = {0}; + _context_init(&context); + + z_owned_session_t s1, s2; + z_owned_config_t c1, c2; + z_config_default(&c1); + z_config_default(&c2); + z_view_keyexpr_t k_sub, k_pub; + z_view_keyexpr_from_str(&k_sub, sub_expr); + z_view_keyexpr_from_str(&k_pub, pub_expr); + + assert_ok(z_open(&s1, z_config_move(&c1), NULL)); + assert_ok(z_open(&s2, z_config_move(&c2), NULL)); + + assert_ok(zp_start_read_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_read_task(z_loan_mut(s2), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL)); + + z_owned_publisher_t pub; + assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL)); + + z_owned_closure_matching_status_t closure; + z_closure_matching_status(&closure, on_receive, on_drop, (void*)(&context)); + + z_owned_matching_listener_t matching_listener; + if (background) { + assert_ok(z_publisher_declare_background_matching_listener(z_publisher_loan(&pub), + z_closure_matching_status_move(&closure))); + } else { + assert_ok(z_publisher_declare_matching_listener(z_publisher_loan(&pub), &matching_listener, + z_closure_matching_status_move(&closure))); + } + + z_owned_subscriber_t sub; + z_owned_closure_sample_t callback; + z_closure_sample(&callback, NULL, NULL, NULL); + assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub), + z_closure_sample_move(&callback), NULL)); + + _context_wait(&context, MATCH, 10); + + z_subscriber_drop(z_subscriber_move(&sub)); + + _context_wait(&context, UNMATCH, 10); + + z_publisher_drop(z_publisher_move(&pub)); + + _context_wait(&context, DROP, 10); + + if (!background) { + z_matching_listener_drop(z_matching_listener_move(&matching_listener)); + } + + assert_ok(zp_stop_read_task(z_loan_mut(s1))); + assert_ok(zp_stop_read_task(z_loan_mut(s2))); + assert_ok(zp_stop_lease_task(z_loan_mut(s1))); + assert_ok(zp_stop_lease_task(z_loan_mut(s2))); + + z_session_drop(z_session_move(&s1)); + z_session_drop(z_session_move(&s2)); + + _context_drop(&context); +} + +static void _check_status(z_owned_publisher_t* pub, bool expected) { + z_matching_status_t status; + status.matching = !expected; + z_clock_t clock = z_clock_now(); + while (status.matching != expected && z_clock_elapsed_s(&clock) < 10) { + assert_ok(z_publisher_get_matching_status(z_publisher_loan(pub), &status)); + z_sleep_ms(100); + } + if (status.matching != expected) { + fprintf(stderr, "Expected matching status %d, got %d\n", expected, status.matching); + assert(false); + } +} + +void test_matching_get(void) { + z_owned_session_t s1, s2; + z_owned_config_t c1, c2; + z_config_default(&c1); + z_config_default(&c2); + z_view_keyexpr_t k_sub, k_pub, k_sub_wrong; + z_view_keyexpr_from_str(&k_sub, sub_expr); + z_view_keyexpr_from_str(&k_pub, pub_expr); + z_view_keyexpr_from_str(&k_sub_wrong, sub_expr_wrong); + + assert_ok(z_open(&s1, z_config_move(&c1), NULL)); + assert_ok(z_open(&s2, z_config_move(&c2), NULL)); + + assert_ok(zp_start_read_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_read_task(z_loan_mut(s2), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL)); + + z_owned_publisher_t pub; + assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL)); + z_sleep_s(1); + + _check_status(&pub, false); + + z_owned_subscriber_t sub_wrong; + z_owned_closure_sample_t callback_wrong; + z_closure_sample(&callback_wrong, NULL, NULL, NULL); + assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub_wrong, z_view_keyexpr_loan(&k_sub_wrong), + z_closure_sample_move(&callback_wrong), NULL)); + z_sleep_s(1); + + _check_status(&pub, false); + + z_owned_subscriber_t sub; + z_owned_closure_sample_t callback; + z_closure_sample(&callback, NULL, NULL, NULL); + assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub), + z_closure_sample_move(&callback), NULL)); + + _check_status(&pub, true); + + z_subscriber_drop(z_subscriber_move(&sub)); + + _check_status(&pub, false); + + z_publisher_drop(z_publisher_move(&pub)); + z_subscriber_drop(z_subscriber_move(&sub_wrong)); + + assert_ok(zp_stop_read_task(z_loan_mut(s1))); + assert_ok(zp_stop_read_task(z_loan_mut(s2))); + assert_ok(zp_stop_lease_task(z_loan_mut(s1))); + assert_ok(zp_stop_lease_task(z_loan_mut(s2))); + + z_session_drop(z_session_move(&s1)); + z_session_drop(z_session_move(&s2)); +} + +int main(int argc, char** argv) { + (void)argc; + (void)argv; + test_matching_sub(true); + test_matching_sub(false); + test_matching_get(); +} + +#else +int main(int argc, char** argv) { + (void)argc; + (void)argv; +} +#endif diff --git a/tests/z_collections_test.c b/tests/z_collections_test.c index 0fe9bab17..c49121569 100644 --- a/tests/z_collections_test.c +++ b/tests/z_collections_test.c @@ -351,6 +351,27 @@ void int_map_iterator_test(void) { #undef TEST_MAP } +void int_map_iterator_deletion_test(void) { + _z_str_intmap_t map; + + map = _z_str_intmap_make(); + _z_str_intmap_insert(&map, 10, _z_str_clone("A")); + _z_str_intmap_insert(&map, 20, _z_str_clone("B")); + _z_str_intmap_insert(&map, 30, _z_str_clone("C")); + _z_str_intmap_insert(&map, 40, _z_str_clone("D")); + + _z_str_intmap_iterator_t iter = _z_str_intmap_iterator_make(&map); + _z_str_intmap_iterator_next(&iter); + for (size_t s = 4; s != 0; s--) { + assert(s == _z_str_intmap_len(&map)); + size_t key = _z_str_intmap_iterator_key(&iter); + assert(strlen(_z_str_intmap_iterator_value(&iter)) == 1); + _z_str_intmap_remove(&map, key); + _z_str_intmap_iterator_next(&iter); + } + _z_str_intmap_clear(&map); +} + int main(void) { ring_test(); ring_test_init_free(); @@ -360,4 +381,5 @@ int main(void) { fifo_test_init_free(); int_map_iterator_test(); + int_map_iterator_deletion_test(); } From ee4832d83e2edc548d9af145f94af2f187f83e8d Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 16 Jan 2025 17:13:04 +0100 Subject: [PATCH 2/3] Add z_undeclare_matching_listener --- include/zenoh-pico/api/primitives.h | 12 ++++++++++++ include/zenoh-pico/net/matching.h | 2 +- include/zenoh-pico/session/matching.h | 2 +- src/api/api.c | 5 +++++ 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 6c84b65c0..3b756a583 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -1735,6 +1735,18 @@ z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *pub * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. */ z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher, z_matching_status_t *matching_status); + +/** + * Undeclares the matching listener. + * + * Parameters: + * listener: Moved :c:type:`z_owned_matching_listener_t` to undeclare. + * + * Return: + * ``0`` if undeclare is successful, ``negative value`` otherwise. + */ +z_result_t z_undeclare_matching_listener(z_moved_matching_listener_t *listener); + #endif // Z_FEATURE_MATCHING == 1 #endif // Z_FEATURE_PUBLICATION == 1 diff --git a/include/zenoh-pico/net/matching.h b/include/zenoh-pico/net/matching.h index 6da923482..d5ab26c97 100644 --- a/include/zenoh-pico/net/matching.h +++ b/include/zenoh-pico/net/matching.h @@ -39,7 +39,7 @@ static inline bool _z_matching_listener_check(const _z_matching_listener_t *matc } void _z_matching_listener_clear(_z_matching_listener_t *pub); void _z_matching_listener_free(_z_matching_listener_t **pub); -#endif +#endif // Z_FEATURE_MATCHING == 1 #ifdef __cplusplus } diff --git a/include/zenoh-pico/session/matching.h b/include/zenoh-pico/session/matching.h index b9afa1372..e878a8d4b 100644 --- a/include/zenoh-pico/session/matching.h +++ b/include/zenoh-pico/session/matching.h @@ -57,7 +57,7 @@ void _z_matching_listener_state_clear(_z_matching_listener_state_t *state); _Z_ELEM_DEFINE(_z_matching_listener, _z_matching_listener_state_t, _z_noop_size, _z_matching_listener_state_clear, _z_noop_copy, _z_noop_move) _Z_INT_MAP_DEFINE(_z_matching_listener, _z_matching_listener_state_t) -#endif +#endif // Z_FEATURE_MATCHING == 1 #ifdef __cplusplus } diff --git a/src/api/api.c b/src/api/api.c index bb83f39d5..afbda16f8 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1572,6 +1572,11 @@ void _z_matching_listener_drop(_z_matching_listener_t *listener) { _Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_matching_listener_t, matching_listener, _z_matching_listener_check, _z_matching_listener_null, _z_matching_listener_drop) +z_result_t z_undeclare_matching_listener(z_moved_matching_listener_t *listener) { + z_result_t ret = _z_matching_listener_undeclare(&listener->_this._val); + _z_matching_listener_clear(&listener->_this._val); + return ret; +} #endif /**************** Tasks ****************/ From ce01ea049e0c26ae27cbb39052ca8c175bfc51f6 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 16 Jan 2025 17:15:43 +0100 Subject: [PATCH 3/3] Update docs --- docs/api.rst | 58 ++++++++++++++++++++++++++++++++++ docs/conf.py | 1 + include/zenoh-pico/api/types.h | 6 ++-- 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 2f669d228..6b8a60c3e 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -626,6 +626,41 @@ See details at :ref:`owned_types_concept` .. c:function:: void z_closure_zid_drop(z_moved_closure_zid_t * closure) +Matching closure +---------- +Types +^^^^^ + +See details at :ref:`owned_types_concept` + +.. c:type:: z_owned_closure_matching_status_t +.. c:type:: z_loaned_closure_matching_status_t +.. c:type:: z_moved_closure_matching_status_t + +.. c:type:: void (* z_closure_matching_status_callback_t)(z_matching_status_t * status, void * arg); + + Function pointer type for handling matching status response. + Represents a callback function that is invoked when a matching status was changed. + + Parameters: + - **status** - Pointer to a :c:type:`z_matching_status_t`. + - **arg** - A user-defined pointer to additional data that can be used during the processing of the matching status. + + +Functions +^^^^^^^^^ +.. autocfunction:: primitives.h::z_closure_matching_status +.. autocfunction:: primitives.h::z_closure_matching_status_call + +Ownership Functions +^^^^^^^^^^^^^^^^^^^ + +See details at :ref:`owned_types_concept` + +.. c:function:: const z_loaned_closure_matching_status_t * z_closure_matching_status_loan(const z_owned_closure_matching_status_t * closure) +.. c:function:: void z_closure_matching_status_drop(z_moved_closure_matching_status_t * closure) + + .. _channels_concept: Channels @@ -955,6 +990,26 @@ See details at :ref:`owned_types_concept` .. c:function:: void z_session_drop(z_moved_session_t * closure) +Matching +======== + +Types +----- +See details at :ref:`owned_types_concept` + +.. c:type:: z_owned_matching_listener_t +.. c:type:: z_loaned_matching_listener_t +.. c:type:: z_moved_matching_listener_t + + +.. autoctype:: types.h::z_matching_status_t + +Functions +--------- + +.. autocfunction:: primitives.h::z_undeclare_matching_listener + + Publication =========== @@ -1002,6 +1057,9 @@ Functions .. autocfunction:: primitives.h::z_publisher_put_options_default .. autocfunction:: primitives.h::z_publisher_delete_options_default .. autocfunction:: primitives.h::z_reliability_default +.. autocfunction:: primitives.h::z_publisher_get_matching_status +.. autocfunction:: primitives.h::z_publisher_declare_matching_listener +.. autocfunction:: primitives.h::z_publisher_declare_background_matching_listener Ownership Functions ------------------- diff --git a/docs/conf.py b/docs/conf.py index e1ee877ba..6b541879f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -38,6 +38,7 @@ "-DZ_FEATURE_QUERYABLE=1", "-DZ_FEATURE_ENCODING_VALUES=1", "-DZ_FEATURE_LIVELINESS=1", + "-DZ_FEATURE_MATCHING=1", ] # -- Options for HTML output ------------------------------------------------- diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index 25725c734..1712a2954 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -137,13 +137,13 @@ _Z_OWNED_TYPE_VALUE(_z_encoding_t, encoding) */ _Z_OWNED_TYPE_VALUE(_z_value_t, reply_err) -#if defined(Z_FEATURE_MATCHING) /** * A struct that indicates if there exist Subscribers matching the Publisher's key expression or Queryables matching * Querier's key expression and target. + * Members: + * bool matching: true if there exist matching Zenoh entities, false otherwise. */ typedef _z_matching_status_t z_matching_status_t; -#endif /** * Represents the configuration used to configure a subscriber upon declaration :c:func:`z_declare_subscriber`. @@ -491,14 +491,12 @@ typedef struct { */ _Z_OWNED_TYPE_VALUE(_z_closure_zid_t, closure_zid) -#if defined(Z_FEATURE_MATCHING) typedef _z_closure_matching_status_callback_t z_closure_matching_status_callback_t; typedef _z_closure_matching_status_t z_closure_matching_status_t; /** * Represents the matching status callback closure. */ _Z_OWNED_TYPE_VALUE(_z_closure_matching_status_t, closure_matching_status) -#endif #ifdef __cplusplus }