Skip to content

Commit

Permalink
Changed Dial state machine to support synchronous connection semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
plvines committed Jan 10, 2025
1 parent bc1be3e commit 19000d5
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 57 deletions.
147 changes: 93 additions & 54 deletions source/state-machine/DialStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ void ApiDialContext::updateDial(const SendOptions &sendOptions,
this->dialCallback = cb;
}

void ApiDialContext::updateConnStateMachineLinkEstablished(
RaceHandle contextHandle, LinkID /*linkId*/, std::string linkAddress) {
if (this->recvConnSMHandle == contextHandle) {
this->recvLinkAddress = linkAddress;
}
};
void ApiDialContext::updateConnStateMachineConnected(RaceHandle contextHandle,
ConnectionID connId,
std::string linkAddress) {
Expand All @@ -58,48 +64,19 @@ struct StateDialInitial : public DialState {
TRACE_METHOD();
auto &ctx = getContext(context);

ChannelId sendChannelId = ctx.opts.send_channel;
// ChannelId sendChannelId = ctx.opts.send_channel;
ChannelId recvChannelId = ctx.opts.recv_channel;
std::string sendRole = ctx.opts.send_role;
std::string recvRole = ctx.opts.recv_role;
std::string sendLinkAddress = ctx.opts.send_address;
if (sendChannelId.empty()) {
helper::logError(logPrefix +
"Invalid send channel id passed to sendReceive");
ctx.dialCallback(ApiStatus::CHANNEL_INVALID, {}, {});
ctx.dialCallback = {};
return EventResult::NOT_SUPPORTED;
} else if (recvChannelId.empty()) {
if (recvChannelId.empty()) {
helper::logError(logPrefix + "Invalid recv channel id passed to recv");
ctx.dialCallback(ApiStatus::CHANNEL_INVALID, {}, {});
ctx.dialCallback = {};
return EventResult::NOT_SUPPORTED;
} else if (sendRole.empty()) {
helper::logError(logPrefix + "Invalid send role passed to sendReceive");
ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {});
ctx.dialCallback = {};
return EventResult::NOT_SUPPORTED;
} else if (recvRole.empty()) {
helper::logError(logPrefix + "Invalid recv role passed to sendReceive");
ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {});
ctx.dialCallback = {};
return EventResult::NOT_SUPPORTED;
} else if (sendLinkAddress.empty()) {
helper::logError(logPrefix +
"Invalid send address passed to sendReceive");
ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {});
ctx.dialCallback = {};
return EventResult::NOT_SUPPORTED;
}

PluginContainer *sendContainer =
ctx.manager.getCore().getChannel(sendChannelId);
if (sendContainer == nullptr) {
helper::logError(logPrefix + "Failed to get channel with id " +
sendChannelId);
ctx.dialCallback(ApiStatus::CHANNEL_INVALID, {}, {});
ctx.dialCallback = {};
return EventResult::NOT_SUPPORTED;
}

PluginContainer *recvContainer =
Expand All @@ -116,14 +93,6 @@ struct StateDialInitial : public DialState {
ctx.manager.getCore().getEntropy(packageIdLen);
ctx.packageId = std::string(packageIdBytes.begin(), packageIdBytes.end());

ctx.sendConnSMHandle = ctx.manager.startConnStateMachine(
ctx.handle, sendChannelId, sendRole, sendLinkAddress, false, true);

if (ctx.sendConnSMHandle == NULL_RACE_HANDLE) {
helper::logError(logPrefix + " starting connection state machine failed");
return EventResult::NOT_SUPPORTED;
}

ctx.recvConnSMHandle = ctx.manager.startConnStateMachine(
ctx.handle, recvChannelId, recvRole, "", true, false);

Expand All @@ -132,22 +101,75 @@ struct StateDialInitial : public DialState {
return EventResult::NOT_SUPPORTED;
}

ctx.manager.registerHandle(ctx, ctx.sendConnSMHandle);
ctx.manager.registerHandle(ctx, ctx.recvConnSMHandle);

return EventResult::SUCCESS;
}
};

