Skip to content
This repository has been archived by the owner on Oct 14, 2023. It is now read-only.

Commit

Permalink
Merge pull request #6 from yangyang233333/sstable
Browse files Browse the repository at this point in the history
1. 修改data_block_builder.cpp中的前缀压缩bug
  • Loading branch information
yangyang233333 authored Jan 17, 2023
2 parents 19e678d + 0e6b019 commit 880e4bf
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 77 deletions.
131 changes: 66 additions & 65 deletions src/table/data_block_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,84 +6,85 @@
#include "sst_config.h"
#include "data_block_builder.h"
#include "../utils/codec.h"
#include "../log/log.h"

namespace smallkv {
// todo: 热点函数,后续需要更多优化
DBStatus DataBlockBuilder::add(const std::string_view &key, const std::string_view &value) {
if (key.empty()) {
return Status::Success;
}
bool need_fullkey = false;
if (record_num % SSTConfigInfo::RESTART_INTERVAL == 0) { // 判断是否需要插入重启点
need_fullkey = true; // 第一条数据需要完整key
record_group_offset.push_back(static_cast<int32_t>(get_records_size()));
}
++record_num; // DataBlock中的Record的数量加一
bool need_fullkey = false;
if (record_num % SSTConfigInfo::RESTART_INTERVAL == 0) { // 判断是否需要插入重启点
need_fullkey = true; // 第一条数据需要完整key
record_group_offset.push_back(static_cast<int32_t>(get_records_size()));
}
++record_num; // DataBlock中的Record的数量加一

if (need_fullkey) {
// 是Record Group的第一条数据,不需要压缩
utils::PutFixed32(_data, 0); // shared_key_len
utils::PutFixed32(_data, static_cast<int32_t>(key.size())); // unshared_key_len
utils::PutFixed32(_data, static_cast<int32_t>(value.size())); // value_len
_data.append(key); // unshared_key_content
_data.append(value); // value_content
} else {
// 非第一条数据,压缩
// 计算当前key和前一个key的公共部分
auto min_len = std::min(key.size(), pre_key.size());
int shared_key_len = 0;
for (int i = 0; i < min_len; ++i) {
if (key[i] == pre_key[i]) {
++shared_key_len;
} else {
break;
}
if (need_fullkey) {
// 是Record Group的第一条数据,不需要压缩
utils::PutFixed32(_data, 0); // shared_key_len
utils::PutFixed32(_data, static_cast<int32_t>(key.size())); // unshared_key_len
utils::PutFixed32(_data, static_cast<int32_t>(value.size())); // value_len
_data.append(key); // unshared_key_content
_data.append(value); // value_content
} else {
// 非第一条数据,压缩
// 计算当前key和前一个key的公共部分
auto min_len = std::min(key.size(), pre_key.size());
int shared_key_len = 0;
for (int i = 0; i < min_len; ++i) {
if (key[i] == pre_key[i]) {
++shared_key_len;
} else {
break;
}
int unshared_key_len = static_cast<int>(key.size()) - shared_key_len;

// todo: 这里可以尝试一下variant编码
utils::PutFixed32(_data, shared_key_len);
utils::PutFixed32(_data, unshared_key_len);
utils::PutFixed32(_data, static_cast<int32_t>(value.size()));
_data.append(key);
_data.append(value);
}
pre_key = key; // 更新前一个key
int unshared_key_len = static_cast<int>(key.size()) - shared_key_len;

return Status::Success;
// todo: 这里可以尝试一下variant编码
utils::PutFixed32(_data, shared_key_len);
utils::PutFixed32(_data, unshared_key_len);
utils::PutFixed32(_data, static_cast<int32_t>(value.size()));
_data.append(key.substr(shared_key_len));
_data.append(value);
}
pre_key = key; // 更新前一个key

DBStatus DataBlockBuilder::add_restart_points() {
assert(get_records_size() >= SSTConfigInfo::MAX_DATA_BLOCK_SIZE);
int restart_point_num = static_cast<int>((record_num - 0.5) / 16) + 1;
int last_offset = static_cast<int>(_data.size()); // 记录_data的最后一个字节的位置
for (int i = 0; i < record_group_offset.size(); ++i) {
//
// Restart_Point的schema如下:
// +----------------+----------------+
// | record_num(4B) | OffsetInfo(8B) |
// +----------------+----------------+
//
int32_t _record_num; // 当前Record Group中的Record数量
OffsetInfo _offsetInfo{0, record_group_offset[i]};
if (i != record_group_offset.size() - 1) {
_offsetInfo.size = record_group_offset[i + 1] - record_group_offset[i];
_record_num = SSTConfigInfo::RESTART_INTERVAL;
} else {
_offsetInfo.size = last_offset - record_group_offset[i];
_record_num = record_num % SSTConfigInfo::RESTART_INTERVAL;
}
utils::PutFixed32(_data, _record_num);
utils::PutFixed64(_data, *reinterpret_cast<int64_t *>(&_offsetInfo));
return Status::Success;
}

DBStatus DataBlockBuilder::add_restart_points() {
int restart_point_num = static_cast<int>((record_num - 0.5) / 16) + 1;
int last_offset = static_cast<int>(_data.size()); // 记录_data的最后一个字节的位置
log::get_logger()->info("record_group_offset.size={}", record_group_offset.size());
for (int i = 0; i < record_group_offset.size(); ++i) {
//
// Restart_Point的schema如下:
// +----------------+----------------+
// | record_num(4B) | OffsetInfo(8B) |
// +----------------+----------------+
//
int32_t _record_num; // 当前Record Group中的Record数量
OffsetInfo _offsetInfo{0, record_group_offset[i]};
if (i != record_group_offset.size() - 1) {
_offsetInfo.size = record_group_offset[i + 1] - record_group_offset[i];
_record_num = SSTConfigInfo::RESTART_INTERVAL;
} else {
_offsetInfo.size = last_offset - record_group_offset[i];
_record_num = record_num % SSTConfigInfo::RESTART_INTERVAL;
}
utils::PutFixed32(_data, restart_point_num);
return Status::Success;
utils::PutFixed32(_data, _record_num);
utils::PutFixed64(_data, *reinterpret_cast<int64_t *>(&_offsetInfo));
}
utils::PutFixed32(_data, restart_point_num);
return Status::Success;
}

void DataBlockBuilder::reset() {
record_group_offset.clear();
pre_key = "";
_data = "";
record_num = 0;
}
}
void DataBlockBuilder::reset() {
record_group_offset.clear();
pre_key = "";
_data = "";
record_num = 0;
}
}
2 changes: 1 addition & 1 deletion src/table/data_block_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ namespace smallkv {
// 当Block中的所有Record的大小超过阈值时,停止写入,并且生成重启点等信息
DBStatus add_restart_points();

const char *data() { return _data.c_str(); }
std::string_view data() { return _data; }

// 清空DataBlock中的所有数据
void reset();
Expand Down
24 changes: 22 additions & 2 deletions src/utils/codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

namespace smallkv::utils {

void EncodeFixed32(char *buf, int32_t val) {
void EncodeFixed32(char *buf, uint32_t val) {
buf[0] = val & 0xff;
buf[1] = (val >> 8) & 0xff;
buf[2] = (val >> 16) & 0xff;
buf[3] = (val >> 24) & 0xff;
}

void EncodeFixed64(char *buf, int64_t val) {
void EncodeFixed64(char *buf, uint64_t val) {
buf[0] = val & 0xff;
buf[1] = (val >> 8) & 0xff;
buf[2] = (val >> 16) & 0xff;
Expand All @@ -24,4 +24,24 @@ namespace smallkv::utils {
buf[7] = (val >> 56) & 0xff;
}

uint32_t DecodeFixed32(const char *data) {
auto _data = reinterpret_cast<const uint8_t *>(data);
return static_cast<uint32_t>(_data[0]) |
(static_cast<uint32_t>(_data[1]) << 8) |
(static_cast<uint32_t>(_data[2]) << 16) |
(static_cast<uint32_t>(_data[3]) << 24);
}

uint64_t DecodeFixed64(const char *data) {
auto _data = reinterpret_cast<const uint8_t *>(data);
return static_cast<uint64_t>(_data[0]) |
(static_cast<uint64_t>(_data[1]) << 8) |
(static_cast<uint64_t>(_data[2]) << 16) |
(static_cast<uint64_t>(_data[3]) << 24) |
(static_cast<uint64_t>(_data[4]) << 32) |
(static_cast<uint64_t>(_data[5]) << 40) |
(static_cast<uint64_t>(_data[6]) << 48) |
(static_cast<uint64_t>(_data[7]) << 56);
}

}
15 changes: 6 additions & 9 deletions src/utils/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,21 @@
#define SMALLKV_CODEC_H
namespace smallkv::utils {
// 编解码
void EncodeFixed32(char *buf, int32_t val);
void EncodeFixed32(char *buf, uint32_t val);

void EncodeFixed64(char *buf, int64_t val);
void EncodeFixed64(char *buf, uint64_t val);

// todo
void DecodeFixed32();
uint32_t DecodeFixed32(const char *data);

// todo
void DecodeFixed64();
uint64_t DecodeFixed64(const char *data);


inline void PutFixed32(std::string &dst, int32_t val) {
inline void PutFixed32(std::string &dst, uint32_t val) {
char buf[sizeof(val)];
EncodeFixed32(buf, val);
dst.append(buf, sizeof(val));
}

inline void PutFixed64(std::string &dst, int64_t val) {
inline void PutFixed64(std::string &dst, uint64_t val) {
char buf[sizeof(val)];
EncodeFixed64(buf, val);
dst.append(buf, sizeof(val));
Expand Down
70 changes: 70 additions & 0 deletions tests/test_data_block_builder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//
// Created by qianyy on 2023/1/16.
//
#include <memory>
#include <gtest/gtest.h>
#include <string>
#include "../src/table/data_block_builder.h"
#include "../src/log/log.h"
#include "../src/utils/codec.h"
#include "../src/db/offset_info.h"

namespace smallkv {
TEST(data_block_builder, add) {
auto logger = log::get_logger();
auto builder = std::make_unique<DataBlockBuilder>();

/*
* 每条记录的前面有三个int32_t, 然后才是key、value
* Record的schema如下:
* +--------------------+----------------------+---------------+----------------------+---------------+
* | shared_key_len(4B) | unshared_key_len(4B) | value_len(4B) | unshared_key_content | value_content |
* +--------------------+----------------------+---------------+----------------------+---------------+
*
* */
builder->add("key_0", "val_0");
builder->add("key_1", "val_1");
builder->add("key_2", "val_2");
EXPECT_EQ(builder->data().substr(12, 10), "key_0val_0");
EXPECT_EQ(builder->data().substr(34, 6), "1val_1");
EXPECT_EQ(builder->data().substr(52, 6), "2val_2");
EXPECT_EQ(builder->data().size(), 12 + 10 + 12 + 6 + 12 + 6);
}

TEST(data_block_builder, add_restart_points) {
auto logger = log::get_logger();
auto builder = std::make_unique<DataBlockBuilder>();
builder->add("key_0", "val_0");
builder->add("key_1", "val_1");
builder->add("key_2", "val_2");
EXPECT_EQ(builder->data().substr(12, 10), "key_0val_0");
EXPECT_EQ(builder->data().substr(34, 6), "1val_1");
EXPECT_EQ(builder->data().substr(52, 6), "2val_2");
EXPECT_EQ(builder->data().size(), 58); // 插入了三条数据,此时全部size应该为58
/*
* Restart_Point的schema如下:
* +----------------+----------------+
* | record_num(4B) | OffsetInfo(8B) |
* +----------------+----------------+
*
* */
// 添加重启点
builder->add_restart_points();
EXPECT_EQ(builder->data().size(), 74); // 加上重启点和Restart Point Num后,全体size应该为58+12+4=74B
EXPECT_EQ(utils::DecodeFixed32(builder->data().substr(58, 4).data()), 3); // record_num == 3
OffsetInfo offsetInfo = *reinterpret_cast<const OffsetInfo *>(
builder->data().substr(62, 8).data()); // OffsetInfo
EXPECT_EQ(offsetInfo.size, 58); // 第一个Record Group的大小
EXPECT_EQ(offsetInfo.offset, 0); // 第一个Record Group的偏移量

EXPECT_EQ(utils::DecodeFixed32(builder->data().substr(70, 4).data()), 1); // restart_point_num == 1
}

TEST(data_block_builder, all) {
//todo: 需要写一个iterator模块,来遍历data_block,否则debug太复杂

// todo:测试大数据量情况下的add和add_restart_points


}
}

0 comments on commit 880e4bf

Please sign in to comment.