Skip to content

Commit

Permalink
feat: add ability reading stream_listpacks_2/3 rdb types (#4192)
Browse files Browse the repository at this point in the history
* feat: add ability reading stream_listpacks_2/3 rdb types

* refactor: address comments
  • Loading branch information
BorysTheDev authored Nov 26, 2024
1 parent f84e1ee commit 3327e1a
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 4 deletions.
3 changes: 2 additions & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ cxx_test(string_family_test dfly_test_lib LABELS DFLY)
cxx_test(bitops_family_test dfly_test_lib LABELS DFLY)
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb
testdata/redis6_stream.rdb testdata/hll.rdb testdata/redis7_small.rdb
testdata/redis_json.rdb LABELS DFLY)
testdata/redis_json.rdb testdata/RDB_TYPE_STREAM_LISTPACKS_2.rdb
testdata/RDB_TYPE_STREAM_LISTPACKS_3.rdb LABELS DFLY)
cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dfly_test_lib LABELS DFLY)
cxx_test(json_family_test dfly_test_lib LABELS DFLY)
Expand Down
55 changes: 53 additions & 2 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
iores = ReadListQuicklist(rdbtype);
break;
case RDB_TYPE_STREAM_LISTPACKS:
iores = ReadStreams();
case RDB_TYPE_STREAM_LISTPACKS_2:
case RDB_TYPE_STREAM_LISTPACKS_3:
iores = ReadStreams(rdbtype);
break;
case RDB_TYPE_JSON:
iores = ReadJson();
Expand Down Expand Up @@ -1828,7 +1830,7 @@ auto RdbLoaderBase::ReadListQuicklist(int rdbtype) -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(load_trace), rdbtype};
}

auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
size_t listpacks;
if (pending_read_.remaining > 0) {
listpacks = pending_read_.remaining;
Expand Down Expand Up @@ -1892,6 +1894,30 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->ms);
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->seq);

if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
/* Load the first entry ID. */
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->first_id.ms);
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->first_id.seq);

/* Load the maximal deleted entry ID. */
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->max_deleted_entry_id.ms);
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->max_deleted_entry_id.seq);

/* Load the offset. */
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->entries_added);
} else {
/* During migration the offset can be initialized to the stream's
* length. At this point, we also don't care about tombstones
* because CG offsets will be later initialized as well. */
load_trace->stream_trace->max_deleted_entry_id.ms = 0;
load_trace->stream_trace->max_deleted_entry_id.seq = 0;
load_trace->stream_trace->entries_added = load_trace->stream_trace->stream_len;

// TODO add implementation, we need to find the first entry's ID.
// The redis code is next
// streamGetEdgeID(s,1,1,&s->first_id);
}

/* Consumer groups loading */
uint64_t cgroups_count;
SET_OR_UNEXPECT(LoadLen(nullptr), cgroups_count);
Expand All @@ -1913,6 +1939,24 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms);
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq);

uint64_t cg_offset;
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
SET_OR_UNEXPECT(LoadLen(nullptr), cg_offset);
(void)cg_offset;
} else {
// TODO implement
// cg_offset = should be calculated like streamEstimateDistanceFromFirstEverEntry();
}

// TODO add our implementation for the next Redis logic
// streamCG* cgroup = streamCreateCG(s, cgname, sdslen(cgname), &cg_id, cg_offset);
// if (cgroup == NULL) {
// rdbReportCorruptRDB("Duplicated consumer group name %s", cgname);
// decrRefCount(o);
// sdsfree(cgname);
// return NULL;
// }

/* Load the global PEL for this consumer group, however we'll
* not yet populate the NACK structures with the message
* owner, since consumers for this group and their messages will
Expand Down Expand Up @@ -1958,6 +2002,13 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {

SET_OR_UNEXPECT(FetchInt<int64_t>(), consumer.seen_time);

if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_3) {
SET_OR_UNEXPECT(FetchInt<int64_t>(), consumer.active_time);
} else {
/* That's the best estimate we got */
consumer.active_time = consumer.seen_time;
}

/* Load the PEL about entries owned by this specific
* consumer. */
SET_OR_UNEXPECT(LoadLen(nullptr), pel_size);
Expand Down
11 changes: 10 additions & 1 deletion src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,15 @@ class RdbLoaderBase {
struct StreamConsumerTrace {
RdbVariant name;
int64_t seen_time;
int64_t active_time;
std::vector<std::array<uint8_t, 16>> nack_arr;
};

struct StreamID {
uint64_t ms;
uint64_t seq;
};

struct StreamCGTrace {
RdbVariant name;
uint64_t ms;
Expand All @@ -95,6 +101,9 @@ class RdbLoaderBase {
size_t lp_len;
size_t stream_len;
uint64_t ms, seq;
StreamID first_id; /* The first non-tombstone entry, zero if empty. */
StreamID max_deleted_entry_id; /* The maximal ID that was deleted. */
uint64_t entries_added; /* All time count of elements added. */
std::vector<StreamCGTrace> cgroup;
};

Expand Down Expand Up @@ -165,7 +174,7 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadZSet(int rdbtype);
::io::Result<OpaqueObj> ReadZSetZL();
::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
::io::Result<OpaqueObj> ReadStreams();
::io::Result<OpaqueObj> ReadStreams(int rdbtype);
::io::Result<OpaqueObj> ReadRedisJson();
::io::Result<OpaqueObj> ReadJson();
::io::Result<OpaqueObj> ReadSBF();
Expand Down
24 changes: 24 additions & 0 deletions src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -655,4 +655,28 @@ TEST_F(RdbTest, SnapshotTooBig) {
ASSERT_THAT(resp, ErrArg("Out of memory"));
}

TEST_F(RdbTest, LoadStream2) {
auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_2.rdb");
ASSERT_FALSE(ec) << ec.message();
auto res = Run({"XINFO", "STREAM", "mystream"});
EXPECT_THAT(
res.GetVec(),
ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2),
"last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0",
"entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1),
"first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
}

TEST_F(RdbTest, LoadStream3) {
auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_2.rdb");
ASSERT_FALSE(ec) << ec.message();
auto res = Run({"XINFO", "STREAM", "mystream"});
EXPECT_THAT(
res.GetVec(),
ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2),
"last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0",
"entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1),
"first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
}

} // namespace dfly
Binary file added src/server/testdata/RDB_TYPE_STREAM_LISTPACKS_2.rdb
Binary file not shown.
Binary file added src/server/testdata/RDB_TYPE_STREAM_LISTPACKS_3.rdb
Binary file not shown.

0 comments on commit 3327e1a

Please sign in to comment.