Skip to content

Commit

Permalink
Merge pull request #19 from tst-race/lazy-connections
Browse files Browse the repository at this point in the history
Lazy connections
  • Loading branch information
plvines authored Jan 14, 2025
2 parents fbcd5f7 + 2b442a8 commit 2befcfa
Show file tree
Hide file tree
Showing 14 changed files with 376 additions and 268 deletions.
253 changes: 115 additions & 138 deletions app/race-cli/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,138 +506,6 @@ int handle_recv_respond(const CmdOptions &opts) {
return 0;
}

int handle_client_connect(const CmdOptions &opts) {
ChannelParamStore params = getParams(opts);

Race race(opts.plugin_path, params);

if (opts.init_send_address.empty()) {
printf("link address required\n");
return -1;
}

SendOptions send_opt;
send_opt.send_channel = opts.init_send_channel;
send_opt.send_role = opts.init_send_role;
send_opt.send_address =
opts.init_send_address; // generated in handle_server_connect
send_opt.recv_channel = opts.init_recv_channel;
send_opt.recv_role = opts.init_recv_role;
send_opt.alt_channel = opts.alt_channel;

std::string introductionMsg = "hello";
auto [status, connection] = race.dial_str(send_opt, introductionMsg);
if (status != ApiStatus::OK) {
printf("dial failed with status: %i\n", status);
return -1;
}
printf("dial success\n");

printf("\ntype message to send followed by <ctrl+d>\n");
auto message = readStdin();
std::string msgStr(message.begin(), message.end());

int packages_remaining = opts.num_packages;
while (opts.num_packages == -1 || packages_remaining > 0) {
status = connection.write_str(msgStr);
if (status != ApiStatus::OK) {
printf("write failed with status: %i\n", status);
break;
} else {
printf("wrote message: %s\n", msgStr.c_str());
}

auto [status2, received_message] = connection.read_str();
if (status2 != ApiStatus::OK) {
printf("read_str failed with status: %i\n", status2);
status = status2;
break;
} else {
printf("received message: %s\n", received_message.c_str());
}
}
auto status2 = connection.close();
if (status2 != ApiStatus::OK) {
printf("close failed with status: %i\n", status2);
status = status2;
}

return (status == ApiStatus::OK);
}

int handle_server_connect(const CmdOptions &opts) {
ChannelParamStore params = getParams(opts);

Race race(opts.plugin_path, params);

ReceiveOptions recv_opt;
recv_opt.recv_channel = opts.init_recv_channel;
recv_opt.recv_role = opts.init_recv_role;
recv_opt.send_channel = opts.init_send_channel;
recv_opt.send_role = opts.init_send_role;

auto [status, link_addr, listener] = race.listen(recv_opt);
if (status != ApiStatus::OK) {
printf("listen failed with status: %i\n", status);
return -1;
}

// assume the link address is passed in out of band
// start client with this link address
printf("\nlistening on link address: '%s'\nbe sure to escape quotes for "
"client\n\n",
link_addr.c_str());

auto [status2, connection] = listener.accept();
if (status2 != ApiStatus::OK) {
printf("accept failed with status: %i\n", status2);
return -2;
}
printf("accept success\n");

printf("\ntype message to send followed by <ctrl+d>\n");
auto message = readStdin();
std::string msgStr(message.begin(), message.end());

auto [status5, received_message2] = connection.read_str();
if (status5 != ApiStatus::OK) {
printf("read failed with status: %i\n", status5);
status = status5;
} else {
printf("received message: %s\n", received_message2.c_str());
}

int packages_remaining = opts.num_packages;
while (opts.num_packages == -1 || packages_remaining > 0) {
auto status3 = connection.write_str(msgStr);
if (status3 != ApiStatus::OK) {
printf("write failed with status: %i\n", status3);
status = status3;
break;
} else {
printf("wrote message: %s\n", msgStr.c_str());
}

auto [status4, received_message] = connection.read_str();
if (status4 != ApiStatus::OK) {
printf("read failed with status: %i\n", status4);
status = status4;
break;
} else {
printf("received message: %s\n", received_message.c_str());
}
}

auto status6 = connection.close();
if (status6 != ApiStatus::OK) {
printf("close failed with status: %i\n", status6);
status = status6;
}

return (status == ApiStatus::OK);
}


