Skip to content

Commit

Permalink
cleanup, partial support for multiple server race connections
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-bialkowski committed May 16, 2024
1 parent 9a7461a commit 9fef3ff
Showing 1 changed file with 122 additions and 95 deletions.
217 changes: 122 additions & 95 deletions app/race-cli/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,57 +639,17 @@ int handle_server_connect(const CmdOptions &opts) {
return (status == ApiStatus::OK);
}

/* Forward data between sockets */
void forward_local_to_conduit(int local_sock, Conduit conduit) {
printf("In local_to_conduit\n");
std::vector<uint8_t> buffer(BUF_SIZE);
ssize_t received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);

while (received_bytes > 0) { // read data from input socket
// printf("Relaying data \"%s\"from local socket to conduit\n",
// std::string(buffer.begin(), buffer.end()).c_str());
std::vector<uint8_t> result(buffer.begin(),
buffer.begin() + received_bytes);
auto status = conduit.write(result); // send data to output socket
if (status != ApiStatus::OK) {
printf("conduit write failed with status: %i\n", status);
break;
}
received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);
void close_socket(int socket_fd) {
printf("closing socket %d\n", socket_fd);
if (-1 == ::shutdown(socket_fd, SHUT_RDWR)) { // prevent further socket IO
printf("failed to shutdown() socket %d (%d): %s", socket_fd, errno, strerror(errno));
}
printf("Exited local_to_conduit loop\n");

if (received_bytes < 0) {
perror("read error\n");
} else { // 0 - indicates graceful disconnect
printf("client socket disconnected\n");
if (-1 == ::close(socket_fd)) {
printf("failed to close() socket %d (%d): %s", socket_fd, errno, strerror(errno));
}
}

void forward_conduit_to_local(Conduit conduit, int local_sock) {
printf("In conduit_to_local\n");
ssize_t send_status;

while (true) {
auto [status, buffer] = conduit.read();
if (status != ApiStatus::OK) {
printf("conduit read failed with status: %i\n", status);
break;
}
// printf("Relaying data \"%s\" from conduit to local socket\n",
// std::string(buffer.begin(), buffer.end()).c_str());
send_status = ::send(local_sock, buffer.data(), buffer.size(), 0);
if (send_status < 0) {
perror("send() failed\n");
break;
} else if (send_status < static_cast<ssize_t>(buffer.size())) {
// this shouldn't happen, but visibility provided just in case
printf("WARNING: sent %zd of %lu bytes\n", send_status, buffer.size());
}
}
printf("Exited conduit_to_local loop\n");
}

int create_listening_socket(int port) {
int listening_sock, optval = 1;
char on = 1;
Expand Down Expand Up @@ -717,33 +677,36 @@ int create_listening_socket(int port) {
::setsockopt(listening_sock, SOL_SOCKET, SO_REUSEADDR, &optval,
sizeof(optval)) < 0) {
printf("setsockopt() failed\n");
::close(listening_sock);
close_socket(listening_sock);
listening_sock = -1;
}

// make non-blocking so ::poll() can timeout
if (listening_sock >= 0 && ::ioctl(listening_sock, FIONBIO, &on) < 0) {
perror("ioctl() failed to make socket non-blocking\n");
::close(listening_sock);
perror("ioctl() failed to make socket non-blocking");
close_socket(listening_sock);
listening_sock = -1;
}

if (listening_sock >= 0 &&
::bind(listening_sock, res->ai_addr, res->ai_addrlen) == -1) {
printf("bind() failed\n");
::close(listening_sock);
perror("bind() failed");
close_socket(listening_sock);
listening_sock = -1;
}

if (listening_sock >= 0 && ::listen(listening_sock, 20) < 0) {
printf("listen() failed\n");
::close(listening_sock);
close_socket(listening_sock);
listening_sock = -1;
}

if (res != nullptr) {
::freeaddrinfo(res);
}
if (listening_sock > 0) {
printf("created listening socket %d\n", listening_sock);
}
return listening_sock;
}

Expand All @@ -769,16 +732,67 @@ int create_client_connection(std::string &hostaddr, uint16_t port) {
if (client_sock >= 0 &&
::connect(client_sock, res->ai_addr, res->ai_addrlen) < 0) {
printf("connect() failed\n");
::close(client_sock);
close_socket(client_sock);
client_sock = -1;
}

if (res != nullptr) {
::freeaddrinfo(res);
}
printf("connected socket %d\n", client_sock);
return client_sock;
}

void forward_local_to_conduit(int local_sock, Conduit conduit) {
printf("local_to_conduit\n");
std::vector<uint8_t> buffer(BUF_SIZE);
ssize_t received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);

while (received_bytes > 0) { // read data from input socket
printf("Relaying data \"%s\"from local socket to conduit\n",
std::string(buffer.begin(), buffer.end()).c_str());
std::vector<uint8_t> result(buffer.begin(),
buffer.begin() + received_bytes);
auto status = conduit.write(result); // send data to output socket
if (status != ApiStatus::OK) {
printf("conduit write failed with status: %i\n", status);
break;
}
received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);
}
printf("Exited local_to_conduit loop\n");

if (received_bytes < 0) {
perror("read error");
} else { // 0 - indicates graceful disconnect
printf("remote socket disconnected\n");
}
}

