Skip to content

Commit

Permalink
multiple clients supported. More logs. Debug code added for future de…
Browse files Browse the repository at this point in the history
…bugging
  • Loading branch information
stephen-bialkowski committed May 22, 2024
1 parent 9dbad1a commit 6f5c52e
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 29 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@ nb-configuration.xml
*.output
some-file-in-tar-gz.txt
**/__pycache__
test/input-files/usr/*
test/input-files/usr/*
rrlog*
srlog*
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ docker run --rm -it --name=rbserver --network=rib-overlay-network --ip=10.11.1.2
raceboat:latest bash
```

Start another terminal on the server before running `race-cli` to start `netcat`
Open a new terminal, and `docker exec` into the server before running `race-cli` to start `netcat`
```
docker exec -it rbserver bash
apt-get update && apt-get install ncat && ncat -vvvvvv --broker --listen -p 7777
```

back in the first server container terminal
Back in the first server container terminal
```
gdb --args build/LINUX_arm64-v8a/app/race-cli/race-cli --dir /server-kits --server-bootstrap-connect --recv-channel=twoSixDirectCpp --send-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.2" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26269 --debug | tee rrlog | grep ERROR
race-cli --dir /server-kits --server-bootstrap-connect --recv-channel=twoSixDirectCpp --send-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.2" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26269 --debug | tee rrlog | grep ERROR
```

Start another server terminal, and start a `nc` session to read/write from stdio.
Expand All @@ -148,7 +148,7 @@ docker run --rm -it --name=rbclient --network=rib-overlay-network --ip=10.11.1.3
-v $(pwd)/scripts/:/scripts/ \
raceboat:latest bash

gdb --args build/LINUX_arm64-v8a/app/race-cli/race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.3" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26265 --param localPort=9999 --debug | tee srlog | grep ERROR &
race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.3" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26265 --param localPort=9999 --debug | tee srlog | grep ERROR &

nc localhost 9999

Expand All @@ -164,7 +164,9 @@ docker run --rm -it --name=rbclient2 --network=rib-overlay-network --ip=10.11.1.
-v $(pwd)/scripts/:/scripts/ \
raceboat:latest bash

gdb --args build/LINUX_arm64-v8a/app/race-cli/race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.4" --param PluginCommsTwoSixStub.startPort=26266 --param PluginCommsTwoSixStub.endPort=26269 --param localPort=9998 --debug | tee srlog | grep ERROR &
race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.4" --param PluginCommsTwoSixStub.startPort=26266 --param PluginCommsTwoSixStub.endPort=26269 --param localPort=9998 --debug | tee srlog | grep ERROR &

nc localhost 9998
```

Confirm that both clients can send messages to the server via nc. Note that the server will only send messages to one of the clients.
58 changes: 37 additions & 21 deletions app/race-cli/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,20 +747,27 @@ int create_client_connection(std::string &hostaddr, uint16_t port) {
}

void forward_local_to_conduit(int local_sock, Conduit &conduit) {
printf("local_to_conduit with socket fd %d\n", local_sock);
local_sock = dup(local_sock);
std::vector<uint8_t> buffer(BUF_SIZE);
ssize_t received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);
Raceboat::OpHandle handle = conduit.getHandle();
printf("local_to_conduit with socket fd %d, with conduit handle %lu\n", local_sock, handle);

while (received_bytes > 0) { // read data from input socket
std::vector<uint8_t> result(buffer.begin(),
buffer.begin() + received_bytes);
printf("Relaying data %s from local socket to conduit\n",
std::string(buffer.begin(), buffer.begin() + received_bytes).c_str());
printf("Relaying data %s from local socket to conduit -- %lu\n",
std::string(buffer.begin(), buffer.begin() + received_bytes).c_str(), conduit.getHandle());
auto status = conduit.write(result); // send data to output socket
if (status != ApiStatus::OK) {
printf("conduit write failed with status: %i on socket\n", status);
break;
}

if (handle != conduit.getHandle()) {
printf(" HANDLE CHANGED! Was %lu, now %lu\n", handle, conduit.getHandle());
}

received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0);
}

Expand All @@ -775,11 +782,16 @@ void forward_local_to_conduit(int local_sock, Conduit &conduit) {
}

void forward_conduit_to_local(Conduit &conduit, int local_sock) {
printf("conduit_to_local with socket fd %d\n", local_sock);
local_sock = dup(local_sock);
printf("conduit_to_local with socket fd %d, with conduit handle %lu\n", local_sock, conduit.getHandle());
ssize_t send_status;
Raceboat::OpHandle handle = conduit.getHandle();

while (true) {
auto [status, buffer] = conduit.read();
if (handle != conduit.getHandle()) {
printf(" HANDLE CHANGED! Was %lu, now %lu\n", handle, conduit.getHandle());
}
if (status != ApiStatus::OK) {
printf("conduit read failed with status: %i\n", status);
break;
Expand All @@ -800,14 +812,14 @@ void forward_conduit_to_local(Conduit &conduit, int local_sock) {
printf("Exiting conduit_to_local loop\n");
}

void relay_data_loop(int client_sock, Raceboat::Conduit &connection, bool blocking=false) {
printf("relay_data_loop socket: %d\n", client_sock);
std::thread local_to_conduit_thread([client_sock, &connection]() {
forward_local_to_conduit(client_sock, connection);
void relay_data_loop(int client_sock, Raceboat::Conduit &conduit, bool blocking=false) {
printf("relay_data_loop socket: %d with conduit handle %lu\n", client_sock, conduit.getHandle());
std::thread local_to_conduit_thread([client_sock, &conduit]() {
forward_local_to_conduit(client_sock, conduit);
});

std::thread conduit_to_local_thread([client_sock, &connection]() {
forward_conduit_to_local(connection, client_sock);
std::thread conduit_to_local_thread([client_sock, &conduit]() {
forward_conduit_to_local(conduit, client_sock);
});
if (blocking) {
local_to_conduit_thread.join();
Expand All @@ -830,7 +842,6 @@ void client_connection_loop(int server_sock,

// allow re-connect, but only 1 active connection
std::vector<int> sockets;
std::vector<Raceboat::Conduit*> connections;
do {
poll_result = ::poll(&poll_fd, 1, timeout);
if (poll_result < 0) {
Expand Down Expand Up @@ -860,9 +871,9 @@ void client_connection_loop(int server_sock,
connection.close();
} else {
printf("dial success\n");
connections.push_back(&connection);
// block so accept() isn't called until after socket error
relay_data_loop(client_sock, connection, /* blocking */ true);
connection.close();
}
}
} else if (poll_result == 0) {
Expand All @@ -876,9 +887,6 @@ void client_connection_loop(int server_sock,
for (auto sock: sockets) {
close_socket(sock);
}
for (auto conn: connections) {
conn->close();
}
printf("exiting client loop\n");
}

Expand Down Expand Up @@ -939,7 +947,7 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) {
return 0;
}