void close_socket(int &socket_fd) {
printf("closing socket %d\n", socket_fd);
if (-1 == ::shutdown(socket_fd, SHUT_RDWR)) { // prevent further socket IO
Expand Down Expand Up @@ -1031,7 +899,8 @@ void relay_data_loop(const int client_sock, std::shared_ptr<Raceboat::Conduit> c

void client_connection_loop(int server_sock,
const BootstrapConnectionOptions &conn_opt,
Race &race) {
Race &race,
bool bootstrapping) {
pollfd poll_fd;
memset(&poll_fd, 0, sizeof(poll_fd));
poll_fd.fd = server_sock;
Expand Down Expand Up @@ -1067,7 +936,22 @@ void client_connection_loop(int server_sock,
if (connection->getHandle() == NULL_RACE_HANDLE) {
printf("calling bootstrap_dial_str\n");
// std::tie(status, connection) = race.bootstrap_dial_str(conn_opt, "");
auto [status, tmp_connection] = race.bootstrap_dial_str(conn_opt, "");
ApiStatus status;
Conduit tmp_connection;
if (bootstrapping) {
std::tie(status, tmp_connection) = race.bootstrap_dial_str(conn_opt, "");
} else {
SendOptions send_opt;
send_opt.send_channel = conn_opt.init_send_channel;
send_opt.send_role = conn_opt.init_send_role;
send_opt.send_address =
conn_opt.init_send_address; // generated in handle_server_connect
send_opt.recv_channel = conn_opt.init_recv_channel;
send_opt.recv_role = conn_opt.init_recv_role;
send_opt.alt_channel = conn_opt.final_send_channel;

std::tie(status, tmp_connection) = race.dial_str(send_opt, "hello");
}
connection = std::make_shared<Conduit>(tmp_connection);
if (status != ApiStatus::OK) {
printf("dial failed with status: %i\n", status);
Expand Down Expand Up @@ -1116,6 +1000,55 @@ void check_for_local_port_override(const CmdOptions &opts, int &local_port) {
}
}

int handle_client_connect(const CmdOptions &opts) {
// listen for localhost connection
// dial into / connect to race connection
// connect to race conduit connections
// relay data to race conduit side
// relay data from race conduit to localhost

ChannelParamStore params = getParams(opts);

Race race(opts.plugin_path, params);

if (opts.init_send_address.empty()) {
printf("link address required\n");
return -1;
}

BootstrapConnectionOptions conn_opt;
conn_opt.init_send_channel = opts.init_send_channel;
conn_opt.init_send_role = opts.init_send_role;
conn_opt.init_send_address =
opts.init_send_address; // generated in handle_server_connect
conn_opt.init_recv_channel = opts.init_recv_channel;
conn_opt.init_recv_role = opts.init_recv_role;
conn_opt.final_send_channel = "";
conn_opt.final_send_role = "";
conn_opt.final_recv_channel = "";
conn_opt.final_recv_role = "";
conn_opt.timeout_ms = opts.timeout_ms;

int local_port = 9999;
check_for_local_port_override(opts, local_port);

int server_sock;
printf("CREATING LOCAL SOCKET\n");
// start server for client app to connect to
server_sock = create_listening_socket(local_port);
if (server_sock < 0) {
printf("Failed to create local socket\n");
return -1;
}

client_connection_loop(server_sock, conn_opt, race, false);

printf("closing local socket\n");
close_socket(server_sock);

return 0;
}

int handle_client_bootstrap_connect(const CmdOptions &opts) {
// listen for localhost connection
// dial into / connect to race connection
Expand Down Expand Up @@ -1157,7 +1090,7 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) {
return -1;
}

client_connection_loop(server_sock, conn_opt, race);
client_connection_loop(server_sock, conn_opt, race, true);

printf("closing local socket\n");
close_socket(server_sock);
Expand All @@ -1166,7 +1099,7 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) {
}


ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_opt, int local_port) {
ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_opt, int local_port, bool bootstrapping) {
// establish a <local-app-socket, conduit> connection pair to relay data to and from each other
// close both upon timeout
ApiStatus status = ApiStatus::OK;
Expand All @@ -1180,7 +1113,19 @@ ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_o

printf("CREATING RACE SERVER SOCKET\n");
// listen on race side
auto [status1, link_addr, listener] = race.bootstrap_listen(conn_opt);
ApiStatus status1;
LinkAddress link_addr;
AcceptObject listener;
if (bootstrapping) {
std::tie(status1, link_addr, listener) = race.bootstrap_listen(conn_opt);
} else {
ReceiveOptions recv_opt;
recv_opt.recv_channel = conn_opt.init_recv_channel;
recv_opt.recv_role = conn_opt.init_recv_role;
recv_opt.send_channel = conn_opt.init_send_channel;
recv_opt.send_role = conn_opt.init_send_role;
std::tie(status1, link_addr, listener) = race.listen(recv_opt);
}
if (status1 != ApiStatus::OK) {
printf("listen failed with status: %i\n", status1);
return status1;
Expand Down Expand Up @@ -1219,6 +1164,38 @@ ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_o
return status;
}

int handle_server_connect(const CmdOptions &opts) {
// connect to localhost
// listen for race connections
// relay data from race conduit to localhost
// relay data from localhost to race conduit

ChannelParamStore params = getParams(opts);

Race race(opts.plugin_path, params);

BootstrapConnectionOptions conn_opt;
conn_opt.init_recv_channel = opts.init_recv_channel;
conn_opt.init_recv_role = opts.init_recv_role;
conn_opt.init_recv_address = opts.init_recv_address;
conn_opt.init_send_channel = opts.init_send_channel;
conn_opt.init_send_role = opts.init_send_role;
conn_opt.init_send_address = opts.init_send_address;
conn_opt.final_recv_channel = "NA";
conn_opt.final_recv_role = "NA";
conn_opt.final_send_channel = "NA";
conn_opt.final_send_role = "NA";
conn_opt.timeout_ms = opts.timeout_ms;

printf("handle_server_connect\n");

int local_port = 7777;
check_for_local_port_override(opts, local_port);
ApiStatus status = server_connections_loop(race, conn_opt, local_port, false);

return (status == ApiStatus::OK);
}

int handle_server_bootstrap_connect(const CmdOptions &opts) {
// connect to localhost
// listen for race connections
Expand Down Expand Up @@ -1246,7 +1223,7 @@ int handle_server_bootstrap_connect(const CmdOptions &opts) {

int local_port = 7777;
check_for_local_port_override(opts, local_port);
ApiStatus status = server_connections_loop(race, conn_opt, local_port);
ApiStatus status = server_connections_loop(race, conn_opt, local_port, true);

return (status == ApiStatus::OK);
}
Expand Down
5 changes: 0 additions & 5 deletions include/race/Race.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,6 @@ class AcceptObject {
public:
AcceptObject(std::shared_ptr<Core> core, OpHandle handle);
AcceptObject() {}
AcceptObject(const AcceptObject &that) noexcept {
core = that.core;
handle = that.handle;
}

std::pair<ApiStatus, Conduit> accept();

private:
Expand Down
32 changes: 32 additions & 0 deletions source/api-managers/ApiManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,15 @@ SdkResponse ApiManager::onConnectionStatusChanged(
connId, status, properties);
}

SdkResponse ApiManager::onConnStateMachineLinkEstablished(RaceHandle contextHandle,
LinkID linkId,
std::string linkAddress,
std::string channelId) {
TRACE_METHOD();
return post(logPrefix, &ApiManagerInternal::onConnStateMachineLinkEstablished,
contextHandle, linkId, linkAddress, channelId);
}

SdkResponse ApiManager::onConnStateMachineConnected(RaceHandle contextHandle,
ConnectionID connId,
std::string linkAddress,
Expand Down Expand Up @@ -690,6 +699,14 @@ void ApiManagerInternal::stateMachineFinished(ApiContext &context) {
manager.onStateMachineFinished(contextHandle);
}

void ApiManagerInternal::connStateMachineLinkEstablished(RaceHandle contextHandle,
LinkID linkId,
std::string linkAddress,
std::string channelId) {
TRACE_METHOD(contextHandle, linkId, linkAddress, channelId);
manager.onConnStateMachineLinkEstablished(contextHandle, linkId, linkAddress, channelId);
}

void ApiManagerInternal::connStateMachineConnected(RaceHandle contextHandle,
ConnectionID connId,
std::string linkAddress,
Expand Down Expand Up @@ -736,6 +753,21 @@ void ApiManagerInternal::onStateMachineFinished(uint64_t postId,
}
}

void ApiManagerInternal::onConnStateMachineLinkEstablished(uint64_t postId,
RaceHandle contextHandle,
LinkID linkId,
std::string linkAddress,
std::string channelId) {
TRACE_METHOD(postId, contextHandle, linkId, linkAddress, channelId);

auto contexts = getContexts(contextHandle);
for (auto context : contexts) {
context->updateConnStateMachineLinkEstablished(contextHandle, linkId,
linkAddress);
triggerEvent(*context, EVENT_CONN_STATE_MACHINE_LINK_ESTABLISHED);
}
}

void ApiManagerInternal::onConnStateMachineConnected(uint64_t postId,
RaceHandle contextHandle,
ConnectionID connId,
Expand Down
Loading

0 comments on commit 2befcfa

Please sign in to comment.