Skip to content

Commit

Permalink
merged multiclient, client timeout prevents sending to nowhere scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-bialkowski committed May 31, 2024
2 parents d6d061f + 890c973 commit 4ef2041
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 100 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ apt-get update && apt-get install ncat && ncat -vvvvvv --broker --listen -p 7777

Back in the first server container terminal. Consider using faster read timeouts with the -w <milliseconds> flag.
```
race-cli --dir /server-kits --server-bootstrap-connect --recv-channel=twoSixDirectCpp --send-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.2" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26269 --debug | tee rrlog | grep ERROR
ncat --broker --listen -p 7777 -vv --chat
race-cli --dir /server-kits --server-bootstrap-connect --recv-channel=twoSixDirectCpp --send-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.2" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26269 --debug | tee rrlog | grep SOCKET
```

Start another server terminal, and start a `nc` session to read/write from stdio.
Expand All @@ -148,7 +150,7 @@ docker run --rm -it --name=rbclient --network=rib-overlay-network --ip=10.11.1.3
-v $(pwd)/scripts/:/scripts/ \
raceboat:latest bash

race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.3" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26265 --param localPort=9999 --debug | tee srlog | grep ERROR &
race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.3" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26265 --param localPort=9999 --debug | tee srlog | grep ERROR

nc localhost 9999

Expand All @@ -164,7 +166,7 @@ docker run --rm -it --name=rbclient2 --network=rib-overlay-network --ip=10.11.1.
-v $(pwd)/scripts/:/scripts/ \
raceboat:latest bash

race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.4" --param PluginCommsTwoSixStub.startPort=26266 --param PluginCommsTwoSixStub.endPort=26269 --param localPort=9998 --debug | tee srlog | grep ERROR &
race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.4" --param PluginCommsTwoSixStub.startPort=26266 --param PluginCommsTwoSixStub.endPort=26269 --param localPort=9998 --debug | tee srlog | grep ERROR

