Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Double-buffering for JSON async stream #1571

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023 HERE Europe B.V.
* Copyright (C) 2023-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,51 +19,57 @@

#include "AsyncJsonStream.h"

#include <cstring>

namespace olp {
namespace dataservice {
namespace read {
namespace repository {

RapidJsonByteStream::Ch RapidJsonByteStream::Peek() const {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [=]() { return !Empty(); });
return buffer_[count_];
RapidJsonByteStream::Ch RapidJsonByteStream::Peek() {
if (ReadEmpty()) {
SwapBuffers();
}
return read_buffer_[count_];
}

RapidJsonByteStream::Ch RapidJsonByteStream::Take() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [=]() { return !Empty(); });
return buffer_[count_++];
if (ReadEmpty()) {
SwapBuffers();
}
full_count_++;
return read_buffer_[count_++];
}

size_t RapidJsonByteStream::Tell() const { return count_; }
size_t RapidJsonByteStream::Tell() const { return full_count_; }

// Not implemented
char* RapidJsonByteStream::PutBegin() { return 0; }
void RapidJsonByteStream::Put(char) {}
void RapidJsonByteStream::Flush() {}
size_t RapidJsonByteStream::PutEnd(char*) { return 0; }

bool RapidJsonByteStream::Empty() const { return count_ == buffer_.size(); }
bool RapidJsonByteStream::ReadEmpty() const {
return count_ == read_buffer_.size();
}
bool RapidJsonByteStream::WriteEmpty() const { return write_buffer_.empty(); }

void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
std::unique_lock<std::mutex> lock(mutex_);

if (Empty()) {
buffer_.resize(length);
std::memcpy(buffer_.data(), content, length);
count_ = 0;
} else {
const auto buffer_size = buffer_.size();
buffer_.resize(buffer_size + length);
std::memcpy(buffer_.data() + buffer_size, content, length);
}
const auto buffer_size = write_buffer_.size();
write_buffer_.reserve(buffer_size + length);
write_buffer_.insert(write_buffer_.end(), content, content + length);

cv_.notify_one();
}

void RapidJsonByteStream::SwapBuffers() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&]() { return !WriteEmpty(); });
std::swap(read_buffer_, write_buffer_);
write_buffer_.clear();
count_ = 0;
}

AsyncJsonStream::AsyncJsonStream()
: current_stream_(std::make_shared<RapidJsonByteStream>()),
closed_{false} {}
Expand Down Expand Up @@ -97,7 +103,7 @@ void AsyncJsonStream::CloseStream(boost::optional<client::ApiError> error) {
return;
}
current_stream_->AppendContent("\0", 1);
error_ = error;
error_ = std::move(error);
closed_ = true;
}

Expand Down
17 changes: 11 additions & 6 deletions olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023 HERE Europe B.V.
* Copyright (C) 2023-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,13 +32,13 @@ namespace dataservice {
namespace read {
namespace repository {

// Json byte stream class. Implements rapidjson input stream concept.
/// Json byte stream class. Implements rapidjson input stream concept.
class RapidJsonByteStream {
public:
typedef char Ch;

/// Read the current character from stream without moving the read cursor.
Ch Peek() const;
Ch Peek();

/// Read the current character from stream and moving the read cursor to next
/// character.
Expand All @@ -47,19 +47,24 @@ class RapidJsonByteStream {
/// Get the current read cursor.
size_t Tell() const;

// Not needed for reading.
/// Not needed for reading.
char* PutBegin();
void Put(char);
void Flush();
size_t PutEnd(char*);

bool Empty() const;
bool ReadEmpty() const;
bool WriteEmpty() const;

void AppendContent(const char* content, size_t length);

private:
void SwapBuffers();

mutable std::mutex mutex_;
std::vector<char> buffer_; // Current buffer
std::vector<char> read_buffer_; // Current buffer
std::vector<char> write_buffer_; // Current buffer
size_t full_count_{0}; // Bytes read from the buffer
size_t count_{0}; // Bytes read from the buffer
mutable std::condition_variable cv_; // Condition for next portion of content
};
Expand Down
16 changes: 9 additions & 7 deletions olp-cpp-sdk-dataservice-read/tests/AsyncJsonStreamTest.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023 HERE Europe B.V.
* Copyright (C) 2023-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,16 +47,18 @@ TEST(AsyncJsonStreamTest, NormalFlow) {

EXPECT_EQ(current_stream->Peek(), '\0');
EXPECT_EQ(current_stream->Take(), '\0');
EXPECT_TRUE(current_stream->Empty());
EXPECT_TRUE(current_stream->ReadEmpty());

EXPECT_EQ(new_current_stream->Peek(), '2');
EXPECT_EQ(new_current_stream->Take(), '2');
EXPECT_EQ(new_current_stream->Take(), '3');
EXPECT_EQ(new_current_stream->Take(), '4');
EXPECT_TRUE(new_current_stream->Empty());
EXPECT_TRUE(new_current_stream->ReadEmpty());

stream.AppendContent("5", 1);
EXPECT_FALSE(new_current_stream->Empty());
// Read buffer is empty here because swap is on Take/Peek
EXPECT_FALSE(new_current_stream->WriteEmpty());
EXPECT_TRUE(new_current_stream->ReadEmpty());

stream.CloseStream(olp::client::ApiError::Cancelled());

Expand All @@ -73,11 +75,11 @@ TEST(AsyncJsonStreamTest, NormalFlow) {
EXPECT_TRUE(stream.GetError()->GetErrorCode() ==
olp::client::ErrorCode::Cancelled);

EXPECT_TRUE(new_current_stream->Empty());
EXPECT_TRUE(new_current_stream->ReadEmpty());
stream.AppendContent("17", 2);
EXPECT_TRUE(new_current_stream->Empty());
EXPECT_TRUE(new_current_stream->ReadEmpty());
stream.ResetStream("4", 1);
EXPECT_TRUE(new_current_stream->Empty());
EXPECT_TRUE(new_current_stream->ReadEmpty());
}

} // namespace
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2024 HERE Europe B.V.
* Copyright (C) 2019-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -2029,7 +2029,9 @@ TEST_F(PartitionsRepositoryTest, StreamPartitions) {
[&](const repository::AsyncJsonStream& async_stream) -> std::string {
std::string result;
auto stream = async_stream.GetCurrentStream();
while (!stream->Empty()) {
// Enforce buffers swap
OLP_SDK_CORE_UNUSED(stream->Peek());
while (!stream->ReadEmpty()) {
result += stream->Take();
}
return result;
Expand Down
Loading