chore: Reduce memory consumption when migrating huge values #4119

Closed. Wants to merge 1 commit.
51 changes: 38 additions & 13 deletions in src/server/rdb_save.cc

@@ -933,14 +933,13 @@ VersionBuffer MakeRdbVersion() {
   return buf;
 }
 
-CrcBuffer MakeCheckSum(std::string_view dump_res) {
-  uint64_t chksum = crc64(0, reinterpret_cast<const uint8_t*>(dump_res.data()), dump_res.size());
+CrcBuffer WrapCheckSum(uint64_t crc) {
   CrcBuffer buf;
-  absl::little_endian::Store64(buf.data(), chksum);
+  absl::little_endian::Store64(buf.data(), crc);
   return buf;
 }
 
-void AppendFooter(io::StringSink* dump_res) {
+void AppendFooter(io::Sink* dump_res, uint64_t crc) {
   auto to_bytes = [](const auto& buf) {
     return io::Bytes(reinterpret_cast<const uint8_t*>(buf.data()), buf.size());
   };

@@ -952,18 +951,45 @@ void AppendFooter(io::StringSink* dump_res) {
    * RDB version and CRC are both in little endian.
    */
   const auto ver = MakeRdbVersion();
-  dump_res->Write(to_bytes(ver));
-  const auto crc = MakeCheckSum(dump_res->str());
-  dump_res->Write(to_bytes(crc));
+  auto ver_bytes = to_bytes(ver);
+  dump_res->Write(ver_bytes);
+
+  crc = crc64(crc, ver_bytes.data(), ver_bytes.size());
+  const auto crc_buf = WrapCheckSum(crc);
+  dump_res->Write(to_bytes(crc_buf));
 }
 }  // namespace
 
-void SerializerBase::DumpObject(const CompactObj& obj, io::StringSink* out) {
+void SerializerBase::DumpObject(const CompactObj& obj, io::Sink* out) {
   CompressionMode compression_mode = GetDefaultCompressionMode();
   if (compression_mode != CompressionMode::NONE) {
     compression_mode = CompressionMode::SINGLE_ENTRY;
   }
-  RdbSerializer serializer(compression_mode);
+  uint64_t crc = 0;
+  uint64_t total_size = 0;
+  auto flush = [&](RdbSerializer* srz) {
+    io::StringSink s;
Comment on lines +971 to +972

Contributor: Do we want to flush directly to a string and then to the sink? Is there a way to do it directly?

Contributor (Author): The reason I do it this way is to be able to compute the CRC value for the additional data; once it is computed we can flush to the sink directly (otherwise we can't compute the CRC).

Contributor:
> (otherwise we can't compute the crc)

Yep, that answers it 😄
+    // Use kFlushMidEntry because we append a footer below
+    auto ec = srz->FlushToSink(&s, SerializerBase::FlushState::kFlushMidEntry);
+    RETURN_ON_ERR(ec);
+
+    string_view sv = s.str();
+    if (sv.empty()) {
+      return ec;  // Nothing to do
+    }
+
+    total_size += sv.size();
+    crc = crc64(crc, reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
Contributor: Nice, you can accumulate the CRC based on the previous value plus the newly serialized data.

Contributor (Author): 🤓

+
+    return out->Write(io::Buffer(sv));
+  };
+
+  RdbSerializer serializer(compression_mode, [&](size_t size, SerializerBase::FlushState) {
+    auto size_before = total_size;
+    flush(&serializer);
+    DCHECK_EQ(size, total_size - size_before);
+  });
Comment on lines +988 to +992

Contributor: Do we want to flush unconditionally on large values? (Even when big value serialization is off?)

Contributor (Author): Nice catch! I was under the (false) assumption that this is called only above the serialization_max_chunk_size threshold. I'll modify this area.

   // According to Redis code we need to
   // 1. Save the value itself - without the key

@@ -974,10 +1000,9 @@ void SerializerBase::DumpObject(const CompactObj& obj, io::StringSink* out) {
   CHECK(!ec);
   ec = serializer.SaveValue(obj);
   CHECK(!ec);  // make sure that fully was successful
-  ec = serializer.FlushToSink(out, SerializerBase::FlushState::kFlushMidEntry);
-  CHECK(!ec);  // make sure that fully was successful
-  AppendFooter(out);  // version and crc
-  CHECK_GT(out->str().size(), 10u);
+  ec = flush(&serializer);
+  CHECK(!ec);  // make sure that fully was successful
+  AppendFooter(out, crc);  // version and crc
 }

 size_t SerializerBase::SerializedLen() const {

2 changes: 1 addition & 1 deletion in src/server/rdb_save.h

@@ -156,7 +156,7 @@ class SerializerBase {
   virtual ~SerializerBase() = default;
 
   // Dumps `obj` in DUMP command format into `out`. Uses default compression mode.
-  static void DumpObject(const CompactObj& obj, io::StringSink* out);
+  static void DumpObject(const CompactObj& obj, io::Sink* out);
 
   // Internal buffer size. Might shrink after flush due to compression.
   size_t SerializedLen() const;