nc localhost 9998
```
Expand Down
233 changes: 140 additions & 93 deletions app/race-cli/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -730,69 +730,111 @@ int create_client_connection(std::string &hostaddr, uint16_t port) {
return -1;
}

printf("new client socket family:%d, type:%d, protocol:%d\n", res->ai_family, res->ai_socktype, res->ai_protocol);
printf("SOCKET new client socket family:%d, type:%d, protocol:%d\n", res->ai_family, res->ai_socktype, res->ai_protocol);
client_sock = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (client_sock >= 0 &&
::connect(client_sock, res->ai_addr, res->ai_addrlen) < 0) {
printf("connect() failed\n");
printf("SOCKET connect() failed\n");
close_socket(client_sock);
client_sock = -1;
}

if (res != nullptr) {
::freeaddrinfo(res);
}
printf("connected socket %d\n", client_sock);
printf("SOCKET connected socket %d\n", client_sock);
return client_sock;
}

void forward_local_to_conduit(int local_sock, Conduit &conduit) {
int await_socket_input(const int socket_fd, int timeout_ms) {
pollfd poll_fd;
memset(&poll_fd, 0, sizeof(poll_fd));
poll_fd.fd = socket_fd;
poll_fd.events = POLLIN;

int poll_result = ::poll(&poll_fd, 1, timeout_ms);

if (poll_result < 0) {
perror("poll() error");
} else if (poll_result == 0) {
printf("poll timed out\n");
} else if (poll_fd.revents != POLLIN) {
printf("unexpected poll event %d\n", poll_fd.revents);
}

if (poll_result > 1) {
// this "shouldn't" happen, but here for visibility
printf("poll returned %d\n", poll_result);
}
return poll_result;
}

// struct ForwardRecord {
// int local_sock;
// Conduit &conduit;
// long timeoutSeconds;
// std::atomic<long> currentConduitTimeoutSinceEpoch; // shared amongst threads
// bool closeSocketsUponExit; // relinquish ownership / close upon exit of forward_*() functions
// };

void forward_local_to_conduit(int local_sock, Conduit &conduit, const int timeoutSeconds) {
local_sock = dup(local_sock);
std::vector<uint8_t> buffer(BUF_SIZE);
ssize_t received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);
Raceboat::OpHandle handle = conduit.getHandle();
printf("local_to_conduit with socket fd %d, with conduit handle %lu\n", local_sock, handle);
// TODO -- client (only) local read timeout, if timeout occurs, close local socket and conduit
// 2 cases to fail, user closes local app, or server times out connection
// TODO -- make sure timeout flag supports forever case
printf("local_to_conduit with socket fd %d, and %d second timeout\n", local_sock, timeoutSeconds);
int poll_status = 1;

// TODO this needs to consider the remaining conduit_to_local timeout duration to timeout about the same time
if(timeoutSeconds != Conduit::BLOCKING_READ) {
poll_status = await_socket_input(local_sock, timeoutSeconds*1000);
}

while (poll_status > 0) {
ssize_t received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);

while (received_bytes > 0) { // read data from input socket
std::vector<uint8_t> result(buffer.begin(),
if (received_bytes > 0) {
std::vector<uint8_t> result(buffer.begin(),
buffer.begin() + received_bytes);
printf("Relaying data %s from local socket to conduit -- %lu\n",
std::string(buffer.begin(), buffer.begin() + received_bytes).c_str(), conduit.getHandle());
auto status = conduit.write(result); // send data to output socket
if (status != ApiStatus::OK) {
printf("conduit write failed with status: %i on socket\n", status);
printf("Relaying data %s from local socket to conduit -- %lu\n",
std::string(buffer.begin(), buffer.begin() + received_bytes).c_str(), conduit.getHandle());

auto status = conduit.write(result); // send data to output socket
if (status != ApiStatus::OK) {
printf("conduit write failed with status: %d on socket\n", status);
break;
}
} else if (received_bytes < 0) {
// EWOULDBLOCK/EAGAIN "shouldn't" occur when poll() indicates received data available or when blocking forever
char buf[64];
snprintf(buf, sizeof(buf) -1, "recv() failed on fd %d", local_sock);
perror(buf);
break;
} else { // 0 - indicates graceful disconnect
printf("remote socket disconnected\n");
break;
}

if (handle != conduit.getHandle()) {
printf(" HANDLE CHANGED! Was %lu, now %lu\n", handle, conduit.getHandle());
if(timeoutSeconds != Conduit::BLOCKING_READ) {
poll_status = await_socket_input(local_sock, timeoutSeconds*1000);
}

received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);
}

if (received_bytes < 0) {
char buf[64];
snprintf(buf, sizeof(buf) -1, "recv() failed on fd %d", local_sock);
perror(buf);
} else { // 0 - indicates graceful disconnect
printf("remote socket disconnected\n");
}
conduit.close(); // close the conduit, so the forward_conduit_to_local thread will stop blocking and return
printf("Exiting local_to_conduit loop\n");
}

void forward_conduit_to_local(Conduit &conduit, int local_sock, const long timeoutSeconds) {
void forward_conduit_to_local(Conduit &conduit, int local_sock, const int timeoutSeconds) {
local_sock = dup(local_sock);
printf("conduit_to_local with socket fd %d, with conduit handle %lu, and timeout %ld\n", local_sock, conduit.getHandle(), timeoutSeconds);
printf("conduit_to_local with socket fd %d, with %d second timeout\n", local_sock, timeoutSeconds);
ssize_t send_status;

while (true) {
auto [status, buffer] = conduit.read(timeoutSeconds);

if (status == ApiStatus::TIMEOUT) {
printf("conduit read timed out with status: %i\n", status);
} else if (status != ApiStatus::OK) {
printf("conduit read failed with status: %i\n", status);
if (status != ApiStatus::OK) {
printf("conduit read failed with status: %s\n", apiStatusToString(status).c_str());
break;
} else {
printf("Relaying data %s from conduit to local socket\n",
Expand All @@ -809,13 +851,15 @@ void forward_conduit_to_local(Conduit &conduit, int local_sock, const long timeo
}
}
}

::close(local_sock); // close the socket so the forward_local_to_conduit thread will stop blocking
printf("Exiting conduit_to_local loop\n");
}

void relay_data_loop(int client_sock, Raceboat::Conduit &conduit, long timeoutSeconds, bool blocking) {
printf("relay_data_loop socket: %d with race read timeout %ld\n", client_sock, timeoutSeconds);
std::thread local_to_conduit_thread([client_sock, &conduit]() {
forward_local_to_conduit(client_sock, conduit);
void relay_data_loop(const int client_sock, Raceboat::Conduit conduit, const int timeoutSeconds, const bool blocking) {
printf("relay_data_loop socket: %d with race read timeout %d seconds\n", client_sock, timeoutSeconds);
std::thread local_to_conduit_thread([client_sock, &conduit, timeoutSeconds]() {
forward_local_to_conduit(client_sock, conduit, timeoutSeconds);
});

std::thread conduit_to_local_thread([client_sock, &conduit, timeoutSeconds]() {
Expand All @@ -837,29 +881,25 @@ void client_connection_loop(int server_sock,
memset(&poll_fd, 0, sizeof(poll_fd));
poll_fd.fd = server_sock;
poll_fd.events = POLLIN;
int timeout = (5 * 60 * 1000); // 5 minute timeout default
int timeoutSeconds = 300; // 5 minute timeout default
int timeout_ms = (timeoutSeconds * 1000);
int poll_result;

if (conn_opt.timeout_ms > 0) {
timeout = conn_opt.timeout_ms;
if(conn_opt.timeout_seconds > 0) {
timeoutSeconds = conn_opt.timeout_seconds;
timeout_ms = timeoutSeconds * 1000;
} else if (conn_opt.timeout_seconds == -1) {
timeoutSeconds = Conduit::BLOCKING_READ;
timeout_ms = -1;
}

// allow re-connect, but only 1 active connection


// allow re-connect, but only 1 active connection due to blocking IO threads
do {
poll_result = ::poll(&poll_fd, 1, timeout);
if (poll_result < 0) {
perror("poll() error");
} else if (poll_result == 0) {
printf("client loop timed out\n");
} else if (poll_fd.revents != POLLIN) {
printf("unexpected poll event %d. Exiting...\n", poll_fd.revents);
}

poll_result = await_socket_input(server_sock, timeout_ms);

if (poll_result > 0) {
if (poll_result > 1) {
// this "shouldn't" happen, but here for visibility
printf("poll returned %d\n", poll_result);
}
printf("accept()ing client socket\n");
int client_sock = ::accept(server_sock, NULL, 0);
printf("accepted socket %d\n", client_sock);
Expand All @@ -874,9 +914,9 @@ void client_connection_loop(int server_sock,
} else {
printf("dial success\n");
// block so accept() isn't called until after socket error
relay_data_loop(client_sock, connection, Conduit::BLOCKING_READ, /* blocking threads */ true);
connection.close();
close_socket(client_sock);
relay_data_loop(client_sock, connection, timeoutSeconds, /* blocking threads */ true);
// connection.close();
// ::close_socket(client_sock);
}
}
} else if (poll_result == 0) {
Expand Down Expand Up @@ -927,6 +967,7 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) {
conn_opt.final_send_role = opts.final_send_role;
conn_opt.final_recv_channel = opts.final_recv_channel;
conn_opt.final_recv_role = opts.final_recv_role;
conn_opt.timeout_ms = opts.timeout_ms;

int local_port = 9999;
check_for_local_port_override(opts, local_port);
Expand All @@ -947,12 +988,17 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) {
return 0;
}

ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_opt, int client_sock) {

ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_opt, int local_port) {
// establish a <local-app-socket, conduit> connection pair to relay data to and from each other
// close both upon timeout
ApiStatus status = ApiStatus::OK;

long timeoutSeconds = 300; // 5 minute read timeout default
if(conn_opt.timeout_ms > 0) {
timeoutSeconds = conn_opt.timeout_ms / 1000;
int timeoutSeconds = 300; // 5 minute read timeout default
if(conn_opt.timeout_seconds > 0) {
timeoutSeconds = conn_opt.timeout_seconds;
} else if (conn_opt.timeout_seconds == -1) {
timeoutSeconds = Conduit::BLOCKING_READ;
}

printf("CREATING RACE SERVER SOCKET\n");
Expand All @@ -962,9 +1008,10 @@ ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_o
printf("listen failed with status: %i\n", status1);
return status1;
}
printf("\nlistening on link address: '%s'", link_addr.c_str());
printf("\nlistening on link address: '%s'\n", link_addr.c_str());

std::unordered_map<OpHandle, Raceboat::Conduit> connections;
std::string host = "localhost";
// std::unordered_map<OpHandle, Raceboat::Conduit> connections;
while (1) {
printf("server calling accept\n");
auto [status2, connection] = listener.accept();
Expand All @@ -973,26 +1020,41 @@ ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_o
status = status2;
break;
}
printf("accept success\n");
connections[connection.getHandle()] = connection;
relay_data_loop(client_sock, connections[connection.getHandle()], timeoutSeconds, false);

for (auto handleConnPair: connections) {
void* ptr = &handleConnPair.second;
printf(" -- %lu:%lu - %p\n", handleConnPair.first, handleConnPair.second.getHandle(), ptr);
if (handleConnPair.first != handleConnPair.second.getHandle()) {
printf("CONDUIT HANDLE CHANGED! Was %lu, now %lu\n", handleConnPair.first, handleConnPair.second.getHandle());

printf("conduit accept success\n");
printf("AWAITING LOCAL CLIENT\n");

// create client connection for listening socket to connect on
// assume listening local app process is or will be running
int client_sock = -1;
while (client_sock < 0) {
if ((client_sock = create_client_connection(host, local_port)) < 0) {
printf("Awaiting listening socket \n");
sleep(5);
}
}

// connections[connection.getHandle()] = connection;
printf("SOCKET client_sock: %d\n", client_sock);
// TODO - relinquish ownership of socket / conduit to relay_data_loop
relay_data_loop(client_sock, connection, timeoutSeconds, false);

// for (auto handleConnPair: connections) {
// void* ptr = &handleConnPair.second;
// printf(" -- %lu:%lu - %p\n", handleConnPair.first, handleConnPair.second.getHandle(), ptr);
// if (handleConnPair.first != handleConnPair.second.getHandle()) {
// printf("CONDUIT HANDLE CHANGED! Was %lu, now %lu\n", handleConnPair.first, handleConnPair.second.getHandle());
// }
// }
}

printf("closing race sockets\n");
for (auto conn: connections) {
auto close_status = conn.second.close();
if (ApiStatus::OK != close_status) {
printf("close failed with status: %i\n", close_status);
}
}
// for (auto conn: connections) {
// auto close_status = conn.second.close();
// if (ApiStatus::OK != close_status) {
// printf("close failed with status: %i\n", close_status);
// }
// }
return status;
}

Expand All @@ -1017,28 +1079,13 @@ int handle_server_bootstrap_connect(const CmdOptions &opts) {
conn_opt.final_recv_role = opts.final_recv_role;
conn_opt.final_send_channel = opts.final_send_channel;
conn_opt.final_send_role = opts.final_send_role;
conn_opt.timeout_ms = opts.timeout_ms;
conn_opt.timeout_seconds = opts.timeout_ms / 1000;

printf("handle_server_bootstrap_connect\n");

int client_sock = -1;
std::string host = "localhost";
int local_port = 7777;
check_for_local_port_override(opts, local_port);

printf("AWAITING LOCAL CLIENT\n");

// create client connection for listening socket to connect on
// assume listening process is or will be running
while (client_sock < 0) {
if ((client_sock = create_client_connection(host, local_port)) < 0) {
printf("Awaiting listening socket \n");
sleep(5);
}
}

ApiStatus status = server_connections_loop(race, conn_opt, client_sock);
close_socket(client_sock);
ApiStatus status = server_connections_loop(race, conn_opt, local_port);

return (status == ApiStatus::OK);
}
Expand Down
Loading

0 comments on commit 4ef2041

Please sign in to comment.