Skip to content

Commit

Permalink
cleanup, client-side 5m timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-bialkowski committed May 16, 2024
1 parent 3834da1 commit 9a7461a
Showing 1 changed file with 115 additions and 71 deletions.
186 changes: 115 additions & 71 deletions app/race-cli/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
#include <libgen.h>
#include <netdb.h>
#include <netinet/in.h>
#include <poll.h>
#include <resolv.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <syslog.h>
Expand Down Expand Up @@ -640,48 +642,57 @@ int handle_server_connect(const CmdOptions &opts) {
/* Forward data between sockets */
void forward_local_to_conduit(int local_sock, Conduit conduit) {
printf("In local_to_conduit\n");
ssize_t n;

std::vector<uint8_t> buffer(BUF_SIZE);
ssize_t received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);

// local_sock is a listen() socket fd on client end (netcat)
while ((n = ::recv(local_sock, buffer.data(), BUF_SIZE, 0)) >
0) { // read data from input socket
while (received_bytes > 0) { // read data from input socket
// printf("Relaying data \"%s\"from local socket to conduit\n",
// std::string(buffer.begin(), buffer.end()).c_str());
std::vector<uint8_t> result(buffer.begin(), buffer.begin() + n);
std::vector<uint8_t> result(buffer.begin(),
buffer.begin() + received_bytes);
auto status = conduit.write(result); // send data to output socket
if (status != ApiStatus::OK) {
printf("write failed with status: %i\n", status);
printf("conduit write failed with status: %i\n", status);
break;
}
received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);
}
printf("Exited local_to_conduit loop\n");

if (n < 0) {
printf("read error");
exit(1);
if (received_bytes < 0) {
perror("read error\n");
} else { // 0 - indicates graceful disconnect
printf("client socket disconnected\n");
}
}

void forward_conduit_to_local(Conduit conduit, int local_sock) {
printf("In conduit_to_local\n");
ssize_t send_status;

while (true) {
auto [status, buffer] = conduit.read();
if (status != ApiStatus::OK) {
printf("read failed with status: %i\n", status);
printf("conduit read failed with status: %i\n", status);
break;
}
// printf("Relaying data \"%s\" from conduit to local socket\n",
// std::string(buffer.begin(), buffer.end()).c_str());
::send(local_sock, buffer.data(), buffer.size(), 0);
send_status = ::send(local_sock, buffer.data(), buffer.size(), 0);
if (send_status < 0) {
perror("send() failed\n");
break;
} else if (send_status < static_cast<ssize_t>(buffer.size())) {
// this shouldn't happen, but visibility provided just in case
printf("WARNING: sent %zd of %lu bytes\n", send_status, buffer.size());
}
}
printf("Exited conduit_to_local loop\n");
}

int create_server_socket(int port) {
int local_sock, optval = 1;
// int validfamily=0;
int create_listening_socket(int port) {
int listening_sock, optval = 1;
char on = 1;
struct addrinfo hints, *res = NULL;
char portstr[12];

Expand All @@ -696,41 +707,49 @@ int create_server_socket(int port) {

// Try to resolve address if bind_address is a hostname
if (::getaddrinfo(bind_addr.c_str(), portstr, &hints, &res) != 0) {
printf("getadddrinfo failed\n");
printf("getadddrinfo() failed\n");
return -1;
}

local_sock = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
listening_sock = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol);

if (local_sock >= 0 && ::setsockopt(local_sock, SOL_SOCKET, SO_REUSEADDR,
&optval, sizeof(optval)) < 0) {
printf("setsockopt failed\n");
::close(local_sock);
local_sock = -1;
if (listening_sock >= 0 &&
::setsockopt(listening_sock, SOL_SOCKET, SO_REUSEADDR, &optval,
sizeof(optval)) < 0) {
printf("setsockopt() failed\n");
::close(listening_sock);
listening_sock = -1;
}

if (local_sock >= 0 &&
::bind(local_sock, res->ai_addr, res->ai_addrlen) == -1) {
printf("bind failed\n");
::close(local_sock);
local_sock = -1;
// make non-blocking so ::poll() can timeout
if (listening_sock >= 0 && ::ioctl(listening_sock, FIONBIO, &on) < 0) {
perror("ioctl() failed to make socket non-blocking\n");
::close(listening_sock);
listening_sock = -1;
}

if (local_sock >= 0 && ::listen(local_sock, 20) < 0) {
printf("listen failed\n");
::close(local_sock);
local_sock = -1;
if (listening_sock >= 0 &&
::bind(listening_sock, res->ai_addr, res->ai_addrlen) == -1) {
printf("bind() failed\n");
::close(listening_sock);
listening_sock = -1;
}

if (listening_sock >= 0 && ::listen(listening_sock, 20) < 0) {
printf("listen() failed\n");
::close(listening_sock);
listening_sock = -1;
}

if (res != nullptr) {
::freeaddrinfo(res);
}
return local_sock;
return listening_sock;
}

int create_client_connection(std::string &hostaddr, uint16_t port) {
struct addrinfo hints, *res = NULL;
int sock;
int client_sock;
char portstr[12];

memset(&hints, 0x00, sizeof(hints));
Expand All @@ -742,21 +761,22 @@ int create_client_connection(std::string &hostaddr, uint16_t port) {

// Try to resolve address if remote_host is a hostname
if (::getaddrinfo(hostaddr.c_str(), portstr, &hints, &res) != 0) {
printf("getadddrinfo failed\n");
printf("getadddrinfo() failed\n");
return -1;
}

sock = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (sock >= 0 && ::connect(sock, res->ai_addr, res->ai_addrlen) < 0) {
printf("connect failed\n");
::close(sock);
sock = -1;
client_sock = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (client_sock >= 0 &&
::connect(client_sock, res->ai_addr, res->ai_addrlen) < 0) {
printf("connect() failed\n");
::close(client_sock);
client_sock = -1;
}

if (res != nullptr) {
::freeaddrinfo(res);
}
return sock;
return client_sock;
}

void relay_data_loop(int client_sock, Raceboat::Conduit &connection) {
Expand All @@ -773,11 +793,10 @@ void relay_data_loop(int client_sock, Raceboat::Conduit &connection) {
}

void handle_client(int client_sock, const BootstrapConnectionOptions &conn_opt,
Race &race, const std::string &introductionMsg) {
Race &race) {

printf("calling bootstrap_dial_str\n");
auto [status, connection] =
race.bootstrap_dial_str(conn_opt, introductionMsg);
auto [status, connection] = race.bootstrap_dial_str(conn_opt, "");
if (status != ApiStatus::OK) {
printf("dial failed with status: %i\n", status);
connection.close();
Expand All @@ -789,22 +808,48 @@ void handle_client(int client_sock, const BootstrapConnectionOptions &conn_opt,
connection.close();
}

void client_connection_loop(int local_sock,
void client_connection_loop(int server_sock,
const BootstrapConnectionOptions &conn_opt,
Race &race, const std::string &introductionMsg) {
Race &race) {
int client_sock;

// TODO @Paul - is there a reason to accept() over and over (e.g. support
// multiple localhost connections)? while (true) {
client_sock = accept(local_sock, NULL, 0);
printf("client accept returned %i\n", client_sock);
// close(local_sock);
handle_client(client_sock, conn_opt, race, introductionMsg);

printf("closing local socket\n");
::shutdown(client_sock, SHUT_RDWR); // prevent further socket IO
::close(client_sock);
// }
pollfd poll_fd;
memset(&poll_fd, 0, sizeof(poll_fd));
poll_fd.fd = server_sock;
poll_fd.events = POLLIN;
int timeout = (5 * 60 * 1000); // 5 minute timeout
int poll_result;

// loop allows re-connects, but only allows 1 active connection
do {
poll_result = ::poll(&poll_fd, 1, timeout);
if (poll_result < 0) {
perror("poll() error\n");
} else if (poll_result == 0) {
printf("client loop timed out\n");
} else if (poll_fd.revents != POLLIN) {
printf("unexpected poll event %d. Exiting...\n", poll_fd.revents);
}
if (poll_result > 0) {
if (poll_result > 1) {
printf("\n"); // this shouldn't happen
}
printf("accept()ing client socket\n");
client_sock = accept(server_sock, NULL, 0);
if (client_sock < 0) {
perror("accept() error\n");
} else {
handle_client(client_sock, conn_opt, race);

printf("closing client socket\n");
::shutdown(client_sock, SHUT_RDWR); // prevent further socket IO
::close(client_sock);
}
} else if (poll_result == 0) {
printf("socket timeout\n");
} else {
perror("socket poll error\n");
}
} while (poll_result > 0);
}

int handle_client_bootstrap_connect(const CmdOptions &opts) {
Expand Down Expand Up @@ -835,21 +880,20 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) {
conn_opt.final_recv_channel = opts.final_recv_channel;
conn_opt.final_recv_role = opts.final_recv_role;

std::string introductionMsg = "hello";

int local_port = 9999;
int local_sock;
int server_sock;
printf("CREATING LOCAL SOCKET\n");
if ((local_sock = create_server_socket(local_port)) < 0) { // start server
// start server for client app to connect to
if ((server_sock = create_listening_socket(local_port)) < 0) {
printf("Failed to create local socket\n");
return -1;
}

client_connection_loop(local_sock, conn_opt, race, introductionMsg);
client_connection_loop(server_sock, conn_opt, race);

printf("closing local socket\n");
::shutdown(local_sock, SHUT_RDWR); // prevent further socket IO
::close(local_sock);
::shutdown(server_sock, SHUT_RDWR); // prevent further socket IO
::close(server_sock);

return 0;
}
Expand Down Expand Up @@ -878,13 +922,13 @@ int handle_server_bootstrap_connect(const CmdOptions &opts) {

// assume listening process is or will be running
uint16_t local_port = 7777;
int local_sock = -1;
int client_sock = -1;
std::string host = "localhost";
printf("AWAITING LOCAL CLIENT\n");
while (local_sock < 0) {
if ((local_sock = create_client_connection(host, local_port)) <
while (client_sock < 0) {
if ((client_sock = create_client_connection(host, local_port)) <
0) { // start server
printf("Awaiting 'listening client' \n");
printf("Awaiting 'listening socket' \n");
sleep(5);
}
}
Expand All @@ -910,10 +954,10 @@ int handle_server_bootstrap_connect(const CmdOptions &opts) {
}
printf("accept success\n");

relay_data_loop(local_sock, connection);
relay_data_loop(client_sock, connection);
printf("closing local socket\n");
::shutdown(local_sock, SHUT_RDWR); // stop other processes from using socket
::close(local_sock);
::shutdown(client_sock, SHUT_RDWR); // stop other processes from using socket
::close(client_sock);

printf("closing connection\n");
auto status6 = connection.close();
Expand Down

0 comments on commit 9a7461a

Please sign in to comment.