diff --git a/docs/api.rst b/docs/api.rst index 1fea537d1..105e3050d 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -400,6 +400,7 @@ Primitives .. autocfunction:: primitives.h::z_sample_timestamp .. autocfunction:: primitives.h::z_sample_encoding .. autocfunction:: primitives.h::z_sample_kind +.. autocfunction:: primitives.h::z_sample_reliability .. autocfunction:: primitives.h::z_sample_attachment .. autocfunction:: primitives.h::z_put_options_default .. autocfunction:: primitives.h::z_delete_options_default diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 97f50b63e..8529413b7 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -1609,6 +1609,19 @@ const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample); */ z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample); +/** + * (unstable) Gets the reliability a sample was received with. + * + * Parameters: + * sample: Pointer to a :c:type:`z_loaned_sample_t` to get the reliability from. + * + * Return: + * The reliability wrapped as a :c:type:`z_reliability_t`. + */ +#if Z_FEATURE_UNSTABLE_API == 1 +z_reliability_t z_sample_reliability(const z_loaned_sample_t *sample); +#endif + /** * Got sample qos congestion control value. * diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index fef59ac5e..3a68ce3ae 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -181,16 +181,10 @@ _Z_OWNED_TYPE_VALUE(_z_value_t, reply_err) /** * Represents the configuration used to configure a subscriber upon declaration :c:func:`z_declare_subscriber`. - * - * Members: - * (unstable) z_reliability_t reliability: The subscription reliability value. + *. */ typedef struct { -#if Z_FEATURE_UNSTABLE_API == 1 - z_reliability_t reliability; -#else uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior -#endif } z_subscriber_options_t; /** @@ -212,12 +206,16 @@ typedef struct { * publisher. * z_priority_t priority: The priority of messages issued by this publisher. * _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth. + * (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data. */ typedef struct { z_moved_encoding_t *encoding; z_congestion_control_t congestion_control; z_priority_t priority; _Bool is_express; +#if Z_FEATURE_UNSTABLE_API == 1 + z_reliability_t reliability; +#endif } z_publisher_options_t; /** @@ -288,6 +286,7 @@ typedef struct { * z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created). * _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth. * z_moved_bytes_t* attachment: An optional attachment to the publication. + * (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data. */ typedef struct { z_moved_encoding_t *encoding; @@ -296,6 +295,9 @@ typedef struct { z_timestamp_t *timestamp; _Bool is_express; z_moved_bytes_t *attachment; +#if Z_FEATURE_UNSTABLE_API == 1 + z_reliability_t reliability; +#endif } z_put_options_t; /** @@ -306,12 +308,16 @@ typedef struct { * z_priority_t priority: The priority of this message when router. * _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth. * z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created). + * (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data. */ typedef struct { z_congestion_control_t congestion_control; z_priority_t priority; _Bool is_express; z_timestamp_t *timestamp; +#if Z_FEATURE_UNSTABLE_API == 1 + z_reliability_t reliability; +#endif } z_delete_options_t; /** diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index 82b1b155b..1230a0a4b 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -16,9 +16,9 @@ #define INCLUDE_ZENOH_PICO_CONFIG_H /*--- CMake generated config; pass values to CMake to change the following tokens ---*/ -#define Z_FRAG_MAX_SIZE 300000 -#define Z_BATCH_UNICAST_SIZE 65535 -#define Z_BATCH_MULTICAST_SIZE 8096 +#define Z_FRAG_MAX_SIZE 4096 +#define Z_BATCH_UNICAST_SIZE 2048 +#define Z_BATCH_MULTICAST_SIZE 2048 #define Z_CONFIG_SOCKET_TIMEOUT 100 #define Z_FEATURE_UNSTABLE_API 0 diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index c3d610d11..435011cc1 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -84,12 +84,14 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid); * keyexpr: The resource key to publish. The callee gets the ownership * of any allocated value. * encoding: The optional default encoding to use during put. The callee gets the ownership. + * reliability: The reliability of the publisher messages * * Returns: * The created :c:type:`_z_publisher_t` (in null state if the declaration failed).. */ _z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding, - z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express); + z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express, + z_reliability_t reliability); /** * Undeclare a :c:type:`_z_publisher_t`. @@ -118,12 +120,14 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub); * is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth. * timestamp: The timestamp of this write. The API level timestamp (e.g. of the data when it was created). * attachment: An optional attachment to this write. + * reliability: The message reliability. * Returns: * ``0`` in case of success, ``-1`` in case of failure. */ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t *encoding, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority, - _Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment); + _Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment, + z_reliability_t reliability); #endif #if Z_FEATURE_SUBSCRIPTION == 1 @@ -134,16 +138,14 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload * zn: The zenoh-net session. The caller keeps its ownership. * keyexpr: The resource key to subscribe. The callee gets the ownership * of any allocated value. - * sub_info: The :c:type:`_z_subinfo_t` to configure the :c:type:`_z_subscriber_t`. - * The callee gets the ownership of any allocated value. * callback: The callback function that will be called each time a data matching the subscribed resource is * received. arg: A pointer that will be passed to the **callback** on each call. * * Returns: * The created :c:type:`_z_subscriber_t` (in null state if the declaration failed). */ -_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info, - _z_data_handler_t callback, _z_drop_handler_t dropper, void *arg); +_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_data_handler_t callback, + _z_drop_handler_t dropper, void *arg); /** * Undeclare a :c:type:`_z_subscriber_t`. diff --git a/include/zenoh-pico/net/publish.h b/include/zenoh-pico/net/publish.h index f7f44686e..4be537dd3 100644 --- a/include/zenoh-pico/net/publish.h +++ b/include/zenoh-pico/net/publish.h @@ -29,6 +29,7 @@ typedef struct _z_publisher_t { _z_encoding_t _encoding; z_congestion_control_t _congestion_control; z_priority_t _priority; + z_reliability_t reliability; _Bool _is_express; #if Z_FEATURE_INTEREST == 1 _z_write_filter_t _filter; diff --git a/include/zenoh-pico/net/sample.h b/include/zenoh-pico/net/sample.h index 9cd33fc82..2cc11fbd8 100644 --- a/include/zenoh-pico/net/sample.h +++ b/include/zenoh-pico/net/sample.h @@ -36,6 +36,7 @@ typedef struct _z_sample_t { z_sample_kind_t kind; _z_qos_t qos; _z_bytes_t attachment; + z_reliability_t reliability; } _z_sample_t; void _z_sample_clear(_z_sample_t *sample); @@ -56,6 +57,6 @@ _z_sample_t _z_sample_duplicate(const _z_sample_t *src); _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t *timestamp, _z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos, - const _z_bytes_t attachment); + const _z_bytes_t attachment, z_reliability_t reliability); #endif /* ZENOH_PICO_SAMPLE_NETAPI_H */ diff --git a/include/zenoh-pico/net/subscribe.h b/include/zenoh-pico/net/subscribe.h index 9aaf60a42..581eb1894 100644 --- a/include/zenoh-pico/net/subscribe.h +++ b/include/zenoh-pico/net/subscribe.h @@ -29,13 +29,6 @@ typedef struct { } _z_subscriber_t; #if Z_FEATURE_SUBSCRIPTION == 1 -/** - * Create a default subscription info for a push subscriber. - * - * Returns: - * A :c:type:`_z_subinfo_t` containing the created subscription info. - */ -_z_subinfo_t _z_subinfo_default(void); void _z_subscriber_clear(_z_subscriber_t *sub); void _z_subscriber_free(_z_subscriber_t **sub); diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 78be5e3a7..2fd9a76ec 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -197,17 +197,6 @@ typedef struct { _z_zint_t n; } _z_target_complete_body_t; -/** - * Informations to be passed to :c:func:`_z_declare_subscriber` to configure the created - * :c:type:`_z_subscription_rc_t`. - * - * Members: - * z_reliability_t reliability: The subscription reliability. - */ -typedef struct { - z_reliability_t reliability; -} _z_subinfo_t; - typedef struct { _z_id_t _id; uint32_t _entity_id; diff --git a/include/zenoh-pico/protocol/definitions/declarations.h b/include/zenoh-pico/protocol/definitions/declarations.h index cba210a77..f72037c75 100644 --- a/include/zenoh-pico/protocol/definitions/declarations.h +++ b/include/zenoh-pico/protocol/definitions/declarations.h @@ -33,9 +33,6 @@ _z_undecl_kexpr_t _z_undecl_kexpr_null(void); typedef struct { _z_keyexpr_t _keyexpr; uint32_t _id; - struct { - _Bool _reliable; - } _ext_subinfo; } _z_decl_subscriber_t; _z_decl_subscriber_t _z_decl_subscriber_null(void); typedef struct { @@ -105,7 +102,7 @@ void _z_decl_fix_mapping(_z_declaration_t* msg, uint16_t mapping); _z_declaration_t _z_make_decl_keyexpr(uint16_t id, _Z_MOVE(_z_keyexpr_t) key); _z_declaration_t _z_make_undecl_keyexpr(uint16_t id); -_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable); +_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id); _z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key); _z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint16_t distance, _Bool complete); diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 023959cce..e2d9e9de4 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -281,6 +281,7 @@ typedef union { typedef struct { enum { _Z_N_DECLARE, _Z_N_PUSH, _Z_N_REQUEST, _Z_N_RESPONSE, _Z_N_RESPONSE_FINAL, _Z_N_INTEREST } _tag; _z_network_body_t _body; + z_reliability_t _reliability; } _z_network_message_t; typedef _z_network_message_t _z_zenoh_message_t; void _z_n_msg_clear(_z_network_message_t *m); diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index 74f4de973..28e5d8ee9 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -496,6 +496,8 @@ typedef struct { } _z_transport_message_t; void _z_t_msg_clear(_z_transport_message_t *msg); +z_reliability_t _z_t_msg_get_reliability(_z_transport_message_t *msg); + /*------------------ Builders ------------------*/ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _z_id_t zid, _z_conduit_sn_list_t next_sn); diff --git a/include/zenoh-pico/session/push.h b/include/zenoh-pico/session/push.h index 76c02b2f1..7c44f49fa 100644 --- a/include/zenoh-pico/session/push.h +++ b/include/zenoh-pico/session/push.h @@ -21,6 +21,6 @@ #ifndef ZENOH_PICO_SESSION_PUSH_H #define ZENOH_PICO_SESSION_PUSH_H -int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push); +int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push, z_reliability_t reliability); #endif /* ZENOH_PICO_SESSION_PUSH_H */ diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index de91d28fd..c91b4d979 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -63,7 +63,6 @@ typedef struct { _z_data_handler_t _callback; _z_drop_handler_t _dropper; void *_arg; - _z_subinfo_t _info; } _z_subscription_t; _Bool _z_subscription_eq(const _z_subscription_t *one, const _z_subscription_t *two); diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index bade6d1ce..21897545e 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -21,7 +21,7 @@ /*------------------ Subscription ------------------*/ void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t attachment); + const _z_bytes_t attachment, z_reliability_t reliability); #if Z_FEATURE_SUBSCRIPTION == 1 _z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id); @@ -30,7 +30,7 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub); int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, _z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp, - const _z_n_qos_t qos, const _z_bytes_t attachment); + const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability); void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub); void _z_flush_subscriptions(_z_session_t *zn); #endif diff --git a/src/api/api.c b/src/api/api.c index 3c4bf37fa..017b1949b 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -981,6 +981,9 @@ z_result_t z_id_to_string(z_id_t *id, z_owned_string_t *str) { const z_loaned_keyexpr_t *z_sample_keyexpr(const z_loaned_sample_t *sample) { return &sample->keyexpr; } z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample) { return sample->kind; } +#if Z_FEATURE_UNSTABLE_API == 1 +z_reliability_t z_sample_reliability(const z_loaned_sample_t *sample) { return sample->reliability; } +#endif const z_loaned_bytes_t *z_sample_payload(const z_loaned_sample_t *sample) { return &sample->payload; } const z_timestamp_t *z_sample_timestamp(const z_loaned_sample_t *sample) { if (_z_timestamp_check(&sample->timestamp)) { @@ -1046,6 +1049,9 @@ void z_put_options_default(z_put_options_t *options) { options->is_express = false; options->timestamp = NULL; options->attachment = NULL; +#if Z_FEATURE_UNSTABLE_API == 1 + options->reliability = Z_RELIABILITY_DEFAULT; +#endif } void z_delete_options_default(z_delete_options_t *options) { @@ -1053,6 +1059,9 @@ void z_delete_options_default(z_delete_options_t *options) { options->is_express = false; options->timestamp = NULL; options->priority = Z_PRIORITY_DEFAULT; +#if Z_FEATURE_UNSTABLE_API == 1 + options->reliability = Z_RELIABILITY_DEFAULT; +#endif } int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_moved_bytes_t *payload, @@ -1062,25 +1071,25 @@ int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_ z_put_options_t opt; z_put_options_default(&opt); if (options != NULL) { - opt.congestion_control = options->congestion_control; - opt.encoding = options->encoding; - opt.priority = options->priority; - opt.is_express = options->is_express; - opt.timestamp = options->timestamp; - opt.attachment = options->attachment; + opt = *options; } + z_reliability_t reliability = Z_RELIABILITY_DEFAULT; +#if Z_FEATURE_UNSTABLE_API == 1 + reliability = opt.reliability; +#endif _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); ret = _z_write(_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(&payload->_this), opt.encoding == NULL ? NULL : &opt.encoding->_this._val, Z_SAMPLE_KIND_PUT, opt.congestion_control, - opt.priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this)); + opt.priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), + reliability); // Trigger local subscriptions _z_trigger_local_subscriptions( _Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(&payload->_this), opt.encoding == NULL ? NULL : &opt.encoding->_this._val, _z_n_qos_make(opt.is_express, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority), - opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this)); + opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); // Clean-up z_encoding_drop(opt.encoding); z_bytes_drop(opt.attachment); @@ -1094,13 +1103,15 @@ int8_t z_delete(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_delete_options_t opt; z_delete_options_default(&opt); if (options != NULL) { - opt.congestion_control = options->congestion_control; - opt.priority = options->priority; - opt.is_express = options->is_express; - opt.timestamp = options->timestamp; + opt = *options; } + z_reliability_t reliability = Z_RELIABILITY_DEFAULT; +#if Z_FEATURE_UNSTABLE_API == 1 + reliability = opt.reliability; +#endif + ret = _z_write(_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, opt.congestion_control, - opt.priority, opt.is_express, opt.timestamp, _z_bytes_null()); + opt.priority, opt.is_express, opt.timestamp, _z_bytes_null(), reliability); return ret; } @@ -1110,6 +1121,9 @@ void z_publisher_options_default(z_publisher_options_t *options) { options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT; options->priority = Z_PRIORITY_DEFAULT; options->is_express = false; +#if Z_FEATURE_UNSTABLE_API == 1 + options->reliability = Z_RELIABILITY_DEFAULT; +#endif } int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, @@ -1134,9 +1148,14 @@ int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *z if (options != NULL) { opt = *options; } + z_reliability_t reliability = Z_RELIABILITY_DEFAULT; +#if Z_FEATURE_UNSTABLE_API == 1 + reliability = opt.reliability; +#endif + // Set publisher _z_publisher_t int_pub = _z_declare_publisher(zs, key, opt.encoding == NULL ? NULL : &opt.encoding->_this._val, - opt.congestion_control, opt.priority, opt.is_express); + opt.congestion_control, opt.priority, opt.is_express, reliability); // Create write filter int8_t res = _z_write_filter_create(&int_pub); if (res != _Z_RES_OK) { @@ -1166,10 +1185,13 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *payload z_publisher_put_options_t opt; z_publisher_put_options_default(&opt); if (options != NULL) { - opt.encoding = options->encoding; - opt.timestamp = options->timestamp; - opt.attachment = options->attachment; + opt = *options; } + z_reliability_t reliability = Z_RELIABILITY_DEFAULT; +#if Z_FEATURE_UNSTABLE_API == 1 + reliability = pub->reliability; +#endif + _z_encoding_t encoding; if (opt.encoding == NULL) { _Z_RETURN_IF_ERR(_z_encoding_copy(&encoding, &pub->_encoding)); @@ -1184,13 +1206,13 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *payload // Write value ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, - _z_bytes_from_owned_bytes(&opt.attachment->_this)); + _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); } // Trigger local subscriptions _z_trigger_local_subscriptions( _Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, _z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority), - opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this)); + opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); // Clean-up _z_encoding_clear(&encoding); z_bytes_drop(opt.attachment); @@ -1203,13 +1225,18 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del z_publisher_delete_options_t opt; z_publisher_delete_options_default(&opt); if (options != NULL) { - opt.timestamp = options->timestamp; + opt = *options; } + z_reliability_t reliability = Z_RELIABILITY_DEFAULT; +#if Z_FEATURE_UNSTABLE_API == 1 + reliability = pub->reliability; +#endif // Remove potentially redundant ke suffix _z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true); return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, - pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null()); + pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), + reliability); } const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher) { @@ -1530,16 +1557,11 @@ void _z_subscriber_drop(_z_subscriber_t *sub) { _z_undeclare_and_clear_subscribe _Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_subscriber_t, subscriber, _z_subscriber_check, _z_subscriber_null, _z_subscriber_drop) -void z_subscriber_options_default(z_subscriber_options_t *options) { -#if Z_FEATURE_UNSTABLE_API == 1 - options->reliability = Z_RELIABILITY_DEFAULT; -#else - options->__dummy = 0; -#endif -} +void z_subscriber_options_default(z_subscriber_options_t *options) { options->__dummy = 0; } int8_t z_declare_subscriber(z_owned_subscriber_t *sub, const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_moved_closure_sample_t *callback, const z_subscriber_options_t *options) { + _ZP_UNUSED(options); void *ctx = callback->_this._val.context; callback->_this._val.context = NULL; @@ -1575,16 +1597,7 @@ int8_t z_declare_subscriber(z_owned_subscriber_t *sub, const z_loaned_session_t } } - _z_subinfo_t subinfo = _z_subinfo_default(); - if (options != NULL) { -#if Z_FEATURE_UNSTABLE_API == 1 - subinfo.reliability = options->reliability; -#else - subinfo.reliability = Z_RELIABILITY_DEFAULT; -#endif - } - _z_subscriber_t int_sub = - _z_declare_subscriber(zs, key, subinfo, callback->_this._val.call, callback->_this._val.drop, ctx); + _z_subscriber_t int_sub = _z_declare_subscriber(zs, key, callback->_this._val.call, callback->_this._val.drop, ctx); z_internal_closure_sample_null(&callback->_this); sub->_val = int_sub; diff --git a/src/net/primitives.c b/src/net/primitives.c index 70e4ab6e7..2fd2ea072 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -103,8 +103,8 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) { #if Z_FEATURE_PUBLICATION == 1 /*------------------ Publisher Declaration ------------------*/ _z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding, - z_congestion_control_t congestion_control, z_priority_t priority, - _Bool is_express) { + z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express, + z_reliability_t reliability) { // Allocate publisher _z_publisher_t ret; // Fill publisher @@ -113,6 +113,7 @@ _z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keye ret._congestion_control = congestion_control; ret._priority = priority; ret._is_express = is_express; + ret.reliability = reliability; ret._zn = _z_session_rc_clone(zn); ret._encoding = encoding == NULL ? _z_encoding_null() : _z_encoding_steal(encoding); return ret; @@ -132,13 +133,16 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) { /*------------------ Write ------------------*/ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t *encoding, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority, - _Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment) { + _Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment, + z_reliability_t reliability) { int8_t ret = _Z_RES_OK; _z_network_message_t msg; switch (kind) { case Z_SAMPLE_KIND_PUT: + // TODO(refactor): use z_n_make_push msg = (_z_network_message_t){ ._tag = _Z_N_PUSH, + ._reliability = Z_RELIABILITY_DEFAULT, ._body._push = { ._key = keyexpr, @@ -157,8 +161,10 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p }; break; case Z_SAMPLE_KIND_DELETE: + // TODO(refactor): use z_n_make_push msg = (_z_network_message_t){ ._tag = _Z_N_PUSH, + ._reliability = Z_RELIABILITY_DEFAULT, ._body._push = { ._key = keyexpr, @@ -175,7 +181,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p return _Z_ERR_GENERIC; } - if (_z_send_n_msg(zn, &msg, Z_RELIABILITY_RELIABLE, cong_ctrl) != _Z_RES_OK) { + if (_z_send_n_msg(zn, &msg, reliability, cong_ctrl) != _Z_RES_OK) { ret = _Z_ERR_TRANSPORT_TX_FAILED; } @@ -187,13 +193,12 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p #if Z_FEATURE_SUBSCRIPTION == 1 /*------------------ Subscriber Declaration ------------------*/ -_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info, - _z_data_handler_t callback, _z_drop_handler_t dropper, void *arg) { +_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_data_handler_t callback, + _z_drop_handler_t dropper, void *arg) { _z_subscription_t s; s._id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); s._key_id = keyexpr._id; s._key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr); - s._info = sub_info; s._callback = callback; s._dropper = dropper; s._arg = arg; @@ -206,8 +211,7 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke return ret; } // Build the declare message to send on the wire - _z_declaration_t declaration = - _z_make_decl_subscriber(&keyexpr, s._id, sub_info.reliability == Z_RELIABILITY_RELIABLE); + _z_declaration_t declaration = _z_make_decl_subscriber(&keyexpr, s._id); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { _z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_RESOURCE_IS_LOCAL, sp_s); @@ -338,8 +342,10 @@ int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, _z_ke _z_zenoh_message_t z_msg; switch (kind) { case Z_SAMPLE_KIND_PUT: + // TODO(refactor): use z_n_make_reply z_msg = (_z_zenoh_message_t){ ._tag = _Z_N_RESPONSE, + ._reliability = Z_RELIABILITY_DEFAULT, ._body._response = { ._request_id = query->_request_id, @@ -369,8 +375,10 @@ int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, _z_ke }; break; case Z_SAMPLE_KIND_DELETE: + // TODO(refactor): use z_n_make_reply z_msg = (_z_zenoh_message_t){ ._tag = _Z_N_RESPONSE, + ._reliability = Z_RELIABILITY_DEFAULT, ._body._response = { ._request_id = query->_request_id, @@ -417,7 +425,9 @@ int8_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsrc, c // Build the reply context decorator. This is NOT the final reply. _z_id_t zid = zn->_local_zid; _z_zenoh_message_t msg = { + // TODO(refactor): use z_n_make_reply ._tag = _Z_N_RESPONSE, + ._reliability = Z_RELIABILITY_DEFAULT, ._body._response = { ._request_id = query->_request_id, diff --git a/src/net/sample.c b/src/net/sample.c index fc1444241..74262f4c4 100644 --- a/src/net/sample.c +++ b/src/net/sample.c @@ -81,7 +81,7 @@ _z_sample_t _z_sample_duplicate(const _z_sample_t *src) { #if Z_FEATURE_SUBSCRIPTION == 1 _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t *timestamp, _z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos, - const _z_bytes_t attachment) { + const _z_bytes_t attachment, z_reliability_t reliability) { _z_sample_t s = _z_sample_null(); s.keyexpr = _z_keyexpr_steal(key); s.kind = kind; @@ -89,6 +89,7 @@ _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const s.timestamp = _z_timestamp_duplicate(timestamp); } s.qos = qos; + s.reliability = reliability; _z_bytes_copy(&s.payload, &payload); _z_bytes_copy(&s.attachment, &attachment); if (encoding != NULL) { @@ -99,7 +100,7 @@ _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const #else _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t *timestamp, _z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos, - const _z_bytes_t attachment) { + const _z_bytes_t attachment, z_reliability_t reliability) { _ZP_UNUSED(key); _ZP_UNUSED(payload); _ZP_UNUSED(timestamp); @@ -107,6 +108,7 @@ _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _ZP_UNUSED(kind); _ZP_UNUSED(qos); _ZP_UNUSED(attachment); + _ZP_UNUSED(reliability); return _z_sample_null(); } #endif diff --git a/src/net/subscribe.c b/src/net/subscribe.c index 6774f5daa..e213d01a9 100644 --- a/src/net/subscribe.c +++ b/src/net/subscribe.c @@ -14,11 +14,6 @@ #include "zenoh-pico/net/subscribe.h" #if Z_FEATURE_SUBSCRIPTION == 1 -_z_subinfo_t _z_subinfo_default(void) { - _z_subinfo_t si; - si.reliability = Z_RELIABILITY_BEST_EFFORT; - return si; -} void _z_subscriber_clear(_z_subscriber_t *sub) { // Nothing to clear diff --git a/src/protocol/codec/declarations.c b/src/protocol/codec/declarations.c index 451a1d05c..e91841cc8 100644 --- a/src/protocol/codec/declarations.c +++ b/src/protocol/codec/declarations.c @@ -77,13 +77,7 @@ int8_t _z_decl_commons_encode(_z_wbuf_t *wbf, uint8_t header, _Bool has_extensio } int8_t _z_decl_subscriber_encode(_z_wbuf_t *wbf, const _z_decl_subscriber_t *decl) { uint8_t header = _Z_DECL_SUBSCRIBER_MID; - _Bool has_submode_ext = decl->_ext_subinfo._reliable; - _Z_RETURN_IF_ERR(_z_decl_commons_encode(wbf, header, has_submode_ext, decl->_id, decl->_keyexpr)); - if (has_submode_ext) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZINT | 0x01)); - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, (decl->_ext_subinfo._reliable ? 1 : 0))); - } - + _Z_RETURN_IF_ERR(_z_decl_commons_encode(wbf, header, false, decl->_id, decl->_keyexpr)); return _Z_RES_OK; } int8_t _z_undecl_kexpr_encode(_z_wbuf_t *wbf, const _z_undecl_kexpr_t *decl) { @@ -253,11 +247,8 @@ int8_t _z_decl_commons_decode(_z_zbuf_t *zbf, uint8_t header, _Bool *has_extensi return _Z_RES_OK; } int8_t _z_decl_subscriber_decode_extensions(_z_msg_ext_t *extension, void *ctx) { - _z_decl_subscriber_t *decl = (_z_decl_subscriber_t *)ctx; + _ZP_UNUSED(ctx); switch (extension->_header) { - case _Z_MSG_EXT_ENC_ZINT | 0x01: { - decl->_ext_subinfo._reliable = _Z_HAS_FLAG(extension->_body._zint._val, 1); - } break; default: if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) { return _z_msg_ext_unknown_error(extension, 0x14); diff --git a/src/protocol/definitions/declarations.c b/src/protocol/definitions/declarations.c index cd903aa03..9ee3a3720 100644 --- a/src/protocol/definitions/declarations.c +++ b/src/protocol/definitions/declarations.c @@ -62,11 +62,9 @@ _z_declaration_t _z_make_decl_keyexpr(uint16_t id, _Z_MOVE(_z_keyexpr_t) key) { _z_declaration_t _z_make_undecl_keyexpr(uint16_t id) { return (_z_declaration_t){._tag = _Z_UNDECL_KEXPR, ._body = {._undecl_kexpr = {._id = id}}}; } -_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable) { - return (_z_declaration_t){ - ._tag = _Z_DECL_SUBSCRIBER, - ._body = {._decl_subscriber = { - ._id = id, ._keyexpr = _z_keyexpr_steal(key), ._ext_subinfo = {._reliable = reliable}}}}; +_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id) { + return (_z_declaration_t){._tag = _Z_DECL_SUBSCRIBER, + ._body = {._decl_subscriber = {._id = id, ._keyexpr = _z_keyexpr_steal(key)}}}; } _z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) { diff --git a/src/protocol/definitions/network.c b/src/protocol/definitions/network.c index f0f73917d..52279593d 100644 --- a/src/protocol/definitions/network.c +++ b/src/protocol/definitions/network.c @@ -142,6 +142,7 @@ _z_zenoh_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_slice z_priority_t priority, _Bool is_express) { return (_z_zenoh_message_t){ ._tag = _Z_N_REQUEST, + ._reliability = Z_RELIABILITY_DEFAULT, ._body._request = { ._rid = qid, @@ -166,12 +167,14 @@ _z_zenoh_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_slice _z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid) { return (_z_zenoh_message_t){ ._tag = _Z_N_RESPONSE_FINAL, + ._reliability = Z_RELIABILITY_DEFAULT, ._body = {._response_final = {._request_id = rid}}, }; } _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, _Bool has_interest_id, uint32_t interest_id) { return (_z_network_message_t){ ._tag = _Z_N_DECLARE, + ._reliability = Z_RELIABILITY_DEFAULT, ._body._declare = { .has_interest_id = has_interest_id, @@ -185,12 +188,14 @@ _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, _Bool h _z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body) { return (_z_network_message_t){ ._tag = _Z_N_PUSH, + ._reliability = Z_RELIABILITY_DEFAULT, ._body._push = {._key = _z_keyexpr_steal(key), ._body = _z_push_body_steal(body)}, }; } _z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body) { return (_z_network_message_t){ ._tag = _Z_N_RESPONSE, + ._reliability = Z_RELIABILITY_DEFAULT, ._body._response = { ._key = _z_keyexpr_steal(key), @@ -216,6 +221,7 @@ _z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) ke _z_network_message_t _z_n_msg_make_interest(_z_interest_t interest) { return (_z_network_message_t){ ._tag = _Z_N_INTEREST, + ._reliability = Z_RELIABILITY_DEFAULT, ._body._interest = { ._interest = interest, diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 28b670f5d..8dcba2981 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -81,6 +81,13 @@ void _z_t_msg_clear(_z_transport_message_t *msg) { } } +z_reliability_t _z_t_msg_get_reliability(_z_transport_message_t *msg) { + if (_Z_HAS_FLAG(msg->_header, _Z_FLAG_T_FRAME_R)) { + return Z_RELIABILITY_RELIABLE; + } + return Z_RELIABILITY_BEST_EFFORT; +} + /*------------------ Join Message ------------------*/ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _z_id_t zid, _z_conduit_sn_list_t next_sn) { diff --git a/src/session/interest.c b/src/session/interest.c index ef11c7d1f..ffdc40a15 100644 --- a/src/session/interest.c +++ b/src/session/interest.c @@ -124,8 +124,7 @@ static int8_t _z_interest_send_decl_subscriber(_z_session_t *zn, uint32_t intere _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); // Build the declare message to send on the wire _z_keyexpr_t key = _z_keyexpr_alias(_Z_RC_IN_VAL(sub)->_key); - _z_declaration_t declaration = _z_make_decl_subscriber( - &key, _Z_RC_IN_VAL(sub)->_id, _Z_RC_IN_VAL(sub)->_info.reliability == Z_RELIABILITY_RELIABLE); + _z_declaration_t declaration = _z_make_decl_subscriber(&key, _Z_RC_IN_VAL(sub)->_id); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, true, interest_id); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; diff --git a/src/session/push.c b/src/session/push.c index 277f52f38..eeff7ac8d 100644 --- a/src/session/push.c +++ b/src/session/push.c @@ -21,27 +21,31 @@ #include "zenoh-pico/session/subscription.h" #include "zenoh-pico/utils/logging.h" -int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) { +#if Z_FEATURE_SUBSCRIPTION == 1 +int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push, z_reliability_t reliability) { int8_t ret = _Z_RES_OK; // TODO check body to know where to dispatch -#if Z_FEATURE_SUBSCRIPTION == 1 size_t kind = push->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE; if (push->_body._is_put) { _z_msg_put_t *put = &push->_body._body._put; ret = _z_trigger_subscriptions(zn, push->_key, put->_payload, &put->_encoding, kind, &put->_commons._timestamp, - push->_qos, put->_attachment); + push->_qos, put->_attachment, reliability); } else { _z_encoding_t encoding = _z_encoding_null(); _z_bytes_t payload = _z_bytes_null(); _z_msg_del_t *del = &push->_body._body._del; ret = _z_trigger_subscriptions(zn, push->_key, payload, &encoding, kind, &del->_commons._timestamp, push->_qos, - del->_attachment); + del->_attachment, reliability); } + return ret; +} #else +int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push, z_reliability_t reliability) { _ZP_UNUSED(zn); _ZP_UNUSED(push); -#endif - return ret; + _ZP_UNUSED(reliability); + return _Z_RES_OK; } +#endif diff --git a/src/session/rx.c b/src/session/rx.c index 6109a5455..51ef6369d 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -83,7 +83,7 @@ int8_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *msg, case _Z_N_PUSH: { _Z_DEBUG("Handling _Z_N_PUSH"); _z_n_msg_push_t *push = &msg->_body._push; - ret = _z_trigger_push(zn, push); + ret = _z_trigger_push(zn, push, msg->_reliability); } break; case _Z_N_REQUEST: { _Z_DEBUG("Handling _Z_N_REQUEST"); @@ -102,7 +102,8 @@ int8_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *msg, #if Z_FEATURE_SUBSCRIPTION == 1 _z_msg_put_t put = req->_body._put; ret = _z_trigger_subscriptions(zn, req->_key, put._payload, &put._encoding, Z_SAMPLE_KIND_PUT, - &put._commons._timestamp, req->_ext_qos, put._attachment); + &put._commons._timestamp, req->_ext_qos, put._attachment, + msg->_reliability); #endif if (ret == _Z_RES_OK) { _z_network_message_t final = _z_n_msg_make_response_final(req->_rid); @@ -114,7 +115,8 @@ int8_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *msg, _z_msg_del_t del = req->_body._del; _z_encoding_t encoding = _z_encoding_null(); ret = _z_trigger_subscriptions(zn, req->_key, _z_bytes_null(), &encoding, Z_SAMPLE_KIND_DELETE, - &del._commons._timestamp, req->_ext_qos, del._attachment); + &del._commons._timestamp, req->_ext_qos, del._attachment, + msg->_reliability); #endif if (ret == _Z_RES_OK) { _z_network_message_t final = _z_n_msg_make_response_final(req->_rid); diff --git a/src/session/subscription.c b/src/session/subscription.c index 90a681ff2..a85338fb5 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -140,15 +140,15 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t attachment) { - int8_t ret = - _z_trigger_subscriptions(zn, keyexpr, payload, encoding, Z_SAMPLE_KIND_PUT, timestamp, qos, attachment); + const _z_bytes_t attachment, z_reliability_t reliability) { + int8_t ret = _z_trigger_subscriptions(zn, keyexpr, payload, encoding, Z_SAMPLE_KIND_PUT, timestamp, qos, attachment, + reliability); (void)ret; } int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, _z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp, - const _z_n_qos_t qos, const _z_bytes_t attachment) { + const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability) { int8_t ret = _Z_RES_OK; _zp_session_lock_mutex(zn); @@ -163,7 +163,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co _zp_session_unlock_mutex(zn); // Build the sample - _z_sample_t sample = _z_sample_create(&key, payload, timestamp, encoding, kind, qos, attachment); + _z_sample_t sample = _z_sample_create(&key, payload, timestamp, encoding, kind, qos, attachment, reliability); // Parse subscription list _z_subscription_rc_list_t *xs = subs; _Z_DEBUG("Triggering %ju subs", (uintmax_t)_z_subscription_rc_list_len(xs)); @@ -209,7 +209,7 @@ void _z_flush_subscriptions(_z_session_t *zn) { void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t attachment) { + const _z_bytes_t attachment, z_reliability_t reliability) { _ZP_UNUSED(zn); _ZP_UNUSED(keyexpr); _ZP_UNUSED(payload); @@ -217,6 +217,7 @@ void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr _ZP_UNUSED(qos); _ZP_UNUSED(timestamp); _ZP_UNUSED(attachment); + _ZP_UNUSED(reliability); } #endif // Z_FEATURE_SUBSCRIPTION == 1 diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index badd82bc0..cf239fa4e 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -171,6 +171,8 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t size_t len = _z_vec_len(&t_msg->_body._frame._messages); for (size_t i = 0; i < len; i++) { _z_network_message_t *zm = _z_network_message_vec_get(&t_msg->_body._frame._messages, i); + zm->_reliability = _z_t_msg_get_reliability(t_msg); + _z_msg_fix_mapping(zm, mapping); _z_handle_network_message(ztm->_session, zm, mapping); } @@ -211,6 +213,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t _z_zenoh_message_t zm; ret = _z_network_message_decode(&zm, &zbf); + zm._reliability = _z_t_msg_get_reliability(t_msg); if (ret == _Z_RES_OK) { uint16_t mapping = entry->_peer_id; _z_msg_fix_mapping(&zm, mapping); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 1046f214f..2ce613ab4 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -130,9 +130,9 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans // Handle all the zenoh message, one by one size_t len = _z_vec_len(&t_msg->_body._frame._messages); for (size_t i = 0; i < len; i++) { - _z_handle_network_message(ztu->_session, - (_z_zenoh_message_t *)_z_vec_get(&t_msg->_body._frame._messages, i), - _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); + _z_network_message_t *zm = _z_network_message_vec_get(&t_msg->_body._frame._messages, i); + zm->_reliability = _z_t_msg_get_reliability(t_msg); + _z_handle_network_message(ztu->_session, zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); } break; @@ -166,6 +166,7 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans _z_zenoh_message_t zm; ret = _z_network_message_decode(&zm, &zbf); + zm._reliability = _z_t_msg_get_reliability(t_msg); if (ret == _Z_RES_OK) { _z_handle_network_message(ztu->_session, &zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); _z_msg_clear(&zm); // Clear must be explicitly called for fragmented zenoh messages. Non-fragmented diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 810489dec..5667f2188 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -642,20 +642,6 @@ void timestamp_field(void) { _z_wbuf_clear(&wbf); } -/*------------------ SubInfo field ------------------*/ -_z_subinfo_t gen_subinfo(void) { - _z_subinfo_t sm; - sm.reliability = gen_bool() ? Z_RELIABILITY_RELIABLE : Z_RELIABILITY_BEST_EFFORT; - - return sm; -} - -void assert_eq_subinfo(_z_subinfo_t *left, _z_subinfo_t *right) { - printf("SubInfo -> "); - printf("Reliable (%u:%u), ", left->reliability, right->reliability); - assert(left->reliability == right->reliability); -} - /*------------------ ResKey field ------------------*/ _z_keyexpr_t gen_keyexpr(void) { _z_keyexpr_t key; @@ -768,17 +754,13 @@ void resource_declaration(void) { /*------------------ Subscriber declaration ------------------*/ _z_decl_subscriber_t gen_subscriber_declaration(void) { - _z_subinfo_t subinfo = gen_subinfo(); - _z_decl_subscriber_t e_sd = {._keyexpr = gen_keyexpr(), - ._id = (uint32_t)gen_uint64(), - ._ext_subinfo = {._reliable = subinfo.reliability == Z_RELIABILITY_RELIABLE}}; + _z_decl_subscriber_t e_sd = {._keyexpr = gen_keyexpr(), ._id = (uint32_t)gen_uint64()}; return e_sd; } void assert_eq_subscriber_declaration(const _z_decl_subscriber_t *left, const _z_decl_subscriber_t *right) { assert_eq_keyexpr(&left->_keyexpr, &right->_keyexpr); assert(left->_id == right->_id); - assert(left->_ext_subinfo._reliable == right->_ext_subinfo._reliable); } void subscriber_declaration(void) {