From eeecacc82b79d51a272bafea45121500310fd351 Mon Sep 17 00:00:00 2001 From: qian yangyang Date: Sun, 29 Jan 2023 11:05:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0DB=E7=9A=84=E8=AF=BB=E5=86=99?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=EF=BC=9APut=E3=80=81Get(=E9=83=A8=E5=88=86?= =?UTF-8?q?=E5=AE=8C=E6=88=90)=E3=80=81Delete;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changes to be committed: new file: src/db/db.cpp new file: src/db/db.h new file: src/db/db_impl.cpp new file: src/db/db_impl.h new file: src/db/options.h new file: tests/test_db.cpp --- src/db/db.cpp | 40 +++++++++ src/db/db.h | 49 +++++++++++ src/db/db_impl.cpp | 204 +++++++++++++++++++++++++++++++++++++++++++++ src/db/db_impl.h | 80 ++++++++++++++++++ src/db/options.h | 60 +++++++++++++ tests/test_db.cpp | 43 ++++++++++ 6 files changed, 476 insertions(+) create mode 100644 src/db/db.cpp create mode 100644 src/db/db.h create mode 100644 src/db/db_impl.cpp create mode 100644 src/db/db_impl.h create mode 100644 src/db/options.h create mode 100644 tests/test_db.cpp diff --git a/src/db/db.cpp b/src/db/db.cpp new file mode 100644 index 0000000..3a18f23 --- /dev/null +++ b/src/db/db.cpp @@ -0,0 +1,40 @@ +// +// Created by qianyy on 2023/1/28. +// +#include "db.h" +#include "db_impl.h" + +namespace smallkv { + DB::DB(const Options &options) { + db_impl = std::make_unique(options); + } + + DBStatus DB::Put(const WriteOptions &options, + const std::string_view &key, + const std::string_view &value) { + return db_impl->Put(options, key, value); + } + + DBStatus DB::Delete(const WriteOptions &options, + const std::string_view &key) { + return db_impl->Delete(options, key); + } + + DBStatus DB::Get(const ReadOptions &options, + const std::string_view &key, + std::string *value) { + return db_impl->Get(options, key, value); + } + + DBStatus DB::BatchPut(const WriteOptions &options) { + return db_impl->BatchPut(options); + } + + DBStatus DB::BatchDelete(const ReadOptions &options) { + return db_impl->BatchDelete(options); + } + + DBStatus DB::Close() { + return db_impl->Close(); + } +} diff --git a/src/db/db.h b/src/db/db.h new file mode 100644 index 0000000..0b77838 --- /dev/null +++ b/src/db/db.h @@ -0,0 +1,49 @@ +// +// Created by qianyy on 2023/1/27. +// +#include +#include +#include "status.h" +#include "options.h" + +#ifndef SMALLKV_DB_H +#define SMALLKV_DB_H +namespace smallkv { + class DBImpl; + + class DB { + public: + explicit DB(const Options& options); + + ~DB() = default; + + // DB 应该是单例,禁止拷贝、赋值 + DB(const DB &) = delete; + + DB &operator=(const DB &) = delete; + + DBStatus Put(const WriteOptions &options, + const std::string_view &key, + const std::string_view &value); + + DBStatus Delete(const WriteOptions &options, + const std::string_view &key); + + // 将Key对应的值写到value地址上 + DBStatus Get(const ReadOptions &options, + const std::string_view &key, + std::string *value); + + // 批写 + DBStatus BatchPut(const WriteOptions &options); + + DBStatus BatchDelete(const ReadOptions &options); + + // 关闭数据库:调用此函数可以保证所有已写入数据会被持久化到磁盘, + DBStatus Close(); + + private: + std::unique_ptr db_impl; + }; +} +#endif //SMALLKV_DB_H diff --git a/src/db/db_impl.cpp b/src/db/db_impl.cpp new file mode 100644 index 0000000..7bb63bf --- /dev/null +++ b/src/db/db_impl.cpp @@ -0,0 +1,204 @@ +// +// Created by qianyy on 2023/1/28. +// + +#include + +#include "db_impl.h" +#include "cache/cache.h" +#include "utils/codec.h" +#include "memory/allocate.h" +#include "memtable/memtable.h" +#include "wal/wal_writer.h" +#include "file/file_writer.h" +#include "table/sstable_builder.h" + +namespace smallkv { + DBImpl::DBImpl(Options options) : options_(std::move(options)) { + alloc = std::make_shared(); + mem_table = std::make_shared(alloc); + logger = log::get_logger(); + auto file_writer_ = std::make_shared(options_.DB_DIR); + wal_writer = std::make_shared(file_writer_); + + cache = std::make_shared>(options_.CACHE_SIZE); + cache->register_clean_handle([](const std::string &key, std::string *val) { + delete val; + }); + } + + DBStatus DBImpl::Put(const WriteOptions &options, + const std::string_view &key, + const std::string_view &value) { + assert(closed == false); + /* + * 写逻辑: + * 1. 写WAL(fsync同步); + * 2. 写memtable; + * 3. 写缓存(提高读性能); + * 4. 如果memtable超限,应该落盘,并且开启一个新的memtable; + * + * */ + std::unique_lock wlock(rwlock_); + + // 1. 写WAL + char buf[8 + key.size() + value.size()]; + EncodeKV(key, value, buf); // 将K-V编码到buf中 + wal_writer->AddLog(buf); + + // 2. 写memtable + if (mem_table->Contains(key)) { // Update + mem_table->Update(key, value); + } else { // New Insert + mem_table->Add(key, value); + } + + // 3. 写缓存 + // todo: 写入时候不一定需要写入缓存. 如果一次性写入大量数据,实际上不需要每次 + // 都更新缓存,可以设置一种动态的、热点感知的缓存机制。后续有空优化。 + // todo: 此处采用new std::string()性能很差,后续需要修改底层的cache接口。 + cache->insert(key.data(), new std::string(value.data())); + + // 4. 判断MemTable是否超限, 如果超限应该转为L1SST后持久化 + if (mem_table->GetMemUsage() >= options_.MEM_TABLE_MAX_SIZE) { + MemTableToSST(); // 将memtable转为sst + + // 开启写的memtable + mem_table = std::make_shared(alloc); + logger->info("[DBImpl::Put] A new mem_table is created."); + } + return Status::Success; + } + + DBStatus DBImpl::Delete(const WriteOptions &options, + const std::string_view &key) { + assert(closed == false); + /* + * 删除逻辑: + * 1. 写WAL; + * 2. 写memtable; + * 3. 删除缓存; + * 4. 如果memtable超限,应该落盘,并且开启一个新的memtable; + * */ + std::unique_lock wlock(rwlock_); + + // 1. 写WAL + char buf[8 + key.size()]; // 用vel_len=0表示val为空 + EncodeKV(key, "", buf); + wal_writer->AddLog(buf); + + // 2. 写memtable + if (mem_table->Contains(key)) { // 原地标记val=""表示删除 + mem_table->Delete(key); + } else { + mem_table->Add(key, ""); // 墓碑机制 + } + + // 3. 删除缓存 + cache->erase(key.data()); + + // 4. 检查memtable是否超限 + if (mem_table->GetMemUsage() >= options_.MEM_TABLE_MAX_SIZE) { + MemTableToSST(); // 将memtable转为sst + + // 开启写的memtable + mem_table = std::make_shared(alloc); + logger->info("[DBImpl::Delete] A new mem_table is created."); + } + return Status::Success; + } + + DBStatus DBImpl::Get(const ReadOptions &options, + const std::string_view &key, + std::string *value) { + assert(closed == false); + /* + * 读逻辑: + * 1. 读缓存,有则直接返回,否则进入2; + * 2. 依次从memtable、sst文件向下查找; + * 3. 找到的数据写入缓存; + * 4. 返回结果; + * + * */ + std::shared_lock rlock(rwlock_); + + // 1. 读缓存 + if (cache->contains(key.data())) { + *value = *(cache->get(key.data())->val); + return Status::Success; + } + + // 2. 读memtable + if (mem_table->Contains(key)) { + auto val = mem_table->Get(key); + *value = mem_table->Get(key.data()).value(); + return Status::Success; + } + + // 3. 依次读sst文件 + // todo: 后续实现 + + // 4. 找到的数据写入缓存 + // todo + + + return Status::Success; + } + + DBStatus DBImpl::BatchPut(const WriteOptions &options) { + std::unique_lock wlock(rwlock_); + assert(closed == false); + // todo: 稍后实现 + return Status::NotImpl; + } + + DBStatus DBImpl::BatchDelete(const ReadOptions &options) { + std::unique_lock wlock(rwlock_); + assert(closed == false); + // todo: 稍后实现 + return Status::NotImpl; + } + + void DBImpl::EncodeKV(const std::string_view &key, + const std::string_view &value, + char *buf) { + /* + * 暂时采用的编码方法如下: + * +-------------+-----+-------------+-----+ + * | key_len(4B) | key | val_len(4B) | val | + * +-------------+-----+-------------+-----+ + * todo: 存在优化空间,例如使用variant等,后续有空再说 + * + * */ + assert(value.size() < UINT32_MAX); + utils::EncodeFixed32(buf, key.size()); + memcpy(buf + 4, key.data(), key.size()); + utils::EncodeFixed32(buf + 4 + key.size(), value.size()); + memcpy(buf + 4 + key.size() + 4, value.data(), value.size()); + } + + void DBImpl::MemTableToSST() { + // todo: 此处采用同步方法(为了debug方便),后续需要修改为异步 + + // 格式为/.../level_n_sst_i.sst + auto sst_filepath = options_.STORAGE_DIR + "/" + utils::BuildSSTPath(0, options_.LISST_NUM); + logger->info("DBImpl::MemTableToSST() is called. sst_filepath={}", sst_filepath); + + auto file_writer = std::make_shared(sst_filepath); + auto sstable_builder = std::make_shared(mem_table->GetSize(), file_writer); + mem_table->ConvertToL1SST(sst_filepath, sstable_builder); + + ++options_.LISST_NUM; // 下一个sst文件序号+1 + } + + DBStatus DBImpl::Close() { + if (!closed && mem_table->GetSize() > 0) { + // memtable中有数据,就应该落盘 + MemTableToSST(); + + closed = true; + } + logger->info("DB is closed."); + return Status::Success; + } +} diff --git a/src/db/db_impl.h b/src/db/db_impl.h new file mode 100644 index 0000000..0f6394b --- /dev/null +++ b/src/db/db_impl.h @@ -0,0 +1,80 @@ +// +// Created by qianyy on 2023/1/28. +// +#include +#include +#include +#include "status.h" +#include "options.h" +#include "log/log.h" + +#ifndef SMALLKV_DB_IMPL_H +#define SMALLKV_DB_IMPL_H + +namespace smallkv { + template + class Cache; + + class MemTable; + + class WALWriter; + + class FreeListAllocate; + + /* + * 支持并发,线程安全 + * + * */ + class DBImpl { + public: + explicit DBImpl(Options options); + + ~DBImpl() = default; + + // 同时具备Set和Update语义 + DBStatus Put(const WriteOptions &options, + const std::string_view &key, + const std::string_view &value); + + DBStatus Delete(const WriteOptions &options, + const std::string_view &key); + + // 将Key对应的值写到value地址上 + DBStatus Get(const ReadOptions &options, + const std::string_view &key, + std::string *value); + + // 关闭数据库:调用此函数可以保证所有已写入数据会被持久化到磁盘, + DBStatus Close(); + + // 批写 + DBStatus BatchPut(const WriteOptions &options); + + DBStatus BatchDelete(const ReadOptions &options); + + private: + // 将 KV 编码到 buf 中, 必须确保buf长度为8 + key.size() + value.size() + static void EncodeKV(const std::string_view &key, + const std::string_view &value, + char *buf); + + // 将memtable转为sst + void MemTableToSST(); + + private: + std::shared_ptr mem_table; // active memtable + std::shared_ptr logger; // 日志 + std::shared_ptr alloc; // 内存分配器 + std::shared_ptr wal_writer; // 写wal + + std::shared_ptr> cache; // 缓存 + + Options options_; // 配置信息 + + std::shared_mutex rwlock_; // 读写锁 + + bool closed = false; // 表示数据库没有关闭 + }; +} + +#endif //SMALLKV_DB_IMPL_H diff --git a/src/db/options.h b/src/db/options.h new file mode 100644 index 0000000..0c9a3a1 --- /dev/null +++ b/src/db/options.h @@ -0,0 +1,60 @@ +// +// Created by qianyy on 2023/1/27. +// +#include + +#ifndef SMALLKV_OPTIONS_H +#define SMALLKV_OPTIONS_H +namespace smallkv { + // DB的配置信息,如是否开启同步、缓存池等 + struct Options { + //todo: 之前的配置信息已经写到了xxx_config中,后续应该集中到这里 + + // 数据库的存储目录,需要自定义. 例如修改为:"/home/db_storage" + std::string DB_DIR = "/mnt/c/Users/abc/Desktop/smallkv_proj/smallkv/db_storage"; + + // MEM_TABLE的最大大小,超过了就应该落盘 + size_t MEM_TABLE_MAX_SIZE = 4 * 1024 * 1024; // 4MB + + // 缓存的键值对数量 + uint32_t CACHE_SIZE = 4096; + + std::string STORAGE_DIR = "./storage"; + + // 表示当前L1SST的序号。 L1SST的命名类似level_1_sst_0.sst, level_1_sst_1.sst, .... + // 开始的时候需要扫描 STORAGE_DIR 目录,找到下一个sst的LISST_NUM + uint32_t LISST_NUM = 0; + }; + + inline Options MakeOptionsForDebugging() { + return Options{}; + } + + inline Options MakeOptionsForProduction() { + + } + + // 读时候的配置信息 + struct ReadOptions { + // 扩展性备用接口。 + }; + + //写时候的配置信息 + struct WriteOptions { + /* + * 注:C库缓冲 --fflush--> 内核缓冲 --fsync--> 磁盘 + * 解释: + * 1. fsync系统调用可以强制每次写入都被更新到磁盘中,在open()中添加O_SYNC也由此效果; + * 2. fflush是一个在C语言标准输入输出库中的函数,功能是冲洗流中的信息,该函数通常用于 + * 处理磁盘文件。fflush()会强迫将缓冲区内的数据写回参数stream 指定的文件中。 + * 一般地,fsync也不能保证100%安全,因为现在的磁盘也有缓存(比如固态硬盘可能有外置DRAM缓存), + * 如果断电数据也可能会丢失。但是企业级硬盘一般有备用电源,并且很多固态的缓存是用的SLC颗粒(断电不丢失), + * 所以基本可以认为fsync可以保证数据安全。 + * + * */ + // 此处的flush和fflush语义相同,实际上flush不需要设置为true,因为WAL已经保证了数据安全(fsync)。 + // todo: Flush这个开关暂时无效,后续有空实现 + bool Flush = false; + }; +} +#endif //SMALLKV_OPTIONS_H diff --git a/tests/test_db.cpp b/tests/test_db.cpp new file mode 100644 index 0000000..ab21e91 --- /dev/null +++ b/tests/test_db.cpp @@ -0,0 +1,43 @@ +// +// Created by qianyy on 2023/1/29. +// +#include +#include +#include +#include "db/options.h" +#include "db/db.h" +#include "db/db_impl.h" + +namespace smallkv::unittest { + TEST(DB, Put_Get) { + auto logger = log::get_logger(); + auto test_options = MakeOptionsForDebugging(); + auto db_holder = std::make_unique(test_options); + WriteOptions wOp; + ReadOptions rOp; + // 生成测试数据 + const int N = 1000; + std::vector data_key, data_val; + for (int i = 0; i < N; ++i) { + data_key.push_back("key_" + std::to_string(i)); + data_val.push_back("val_" + std::to_string(i)); + } + std::sort(data_key.begin(), data_key.end()); + std::sort(data_val.begin(), data_val.end()); + + // 插入数据 + for (int i = 0; i < N; ++i) { + db_holder->Put(wOp, data_key[i], data_val[i]); + } + + // 检查数据 + std::string *value = new std::string(); + for (int i = 0; i < N; ++i) { + EXPECT_EQ(db_holder->Get(rOp, data_key[i], value), Status::Success); + EXPECT_EQ(*value, data_val[i]); + value->clear(); + } + + db_holder->Close(); + } +} \ No newline at end of file