Skip to content

Commit

Permalink
Add querier session check
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 24, 2025
1 parent 78889b8 commit d8a84b8
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true);

_z_session_t *session = NULL;
#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1
#if Z_FEATURE_SESSION_CHECK == 1
// Try to upgrade session rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&pub->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
Expand Down Expand Up @@ -1054,7 +1054,7 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay
ret = _Z_ERR_SESSION_CLOSED;
}

#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1
#if Z_FEATURE_SESSION_CHECK == 1
_z_session_rc_drop(&sess_rc);
#endif

Expand All @@ -1080,7 +1080,7 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true);

_z_session_t *session = NULL;
#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1
#if Z_FEATURE_SESSION_CHECK == 1
// Try to upgrade session rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&pub->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
Expand All @@ -1096,7 +1096,7 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher
_z_write(session, pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, pub->_congestion_control,
pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), reliability);

#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1
#if Z_FEATURE_SESSION_CHECK == 1
// Clean up
_z_session_rc_drop(&sess_rc);
#endif
Expand Down Expand Up @@ -1288,13 +1288,17 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete
_z_keyexpr_t querier_keyexpr = _z_keyexpr_alias_from_user_defined(querier->_key, true);

_z_session_t *session = NULL;
#if Z_FEATURE_SESSION_CHECK == 1
// Try to upgrade session rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&querier->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
session = _Z_RC_IN_VAL(&sess_rc);
} else {
ret = _Z_ERR_SESSION_CLOSED;
}
#else
session = _Z_RC_IN_VAL(&pub->_zn);
#endif

z_consolidation_mode_t consolidation_mode = querier->_consolidation_mode;
if (consolidation_mode == Z_CONSOLIDATION_MODE_AUTO) {
Expand All @@ -1318,7 +1322,9 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete
ret = _Z_ERR_SESSION_CLOSED;
}

#if Z_FEATURE_SESSION_CHECK == 1
_z_session_rc_drop(&sess_rc);
#endif

// Clean-up
z_bytes_drop(opt.payload);
Expand Down

0 comments on commit d8a84b8

Please sign in to comment.