void forward_conduit_to_local(Conduit conduit, int local_sock) {
printf("conduit_to_local\n");
ssize_t send_status;

while (true) {
auto [status, buffer] = conduit.read();
if (status != ApiStatus::OK) {
printf("conduit read failed with status: %i\n", status);
break;
}
printf("Relaying data \"%s\" from conduit to local socket\n",
std::string(buffer.begin(), buffer.end()).c_str());
send_status = ::send(local_sock, buffer.data(), buffer.size(), 0);
if (send_status < 0) {
perror("send() failed");
break;
} else if (send_status < static_cast<ssize_t>(buffer.size())) {
// this "shouldn't" happen, but visibility provided just in case
printf("WARNING: sent %zd of %lu bytes\n", send_status, buffer.size());
}
}
printf("Exited conduit_to_local loop\n");
}

void relay_data_loop(int client_sock, Raceboat::Conduit &connection) {
std::thread local_to_conduit_thread([&client_sock, &connection]() {
forward_local_to_conduit(client_sock, connection);
Expand Down Expand Up @@ -823,7 +837,7 @@ void client_connection_loop(int server_sock,
do {
poll_result = ::poll(&poll_fd, 1, timeout);
if (poll_result < 0) {
perror("poll() error\n");
perror("poll() error");
} else if (poll_result == 0) {
printf("client loop timed out\n");
} else if (poll_fd.revents != POLLIN) {
Expand All @@ -836,18 +850,17 @@ void client_connection_loop(int server_sock,
printf("accept()ing client socket\n");
client_sock = accept(server_sock, NULL, 0);
if (client_sock < 0) {
perror("accept() error\n");
perror("accept() error");
} else {
handle_client(client_sock, conn_opt, race);

printf("closing client socket\n");
::shutdown(client_sock, SHUT_RDWR); // prevent further socket IO
::close(client_sock);
close_socket(client_sock);
}
} else if (poll_result == 0) {
printf("socket timeout\n");
} else {
perror("socket poll error\n");
perror("socket poll error");
}
} while (poll_result > 0);
}
Expand Down Expand Up @@ -892,12 +905,55 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) {
client_connection_loop(server_sock, conn_opt, race);

printf("closing local socket\n");
::shutdown(server_sock, SHUT_RDWR); // prevent further socket IO
::close(server_sock);
close_socket(server_sock);

return 0;
}

ApiStatus run_server(Race &race,
BootstrapConnectionOptions &conn_opt, int client_sock) {

ApiStatus status = ApiStatus::OK;

printf("CREATING RACE SERVER SOCKET\n");
// listen on race side
auto [status1, link_addr, listener] = race.bootstrap_listen(conn_opt);
if (status1 != ApiStatus::OK) {
printf("listen failed with status: %i\n", status1);
return status1;
}
printf("\nlistening on link address: '%s'", link_addr.c_str());

// while (1) {
// blocking accept() on new thread to accept multiple connections
// std::thread accept_thread([&]() {
// assume the link address is passed in out of band
// start client with this link address

auto [status2, connection] = listener.accept();
if (status2 != ApiStatus::OK) {
printf("accept failed with status: %i\n", status2);
status = status2;
// break;
}
printf("accept success\n");

relay_data_loop(client_sock, connection);
printf("closing local socket\n");

printf("closing connection\n");
auto status3 = connection.close();
if (status3 != ApiStatus::OK) {
printf("close failed with status: %i\n", status3);
status = status3;
// break;
}
// });
// accept_thread.detach();
// }
return status;
}

int handle_server_bootstrap_connect(const CmdOptions &opts) {
// connect to localhost
// listen for race connections
Expand All @@ -920,51 +976,22 @@ int handle_server_bootstrap_connect(const CmdOptions &opts) {
conn_opt.final_send_channel = opts.final_send_channel;
conn_opt.final_send_role = opts.final_send_role;

// assume listening process is or will be running
uint16_t local_port = 7777;
int client_sock = -1;
std::string host = "localhost";
printf("AWAITING LOCAL CLIENT\n");

// create client connection for listening socket to connect on
// assume listening process is or will be running
while (client_sock < 0) {
if ((client_sock = create_client_connection(host, local_port)) <
0) { // start server
if ((client_sock = create_client_connection(host, local_port)) < 0) {
printf("Awaiting 'listening socket' \n");
sleep(5);
}
}

printf("CREATING RACE SERVER SOCKET\n");
// listen on race side
auto [status, link_addr, listener] = race.bootstrap_listen(conn_opt);
if (status != ApiStatus::OK) {
printf("listen failed with status: %i\n", status);
return -1;
}

// assume the link address is passed in out of band
// start client with this link address
printf("\nlistening on link address: '%s'\nbe sure to escape quotes for "
"client\n\n",
link_addr.c_str());

auto [status2, connection] = listener.accept();
if (status2 != ApiStatus::OK) {
printf("accept failed with status: %i\n", status2);
return -2;
}
printf("accept success\n");

relay_data_loop(client_sock, connection);
printf("closing local socket\n");
::shutdown(client_sock, SHUT_RDWR); // stop other processes from using socket
::close(client_sock);

printf("closing connection\n");
auto status6 = connection.close();
if (status6 != ApiStatus::OK) {
printf("close failed with status: %i\n", status6);
status = status6;
}
ApiStatus status = run_server(race, conn_opt, client_sock);
close_socket(client_sock);

return (status == ApiStatus::OK);
}
Expand Down

0 comments on commit 9fef3ff

Please sign in to comment.