Skip to content

Commit

Permalink
add a mark is_destroy for fwdConn and stop self if is_destroy is true
Browse files Browse the repository at this point in the history
  • Loading branch information
lzydmxy committed Dec 11, 2023
1 parent a252c97 commit 0e7a657
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/Service/ForwardResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ std::string toString(ForwardType type)
return "UpdateSession";
case ForwardType::Operation:
return "Operation";
case ForwardType::Destroy:
return "Destroy";
default:
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Service/ForwardResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ enum ForwardType : int8_t
GetSession = 3, /// get session id
UpdateSession = 4, /// session reconnect
Operation = 5, /// all write requests after the connection is established
Destroy = 6,
Destroy = 6, /// Only used in server side to indicate that the connection is stale and server should close it
};

std::string toString(ForwardType type);
Expand Down
14 changes: 10 additions & 4 deletions src/Service/ForwardingConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,13 @@ void ForwardingConnectionHandler::onSocketWritable(const AutoPtr<WritableNotific

try
{
if (is_destroy)
{
LOG_INFO(log, "Get destroy response!");
delete this;
return;
}

if (responses->empty() && send_buf.used() == 0)
{
remove_event_handler_if_needed();
Expand Down Expand Up @@ -371,14 +378,13 @@ void ForwardingConnectionHandler::onSocketError(const AutoPtr<ErrorNotification>

void ForwardingConnectionHandler::sendResponse(ForwardResponsePtr response)
{
LOG_TRACE(log, "Send response {}", response->toString());

if (response->forwardType() == ForwardType::Destroy)
{
delete this;
return;
is_destroy = true;
}

LOG_TRACE(log, "Send response {}", response->toString());

WriteBufferFromFiFoBuffer buf;
response->write(buf);

Expand Down
3 changes: 3 additions & 0 deletions src/Service/ForwardingConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class ForwardingConnectionHandler
void processHandshake();
void processRaftRequest(ForwardRequestPtr request);
void processSessions(ForwardRequestPtr request);

// mark is stop by dispatcher
bool is_destroy;
};

}
6 changes: 3 additions & 3 deletions src/Service/KeeperDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,11 @@ void KeeperDispatcher::registerForward(ForwardingClientId client_id, ForwardResp

if (forward_to_response_callback.contains(client_id))
{
LOG_INFO(log, "Get server_id {}, client_id {} new registered, will destroy older one", client_id.first, client_id.second);
auto call_back = forward_to_response_callback[client_id];
forward_to_response_callback.erase(client_id);
LOG_WARNING(log, "Receive new forwarding connection from server_id {}, client_id {}, will destroy the older one", client_id.first, client_id.second);
auto & call_back = forward_to_response_callback[client_id];
auto response = std::make_shared<ForwardDestryResponse>();
call_back(response);
forward_to_response_callback.erase(client_id);
}

forward_to_response_callback.emplace(client_id, callback);
Expand Down

0 comments on commit 0e7a657

Please sign in to comment.