diff --git a/include/zenoh-pico/link/endpoint.h b/include/zenoh-pico/link/endpoint.h index 8aa03390e..621d2840f 100644 --- a/include/zenoh-pico/link/endpoint.h +++ b/include/zenoh-pico/link/endpoint.h @@ -54,7 +54,7 @@ bool _z_locator_eq(const _z_locator_t *left, const _z_locator_t *right); void _z_locator_init(_z_locator_t *locator); _z_string_t _z_locator_to_string(const _z_locator_t *loc); -z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *s); +z_result_t _z_locator_from_string(_z_locator_t *lc, const _z_string_t *s); size_t _z_locator_size(_z_locator_t *lc); void _z_locator_clear(_z_locator_t *lc); @@ -72,7 +72,7 @@ typedef struct { } _z_endpoint_t; _z_string_t _z_endpoint_to_string(const _z_endpoint_t *e); -z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, _z_string_t *s); +z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, const _z_string_t *s); void _z_endpoint_clear(_z_endpoint_t *ep); void _z_endpoint_free(_z_endpoint_t **ep); diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index 522721fe8..8b2da1ea7 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -141,8 +141,8 @@ typedef struct _z_link_t { void _z_link_clear(_z_link_t *zl); void _z_link_free(_z_link_t **zl); -z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator); -z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator); +z_result_t _z_open_link(_z_link_t *zl, const _z_string_t *locator); +z_result_t _z_listen_link(_z_link_t *zl, const _z_string_t *locator); z_result_t _z_link_send_wbuf(const _z_link_t *zl, const _z_wbuf_t *wbf); size_t _z_link_recv_zbuf(const _z_link_t *zl, _z_zbuf_t *zbf, _z_slice_t *addr); diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 5ee1556dc..5a50ac308 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -21,6 +21,7 @@ #include "zenoh-pico/collections/list.h" #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/session/liveliness.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/session.h" @@ -55,6 +56,11 @@ typedef struct _z_session_t { _z_resource_list_t *_local_resources; _z_resource_list_t *_remote_resources; + // Information for session restoring + // Empty _config means session is not restorable + _z_config_t _config; + _z_network_message_list_t *_decalaration_cache; + // Session subscriptions #if Z_FEATURE_SUBSCRIPTION == 1 _z_subscription_rc_list_t *_subscriptions; @@ -99,14 +105,43 @@ _Z_REFCOUNT_DEFINE(_z_session, _z_session) * Open a zenoh-net session * * Parameters: + * zn: A pointer of A :c:type:`_z_session_rc_t` used as a return value. * config: A set of properties. The caller keeps its ownership. - * zn: A pointer of A :c:type:`_z_session_t` used as a return value. + * zid: A pointer to Zenoh ID. + * + * Returns: + * ``0`` in case of success, or a ``negative value`` in case of failure. + */ +z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config, const _z_id_t *zid); + +/** + * Reopen a disconnected zenoh-net session + * + * Parameters: + * zn: Existing zenoh-net session. * * Returns: * ``0`` in case of success, or a ``negative value`` in case of failure. + */ +z_result_t _z_reopen(_z_session_rc_t *zn); + +/** + * Store declaration network message to cache for resend it after session restore * + * Parameters: + * zs: A zenoh-net session. + * z_msg: Network message with declaration + */ +void _z_cache_declaration(_z_session_t *zs, const _z_network_message_t *n_msg); + +/** + * Remove corresponding declaration from the cache + * + * Parameters: + * zs: A zenoh-net session. + * z_msg: Network message with undeclaration */ -z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config); +void _z_prune_declaration(_z_session_t *zs, const _z_network_message_t *n_msg); /** * Close a zenoh-net session. diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index f828d48f1..534a12f66 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -296,6 +296,7 @@ inline static void _z_msg_clear(_z_zenoh_message_t *msg) { _z_n_msg_clear(msg); inline static void _z_msg_free(_z_zenoh_message_t **msg) { _z_n_msg_free(msg); } _Z_ELEM_DEFINE(_z_network_message, _z_network_message_t, _z_noop_size, _z_n_msg_clear, _z_noop_copy, _z_noop_move) _Z_SVEC_DEFINE(_z_network_message, _z_network_message_t) +_Z_LIST_DEFINE(_z_network_message, _z_network_message_t) void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping); _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_slice_t) parameters, _z_zint_t qid, @@ -308,6 +309,7 @@ _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, bool ha _z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body); _z_network_message_t _z_n_msg_make_interest(_z_interest_t interest); z_result_t _z_n_msg_copy(_z_network_message_t *dst, const _z_network_message_t *src); +_z_network_message_t *_z_n_msg_clone(const _z_network_message_t *src); #ifdef __cplusplus } diff --git a/include/zenoh-pico/session/utils.h b/include/zenoh-pico/session/utils.h index 44c48ff91..238390e6e 100644 --- a/include/zenoh-pico/session/utils.h +++ b/include/zenoh-pico/session/utils.h @@ -30,7 +30,7 @@ extern "C" { _z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t id, _z_string_t *locator, const uint32_t timeout, const bool exit_on_first); -z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid); +z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid); void _z_session_clear(_z_session_t *zn); z_result_t _z_session_close(_z_session_t *zn, uint8_t reason); diff --git a/include/zenoh-pico/transport/common/transport.h b/include/zenoh-pico/transport/common/transport.h new file mode 100644 index 000000000..36410df24 --- /dev/null +++ b/include/zenoh-pico/transport/common/transport.h @@ -0,0 +1,30 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_COMMON_TRANSPORT_H +#define ZENOH_PICO_COMMON_TRANSPORT_H + +#include "zenoh-pico/transport/transport.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void _z_common_transport_clear(_z_transport_common_t *ztc, bool detach_tasks); + +#ifdef __cplusplus +} +#endif + +#endif /* ZENOH_PICO_COMMON_TRANSPORT_H*/ diff --git a/include/zenoh-pico/transport/manager.h b/include/zenoh-pico/transport/manager.h index b22eff975..c4c8bbe63 100644 --- a/include/zenoh-pico/transport/manager.h +++ b/include/zenoh-pico/transport/manager.h @@ -28,7 +28,8 @@ enum _z_peer_op_e { _Z_PEER_OP_LISTEN = 1, }; -z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode, int peer_op); +z_result_t _z_new_transport(_z_transport_t *zt, const _z_id_t *bs, const _z_string_t *locator, z_whatami_t mode, + int peer_op); void _z_free_transport(_z_transport_t **zt); #ifdef __cplusplus diff --git a/include/zenoh-pico/transport/multicast/transport.h b/include/zenoh-pico/transport/multicast/transport.h index 3e61f4bc1..d324dc540 100644 --- a/include/zenoh-pico/transport/multicast/transport.h +++ b/include/zenoh-pico/transport/multicast/transport.h @@ -29,7 +29,7 @@ z_result_t _z_multicast_open_client(_z_transport_multicast_establish_param_t *pa const _z_id_t *local_zid); z_result_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, bool link_only); z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reason); -void _z_multicast_transport_clear(_z_transport_t *zt); +void _z_multicast_transport_clear(_z_transport_multicast_t *ztm, bool detach_tasks); #if (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1 static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_peer); } diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index d4fcf7ea7..b26a150bd 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -168,6 +168,7 @@ typedef struct { uint8_t _seq_num_res; } _z_transport_multicast_establish_param_t; +_z_transport_common_t *_z_transport_get_common(_z_transport_t *zt); z_result_t _z_transport_close(_z_transport_t *zt, uint8_t reason); void _z_transport_clear(_z_transport_t *zt); void _z_transport_free(_z_transport_t **zt); diff --git a/include/zenoh-pico/transport/unicast/transport.h b/include/zenoh-pico/transport/unicast/transport.h index 3ca322c42..84e439200 100644 --- a/include/zenoh-pico/transport/unicast/transport.h +++ b/include/zenoh-pico/transport/unicast/transport.h @@ -29,7 +29,7 @@ z_result_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, c const _z_id_t *local_zid, int peer_op); z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bool link_only); z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason); -void _z_unicast_transport_clear(_z_transport_t *zt); +void _z_unicast_transport_clear(_z_transport_unicast_t *ztu, bool detach_tasks); #ifdef __cplusplus } diff --git a/src/api/api.c b/src/api/api.c index c4b827209..7b5e47bd5 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -42,6 +42,7 @@ #include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/transport/multicast.h" #include "zenoh-pico/transport/unicast.h" +#include "zenoh-pico/utils/config.h" #include "zenoh-pico/utils/endianness.h" #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/pointers.h" @@ -624,34 +625,71 @@ z_result_t z_scout(z_moved_config_t *config, z_moved_closure_hello_t *callback, void z_open_options_default(z_open_options_t *options) { options->__dummy = 0; } -z_result_t z_open(z_owned_session_t *zs, z_moved_config_t *config, const z_open_options_t *options) { - _ZP_UNUSED(options); +static _z_id_t _z_session_get_zid(const _z_config_t *config) { + _z_id_t zid = _z_id_empty(); + char *opt_as_str = _z_config_get(config, Z_CONFIG_SESSION_ZID_KEY); + if (opt_as_str != NULL) { + _z_uuid_to_bytes(zid.id, opt_as_str); + } else { + _z_session_generate_zid(&zid, Z_ZID_LENGTH); + } + return zid; +} + +static z_result_t _z_session_rc_init(z_owned_session_t *zs, _z_id_t *zid) { z_internal_session_null(zs); _z_session_t *s = z_malloc(sizeof(_z_session_t)); if (s == NULL) { - z_config_drop(config); return _Z_ERR_SYSTEM_OUT_OF_MEMORY; } - memset(s, 0, sizeof(_z_session_t)); - // Create rc + + z_result_t ret = _z_session_init(s, zid); + if (ret != _Z_RES_OK) { + z_free(s); + return ret; + } + _z_session_rc_t zsrc = _z_session_rc_new(s); if (zsrc._cnt == NULL) { + _z_session_clear(s); z_free(s); - z_config_drop(config); return _Z_ERR_SYSTEM_OUT_OF_MEMORY; } zs->_rc = zsrc; - // Open session - z_result_t ret = _z_open(&zs->_rc, &config->_this._val); + + return _Z_RES_OK; +} + +z_result_t z_open(z_owned_session_t *zs, z_moved_config_t *config, const z_open_options_t *options) { + _ZP_UNUSED(options); + + _z_config_t *cfg = &config->_this._val; + if (config == NULL) { + _Z_ERROR("A valid config is missing."); + return _Z_ERR_GENERIC; + } + + _z_id_t zid = _z_session_get_zid(cfg); + + z_result_t ret = _z_session_rc_init(zs, &zid); if (ret != _Z_RES_OK) { - _z_session_rc_decr(&zs->_rc); - z_internal_session_null(zs); z_config_drop(config); - z_free(s); + return ret; + } + + ret = _z_open(&zs->_rc, cfg, &zid); + if (ret != _Z_RES_OK) { + z_session_drop(z_session_move(zs)); + z_config_drop(config); return ret; } // Clean up - z_config_drop(config); + if (/* session is restorable*/ true) { + _Z_OWNED_RC_IN_VAL(zs)->_config = config->_this._val; + z_internal_config_null(&config->_this); + } else { + z_config_drop(config); + } return _Z_RES_OK; } diff --git a/src/link/endpoint.c b/src/link/endpoint.c index 1b15f5ec3..5cd621a37 100644 --- a/src/link/endpoint.c +++ b/src/link/endpoint.c @@ -85,7 +85,7 @@ bool _z_locator_eq(const _z_locator_t *left, const _z_locator_t *right) { return res; } -static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, _z_string_t *str) { +static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, const _z_string_t *str) { *protocol = _z_string_null(); const char *p_start = _z_string_data(str); @@ -97,7 +97,7 @@ static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, _z_stri return _z_string_copy_substring(protocol, str, 0, p_len); } -static z_result_t _z_locator_address_from_string(_z_string_t *address, _z_string_t *str) { +static z_result_t _z_locator_address_from_string(_z_string_t *address, const _z_string_t *str) { *address = _z_string_null(); // Find protocol separator @@ -130,7 +130,7 @@ static z_result_t _z_locator_address_from_string(_z_string_t *address, _z_string return _z_string_copy_substring(address, str, start_offset, addr_len); } -z_result_t _z_locator_metadata_from_string(_z_str_intmap_t *strint, _z_string_t *str) { +z_result_t _z_locator_metadata_from_string(_z_str_intmap_t *strint, const _z_string_t *str) { *strint = _z_str_intmap_make(); // Find metadata separator @@ -169,7 +169,7 @@ void _z_locator_metadata_onto_str(char *dst, size_t dst_len, const _z_str_intmap _z_str_intmap_onto_str(dst, dst_len, s, 0, NULL); } -z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *str) { +z_result_t _z_locator_from_string(_z_locator_t *lc, const _z_string_t *str) { if (str == NULL || !_z_string_check(str)) { return _Z_ERR_CONFIG_LOCATOR_INVALID; } @@ -284,7 +284,7 @@ void _z_endpoint_free(_z_endpoint_t **ep) { } } -z_result_t _z_endpoint_config_from_string(_z_str_intmap_t *strint, _z_string_t *str, _z_string_t *proto) { +z_result_t _z_endpoint_config_from_string(_z_str_intmap_t *strint, const _z_string_t *str, _z_string_t *proto) { char *p_start = (char *)memchr(_z_string_data(str), ENDPOINT_CONFIG_SEPARATOR, _z_string_len(str)); if (p_start != NULL) { p_start = _z_ptr_char_offset(p_start, 1); @@ -411,7 +411,7 @@ char *_z_endpoint_config_to_string(const _z_str_intmap_t *s, const _z_string_t * return NULL; } -z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, _z_string_t *str) { +z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, const _z_string_t *str) { _z_endpoint_init(ep); _Z_CLEAN_RETURN_IF_ERR(_z_locator_from_string(&ep->_locator, str), _z_endpoint_clear(ep)); _Z_CLEAN_RETURN_IF_ERR(_z_endpoint_config_from_string(&ep->_config, str, &ep->_locator._protocol), diff --git a/src/link/link.c b/src/link/link.c index 8c5ffb312..76f454b38 100644 --- a/src/link/link.c +++ b/src/link/link.c @@ -21,7 +21,7 @@ #include "zenoh-pico/link/manager.h" #include "zenoh-pico/utils/logging.h" -z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator) { +z_result_t _z_open_link(_z_link_t *zl, const _z_string_t *locator) { z_result_t ret = _Z_RES_OK; _z_endpoint_t ep; @@ -71,7 +71,7 @@ z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator) { return ret; } -z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator) { +z_result_t _z_listen_link(_z_link_t *zl, const _z_string_t *locator) { z_result_t ret = _Z_RES_OK; _z_endpoint_t ep; diff --git a/src/net/primitives.c b/src/net/primitives.c index d1909188f..4864dc836 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -21,6 +21,9 @@ #include "zenoh-pico/collections/slice.h" #include "zenoh-pico/config.h" #include "zenoh-pico/net/filtering.h" +#include "zenoh-pico/net/logger.h" +#include "zenoh-pico/net/sample.h" +#include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/declarations.h" #include "zenoh-pico/protocol/definitions/interest.h" @@ -38,6 +41,24 @@ #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" +/*------------------ Declaration Helpers ------------------*/ +static z_result_t _z_send_decalre(_z_session_t *zn, const _z_network_message_t *n_msg) { + z_result_t ret = _Z_RES_OK; + ret = _z_send_n_msg(zn, n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + if (ret == _Z_RES_OK) { + _z_cache_declaration(zn, n_msg); + } + return ret; +} + +static z_result_t _z_send_undecalre(_z_session_t *zn, const _z_network_message_t *n_msg) { + z_result_t ret = _Z_RES_OK; + ret = _z_send_n_msg(zn, n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + if (ret == _Z_RES_OK) { + _z_prune_declaration(zn, n_msg); + } + return ret; +} /*------------------ Scouting ------------------*/ void _z_scout(const z_what_t what, const _z_id_t zid, _z_string_t *locator, const uint32_t timeout, _z_closure_hello_callback_t callback, void *arg_call, _z_drop_handler_t dropper, void *arg_drop) { @@ -68,7 +89,7 @@ uint16_t _z_declare_resource(_z_session_t *zn, _z_keyexpr_t keyexpr) { _z_keyexpr_t alias = _z_keyexpr_alias(keyexpr); _z_declaration_t declaration = _z_make_decl_keyexpr(id, &alias); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { + if (_z_send_decalre(zn, &n_msg) == _Z_RES_OK) { ret = id; } else { _z_unregister_resource(zn, id, _Z_KEYEXPR_MAPPING_LOCAL); @@ -88,7 +109,7 @@ z_result_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) { // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_undecl_keyexpr(rid); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { + if (_z_send_undecalre(zn, &n_msg) == _Z_RES_OK) { _z_unregister_resource(zn, rid, _Z_KEYEXPR_MAPPING_LOCAL); // Only if message is send, local resource is removed } else { @@ -235,7 +256,7 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_decl_subscriber(&keyexpr, s._id); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + if (_z_send_decalre(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) { _z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_SUBSCRIBER, sp_s); _z_subscriber_clear(&ret); return ret; @@ -265,8 +286,7 @@ z_result_t _z_undeclare_subscriber(_z_subscriber_t *sub) { declaration = _z_make_undecl_subscriber(sub->_entity_id, &_Z_RC_IN_VAL(s)->_key); } _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(_Z_RC_IN_VAL(&sub->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != - _Z_RES_OK) { + if (_z_send_undecalre(_Z_RC_IN_VAL(&sub->_zn), &n_msg) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } _z_n_msg_clear(&n_msg); @@ -299,7 +319,7 @@ _z_queryable_t _z_declare_queryable(const _z_session_rc_t *zn, _z_keyexpr_t keye // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_decl_queryable(&keyexpr, q._id, q._complete, _Z_QUERYABLE_DISTANCE_DEFAULT); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + if (_z_send_decalre(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) { _z_unregister_session_queryable(_Z_RC_IN_VAL(zn), sp_q); _z_queryable_clear(&ret); return ret; @@ -328,8 +348,7 @@ z_result_t _z_undeclare_queryable(_z_queryable_t *qle) { declaration = _z_make_undecl_queryable(qle->_entity_id, &_Z_RC_IN_VAL(q)->_key); } _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(_Z_RC_IN_VAL(&qle->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != - _Z_RES_OK) { + if (_z_send_undecalre(_Z_RC_IN_VAL(&qle->_zn), &n_msg) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } _z_n_msg_clear(&n_msg); diff --git a/src/net/session.c b/src/net/session.c index c70fdbabe..43c07dfe4 100644 --- a/src/net/session.c +++ b/src/net/session.c @@ -17,15 +17,15 @@ #include #include -#include "zenoh-pico/api/primitives.h" -#include "zenoh-pico/collections/slice.h" +#include "zenoh-pico/api/constants.h" #include "zenoh-pico/collections/string.h" #include "zenoh-pico/config.h" -#include "zenoh-pico/net/sample.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/transport/common/lease.h" #include "zenoh-pico/transport/common/read.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/transport/multicast.h" #include "zenoh-pico/transport/multicast/lease.h" #include "zenoh-pico/transport/multicast/read.h" @@ -34,70 +34,50 @@ #include "zenoh-pico/transport/unicast.h" #include "zenoh-pico/transport/unicast/lease.h" #include "zenoh-pico/transport/unicast/read.h" +#include "zenoh-pico/utils/config.h" #include "zenoh-pico/utils/logging.h" +#include "zenoh-pico/utils/result.h" #include "zenoh-pico/utils/uuid.h" -static z_result_t __z_open_inner(_z_session_rc_t *zn, _z_string_t *locator, z_whatami_t mode, int peer_op) { +static z_result_t _z_locators_by_scout(const _z_config_t *config, const _z_id_t *zid, _z_string_svec_t *locators) { z_result_t ret = _Z_RES_OK; - _z_id_t local_zid = _z_id_empty(); - ret = _z_session_generate_zid(&local_zid, Z_ZID_LENGTH); - if (ret != _Z_RES_OK) { - local_zid = _z_id_empty(); - return ret; + char *opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_WHAT_KEY); + if (opt_as_str == NULL) { + opt_as_str = (char *)Z_CONFIG_SCOUTING_WHAT_DEFAULT; } - ret = _z_new_transport(&_Z_RC_IN_VAL(zn)->_tp, &local_zid, locator, mode, peer_op); - if (ret != _Z_RES_OK) { - local_zid = _z_id_empty(); - return ret; + z_what_t what = strtol(opt_as_str, NULL, 10); + + opt_as_str = _z_config_get(config, Z_CONFIG_MULTICAST_LOCATOR_KEY); + if (opt_as_str == NULL) { + opt_as_str = (char *)Z_CONFIG_MULTICAST_LOCATOR_DEFAULT; } - ret = _z_session_init(zn, &local_zid); + _z_string_t mcast_locator = _z_string_alias_str(opt_as_str); + + opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_TIMEOUT_KEY); + if (opt_as_str == NULL) { + opt_as_str = (char *)Z_CONFIG_SCOUTING_TIMEOUT_DEFAULT; + } + uint32_t timeout = (uint32_t)strtoul(opt_as_str, NULL, 10); + + // Scout and return upon the first result + _z_hello_list_t *hellos = _z_scout_inner(what, *zid, &mcast_locator, timeout, true); + if (hellos != NULL) { + _z_hello_t *hello = _z_hello_list_head(hellos); + _z_string_svec_copy(locators, &hello->_locators, true); + } + _z_hello_list_free(&hellos); return ret; } -z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config) { +static z_result_t _z_locators_by_config(_z_config_t *config, const _z_id_t *zid, _z_string_svec_t *locators, + int *peer_op) { z_result_t ret = _Z_RES_OK; - _Z_RC_IN_VAL(zn)->_tp._type = _Z_TRANSPORT_NONE; - - _z_id_t zid = _z_id_empty(); - char *opt_as_str = _z_config_get(config, Z_CONFIG_SESSION_ZID_KEY); - if (opt_as_str != NULL) { - _z_uuid_to_bytes(zid.id, opt_as_str); - } - if (config == NULL) { - _Z_ERROR("A valid config is missing."); - return _Z_ERR_GENERIC; - } - int peer_op = _Z_PEER_OP_LISTEN; - _z_string_svec_t locators = _z_string_svec_make(0); char *connect = _z_config_get(config, Z_CONFIG_CONNECT_KEY); char *listen = _z_config_get(config, Z_CONFIG_LISTEN_KEY); - if (connect == NULL && listen == NULL) { // Scout if peer is not configured - opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_WHAT_KEY); - if (opt_as_str == NULL) { - opt_as_str = (char *)Z_CONFIG_SCOUTING_WHAT_DEFAULT; - } - z_what_t what = strtol(opt_as_str, NULL, 10); - - opt_as_str = _z_config_get(config, Z_CONFIG_MULTICAST_LOCATOR_KEY); - if (opt_as_str == NULL) { - opt_as_str = (char *)Z_CONFIG_MULTICAST_LOCATOR_DEFAULT; - } - _z_string_t mcast_locator = _z_string_alias_str(opt_as_str); - - opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_TIMEOUT_KEY); - if (opt_as_str == NULL) { - opt_as_str = (char *)Z_CONFIG_SCOUTING_TIMEOUT_DEFAULT; - } - uint32_t timeout = (uint32_t)strtoul(opt_as_str, NULL, 10); - - // Scout and return upon the first result - _z_hello_list_t *hellos = _z_scout_inner(what, zid, &mcast_locator, timeout, true); - if (hellos != NULL) { - _z_hello_t *hello = _z_hello_list_head(hellos); - _z_string_svec_copy(&locators, &hello->_locators, true); - } - _z_hello_list_free(&hellos); + if (connect == NULL && listen == NULL) { + // Scout if peer is not configured + ret = _z_locators_by_scout(config, zid, locators); } else { uint_fast8_t key = Z_CONFIG_CONNECT_KEY; if (listen != NULL) { @@ -108,11 +88,62 @@ z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config) { return _Z_ERR_GENERIC; } } else { - peer_op = _Z_PEER_OP_OPEN; + *peer_op = _Z_PEER_OP_OPEN; } - locators = _z_string_svec_make(1); + *locators = _z_string_svec_make(1); _z_string_t s = _z_string_copy_from_str(_z_config_get(config, key)); - _z_string_svec_append(&locators, &s, true); + _z_string_svec_append(locators, &s, true); + } + return ret; +} + +static z_result_t _z_config_get_mode(const _z_config_t *config, z_whatami_t *mode) { + z_result_t ret = _Z_RES_OK; + char *s_mode = _z_config_get(config, Z_CONFIG_MODE_KEY); + *mode = Z_WHATAMI_CLIENT; // By default, zenoh-pico will operate as a client + if (s_mode != NULL) { + if (_z_str_eq(s_mode, Z_CONFIG_MODE_CLIENT) == true) { + *mode = Z_WHATAMI_CLIENT; + } else if (_z_str_eq(s_mode, Z_CONFIG_MODE_PEER) == true) { + *mode = Z_WHATAMI_PEER; + } else { + _Z_ERROR("Trying to configure an invalid mode: %s", s_mode); + ret = _Z_ERR_CONFIG_INVALID_MODE; + } + } + return ret; +} + +static z_result_t _z_open_inner(_z_session_rc_t *zn, _z_string_t *locator, const _z_id_t *zid, z_whatami_t mode, + int peer_op) { + z_result_t ret = _Z_RES_OK; + + _z_transport_t zt; + ret = _z_new_transport(&zt, zid, locator, mode, peer_op); + if (ret != _Z_RES_OK) { + return ret; + } + + _z_transport_get_common(&zt)->_session = zn; + _Z_RC_IN_VAL(zn)->_tp = zt; + return ret; +} + +z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config, const _z_id_t *zid) { + z_result_t ret = _Z_RES_OK; + _Z_RC_IN_VAL(zn)->_tp._type = _Z_TRANSPORT_NONE; + + int peer_op = _Z_PEER_OP_LISTEN; + _z_string_svec_t locators = _z_string_svec_make(0); + ret = _z_locators_by_config(config, zid, &locators, &peer_op); + if (ret != _Z_RES_OK) { + return ret; + } + + z_whatami_t mode; + ret = _z_config_get_mode(config, &mode); + if (ret != _Z_RES_OK) { + return ret; } ret = _Z_ERR_SCOUT_NO_RESULTS; @@ -124,32 +155,67 @@ z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config) { // @TODO: check invalid configurations // For example, client mode in multicast links - // Check operation mode - char *s_mode = _z_config_get(config, Z_CONFIG_MODE_KEY); - z_whatami_t mode = Z_WHATAMI_CLIENT; // By default, zenoh-pico will operate as a client - if (s_mode != NULL) { - if (_z_str_eq(s_mode, Z_CONFIG_MODE_CLIENT) == true) { - mode = Z_WHATAMI_CLIENT; - } else if (_z_str_eq(s_mode, Z_CONFIG_MODE_PEER) == true) { - mode = Z_WHATAMI_PEER; - } else { - ret = _Z_ERR_CONFIG_INVALID_MODE; - } - } - + ret = _z_open_inner(zn, locator, zid, mode, peer_op); if (ret == _Z_RES_OK) { - ret = __z_open_inner(zn, locator, mode, peer_op); - if (ret == _Z_RES_OK) { - break; - } - } else { - _Z_ERROR("Trying to configure an invalid mode."); + break; } } _z_string_svec_clear(&locators); return ret; } +z_result_t _z_reopen(_z_session_rc_t *zn) { + z_result_t ret = _Z_RES_OK; + _z_session_t *zs = _Z_RC_IN_VAL(zn); + if (_z_config_is_empty(&zs->_config)) { + return ret; + } + + do { + ret = _z_open(zn, &zs->_config, &zs->_local_zid); + // TODO(sashacmc): break on fatal error, and add timeout config + if (ret != _Z_RES_OK) { + _Z_DEBUG("Reopen failed: %i, next try in 1s", ret); + z_sleep_s(1); + continue; + } + + // TODO: currnetly we can come to reopen only from task, so we can restart them + // but we have no original attributes (which currently in all known cases is default + _zp_start_lease_task(_Z_RC_IN_VAL(zn), NULL); + _zp_start_read_task(_Z_RC_IN_VAL(zn), NULL); + + if (ret == _Z_RES_OK && !_z_network_message_list_is_empty(zs->_decalaration_cache)) { + _z_network_message_list_t *iter = zs->_decalaration_cache; + while (iter != NULL) { + _z_network_message_t *n_msg = _z_network_message_list_head(zs->_decalaration_cache); + ret = _z_send_n_msg(_Z_RC_IN_VAL(zn), n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + if (ret != _Z_RES_OK) { + _Z_DEBUG("Send message during reopen failed: %i", ret); + continue; + } + + iter = _z_network_message_list_tail(iter); + } + } + } while (ret != _Z_RES_OK); + + return ret; +} + +void _z_cache_declaration(_z_session_t *zs, const _z_network_message_t *n_msg) { + if (_z_config_is_empty(&zs->_config)) { + return; + } + zs->_decalaration_cache = _z_network_message_list_push(zs->_decalaration_cache, _z_n_msg_clone(n_msg)); +} + +void _z_prune_declaration(_z_session_t *zs, const _z_network_message_t *n_msg) { + (void)zs; + (void)n_msg; + // TODO(sashacmc): implement +} + void _z_close(_z_session_t *zn) { _z_session_close(zn, _Z_CLOSE_GENERIC); } bool _z_session_is_closed(const _z_session_t *session) { return session->_tp._type == _Z_TRANSPORT_NONE; } diff --git a/src/protocol/definitions/network.c b/src/protocol/definitions/network.c index 1364ac039..44069f619 100644 --- a/src/protocol/definitions/network.c +++ b/src/protocol/definitions/network.c @@ -352,6 +352,12 @@ z_result_t _z_n_msg_copy(_z_network_message_t *dst, const _z_network_message_t * } } +_z_network_message_t *_z_n_msg_clone(const _z_network_message_t *src) { + _z_network_message_t *dst = z_malloc(sizeof(_z_network_message_t)); + _z_n_msg_copy(dst, src); + return dst; +} + void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping) { switch (msg->_tag) { case _Z_N_DECLARE: { diff --git a/src/session/utils.c b/src/session/utils.c index 90ec97912..09524c138 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -18,12 +18,16 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/session/interest.h" #include "zenoh-pico/session/liveliness.h" #include "zenoh-pico/session/query.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/subscription.h" +#include "zenoh-pico/transport/transport.h" +#include "zenoh-pico/utils/config.h" +#include "zenoh-pico/utils/result.h" /*------------------ clone helpers ------------------*/ void _z_timestamp_copy(_z_timestamp_t *dst, const _z_timestamp_t *src) { *dst = *src; } @@ -47,15 +51,23 @@ z_result_t _z_session_generate_zid(_z_id_t *bs, uint8_t size) { } /*------------------ Init/Free/Close session ------------------*/ -z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { +z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid) { z_result_t ret = _Z_RES_OK; - _z_session_t *zn = _Z_RC_IN_VAL(zsrc); + +#if Z_FEATURE_MULTI_THREAD == 1 + ret = _z_mutex_init(&zn->_mutex_inner); + if (ret != _Z_RES_OK) { + return ret; + } +#endif // Initialize the counters to 1 zn->_entity_id = 1; zn->_resource_id = 1; zn->_query_id = 1; + zn->_decalaration_cache = NULL; + // Initialize the data structs zn->_local_resources = NULL; zn->_remote_resources = NULL; @@ -76,33 +88,12 @@ z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { zn->_pending_queries = NULL; #endif -#if Z_FEATURE_MULTI_THREAD == 1 - ret = _z_mutex_init(&zn->_mutex_inner); - if (ret != _Z_RES_OK) { - _z_transport_clear(&zn->_tp); - return ret; - } -#endif // Z_FEATURE_MULTI_THREAD == 1 - #if Z_FEATURE_LIVELINESS == 1 _z_liveliness_init(zn); #endif zn->_local_zid = *zid; - // Note session in transport - switch (zn->_tp._type) { - case _Z_TRANSPORT_UNICAST_TYPE: - zn->_tp._transport._unicast._common._session = zsrc; - break; - case _Z_TRANSPORT_MULTICAST_TYPE: - zn->_tp._transport._multicast._common._session = zsrc; - break; - case _Z_TRANSPORT_RAWETH_TYPE: - zn->_tp._transport._raweth._common._session = zsrc; - break; - default: - break; - } + return ret; } @@ -114,6 +105,10 @@ void _z_session_clear(_z_session_t *zn) { _zp_stop_read_task(zn); _zp_stop_lease_task(zn); #endif + + _z_config_clear(&zn->_config); + _z_network_message_list_free(&zn->_decalaration_cache); + _z_close(zn); // Clear Zenoh PID // Clean up transports diff --git a/src/transport/common/transport.c b/src/transport/common/transport.c new file mode 100644 index 000000000..7be8fcf9b --- /dev/null +++ b/src/transport/common/transport.c @@ -0,0 +1,57 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include "zenoh-pico/transport/transport.h" + +#include + +#include "zenoh-pico/link/link.h" +#include "zenoh-pico/system/platform_common.h" + +void _z_common_transport_clear(_z_transport_common_t *ztc, bool detach_tasks) { +#if Z_FEATURE_MULTI_THREAD == 1 + // Clean up tasks + if (ztc->_read_task != NULL) { + ztc->_read_task_running = false; + if (detach_tasks) { + _z_task_detach(ztc->_read_task); + } else { + _z_task_join(ztc->_read_task); + } + z_free(ztc->_read_task); + ztc->_read_task = NULL; + } + if (ztc->_lease_task != NULL) { + ztc->_lease_task_running = false; + if (detach_tasks) { + _z_task_detach(ztc->_lease_task); + } else { + _z_task_join(ztc->_lease_task); + } + z_free(ztc->_lease_task); + ztc->_lease_task = NULL; + } + + // Clean up the mutexes + _z_mutex_drop(&ztc->_mutex_tx); + _z_mutex_drop(&ztc->_mutex_rx); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + // Clean up the buffers + _z_wbuf_clear(&ztc->_wbuf); + _z_zbuf_clear(&ztc->_zbuf); + _z_arc_slice_svec_release(&ztc->_arc_pool); + _z_network_message_svec_release(&ztc->_msg_pool); + + _z_link_clear(&ztc->_link); +} diff --git a/src/transport/manager.c b/src/transport/manager.c index 6b2447eea..77fb21c65 100644 --- a/src/transport/manager.c +++ b/src/transport/manager.c @@ -20,7 +20,7 @@ #include "zenoh-pico/transport/multicast/transport.h" #include "zenoh-pico/transport/unicast/transport.h" -static z_result_t _z_new_transport_client(_z_transport_t *zt, _z_string_t *locator, _z_id_t *local_zid) { +static z_result_t _z_new_transport_client(_z_transport_t *zt, const _z_string_t *locator, const _z_id_t *local_zid) { z_result_t ret = _Z_RES_OK; // Init link _z_link_t zl; @@ -62,7 +62,8 @@ static z_result_t _z_new_transport_client(_z_transport_t *zt, _z_string_t *locat return ret; } -static z_result_t _z_new_transport_peer(_z_transport_t *zt, _z_string_t *locator, _z_id_t *local_zid, int peer_op) { +static z_result_t _z_new_transport_peer(_z_transport_t *zt, const _z_string_t *locator, const _z_id_t *local_zid, + int peer_op) { z_result_t ret = _Z_RES_OK; // Init link _z_link_t zl; @@ -105,7 +106,8 @@ static z_result_t _z_new_transport_peer(_z_transport_t *zt, _z_string_t *locator return ret; } -z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode, int peer_op) { +z_result_t _z_new_transport(_z_transport_t *zt, const _z_id_t *bs, const _z_string_t *locator, z_whatami_t mode, + int peer_op) { z_result_t ret; if (mode == Z_WHATAMI_CLIENT) { diff --git a/src/transport/multicast/lease.c b/src/transport/multicast/lease.c index b8f98cd19..515be2585 100644 --- a/src/transport/multicast/lease.c +++ b/src/transport/multicast/lease.c @@ -55,6 +55,8 @@ z_result_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { #if Z_FEATURE_MULTI_THREAD == 1 && (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) +static void _zp_multicast_failed(_z_transport_multicast_t *ztm) { _z_reopen(ztm->_common._session); } + static _z_zint_t _z_get_minimum_lease(_z_transport_peer_entry_list_t *peers, _z_zint_t local_lease) { _z_zint_t ret = local_lease; @@ -133,6 +135,7 @@ void *_zp_multicast_lease_task(void *ztm_arg) { if (ztm->_common._transmitted == false) { if (_zp_multicast_send_keep_alive(ztm) < 0) { _Z_INFO("Send keep alive failed."); + _zp_multicast_failed(ztm); } } // Reset the keep alive parameters diff --git a/src/transport/multicast/transport.c b/src/transport/multicast/transport.c index bbe13bf19..2bdcabf45 100644 --- a/src/transport/multicast/transport.c +++ b/src/transport/multicast/transport.c @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, -#include "zenoh-pico/transport/multicast/transport.h" +#include "zenoh-pico/transport/common/transport.h" #include #include @@ -20,13 +20,9 @@ #include #include "zenoh-pico/link/link.h" -#include "zenoh-pico/transport/common/lease.h" -#include "zenoh-pico/transport/common/read.h" #include "zenoh-pico/transport/common/tx.h" -#include "zenoh-pico/transport/multicast.h" -#include "zenoh-pico/transport/multicast/rx.h" +#include "zenoh-pico/transport/multicast/transport.h" #include "zenoh-pico/transport/raweth/tx.h" -#include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -193,33 +189,13 @@ z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t r return _z_multicast_send_close(ztm, reason, false); } -void _z_multicast_transport_clear(_z_transport_t *zt) { - _z_transport_multicast_t *ztm = &zt->_transport._multicast; +void _z_multicast_transport_clear(_z_transport_multicast_t *ztm, bool detach_tasks) { + _z_common_transport_clear(&ztm->_common, detach_tasks); #if Z_FEATURE_MULTI_THREAD == 1 - // Clean up tasks - if (ztm->_common._read_task != NULL) { - _z_task_join(ztm->_common._read_task); - z_free(ztm->_common._read_task); - } - if (ztm->_common._lease_task != NULL) { - _z_task_join(ztm->_common._lease_task); - z_free(ztm->_common._lease_task); - } - // Clean up the mutexes - _z_mutex_drop(&ztm->_common._mutex_tx); - _z_mutex_drop(&ztm->_common._mutex_rx); _z_mutex_drop(&ztm->_mutex_peer); #endif // Z_FEATURE_MULTI_THREAD == 1 - // Clean up the buffers - _z_wbuf_clear(&ztm->_common._wbuf); - _z_zbuf_clear(&ztm->_common._zbuf); - _z_arc_slice_svec_release(&ztm->_common._arc_pool); - _z_network_message_svec_release(&ztm->_common._msg_pool); - - // Clean up peer list _z_transport_peer_entry_list_free(&ztm->_peers); - _z_link_clear(&ztm->_common._link); } #else diff --git a/src/transport/transport.c b/src/transport/transport.c index 2540ec5ac..89213a647 100644 --- a/src/transport/transport.c +++ b/src/transport/transport.c @@ -19,17 +19,25 @@ #include #include "zenoh-pico/config.h" -#include "zenoh-pico/link/link.h" #include "zenoh-pico/protocol/core.h" -#include "zenoh-pico/transport/multicast/rx.h" -#include "zenoh-pico/transport/raweth/rx.h" -#include "zenoh-pico/transport/raweth/tx.h" #include "zenoh-pico/transport/transport.h" -#include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/transport/unicast/transport.h" -#include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" +_z_transport_common_t *_z_transport_get_common(_z_transport_t *zt) { + switch (zt->_type) { + case _Z_TRANSPORT_UNICAST_TYPE: + return &zt->_transport._unicast._common; + case _Z_TRANSPORT_MULTICAST_TYPE: + return &zt->_transport._multicast._common; + case _Z_TRANSPORT_RAWETH_TYPE: + return &zt->_transport._raweth._common; + default: + _Z_DEBUG("None transport, it should never happens"); + return NULL; + } +} + z_result_t _z_send_close(_z_transport_t *zt, uint8_t reason, bool link_only) { z_result_t ret = _Z_RES_OK; // Call transport function @@ -53,11 +61,11 @@ z_result_t _z_transport_close(_z_transport_t *zt, uint8_t reason) { return _z_se void _z_transport_clear(_z_transport_t *zt) { switch (zt->_type) { case _Z_TRANSPORT_UNICAST_TYPE: - _z_unicast_transport_clear(zt); + _z_unicast_transport_clear(&zt->_transport._unicast, false); break; case _Z_TRANSPORT_MULTICAST_TYPE: case _Z_TRANSPORT_RAWETH_TYPE: - _z_multicast_transport_clear(zt); + _z_multicast_transport_clear(&zt->_transport._multicast, false); break; default: break; @@ -78,49 +86,16 @@ void _z_transport_free(_z_transport_t **zt) { #if Z_FEATURE_BATCHING == 1 bool _z_transport_start_batching(_z_transport_t *zt) { - uint8_t *batch_state = NULL; - size_t *batch_count = NULL; - switch (zt->_type) { - case _Z_TRANSPORT_UNICAST_TYPE: - batch_state = &zt->_transport._unicast._common._batch_state; - batch_count = &zt->_transport._unicast._common._batch_count; - break; - case _Z_TRANSPORT_MULTICAST_TYPE: - batch_state = &zt->_transport._multicast._common._batch_state; - batch_count = &zt->_transport._multicast._common._batch_count; - break; - case _Z_TRANSPORT_RAWETH_TYPE: - batch_state = &zt->_transport._raweth._common._batch_state; - batch_count = &zt->_transport._raweth._common._batch_count; - break; - default: - break; - } - if (*batch_state == _Z_BATCHING_ACTIVE) { + _z_transport_common_t *ztc = _z_transport_get_common(zt); + if (ztc->_batch_state == _Z_BATCHING_ACTIVE) { return false; } - *batch_count = 0; - *batch_state = _Z_BATCHING_ACTIVE; + ztc->_batch_count = 0; + ztc->_batch_state = _Z_BATCHING_ACTIVE; return true; } -void _z_transport_stop_batching(_z_transport_t *zt) { - uint8_t *batch_state = NULL; - switch (zt->_type) { - case _Z_TRANSPORT_UNICAST_TYPE: - batch_state = &zt->_transport._unicast._common._batch_state; - break; - case _Z_TRANSPORT_MULTICAST_TYPE: - batch_state = &zt->_transport._multicast._common._batch_state; - break; - case _Z_TRANSPORT_RAWETH_TYPE: - batch_state = &zt->_transport._raweth._common._batch_state; - break; - default: - break; - } - *batch_state = _Z_BATCHING_IDLE; -} +void _z_transport_stop_batching(_z_transport_t *zt) { _z_transport_get_common(zt)->_batch_state = _Z_BATCHING_IDLE; } #endif /** diff --git a/src/transport/unicast/lease.c b/src/transport/unicast/lease.c index c4cd362f1..3b0fab6b1 100644 --- a/src/transport/unicast/lease.c +++ b/src/transport/unicast/lease.c @@ -15,8 +15,9 @@ #include "zenoh-pico/transport/unicast/lease.h" #include "zenoh-pico/session/query.h" -#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/system/platform_common.h" #include "zenoh-pico/transport/common/tx.h" +#include "zenoh-pico/transport/transport.h" #include "zenoh-pico/transport/unicast/transport.h" #include "zenoh-pico/utils/logging.h" @@ -40,6 +41,13 @@ z_result_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu) { #if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_UNICAST_TRANSPORT == 1 +static void _zp_unicast_failed(_z_transport_unicast_t *ztu) { + _z_session_rc_ref_t *zs = ztu->_common._session; + _z_unicast_transport_close(ztu, _Z_CLOSE_EXPIRED); + _z_unicast_transport_clear(ztu, true); + _z_reopen(zs); +} + void *_zp_unicast_lease_task(void *ztu_arg) { _z_transport_unicast_t *ztu = (_z_transport_unicast_t *)ztu_arg; @@ -57,9 +65,8 @@ void *_zp_unicast_lease_task(void *ztu_arg) { ztu->_received = false; } else { _Z_INFO("Closing session because it has expired after %zums", ztu->_common._lease); - ztu->_common._lease_task_running = false; - _z_unicast_transport_close(ztu, _Z_CLOSE_EXPIRED); - break; + _zp_unicast_failed(ztu); + return 0; } next_lease = (int)ztu->_common._lease; } @@ -69,6 +76,8 @@ void *_zp_unicast_lease_task(void *ztu_arg) { if (ztu->_common._transmitted == false) { if (_zp_unicast_send_keep_alive(ztu) < 0) { _Z_INFO("Send keep alive failed."); + _zp_unicast_failed(ztu); + return 0; } } // Reset the keep alive parameters diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 64a1d6540..2e7831359 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -11,6 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, +#include "zenoh-pico/transport/common/transport.h" + #include #include #include @@ -18,13 +20,10 @@ #include #include "zenoh-pico/link/link.h" +#include "zenoh-pico/system/platform_common.h" #include "zenoh-pico/transport/common/rx.h" #include "zenoh-pico/transport/common/tx.h" -#include "zenoh-pico/transport/multicast/rx.h" -#include "zenoh-pico/transport/unicast.h" -#include "zenoh-pico/transport/unicast/lease.h" -#include "zenoh-pico/transport/unicast/read.h" -#include "zenoh-pico/transport/unicast/rx.h" +#include "zenoh-pico/transport/unicast/transport.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -335,29 +334,9 @@ z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reaso return _z_unicast_send_close(ztu, reason, false); } -void _z_unicast_transport_clear(_z_transport_t *zt) { - _z_transport_unicast_t *ztu = &zt->_transport._unicast; -#if Z_FEATURE_MULTI_THREAD == 1 - // Clean up tasks - if (ztu->_common._read_task != NULL) { - _z_task_join(ztu->_common._read_task); - z_free(ztu->_common._read_task); - } - if (ztu->_common._lease_task != NULL) { - _z_task_join(ztu->_common._lease_task); - z_free(ztu->_common._lease_task); - } - - // Clean up the mutexes - _z_mutex_drop(&ztu->_common._mutex_tx); - _z_mutex_drop(&ztu->_common._mutex_rx); -#endif // Z_FEATURE_MULTI_THREAD == 1 +void _z_unicast_transport_clear(_z_transport_unicast_t *ztu, bool detach_tasks) { + _z_common_transport_clear(&ztu->_common, detach_tasks); - // Clean up the buffers - _z_wbuf_clear(&ztu->_common._wbuf); - _z_zbuf_clear(&ztu->_common._zbuf); - _z_arc_slice_svec_release(&ztu->_common._arc_pool); - _z_network_message_svec_release(&ztu->_common._msg_pool); #if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_clear(&ztu->_dbuf_reliable); _z_wbuf_clear(&ztu->_dbuf_best_effort); @@ -365,7 +344,6 @@ void _z_unicast_transport_clear(_z_transport_t *zt) { // Clean up PIDs ztu->_remote_zid = _z_id_empty(); - _z_link_clear(&ztu->_common._link); } #else