Skip to content
This repository has been archived by the owner on Oct 14, 2023. It is now read-only.

1. 修改data_block_builder.cpp中的前缀压缩bug #6

Merged
merged 1 commit into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 66 additions & 65 deletions src/table/data_block_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,84 +6,85 @@
#include "sst_config.h"
#include "data_block_builder.h"
#include "../utils/codec.h"
#include "../log/log.h"

namespace smallkv {
// todo: 热点函数,后续需要更多优化
DBStatus DataBlockBuilder::add(const std::string_view &key, const std::string_view &value) {
if (key.empty()) {
return Status::Success;
}
bool need_fullkey = false;
if (record_num % SSTConfigInfo::RESTART_INTERVAL == 0) { // 判断是否需要插入重启点
need_fullkey = true; // 第一条数据需要完整key
record_group_offset.push_back(static_cast<int32_t>(get_records_size()));
}
++record_num; // DataBlock中的Record的数量加一
bool need_fullkey = false;
if (record_num % SSTConfigInfo::RESTART_INTERVAL == 0) { // 判断是否需要插入重启点
need_fullkey = true; // 第一条数据需要完整key
record_group_offset.push_back(static_cast<int32_t>(get_records_size()));
}
++record_num; // DataBlock中的Record的数量加一

if (need_fullkey) {
// 是Record Group的第一条数据,不需要压缩
utils::PutFixed32(_data, 0); // shared_key_len
utils::PutFixed32(_data, static_cast<int32_t>(key.size())); // unshared_key_len
utils::PutFixed32(_data, static_cast<int32_t>(value.size())); // value_len
_data.append(key); // unshared_key_content
_data.append(value); // value_content
} else {
// 非第一条数据,压缩
// 计算当前key和前一个key的公共部分
auto min_len = std::min(key.size(), pre_key.size());
int shared_key_len = 0;
for (int i = 0; i < min_len; ++i) {
if (key[i] == pre_key[i]) {
++shared_key_len;
} else {
break;
}
if (need_fullkey) {
// 是Record Group的第一条数据,不需要压缩
utils::PutFixed32(_data, 0); // shared_key_len
utils::PutFixed32(_data, static_cast<int32_t>(key.size())); // unshared_key_len
utils::PutFixed32(_data, static_cast<int32_t>(value.size())); // value_len
_data.append(key); // unshared_key_content
_data.append(value); // value_content
} else {
// 非第一条数据,压缩
// 计算当前key和前一个key的公共部分
auto min_len = std::min(key.size(), pre_key.size());
int shared_key_len = 0;
for (int i = 0; i < min_len; ++i) {
if (key[i] == pre_key[i]) {
++shared_key_len;
} else {
break;
}
int unshared_key_len = static_cast<int>(key.size()) - shared_key_len;

// todo: 这里可以尝试一下variant编码
utils::PutFixed32(_data, shared_key_len);
utils::PutFixed32(_data, unshared_key_len);
utils::PutFixed32(_data, static_cast<int32_t>(value.size()));
_data.append(key);
_data.append(value);
}
pre_key = key; // 更新前一个key
int unshared_key_len = static_cast<int>(key.size()) - shared_key_len;

return Status::Success;
// todo: 这里可以尝试一下variant编码
utils::PutFixed32(_data, shared_key_len);
utils::PutFixed32(_data, unshared_key_len);
utils::PutFixed32(_data, static_cast<int32_t>(value.size()));
_data.append(key.substr(shared_key_len));
_data.append(value);
}
pre_key = key; // 更新前一个key

DBStatus DataBlockBuilder::add_restart_points() {
assert(get_records_size() >= SSTConfigInfo::MAX_DATA_BLOCK_SIZE);
int restart_point_num = static_cast<int>((record_num - 0.5) / 16) + 1;
int last_offset = static_cast<int>(_data.size()); // 记录_data的最后一个字节的位置
for (int i = 0; i < record_group_offset.size(); ++i) {
//
// Restart_Point的schema如下:
// +----------------+----------------+
// | record_num(4B) | OffsetInfo(8B) |
// +----------------+----------------+
//
int32_t _record_num; // 当前Record Group中的Record数量
OffsetInfo _offsetInfo{0, record_group_offset[i]};
if (i != record_group_offset.size() - 1) {
_offsetInfo.size = record_group_offset[i + 1] - record_group_offset[i];
_record_num = SSTConfigInfo::RESTART_INTERVAL;
} else {
_offsetInfo.size = last_offset - record_group_offset[i];
_record_num = record_num % SSTConfigInfo::RESTART_INTERVAL;
}
utils::PutFixed32(_data, _record_num);
utils::PutFixed64(_data, *reinterpret_cast<int64_t *>(&_offsetInfo));
return Status::Success;
}

DBStatus DataBlockBuilder::add_restart_points() {
int restart_point_num = static_cast<int>((record_num - 0.5) / 16) + 1;
int last_offset = static_cast<int>(_data.size()); // 记录_data的最后一个字节的位置
log::get_logger()->info("record_group_offset.size={}", record_group_offset.size());
for (int i = 0; i < record_group_offset.size(); ++i) {
//
// Restart_Point的schema如下:
// +----------------+----------------+
// | record_num(4B) | OffsetInfo(8B) |
// +----------------+----------------+
//
int32_t _record_num; // 当前Record Group中的Record数量
OffsetInfo _offsetInfo{0, record_group_offset[i]};
if (i != record_group_offset.size() - 1) {
_offsetInfo.size = record_group_offset[i + 1] - record_group_offset[i];
_record_num = SSTConfigInfo::RESTART_INTERVAL;
} else {
_offsetInfo.size = last_offset - record_group_offset[i];
_record_num = record_num % SSTConfigInfo::RESTART_INTERVAL;
}
utils::PutFixed32(_data, restart_point_num);
return Status::Success;
utils::PutFixed32(_data, _record_num);
utils::PutFixed64(_data, *reinterpret_cast<int64_t *>(&_offsetInfo));
}
utils::PutFixed32(_data, restart_point_num);
return Status::Success;
}

void DataBlockBuilder::reset() {
record_group_offset.clear();
pre_key = "";
_data = "";
record_num = 0;
}
}
void DataBlockBuilder::reset() {
record_group_offset.clear();
pre_key = "";
_data = "";
record_num = 0;
}
}
2 changes: 1 addition & 1 deletion src/table/data_block_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ namespace smallkv {
// 当Block中的所有Record的大小超过阈值时,停止写入,并且生成重启点等信息
DBStatus add_restart_points();

const char *data() { return _data.c_str(); }
std::string_view data() { return _data; }

// 清空DataBlock中的所有数据
void reset();
Expand Down
24 changes: 22 additions & 2 deletions src/utils/codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

namespace smallkv::utils {

void EncodeFixed32(char *buf, int32_t val) {
void EncodeFixed32(char *buf, uint32_t val) {
buf[0] = val & 0xff;
buf[1] = (val >> 8) & 0xff;
buf[2] = (val >> 16) & 0xff;
buf[3] = (val >> 24) & 0xff;
}

void EncodeFixed64(char *buf, int64_t val) {
void EncodeFixed64(char *buf, uint64_t val) {
buf[0] = val & 0xff;
buf[1] = (val >> 8) & 0xff;
buf[2] = (val >> 16) & 0xff;
Expand All @@ -24,4 +24,24 @@ namespace smallkv::utils {
buf[7] = (val >> 56) & 0xff;
}

uint32_t DecodeFixed32(const char *data) {
auto _data = reinterpret_cast<const uint8_t *>(data);
return static_cast<uint32_t>(_data[0]) |
(static_cast<uint32_t>(_data[1]) << 8) |
(static_cast<uint32_t>(_data[2]) << 16) |
(static_cast<uint32_t>(_data[3]) << 24);
}

uint64_t DecodeFixed64(const char *data) {
auto _data = reinterpret_cast<const uint8_t *>(data);
return static_cast<uint64_t>(_data[0]) |
(static_cast<uint64_t>(_data[1]) << 8) |
(static_cast<uint64_t>(_data[2]) << 16) |
(static_cast<uint64_t>(_data[3]) << 24) |
(static_cast<uint64_t>(_data[4]) << 32) |
(static_cast<uint64_t>(_data[5]) << 40) |
(static_cast<uint64_t>(_data[6]) << 48) |
(static_cast<uint64_t>(_data[7]) << 56);
}

}
15 changes: 6 additions & 9 deletions src/utils/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,21 @@
#define SMALLKV_CODEC_H
namespace smallkv::utils {
// 编解码
void EncodeFixed32(char *buf, int32_t val);
void EncodeFixed32(char *buf, uint32_t val);

void EncodeFixed64(char *buf, int64_t val);
void EncodeFixed64(char *buf, uint64_t val);

// todo
void DecodeFixed32();
uint32_t DecodeFixed32(const char *data);

// todo
void DecodeFixed64();
uint64_t DecodeFixed64(const char *data);


inline void PutFixed32(std::string &dst, int32_t val) {
inline void PutFixed32(std::string &dst, uint32_t val) {
char buf[sizeof(val)];
EncodeFixed32(buf, val);
dst.append(buf, sizeof(val));
}

inline void PutFixed64(std::string &dst, int64_t val) {
inline void PutFixed64(std::string &dst, uint64_t val) {
char buf[sizeof(val)];
EncodeFixed64(buf, val);
dst.append(buf, sizeof(val));
Expand Down
70 changes: 70 additions & 0 deletions tests/test_data_block_builder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//
// Created by qianyy on 2023/1/16.
//
#include <memory>
#include <gtest/gtest.h>
#include <string>
#include "../src/table/data_block_builder.h"
#include "../src/log/log.h"
#include "../src/utils/codec.h"
#include "../src/db/offset_info.h"

namespace smallkv {
TEST(data_block_builder, add) {
auto logger = log::get_logger();
auto builder = std::make_unique<DataBlockBuilder>();

/*
* 每条记录的前面有三个int32_t, 然后才是key、value
* Record的schema如下:
* +--------------------+----------------------+---------------+----------------------+---------------+
* | shared_key_len(4B) | unshared_key_len(4B) | value_len(4B) | unshared_key_content | value_content |
* +--------------------+----------------------+---------------+----------------------+---------------+
*
* */
builder->add("key_0", "val_0");
builder->add("key_1", "val_1");
builder->add("key_2", "val_2");
EXPECT_EQ(builder->data().substr(12, 10), "key_0val_0");
EXPECT_EQ(builder->data().substr(34, 6), "1val_1");
EXPECT_EQ(builder->data().substr(52, 6), "2val_2");
EXPECT_EQ(builder->data().size(), 12 + 10 + 12 + 6 + 12 + 6);
}

TEST(data_block_builder, add_restart_points) {
auto logger = log::get_logger();
auto builder = std::make_unique<DataBlockBuilder>();
builder->add("key_0", "val_0");
builder->add("key_1", "val_1");
builder->add("key_2", "val_2");
EXPECT_EQ(builder->data().substr(12, 10), "key_0val_0");
EXPECT_EQ(builder->data().substr(34, 6), "1val_1");
EXPECT_EQ(builder->data().substr(52, 6), "2val_2");
EXPECT_EQ(builder->data().size(), 58); // 插入了三条数据,此时全部size应该为58
/*
* Restart_Point的schema如下:
* +----------------+----------------+
* | record_num(4B) | OffsetInfo(8B) |
* +----------------+----------------+
*
* */
// 添加重启点
builder->add_restart_points();
EXPECT_EQ(builder->data().size(), 74); // 加上重启点和Restart Point Num后,全体size应该为58+12+4=74B
EXPECT_EQ(utils::DecodeFixed32(builder->data().substr(58, 4).data()), 3); // record_num == 3
OffsetInfo offsetInfo = *reinterpret_cast<const OffsetInfo *>(
builder->data().substr(62, 8).data()); // OffsetInfo
EXPECT_EQ(offsetInfo.size, 58); // 第一个Record Group的大小
EXPECT_EQ(offsetInfo.offset, 0); // 第一个Record Group的偏移量

EXPECT_EQ(utils::DecodeFixed32(builder->data().substr(70, 4).data()), 1); // restart_point_num == 1
}

TEST(data_block_builder, all) {
//todo: 需要写一个iterator模块,来遍历data_block,否则debug太复杂

// todo:测试大数据量情况下的add和add_restart_points


}
}