struct StateDialWaitingForSecondConnection : public DialState {
explicit StateDialWaitingForSecondConnection(
StateType id = STATE_DIAL_WAITING_FOR_SECOND_CONNECTION)
: DialState(id, "StateDialWaitingForSecondConnection") {}
struct StateDialWaitingForSendConnection : public DialState {
explicit StateDialWaitingForSendConnection(
StateType id = STATE_DIAL_WAITING_FOR_SEND_CONNECTION)
: DialState(id, "StateDialWaitingForSendConnection") {}
virtual EventResult enter(Context &context) {
TRACE_METHOD();
auto &ctx = getContext(context);

// Send connection exists, progress state
if (not ctx.sendConnId.empty()) {
helper::logDebug(logPrefix +
"emitting SATISFIED to move to next state");
ctx.pendingEvents.push(EVENT_SATISFIED);
return EventResult::SUCCESS;
}
helper::logDebug(logPrefix +
"recv link established, triggering connecting for send");

ChannelId sendChannelId = ctx.opts.send_channel;
std::string sendRole = ctx.opts.send_role;
std::string sendLinkAddress = ctx.opts.send_address;
if (sendChannelId.empty()) {
helper::logError(logPrefix +
"Invalid send channel id passed to sendReceive");
ctx.dialCallback(ApiStatus::CHANNEL_INVALID, {}, {});
ctx.dialCallback = {};
return EventResult::NOT_SUPPORTED;
} else if (sendRole.empty()) {
helper::logError(logPrefix + "Invalid send role passed to sendReceive");
ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {});
ctx.dialCallback = {};
return EventResult::NOT_SUPPORTED;
} else if (sendLinkAddress.empty()) {
helper::logError(logPrefix +
"Invalid send address passed to sendReceive");
ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {});
ctx.dialCallback = {};
return EventResult::NOT_SUPPORTED;
}
PluginContainer *sendContainer =
ctx.manager.getCore().getChannel(sendChannelId);
if (sendContainer == nullptr) {
helper::logError(logPrefix + "Failed to get channel with id " +
sendChannelId);
ctx.dialCallback(ApiStatus::INVALID_ARGUMENT, {}, {});
ctx.dialCallback = {};
return EventResult::NOT_SUPPORTED;
}

ctx.sendConnSMHandle = ctx.manager.startConnStateMachine(
ctx.handle, sendChannelId, sendRole, sendLinkAddress, false, true);
if (ctx.sendConnSMHandle == NULL_RACE_HANDLE) {
helper::logError(logPrefix + " starting connection state machine failed");
return EventResult::NOT_SUPPORTED;
}
ctx.manager.registerHandle(ctx, ctx.sendConnSMHandle);
return EventResult::SUCCESS;
}
};

