Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch reliability from subscriber to publisher. #630

Merged
merged 7 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ Primitives
.. autocfunction:: primitives.h::z_sample_timestamp
.. autocfunction:: primitives.h::z_sample_encoding
.. autocfunction:: primitives.h::z_sample_kind
.. autocfunction:: primitives.h::z_sample_reliability
.. autocfunction:: primitives.h::z_sample_attachment
.. autocfunction:: primitives.h::z_put_options_default
.. autocfunction:: primitives.h::z_delete_options_default
Expand Down
13 changes: 13 additions & 0 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,19 @@ const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample);
*/
z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample);

/**
* (unstable) Gets the reliability a sample was received with.
*
* Parameters:
* sample: Pointer to a :c:type:`z_loaned_sample_t` to get the reliability from.
*
* Return:
* The reliability wrapped as a :c:type:`z_reliability_t`.
*/
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t z_sample_reliability(const z_loaned_sample_t *sample);
#endif

/**
* Got sample qos congestion control value.
*
Expand Down
20 changes: 13 additions & 7 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,10 @@ _Z_OWNED_TYPE_VALUE(_z_value_t, reply_err)

/**
* Represents the configuration used to configure a subscriber upon declaration :c:func:`z_declare_subscriber`.
*
* Members:
* (unstable) z_reliability_t reliability: The subscription reliability value.
*.
*/
typedef struct {
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#else
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
#endif
} z_subscriber_options_t;

/**
Expand All @@ -212,12 +206,16 @@ typedef struct {
* publisher.
* z_priority_t priority: The priority of messages issued by this publisher.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data.
*/
typedef struct {
z_moved_encoding_t *encoding;
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reliability is not under Z_FEATURE_UNSTABLE_API anymore ?

#endif
} z_publisher_options_t;

/**
Expand Down Expand Up @@ -288,6 +286,7 @@ typedef struct {
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* z_moved_bytes_t* attachment: An optional attachment to the publication.
* (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data.
*/
typedef struct {
z_moved_encoding_t *encoding;
Expand All @@ -296,6 +295,9 @@ typedef struct {
z_timestamp_t *timestamp;
_Bool is_express;
z_moved_bytes_t *attachment;
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#endif
} z_put_options_t;

/**
Expand All @@ -306,12 +308,16 @@ typedef struct {
* z_priority_t priority: The priority of this message when router.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data.
*/
typedef struct {
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
z_timestamp_t *timestamp;
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#endif
} z_delete_options_t;

/**
Expand Down
6 changes: 3 additions & 3 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
#define INCLUDE_ZENOH_PICO_CONFIG_H

/*--- CMake generated config; pass values to CMake to change the following tokens ---*/
#define Z_FRAG_MAX_SIZE 300000
#define Z_BATCH_UNICAST_SIZE 65535
#define Z_BATCH_MULTICAST_SIZE 8096
#define Z_FRAG_MAX_SIZE 4096
#define Z_BATCH_UNICAST_SIZE 2048
#define Z_BATCH_MULTICAST_SIZE 2048
#define Z_CONFIG_SOCKET_TIMEOUT 100

#define Z_FEATURE_UNSTABLE_API 0
Expand Down
14 changes: 8 additions & 6 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid);
* keyexpr: The resource key to publish. The callee gets the ownership
* of any allocated value.
* encoding: The optional default encoding to use during put. The callee gets the ownership.
* reliability: The reliability of the publisher messages
*
* Returns:
* The created :c:type:`_z_publisher_t` (in null state if the declaration failed)..
*/
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding,
z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express);
z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express,
z_reliability_t reliability);

/**
* Undeclare a :c:type:`_z_publisher_t`.
Expand Down Expand Up @@ -118,12 +120,14 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub);
* is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* timestamp: The timestamp of this write. The API level timestamp (e.g. of the data when it was created).
* attachment: An optional attachment to this write.
* reliability: The message reliability.
* Returns:
* ``0`` in case of success, ``-1`` in case of failure.
*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t *encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment,
z_reliability_t reliability);
#endif

#if Z_FEATURE_SUBSCRIPTION == 1
Expand All @@ -134,16 +138,14 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to subscribe. The callee gets the ownership
* of any allocated value.
* sub_info: The :c:type:`_z_subinfo_t` to configure the :c:type:`_z_subscriber_t`.
* The callee gets the ownership of any allocated value.
* callback: The callback function that will be called each time a data matching the subscribed resource is
* received. arg: A pointer that will be passed to the **callback** on each call.
*
* Returns:
* The created :c:type:`_z_subscriber_t` (in null state if the declaration failed).
*/
_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info,
_z_data_handler_t callback, _z_drop_handler_t dropper, void *arg);
_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_data_handler_t callback,
_z_drop_handler_t dropper, void *arg);

