Skip to content

Commit

Permalink
chore: support qlist compression when accounting for memory
Browse files Browse the repository at this point in the history
Also fix "debug objhist" so that its value histogram will show effective malloc
used distributions for all types.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Nov 29, 2024
1 parent a4b3724 commit f36f9d0
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 34 deletions.
65 changes: 35 additions & 30 deletions src/core/qlist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,6 @@ inline ssize_t NodeSetEntry(quicklistNode* node, uint8_t* entry) {
bool CompressNode(quicklistNode* node) {
DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_RAW);
DCHECK(!node->dont_compress);
#ifdef SERVER_TEST
node->attempted_compress = 1;
#endif

/* validate that the node is neither
* tail nor head (it has prev and next)*/
Expand Down Expand Up @@ -219,12 +216,13 @@ bool CompressNode(quicklistNode* node) {
return true;
}

bool CompressNodeIfNeeded(quicklistNode* node) {
ssize_t CompressNodeIfNeeded(quicklistNode* node) {
DCHECK(node);
if (node->encoding == QUICKLIST_NODE_ENCODING_RAW && !node->dont_compress) {
return CompressNode(node);
if (CompressNode(node))
return ((quicklistLZF*)node->entry)->sz - node->sz;
}
return false;
return 0;
}

/* Uncompress the listpack in 'node' and update encoding details.
Expand All @@ -247,17 +245,24 @@ bool DecompressNode(bool recompress, quicklistNode* node) {

/* Decompress only compressed nodes.
recompress: if true, the node will be marked for recompression after decompression.
returns by how much the size of the node has increased.
*/
void DecompressNodeIfNeeded(bool recompress, quicklistNode* node) {
ssize_t DecompressNodeIfNeeded(bool recompress, quicklistNode* node) {
if ((node) && (node)->encoding == QUICKLIST_NODE_ENCODING_LZF) {
DecompressNode(recompress, node);
size_t compressed_sz = ((quicklistLZF*)node->entry)->sz;
if (DecompressNode(recompress, node)) {
return node->sz - compressed_sz;
}
}
return 0;
}

void RecompressOnly(quicklistNode* node) {
ssize_t RecompressOnly(quicklistNode* node) {
if (node->recompress && !node->dont_compress) {
CompressNode(node);
if (CompressNode(node))
return ((quicklistLZF*)node->entry)->sz - node->sz;
}
return 0;
}

quicklistNode* SplitNode(quicklistNode* node, int offset, bool after, ssize_t* diff) {
Expand Down Expand Up @@ -564,7 +569,7 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
if (QL_NODE_IS_PLAIN(node) || (at_tail && after) || (at_head && !after)) {
InsertPlainNode(node, elem, insert_opt);
} else {
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
ssize_t diff_existing = 0;
quicklistNode* new_node = SplitNode(node, it.offset_, after, &diff_existing);
quicklistNode* entry_node = InsertPlainNode(node, elem, insert_opt);
Expand All @@ -576,32 +581,32 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {

/* Now determine where and how to insert the new element */
if (!full) {
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
uint8_t* new_entry = LP_Insert(node->entry, elem, it.zi_, after ? LP_AFTER : LP_BEFORE);
malloc_size_ += NodeSetEntry(node, new_entry);
node->count++;
RecompressOnly(node);
malloc_size_ += RecompressOnly(node);
} else {
bool insert_tail = at_tail && after;
bool insert_head = at_head && !after;
if (insert_tail && avail_next) {
/* If we are: at tail, next has free space, and inserting after:
* - insert entry at head of next node. */
auto* new_node = node->next;
DecompressNodeIfNeeded(true, new_node);
malloc_size_ += DecompressNodeIfNeeded(true, new_node);
malloc_size_ += NodeSetEntry(new_node, LP_Prepend(new_node->entry, elem));
new_node->count++;
RecompressOnly(new_node);
RecompressOnly(node);
malloc_size_ += RecompressOnly(new_node);
malloc_size_ += RecompressOnly(node);
} else if (insert_head && avail_prev) {
/* If we are: at head, previous has free space, and inserting before:
* - insert entry at tail of previous node. */
auto* new_node = node->prev;
DecompressNodeIfNeeded(true, new_node);
malloc_size_ += DecompressNodeIfNeeded(true, new_node);
malloc_size_ += NodeSetEntry(new_node, LP_Append(new_node->entry, elem));
new_node->count++;
RecompressOnly(new_node);
RecompressOnly(node);
malloc_size_ += RecompressOnly(new_node);
malloc_size_ += RecompressOnly(node);
} else if (insert_tail || insert_head) {
/* If we are: full, and our prev/next has no available space, then:
* - create new node and attach to qlist */
Expand All @@ -610,7 +615,7 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
} else {
/* else, node is full we need to split it. */
/* covers both after and !after cases */
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
ssize_t diff_existing = 0;
auto* new_node = SplitNode(node, it.offset_, after, &diff_existing);
auto func = after ? LP_Prepend : LP_Append;
Expand Down Expand Up @@ -710,8 +715,8 @@ void QList::Compress(quicklistNode* node) {
int depth = 0;
int in_depth = 0;
while (depth++ < compress_) {
DecompressNodeIfNeeded(false, forward);
DecompressNodeIfNeeded(false, reverse);
malloc_size_ += DecompressNodeIfNeeded(false, forward);
malloc_size_ += DecompressNodeIfNeeded(false, reverse);

if (forward == node || reverse == node)
in_depth = 1;
Expand All @@ -726,11 +731,11 @@ void QList::Compress(quicklistNode* node) {
}

if (!in_depth && node)
CompressNodeIfNeeded(node);
malloc_size_ += CompressNodeIfNeeded(node);

/* At this point, forward and reverse are one node beyond depth */
CompressNodeIfNeeded(forward);
CompressNodeIfNeeded(reverse);
malloc_size_ += CompressNodeIfNeeded(forward);
malloc_size_ += CompressNodeIfNeeded(reverse);
}

/* Attempt to merge listpacks within two nodes on either side of 'center'.
Expand Down Expand Up @@ -801,8 +806,8 @@ quicklistNode* QList::MergeNodes(quicklistNode* center) {
* Returns the input node picked to merge against or NULL if
* merging was not possible. */
quicklistNode* QList::ListpackMerge(quicklistNode* a, quicklistNode* b) {
DecompressNodeIfNeeded(false, a);
DecompressNodeIfNeeded(false, b);
malloc_size_ += DecompressNodeIfNeeded(false, a);
malloc_size_ += DecompressNodeIfNeeded(false, b);
if ((lpMerge(&a->entry, &b->entry))) {
/* We merged listpacks! Now remove the unused quicklistNode. */
quicklistNode *keep = NULL, *nokeep = NULL;
Expand Down Expand Up @@ -1047,14 +1052,14 @@ bool QList::Erase(const long start, unsigned count) {
if (delete_entire_node || QL_NODE_IS_PLAIN(node)) {
DelNode(node);
} else {
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
malloc_size_ += NodeSetEntry(node, lpDeleteRange(node->entry, offset, del));
node->count -= del;
count_ -= del;
if (node->count == 0) {
DelNode(node);
} else {
RecompressOnly(node);
malloc_size_ += RecompressOnly(node);
}
}

Expand All @@ -1081,7 +1086,7 @@ bool QList::Iterator::Next() {
int plain = QL_NODE_IS_PLAIN(current_);
if (!zi_) {
/* If !zi, use current index. */
DecompressNodeIfNeeded(true, current_);
const_cast<QList*>(owner_)->malloc_size_ += DecompressNodeIfNeeded(true, current_);
if (ABSL_PREDICT_FALSE(plain))
zi_ = current_->entry;
else
Expand Down
58 changes: 54 additions & 4 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ extern "C" {
#include "base/logging.h"
#include "core/compact_object.h"
#include "core/qlist.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
#include "server/blocking_controller.h"
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
Expand Down Expand Up @@ -65,6 +67,7 @@ struct ObjInfo {

// for lists - how many nodes do they have.
unsigned num_nodes = 0;
unsigned num_compressed = 0;

enum LockStatus { NONE, S, X } lock_status = NONE;

Expand Down Expand Up @@ -212,11 +215,23 @@ unsigned AddObjHist(PrimeIterator it, ObjHist* hist) {

if (pv.ObjType() == OBJ_LIST) {
IterateList(pv, per_entry_cb, 0, -1);
if (pv.Encoding() == kEncodingQL2) {
const QList* ql = static_cast<QList*>(pv.RObjPtr());
val_len = ql->MallocUsed(true);
}
} else if (pv.ObjType() == OBJ_ZSET) {
IterateSortedSet(pv.GetRobjWrapper(),
[&](ContainerEntry entry, double) { return per_entry_cb(entry); });
if (pv.Encoding() == OBJ_ENCODING_SKIPLIST) {
detail::SortedMap* smap = static_cast<detail::SortedMap*>(pv.RObjPtr());
val_len = smap->MallocSize();
}
} else if (pv.ObjType() == OBJ_SET) {
IterateSet(pv, per_entry_cb);
if (pv.Encoding() == kEncodingStrMap2) {
StringSet* ss = static_cast<StringSet*>(pv.RObjPtr());
val_len = ss->ObjMallocUsed() + ss->SetMallocUsed();
}
} else if (pv.ObjType() == OBJ_HASH) {
if (pv.Encoding() == kEncodingListPack) {
uint8_t intbuf[LP_INTBUF_SIZE];
Expand Down Expand Up @@ -322,6 +337,14 @@ ObjInfo InspectOp(ConnectionContext* cntx, string_view key) {
if (pv.ObjType() == OBJ_LIST && pv.Encoding() == kEncodingQL2) {
const QList* qlist = static_cast<const QList*>(pv.RObjPtr());
oinfo.num_nodes = qlist->node_count();
auto* node = qlist->Head();

while (node) {
if (node->encoding == QUICKLIST_NODE_ENCODING_LZF) {
++oinfo.num_compressed;
}
node = node->next;
}
}

if (pv.IsExternal()) {
Expand Down Expand Up @@ -357,13 +380,31 @@ OpResult<ValueCompressInfo> EstimateCompression(ConnectionContext* cntx, string_
}

// Only strings are supported right now.
if (it->second.ObjType() != OBJ_STRING) {
if (it->second.ObjType() != OBJ_STRING && it->second.ObjType() != OBJ_LIST) {
return OpStatus::WRONG_TYPE;
}
ValueCompressInfo info;

if (it->second.ObjType() == OBJ_LIST) {
if (it->second.Encoding() != kEncodingQL2) {
return OpStatus::WRONG_TYPE;
}

const QList* src = static_cast<const QList*>(it->second.RObjPtr());
info.raw_size = src->MallocUsed(true);
QList qlist(-2, 1);
auto copy_cb = [&](QList::Entry entry) {
qlist.Push(entry.view(), QList::HEAD);
return true;
};
src->Iterate(copy_cb, 0, -1);
info.compressed_size = qlist.MallocUsed(true);
return info;
}

string scratch;
string_view value = it->second.GetSlice(&scratch);

ValueCompressInfo info;
info.raw_size = value.size();
info.compressed_size = info.raw_size;

Expand Down Expand Up @@ -849,7 +890,10 @@ void DebugCmd::Inspect(string_view key, CmdArgList args, facade::SinkReplyBuilde
ShardId sid = Shard(key, ess.size());
VLOG(1) << "DebugCmd::Inspect " << key;

bool check_compression = (args.size() == 1) && ArgS(args, 0) == "COMPRESS";
bool check_compression = false;
if (args.size() == 1) {
check_compression = absl::AsciiStrToUpper(ArgS(args, 0)) == "COMPRESS";
}
string resp;

if (check_compression) {
Expand Down Expand Up @@ -886,7 +930,13 @@ void DebugCmd::Inspect(string_view key, CmdArgList args, facade::SinkReplyBuilde
}

if (res.num_nodes) {
StrAppend(&resp, " ns:", res.num_nodes);
// node count
StrAppend(&resp, " nc:", res.num_nodes);
}

if (res.num_compressed) {
// compressed nodes
StrAppend(&resp, " cn:", res.num_compressed);
}

if (res.lock_status != ObjInfo::NONE) {
Expand Down

0 comments on commit f36f9d0

Please sign in to comment.