struct StateDialConnectionsOpen : public DialState {
explicit StateDialConnectionsOpen(StateType id = STATE_DIAL_CONNECTIONS_OPEN)
: DialState(id, "StateDialConnectionsOpen") {}
struct StateDialSendOpen : public DialState {
explicit StateDialSendOpen(StateType id = STATE_DIAL_SEND_OPEN)
: DialState(id, "StateDialSendOpen") {}
virtual EventResult enter(Context &context) {
TRACE_METHOD();
auto &ctx = getContext(context);
Expand All @@ -156,7 +178,6 @@ struct StateDialConnectionsOpen : public DialState {
RaceHandle connectionHandle = ctx.manager.getCore().generateHandle();

ctx.manager.registerHandle(ctx, connectionHandle);
// ctx.manager.registerId(ctx, ctx.recvConnId);

// TODO: There's better ways to encode than base64 inside json
nlohmann::json json = {
Expand Down Expand Up @@ -187,6 +208,20 @@ struct StateDialConnectionsOpen : public DialState {
}
};

struct StateDialPackageSent : public DialState {
explicit StateDialPackageSent(
StateType id = STATE_DIAL_PACKAGE_SENT)
: DialState(id, "STATE_DIAL_PACKAGE_SENT") {}
virtual EventResult enter(Context &context) {
auto &ctx = getContext(context);
if (not ctx.recvConnId.empty()) {
ctx.manager.registerId(ctx, ctx.recvConnId);
ctx.pendingEvents.push(EVENT_SATISFIED);
}
return EventResult::SUCCESS;
}
};

struct StateDialFinished : public DialState {
explicit StateDialFinished(StateType id = STATE_DIAL_FINISHED)
: DialState(id, "StateDialFinished") {}
Expand Down Expand Up @@ -250,11 +285,12 @@ DialStateEngine::DialStateEngine() {
// connStateMachineConnected
addInitialState<StateDialInitial>(STATE_DIAL_INITIAL);
// does nothing, waits for a second connStateMachineConnected call
addState<StateDialWaitingForSecondConnection>(
STATE_DIAL_WAITING_FOR_SECOND_CONNECTION);
addState<StateDialWaitingForSendConnection>(
STATE_DIAL_WAITING_FOR_SEND_CONNECTION);
// sends initial package with recvLinkAddress and message if any, waits for
// package sent
addState<StateDialConnectionsOpen>(STATE_DIAL_CONNECTIONS_OPEN);
addState<StateDialSendOpen>(STATE_DIAL_SEND_OPEN);
addState<StateDialPackageSent>(STATE_DIAL_PACKAGE_SENT);
// creates connection object, calls dial callback, calls state machine
// finished on manager, final state
addState<StateDialFinished>(STATE_DIAL_FINISHED);
Expand All @@ -263,9 +299,12 @@ DialStateEngine::DialStateEngine() {
addFailedState<StateDialFailed>(STATE_DIAL_FAILED);

// clang-format off
declareStateTransition(STATE_DIAL_INITIAL, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_DIAL_WAITING_FOR_SECOND_CONNECTION);
declareStateTransition(STATE_DIAL_WAITING_FOR_SECOND_CONNECTION, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_DIAL_CONNECTIONS_OPEN);
declareStateTransition(STATE_DIAL_CONNECTIONS_OPEN, EVENT_PACKAGE_SENT, STATE_DIAL_FINISHED);
declareStateTransition(STATE_DIAL_INITIAL, EVENT_CONN_STATE_MACHINE_LINK_ESTABLISHED, STATE_DIAL_WAITING_FOR_SEND_CONNECTION);
declareStateTransition(STATE_DIAL_WAITING_FOR_SEND_CONNECTION, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_DIAL_WAITING_FOR_SEND_CONNECTION);
declareStateTransition(STATE_DIAL_WAITING_FOR_SEND_CONNECTION, EVENT_SATISFIED, STATE_DIAL_SEND_OPEN);
declareStateTransition(STATE_DIAL_SEND_OPEN, EVENT_PACKAGE_SENT, STATE_DIAL_PACKAGE_SENT);
declareStateTransition(STATE_DIAL_PACKAGE_SENT, EVENT_CONN_STATE_MACHINE_CONNECTED, STATE_DIAL_PACKAGE_SENT);
declareStateTransition(STATE_DIAL_PACKAGE_SENT, EVENT_SATISFIED, STATE_DIAL_FINISHED);
// clang-format on
}

Expand Down
4 changes: 3 additions & 1 deletion source/state-machine/DialStateMachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ class ApiDialContext : public ApiContext {
virtual void
updateDial(const SendOptions &sendOptions, std::vector<uint8_t> &&data,
std::function<void(ApiStatus, RaceHandle, ConduitProperties)> cb) override;

virtual void
updateConnStateMachineLinkEstablished(RaceHandle contextHandle, LinkID linkId,
std::string linkAddress) override;
virtual void
updateConnStateMachineConnected(RaceHandle contextHandle, ConnectionID connId,
std::string linkAddress) override;
Expand Down
5 changes: 3 additions & 2 deletions source/state-machine/States.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ enum ApiManagerState : StateType {
STATE_LISTEN_FINISHED,
STATE_LISTEN_FAILED,
STATE_DIAL_INITIAL,
STATE_DIAL_WAITING_FOR_SECOND_CONNECTION,
STATE_DIAL_CONNECTIONS_OPEN,
STATE_DIAL_WAITING_FOR_SEND_CONNECTION,
STATE_DIAL_SEND_OPEN,
STATE_DIAL_PACKAGE_SENT,
STATE_DIAL_FINISHED,
STATE_DIAL_FAILED,
STATE_CONNECTION_OBJECT_INITIAL,
Expand Down

0 comments on commit 19000d5

Please sign in to comment.