From e3b3917e4538f79846ddd37017aa0fd280d0ea3a Mon Sep 17 00:00:00 2001 From: Borys Date: Thu, 28 Nov 2024 23:32:47 +0200 Subject: [PATCH] refactor: remove redundant allocations for streamer --- src/server/journal/streamer.cc | 74 ++++++++++++++-------------------- src/server/journal/streamer.h | 7 +++- 2 files changed, 36 insertions(+), 45 deletions(-) diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index d6654d3aa9eb..0b178683d62b 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -70,7 +70,7 @@ void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { io::StringSink sink; JournalWriter writer(&sink); writer.Write(Entry{journal::Op::LSN, item.lsn}); - Write(sink.str()); + Write(std::move(sink).str()); } }); } @@ -83,50 +83,43 @@ void JournalStreamer::Cancel() { } size_t JournalStreamer::GetTotalBufferCapacities() const { - return in_flight_bytes_ + pending_buf_.capacity(); + return in_flight_bytes_ + pending_buf_mem_size; } -void JournalStreamer::Write(std::string_view str) { - DCHECK(!str.empty()); - DVLOG(3) << "Writing " << str.size() << " bytes"; - - size_t total_pending = pending_buf_.size() + str.size(); - +void JournalStreamer::AsyncWrite() { if (in_flight_bytes_ > 0) { // We can not flush data while there are in flight requests because AsyncWrite // is not atomic. Therefore, we just aggregate. - size_t tail = pending_buf_.size(); - pending_buf_.resize(pending_buf_.size() + str.size()); - memcpy(pending_buf_.data() + tail, str.data(), str.size()); return; } - // If we do not have any in flight requests we send the string right a way. - // We can not aggregate it since we do not know when the next update will follow. - // because of potential SOO with strings, we allocate explicitly on heap. - uint8_t* buf(new uint8_t[str.size()]); + in_flight_bytes_ += pending_buf_mem_size; + total_sent_ += pending_buf_mem_size; - // TODO: it is possible to remove these redundant copies if we adjust high level - // interfaces to pass reference-counted buffers. - memcpy(buf, str.data(), str.size()); - in_flight_bytes_ += total_pending; - total_sent_ += total_pending; + const auto v_size = pending_buf_.size(); + absl::InlinedVector v( + v_size); // consider to use inline_vector and make part of JournalStreamer - iovec v[2]; - unsigned next_buf_id = 0; - - if (!pending_buf_.empty()) { - v[0] = IoVec(pending_buf_); - ++next_buf_id; + for (size_t i = 0; i < v_size; ++i) { + const auto* uptr = reinterpret_cast(pending_buf_[i].data()); + v[i] = (IoVec(io::Bytes(uptr, pending_buf_[i].size()))); } - v[next_buf_id++] = IoVec(io::Bytes(buf, str.size())); - dest_->AsyncWrite( - v, next_buf_id, - [buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) { - delete[] buf; - OnCompletion(ec, len); - }); + const auto len = pending_buf_mem_size; + pending_buf_mem_size = 0; + dest_->AsyncWrite(v.data(), v.size(), + [buf0 = std::move(pending_buf_), v0 = std::move(v), this, + len](std::error_code ec) { OnCompletion(ec, len); }); +} + +void JournalStreamer::Write(std::string str) { + DCHECK(!str.empty()); + DVLOG(2) << "Writing " << str.size() << " bytes"; + + pending_buf_mem_size += str.size(); + pending_buf_.push_back(std::move(str)); + + AsyncWrite(); } void JournalStreamer::OnCompletion(std::error_code ec, size_t len) { @@ -136,13 +129,8 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) { in_flight_bytes_ -= len; if (ec && !IsStopped()) { cntx_->ReportError(ec); - } else if (in_flight_bytes_ == 0 && !pending_buf_.empty() && !IsStopped()) { - // If everything was sent but we have a pending buf, flush it. - io::Bytes src(pending_buf_); - in_flight_bytes_ += src.size(); - dest_->AsyncWrite(src, [buf = std::move(pending_buf_), this](std::error_code ec) { - OnCompletion(ec, buf.size()); - }); + } else if (!pending_buf_.empty() && !IsStopped()) { + AsyncWrite(); } // notify ThrottleIfNeeded or WaitForInflightToComplete that waits @@ -182,7 +170,7 @@ void JournalStreamer::WaitForInflightToComplete() { } bool JournalStreamer::IsStalled() const { - return in_flight_bytes_ + pending_buf_.size() >= replication_stream_output_limit_cached; + return in_flight_bytes_ + pending_buf_mem_size >= replication_stream_output_limit_cached; } RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, @@ -234,7 +222,7 @@ void RestoreStreamer::SendFinalize(long attempt) { io::StringSink sink; JournalWriter writer{&sink}; writer.Write(entry); - Write(sink.str()); + Write(std::move(sink).str()); // TODO: is the intent here to flush everything? // @@ -318,7 +306,7 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms) { - CmdSerializer serializer([&](std::string s) { Write(s); }); + CmdSerializer serializer([&](std::string s) { Write(std::move(s)); }); serializer.SerializeEntry(key, pk, pv, expire_ms); } diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index a5ef1ad978a2..83ad6e29be8b 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -38,7 +38,7 @@ class JournalStreamer { // or wrap JournalItem::data in shared_ptr, we can avoid the cost of copying strings. // Also, for small strings it's more peformant to copy to the intermediate buffer than // to issue an io operation. - void Write(std::string_view str); + void Write(std::string str); // Blocks the if the consumer if not keeping up. void ThrottleIfNeeded(); @@ -53,6 +53,7 @@ class JournalStreamer { Context* cntx_; private: + void AsyncWrite(); void OnCompletion(std::error_code ec, size_t len); bool IsStopped() const { @@ -62,7 +63,9 @@ class JournalStreamer { bool IsStalled() const; journal::Journal* journal_; - std::vector pending_buf_; + + size_t pending_buf_mem_size = 0; + absl::InlinedVector pending_buf_; // can be rewrite as vector size_t in_flight_bytes_ = 0, total_sent_ = 0; time_t last_lsn_time_ = 0;