Skip to content
This repository has been archived by the owner on Oct 14, 2023. It is now read-only.

Commit

Permalink
增加DB的读写接口:Put、Get(部分完成)、Delete;
Browse files Browse the repository at this point in the history
Changes to be committed:
	new file:   src/db/db.cpp
	new file:   src/db/db.h
	new file:   src/db/db_impl.cpp
	new file:   src/db/db_impl.h
	new file:   src/db/options.h
	new file:   tests/test_db.cpp
  • Loading branch information
yangyang233333 committed Jan 29, 2023
1 parent ac6457f commit eeecacc
Show file tree
Hide file tree
Showing 6 changed files with 476 additions and 0 deletions.
40 changes: 40 additions & 0 deletions src/db/db.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//
// Created by qianyy on 2023/1/28.
//
#include "db.h"
#include "db_impl.h"

namespace smallkv {
    // Creates the facade and the DBImpl instance that performs all real work.
    DB::DB(const Options &options)
            : db_impl(std::make_unique<DBImpl>(options)) {}

    // Stores `value` under `key`; forwarded to DBImpl::Put.
    DBStatus DB::Put(const WriteOptions &options,
                     const std::string_view &key,
                     const std::string_view &value) {
        return db_impl->Put(options, key, value);
    }

    // Removes `key`; forwarded to DBImpl::Delete.
    DBStatus DB::Delete(const WriteOptions &options, const std::string_view &key) {
        return db_impl->Delete(options, key);
    }

    // Looks up `key` and writes the result through `value`; forwarded to DBImpl::Get.
    DBStatus DB::Get(const ReadOptions &options,
                     const std::string_view &key,
                     std::string *value) {
        return db_impl->Get(options, key, value);
    }

    // Batched write entry point; forwarded to DBImpl::BatchPut.
    DBStatus DB::BatchPut(const WriteOptions &options) {
        return db_impl->BatchPut(options);
    }

    // Batched delete entry point; forwarded to DBImpl::BatchDelete.
    DBStatus DB::BatchDelete(const ReadOptions &options) {
        return db_impl->BatchDelete(options);
    }

    // Closes the database; forwarded to DBImpl::Close.
    DBStatus DB::Close() {
        return db_impl->Close();
    }
}
49 changes: 49 additions & 0 deletions src/db/db.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//
// Created by qianyy on 2023/1/27.
//
#include <memory>
#include <string_view>
#include "status.h"
#include "options.h"

#ifndef SMALLKV_DB_H
#define SMALLKV_DB_H
namespace smallkv {
class DBImpl;

// Public facade of the smallkv key-value store. It owns a DBImpl through
// db_impl; the member functions declared here are implemented in db.cpp as
// direct forwards to the DBImpl functions of the same name.
class DB {
public:
// Creates the database from the given options.
explicit DB(const Options& options);

// NOTE(review): the destructor is defaulted inline while db_impl is a
// std::unique_ptr to the forward-declared DBImpl. A translation unit that
// destroys a DB may therefore need the complete DBImpl type (db_impl.h) in
// scope -- confirm, or move the destructor definition into db.cpp.
~DB() = default;

// A DB is meant to be a single instance: copying and assignment are disabled.
DB(const DB &) = delete;

DB &operator=(const DB &) = delete;

// Stores value under key.
DBStatus Put(const WriteOptions &options,
const std::string_view &key,
const std::string_view &value);

// Removes key.
DBStatus Delete(const WriteOptions &options,
const std::string_view &key);

// Writes the value associated with key into the string pointed to by value.
DBStatus Get(const ReadOptions &options,
const std::string_view &key,
std::string *value);

// Batched write.
DBStatus BatchPut(const WriteOptions &options);

// Batched delete.
// NOTE(review): takes ReadOptions although it is a write-side operation --
// confirm whether WriteOptions was intended.
DBStatus BatchDelete(const ReadOptions &options);

// Closes the database. Intended to guarantee that all data written so far
// has been persisted to disk.
DBStatus Close();

private:
// Owning pointer to the implementation object.
std::unique_ptr<DBImpl> db_impl;
};
}
#endif //SMALLKV_DB_H
204 changes: 204 additions & 0 deletions src/db/db_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
//
// Created by qianyy on 2023/1/28.
//

