Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring rpc cpon back #525

Merged
merged 4 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion libshvchainpack/include/shv/chainpack/rpcdriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ class SHVCHAINPACK_DECL_EXPORT RpcDriver
virtual void onRpcFrameReceived(RpcFrame &&frame);
virtual void onParseDataException(const shv::chainpack::ParseException &e) = 0;
virtual void onRpcMessageReceived(const shv::chainpack::RpcMessage &msg) = 0;

void setClientProtocolType(Rpc::ProtocolType pt) { m_clientProtocolType = pt; }
private:
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
static int s_defaultRpcTimeoutMsec;
Rpc::ProtocolType m_clientProtocolType = Rpc::ProtocolType::ChainPack;
/// We must remember recent message protocol type to support legacy CPON clients
Rpc::ProtocolType m_clientProtocolType = Rpc::ProtocolType::Invalid;
};

}
2 changes: 1 addition & 1 deletion libshvchainpack/include/shv/chainpack/rpcmessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ struct SHVCHAINPACK_DECL_EXPORT RpcFrame
std::string data;

RpcFrame() = default;
RpcFrame(RpcValue::MetaData &&meta, std::string &&data) : meta(std::move(meta)), data(std::move(data)) {}
RpcFrame(Rpc::ProtocolType protocol, RpcValue::MetaData &&meta, std::string &&data) : protocol(protocol), meta(std::move(meta)), data(std::move(data)) {}
bool isValid() const { return !meta.isEmpty() && !data.empty(); }
RpcMessage toRpcMessage(std::string *errmsg = nullptr) const;
std::string toFrameData() const;
Expand Down
4 changes: 3 additions & 1 deletion libshvchainpack/src/rpcdriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ void RpcDriver::onFrameDataRead(const std::string &frame_data)

// set client protocol type according to protocol type received from it
// default protocol type is chainpack, this is needed just for legacy devices support
m_clientProtocolType = frame.protocol;
if (m_clientProtocolType == Rpc::ProtocolType::Invalid) {
m_clientProtocolType = frame.protocol;
}

onRpcFrameReceived(std::move(frame));
}
Expand Down
9 changes: 3 additions & 6 deletions libshvchainpack/src/rpcmessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ RpcFrame RpcFrame::fromFrameData(const std::string &frame_data)
if(pos < 0)
throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {});
auto data = std::string(frame_data, static_cast<size_t>(pos));
RpcFrame frame(std::move(meta), std::move(data));
frame.protocol = protocol;
RpcFrame frame(protocol, std::move(meta), std::move(data));
return frame;
}
case Rpc::ProtocolType::Cpon: {
Expand All @@ -156,8 +155,7 @@ RpcFrame RpcFrame::fromFrameData(const std::string &frame_data)
if(pos < 0)
throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {});
auto data = std::string(frame_data, static_cast<size_t>(pos));
RpcFrame frame(std::move(meta), std::move(data));
frame.protocol = protocol;
RpcFrame frame(protocol, std::move(meta), std::move(data));
return frame;
}
default:
Expand Down Expand Up @@ -654,8 +652,7 @@ RpcFrame RpcMessage::toToRpcFrame(Rpc::ProtocolType protocol) const
default:
throw std::runtime_error("Invalid protocol type");
}
RpcFrame frame{std::move(meta), std::move(data)};
frame.protocol = protocol;
RpcFrame frame{protocol, std::move(meta), std::move(data)};
return frame;
}

Expand Down
3 changes: 3 additions & 0 deletions libshviotqt/include/shv/iotqt/rpc/clientconnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class SHVIOTQT_DECL_EXPORT ClientConnection : public SocketRpcConnection

void setTunnelOptions(const shv::chainpack::RpcValue &opts);

// for CPON clients
void setProtocolType(shv::chainpack::Rpc::ProtocolType protocol_type);

void setCheckBrokerConnectedInterval(int ms);
int checkBrokerConnectedInterval() const;

Expand Down
1 change: 1 addition & 0 deletions libshviotqt/include/shv/iotqt/rpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class SHVIOTQT_DECL_EXPORT FrameReader {
int tryToReadMeta(std::istringstream &in);
protected:
std::vector<chainpack::RpcFrame> m_frames;
chainpack::Rpc::ProtocolType m_protocol;
chainpack::RpcValue::MetaData m_meta;
std::optional<size_t> m_dataStart;

Expand Down
6 changes: 6 additions & 0 deletions libshviotqt/src/rpc/clientconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ ClientConnection::ClientConnection(QObject *parent)
, m_loginType(IRpcConnection::LoginType::Sha1)
{
connect(this, &SocketRpcConnection::socketConnectedChanged, this, &ClientConnection::onSocketConnectedChanged);
setProtocolType(cp::Rpc::ProtocolType::ChainPack);
}

ClientConnection::~ClientConnection()
Expand Down Expand Up @@ -201,6 +202,11 @@ void ClientConnection::setTunnelOptions(const chainpack::RpcValue &opts)
setConnectionOptions(conn_opts);
}

void ClientConnection::setProtocolType(chainpack::Rpc::ProtocolType protocol_type)
{
setClientProtocolType(protocol_type);
}