ApiStatus run_server(Race &race, BootstrapConnectionOptions &conn_opt, int client_sock) {
ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_opt, int client_sock) {
ApiStatus status = ApiStatus::OK;

printf("CREATING RACE SERVER SOCKET\n");
Expand All @@ -951,7 +959,7 @@ ApiStatus run_server(Race &race, BootstrapConnectionOptions &conn_opt, int clien
}
printf("\nlistening on link address: '%s'", link_addr.c_str());

std::vector<Raceboat::Conduit> connections;
std::unordered_map<OpHandle, Raceboat::Conduit> connections;
while (1) {
printf("server calling accept\n");
auto [status2, connection] = listener.accept();
Expand All @@ -961,13 +969,21 @@ ApiStatus run_server(Race &race, BootstrapConnectionOptions &conn_opt, int clien
break;
}
printf("accept success\n");
connections.push_back(connection);
relay_data_loop(client_sock, connection);
connections[connection.getHandle()] = connection;
relay_data_loop(client_sock, connections[connection.getHandle()]);

for (auto handleConnPair: connections) {
void* ptr = &handleConnPair.second;
printf(" -- %lu:%lu - %p\n", handleConnPair.first, handleConnPair.second.getHandle(), ptr);
if (handleConnPair.first != handleConnPair.second.getHandle()) {
printf("CONDUIT HANDLE CHANGED! Was %lu, now %lu\n", handleConnPair.first, handleConnPair.second.getHandle());
}
}
}

printf("closing race sockets\n");
for (auto conn: connections) {
auto close_status = conn.close();
auto close_status = conn.second.close();
if (ApiStatus::OK != close_status) {
printf("close failed with status: %i\n", close_status);
}
Expand Down Expand Up @@ -1015,7 +1031,7 @@ int handle_server_bootstrap_connect(const CmdOptions &opts) {
}
}

ApiStatus status = run_server(race, conn_opt, client_sock);
ApiStatus status = server_connections_loop(race, conn_opt, client_sock);
close_socket(client_sock);

return (status == ApiStatus::OK);
Expand Down
4 changes: 3 additions & 1 deletion include/race/Race.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ std::string bootstrapConnectionOptionsToString(const BootstrapConnectionOptions
class Conduit {
public:
Conduit(std::shared_ptr<Core> core, OpHandle handle);
Conduit() {}
Conduit() : handle(NULL_RACE_HANDLE) {}
Conduit(const Conduit &that);
virtual ~Conduit() {}

OpHandle getHandle(); // debug

std::pair<ApiStatus, std::vector<uint8_t>> read();
std::pair<ApiStatus, std::string> read_str();
ApiStatus write(std::vector<uint8_t> message);
Expand Down
10 changes: 10 additions & 0 deletions source/Race.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <atomic>
#include <future>
#include <sstream>

#include "Core.h"
#include "helper.h"
Expand Down Expand Up @@ -76,7 +77,12 @@ Conduit::Conduit(const Conduit &that) {
handle = that.handle;
}

OpHandle Conduit::getHandle() {
return handle;
}

std::pair<ApiStatus, std::vector<uint8_t>> Conduit::read() {
TRACE_METHOD();
std::promise<std::pair<ApiStatus, std::vector<uint8_t>>> promise;
auto future = promise.get_future();

Expand All @@ -99,12 +105,14 @@ std::pair<ApiStatus, std::vector<uint8_t>> Conduit::read() {
}

std::pair<ApiStatus, std::string> Conduit::read_str() {
TRACE_METHOD();
auto [status, bytes] = read();
std::string str{bytes.begin(), bytes.end()};
return {status, str};
}

ApiStatus Conduit::write(std::vector<uint8_t> bytes) {
TRACE_METHOD();
std::promise<ApiStatus> promise;
auto future = promise.get_future();

Expand All @@ -126,11 +134,13 @@ ApiStatus Conduit::write(std::vector<uint8_t> bytes) {
}

ApiStatus Conduit::write_str(const std::string &message) {
TRACE_METHOD();
std::vector<uint8_t> bytes{message.begin(), message.end()};
return write(bytes);
}

ApiStatus Conduit::close() {
TRACE_METHOD();
std::promise<ApiStatus> promise;
auto future = promise.get_future();
if (core == nullptr) {
Expand Down
40 changes: 40 additions & 0 deletions source/api-managers/ApiManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1219,4 +1219,44 @@ EventResult ApiManagerInternal::triggerEvent(ApiContext &context,
return result;
}

//--------------------------------------------------------
// debug
//--------------------------------------------------------
void ApiManagerInternal::dumpContexts(std::string context){
TRACE_METHOD();

if(context.size()) {
printf("%s\n", context.c_str());
}
printf("dumping activeContexts handles ---\n");
for(std::unordered_map<RaceHandle, std::unique_ptr<Raceboat::ApiContext>>::iterator it = activeContexts.begin(); it != activeContexts.end(); ++it) {
printf(" %lu -- ", it->first);
it->second->dumpContext();
}

printf("dumping handleContextMap ---\n");
for (auto pair: handleContextMap) {
// second is Contexts = std::unordered_set<ApiContext *>;
printf(" %lu: \n", pair.first);
for (auto ctx: pair.second) {
ctx->dumpContext();
}
}

printf("dumping idContextMap ---\n");
for (auto pair: idContextMap) {
printf(" %s: \n", pair.first.c_str());
for (auto ctx: pair.second) {
ctx->dumpContext();
}
}

printf("dumping packageIdContextMap ---\n");
for (auto pair: packageIdContextMap) {
printf(" %s: \n", pair.first.c_str());
for (auto ctx: pair.second) {
ctx->dumpContext();
}
}
}
} // namespace Raceboat
1 change: 1 addition & 0 deletions source/api-managers/ApiManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class ApiManagerInternal {
const std::string &id);
virtual void unregisterHandle(ApiContext &context, RaceHandle handle);

void dumpContexts(std::string context=""); // debug
using Contexts = std::unordered_set<ApiContext *>;

protected:
Expand Down
5 changes: 5 additions & 0 deletions source/state-machine/ApiContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ class ApiContext : public Context {
virtual void
updateListenAccept(std::function<void(ApiStatus, RaceHandle)> /* cb */) {}

void dumpContext(std::string context="") { // debug
void* thisPtr = this;
printf(" %s %p: %lu\n", context.c_str(), thisPtr, handle);
}

public:
ApiManagerInternal &manager;
StateEngine &engine;
Expand Down
1 change: 1 addition & 0 deletions source/state-machine/BootstrapDialStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ struct StateBootstrapDialFailed : public BootstrapDialState {
auto &ctx = getContext(context);

if (ctx.dialCallback) {
helper::logDebug(logPrefix + "dial callback not null");
ctx.dialCallback(ApiStatus::INTERNAL_ERROR, {});
ctx.dialCallback = {};
}
Expand Down
4 changes: 4 additions & 0 deletions source/state-machine/BootstrapListenStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ struct StateBootstrapListenWaitingForHellos : public BootstrapListenState {
RaceHandle preBootstrapConnSMHandle = std::move(ctx.preBootstrapConduitSM.front());
ctx.preBootstrapConduitSM.pop();
if (!ctx.manager.onBootstrapListenAccept(preBootstrapConnSMHandle, cb)) {
helper::logError(logPrefix + "bootstrap listen accept failed");
cb(ApiStatus::INTERNAL_ERROR, {});
};
}
Expand Down Expand Up @@ -405,16 +406,19 @@ struct StateBootstrapListenFailed : public BootstrapListenState {
auto &ctx = getContext(context);

if (ctx.listenCb) {
helper::logDebug(logPrefix + "listen callback not null");
ctx.listenCb(ApiStatus::INTERNAL_ERROR, {}, {});
ctx.listenCb = {};
}

for (auto &cb : ctx.acceptCb) {
helper::logDebug(logPrefix + "accept callback not null");
cb(ApiStatus::INTERNAL_ERROR, {});
}
ctx.acceptCb = {};

if (ctx.closeCb) {
helper::logDebug(logPrefix + "close callback not null");
ctx.closeCb(ApiStatus::INTERNAL_ERROR);
ctx.closeCb = {};
}
Expand Down
1 change: 1 addition & 0 deletions source/state-machine/BootstrapPreConduitStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ struct StateBootstrapPreConduitFailed : public BootstrapPreConduitState {
auto &ctx = getContext(context);

if (ctx.acceptCb) {
helper::logDebug(logPrefix + "accept callback not null");
ctx.acceptCb(ApiStatus::INTERNAL_ERROR, {});
ctx.acceptCb = {};
}
Expand Down
Loading

0 comments on commit 6f5c52e

Please sign in to comment.