Skip to content

Commit

Permalink
refactor: remove redundant allocations for streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Nov 29, 2024
1 parent 3ad5b38 commit e3b3917
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 45 deletions.
74 changes: 31 additions & 43 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
io::StringSink sink;
JournalWriter writer(&sink);
writer.Write(Entry{journal::Op::LSN, item.lsn});
Write(sink.str());
Write(std::move(sink).str());
}
});
}
Expand All @@ -83,50 +83,43 @@ void JournalStreamer::Cancel() {
}

size_t JournalStreamer::GetTotalBufferCapacities() const {
  // Total memory attributable to this streamer: bytes already handed to the
  // socket but not yet completed, plus bytes still aggregated in pending_buf_.
  return in_flight_bytes_ + pending_buf_mem_size;
}

void JournalStreamer::Write(std::string_view str) {
DCHECK(!str.empty());
DVLOG(3) << "Writing " << str.size() << " bytes";

size_t total_pending = pending_buf_.size() + str.size();

void JournalStreamer::AsyncWrite() {
if (in_flight_bytes_ > 0) {
// We can not flush data while there are in flight requests because AsyncWrite
// is not atomic. Therefore, we just aggregate.
size_t tail = pending_buf_.size();
pending_buf_.resize(pending_buf_.size() + str.size());
memcpy(pending_buf_.data() + tail, str.data(), str.size());
return;
}

// If we do not have any in flight requests we send the string right a way.
// We can not aggregate it since we do not know when the next update will follow.
// because of potential SOO with strings, we allocate explicitly on heap.
uint8_t* buf(new uint8_t[str.size()]);
in_flight_bytes_ += pending_buf_mem_size;
total_sent_ += pending_buf_mem_size;

// TODO: it is possible to remove these redundant copies if we adjust high level
// interfaces to pass reference-counted buffers.
memcpy(buf, str.data(), str.size());
in_flight_bytes_ += total_pending;
total_sent_ += total_pending;
const auto v_size = pending_buf_.size();
absl::InlinedVector<iovec, 4> v(
v_size); // consider to use inline_vector and make part of JournalStreamer

iovec v[2];
unsigned next_buf_id = 0;

if (!pending_buf_.empty()) {
v[0] = IoVec(pending_buf_);
++next_buf_id;
for (size_t i = 0; i < v_size; ++i) {
const auto* uptr = reinterpret_cast<const uint8_t*>(pending_buf_[i].data());
v[i] = (IoVec(io::Bytes(uptr, pending_buf_[i].size())));
}
v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));

dest_->AsyncWrite(
v, next_buf_id,
[buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
delete[] buf;
OnCompletion(ec, len);
});
const auto len = pending_buf_mem_size;
pending_buf_mem_size = 0;
dest_->AsyncWrite(v.data(), v.size(),
[buf0 = std::move(pending_buf_), v0 = std::move(v), this,
len](std::error_code ec) { OnCompletion(ec, len); });
}

void JournalStreamer::Write(std::string str) {
  // Takes ownership of a serialized blob, appends it to the pending queue and
  // kicks off an async flush (a no-op while a previous write is in flight).
  DCHECK(!str.empty());
  DVLOG(2) << "Writing " << str.size() << " bytes";

  const size_t added_bytes = str.size();
  pending_buf_.push_back(std::move(str));
  pending_buf_mem_size += added_bytes;

  AsyncWrite();
}

void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
Expand All @@ -136,13 +129,8 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
in_flight_bytes_ -= len;
if (ec && !IsStopped()) {
cntx_->ReportError(ec);
} else if (in_flight_bytes_ == 0 && !pending_buf_.empty() && !IsStopped()) {
// If everything was sent but we have a pending buf, flush it.
io::Bytes src(pending_buf_);
in_flight_bytes_ += src.size();
dest_->AsyncWrite(src, [buf = std::move(pending_buf_), this](std::error_code ec) {
OnCompletion(ec, buf.size());
});
} else if (!pending_buf_.empty() && !IsStopped()) {
AsyncWrite();
}

// notify ThrottleIfNeeded or WaitForInflightToComplete that waits
Expand Down Expand Up @@ -182,7 +170,7 @@ void JournalStreamer::WaitForInflightToComplete() {
}

bool JournalStreamer::IsStalled() const {
  // Stalled once buffered + in-flight bytes reach the configured replication
  // output limit; callers use this to throttle producers.
  return in_flight_bytes_ + pending_buf_mem_size >= replication_stream_output_limit_cached;
}

RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
Expand Down Expand Up @@ -234,7 +222,7 @@ void RestoreStreamer::SendFinalize(long attempt) {
io::StringSink sink;
JournalWriter writer{&sink};
writer.Write(entry);
Write(sink.str());
Write(std::move(sink).str());

// TODO: is the intent here to flush everything?
//
Expand Down Expand Up @@ -318,7 +306,7 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                 uint64_t expire_ms) {
  // Serializes one key/value entry into commands and forwards each serialized
  // command string to Write().
  auto sink = [this](std::string blob) { Write(std::move(blob)); };
  CmdSerializer serializer(sink);
  serializer.SerializeEntry(key, pk, pv, expire_ms);
}

Expand Down
7 changes: 5 additions & 2 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class JournalStreamer {
// or wrap JournalItem::data in shared_ptr, we can avoid the cost of copying strings.
// Also, for small strings it's more performant to copy to the intermediate buffer than
// to issue an io operation.
void Write(std::string_view str);
void Write(std::string str);

// Blocks if the consumer is not keeping up.
void ThrottleIfNeeded();
Expand All @@ -53,6 +53,7 @@ class JournalStreamer {
Context* cntx_;

private:
void AsyncWrite();
void OnCompletion(std::error_code ec, size_t len);

bool IsStopped() const {
Expand All @@ -62,7 +63,9 @@ class JournalStreamer {
bool IsStalled() const;

journal::Journal* journal_;
std::vector<uint8_t> pending_buf_;

size_t pending_buf_mem_size = 0;
absl::InlinedVector<std::string, 4> pending_buf_; // can be rewrite as vector<iovec>
size_t in_flight_bytes_ = 0, total_sent_ = 0;

time_t last_lsn_time_ = 0;
Expand Down

0 comments on commit e3b3917

Please sign in to comment.