Skip to content

Commit

Permalink
Add querier session check
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 24, 2025
1 parent 78889b8 commit 5048e78
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ set(Z_FEATURE_UNICAST_TRANSPORT 1 CACHE STRING "Toggle unicast transport")
set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport")
set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY")
set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions")
set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check")
set(Z_FEATURE_SESSION_CHECK 1 CACHE STRING "Toggle publisher/querier session check")
set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching")
set(Z_FEATURE_MATCHING 1 CACHE STRING "Toggle matching feature")
set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE")
Expand Down
14 changes: 10 additions & 4 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true);

_z_session_t *session = NULL;
#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1
#if Z_FEATURE_SESSION_CHECK == 1
// Try to upgrade session rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&pub->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
Expand Down Expand Up @@ -1054,7 +1054,7 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay
ret = _Z_ERR_SESSION_CLOSED;
}

#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1
#if Z_FEATURE_SESSION_CHECK == 1
_z_session_rc_drop(&sess_rc);
#endif

Expand All @@ -1080,7 +1080,7 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true);

_z_session_t *session = NULL;
#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1
#if Z_FEATURE_SESSION_CHECK == 1
// Try to upgrade session rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&pub->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
Expand All @@ -1096,7 +1096,7 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher
_z_write(session, pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, pub->_congestion_control,
pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), reliability);

#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1
#if Z_FEATURE_SESSION_CHECK == 1
// Clean up
_z_session_rc_drop(&sess_rc);
#endif
Expand Down Expand Up @@ -1288,13 +1288,17 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete
_z_keyexpr_t querier_keyexpr = _z_keyexpr_alias_from_user_defined(querier->_key, true);

_z_session_t *session = NULL;
#if Z_FEATURE_SESSION_CHECK == 1
// Try to upgrade session rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&querier->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
session = _Z_RC_IN_VAL(&sess_rc);
} else {
ret = _Z_ERR_SESSION_CLOSED;
}
#else
session = _Z_RC_IN_VAL(&querier->_zn);
#endif

z_consolidation_mode_t consolidation_mode = querier->_consolidation_mode;
if (consolidation_mode == Z_CONSOLIDATION_MODE_AUTO) {
Expand All @@ -1318,7 +1322,9 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete
ret = _Z_ERR_SESSION_CLOSED;
}

#if Z_FEATURE_SESSION_CHECK == 1
_z_session_rc_drop(&sess_rc);
#endif

// Clean-up
z_bytes_drop(opt.payload);
Expand Down

0 comments on commit 5048e78

Please sign in to comment.