Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add matching subscribers #853

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY")
set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions")
set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check")
set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching")
set(Z_FEATURE_MATCHING 1 CACHE STRING "Toggle matching feature")
set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE")
set(Z_FEATURE_AUTO_RECONNECT 1 CACHE STRING "Toggle automatic reconnection")

Expand Down
31 changes: 28 additions & 3 deletions examples/unix/c11/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,19 @@
#include <unistd.h>
#include <zenoh-pico.h>

#include "zenoh-pico/system/platform.h"

#if Z_FEATURE_PUBLICATION == 1

#if defined(Z_FEATURE_MATCHING)
void matching_status_handler(const z_matching_status_t *matching_status, void *arg) {
(void)arg;
if (matching_status->matching) {
printf("Publisher has matching subscribers.\n");
} else {
printf("Publisher has NO MORE matching subscribers.\n");
}
}
#endif

int main(int argc, char **argv) {
const char *keyexpr = "demo/example/zenoh-pico-pub";
char *const default_value = "Pub from Pico!";
Expand All @@ -31,9 +41,10 @@ int main(int argc, char **argv) {
char *clocator = NULL;
char *llocator = NULL;
int n = 2147483647; // max int value by default
bool add_matching_listener = false;

int opt;
while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) {
while ((opt = getopt(argc, argv, "k:v:e:m:l:n:a")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
Expand All @@ -53,6 +64,9 @@ int main(int argc, char **argv) {
case 'n':
n = atoi(optarg);
break;
case 'a':
add_matching_listener = true;
break;
case '?':
if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l' ||
optopt == 'n') {
Expand Down Expand Up @@ -104,6 +118,17 @@ int main(int argc, char **argv) {
return -1;
}

if (add_matching_listener) {
#if defined(Z_FEATURE_MATCHING)
z_owned_closure_matching_status_t callback;
z_closure(&callback, matching_status_handler, NULL, NULL);
z_publisher_declare_background_matching_listener(z_loan(pub), z_move(callback));
#else
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_MATCHING but this example requires it.\n");
return -2;
#endif
}

// Publish data
printf("Press CTRL-C to quit...\n");
char buf[256];
Expand Down
437 changes: 248 additions & 189 deletions include/zenoh-pico/api/macros.h

Large diffs are not rendered by default.

61 changes: 60 additions & 1 deletion include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1111,13 +1111,24 @@ z_result_t z_closure_zid(z_owned_closure_zid_t *closure, z_closure_zid_callback_
*/
void z_closure_zid_call(const z_loaned_closure_zid_t *closure, const z_id_t *id);

/**
* Calls a matching status closure.
*
* Parameters:
* closure: Pointer to the :c:type:`z_loaned_closure_matching_status_t` to call.
* status: Pointer to the :c:type:`z_matching_status_t` to pass to the closure.
*/
void z_closure_matching_status_call(const z_loaned_closure_matching_status_t *closure,
const z_matching_status_t *status);

/**************** Loans ****************/
_Z_OWNED_FUNCTIONS_DEF(string)
_Z_OWNED_FUNCTIONS_DEF(keyexpr)
_Z_OWNED_FUNCTIONS_DEF(config)
_Z_OWNED_FUNCTIONS_NO_COPY_DEF(session)
_Z_OWNED_FUNCTIONS_NO_COPY_DEF(subscriber)
_Z_OWNED_FUNCTIONS_NO_COPY_DEF(publisher)
_Z_OWNED_FUNCTIONS_NO_COPY_DEF(matching_listener)
_Z_OWNED_FUNCTIONS_NO_COPY_DEF(queryable)
_Z_OWNED_FUNCTIONS_DEF(hello)
_Z_OWNED_FUNCTIONS_DEF(reply)
Expand All @@ -1135,6 +1146,7 @@ _Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_query)
_Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_reply)
_Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_hello)
_Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_zid)
_Z_OWNED_FUNCTIONS_CLOSURE_DEF(closure_matching_status)

_Z_VIEW_FUNCTIONS_DEF(keyexpr)
_Z_VIEW_FUNCTIONS_DEF(string)
Expand Down Expand Up @@ -1661,7 +1673,54 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher
* The keyexpr wrapped as a :c:type:`z_loaned_keyexpr_t`.
*/
const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher);
#endif

#if Z_FEATURE_MATCHING == 1
/**
* Declares a matching listener, registering a callback for notifying subscribers matching with a given publisher.
* The callback will be run in the background until the corresponding publisher is dropped.
*
* Parameters:
* publisher: A publisher to associate with matching listener.
* callback: A closure that will be called every time the matching status of the publisher changes (If last subscriber
* disconnects or when the first subscriber connects).
*
* Return:
* ``0`` if execution was successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_publisher_declare_background_matching_listener(const z_loaned_publisher_t *publisher,
z_moved_closure_matching_status_t *callback);
/**
* Constructs matching listener, registering a callback for notifying subscribers matching with a given publisher.
*
* Parameters:
* publisher: A publisher to associate with matching listener.
* matching_listener: An uninitialized memory location where matching listener will be constructed. The matching
* listener's callback will be automatically dropped when the publisher is dropped. callback: A closure that will be
* called every time the matching status of the publisher changes (If last subscriber disconnects or when the first
* subscriber connects).
*
* Return:
* ``0`` if execution was successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *publisher,
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback);
/**
* Gets publisher matching status - i.e. if there are any subscribers matching its key expression.
*
* Return:
* ``0`` if execution was successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher, z_matching_status_t *matching_status);
#endif // Z_FEATURE_MATCHING == 1

#endif // Z_FEATURE_PUBLICATION == 1

#if Z_FEATURE_QUERY == 1
/**
Expand Down
37 changes: 37 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "zenoh-pico/collections/slice.h"
#include "zenoh-pico/collections/string.h"
#include "zenoh-pico/net/encoding.h"
#include "zenoh-pico/net/matching.h"
#include "zenoh-pico/net/publish.h"
#include "zenoh-pico/net/query.h"
#include "zenoh-pico/net/reply.h"
Expand Down Expand Up @@ -110,6 +111,11 @@ _Z_OWNED_TYPE_VALUE(_z_subscriber_t, subscriber)
*/
_Z_OWNED_TYPE_VALUE(_z_publisher_t, publisher)

/**
* Represents a Zenoh Matching listener entity.
*/
_Z_OWNED_TYPE_VALUE(_z_matching_listener_t, matching_listener)

/**
* Represents a Zenoh Queryable entity.
*/
Expand All @@ -130,6 +136,16 @@ _Z_OWNED_TYPE_VALUE(_z_encoding_t, encoding)
*/
_Z_OWNED_TYPE_VALUE(_z_value_t, reply_err)

#if defined(Z_FEATURE_MATCHING)
/**
* A struct that indicates if there exist Subscribers matching the Publisher's key expression or Queryables matching
* Querier's key expression and target.
*/
typedef struct {
bool matching; // true if there exist matching Zenoh entities, false otherwise.
} z_matching_status_t;
#endif

/**
* Represents the configuration used to configure a subscriber upon declaration :c:func:`z_declare_subscriber`.
*/
Expand Down Expand Up @@ -476,6 +492,27 @@ typedef struct {
*/
_Z_OWNED_TYPE_VALUE(_z_closure_zid_t, closure_zid)

#if defined(Z_FEATURE_MATCHING)
typedef void (*z_closure_matching_status_callback_t)(const z_matching_status_t *status, void *arg);

typedef struct {
void *context;
z_closure_matching_status_callback_t call;
z_closure_drop_callback_t drop;
} _z_closure_matching_status_t;

// TODO(sashacmc): found better place?
typedef struct _z_matching_listener_ctx_t {
uint32_t decl_id;
_z_closure_matching_status_t callback;
} _z_matching_listener_ctx_t;

/**
* Represents the matching status callback closure.
*/
_Z_OWNED_TYPE_VALUE(_z_closure_matching_status_t, closure_matching_status)
#endif

#ifdef __cplusplus
}
#endif
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#define Z_FEATURE_LOCAL_SUBSCRIBER 0
#define Z_FEATURE_PUBLISHER_SESSION_CHECK 1
#define Z_FEATURE_BATCHING 1
#define Z_FEATURE_MATCHING 1
#define Z_FEATURE_RX_CACHE 0
#define Z_FEATURE_AUTO_RECONNECT 1
// End of CMake generation
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#define Z_FEATURE_LOCAL_SUBSCRIBER @Z_FEATURE_LOCAL_SUBSCRIBER@
#define Z_FEATURE_PUBLISHER_SESSION_CHECK @Z_FEATURE_PUBLISHER_SESSION_CHECK@
#define Z_FEATURE_BATCHING @Z_FEATURE_BATCHING@
#define Z_FEATURE_MATCHING @Z_FEATURE_MATCHING@
#define Z_FEATURE_RX_CACHE @Z_FEATURE_RX_CACHE@
#define Z_FEATURE_AUTO_RECONNECT @Z_FEATURE_AUTO_RECONNECT@
// End of CMake generation
Expand Down
43 changes: 43 additions & 0 deletions include/zenoh-pico/net/matching.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
// Copyright (c) 2025 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef INCLUDE_ZENOH_PICO_NET_MATCHING_H
#define INCLUDE_ZENOH_PICO_NET_MATCHING_H

#include "zenoh-pico/net/session.h"

#ifdef __cplusplus
extern "C" {
#endif

#if Z_FEATURE_MATCHING == 1
typedef struct _z_matching_listener_t {
uint32_t _interest_id;
_z_session_weak_t _zn;
} _z_matching_listener_t;

// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_z_matching_listener_t){0}; }
static inline bool _z_matching_listener_check(const _z_matching_listener_t *matching_listener) {
return !_Z_RC_IS_NULL(&matching_listener->_zn);
}
void _z_matching_listener_clear(_z_matching_listener_t *pub);
void _z_matching_listener_free(_z_matching_listener_t **pub);
#endif

#ifdef __cplusplus
}
#endif

#endif /* INCLUDE_ZENOH_PICO_NET_MATCHING_H */
5 changes: 5 additions & 0 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/session/liveliness.h"
#include "zenoh-pico/session/matching.h"
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/session.h"
#include "zenoh-pico/session/subscription.h"
Expand Down Expand Up @@ -93,6 +94,10 @@ typedef struct _z_session_t {
_z_pending_query_list_t *_pending_queries;
#endif

#if Z_FEATURE_INTEREST == 1
_z_matching_listener_item_intmap_t *_matching_listeners;
#endif

// Session interests
#if Z_FEATURE_INTEREST == 1
_z_session_interest_rc_list_t *_local_interests;
Expand Down
42 changes: 42 additions & 0 deletions include/zenoh-pico/session/matching.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// Copyright (c) 2025 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef INCLUDE_ZENOH_PICO_SESSION_MATCHING_H
#define INCLUDE_ZENOH_PICO_SESSION_MATCHING_H

#include "zenoh-pico/collections/element.h"
#include "zenoh-pico/collections/intmap.h"

#ifdef __cplusplus
extern "C" {
#endif

#if Z_FEATURE_MATCHING == 1
typedef struct _z_matching_listener_ctx_t _z_matching_listener_ctx_t;

typedef struct {
uint32_t _interest_id;
_z_matching_listener_ctx_t *ctx;
} _z_matching_listener_state_t;

_Z_ELEM_DEFINE(_z_matching_listener_item, _z_matching_listener_state_t, _z_noop_size, _z_noop_clear, _z_noop_copy,
_z_noop_move)
_Z_INT_MAP_DEFINE(_z_matching_listener_item, _z_matching_listener_state_t)
#endif

#ifdef __cplusplus
}
#endif

#endif /* INCLUDE_ZENOH_PICO_SESSION_MATCHING_H */
Loading
Loading