diff --git a/app/race-cli/main.cpp b/app/race-cli/main.cpp index 34a5aed..ecb10ba 100644 --- a/app/race-cli/main.cpp +++ b/app/race-cli/main.cpp @@ -506,138 +506,6 @@ int handle_recv_respond(const CmdOptions &opts) { return 0; } -int handle_client_connect(const CmdOptions &opts) { - ChannelParamStore params = getParams(opts); - - Race race(opts.plugin_path, params); - - if (opts.init_send_address.empty()) { - printf("link address required\n"); - return -1; - } - - SendOptions send_opt; - send_opt.send_channel = opts.init_send_channel; - send_opt.send_role = opts.init_send_role; - send_opt.send_address = - opts.init_send_address; // generated in handle_server_connect - send_opt.recv_channel = opts.init_recv_channel; - send_opt.recv_role = opts.init_recv_role; - send_opt.alt_channel = opts.alt_channel; - - std::string introductionMsg = "hello"; - auto [status, connection] = race.dial_str(send_opt, introductionMsg); - if (status != ApiStatus::OK) { - printf("dial failed with status: %i\n", status); - return -1; - } - printf("dial success\n"); - - printf("\ntype message to send followed by \n"); - auto message = readStdin(); - std::string msgStr(message.begin(), message.end()); - - int packages_remaining = opts.num_packages; - while (opts.num_packages == -1 || packages_remaining > 0) { - status = connection.write_str(msgStr); - if (status != ApiStatus::OK) { - printf("write failed with status: %i\n", status); - break; - } else { - printf("wrote message: %s\n", msgStr.c_str()); - } - - auto [status2, received_message] = connection.read_str(); - if (status2 != ApiStatus::OK) { - printf("read_str failed with status: %i\n", status2); - status = status2; - break; - } else { - printf("received message: %s\n", received_message.c_str()); - } - } - auto status2 = connection.close(); - if (status2 != ApiStatus::OK) { - printf("close failed with status: %i\n", status2); - status = status2; - } - - return (status == ApiStatus::OK); -} - -int handle_server_connect(const CmdOptions &opts) { - ChannelParamStore params = getParams(opts); - - Race race(opts.plugin_path, params); - - ReceiveOptions recv_opt; - recv_opt.recv_channel = opts.init_recv_channel; - recv_opt.recv_role = opts.init_recv_role; - recv_opt.send_channel = opts.init_send_channel; - recv_opt.send_role = opts.init_send_role; - - auto [status, link_addr, listener] = race.listen(recv_opt); - if (status != ApiStatus::OK) { - printf("listen failed with status: %i\n", status); - return -1; - } - - // assume the link address is passed in out of band - // start client with this link address - printf("\nlistening on link address: '%s'\nbe sure to escape quotes for " - "client\n\n", - link_addr.c_str()); - - auto [status2, connection] = listener.accept(); - if (status2 != ApiStatus::OK) { - printf("accept failed with status: %i\n", status2); - return -2; - } - printf("accept success\n"); - - printf("\ntype message to send followed by \n"); - auto message = readStdin(); - std::string msgStr(message.begin(), message.end()); - - auto [status5, received_message2] = connection.read_str(); - if (status5 != ApiStatus::OK) { - printf("read failed with status: %i\n", status5); - status = status5; - } else { - printf("received message: %s\n", received_message2.c_str()); - } - - int packages_remaining = opts.num_packages; - while (opts.num_packages == -1 || packages_remaining > 0) { - auto status3 = connection.write_str(msgStr); - if (status3 != ApiStatus::OK) { - printf("write failed with status: %i\n", status3); - status = status3; - break; - } else { - printf("wrote message: %s\n", msgStr.c_str()); - } - - auto [status4, received_message] = connection.read_str(); - if (status4 != ApiStatus::OK) { - printf("read failed with status: %i\n", status4); - status = status4; - break; - } else { - printf("received message: %s\n", received_message.c_str()); - } - } - - auto status6 = connection.close(); - if (status6 != ApiStatus::OK) { - printf("close failed with status: %i\n", status6); - status = status6; - } - - return (status == ApiStatus::OK); -} - - void close_socket(int &socket_fd) { printf("closing socket %d\n", socket_fd); if (-1 == ::shutdown(socket_fd, SHUT_RDWR)) { // prevent further socket IO @@ -1031,7 +899,8 @@ void relay_data_loop(const int client_sock, std::shared_ptr c void client_connection_loop(int server_sock, const BootstrapConnectionOptions &conn_opt, - Race &race) { + Race &race, + bool bootstrapping) { pollfd poll_fd; memset(&poll_fd, 0, sizeof(poll_fd)); poll_fd.fd = server_sock; @@ -1067,7 +936,22 @@ void client_connection_loop(int server_sock, if (connection->getHandle() == NULL_RACE_HANDLE) { printf("calling bootstrap_dial_str\n"); // std::tie(status, connection) = race.bootstrap_dial_str(conn_opt, ""); - auto [status, tmp_connection] = race.bootstrap_dial_str(conn_opt, ""); + ApiStatus status; + Conduit tmp_connection; + if (bootstrapping) { + std::tie(status, tmp_connection) = race.bootstrap_dial_str(conn_opt, ""); + } else { + SendOptions send_opt; + send_opt.send_channel = conn_opt.init_send_channel; + send_opt.send_role = conn_opt.init_send_role; + send_opt.send_address = + conn_opt.init_send_address; // generated in handle_server_connect + send_opt.recv_channel = conn_opt.init_recv_channel; + send_opt.recv_role = conn_opt.init_recv_role; + send_opt.alt_channel = conn_opt.final_send_channel; + + std::tie(status, tmp_connection) = race.dial_str(send_opt, "hello"); + } connection = std::make_shared(tmp_connection); if (status != ApiStatus::OK) { printf("dial failed with status: %i\n", status); @@ -1116,6 +1000,55 @@ void check_for_local_port_override(const CmdOptions &opts, int &local_port) { } } +int handle_client_connect(const CmdOptions &opts) { + // listen for localhost connection + // dial into / connect to race connection + // connect to race conduit connections + // relay data to race conduit side + // relay data from race conduit to localhost + + ChannelParamStore params = getParams(opts); + + Race race(opts.plugin_path, params); + + if (opts.init_send_address.empty()) { + printf("link address required\n"); + return -1; + } + + BootstrapConnectionOptions conn_opt; + conn_opt.init_send_channel = opts.init_send_channel; + conn_opt.init_send_role = opts.init_send_role; + conn_opt.init_send_address = + opts.init_send_address; // generated in handle_server_connect + conn_opt.init_recv_channel = opts.init_recv_channel; + conn_opt.init_recv_role = opts.init_recv_role; + conn_opt.final_send_channel = ""; + conn_opt.final_send_role = ""; + conn_opt.final_recv_channel = ""; + conn_opt.final_recv_role = ""; + conn_opt.timeout_ms = opts.timeout_ms; + + int local_port = 9999; + check_for_local_port_override(opts, local_port); + + int server_sock; + printf("CREATING LOCAL SOCKET\n"); + // start server for client app to connect to + server_sock = create_listening_socket(local_port); + if (server_sock < 0) { + printf("Failed to create local socket\n"); + return -1; + } + + client_connection_loop(server_sock, conn_opt, race, false); + + printf("closing local socket\n"); + close_socket(server_sock); + + return 0; +} + int handle_client_bootstrap_connect(const CmdOptions &opts) { // listen for localhost connection // dial into / connect to race connection @@ -1157,7 +1090,7 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) { return -1; } - client_connection_loop(server_sock, conn_opt, race); + client_connection_loop(server_sock, conn_opt, race, true); printf("closing local socket\n"); close_socket(server_sock); @@ -1166,7 +1099,7 @@ int handle_client_bootstrap_connect(const CmdOptions &opts) { } -ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_opt, int local_port) { +ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_opt, int local_port, bool bootstrapping) { // establish a connection pair to relay data to and from each other // close both upon timeout ApiStatus status = ApiStatus::OK; @@ -1180,7 +1113,19 @@ ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_o printf("CREATING RACE SERVER SOCKET\n"); // listen on race side - auto [status1, link_addr, listener] = race.bootstrap_listen(conn_opt); + ApiStatus status1; + LinkAddress link_addr; + AcceptObject listener; + if (bootstrapping) { + std::tie(status1, link_addr, listener) = race.bootstrap_listen(conn_opt); + } else { + ReceiveOptions recv_opt; + recv_opt.recv_channel = conn_opt.init_recv_channel; + recv_opt.recv_role = conn_opt.init_recv_role; + recv_opt.send_channel = conn_opt.init_send_channel; + recv_opt.send_role = conn_opt.init_send_role; + std::tie(status1, link_addr, listener) = race.listen(recv_opt); + } if (status1 != ApiStatus::OK) { printf("listen failed with status: %i\n", status1); return status1; @@ -1219,6 +1164,38 @@ ApiStatus server_connections_loop(Race &race, BootstrapConnectionOptions &conn_o return status; } +int handle_server_connect(const CmdOptions &opts) { + // connect to localhost + // listen for race connections + // relay data from race conduit to localhost + // relay data from localhost to race conduit + + ChannelParamStore params = getParams(opts); + + Race race(opts.plugin_path, params); + + BootstrapConnectionOptions conn_opt; + conn_opt.init_recv_channel = opts.init_recv_channel; + conn_opt.init_recv_role = opts.init_recv_role; + conn_opt.init_recv_address = opts.init_recv_address; + conn_opt.init_send_channel = opts.init_send_channel; + conn_opt.init_send_role = opts.init_send_role; + conn_opt.init_send_address = opts.init_send_address; + conn_opt.final_recv_channel = "NA"; + conn_opt.final_recv_role = "NA"; + conn_opt.final_send_channel = "NA"; + conn_opt.final_send_role = "NA"; + conn_opt.timeout_ms = opts.timeout_ms; + + printf("handle_server_connect\n"); + + int local_port = 7777; + check_for_local_port_override(opts, local_port); + ApiStatus status = server_connections_loop(race, conn_opt, local_port, false); + + return (status == ApiStatus::OK); +} + int handle_server_bootstrap_connect(const CmdOptions &opts) { // connect to localhost // listen for race connections @@ -1246,7 +1223,7 @@ int handle_server_bootstrap_connect(const CmdOptions &opts) { int local_port = 7777; check_for_local_port_override(opts, local_port); - ApiStatus status = server_connections_loop(race, conn_opt, local_port); + ApiStatus status = server_connections_loop(race, conn_opt, local_port, true); return (status == ApiStatus::OK); } diff --git a/include/race/Race.h b/include/race/Race.h index 8177974..a18b214 100644 --- a/include/race/Race.h +++ b/include/race/Race.h @@ -240,11 +240,6 @@ class AcceptObject { public: AcceptObject(std::shared_ptr core, OpHandle handle); AcceptObject() {} - AcceptObject(const AcceptObject &that) noexcept { - core = that.core; - handle = that.handle; - } - std::pair accept(); private: diff --git a/source/api-managers/ApiManager.cpp b/source/api-managers/ApiManager.cpp index 1567f1c..ce360df 100644 --- a/source/api-managers/ApiManager.cpp +++ b/source/api-managers/ApiManager.cpp @@ -301,6 +301,15 @@ SdkResponse ApiManager::onConnectionStatusChanged( connId, status, properties); } +SdkResponse ApiManager::onConnStateMachineLinkEstablished(RaceHandle contextHandle, + LinkID linkId, + std::string linkAddress, + std::string channelId) { + TRACE_METHOD(); + return post(logPrefix, &ApiManagerInternal::onConnStateMachineLinkEstablished, + contextHandle, linkId, linkAddress, channelId); +} + SdkResponse ApiManager::onConnStateMachineConnected(RaceHandle contextHandle, ConnectionID connId, std::string linkAddress, @@ -690,6 +699,14 @@ void ApiManagerInternal::stateMachineFinished(ApiContext &context) { manager.onStateMachineFinished(contextHandle); } +void ApiManagerInternal::connStateMachineLinkEstablished(RaceHandle contextHandle, + LinkID linkId, + std::string linkAddress, + std::string channelId) { + TRACE_METHOD(contextHandle, linkId, linkAddress, channelId); + manager.onConnStateMachineLinkEstablished(contextHandle, linkId, linkAddress, channelId); +} + void ApiManagerInternal::connStateMachineConnected(RaceHandle contextHandle, ConnectionID connId, std::string linkAddress, @@ -736,6 +753,21 @@ void ApiManagerInternal::onStateMachineFinished(uint64_t postId, } } +void ApiManagerInternal::onConnStateMachineLinkEstablished(uint64_t postId, + RaceHandle contextHandle, + LinkID linkId, + std::string linkAddress, + std::string channelId) { + TRACE_METHOD(postId, contextHandle, linkId, linkAddress, channelId); + + auto contexts = getContexts(contextHandle); + for (auto context : contexts) { + context->updateConnStateMachineLinkEstablished(contextHandle, linkId, + linkAddress); + triggerEvent(*context, EVENT_CONN_STATE_MACHINE_LINK_ESTABLISHED); + } +} + void ApiManagerInternal::onConnStateMachineConnected(uint64_t postId, RaceHandle contextHandle, ConnectionID connId, diff --git a/source/api-managers/ApiManager.h b/source/api-managers/ApiManager.h index 09f5102..3db0975 100644 --- a/source/api-managers/ApiManager.h +++ b/source/api-managers/ApiManager.h @@ -133,6 +133,10 @@ class ApiManagerInternal { std::string role); virtual void stateMachineFailed(ApiContext &context); virtual void stateMachineFinished(ApiContext &context); + virtual void connStateMachineLinkEstablished(RaceHandle contextHandle, + LinkID linkId, + std::string linkAddress, + std::string channelId); virtual void connStateMachineConnected(RaceHandle contextHandle, ConnectionID connId, std::string linkAddress, @@ -141,6 +145,11 @@ class ApiManagerInternal { virtual void onStateMachineFailed(uint64_t postId, RaceHandle contextHandle); virtual void onStateMachineFinished(uint64_t postId, RaceHandle contextHandle); + virtual void onConnStateMachineLinkEstablished(uint64_t postId, + RaceHandle contextHandle, + LinkID linkId, + std::string linkAddress, + std::string channelId); virtual void onConnStateMachineConnected(uint64_t postId, RaceHandle contextHandle, ConnectionID connId, @@ -330,6 +339,10 @@ class ApiManager { // These are queued on the work thread queue to prevent stack issues virtual SdkResponse onStateMachineFailed(RaceHandle contextHandle); virtual SdkResponse onStateMachineFinished(RaceHandle contextHandle); + virtual SdkResponse onConnStateMachineLinkEstablished(RaceHandle contextHandle, + LinkID linkId, + std::string linkAddress, + std::string channelId); virtual SdkResponse onConnStateMachineConnected(RaceHandle contextHandle, ConnectionID connId, std::string linkAddress, diff --git a/source/state-machine/ApiContext.h b/source/state-machine/ApiContext.h index f1c067e..609df86 100644 --- a/source/state-machine/ApiContext.h +++ b/source/state-machine/ApiContext.h @@ -109,6 +109,9 @@ class ApiContext : public Context { virtual void updateStateMachineFinished(RaceHandle /* contextHandle */){}; virtual void updateDependent(RaceHandle /* contextHandle */){}; virtual void updateDetach(RaceHandle /* contextHandle */){}; + virtual void updateConnStateMachineLinkEstablished(RaceHandle /* contextHandle */, + LinkID /* linkId */, + std::string /* linkAddress */){}; virtual void updateConnStateMachineConnected(RaceHandle /* contextHandle */, ConnectionID /* connId */, std::string /* linkAddress */){}; diff --git a/source/state-machine/ConnectionStateMachine.cpp b/source/state-machine/ConnectionStateMachine.cpp index 3445951..b5de95a 100644 --- a/source/state-machine/ConnectionStateMachine.cpp +++ b/source/state-machine/ConnectionStateMachine.cpp @@ -188,6 +188,9 @@ struct StateConnLinkEstablished : public ConnState { } ctx.manager.registerHandle(ctx, openConnHandle); + ctx.manager.connStateMachineLinkEstablished(ctx.handle, ctx.linkId, + ctx.updatedLinkAddress, + ctx.channelId); return EventResult::SUCCESS; } diff --git a/source/state-machine/DialStateMachine.cpp b/source/state-machine/DialStateMachine.cpp index 057f321..6926790 100644 --- a/source/state-machine/DialStateMachine.cpp +++ b/source/state-machine/DialStateMachine.cpp @@ -37,6 +37,12 @@ void ApiDialContext::updateDial(const SendOptions &sendOptions, this->dialCallback = cb; } +void ApiDialContext::updateConnStateMachineLinkEstablished( + RaceHandle contextHandle, LinkID /*linkId*/, std::string linkAddress) { + if (this->recvConnSMHandle == contextHandle) { + this->recvLinkAddress = linkAddress; + } +}; void ApiDialContext::updateConnStateMachineConnected(RaceHandle contextHandle, ConnectionID connId, std::string linkAddress) { @@ -58,48 +64,19 @@ struct StateDialInitial : public DialState { TRACE_METHOD(); auto &ctx = getContext(context); - ChannelId sendChannelId = ctx.opts.send_channel; + // ChannelId sendChannelId = ctx.opts.send_channel; ChannelId recvChannelId = ctx.opts.recv_channel; - std::string sendRole = ctx.opts.send_role; std::string recvRole = ctx.opts.recv_role; - std::string sendLinkAddress = ctx.opts.send_address; - if (sendChannelId.empty()) { - helper::logError(logPrefix + - "Invalid send channel id passed to sendReceive"); - ctx.dialCallback(ApiStatus::CHANNEL_INVALID, {}, {}); - ctx.dialCallback = {}; - return EventResult::NOT_SUPPORTED; - } else if (recvChannelId.empty()) { + if (recvChannelId.empty()) { helper::logError(logPrefix + "Invalid recv channel id passed to recv"); ctx.dialCallback(ApiStatus::CHANNEL_INVALID, {}, {}); ctx.dialCallback = {}; return EventResult::NOT_SUPPORTED; - } else if (sendRole.empty()) { - helper::logError(logPrefix + "Invalid send role passed to sendReceive"); - ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {}); - ctx.dialCallback = {}; - return EventResult::NOT_SUPPORTED; } else if (recvRole.empty()) { helper::logError(logPrefix + "Invalid recv role passed to sendReceive"); ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {}); ctx.dialCallback = {}; return EventResult::NOT_SUPPORTED; - } else if (sendLinkAddress.empty()) { - helper::logError(logPrefix + - "Invalid send address passed to sendReceive"); - ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {}); - ctx.dialCallback = {}; - return EventResult::NOT_SUPPORTED; - } - - PluginContainer *sendContainer = - ctx.manager.getCore().getChannel(sendChannelId); - if (sendContainer == nullptr) { - helper::logError(logPrefix + "Failed to get channel with id " + - sendChannelId); - ctx.dialCallback(ApiStatus::CHANNEL_INVALID, {}, {}); - ctx.dialCallback = {}; - return EventResult::NOT_SUPPORTED; } PluginContainer *recvContainer = @@ -116,14 +93,6 @@ struct StateDialInitial : public DialState { ctx.manager.getCore().getEntropy(packageIdLen); ctx.packageId = std::string(packageIdBytes.begin(), packageIdBytes.end()); - ctx.sendConnSMHandle = ctx.manager.startConnStateMachine( - ctx.handle, sendChannelId, sendRole, sendLinkAddress, false, true); - - if (ctx.sendConnSMHandle == NULL_RACE_HANDLE) { - helper::logError(logPrefix + " starting connection state machine failed"); - return EventResult::NOT_SUPPORTED; - } - ctx.recvConnSMHandle = ctx.manager.startConnStateMachine( ctx.handle, recvChannelId, recvRole, "", true, false); @@ -132,22 +101,79 @@ struct StateDialInitial : public DialState { return EventResult::NOT_SUPPORTED; } - ctx.manager.registerHandle(ctx, ctx.sendConnSMHandle); ctx.manager.registerHandle(ctx, ctx.recvConnSMHandle); return EventResult::SUCCESS; } }; -struct StateDialWaitingForSecondConnection : public DialState { - explicit StateDialWaitingForSecondConnection( - StateType id = STATE_DIAL_WAITING_FOR_SECOND_CONNECTION) - : DialState(id, "StateDialWaitingForSecondConnection") {} +struct StateDialWaitingForSendConnection : public DialState { + explicit StateDialWaitingForSendConnection( + StateType id = STATE_DIAL_WAITING_FOR_SEND_CONNECTION) + : DialState(id, "StateDialWaitingForSendConnection") {} + virtual EventResult enter(Context &context) { + TRACE_METHOD(); + auto &ctx = getContext(context); + + // Send connection exists, progress state + if (not ctx.sendConnId.empty()) { + helper::logDebug(logPrefix + + "emitting SATISFIED to move to next state"); + ctx.pendingEvents.push(EVENT_SATISFIED); + return EventResult::SUCCESS; + } else if (ctx.sendConnSMHandle != 0) { + helper::logDebug(logPrefix + + "Send Connection already in-progress, nothing left to do in this state until it finishes."); + return EventResult::SUCCESS; + } + helper::logDebug(logPrefix + + "recv link established, triggering connecting for send"); + + ChannelId sendChannelId = ctx.opts.send_channel; + std::string sendRole = ctx.opts.send_role; + std::string sendLinkAddress = ctx.opts.send_address; + if (sendChannelId.empty()) { + helper::logError(logPrefix + + "Invalid send channel id passed to sendReceive"); + ctx.dialCallback(ApiStatus::CHANNEL_INVALID, {}, {}); + ctx.dialCallback = {}; + return EventResult::NOT_SUPPORTED; + } else if (sendRole.empty()) { + helper::logError(logPrefix + "Invalid send role passed to sendReceive"); + ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {}); + ctx.dialCallback = {}; + return EventResult::NOT_SUPPORTED; + } else if (sendLinkAddress.empty()) { + helper::logError(logPrefix + + "Invalid send address passed to sendReceive"); + ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {}); + ctx.dialCallback = {}; + return EventResult::NOT_SUPPORTED; + } + PluginContainer *sendContainer = + ctx.manager.getCore().getChannel(sendChannelId); + if (sendContainer == nullptr) { + helper::logError(logPrefix + "Failed to get channel with id " + + sendChannelId); + ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {}); + ctx.dialCallback = {}; + return EventResult::NOT_SUPPORTED; + } + + ctx.sendConnSMHandle = ctx.manager.startConnStateMachine( + ctx.handle, sendChannelId, sendRole, sendLinkAddress, false, true); + if (ctx.sendConnSMHandle == NULL_RACE_HANDLE) { + helper::logError(logPrefix + " starting connection state machine failed"); + return EventResult::NOT_SUPPORTED; + } + ctx.manager.registerHandle(ctx, ctx.sendConnSMHandle); + return EventResult::SUCCESS; + } }; -struct StateDialConnectionsOpen : public DialState { - explicit StateDialConnectionsOpen(StateType id = STATE_DIAL_CONNECTIONS_OPEN) - : DialState(id, "StateDialConnectionsOpen") {} +struct StateDialSendOpen : public DialState { + explicit StateDialSendOpen(StateType id = STATE_DIAL_SEND_OPEN) + : DialState(id, "StateDialSendOpen") {} virtual EventResult enter(Context &context) { TRACE_METHOD(); auto &ctx = getContext(context); @@ -156,7 +182,6 @@ struct StateDialConnectionsOpen : public DialState { RaceHandle connectionHandle = ctx.manager.getCore().generateHandle(); ctx.manager.registerHandle(ctx, connectionHandle); - // ctx.manager.registerId(ctx, ctx.recvConnId); // TODO: There's better ways to encode than base64 inside json nlohmann::json json = { @@ -187,6 +212,20 @@ struct StateDialConnectionsOpen : public DialState { } }; +struct StateDialPackageSent : public DialState { + explicit StateDialPackageSent( + StateType id = STATE_DIAL_PACKAGE_SENT) + : DialState(id, "STATE_DIAL_PACKAGE_SENT") {} + virtual EventResult enter(Context &context) { + auto &ctx = getContext(context); + if (not ctx.recvConnId.empty()) { + ctx.manager.registerId(ctx, ctx.recvConnId); + ctx.pendingEvents.push(EVENT_SATISFIED); + } + return EventResult::SUCCESS; + } +}; + struct StateDialFinished : public DialState { explicit StateDialFinished(StateType id = STATE_DIAL_FINISHED) : DialState(id, "StateDialFinished") {} @@ -250,11 +289,12 @@ DialStateEngine::DialStateEngine() { // connStateMachineConnected addInitialState(STATE_DIAL_INITIAL); // does nothing, waits for a second connStateMachineConnected call - addState( - STATE_DIAL_WAITING_FOR_SECOND_CONNECTION); + addState( + STATE_DIAL_WAITING_FOR_SEND_CONNECTION); // sends initial package with recvLinkAddress and message if any, waits for // package sent - addState(STATE_DIAL_CONNECTIONS_OPEN); + addState(STATE_DIAL_SEND_OPEN); + addState(STATE_DIAL_PACKAGE_SENT); // creates connection object, calls dial callback, calls state machine // finished on manager, final state addState(STATE_DIAL_FINISHED); @@ -263,9 +303,12 @@ DialStateEngine::DialStateEngine() { addFailedState(STATE_DIAL_FAILED); // clang-format off - declareStateTransition(STATE_DIAL_INITIAL, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_DIAL_WAITING_FOR_SECOND_CONNECTION); - declareStateTransition(STATE_DIAL_WAITING_FOR_SECOND_CONNECTION, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_DIAL_CONNECTIONS_OPEN); - declareStateTransition(STATE_DIAL_CONNECTIONS_OPEN, EVENT_PACKAGE_SENT, STATE_DIAL_FINISHED); + declareStateTransition(STATE_DIAL_INITIAL, EVENT_CONN_STATE_MACHINE_LINK_ESTABLISHED, STATE_DIAL_WAITING_FOR_SEND_CONNECTION); + declareStateTransition(STATE_DIAL_WAITING_FOR_SEND_CONNECTION, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_DIAL_WAITING_FOR_SEND_CONNECTION); + declareStateTransition(STATE_DIAL_WAITING_FOR_SEND_CONNECTION, EVENT_SATISFIED, STATE_DIAL_SEND_OPEN); + declareStateTransition(STATE_DIAL_SEND_OPEN, EVENT_PACKAGE_SENT, STATE_DIAL_PACKAGE_SENT); + declareStateTransition(STATE_DIAL_PACKAGE_SENT, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_DIAL_PACKAGE_SENT); + declareStateTransition(STATE_DIAL_PACKAGE_SENT, EVENT_SATISFIED, STATE_DIAL_FINISHED); // clang-format on } diff --git a/source/state-machine/DialStateMachine.h b/source/state-machine/DialStateMachine.h index 6044eb3..b01f0a0 100644 --- a/source/state-machine/DialStateMachine.h +++ b/source/state-machine/DialStateMachine.h @@ -23,12 +23,14 @@ namespace Raceboat { class ApiDialContext : public ApiContext { public: ApiDialContext(ApiManagerInternal &manager, StateEngine &engine) - : ApiContext(manager, engine) {} + : ApiContext(manager, engine), sendConnSMHandle(0), recvConnSMHandle(0) {} virtual void updateDial(const SendOptions &sendOptions, std::vector &&data, std::function cb) override; - + virtual void + updateConnStateMachineLinkEstablished(RaceHandle contextHandle, LinkID linkId, + std::string linkAddress) override; virtual void updateConnStateMachineConnected(RaceHandle contextHandle, ConnectionID connId, std::string linkAddress) override; diff --git a/source/state-machine/Events.cpp b/source/state-machine/Events.cpp index a4aa12f..ee02caf 100644 --- a/source/state-machine/Events.cpp +++ b/source/state-machine/Events.cpp @@ -56,6 +56,8 @@ std::string eventToString(EventType event) { return "EVENT_ADD_DEPENDENT"; case EVENT_DETACH_DEPENDENT: return "EVENT_DETACH_DEPENDENT"; + case EVENT_CONN_STATE_MACHINE_LINK_ESTABLISHED: + return "EVENT_CONN_STATE_MACHINE_LINK_ESTABLISHED"; case EVENT_CONN_STATE_MACHINE_CONNECTED: return "EVENT_CONN_STATE_MACHINE_CONNECTED"; case EVENT_CONN_CLOSE: @@ -76,4 +78,4 @@ std::string eventToString(EventType event) { return "event " + std::to_string(event); } -} // namespace Raceboat \ No newline at end of file +} // namespace Raceboat diff --git a/source/state-machine/Events.h b/source/state-machine/Events.h index fec2398..c82678a 100644 --- a/source/state-machine/Events.h +++ b/source/state-machine/Events.h @@ -39,6 +39,7 @@ enum ApiManagerEvent : EventType { EVENT_STATE_MACHINE_FINISHED, EVENT_ADD_DEPENDENT, EVENT_DETACH_DEPENDENT, + EVENT_CONN_STATE_MACHINE_LINK_ESTABLISHED, EVENT_CONN_STATE_MACHINE_CONNECTED, EVENT_CONN_CLOSE, EVENT_READ, diff --git a/source/state-machine/SendReceiveStateMachine.cpp b/source/state-machine/SendReceiveStateMachine.cpp index 8f894d1..3ded124 100644 --- a/source/state-machine/SendReceiveStateMachine.cpp +++ b/source/state-machine/SendReceiveStateMachine.cpp @@ -39,6 +39,12 @@ void ApiSendReceiveContext::updateSendReceive( this->data = _data; this->callback = _cb; } +void ApiSendReceiveContext::updateConnStateMachineLinkEstablished( + RaceHandle contextHandle, LinkID /*linkId*/, std::string linkAddress) { + if (this->recvConnSMHandle == contextHandle) { + this->recvLinkAddress = linkAddress; + } +}; void ApiSendReceiveContext::updateConnStateMachineConnected( RaceHandle contextHandle, ConnectionID connId, std::string linkAddress) { if (this->recvConnSMHandle == contextHandle) { @@ -63,10 +69,69 @@ struct StateSendReceiveInitial : public SendReceiveState { TRACE_METHOD(); auto &ctx = getContext(context); - ChannelId sendChannelId = ctx.opts.send_channel; ChannelId recvChannelId = ctx.opts.recv_channel; - std::string sendRole = ctx.opts.send_role; std::string recvRole = ctx.opts.recv_role; + if (recvChannelId.empty()) { + helper::logError(logPrefix + "Invalid recv channel id passed to recv"); + ctx.callback(ApiStatus::CHANNEL_INVALID, {}); + ctx.callback = {}; + return EventResult::NOT_SUPPORTED; + } else if (recvRole.empty()) { + helper::logError(logPrefix + "Invalid recv role passed to sendReceive"); + ctx.callback(ApiStatus::INVALID_ARGUMENT, {}); + ctx.callback = {}; + return EventResult::NOT_SUPPORTED; + } + PluginContainer *recvContainer = + ctx.manager.getCore().getChannel(recvChannelId); + if (recvContainer == nullptr) { + helper::logError(logPrefix + "Failed to get channel with id " + + recvChannelId); + ctx.callback(ApiStatus::CHANNEL_INVALID, {}); + ctx.callback = {}; + return EventResult::NOT_SUPPORTED; + } + + + ctx.recvConnSMHandle = ctx.manager.startConnStateMachine( + ctx.handle, recvChannelId, recvRole, "", true, false); + + if (ctx.recvConnSMHandle == NULL_RACE_HANDLE) { + helper::logError(logPrefix + " starting connection state machine failed"); + return EventResult::NOT_SUPPORTED; + } + + ctx.manager.registerHandle(ctx, ctx.recvConnSMHandle); + + return EventResult::SUCCESS; + } +}; + +struct StateSendReceiveWaitingForSendConnection : public SendReceiveState { + explicit StateSendReceiveWaitingForSendConnection( + StateType id = STATE_SEND_RECEIVE_WAITING_FOR_SEND_CONNECTION) + : SendReceiveState(id, + "STATE_SEND_RECEIVE_WAITING_FOR_SEND_CONNECTION") {} + virtual EventResult enter(Context &context) { + TRACE_METHOD(); + auto &ctx = getContext(context); + + // Send connection exists, progress state + if (not ctx.sendConnId.empty()) { + helper::logDebug(logPrefix + + "emitting SATISFIED to move to next state"); + ctx.pendingEvents.push(EVENT_SATISFIED); + return EventResult::SUCCESS; + } else if (ctx.sendConnSMHandle != 0) { + helper::logDebug(logPrefix + + "Send Connection already in-progress, nothing left to do in this state until it finishes."); + return EventResult::SUCCESS; + } + helper::logDebug(logPrefix + + "recv link established, triggering connecting for send"); + + ChannelId sendChannelId = ctx.opts.send_channel; + std::string sendRole = ctx.opts.send_role; std::string sendLinkAddress = ctx.opts.send_address; if (sendChannelId.empty()) { helper::logError(logPrefix + @@ -74,21 +139,11 @@ struct StateSendReceiveInitial : public SendReceiveState { ctx.callback(ApiStatus::CHANNEL_INVALID, {}); ctx.callback = {}; return EventResult::NOT_SUPPORTED; - } else if (recvChannelId.empty()) { - helper::logError(logPrefix + "Invalid recv channel id passed to recv"); - ctx.callback(ApiStatus::CHANNEL_INVALID, {}); - ctx.callback = {}; - return EventResult::NOT_SUPPORTED; } else if (sendRole.empty()) { helper::logError(logPrefix + "Invalid send role passed to sendReceive"); ctx.callback(ApiStatus::INVALID_ARGUMENT, {}); ctx.callback = {}; return EventResult::NOT_SUPPORTED; - } else if (recvRole.empty()) { - helper::logError(logPrefix + "Invalid recv role passed to sendReceive"); - ctx.callback(ApiStatus::INVALID_ARGUMENT, {}); - ctx.callback = {}; - return EventResult::NOT_SUPPORTED; } else if (sendLinkAddress.empty()) { helper::logError(logPrefix + "Invalid send address passed to sendReceive"); @@ -96,7 +151,6 @@ struct StateSendReceiveInitial : public SendReceiveState { ctx.callback = {}; return EventResult::NOT_SUPPORTED; } - PluginContainer *sendContainer = ctx.manager.getCore().getChannel(sendChannelId); if (sendContainer == nullptr) { @@ -107,60 +161,27 @@ struct StateSendReceiveInitial : public SendReceiveState { return EventResult::NOT_SUPPORTED; } - PluginContainer *recvContainer = - ctx.manager.getCore().getChannel(recvChannelId); - if (recvContainer == nullptr) { - helper::logError(logPrefix + "Failed to get channel with id " + - recvChannelId); - ctx.callback(ApiStatus::CHANNEL_INVALID, {}); - ctx.callback = {}; - return EventResult::NOT_SUPPORTED; - } - ctx.sendConnSMHandle = ctx.manager.startConnStateMachine( ctx.handle, sendChannelId, sendRole, sendLinkAddress, false, true); - if (ctx.sendConnSMHandle == NULL_RACE_HANDLE) { helper::logError(logPrefix + " starting connection state machine failed"); return EventResult::NOT_SUPPORTED; } - - ctx.recvConnSMHandle = ctx.manager.startConnStateMachine( - ctx.handle, recvChannelId, recvRole, "", true, false); - - if (ctx.recvConnSMHandle == NULL_RACE_HANDLE) { - helper::logError(logPrefix + " starting connection state machine failed"); - return EventResult::NOT_SUPPORTED; - } - ctx.manager.registerHandle(ctx, ctx.sendConnSMHandle); - ctx.manager.registerHandle(ctx, ctx.recvConnSMHandle); - return EventResult::SUCCESS; } }; -struct StateSendReceiveWaitingForSecondConnection : public SendReceiveState { - explicit StateSendReceiveWaitingForSecondConnection( - StateType id = STATE_SEND_RECEIVE_WAITING_FOR_SECOND_CONNECTION) - : SendReceiveState(id, - "STATE_SEND_RECEIVE_WAITING_FOR_SECOND_CONNECTION") {} -}; - -struct StateSendReceiveConnectionsOpen : public SendReceiveState { - explicit StateSendReceiveConnectionsOpen( - StateType id = STATE_SEND_RECEIVE_CONNECTIONS_OPEN) - : SendReceiveState(id, "STATE_SEND_RECEIVE_CONNECTIONS_OPEN") {} +struct StateSendReceiveSendOpen : public SendReceiveState { + explicit StateSendReceiveSendOpen( + StateType id = STATE_SEND_RECEIVE_SEND_OPEN) + : SendReceiveState(id, "STATE_SEND_RECEIVE_SEND_OPEN") {} virtual EventResult enter(Context &context) { TRACE_METHOD(); auto &ctx = getContext(context); PluginWrapper &plugin = getPlugin(ctx, ctx.opts.send_channel); RaceHandle pkgHandle = ctx.manager.getCore().generateHandle(); - if (ctx.recvConnId.empty()) { - return EventResult::NOT_SUPPORTED; - } - // TODO: There's better ways to encode than base64 inside json std::string dataB64 = base64::encode(std::move(ctx.data)); nlohmann::json json = { @@ -182,7 +203,6 @@ struct StateSendReceiveConnectionsOpen : public SendReceiveState { } ctx.manager.registerHandle(ctx, pkgHandle); - ctx.manager.registerId(ctx, ctx.recvConnId); return EventResult::SUCCESS; } @@ -191,7 +211,14 @@ struct StateSendReceiveConnectionsOpen : public SendReceiveState { struct StateSendReceivePackageSent : public SendReceiveState { explicit StateSendReceivePackageSent( StateType id = STATE_SEND_RECEIVE_PACKAGE_SENT) - : SendReceiveState(id, "STATE_SEND_RECEIVE_PACKAGE_SENT") {} + : SendReceiveState(id, "STATE_SEND_RECEIVE_PACKAGE_SENT") {} + virtual EventResult enter(Context &context) { + auto &ctx = getContext(context); + if (not ctx.recvConnId.empty()) { + ctx.manager.registerId(ctx, ctx.recvConnId); + } + return EventResult::SUCCESS; + } }; struct StateSendReceiveFinished : public SendReceiveState { @@ -239,13 +266,13 @@ SendReceiveStateEngine::SendReceiveStateEngine() { // connStateMachineConnected addInitialState(STATE_SEND_RECEIVE_INITIAL); - // does nothing, waits for a second connStateMachineConnected call - addState( - STATE_SEND_RECEIVE_WAITING_FOR_SECOND_CONNECTION); + // does nothing, waits for a send connStateMachineConnected call + addState( + STATE_SEND_RECEIVE_WAITING_FOR_SEND_CONNECTION); // calls sendPackage on plugin, waits for // onPackageStatusChanged(status=PACKAGE_SENT) - addState( - STATE_SEND_RECEIVE_CONNECTIONS_OPEN); + addState( + STATE_SEND_RECEIVE_SEND_OPEN); // does nothing, waits for a receiveEncPkg call addState(STATE_SEND_RECEIVE_PACKAGE_SENT); // calls callback with received message, calls state machine finished on @@ -256,9 +283,13 @@ SendReceiveStateEngine::SendReceiveStateEngine() { addFailedState(STATE_SEND_RECEIVE_FAILED); // clang-format off - declareStateTransition(STATE_SEND_RECEIVE_INITIAL, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_SEND_RECEIVE_WAITING_FOR_SECOND_CONNECTION); - declareStateTransition(STATE_SEND_RECEIVE_WAITING_FOR_SECOND_CONNECTION, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_SEND_RECEIVE_CONNECTIONS_OPEN); - declareStateTransition(STATE_SEND_RECEIVE_CONNECTIONS_OPEN, EVENT_PACKAGE_SENT, STATE_SEND_RECEIVE_PACKAGE_SENT); + declareStateTransition(STATE_SEND_RECEIVE_INITIAL, EVENT_CONN_STATE_MACHINE_LINK_ESTABLISHED, STATE_SEND_RECEIVE_WAITING_FOR_SEND_CONNECTION); + declareStateTransition(STATE_SEND_RECEIVE_WAITING_FOR_SEND_CONNECTION, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_SEND_RECEIVE_WAITING_FOR_SEND_CONNECTION); + declareStateTransition(STATE_SEND_RECEIVE_WAITING_FOR_SEND_CONNECTION, EVENT_SATISFIED, STATE_SEND_RECEIVE_SEND_OPEN); + declareStateTransition(STATE_SEND_RECEIVE_SEND_OPEN, + EVENT_PACKAGE_SENT, + STATE_SEND_RECEIVE_PACKAGE_SENT); + declareStateTransition(STATE_SEND_RECEIVE_PACKAGE_SENT, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_SEND_RECEIVE_PACKAGE_SENT); declareStateTransition(STATE_SEND_RECEIVE_PACKAGE_SENT, EVENT_RECEIVE_PACKAGE, STATE_SEND_RECEIVE_FINISHED); // clang-format on } diff --git a/source/state-machine/SendReceiveStateMachine.h b/source/state-machine/SendReceiveStateMachine.h index 6977189..4322b10 100644 --- a/source/state-machine/SendReceiveStateMachine.h +++ b/source/state-machine/SendReceiveStateMachine.h @@ -23,15 +23,18 @@ namespace Raceboat { class ApiSendReceiveContext : public ApiContext { public: ApiSendReceiveContext(ApiManagerInternal &manager, StateEngine &engine) - : ApiContext(manager, engine) {} + : ApiContext(manager, engine), sendConnSMHandle(0), recvConnSMHandle(0){} virtual void updateSendReceive( const SendOptions &sendOptions, std::vector &&data, std::function)> cb) override; virtual void - updateConnStateMachineConnected(RaceHandle contextHandle, ConnectionID connId, + updateConnStateMachineLinkEstablished(RaceHandle contextHandle, LinkID linkId, std::string linkAddress) override; virtual void + updateConnStateMachineConnected(RaceHandle contextHandle, ConnectionID connId, + std::string linkAddress) override; + virtual void updateReceiveEncPkg(ConnectionID connId, std::shared_ptr> data) override; @@ -57,4 +60,4 @@ class SendReceiveStateEngine : public StateEngine { using SendReceiveState = BaseApiState; -} // namespace Raceboat \ No newline at end of file +} // namespace Raceboat diff --git a/source/state-machine/StateMachine.cpp b/source/state-machine/StateMachine.cpp index 1fe0eea..f3fdec6 100644 --- a/source/state-machine/StateMachine.cpp +++ b/source/state-machine/StateMachine.cpp @@ -199,7 +199,8 @@ bool StateEngine::stateHandlesEvent(StateType stateId, EventType eventId) { success = true; } else { helper::logDebug(logPrefix + " event " + eventToString(eventId) + - " not registered for state " + stateToString(stateId)); + " not registered for state " + stateToString(stateId) + ". Continuing without executing a transition."); + success = true; } } return success; diff --git a/source/state-machine/States.h b/source/state-machine/States.h index 5568c13..7ee897e 100644 --- a/source/state-machine/States.h +++ b/source/state-machine/States.h @@ -46,9 +46,10 @@ enum ApiManagerState : StateType { STATE_RECV_FINISHED, STATE_RECV_FAILED, STATE_SEND_RECEIVE_INITIAL, - STATE_SEND_RECEIVE_WAITING_FOR_SECOND_CONNECTION, - STATE_SEND_RECEIVE_CONNECTIONS_OPEN, + STATE_SEND_RECEIVE_WAITING_FOR_SEND_CONNECTION, + STATE_SEND_RECEIVE_SEND_OPEN, STATE_SEND_RECEIVE_PACKAGE_SENT, + STATE_SEND_RECEIVE_RECEIVE_OPEN, STATE_SEND_RECEIVE_FINISHED, STATE_SEND_RECEIVE_FAILED, STATE_LISTEN_INITIAL, @@ -57,8 +58,9 @@ enum ApiManagerState : StateType { STATE_LISTEN_FINISHED, STATE_LISTEN_FAILED, STATE_DIAL_INITIAL, - STATE_DIAL_WAITING_FOR_SECOND_CONNECTION, - STATE_DIAL_CONNECTIONS_OPEN, + STATE_DIAL_WAITING_FOR_SEND_CONNECTION, + STATE_DIAL_SEND_OPEN, + STATE_DIAL_PACKAGE_SENT, STATE_DIAL_FINISHED, STATE_DIAL_FAILED, STATE_CONNECTION_OBJECT_INITIAL,