#include <utility>

#include "db_impl.h"
#include "cache/cache.h"
#include "utils/codec.h"
#include "memory/allocate.h"
#include "memtable/memtable.h"
#include "wal/wal_writer.h"
#include "file/file_writer.h"
#include "table/sstable_builder.h"

namespace smallkv {
// Takes ownership of the options and wires up the collaborators:
// allocator, active memtable, logger, WAL writer and the value cache.
DBImpl::DBImpl(Options options) : options_(std::move(options)) {
    alloc = std::make_shared<FreeListAllocate>();
    mem_table = std::make_shared<MemTable>(alloc);
    logger = log::get_logger();

    // The WAL writer sits on top of a FileWriter opened on options_.DB_DIR.
    std::shared_ptr<FileWriter> wal_file = std::make_shared<FileWriter>(options_.DB_DIR);
    wal_writer = std::make_shared<WALWriter>(wal_file);

    // Cached values are heap-allocated strings; the clean handle deletes them.
    using ValueCache = Cache<std::string, std::string>;
    cache = std::make_shared<ValueCache>(options_.CACHE_SIZE);
    cache->register_clean_handle([](const std::string &, std::string *stored) {
        delete stored;
    });
}

// Inserts `key` with `value`, or overwrites the value if the key is already
// present in the active memtable.
//
// Write path:
//   1. append the encoded record to the WAL;
//   2. apply the change to the memtable;
//   3. put the value into the cache (helps subsequent reads);
//   4. if the memtable has reached options_.MEM_TABLE_MAX_SIZE, convert it
//      to an SST and start a fresh memtable.
//
// Returns Status::Success.
DBStatus DBImpl::Put(const WriteOptions &options,
                     const std::string_view &key,
                     const std::string_view &value) {
    std::unique_lock<std::shared_mutex> wlock(rwlock_);
    assert(closed == false);  // checked under the lock, like the Batch* calls

    // 1. WAL. The record holds two 4-byte length prefixes plus key and value.
    // It is binary (the length prefixes contain NUL bytes) and not
    // NUL-terminated, so it is handed over as a std::string that carries its
    // exact length instead of a bare char* (which was also a non-standard VLA).
    std::string record(8 + key.size() + value.size(), '\0');
    EncodeKV(key, value, record.data());
    wal_writer->AddLog(record);

    // 2. Memtable.
    if (mem_table->Contains(key)) { // update
        mem_table->Update(key, value);
    } else {                        // new insert
        mem_table->Add(key, value);
    }

    // 3. Cache.
    // A string_view is not guaranteed to be NUL-terminated, so the key and
    // value are copied using their explicit lengths rather than via data().
    // TODO: writes do not always need to populate the cache; a dynamic,
    //       hot-spot aware policy would avoid churn on bulk loads.
    // TODO: `new std::string` is slow; the cache interface should change.
    cache->insert(std::string(key), new std::string(value));

    // 4. Flush the memtable to an SST when it is over the size limit.
    if (mem_table->GetMemUsage() >= options_.MEM_TABLE_MAX_SIZE) {
        MemTableToSST();

        // Start a new memtable for subsequent writes.
        mem_table = std::make_shared<MemTable>(alloc);
        logger->info("[DBImpl::Put] A new mem_table is created.");
    }
    return Status::Success;
}

// Deletes `key` by recording a tombstone (an empty value).
//
// Delete path:
//   1. append the encoded tombstone record to the WAL;
//   2. mark the key deleted in the memtable (or add a tombstone entry);
//   3. drop the key from the cache;
//   4. if the memtable has reached options_.MEM_TABLE_MAX_SIZE, convert it
//      to an SST and start a fresh memtable.
//
// Returns Status::Success.
DBStatus DBImpl::Delete(const WriteOptions &options,
                        const std::string_view &key) {
    std::unique_lock<std::shared_mutex> wlock(rwlock_);
    assert(closed == false);  // checked under the lock, like the Batch* calls

    // 1. WAL. val_len == 0 encodes "no value". The record is binary and not
    // NUL-terminated, so it is passed as a std::string with an explicit
    // length instead of a bare char* backed by a non-standard VLA.
    std::string record(8 + key.size(), '\0');
    EncodeKV(key, "", record.data());
    wal_writer->AddLog(record);

    // 2. Memtable.
    if (mem_table->Contains(key)) {
        mem_table->Delete(key);      // mark the existing entry as deleted
    } else {
        mem_table->Add(key, "");     // tombstone for a key not in the memtable
    }

    // 3. Cache. Build the cache key from the view's explicit length; data()
    // alone is not guaranteed to be NUL-terminated.
    cache->erase(std::string(key));

    // 4. Flush the memtable to an SST when it is over the size limit.
    if (mem_table->GetMemUsage() >= options_.MEM_TABLE_MAX_SIZE) {
        MemTableToSST();

        // Start a new memtable for subsequent writes.
        mem_table = std::make_shared<MemTable>(alloc);
        logger->info("[DBImpl::Delete] A new mem_table is created.");
    }
    return Status::Success;
}

// Looks up `key` and, when found, copies its value into `*value`.
//
// Read path:
//   1. cache hit -> return the cached value;
//   2. otherwise consult the active memtable;
//   3. SST lookup (not implemented yet);
//   4. populating the cache on a miss (not implemented yet).
//
// NOTE(review): when the key is found nowhere this still returns
// Status::Success and leaves `*value` untouched, and a key removed through
// Delete is reported with an empty value. Callers cannot currently tell
// "missing" from "empty" -- revisit once the SST lookup is implemented.
DBStatus DBImpl::Get(const ReadOptions &options,
                     const std::string_view &key,
                     std::string *value) {
    std::shared_lock<std::shared_mutex> rlock(rwlock_);
    assert(closed == false);  // checked under the lock, like the Batch* calls

    // 1. Cache. The cache is keyed by std::string; build the key from the
    // view's explicit length because data() need not be NUL-terminated.
    const std::string cache_key(key);
    if (cache->contains(cache_key)) {
        *value = *(cache->get(cache_key)->val);
        return Status::Success;
    }

    // 2. Memtable: a single lookup with the length-aware view.
    if (mem_table->Contains(key)) {
        *value = mem_table->Get(key).value();
        return Status::Success;
    }

    // 3. Search the SST files in order.
    // TODO: implement.

    // 4. Insert the value that was found into the cache.
    // TODO: implement.

    return Status::Success;
}

// Batched write. Not implemented yet: takes the exclusive lock, checks the
// database is open and reports Status::NotImpl.
DBStatus DBImpl::BatchPut(const WriteOptions &options) {
    std::unique_lock<std::shared_mutex> exclusive{rwlock_};
    assert(!closed);
    // TODO: implement later.
    return Status::NotImpl;
}

// Batched delete. Not implemented yet: takes the exclusive lock, checks the
// database is open and reports Status::NotImpl.
DBStatus DBImpl::BatchDelete(const ReadOptions &options) {
    std::unique_lock<std::shared_mutex> exclusive{rwlock_};
    assert(!closed);
    // TODO: implement later.
    return Status::NotImpl;
}

// Serialises one key/value pair into `buf` using the layout
//
//   +-------------+-----+-------------+-----+
//   | key_len(4B) | key | val_len(4B) | val |
//   +-------------+-----+-------------+-----+
//
// `buf` must provide at least 8 + key.size() + value.size() bytes.
// Both lengths are written as 32-bit fields, so both sizes are checked
// (previously only the value size was).
// TODO: room for optimisation, e.g. a variable-length encoding of the lengths.
void DBImpl::EncodeKV(const std::string_view &key,
                      const std::string_view &value,
                      char *buf) {
    assert(buf != nullptr);
    assert(key.size() < UINT32_MAX);
    assert(value.size() < UINT32_MAX);

    const auto key_len = static_cast<uint32_t>(key.size());
    const auto val_len = static_cast<uint32_t>(value.size());

    char *cursor = buf;
    utils::EncodeFixed32(cursor, key_len);
    cursor += 4;
    // Skip the copy for empty views: data() may be null for them, and
    // memcpy with a null pointer is undefined even for a zero length.
    if (key_len > 0) {
        memcpy(cursor, key.data(), key_len);
    }
    cursor += key_len;

    utils::EncodeFixed32(cursor, val_len);
    cursor += 4;
    if (val_len > 0) {
        memcpy(cursor, value.data(), val_len);
    }
}

void DBImpl::MemTableToSST() {
// todo: 此处采用同步方法(为了debug方便),后续需要修改为异步

// 格式为/.../level_n_sst_i.sst
auto sst_filepath = options_.STORAGE_DIR + "/" + utils::BuildSSTPath(0, options_.LISST_NUM);
logger->info("DBImpl::MemTableToSST() is called. sst_filepath={}", sst_filepath);

auto file_writer = std::make_shared<FileWriter>(sst_filepath);
auto sstable_builder = std::make_shared<SSTableBuilder>(mem_table->GetSize(), file_writer);
mem_table->ConvertToL1SST(sst_filepath, sstable_builder);

++options_.LISST_NUM; // 下一个sst文件序号+1
}

// Closes the database: flushes the active memtable to an SST if it holds any
// entries and marks the instance as closed. Calling it again is a no-op
// apart from the log line.
//
// The exclusive lock is taken because this touches the memtable and
// options_.LISST_NUM, the same state Put/Delete modify under rwlock_.
// The closed flag is now set even when the memtable is empty; previously an
// empty database was never marked closed.
//
// Returns Status::Success.
DBStatus DBImpl::Close() {
    std::unique_lock<std::shared_mutex> wlock(rwlock_);
    if (!closed) {
        if (mem_table->GetSize() > 0) {
            // Data still in the memtable must be persisted.
            MemTableToSST();
        }
        closed = true;
    }
    logger->info("DB is closed.");
    return Status::Success;
}
}
80 changes: 80 additions & 0 deletions src/db/db_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//
// Created by qianyy on 2023/1/28.
//
#include <memory>
#include <string_view>
#include <shared_mutex>
#include "status.h"
#include "options.h"
#include "log/log.h"

