diff --git a/src/table/data_block_builder.cpp b/src/table/data_block_builder.cpp index 72c14b9..4737efa 100644 --- a/src/table/data_block_builder.cpp +++ b/src/table/data_block_builder.cpp @@ -6,6 +6,7 @@ #include "sst_config.h" #include "data_block_builder.h" #include "../utils/codec.h" +#include "../log/log.h" namespace smallkv { // todo: 热点函数,后续需要更多优化 @@ -13,77 +14,77 @@ namespace smallkv { if (key.empty()) { return Status::Success; } - bool need_fullkey = false; - if (record_num % SSTConfigInfo::RESTART_INTERVAL == 0) { // 判断是否需要插入重启点 - need_fullkey = true; // 第一条数据需要完整key - record_group_offset.push_back(static_cast(get_records_size())); - } - ++record_num; // DataBlock中的Record的数量加一 + bool need_fullkey = false; + if (record_num % SSTConfigInfo::RESTART_INTERVAL == 0) { // 判断是否需要插入重启点 + need_fullkey = true; // 第一条数据需要完整key + record_group_offset.push_back(static_cast(get_records_size())); + } + ++record_num; // DataBlock中的Record的数量加一 - if (need_fullkey) { - // 是Record Group的第一条数据,不需要压缩 - utils::PutFixed32(_data, 0); // shared_key_len - utils::PutFixed32(_data, static_cast(key.size())); // unshared_key_len - utils::PutFixed32(_data, static_cast(value.size())); // value_len - _data.append(key); // unshared_key_content - _data.append(value); // value_content - } else { - // 非第一条数据,压缩 - // 计算当前key和前一个key的公共部分 - auto min_len = std::min(key.size(), pre_key.size()); - int shared_key_len = 0; - for (int i = 0; i < min_len; ++i) { - if (key[i] == pre_key[i]) { - ++shared_key_len; - } else { - break; - } + if (need_fullkey) { + // 是Record Group的第一条数据,不需要压缩 + utils::PutFixed32(_data, 0); // shared_key_len + utils::PutFixed32(_data, static_cast(key.size())); // unshared_key_len + utils::PutFixed32(_data, static_cast(value.size())); // value_len + _data.append(key); // unshared_key_content + _data.append(value); // value_content + } else { + // 非第一条数据,压缩 + // 计算当前key和前一个key的公共部分 + auto min_len = std::min(key.size(), pre_key.size()); + int shared_key_len = 0; + for (int i = 0; i < min_len; ++i) { + if (key[i] == pre_key[i]) { + ++shared_key_len; + } else { + break; } - int unshared_key_len = static_cast(key.size()) - shared_key_len; - - // todo: 这里可以尝试一下variant编码 - utils::PutFixed32(_data, shared_key_len); - utils::PutFixed32(_data, unshared_key_len); - utils::PutFixed32(_data, static_cast(value.size())); - _data.append(key); - _data.append(value); } - pre_key = key; // 更新前一个key + int unshared_key_len = static_cast(key.size()) - shared_key_len; - return Status::Success; + // todo: 这里可以尝试一下variant编码 + utils::PutFixed32(_data, shared_key_len); + utils::PutFixed32(_data, unshared_key_len); + utils::PutFixed32(_data, static_cast(value.size())); + _data.append(key.substr(shared_key_len)); + _data.append(value); } + pre_key = key; // 更新前一个key - DBStatus DataBlockBuilder::add_restart_points() { - assert(get_records_size() >= SSTConfigInfo::MAX_DATA_BLOCK_SIZE); - int restart_point_num = static_cast((record_num - 0.5) / 16) + 1; - int last_offset = static_cast(_data.size()); // 记录_data的最后一个字节的位置 - for (int i = 0; i < record_group_offset.size(); ++i) { - // - // Restart_Point的schema如下: - // +----------------+----------------+ - // | record_num(4B) | OffsetInfo(8B) | - // +----------------+----------------+ - // - int32_t _record_num; // 当前Record Group中的Record数量 - OffsetInfo _offsetInfo{0, record_group_offset[i]}; - if (i != record_group_offset.size() - 1) { - _offsetInfo.size = record_group_offset[i + 1] - record_group_offset[i]; - _record_num = SSTConfigInfo::RESTART_INTERVAL; - } else { - _offsetInfo.size = last_offset - record_group_offset[i]; - _record_num = record_num % SSTConfigInfo::RESTART_INTERVAL; - } - utils::PutFixed32(_data, _record_num); - utils::PutFixed64(_data, *reinterpret_cast(&_offsetInfo)); + return Status::Success; + } + + DBStatus DataBlockBuilder::add_restart_points() { + int restart_point_num = static_cast((record_num - 0.5) / 16) + 1; + int last_offset = static_cast(_data.size()); // 记录_data的最后一个字节的位置 + log::get_logger()->info("record_group_offset.size={}", record_group_offset.size()); + for (int i = 0; i < record_group_offset.size(); ++i) { + // + // Restart_Point的schema如下: + // +----------------+----------------+ + // | record_num(4B) | OffsetInfo(8B) | + // +----------------+----------------+ + // + int32_t _record_num; // 当前Record Group中的Record数量 + OffsetInfo _offsetInfo{0, record_group_offset[i]}; + if (i != record_group_offset.size() - 1) { + _offsetInfo.size = record_group_offset[i + 1] - record_group_offset[i]; + _record_num = SSTConfigInfo::RESTART_INTERVAL; + } else { + _offsetInfo.size = last_offset - record_group_offset[i]; + _record_num = record_num % SSTConfigInfo::RESTART_INTERVAL; } - utils::PutFixed32(_data, restart_point_num); - return Status::Success; + utils::PutFixed32(_data, _record_num); + utils::PutFixed64(_data, *reinterpret_cast(&_offsetInfo)); } + utils::PutFixed32(_data, restart_point_num); + return Status::Success; + } - void DataBlockBuilder::reset() { - record_group_offset.clear(); - pre_key = ""; - _data = ""; - record_num = 0; - } - } \ No newline at end of file + void DataBlockBuilder::reset() { + record_group_offset.clear(); + pre_key = ""; + _data = ""; + record_num = 0; + } +} \ No newline at end of file diff --git a/src/table/data_block_builder.h b/src/table/data_block_builder.h index 6a6295f..ac39f50 100644 --- a/src/table/data_block_builder.h +++ b/src/table/data_block_builder.h @@ -76,7 +76,7 @@ namespace smallkv { // 当Block中的所有Record的大小超过阈值时,停止写入,并且生成重启点等信息 DBStatus add_restart_points(); - const char *data() { return _data.c_str(); } + std::string_view data() { return _data; } // 清空DataBlock中的所有数据 void reset(); diff --git a/src/utils/codec.cpp b/src/utils/codec.cpp index 5458a04..794b202 100644 --- a/src/utils/codec.cpp +++ b/src/utils/codec.cpp @@ -6,14 +6,14 @@ namespace smallkv::utils { - void EncodeFixed32(char *buf, int32_t val) { + void EncodeFixed32(char *buf, uint32_t val) { buf[0] = val & 0xff; buf[1] = (val >> 8) & 0xff; buf[2] = (val >> 16) & 0xff; buf[3] = (val >> 24) & 0xff; } - void EncodeFixed64(char *buf, int64_t val) { + void EncodeFixed64(char *buf, uint64_t val) { buf[0] = val & 0xff; buf[1] = (val >> 8) & 0xff; buf[2] = (val >> 16) & 0xff; @@ -24,4 +24,24 @@ namespace smallkv::utils { buf[7] = (val >> 56) & 0xff; } + uint32_t DecodeFixed32(const char *data) { + auto _data = reinterpret_cast(data); + return static_cast(_data[0]) | + (static_cast(_data[1]) << 8) | + (static_cast(_data[2]) << 16) | + (static_cast(_data[3]) << 24); + } + + uint64_t DecodeFixed64(const char *data) { + auto _data = reinterpret_cast(data); + return static_cast(_data[0]) | + (static_cast(_data[1]) << 8) | + (static_cast(_data[2]) << 16) | + (static_cast(_data[3]) << 24) | + (static_cast(_data[4]) << 32) | + (static_cast(_data[5]) << 40) | + (static_cast(_data[6]) << 48) | + (static_cast(_data[7]) << 56); + } + } \ No newline at end of file diff --git a/src/utils/codec.h b/src/utils/codec.h index 6c23de8..5fe7bee 100644 --- a/src/utils/codec.h +++ b/src/utils/codec.h @@ -8,24 +8,21 @@ #define SMALLKV_CODEC_H namespace smallkv::utils { // 编解码 - void EncodeFixed32(char *buf, int32_t val); + void EncodeFixed32(char *buf, uint32_t val); - void EncodeFixed64(char *buf, int64_t val); + void EncodeFixed64(char *buf, uint64_t val); - // todo - void DecodeFixed32(); + uint32_t DecodeFixed32(const char *data); - // todo - void DecodeFixed64(); + uint64_t DecodeFixed64(const char *data); - - inline void PutFixed32(std::string &dst, int32_t val) { + inline void PutFixed32(std::string &dst, uint32_t val) { char buf[sizeof(val)]; EncodeFixed32(buf, val); dst.append(buf, sizeof(val)); } - inline void PutFixed64(std::string &dst, int64_t val) { + inline void PutFixed64(std::string &dst, uint64_t val) { char buf[sizeof(val)]; EncodeFixed64(buf, val); dst.append(buf, sizeof(val)); diff --git a/tests/test_data_block_builder.cpp b/tests/test_data_block_builder.cpp new file mode 100644 index 0000000..0db9f38 --- /dev/null +++ b/tests/test_data_block_builder.cpp @@ -0,0 +1,70 @@ +// +// Created by qianyy on 2023/1/16. +// +#include +#include +#include +#include "../src/table/data_block_builder.h" +#include "../src/log/log.h" +#include "../src/utils/codec.h" +#include "../src/db/offset_info.h" + +namespace smallkv { + TEST(data_block_builder, add) { + auto logger = log::get_logger(); + auto builder = std::make_unique(); + + /* + * 每条记录的前面有三个int32_t, 然后才是key、value + * Record的schema如下: + * +--------------------+----------------------+---------------+----------------------+---------------+ + * | shared_key_len(4B) | unshared_key_len(4B) | value_len(4B) | unshared_key_content | value_content | + * +--------------------+----------------------+---------------+----------------------+---------------+ + * + * */ + builder->add("key_0", "val_0"); + builder->add("key_1", "val_1"); + builder->add("key_2", "val_2"); + EXPECT_EQ(builder->data().substr(12, 10), "key_0val_0"); + EXPECT_EQ(builder->data().substr(34, 6), "1val_1"); + EXPECT_EQ(builder->data().substr(52, 6), "2val_2"); + EXPECT_EQ(builder->data().size(), 12 + 10 + 12 + 6 + 12 + 6); + } + + TEST(data_block_builder, add_restart_points) { + auto logger = log::get_logger(); + auto builder = std::make_unique(); + builder->add("key_0", "val_0"); + builder->add("key_1", "val_1"); + builder->add("key_2", "val_2"); + EXPECT_EQ(builder->data().substr(12, 10), "key_0val_0"); + EXPECT_EQ(builder->data().substr(34, 6), "1val_1"); + EXPECT_EQ(builder->data().substr(52, 6), "2val_2"); + EXPECT_EQ(builder->data().size(), 58); // 插入了三条数据,此时全部size应该为58 + /* + * Restart_Point的schema如下: + * +----------------+----------------+ + * | record_num(4B) | OffsetInfo(8B) | + * +----------------+----------------+ + * + * */ + // 添加重启点 + builder->add_restart_points(); + EXPECT_EQ(builder->data().size(), 74); // 加上重启点和Restart Point Num后,全体size应该为58+12+4=74B + EXPECT_EQ(utils::DecodeFixed32(builder->data().substr(58, 4).data()), 3); // record_num == 3 + OffsetInfo offsetInfo = *reinterpret_cast( + builder->data().substr(62, 8).data()); // OffsetInfo + EXPECT_EQ(offsetInfo.size, 58); // 第一个Record Group的大小 + EXPECT_EQ(offsetInfo.offset, 0); // 第一个Record Group的偏移量 + + EXPECT_EQ(utils::DecodeFixed32(builder->data().substr(70, 4).data()), 1); // restart_point_num == 1 + } + + TEST(data_block_builder, all) { + //todo: 需要写一个iterator模块,来遍历data_block,否则debug太复杂 + + // todo:测试大数据量情况下的add和add_restart_points + + + } +} \ No newline at end of file