diff --git a/README.md b/README.md index 49bac42..c5fe56e 100644 --- a/README.md +++ b/README.md @@ -127,7 +127,9 @@ apt-get update && apt-get install ncat && ncat -vvvvvv --broker --listen -p 7777 Back in the first server container terminal. Consider using faster read timeouts with the -w flag. ``` -race-cli --dir /server-kits --server-bootstrap-connect --recv-channel=twoSixDirectCpp --send-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.2" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26269 --debug | tee rrlog | grep ERROR +ncat --broker --listen -p 7777 -vv --chat + +race-cli --dir /server-kits --server-bootstrap-connect --recv-channel=twoSixDirectCpp --send-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.2" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26269 --debug | tee rrlog | grep SOCKET ``` Start another server terminal, and start a `nc` session to read/write from stdio. @@ -148,7 +150,7 @@ docker run --rm -it --name=rbclient --network=rib-overlay-network --ip=10.11.1.3 -v $(pwd)/scripts/:/scripts/ \ raceboat:latest bash -race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.3" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26265 --param localPort=9999 --debug | tee srlog | grep ERROR & +race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.3" --param PluginCommsTwoSixStub.startPort=26262 --param PluginCommsTwoSixStub.endPort=26265 --param localPort=9999 --debug | tee srlog | grep ERROR nc localhost 9999 @@ -164,7 +166,7 @@ docker run --rm -it --name=rbclient2 --network=rib-overlay-network --ip=10.11.1. -v $(pwd)/scripts/:/scripts/ \ raceboat:latest bash -race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.4" --param PluginCommsTwoSixStub.startPort=26266 --param PluginCommsTwoSixStub.endPort=26269 --param localPort=9998 --debug | tee srlog | grep ERROR & +race-cli --dir /client-kits --client-bootstrap-connect --send-channel=twoSixDirectCpp --send-address="{\"hostname\":\"10.11.1.2\",\"port\":26262}" --recv-channel=twoSixDirectCpp --final-send-channel=twoSixDirectCpp --final-recv-channel=twoSixDirectCpp --param hostname="10.11.1.4" --param PluginCommsTwoSixStub.startPort=26266 --param PluginCommsTwoSixStub.endPort=26269 --param localPort=9998 --debug | tee srlog | grep ERROR nc localhost 9998 ``` diff --git a/app/race-cli/main.cpp b/app/race-cli/main.cpp index d4e647f..1f6824c 100644 --- a/app/race-cli/main.cpp +++ b/app/race-cli/main.cpp @@ -730,11 +730,11 @@ int create_client_connection(std::string &hostaddr, uint16_t port) { return -1; } - printf("new client socket family:%d, type:%d, protocol:%d\n", res->ai_family, res->ai_socktype, res->ai_protocol); + printf("SOCKET new client socket family:%d, type:%d, protocol:%d\n", res->ai_family, res->ai_socktype, res->ai_protocol); client_sock = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol); if (client_sock >= 0 && ::connect(client_sock, res->ai_addr, res->ai_addrlen) < 0) { - printf("connect() failed\n"); + printf("SOCKET connect() failed\n"); close_socket(client_sock); client_sock = -1; } @@ -742,57 +742,99 @@ int create_client_connection(std::string &hostaddr, uint16_t port) { if (res != nullptr) { ::freeaddrinfo(res); } - printf("connected socket %d\n", client_sock); + printf("SOCKET connected socket %d\n", client_sock); return client_sock; } -void forward_local_to_conduit(int local_sock, Conduit &conduit) { +int await_socket_input(const int socket_fd, int timeout_ms) { + pollfd poll_fd; + memset(&poll_fd, 0, sizeof(poll_fd)); + poll_fd.fd = socket_fd; + poll_fd.events = POLLIN; + + int poll_result = ::poll(&poll_fd, 1, timeout_ms); + + if (poll_result < 0) { + perror("poll() error"); + } else if (poll_result == 0) { + printf("poll timed out\n"); + } else if (poll_fd.revents != POLLIN) { + printf("unexpected poll event %d\n", poll_fd.revents); + } + + if (poll_result > 1) { + // this "shouldn't" happen, but here for visibility + printf("poll returned %d\n", poll_result); + } + return poll_result; +} + +// struct ForwardRecord { +// int local_sock; +// Conduit &conduit; +// long timeoutSeconds; +// std::atomic currentConduitTimeoutSinceEpoch; // shared amongst threads +// bool closeSocketsUponExit; // relinquish ownership / close upon exit of forward_*() functions +// }; + +void forward_local_to_conduit(int local_sock, Conduit &conduit, const int timeoutSeconds) { local_sock = dup(local_sock); std::vector buffer(BUF_SIZE); - ssize_t received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0); - Raceboat::OpHandle handle = conduit.getHandle(); - printf("local_to_conduit with socket fd %d, with conduit handle %lu\n", local_sock, handle); + // TODO -- client (only) local read timeout, if timeout occurs, close local socket and conduit + // 2 cases to fail, user closes local app, or server times out connection + // TODO -- make sure timeout flag supports forever case + printf("local_to_conduit with socket fd %d, and %d second timeout\n", local_sock, timeoutSeconds); + int poll_status = 1; + + // TODO this needs to consider the remaining conduit_to_local timeout duration to timeout about the same time + if(timeoutSeconds != Conduit::BLOCKING_READ) { + poll_status = await_socket_input(local_sock, timeoutSeconds*1000); + } + + while (poll_status > 0) { + ssize_t received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0); - while (received_bytes > 0) { // read data from input socket - std::vector result(buffer.begin(), + if (received_bytes > 0) { + std::vector result(buffer.begin(), buffer.begin() + received_bytes); - printf("Relaying data %s from local socket to conduit -- %lu\n", - std::string(buffer.begin(), buffer.begin() + received_bytes).c_str(), conduit.getHandle()); - auto status = conduit.write(result); // send data to output socket - if (status != ApiStatus::OK) { - printf("conduit write failed with status: %i on socket\n", status); + printf("Relaying data %s from local socket to conduit -- %lu\n", + std::string(buffer.begin(), buffer.begin() + received_bytes).c_str(), conduit.getHandle()); + + auto status = conduit.write(result); // send data to output socket + if (status != ApiStatus::OK) { + printf("conduit write failed with status: %d on socket\n", status); + break; + } + } else if (received_bytes < 0) { + // EWOULDBLOCK/EAGAIN "shouldn't" occur when poll() indicates received data available or when blocking forever + char buf[64]; + snprintf(buf, sizeof(buf) -1, "recv() failed on fd %d", local_sock); + perror(buf); + break; + } else { // 0 - indicates graceful disconnect + printf("remote socket disconnected\n"); break; } - if (handle != conduit.getHandle()) { - printf(" HANDLE CHANGED! Was %lu, now %lu\n", handle, conduit.getHandle()); + if(timeoutSeconds != Conduit::BLOCKING_READ) { + poll_status = await_socket_input(local_sock, timeoutSeconds*1000); } - - received_bytes = ::recv(local_sock, buffer.data(), BUF_SIZE, 0); } - if (received_bytes < 0) { - char buf[64]; - snprintf(buf, sizeof(buf) -1, "recv() failed on fd %d", local_sock); - perror(buf); - } else { // 0 - indicates graceful disconnect - printf("remote socket disconnected\n"); - } + conduit.close(); // close the conduit, so the forward_conduit_to_local thread will stop blocking and return printf("Exiting local_to_conduit loop\n"); } -void forward_conduit_to_local(Conduit &conduit, int local_sock, const long timeoutSeconds) { +void forward_conduit_to_local(Conduit &conduit, int local_sock, const int timeoutSeconds) { local_sock = dup(local_sock); - printf("conduit_to_local with socket fd %d, with conduit handle %lu, and timeout %ld\n", local_sock, conduit.getHandle(), timeoutSeconds); + printf("conduit_to_local with socket fd %d, with %d second timeout\n", local_sock, timeoutSeconds); ssize_t send_status; while (true) { auto [status, buffer] = conduit.read(timeoutSeconds); - if (status == ApiStatus::TIMEOUT) { - printf("conduit read timed out with status: %i\n", status); - } else if (status != ApiStatus::OK) { - printf("conduit read failed with status: %i\n", status); + if (status != ApiStatus::OK) { + printf("conduit read failed with status: %s\n", apiStatusToString(status).c_str()); break; } else { printf("Relaying data %s from conduit to local socket\n", @@ -809,13 +851,15 @@ void forward_conduit_to_local(Conduit &conduit, int local_sock, const long timeo } } } + + ::close(local_sock); // close the socket so the forward_local_to_conduit thread will stop blocking printf("Exiting conduit_to_local loop\n"); } -void relay_data_loop(int client_sock, Raceboat::Conduit &conduit, long timeoutSeconds, bool blocking) { - printf("relay_data_loop socket: %d with race read timeout %ld\n", client_sock, timeoutSeconds); - std::thread local_to_conduit_thread([client_sock, &conduit]() { - forward_local_to_conduit(client_sock, conduit); +void relay_data_loop(const int client_sock, Raceboat::Conduit conduit, const int timeoutSeconds, const bool blocking) { + printf("relay_data_loop socket: %d with race read timeout %d seconds\n", client_sock, timeoutSeconds); + std::thread local_to_conduit_thread([client_sock, &conduit, timeoutSeconds]() { + forward_local_to_conduit(client_sock, conduit, timeoutSeconds); }); std::thread conduit_to_local_thread([client_sock, &conduit, timeoutSeconds]() { @@ -837,29 +881,25 @@ void client_connection_loop(int server_sock, memset(&poll_fd, 0, sizeof(poll_fd)); poll_fd.fd = server_sock; poll_fd.events = POLLIN; - int timeout = (5 * 60 * 1000); // 5 minute timeout default + int timeoutSeconds = 300; // 5 minute timeout default + int timeout_ms = (timeoutSeconds * 1000); int poll_result; - if (conn_opt.timeout_ms > 0) { - timeout = conn_opt.timeout_ms; + if(conn_opt.timeout_seconds > 0) { + timeoutSeconds = conn_opt.timeout_seconds; + timeout_ms = timeoutSeconds * 1000; + } else if (conn_opt.timeout_seconds == -1) { + timeoutSeconds = Conduit::BLOCKING_READ; + timeout_ms = -1; } - // allow re-connect, but only 1 active connection + + + // allow re-connect, but only 1 active connection due to blocking IO threads do { - poll_result = ::poll(&poll_fd, 1, timeout); - if (poll_result < 0) { - perror("poll() error"); - } else if (poll_result == 0) { - printf("client loop timed out\n"); - } else if (poll_fd.revents != POLLIN) { - printf("unexpected poll event %d. Exiting...\n", poll_fd.revents); - } - + poll_result = await_socket_input(server_sock, timeout_ms); + if (poll_result > 0) { - if (poll_result > 1) { - // this "shouldn't" happen, but here for visibility - printf("poll returned %d\n", poll_result); - } printf("accept()ing client socket\n"); int client_sock = ::accept(server_sock, NULL, 0); printf("accepted socket %d\n", client_sock); @@ -874,9 +914,9 @@ void client_connection_loop(int server_sock, } else { printf("dial success\n"); // block so accept() isn't called until after socket error - relay_data_loop(client_sock, connection, Conduit::BLOCKING_READ, /* blocking threads */ true); - connection.close(); - close_socket(client_sock); + relay_data_loop(client_sock, connection, timeoutSeconds, /* blocking threads */ true); + // connection.close(); + // ::close_socket(client_sock); } } } else if (poll_result == 0) { @@ -927,6 +967,7 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) { conn_opt.final_send_role = opts.final_send_role; conn_opt.final_recv_channel = opts.final_recv_channel; conn_opt.final_recv_role = opts.final_recv_role; + conn_opt.timeout_ms = opts.timeout_ms; int local_port = 9999; check_for_local_port_override(opts, local_port); @@ -947,12 +988,17 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) { return 0; } -ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_opt, int client_sock) { + +ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_opt, int local_port) { + // establish a connection pair to relay data to and from each other + // close both upon timeout ApiStatus status = ApiStatus::OK; - long timeoutSeconds = 300; // 5 minute read timeout default - if(conn_opt.timeout_ms > 0) { - timeoutSeconds = conn_opt.timeout_ms / 1000; + int timeoutSeconds = 300; // 5 minute read timeout default + if(conn_opt.timeout_seconds > 0) { + timeoutSeconds = conn_opt.timeout_seconds; + } else if (conn_opt.timeout_seconds == -1) { + timeoutSeconds = Conduit::BLOCKING_READ; } printf("CREATING RACE SERVER SOCKET\n"); @@ -962,9 +1008,10 @@ ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_o printf("listen failed with status: %i\n", status1); return status1; } - printf("\nlistening on link address: '%s'", link_addr.c_str()); + printf("\nlistening on link address: '%s'\n", link_addr.c_str()); - std::unordered_map connections; + std::string host = "localhost"; + // std::unordered_map connections; while (1) { printf("server calling accept\n"); auto [status2, connection] = listener.accept(); @@ -973,26 +1020,41 @@ ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_o status = status2; break; } - printf("accept success\n"); - connections[connection.getHandle()] = connection; - relay_data_loop(client_sock, connections[connection.getHandle()], timeoutSeconds, false); - - for (auto handleConnPair: connections) { - void* ptr = &handleConnPair.second; - printf(" -- %lu:%lu - %p\n", handleConnPair.first, handleConnPair.second.getHandle(), ptr); - if (handleConnPair.first != handleConnPair.second.getHandle()) { - printf("CONDUIT HANDLE CHANGED! Was %lu, now %lu\n", handleConnPair.first, handleConnPair.second.getHandle()); + + printf("conduit accept success\n"); + printf("AWAITING LOCAL CLIENT\n"); + + // create client connection for listening socket to connect on + // assume listening local app process is or will be running + int client_sock = -1; + while (client_sock < 0) { + if ((client_sock = create_client_connection(host, local_port)) < 0) { + printf("Awaiting listening socket \n"); + sleep(5); } } + + // connections[connection.getHandle()] = connection; + printf("SOCKET client_sock: %d\n", client_sock); + // TODO - relinquish ownership of socket / conduit to relay_data_loop + relay_data_loop(client_sock, connection, timeoutSeconds, false); + + // for (auto handleConnPair: connections) { + // void* ptr = &handleConnPair.second; + // printf(" -- %lu:%lu - %p\n", handleConnPair.first, handleConnPair.second.getHandle(), ptr); + // if (handleConnPair.first != handleConnPair.second.getHandle()) { + // printf("CONDUIT HANDLE CHANGED! Was %lu, now %lu\n", handleConnPair.first, handleConnPair.second.getHandle()); + // } + // } } printf("closing race sockets\n"); - for (auto conn: connections) { - auto close_status = conn.second.close(); - if (ApiStatus::OK != close_status) { - printf("close failed with status: %i\n", close_status); - } - } + // for (auto conn: connections) { + // auto close_status = conn.second.close(); + // if (ApiStatus::OK != close_status) { + // printf("close failed with status: %i\n", close_status); + // } + // } return status; } @@ -1017,28 +1079,13 @@ int handle_server_bootstrap_connect(const CmdOptions &opts) { conn_opt.final_recv_role = opts.final_recv_role; conn_opt.final_send_channel = opts.final_send_channel; conn_opt.final_send_role = opts.final_send_role; - conn_opt.timeout_ms = opts.timeout_ms; + conn_opt.timeout_seconds = opts.timeout_ms / 1000; printf("handle_server_bootstrap_connect\n"); - int client_sock = -1; - std::string host = "localhost"; int local_port = 7777; check_for_local_port_override(opts, local_port); - - printf("AWAITING LOCAL CLIENT\n"); - - // create client connection for listening socket to connect on - // assume listening process is or will be running - while (client_sock < 0) { - if ((client_sock = create_client_connection(host, local_port)) < 0) { - printf("Awaiting listening socket \n"); - sleep(5); - } - } - - ApiStatus status = server_connections_loop(race, conn_opt, client_sock); - close_socket(client_sock); + ApiStatus status = server_connections_loop(race, conn_opt, local_port); return (status == ApiStatus::OK); } diff --git a/include/race/Race.h b/include/race/Race.h index 413aa7a..cb006c5 100644 --- a/include/race/Race.h +++ b/include/race/Race.h @@ -78,11 +78,13 @@ struct BootstrapConnectionOptions { std::string init_recv_role; std::string final_send_role; std::string final_recv_role; - int timeout_ms = 0; + int timeout_seconds = 0; }; std::string recvOptionsToString(const ReceiveOptions &sendOptions); std::string sendOptionsToString(const SendOptions &sendOptions); std::string bootstrapConnectionOptionsToString(const BootstrapConnectionOptions &bootstrapConnectionOptions); +std::string apiStatusToString(const ApiStatus status); + class Conduit { public: @@ -93,13 +95,13 @@ class Conduit { OpHandle getHandle(); // debug - std::pair> read(long timeoutTimestamp = BLOCKING_READ); + std::pair> read(int timeoutTimestamp = BLOCKING_READ); std::pair read_str(); ApiStatus write(std::vector message); ApiStatus write_str(const std::string &message); ApiStatus close(); - static const long BLOCKING_READ = 0; + static const int BLOCKING_READ = 0; private: std::shared_ptr core; diff --git a/source/Race.cpp b/source/Race.cpp index d8cb3ec..b9d62ae 100644 --- a/source/Race.cpp +++ b/source/Race.cpp @@ -70,6 +70,30 @@ std::string bootstrapConnectionOptionsToString(const BootstrapConnectionOptions return ss.str(); } +std::string apiStatusToString(const ApiStatus status) { + switch(status) { + case ApiStatus::INVALID: + return "INVALID"; + case ApiStatus::OK: + return "OK"; + case ApiStatus::CLOSING: + return "CLOSING"; + case ApiStatus::CHANNEL_INVALID: + return "CHANNEL_INVALID"; + case ApiStatus::INVALID_ARGUMENT: + return "INVALID_ARGUMENT"; + case ApiStatus::PLUGIN_ERROR: + return "PLUGIN_ERROR"; + case ApiStatus::INTERNAL_ERROR: + return "INTERNAL_ERROR"; + case ApiStatus::TIMEOUT: + return "TIMEOUT"; + default: + return "UNKNOWN"; + } +} + + Conduit::Conduit(std::shared_ptr core, OpHandle handle) : core(core), handle(handle) {} Conduit::Conduit(const Conduit &that) { @@ -81,7 +105,7 @@ OpHandle Conduit::getHandle() { return handle; } -std::pair> Conduit::read(long timeoutSeconds) { +std::pair> Conduit::read(int timeoutSeconds) { TRACE_METHOD(timeoutSeconds); std::promise>> promise; auto future = promise.get_future();