Skip to content

Commit

Permalink
Add result code to response messages (#515)
Browse files Browse the repository at this point in the history
* Add result code to response messages

In some cases, the response is marked as accepted even if there was an
error. Therefore, including the accepted tag in the response message is
not enough: a follower may believe that an auto forwarded request was
successful when it wasn't.

Signed-off-by: Alex Michon <[email protected]>

* [Update PR] Add `int32_t` to `cmd_result_code`

---------

Signed-off-by: Alex Michon <[email protected]>
Co-authored-by: Jung-Sang Ahn <[email protected]>
  • Loading branch information
amichon-kalray and greensky00 authored Jun 21, 2024
1 parent 8731295 commit 52d9236
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
2 changes: 1 addition & 1 deletion include/libnuraft/async.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ limitations under the License.

namespace nuraft {

enum cmd_result_code {
enum cmd_result_code : int32_t {
OK = 0,
CANCELLED = -1,
TIMEOUT = -2,
Expand Down
35 changes: 32 additions & 3 deletions src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ limitations under the License.
// If set, each log entry will contain a CRC on the payload.
#define CRC_ON_PAYLOAD (0x10)

// If set, RPC message (response) includes result code
#define INCLUDE_RESULT_CODE (0x20)

// =======================

namespace nuraft {
Expand Down Expand Up @@ -735,6 +738,7 @@ class rpc_session
try {
ptr<buffer> resp_ctx = resp->get_ctx();
int32 resp_ctx_size = (resp_ctx) ? resp_ctx->size() : 0;
int32 result_code_size = sizeof(int32_t);

uint32_t flags = 0x0;
size_t resp_meta_size = 0;
Expand All @@ -759,6 +763,13 @@ class rpc_session

size_t carried_data_size = resp_meta_size + resp_hint_size + resp_ctx_size;

if (req->get_type() == msg_type::client_request ||
req->get_type() == msg_type::add_server_request ||
req->get_type() == msg_type::remove_server_request) {
flags |= INCLUDE_RESULT_CODE;
carried_data_size += result_code_size;
}

int buf_size = RPC_RESP_HEADER_SIZE + carried_data_size;
ptr<buffer> resp_buf = buffer::alloc(buf_size);
buffer_serializer bs(resp_buf);
Expand Down Expand Up @@ -798,6 +809,11 @@ class rpc_session
bs.put_buffer(*resp_ctx);
}

/* Put result code at the end to avoid breaking backward compatibility */
if (flags & INCLUDE_RESULT_CODE) {
bs.put_i32(resp->get_result_code());
}

aa::write( ssl_enabled_, ssl_socket_, socket_,
asio::buffer(resp_buf->data_begin(), resp_buf->size()),
[this, self, resp_buf]
Expand Down Expand Up @@ -1688,8 +1704,9 @@ class asio_rpc_client
size_t bytes_transferred)
{
if ( !(flags & INCLUDE_META) &&
!(flags & INCLUDE_HINT) ) {
// Neither meta nor hint exists,
!(flags & INCLUDE_HINT) &&
!(flags & INCLUDE_RESULT_CODE)) {
// Neither meta nor hint nor result code exists,
// just use the buffer as it is for ctx.
ctx_buf->pos(0);
rsp->set_ctx(ctx_buf);
Expand Down Expand Up @@ -1739,9 +1756,21 @@ class asio_rpc_client
assert(remaining_len >= 0);
if (remaining_len) {
// It has context, read it.
ptr<buffer> actual_ctx = buffer::alloc(remaining_len);
size_t ctx_len = remaining_len;
if (flags & INCLUDE_RESULT_CODE) {
ctx_len -= sizeof(int32_t);
}
ptr<buffer> actual_ctx = buffer::alloc(ctx_len);
bs.get_buffer(actual_ctx);
rsp->set_ctx(actual_ctx);
remaining_len -= ctx_len;
}

// 4) Result code
if (flags & INCLUDE_RESULT_CODE) {
assert((size_t)remaining_len >= sizeof(int32_t));
cmd_result_code res = static_cast<cmd_result_code>(bs.get_i32());
rsp->set_result_code(res);
}

operation_timer_.cancel();
Expand Down

0 comments on commit 52d9236

Please sign in to comment.