diff --git a/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp b/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp index 8d9887292..2d9993317 100644 --- a/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp +++ b/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 HERE Europe B.V. + * Copyright (C) 2023-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,26 +19,27 @@ #include "AsyncJsonStream.h" -#include - namespace olp { namespace dataservice { namespace read { namespace repository { -RapidJsonByteStream::Ch RapidJsonByteStream::Peek() const { - std::unique_lock lock(mutex_); - cv_.wait(lock, [=]() { return !Empty(); }); - return buffer_[count_]; +RapidJsonByteStream::Ch RapidJsonByteStream::Peek() { + if (ReadEmpty()) { + SwapBuffers(); + } + return read_buffer_[count_]; } RapidJsonByteStream::Ch RapidJsonByteStream::Take() { - std::unique_lock lock(mutex_); - cv_.wait(lock, [=]() { return !Empty(); }); - return buffer_[count_++]; + if (ReadEmpty()) { + SwapBuffers(); + } + full_count_++; + return read_buffer_[count_++]; } -size_t RapidJsonByteStream::Tell() const { return count_; } +size_t RapidJsonByteStream::Tell() const { return full_count_; } // Not implemented char* RapidJsonByteStream::PutBegin() { return 0; } @@ -46,24 +47,29 @@ void RapidJsonByteStream::Put(char) {} void RapidJsonByteStream::Flush() {} size_t RapidJsonByteStream::PutEnd(char*) { return 0; } -bool RapidJsonByteStream::Empty() const { return count_ == buffer_.size(); } +bool RapidJsonByteStream::ReadEmpty() const { + return count_ == read_buffer_.size(); +} +bool RapidJsonByteStream::WriteEmpty() const { return write_buffer_.empty(); } void RapidJsonByteStream::AppendContent(const char* content, size_t length) { std::unique_lock lock(mutex_); - if (Empty()) { - buffer_.resize(length); - std::memcpy(buffer_.data(), content, length); - count_ = 0; - } else { - const auto buffer_size = buffer_.size(); - buffer_.resize(buffer_size + length); - std::memcpy(buffer_.data() + buffer_size, content, length); - } + const auto buffer_size = write_buffer_.size(); + write_buffer_.reserve(buffer_size + length); + write_buffer_.insert(write_buffer_.end(), content, content + length); cv_.notify_one(); } +void RapidJsonByteStream::SwapBuffers() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [&]() { return !WriteEmpty(); }); + std::swap(read_buffer_, write_buffer_); + write_buffer_.clear(); + count_ = 0; +} + AsyncJsonStream::AsyncJsonStream() : current_stream_(std::make_shared()), closed_{false} {} @@ -97,7 +103,7 @@ void AsyncJsonStream::CloseStream(boost::optional error) { return; } current_stream_->AppendContent("\0", 1); - error_ = error; + error_ = std::move(error); closed_ = true; } diff --git a/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h b/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h index 01c71aaa2..863d1e78e 100644 --- a/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h +++ b/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 HERE Europe B.V. + * Copyright (C) 2023-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,13 +32,13 @@ namespace dataservice { namespace read { namespace repository { -// Json byte stream class. Implements rapidjson input stream concept. +/// Json byte stream class. Implements rapidjson input stream concept. class RapidJsonByteStream { public: typedef char Ch; /// Read the current character from stream without moving the read cursor. - Ch Peek() const; + Ch Peek(); /// Read the current character from stream and moving the read cursor to next /// character. @@ -47,19 +47,24 @@ class RapidJsonByteStream { /// Get the current read cursor. size_t Tell() const; - // Not needed for reading. + /// Not needed for reading. char* PutBegin(); void Put(char); void Flush(); size_t PutEnd(char*); - bool Empty() const; + bool ReadEmpty() const; + bool WriteEmpty() const; void AppendContent(const char* content, size_t length); private: + void SwapBuffers(); + mutable std::mutex mutex_; - std::vector buffer_; // Current buffer + std::vector read_buffer_; // Current buffer + std::vector write_buffer_; // Current buffer + size_t full_count_{0}; // Bytes read from the buffer size_t count_{0}; // Bytes read from the buffer mutable std::condition_variable cv_; // Condition for next portion of content }; diff --git a/olp-cpp-sdk-dataservice-read/tests/AsyncJsonStreamTest.cpp b/olp-cpp-sdk-dataservice-read/tests/AsyncJsonStreamTest.cpp index 23766b051..45ad09db0 100644 --- a/olp-cpp-sdk-dataservice-read/tests/AsyncJsonStreamTest.cpp +++ b/olp-cpp-sdk-dataservice-read/tests/AsyncJsonStreamTest.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 HERE Europe B.V. + * Copyright (C) 2023-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,16 +47,18 @@ TEST(AsyncJsonStreamTest, NormalFlow) { EXPECT_EQ(current_stream->Peek(), '\0'); EXPECT_EQ(current_stream->Take(), '\0'); - EXPECT_TRUE(current_stream->Empty()); + EXPECT_TRUE(current_stream->ReadEmpty()); EXPECT_EQ(new_current_stream->Peek(), '2'); EXPECT_EQ(new_current_stream->Take(), '2'); EXPECT_EQ(new_current_stream->Take(), '3'); EXPECT_EQ(new_current_stream->Take(), '4'); - EXPECT_TRUE(new_current_stream->Empty()); + EXPECT_TRUE(new_current_stream->ReadEmpty()); stream.AppendContent("5", 1); - EXPECT_FALSE(new_current_stream->Empty()); + // Read buffer is empty here because swap is on Take/Peek + EXPECT_FALSE(new_current_stream->WriteEmpty()); + EXPECT_TRUE(new_current_stream->ReadEmpty()); stream.CloseStream(olp::client::ApiError::Cancelled()); @@ -73,11 +75,11 @@ TEST(AsyncJsonStreamTest, NormalFlow) { EXPECT_TRUE(stream.GetError()->GetErrorCode() == olp::client::ErrorCode::Cancelled); - EXPECT_TRUE(new_current_stream->Empty()); + EXPECT_TRUE(new_current_stream->ReadEmpty()); stream.AppendContent("17", 2); - EXPECT_TRUE(new_current_stream->Empty()); + EXPECT_TRUE(new_current_stream->ReadEmpty()); stream.ResetStream("4", 1); - EXPECT_TRUE(new_current_stream->Empty()); + EXPECT_TRUE(new_current_stream->ReadEmpty()); } } // namespace diff --git a/olp-cpp-sdk-dataservice-read/tests/PartitionsRepositoryTest.cpp b/olp-cpp-sdk-dataservice-read/tests/PartitionsRepositoryTest.cpp index 5dfcd69be..c1d170c70 100644 --- a/olp-cpp-sdk-dataservice-read/tests/PartitionsRepositoryTest.cpp +++ b/olp-cpp-sdk-dataservice-read/tests/PartitionsRepositoryTest.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2024 HERE Europe B.V. + * Copyright (C) 2019-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -2029,7 +2029,9 @@ TEST_F(PartitionsRepositoryTest, StreamPartitions) { [&](const repository::AsyncJsonStream& async_stream) -> std::string { std::string result; auto stream = async_stream.GetCurrentStream(); - while (!stream->Empty()) { + // Enforce buffers swap + OLP_SDK_CORE_UNUSED(stream->Peek()); + while (!stream->ReadEmpty()) { result += stream->Take(); } return result;