void ClientConnection::open()
{
if(!hasSocket()) {
Expand Down
2 changes: 1 addition & 1 deletion libshviotqt/src/rpc/localsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ void LocalSocket::onDataReadyRead()
}
}
catch (const std::runtime_error &e) {
shvWarning() << "Corrupted meta data received:\n" << shv::chainpack::utils::hexDump(std::string_view(ba.constData(), std::min(ba.size(), static_cast<decltype(ba.size())>(64))));
shvWarning() << "Corrupted meta data received:" << e.what() << "\n" << shv::chainpack::utils::hexDump(std::string_view(ba.constData(), std::min(ba.size(), static_cast<decltype(ba.size())>(64))));
emit error(QAbstractSocket::SocketError::UnknownSocketError);
}
}
Expand Down
2 changes: 1 addition & 1 deletion libshviotqt/src/rpc/serialportsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ void SerialFrameReader::finishFrame()
}
shvDebug() << "ADD FRAME:" << chainpack::utils::hexArray(m_readBuffer.data(), m_readBuffer.size());
auto frame_data = std::string(m_readBuffer, m_dataStart.value());
m_frames.emplace_back(std::move(m_meta), std::move(frame_data));
m_frames.emplace_back(m_protocol, std::move(m_meta), std::move(frame_data));
setState(ReadState::WaitingForStx);
}

Expand Down
21 changes: 16 additions & 5 deletions libshviotqt/src/rpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <shv/coreqt/log.h>
#include <shv/chainpack/utils.h>
#include <shv/chainpack/chainpackreader.h>
#include <shv/chainpack/cponreader.h>
#include <shv/chainpack/chainpackwriter.h>
#include <shv/chainpack/irpcconnection.h>

Expand Down Expand Up @@ -72,11 +73,21 @@ int FrameReader::tryToReadMeta(std::istringstream &in)
if (!m_dataStart.has_value()) {
using namespace chainpack;
static constexpr auto protocol_chainpack = static_cast<int>(shv::chainpack::Rpc::ProtocolType::ChainPack);
if (auto b = in.get(); b == protocol_chainpack) {
chainpack::ChainPackReader rd(in);
static constexpr auto protocol_cpon = static_cast<int>(shv::chainpack::Rpc::ProtocolType::Cpon);
auto protocol = in.get();
AbstractStreamReader *rd = nullptr;
if (protocol == protocol_cpon) {
m_protocol = shv::chainpack::Rpc::ProtocolType::Cpon;
rd = new chainpack::CponReader(in);
}
else if (protocol == protocol_chainpack) {
m_protocol = shv::chainpack::Rpc::ProtocolType::ChainPack;
rd = new chainpack::ChainPackReader(in);
}
if (rd) {
try {
m_meta = {};
rd.read(m_meta);
rd->read(m_meta);
auto data_start = in.tellg();
if (data_start == -1) {
// meta was read without error, but without closing brackets
Expand Down Expand Up @@ -133,7 +144,7 @@ QList<int> StreamFrameReader::addData(std::string_view data)
}
auto frame_data = std::string(m_readBuffer, m_dataStart.value(), frame_size);
m_readBuffer = std::string(m_readBuffer, consumed_len + frame_size);
m_frames.emplace_back(std::move(m_meta), std::move(frame_data));
m_frames.emplace_back(m_protocol, std::move(m_meta), std::move(frame_data));
m_meta = {};
m_dataStart = {};
}
Expand Down Expand Up @@ -328,7 +339,7 @@ void TcpSocket::onDataReadyRead()
}
}
catch (const std::runtime_error &e) {
shvWarning() << "Corrupted meta data received:\n" << shv::chainpack::utils::hexDump(std::string_view(ba.constData(), std::min(ba.size(), static_cast<decltype(ba.size())>(64))));
shvWarning() << "Corrupted meta data received:" << e.what() << "\n" << shv::chainpack::utils::hexDump(std::string_view(ba.constData(), std::min(ba.size(), static_cast<decltype(ba.size())>(64))));
emit error(QAbstractSocket::SocketError::UnknownSocketError);
}
}
Expand Down
4 changes: 2 additions & 2 deletions libshviotqt/src/rpc/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void WebSocket::onTextMessageReceived(const QString &message)
}
}
catch (const std::runtime_error &e) {
shvWarning() << "Corrupted meta data received:\n" << shv::chainpack::utils::hexDump(std::string_view(data.constData(), std::min(data.size(), static_cast<decltype(data.size())>(64))));
shvWarning() << "Corrupted meta data received:" << e.what() << "\n" << shv::chainpack::utils::hexDump(std::string_view(data.constData(), std::min(data.size(), static_cast<decltype(data.size())>(64))));
}
}

Expand All @@ -112,7 +112,7 @@ void WebSocket::onBinaryMessageReceived(const QByteArray &message)
}
}
catch (const std::runtime_error &e) {
shvWarning() << "Corrupted meta data received:\n" << shv::chainpack::utils::hexDump(std::string_view(message.constData(), std::min(message.size(), static_cast<decltype(message.size())>(64))));
shvWarning() << "Corrupted meta data received:" << e.what() << "\n" << shv::chainpack::utils::hexDump(std::string_view(message.constData(), std::min(message.size(), static_cast<decltype(message.size())>(64))));
}
}

Expand Down
Loading