#ifndef SMALLKV_DB_IMPL_H
#define SMALLKV_DB_IMPL_H

namespace smallkv {
// Forward declarations of the collaborators DBImpl holds through
// std::shared_ptr. Their full definitions are included by db_impl.cpp.
template<typename K, typename V>
class Cache;

class MemTable;

class WALWriter;

class FreeListAllocate;

/*
 * Implementation behind the DB facade.
 * Designed for concurrent use: the read/write operations synchronise on
 * rwlock_ (exclusive for Put/Delete/Batch*, shared for Get).
 *
 * */
class DBImpl {
public:
// Takes the options by value and builds the internal components.
explicit DBImpl(Options options);

~DBImpl() = default;

// Has both insert and update semantics.
DBStatus Put(const WriteOptions &options,
const std::string_view &key,
const std::string_view &value);

// Removes key (recorded as an empty-value tombstone).
DBStatus Delete(const WriteOptions &options,
const std::string_view &key);

// Writes the value associated with key into the string pointed to by value.
DBStatus Get(const ReadOptions &options,
const std::string_view &key,
std::string *value);

// Closes the database. Intended to guarantee that all data written so far
// has been persisted to disk.
DBStatus Close();

// Batched write (currently returns Status::NotImpl).
DBStatus BatchPut(const WriteOptions &options);

// Batched delete (currently returns Status::NotImpl).
DBStatus BatchDelete(const ReadOptions &options);

private:
// Encodes the key/value pair into buf. The caller must make sure buf is
// 8 + key.size() + value.size() bytes long.
static void EncodeKV(const std::string_view &key,
const std::string_view &value,
char *buf);

// Converts the active memtable into an SST file.
void MemTableToSST();

private:
std::shared_ptr<MemTable> mem_table; // active memtable
std::shared_ptr<spdlog::logger> logger; // logger
std::shared_ptr<FreeListAllocate> alloc; // memory allocator
std::shared_ptr<WALWriter> wal_writer; // write-ahead-log writer

std::shared_ptr<Cache<std::string, std::string>> cache; // value cache

Options options_; // configuration

std::shared_mutex rwlock_; // reader/writer lock

bool closed = false; // false while the database has not been closed
};
}

#endif //SMALLKV_DB_IMPL_H
Loading

0 comments on commit eeecacc

Please sign in to comment.