Skip to content

Commit

Permalink
Double-buffering for JSON async stream (#1571)
Browse files Browse the repository at this point in the history
Decreases time needed to process single JSON, and
reduces memory footprint size.

Relates-To: DATASDK-57

Signed-off-by: Andrey Kashcheev <[email protected]>
  • Loading branch information
andrey-kashcheev authored Jan 6, 2025
1 parent 60d9454 commit bfd15ff
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 37 deletions.
50 changes: 28 additions & 22 deletions olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023 HERE Europe B.V.
* Copyright (C) 2023-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,51 +19,57 @@

#include "AsyncJsonStream.h"

#include <cstring>

namespace olp {
namespace dataservice {
namespace read {
namespace repository {

RapidJsonByteStream::Ch RapidJsonByteStream::Peek() const {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [=]() { return !Empty(); });
return buffer_[count_];
RapidJsonByteStream::Ch RapidJsonByteStream::Peek() {
if (ReadEmpty()) {
SwapBuffers();
}
return read_buffer_[count_];
}

RapidJsonByteStream::Ch RapidJsonByteStream::Take() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [=]() { return !Empty(); });
return buffer_[count_++];
if (ReadEmpty()) {
SwapBuffers();
}
full_count_++;
return read_buffer_[count_++];
}

size_t RapidJsonByteStream::Tell() const { return count_; }
size_t RapidJsonByteStream::Tell() const { return full_count_; }

// Not implemented
char* RapidJsonByteStream::PutBegin() { return 0; }
void RapidJsonByteStream::Put(char) {}
void RapidJsonByteStream::Flush() {}
size_t RapidJsonByteStream::PutEnd(char*) { return 0; }

bool RapidJsonByteStream::Empty() const { return count_ == buffer_.size(); }
bool RapidJsonByteStream::ReadEmpty() const {
return count_ == read_buffer_.size();
}
bool RapidJsonByteStream::WriteEmpty() const { return write_buffer_.empty(); }

void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
std::unique_lock<std::mutex> lock(mutex_);

if (Empty()) {
buffer_.resize(length);
std::memcpy(buffer_.data(), content, length);
count_ = 0;
} else {
const auto buffer_size = buffer_.size();
buffer_.resize(buffer_size + length);
std::memcpy(buffer_.data() + buffer_size, content, length);
}
const auto buffer_size = write_buffer_.size();
write_buffer_.reserve(buffer_size + length);
write_buffer_.insert(write_buffer_.end(), content, content + length);

cv_.notify_one();
}

void RapidJsonByteStream::SwapBuffers() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&]() { return !WriteEmpty(); });
std::swap(read_buffer_, write_buffer_);
write_buffer_.clear();
count_ = 0;
}

AsyncJsonStream::AsyncJsonStream()
: current_stream_(std::make_shared<RapidJsonByteStream>()),
closed_{false} {}
Expand Down Expand Up @@ -97,7 +103,7 @@ void AsyncJsonStream::CloseStream(boost::optional<client::ApiError> error) {
return;
}
current_stream_->AppendContent("\0", 1);
error_ = error;
error_ = std::move(error);
closed_ = true;
}

Expand Down
17 changes: 11 additions & 6 deletions olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023 HERE Europe B.V.
* Copyright (C) 2023-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,13 +32,13 @@ namespace dataservice {
namespace read {
namespace repository {

// Json byte stream class. Implements rapidjson input stream concept.
/// Json byte stream class. Implements rapidjson input stream concept.
class RapidJsonByteStream {
public:
typedef char Ch;

/// Read the current character from stream without moving the read cursor.
Ch Peek() const;
Ch Peek();

/// Read the current character from stream and moving the read cursor to next
/// character.
Expand All @@ -47,19 +47,24 @@ class RapidJsonByteStream {
/// Get the current read cursor.
size_t Tell() const;

// Not needed for reading.
/// Not needed for reading.
char* PutBegin();
void Put(char);
void Flush();
size_t PutEnd(char*);

bool Empty() const;
bool ReadEmpty() const;
bool WriteEmpty() const;

void AppendContent(const char* content, size_t length);

private:
void SwapBuffers();

mutable std::mutex mutex_;
std::vector<char> buffer_; // Current buffer
std::vector<char> read_buffer_; // Current buffer
std::vector<char> write_buffer_; // Current buffer
size_t full_count_{0}; // Bytes read from the buffer
size_t count_{0}; // Bytes read from the buffer
mutable std::condition_variable cv_; // Condition for next portion of content
};
Expand Down
16 changes: 9 additions & 7 deletions olp-cpp-sdk-dataservice-read/tests/AsyncJsonStreamTest.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023 HERE Europe B.V.
* Copyright (C) 2023-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,16 +47,18 @@ TEST(AsyncJsonStreamTest, NormalFlow) {

EXPECT_EQ(current_stream->Peek(), '\0');
EXPECT_EQ(current_stream->Take(), '\0');
EXPECT_TRUE(current_stream->Empty());
EXPECT_TRUE(current_stream->ReadEmpty());

EXPECT_EQ(new_current_stream->Peek(), '2');
EXPECT_EQ(new_current_stream->Take(), '2');
EXPECT_EQ(new_current_stream->Take(), '3');
EXPECT_EQ(new_current_stream->Take(), '4');
EXPECT_TRUE(new_current_stream->Empty());
EXPECT_TRUE(new_current_stream->ReadEmpty());

stream.AppendContent("5", 1);
EXPECT_FALSE(new_current_stream->Empty());
// Read buffer is empty here because swap is on Take/Peek
EXPECT_FALSE(new_current_stream->WriteEmpty());
EXPECT_TRUE(new_current_stream->ReadEmpty());

stream.CloseStream(olp::client::ApiError::Cancelled());

Expand All @@ -73,11 +75,11 @@ TEST(AsyncJsonStreamTest, NormalFlow) {
EXPECT_TRUE(stream.GetError()->GetErrorCode() ==
olp::client::ErrorCode::Cancelled);

EXPECT_TRUE(new_current_stream->Empty());
EXPECT_TRUE(new_current_stream->ReadEmpty());
stream.AppendContent("17", 2);
EXPECT_TRUE(new_current_stream->Empty());
EXPECT_TRUE(new_current_stream->ReadEmpty());
stream.ResetStream("4", 1);
EXPECT_TRUE(new_current_stream->Empty());
EXPECT_TRUE(new_current_stream->ReadEmpty());
}

} // namespace
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2024 HERE Europe B.V.
* Copyright (C) 2019-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -2029,7 +2029,9 @@ TEST_F(PartitionsRepositoryTest, StreamPartitions) {
[&](const repository::AsyncJsonStream& async_stream) -> std::string {
std::string result;
auto stream = async_stream.GetCurrentStream();
while (!stream->Empty()) {
// Enforce buffers swap
OLP_SDK_CORE_UNUSED(stream->Peek());
while (!stream->ReadEmpty()) {
result += stream->Take();
}
return result;
Expand Down

0 comments on commit bfd15ff

Please sign in to comment.