/**
* Undeclare a :c:type:`_z_subscriber_t`.
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/net/publish.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ typedef struct _z_publisher_t {
_z_encoding_t _encoding;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
z_reliability_t reliability;
_Bool _is_express;
#if Z_FEATURE_INTEREST == 1
_z_write_filter_t _filter;
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/net/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ typedef struct _z_sample_t {
z_sample_kind_t kind;
_z_qos_t qos;
_z_bytes_t attachment;
z_reliability_t reliability;
} _z_sample_t;
void _z_sample_clear(_z_sample_t *sample);

Expand All @@ -56,6 +57,6 @@ _z_sample_t _z_sample_duplicate(const _z_sample_t *src);

_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment);
const _z_bytes_t attachment, z_reliability_t reliability);

#endif /* ZENOH_PICO_SAMPLE_NETAPI_H */
7 changes: 0 additions & 7 deletions include/zenoh-pico/net/subscribe.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ typedef struct {
} _z_subscriber_t;

#if Z_FEATURE_SUBSCRIPTION == 1
/**
* Create a default subscription info for a push subscriber.
*
* Returns:
* A :c:type:`_z_subinfo_t` containing the created subscription info.
*/
_z_subinfo_t _z_subinfo_default(void);

void _z_subscriber_clear(_z_subscriber_t *sub);
void _z_subscriber_free(_z_subscriber_t **sub);
Expand Down
11 changes: 0 additions & 11 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,6 @@ typedef struct {
_z_zint_t n;
} _z_target_complete_body_t;

/**
* Informations to be passed to :c:func:`_z_declare_subscriber` to configure the created
* :c:type:`_z_subscription_rc_t`.
*
* Members:
* z_reliability_t reliability: The subscription reliability.
*/
typedef struct {
z_reliability_t reliability;
} _z_subinfo_t;

typedef struct {
_z_id_t _id;
uint32_t _entity_id;
Expand Down
5 changes: 1 addition & 4 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ _z_undecl_kexpr_t _z_undecl_kexpr_null(void);
typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
struct {
_Bool _reliable;
} _ext_subinfo;
} _z_decl_subscriber_t;
_z_decl_subscriber_t _z_decl_subscriber_null(void);
typedef struct {
Expand Down Expand Up @@ -105,7 +102,7 @@ void _z_decl_fix_mapping(_z_declaration_t* msg, uint16_t mapping);
_z_declaration_t _z_make_decl_keyexpr(uint16_t id, _Z_MOVE(_z_keyexpr_t) key);
_z_declaration_t _z_make_undecl_keyexpr(uint16_t id);

_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable);
_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id);
_z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);

_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint16_t distance, _Bool complete);
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ typedef union {
typedef struct {
enum { _Z_N_DECLARE, _Z_N_PUSH, _Z_N_REQUEST, _Z_N_RESPONSE, _Z_N_RESPONSE_FINAL, _Z_N_INTEREST } _tag;
_z_network_body_t _body;
z_reliability_t _reliability;
} _z_network_message_t;
typedef _z_network_message_t _z_zenoh_message_t;
void _z_n_msg_clear(_z_network_message_t *m);
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,8 @@ typedef struct {
} _z_transport_message_t;
void _z_t_msg_clear(_z_transport_message_t *msg);

z_reliability_t _z_t_msg_get_reliability(_z_transport_message_t *msg);

/*------------------ Builders ------------------*/
_z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _z_id_t zid,
_z_conduit_sn_list_t next_sn);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/push.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
#ifndef ZENOH_PICO_SESSION_PUSH_H
#define ZENOH_PICO_SESSION_PUSH_H

int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push);
int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push, z_reliability_t reliability);

#endif /* ZENOH_PICO_SESSION_PUSH_H */
1 change: 0 additions & 1 deletion include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ typedef struct {
_z_data_handler_t _callback;
_z_drop_handler_t _dropper;
void *_arg;
_z_subinfo_t _info;
} _z_subscription_t;

_Bool _z_subscription_eq(const _z_subscription_t *one, const _z_subscription_t *two);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/*------------------ Subscription ------------------*/
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment);
const _z_bytes_t attachment, z_reliability_t reliability);

#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id);
Expand All @@ -30,7 +30,7 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t
_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment);
const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability);
void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub);
void _z_flush_subscriptions(_z_session_t *zn);
#endif
Expand Down
Loading
Loading