Skip to content

Commit

Permalink
Fix tryToReadMeta(), introduce AbstractStreamReader::readCount()
Browse files Browse the repository at this point in the history
Bytes read count cannot be taken from stream::tellg() because data might
be read ahead from stream to read buffer
  • Loading branch information
fvacek committed Dec 11, 2024
1 parent b798c44 commit 9ff4c60
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 54 deletions.
2 changes: 2 additions & 0 deletions libshvchainpack/include/shv/chainpack/abstractstreamreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class SHVCHAINPACK_DECL_EXPORT AbstractStreamReader

virtual void read(RpcValue::MetaData &meta_data) = 0;
virtual void read(RpcValue &val) = 0;

ssize_t readCount() const;
protected:
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
std::istream &m_in;
Expand Down
9 changes: 9 additions & 0 deletions libshvchainpack/src/abstractstreamreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,13 @@ RpcValue AbstractStreamReader::read(std::string *error)
return ret;
}

ssize_t AbstractStreamReader::readCount() const
{
ssize_t count = m_in.tellg();
if (count >= 0) {
count -= m_inCtx.end - m_inCtx.current;
}
return count;
}

} // namespace shv
5 changes: 2 additions & 3 deletions libshvchainpack/src/chainpackreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ChainPackReader::ChainPackReader(std::istream &in)
void ChainPackReader::throwParseException(const std::string &msg)
{
std::array<char, 64> buff;
auto err_pos = m_in.tellg();
auto err_pos = readCount();
auto l = m_in.readsome(buff.data(), buff.size() - 1);
buff[l] = 0;
auto dump = shv::chainpack::utils::hexDump(buff.data(), l);
Expand Down Expand Up @@ -256,8 +256,7 @@ void ChainPackReader::parseIMap(RpcValue &out_val)

void ChainPackReader::read(RpcValue::MetaData &meta_data)
{
auto b = reinterpret_cast<const uint8_t*>(ccpcp_unpack_take_byte(&m_inCtx));
m_inCtx.current--;
auto b = reinterpret_cast<const uint8_t*>(ccpcp_unpack_peek_byte(&m_inCtx));
if(b && *b == CP_MetaMap) {
cchainpack_unpack_next(&m_inCtx);
parseMetaData(meta_data);
Expand Down
2 changes: 1 addition & 1 deletion libshvchainpack/src/cponreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CponReader::CponReader(std::istream &in)
void CponReader::throwParseException(const std::string &msg)
{
std::array<char, 64> buff;
auto err_pos = m_in.tellg();
auto err_pos = readCount();
auto l = m_in.readsome(buff.data(), buff.size() - 1);
buff[l] = 0;
std::string msg2 = m_inCtx.err_msg? m_inCtx.err_msg: "";
Expand Down
81 changes: 31 additions & 50 deletions libshvchainpack/src/rpcmessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,32 +98,23 @@ RpcMessage RpcFrame::toRpcMessage(std::string *errmsg) const

std::string RpcFrame::toFrameHead() const
{
std::ostringstream out;
std::unique_ptr<AbstractStreamWriter> wr;
switch (protocol) {
case Rpc::ProtocolType::ChainPack: {
std::ostringstream out;
{
ChainPackWriter wr(out);
wr << meta;
}
auto ret = out.str();
ret = static_cast<char>(protocol) + ret;
ret += data;
return ret;
}
case Rpc::ProtocolType::Cpon: {
std::ostringstream out;
{
CponWriter wr(out);
wr << meta;
}
auto ret = out.str();
ret = static_cast<char>(protocol) + ret;
return ret;
}
default: {
case Rpc::ProtocolType::ChainPack:
wr = std::make_unique<ChainPackWriter>(out);
break;
case Rpc::ProtocolType::Cpon:
wr = std::make_unique<CponWriter>(out);
break;
default:
throw std::runtime_error("Invalid protocol type");
}
}
wr->write(meta);
wr->flush();
auto ret = out.str();
ret = static_cast<char>(protocol) + ret;
return ret;
}

std::string RpcFrame::toFrameData() const
Expand All @@ -136,38 +127,28 @@ std::string RpcFrame::toFrameData() const
RpcFrame RpcFrame::fromFrameData(const std::string &frame_data)
{
std::istringstream in(frame_data);
std::unique_ptr<AbstractStreamReader> rd;
auto protocol = static_cast<Rpc::ProtocolType>(in.get());
switch (protocol) {
case Rpc::ProtocolType::ChainPack: {
ChainPackReader rd(in);
RpcValue::MetaData meta;
rd.read(meta);
if(meta.isEmpty())
throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {});
auto pos = in.tellg();
if(pos < 0)
throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {});
auto data = std::string(frame_data, static_cast<size_t>(pos));
RpcFrame frame(protocol, std::move(meta), std::move(data));
return frame;
}
case Rpc::ProtocolType::Cpon: {
CponReader rd(in);
RpcValue::MetaData meta;
rd.read(meta);
if(meta.isEmpty())
throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {});
auto pos = in.tellg();
if(pos < 0)
throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {});
auto data = std::string(frame_data, static_cast<size_t>(pos));
RpcFrame frame(protocol, std::move(meta), std::move(data));
return frame;
}
case Rpc::ProtocolType::ChainPack:
rd = std::make_unique<ChainPackReader>(in);
break;
case Rpc::ProtocolType::Cpon:
rd = std::make_unique<CponReader>(in);
break;
default:
throw std::runtime_error("Invalid protocol type");
}
return {};
RpcValue::MetaData meta;
rd->read(meta);
if(meta.isEmpty())
throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {});
auto pos = rd->readCount();
if(pos < 0)
throw ParseException(CCPCP_RC_MALFORMED_INPUT, "Metadata missing", -1, {});
auto data = std::string(frame_data, static_cast<size_t>(pos));
RpcFrame frame(protocol, std::move(meta), std::move(data));
return frame;
}

//==================================================================
Expand Down

0 comments on commit 9ff4c60

Please sign in to comment.