From 8f02c2057f323fb9613a596f88becfefb57cc245 Mon Sep 17 00:00:00 2001 From: Fanda Vacek <fvacek@elektroline.cz> Date: Wed, 11 Dec 2024 18:30:39 +0100 Subject: [PATCH 1/4] Fix "message too large" PLC error RpcFrame::toFrameHead() chainpack implementation stored also data to header --- libshviotqt/src/rpc/socket.cpp | 2 +- libshviotqt/tests/test_frame_reader.cpp | 87 +++++++++++++++++++++---- 2 files changed, 75 insertions(+), 14 deletions(-) diff --git a/libshviotqt/src/rpc/socket.cpp b/libshviotqt/src/rpc/socket.cpp index 57d75edb..b1fe9cb3 100644 --- a/libshviotqt/src/rpc/socket.cpp +++ b/libshviotqt/src/rpc/socket.cpp @@ -98,7 +98,7 @@ int FrameReader::tryToReadMeta(std::istringstream &in) try { m_meta = {}; rd->read(m_meta); - auto data_start = in.tellg(); + auto data_start = rd->readCount(); if (data_start == -1) { // meta was read without error, but without closing brackets return 0; diff --git a/libshviotqt/tests/test_frame_reader.cpp b/libshviotqt/tests/test_frame_reader.cpp index c4afe7cf..278dc99a 100644 --- a/libshviotqt/tests/test_frame_reader.cpp +++ b/libshviotqt/tests/test_frame_reader.cpp @@ -1,3 +1,4 @@ +#include "shv/chainpack/utils.h" #include <shv/coreqt/rpc.h> #include <shv/coreqt/log.h> #include <shv/coreqt/utils.h> @@ -29,21 +30,26 @@ doctest::String toString(const QList<int>& lst) return ('[' + sl.join(',') + ']').toLatin1().data(); } -vector<string> msg_to_raw_data(const vector<string> &cpons) +std::string write_stream_frame(const std::string &cpon) { + auto rv = RpcValue::fromCpon(cpon); + auto msg = RpcMessage(rv); + StreamFrameWriter wr; + wr.addFrame(msg.toRpcFrame()); + QByteArray ba; + { + QBuffer buffer(&ba); + buffer.open(QIODevice::WriteOnly); + wr.flushToDevice(&buffer); + } + return std::string(ba.constData(), ba.size()); +} + +vector<string> msg_to_raw_stream_data(const vector<string> &cpons) { vector<string> ret; for (const auto &cpon : cpons) { - auto rv = RpcValue::fromCpon(cpon); - auto msg = RpcMessage(rv); - StreamFrameWriter wr; - wr.addFrame(msg.toRpcFrame()); - QByteArray ba; - { - QBuffer buffer(&ba); - buffer.open(QIODevice::WriteOnly); - wr.flushToDevice(&buffer); - } - ret.emplace_back(ba.constData(), ba.size()); + // shvInfo() << cpon << "-->" << ba; + ret.push_back(write_stream_frame(cpon)); } return ret; } @@ -66,12 +72,25 @@ vector<string> msg_to_raw_data_serial(const vector<string> &cpons, SerialFrameWr return ret; } +bool frame_eq(const RpcFrame &frm1, const RpcFrame &frm2) +{ + return frm1.toFrameData() == frm2.toFrameData(); +} + void test_valid_data(FrameReader *rd, const vector<string> &data) { const auto& rq1 = data[0]; const auto& rs1 = data[1]; const auto& rs2 = data[2]; const auto& sig1 = data[3]; + const auto& dummy = data[4]; + + { + auto ret = rd->addData(dummy); + REQUIRE(ret.isEmpty()); + auto frames = rd->takeFrames(); + REQUIRE(frames.size() == 1); + } { auto ret = rd->addData(rq1); REQUIRE(ret.isEmpty()); @@ -123,11 +142,36 @@ const vector<string> cpons = { R"(<1:1,8:3>i{2:true})", R"(<1:1,8:2>i{2:true})", R"(<1:1,9:"shv",10:"lsmod">i{1:{"cze":true}})", + R"(<>i{})", }; +DOCTEST_TEST_CASE("Stream FrameWriter") +{ + DOCTEST_SUBCASE("Trivial message") + { + auto data = write_stream_frame(R"(<>i{})"); + auto ba = QByteArray::fromHex("03018aff").toStdString(); + REQUIRE(data == ba); + } + DOCTEST_SUBCASE("Trivial message 2") + { + auto data = write_stream_frame(R"(<1:1>i{})"); + auto ba = QByteArray::fromHex("07018b4141ff8aff").toStdString(); + REQUIRE(data == ba); + } + DOCTEST_SUBCASE("Some message") + { + auto data = write_stream_frame(R"(<1:1,8:3,9:"shv",10:"ls">i{1:"cze"})"); + REQUIRE(data[1] == 1); // protocol chainpack + REQUIRE(data[2] == static_cast<char>(0x8b)); // meta map + REQUIRE(data[data.size() - 1] == static_cast<char>(0xff)); + REQUIRE(data[data.size() - 2] == 'e'); + } +} + DOCTEST_TEST_CASE("Stream FrameReader") { - auto data = msg_to_raw_data(cpons); + auto data = msg_to_raw_stream_data(cpons); DOCTEST_SUBCASE("Valid data") { @@ -141,6 +185,23 @@ DOCTEST_TEST_CASE("Stream FrameReader") } } +DOCTEST_TEST_CASE("Stream FrameReader 3 messaqes at once") +{ + auto data1 = QByteArray::fromHex("3f018b414148694986004a86026c734b82c020a54e860273755089860862726f6b657249648603636d6c860773687655736572860566616e6461ff517fff8aff"); + auto data = (data1 + data1 + data1).toStdString(); + + StreamFrameReader rd; + auto ret = rd.addData(data); + REQUIRE(ret.isEmpty()); + for (const auto &frame1 : rd.takeFrames()) { + auto msg = frame1.toRpcMessage(); + auto frame2 = msg.toRpcFrame(); + CAPTURE(QByteArray::fromStdString(frame1.data).toHex()); + CAPTURE(QByteArray::fromStdString(frame2.data).toHex()); + REQUIRE(frame_eq(frame1, frame2)); + } +} + DOCTEST_TEST_CASE("Serial FrameReader with CRC check") { auto crc_check_wr = SerialFrameWriter::CrcCheck::Yes; From b798c448296d851d2220606bc396a73a13f63334 Mon Sep 17 00:00:00 2001 From: Fanda Vacek <fvacek@elektroline.cz> Date: Wed, 11 Dec 2024 18:58:38 +0100 Subject: [PATCH 2/4] Take not existing meta returns default value --- libshvchainpack/src/rpcvalue.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libshvchainpack/src/rpcvalue.cpp b/libshvchainpack/src/rpcvalue.cpp index d74318d7..2f52ab8c 100644 --- a/libshvchainpack/src/rpcvalue.cpp +++ b/libshvchainpack/src/rpcvalue.cpp @@ -714,7 +714,10 @@ void RpcValue::append(const RpcValue &val) RpcMetaData RpcValue::takeMeta() { - return *std::exchange(m_meta, nullptr); + if (m_meta) { + return *std::exchange(m_meta, nullptr); + } + return RpcMetaData(); } std::string RpcValue::toPrettyString(const std::string &indent) const From 9ff4c608e44d8c7b0b3e83fe4a56b98973c50061 Mon Sep 17 00:00:00 2001 From: Fanda Vacek <fvacek@elektroline.cz> Date: Wed, 11 Dec 2024 19:02:30 +0100 Subject: [PATCH 3/4] Fix tryToReadMeta(), introduce AbstractStreamReader::readCount() Bytes read count cannot be taken from stream::tellg() because data might be read ahead from stream to read buffer --- .../shv/chainpack/abstractstreamreader.h | 2 + libshvchainpack/src/abstractstreamreader.cpp | 9 +++ libshvchainpack/src/chainpackreader.cpp | 5 +- libshvchainpack/src/cponreader.cpp | 2 +- libshvchainpack/src/rpcmessage.cpp | 81 +++++++------------ 5 files changed, 45 insertions(+), 54 deletions(-) diff --git a/libshvchainpack/include/shv/chainpack/abstractstreamreader.h b/libshvchainpack/include/shv/chainpack/abstractstreamreader.h index 8f90ca3e..7b617c63 100644 --- a/libshvchainpack/include/shv/chainpack/abstractstreamreader.h +++ b/libshvchainpack/include/shv/chainpack/abstractstreamreader.h @@ -32,6 +32,8 @@ class SHVCHAINPACK_DECL_EXPORT AbstractStreamReader virtual void read(RpcValue::MetaData &meta_data) = 0; virtual void read(RpcValue &val) = 0; + + ssize_t readCount() const; protected: // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members) std::istream &m_in; diff --git a/libshvchainpack/src/abstractstreamreader.cpp b/libshvchainpack/src/abstractstreamreader.cpp index 2d380ac6..ce675fb3 100644 --- a/libshvchainpack/src/abstractstreamreader.cpp +++ b/libshvchainpack/src/abstractstreamreader.cpp @@ -70,4 +70,13 @@ RpcValue AbstractStreamReader::read(std::string *error) return ret; } +ssize_t AbstractStreamReader::readCount() const +{ + ssize_t count = m_in.tellg(); + if (count >= 0) { + count -= m_inCtx.end - m_inCtx.current; + } + return count; +} + } // namespace shv diff --git a/libshvchainpack/src/chainpackreader.cpp b/libshvchainpack/src/chainpackreader.cpp index 49c048f9..2ce2f29f 100644 --- a/libshvchainpack/src/chainpackreader.cpp +++ b/libshvchainpack/src/chainpackreader.cpp @@ -15,7 +15,7 @@ ChainPackReader::ChainPackReader(std::istream &in) void ChainPackReader::throwParseException(const std::string &msg) { std::array<char, 64> buff; - auto err_pos = m_in.tellg(); + auto err_pos = readCount(); auto l = m_in.readsome(buff.data(), buff.size() - 1); buff[l] = 0; auto dump = shv::chainpack::utils::hexDump(buff.data(), l); @@ -256,8 +256,7 @@ void ChainPackReader::parseIMap(RpcValue &out_val) void ChainPackReader::read(RpcValue::MetaData &meta_data) { - auto b = reinterpret_cast<const uint8_t*>(ccpcp_unpack_take_byte(&m_inCtx)); - m_inCtx.current--; + auto b = reinterpret_cast<const uint8_t*>(ccpcp_unpack_peek_byte(&m_inCtx)); if(b && *b == CP_MetaMap) { cchainpack_unpack_next(&m_inCtx); parseMetaData(meta_data); diff --git a/libshvchainpack/src/cponreader.cpp b/libshvchainpack/src/cponreader.cpp index 9c9f491c..fca263b6 100644 --- a/libshvchainpack/src/cponreader.cpp +++ b/libshvchainpack/src/cponreader.cpp @@ -16,7 +16,7 @@ CponReader::CponReader(std::istream &in) void CponReader::throwParseException(const std::string &msg) { std::array<char, 64> buff; - auto err_pos = m_in.tellg(); + auto err_pos = readCount(); auto l = m_in.readsome(buff.data(), buff.size() - 1); buff[l] = 0; std::string msg2 = m_inCtx.err_msg? m_inCtx.err_msg: ""; diff --git a/libshvchainpack/src/rpcmessage.cpp b/libshvchainpack/src/rpcmessage.cpp index 5ea656c3..cbd67eae 100644 --- a/libshvchainpack/src/rpcmessage.cpp +++ b/libshvchainpack/src/rpcmessage.cpp @@ -98,32 +98,23 @@ RpcMessage RpcFrame::toRpcMessage(std::string *errmsg) const std::string RpcFrame::toFrameHead() const { + std::ostringstream out; + std::unique_ptr<AbstractStreamWriter> wr; switch (protocol) { - case Rpc::ProtocolType::ChainPack: { - std::ostringstream out; - { - ChainPackWriter wr(out); - wr << meta; - } - auto ret = out.str(); - ret = static_cast<char>(protocol) + ret; - ret += data; - return ret; - } - case Rpc::ProtocolType::Cpon: { - std::ostringstream out; - { - CponWriter wr(out); - wr << meta; - } - auto ret = out.str(); - ret = static_cast<char>(protocol) + ret; - return ret; - } - default: { + case Rpc::ProtocolType::ChainPack: + wr = std::make_unique<ChainPackWriter>(out); + break; + case Rpc::ProtocolType::Cpon: + wr = std::make_unique<CponWriter>(out); + break; + default: throw std::runtime_error("Invalid protocol type"); } - } + wr->write(meta); + wr->flush(); + auto ret = out.str(); + ret = static_cast<char>(protocol) + ret; + return ret; } std::string RpcFrame::toFrameData() const @@ -136,38 +127,28 @@ std::string RpcFrame::toFrameData() const RpcFrame RpcFrame::fromFrameData(const std::string &frame_data) { std::istringstream in(frame_data); + std::unique_ptr<AbstractStreamReader> rd; auto protocol = static_cast<Rpc::ProtocolType>(in.get()); switch (protocol) { - case Rpc::ProtocolType::ChainPack: { - ChainPackReader rd(in); - RpcValue::MetaData meta; - rd.read(meta); - if(meta.isEmpty()) - throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {}); - auto pos = in.tellg(); - if(pos < 0) - throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {}); - auto data = std::string(frame_data, static_cast<size_t>(pos)); - RpcFrame frame(protocol, std::move(meta), std::move(data)); - return frame; - } - case Rpc::ProtocolType::Cpon: { - CponReader rd(in); - RpcValue::MetaData meta; - rd.read(meta); - if(meta.isEmpty()) - throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {}); - auto pos = in.tellg(); - if(pos < 0) - throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {}); - auto data = std::string(frame_data, static_cast<size_t>(pos)); - RpcFrame frame(protocol, std::move(meta), std::move(data)); - return frame; - } + case Rpc::ProtocolType::ChainPack: + rd = std::make_unique<ChainPackReader>(in); + break; + case Rpc::ProtocolType::Cpon: + rd = std::make_unique<CponReader>(in); + break; default: throw std::runtime_error("Invalid protocol type"); } - return {}; + RpcValue::MetaData meta; + rd->read(meta); + if(meta.isEmpty()) + throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {}); + auto pos = rd->readCount(); + if(pos < 0) + throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {}); + auto data = std::string(frame_data, static_cast<size_t>(pos)); + RpcFrame frame(protocol, std::move(meta), std::move(data)); + return frame; } //================================================================== From 68780d1d203427c6737d16336bd4256fa803e245 Mon Sep 17 00:00:00 2001 From: Fanda Vacek <fanda.vacek@gmail.com> Date: Thu, 12 Dec 2024 10:08:48 +0100 Subject: [PATCH 4/4] Add auto where type is not explicitly needed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Václav Kubernát <vkubernat@elektroline.cz> --- libshvchainpack/src/abstractstreamreader.cpp | 2 +- libshviotqt/tests/test_frame_reader.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libshvchainpack/src/abstractstreamreader.cpp b/libshvchainpack/src/abstractstreamreader.cpp index ce675fb3..55e9417b 100644 --- a/libshvchainpack/src/abstractstreamreader.cpp +++ b/libshvchainpack/src/abstractstreamreader.cpp @@ -72,7 +72,7 @@ RpcValue AbstractStreamReader::read(std::string *error) ssize_t AbstractStreamReader::readCount() const { - ssize_t count = m_in.tellg(); + auto count = m_in.tellg(); if (count >= 0) { count -= m_inCtx.end - m_inCtx.current; } diff --git a/libshviotqt/tests/test_frame_reader.cpp b/libshviotqt/tests/test_frame_reader.cpp index 278dc99a..2a9b24d8 100644 --- a/libshviotqt/tests/test_frame_reader.cpp +++ b/libshviotqt/tests/test_frame_reader.cpp @@ -1,4 +1,4 @@ -#include "shv/chainpack/utils.h" +#include <shv/chainpack/utils.h> #include <shv/coreqt/rpc.h> #include <shv/coreqt/log.h> #include <shv/coreqt/utils.h>