Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#4 Write a toy C++ example to illustrate basic buffer management #41

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 244 additions & 0 deletions src/buffer/buffer_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@

#include <cassert>
#include <iostream>
#include <string>
#include <limits.h>
#include <memory>

#include "buffer/buffer_manager.h"
#include "common/macros.h"
#include "storage/file.h"

// Evaluates `p` and discards the result by casting it to void.
// NOTE(review): not referenced anywhere in the visible part of this file.
#define UNUSED(p) ((void)(p))

namespace buzzdb {

/// Returns the frame's backing storage viewed as raw bytes.
/// The pointer refers to the first element of `data` and stays valid for as
/// long as `data` is not reallocated.
char* BufferFrame::get_data() {
  uint64_t* const words = data.data();
  return reinterpret_cast<char*>(words);
}

/// Creates an unpinned, clean frame whose buffer holds `page_size`
/// zero-initialised 64-bit words.
///
/// Every scalar member is given a defined value so that no later read (for
/// example of `exclusive` in `BufferManager::unlock_frame`) can observe an
/// indeterminate value.
///
/// NOTE(review): `page_size` is documented as a size in bytes, yet it is used
/// here as an element count of `uint64_t`, so the buffer is eight times larger
/// than one page. The size is kept as-is in case other code relies on
/// `data.size()` — confirm and shrink if not.
///
/// @param[in] page_size Number of 64-bit words to allocate for the frame.
BufferFrame::BufferFrame(size_t page_size)
    : page_id(0),
      frame_id(0),
      data(page_size),
      dirty(false),
      exclusive(false),
      count(0) {}

/// Builds a buffer manager with `page_count` pre-allocated frames.
///
/// @param[in] page_size  Page size forwarded to every `BufferFrame`.
/// @param[in] page_count Number of frames created in the pool.
BufferManager::BufferManager(size_t page_size, size_t page_count)
    : page_size(page_size), page_count(page_count) {
  // All frames are created up front; they are handed out lazily by fix_page().
  for (size_t built = 0; built != page_count; ++built) {
    pool_.push_back(std::make_unique<BufferFrame>(page_size));
  }
}



/// Writes every frame that is still marked dirty back to its segment file.
BufferManager::~BufferManager() {
  size_t frame_index = 0;
  for (const auto& frame : pool_) {
    if (frame->dirty) {
      flush(frame_index);
    }
    ++frame_index;
  }
}

/// Looks `page_id` up in the FIFO queue.
///
/// On a hit the frame's pin count is incremented and the frame is promoted:
/// it is removed from the FIFO queue and appended to the LRU queue.
///
/// @param[in] page_id Page to search for.
/// @return `{true, frame id}` on a hit, `{false, INT_MAX}` otherwise.
std::pair<bool, uint64_t> BufferManager::page_in_fifo_queue(uint64_t page_id) {
  for (auto slot = fifo_queue_.begin(); slot != fifo_queue_.end(); ++slot) {
    const uint64_t resident_frame = *slot;
    if (pool_[resident_frame]->page_id != page_id) {
      continue;
    }
    pool_[resident_frame]->count++;
    // Second access: the page leaves the FIFO queue and joins the LRU queue.
    fifo_queue_.erase(slot);
    lru_queue_.push_back(resident_frame);
    return std::pair<bool, uint64_t>(true, resident_frame);
  }
  return std::pair<bool, uint64_t>(false, INT_MAX);
}

/// Looks `page_id` up in the LRU queue.
///
/// On a hit the frame's pin count is incremented and the frame is moved to the
/// back of the LRU queue.
///
/// @param[in] page_id Page to search for.
/// @return `{true, frame id}` on a hit, `{false, INT_MAX}` otherwise.
std::pair<bool, uint64_t> BufferManager::page_in_lru_queue(uint64_t page_id) {
  for (auto slot = lru_queue_.begin(); slot != lru_queue_.end(); ++slot) {
    const uint64_t resident_frame = *slot;
    if (pool_[resident_frame]->page_id != page_id) {
      continue;
    }
    pool_[resident_frame]->count++;
    // Re-queue at the back so the frame becomes the most recently used one.
    lru_queue_.erase(slot);
    lru_queue_.push_back(resident_frame);
    return std::pair<bool, uint64_t>(true, resident_frame);
  }
  return std::pair<bool, uint64_t>(false, INT_MAX);
}

// read page to data
void BufferManager::read_frame(uint64_t frame_id){
file_use_mutex_.lock();
int16_t segment_id = BufferManager::get_segment_id(pool_[frame_id]->page_id);
uint64_t segment_page_id = BufferManager::get_segment_page_id(pool_[frame_id]->page_id);
auto file_handle = File::open_file(std::to_string(segment_id).c_str(), File::WRITE);
if(file_handle->size() >= segment_page_id * page_size + page_size){
file_handle->read_block(segment_page_id * page_size, page_size, pool_[frame_id]->get_data());
}
file_use_mutex_.unlock();
}

// write frame to disk
void BufferManager::flush(uint64_t frame_id){
file_use_mutex_.lock();
uint16_t segment_id = BufferManager::get_segment_id(pool_[frame_id]->page_id);
uint64_t segment_page_id = BufferManager::get_segment_page_id(pool_[frame_id]->page_id);
auto file_handle = File::open_file(std::to_string(segment_id).c_str(), File::WRITE);
file_handle->write_block(pool_[frame_id]->get_data(), segment_page_id * page_size, page_size);
file_use_mutex_.unlock();
}

void BufferManager::lock_frame(uint64_t frame_id, bool exclusive){
// std::cout << "current frame count in lock: "<< pool_[frame_id]->count.load() << std::endl;
if(!exclusive){
pool_[frame_id]->mutex_.lock_shared();
} else{
pool_[frame_id]->mutex_.lock();
pool_[frame_id]->exclusive_thread_id = std::this_thread::get_id();
}
pool_[frame_id]->exclusive = exclusive;
}

void BufferManager::unlock_frame(uint64_t frame_id){

pool_[frame_id]->count--;
// std::cout << "current frame count in unlock: "<< pool_[frame_id]->count << std::endl;
if(!pool_[frame_id]->exclusive){
pool_[frame_id]->mutex_.unlock_shared();
} else{
pool_[frame_id]->mutex_.unlock();
pool_[frame_id]->exclusive = false;
}
}

/// Pins the page `page_id` in memory and returns its frame, latched in the
/// requested mode.
///
/// Lookup order is LRU queue, then FIFO queue (a FIFO hit promotes the page to
/// the LRU queue). On a miss a frame is chosen as follows: while unused frames
/// remain, the next one is taken; otherwise the first frame with a pin count
/// of zero is evicted, searching the FIFO queue before the LRU queue, and is
/// flushed first when dirty. The page is then read from disk under an
/// exclusive latch and the latch is re-acquired in the requested mode.
///
/// Both queue mutexes are held by scope guards, so they are released on every
/// exit path, including an exception thrown while flushing a victim.
///
/// NOTE(review): if read_frame() throws, the new frame stays exclusively
/// latched and queued; callers currently have no way to recover it.
///
/// @param[in] page_id   Page to pin.
/// @param[in] exclusive `true` for an exclusive latch, `false` for shared.
/// @return Reference to the frame holding the page.
/// @throws buffer_full_error when every frame is pinned.
BufferFrame& BufferManager::fix_page(uint64_t page_id, bool exclusive) {
  // Always acquired in this order: FIFO mutex, then LRU mutex.
  std::unique_lock<std::mutex> fifo_guard(fifo_mutex_);
  std::unique_lock<std::mutex> lru_guard(lru_mutex_);

  // Fast path: the page is already resident. The lookup helpers bump the pin
  // count, so the frame cannot be evicted once the queue mutexes are dropped.
  std::pair<bool, uint64_t> hit = page_in_lru_queue(page_id);
  if (!hit.first) {
    hit = page_in_fifo_queue(page_id);
  }
  if (hit.first) {
    fifo_guard.unlock();
    lru_guard.unlock();
    lock_frame(hit.second, exclusive);
    return *pool_[hit.second];
  }

  uint64_t page_frame_id = 0;
  if (buffer_size == page_count) {
    // Pool exhausted: take the first unpinned frame out of `queue`, writing
    // it back first when dirty. Returns false when every frame is pinned.
    auto evict_from = [this, &page_frame_id](std::vector<uint64_t>& queue) {
      for (size_t i = 0; i < queue.size(); i++) {
        const uint64_t candidate = queue[i];
        if (pool_[candidate]->count == 0) {
          if (pool_[candidate]->dirty) {
            flush(candidate);
          }
          queue.erase(queue.begin() + i);
          page_frame_id = candidate;
          return true;
        }
      }
      return false;
    };
    if (!evict_from(fifo_queue_) && !evict_from(lru_queue_)) {
      throw buffer_full_error{};
    }
  } else {
    // Unused frames remain: hand out the next one.
    page_frame_id = buffer_size.fetch_add(1);
  }

  // Claim the frame exclusively before publishing it in the FIFO queue so
  // that concurrent fixers of the same page wait until it has been loaded.
  lock_frame(page_frame_id, true);
  BufferFrame& frame = *pool_[page_frame_id];
  frame.page_id = page_id;
  frame.frame_id = page_frame_id;
  frame.dirty = false;
  // One pin for the caller plus one that the unlock_frame() below drops.
  frame.count = 2;
  fifo_queue_.push_back(page_frame_id);
  fifo_guard.unlock();
  lru_guard.unlock();

  read_frame(page_frame_id);
  // Swap the loading latch for the mode the caller asked for.
  unlock_frame(page_frame_id);
  lock_frame(page_frame_id, exclusive);

  return frame;
}



/// Releases a frame previously returned by `fix_page()`.
///
/// The dirty flag is sticky: it is set when `is_dirty` is true and is never
/// cleared here. The frame is located in the pool by address; a frame that
/// does not belong to this manager is ignored.
///
/// @param[in] page     Frame to release; the caller must hold its latch.
/// @param[in] is_dirty Whether the caller modified the page.
void BufferManager::unfix_page(BufferFrame& page, bool is_dirty) {
  // Only write when there is something to record, so shared holders that did
  // not modify the page never store to the flag.
  if (is_dirty) {
    page.dirty = true;
  }
  for (uint64_t i = 0; i < pool_.size(); i++) {
    if (pool_[i].get() == &page) {
      unlock_frame(i);
      break;  // A frame appears in the pool exactly once.
    }
  }
}


/// Returns the page ids of all frames in the FIFO queue, in queue order.
/// Takes no lock.
std::vector<uint64_t> BufferManager::get_fifo_list() const {
  std::vector<uint64_t> page_ids;
  page_ids.reserve(fifo_queue_.size());
  for (const uint64_t queued_frame : fifo_queue_) {
    page_ids.push_back(pool_[queued_frame]->page_id);
  }
  return page_ids;
}


/// Returns the page ids of all frames in the LRU queue, in queue order.
/// Takes no lock.
std::vector<uint64_t> BufferManager::get_lru_list() const {
  std::vector<uint64_t> page_ids;
  page_ids.reserve(lru_queue_.size());
  for (const uint64_t queued_frame : lru_queue_) {
    page_ids.push_back(pool_[queued_frame]->page_id);
  }
  return page_ids;
}

} // namespace buzzdb
132 changes: 132 additions & 0 deletions src/include/buffer/buffer_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#pragma once

#include <cstddef>
#include <cstdint>
#include <exception>
#include <vector>
#include "common/macros.h"
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <atomic>
namespace buzzdb {

/// One slot of the buffer pool: the bytes of a page plus the bookkeeping the
/// buffer manager needs (identity, dirty flag, pin count and latch).
///
/// Every data member has a default initialiser, so default-constructed and
/// copied frames never carry indeterminate values.
class BufferFrame {
 private:
  friend class BufferManager;

 public:
  /// Creates a frame with a buffer sized from `page_size`.
  BufferFrame(size_t page_size);
  /// Creates an empty, clean, unpinned frame without a data buffer.
  BufferFrame() = default;

  /// Id of the page currently held by this frame.
  uint64_t page_id{0};
  /// Index of this frame in the buffer manager's pool.
  uint64_t frame_id{0};
  /// Backing storage for the page contents.
  std::vector<uint64_t> data;
  /// True when the in-memory contents must be written back to disk.
  bool dirty{false};
  /// True while the latch is held in exclusive mode.
  bool exclusive{false};
  /// Pin count. Atomic because it is decremented while only the frame latch
  /// is held and read by other threads that hold only the queue mutexes.
  std::atomic<int> count{0};
  /// Reader/writer latch protecting the page contents.
  mutable std::shared_mutex mutex_;
  /// Thread that most recently took the latch exclusively.
  std::thread::id exclusive_thread_id;

  /// Returns a pointer to this page's data.
  char* get_data();

  /// Copies identity, contents and flags. The latch and the pin count belong
  /// to the individual frame object, so the copy starts unlatched and
  /// unpinned.
  BufferFrame(const BufferFrame& other)
      : page_id(other.page_id),
        frame_id(other.frame_id),
        data(other.data),
        dirty(other.dirty),
        exclusive(other.exclusive) {}

  /// Copy-and-swap assignment of identity, contents and flags. The
  /// destination keeps its own latch and pin count.
  BufferFrame& operator=(BufferFrame other) {
    std::swap(this->page_id, other.page_id);
    std::swap(this->frame_id, other.frame_id);
    std::swap(this->data, other.data);
    std::swap(this->dirty, other.dirty);
    std::swap(this->exclusive, other.exclusive);
    return *this;
  }
};


/// Thrown by `BufferManager::fix_page()` when no frame can be freed for a new
/// page.
class buffer_full_error : public std::exception {
 public:
  /// Returns the fixed diagnostic text for this error.
  const char* what() const noexcept override {
    static constexpr char kMessage[] = "buffer is full";
    return kMessage;
  }
};


/// Fixed-size page cache. Resident pages are tracked in two queues of frame
/// ids: a FIFO queue for pages loaded on a miss and an LRU queue for pages
/// that were found resident by a later `fix_page()` call.
class BufferManager {
private:

/// All frames; the constructor creates `page_count` of them.
std::vector<std::unique_ptr<BufferFrame>> pool_;
/// Number of pool frames handed out so far; incremented by `fix_page()`
/// until it reaches `page_count`.
std::atomic<uint64_t> buffer_size{0};
/// Page size passed to the constructor.
size_t page_size;
/// Number of frames in the pool.
size_t page_count;
/// Locked together with `fifo_mutex_` by `fix_page()` (FIFO mutex first).
mutable std::mutex lru_mutex_;
/// Locked together with `lru_mutex_` by `fix_page()`.
mutable std::mutex fifo_mutex_;
/// Serialises file access in `read_frame()` and `flush()`.
mutable std::mutex file_use_mutex_;
// NOTE(review): the two members below are not referenced by any member
// function in buffer_manager.cc — candidates for removal.
std::vector<std::unique_ptr<std::shared_mutex>> lock_table_;
std::vector<std::unique_ptr<std::atomic<int16_t>>> use_counters_;

public:
/// Frame ids of pages loaded on a miss, oldest first.
std::vector<uint64_t> fifo_queue_;
/// Frame ids of pages found resident by a later `fix_page()` call; a hit
/// moves the frame to the back.
std::vector<uint64_t> lru_queue_;
/// Constructor.
/// @param[in] page_size Size in bytes that all pages will have.
/// @param[in] page_count Maximum number of pages that should reside in
/// memory at the same time.
BufferManager(size_t page_size, size_t page_count);

/// Destructor. Writes all dirty pages to disk.
~BufferManager();

/// Returns a reference to a `BufferFrame` object for a given page id. When
/// the page is not loaded into memory, it is read from disk. Otherwise the
/// loaded page is used.
/// When the page cannot be loaded because the buffer is full, throws the
/// exception `buffer_full_error`.
/// Is thread-safe w.r.t. other concurrent calls to `fix_page()` and
/// `unfix_page()`.
/// @param[in] page_id Page id of the page that should be loaded.
/// @param[in] exclusive If `exclusive` is true, the page is locked
/// exclusively. Otherwise it is locked
/// non-exclusively (shared).
BufferFrame& fix_page(uint64_t page_id, bool exclusive);

/// Takes a `BufferFrame` reference that was returned by an earlier call to
/// `fix_page()` and unfixes it. When `is_dirty` is true, the page is
/// written back to disk eventually.
void unfix_page(BufferFrame& page, bool is_dirty);

/// Returns the page ids of all pages (fixed and unfixed) that are in the
/// FIFO list in FIFO order.
/// Is not thread-safe.
std::vector<uint64_t> get_fifo_list() const;

/// Returns the page ids of all pages (fixed and unfixed) that are in the
/// LRU list in LRU order.
/// Is not thread-safe.
std::vector<uint64_t> get_lru_list() const;

/// Returns the segment id for a given page id which is contained in the 16
/// most significant bits of the page id.
static constexpr uint16_t get_segment_id(uint64_t page_id) {
return page_id >> 48;
}

/// Returns the page id within its segment for a given page id. This
/// corresponds to the 48 least significant bits of the page id.
static constexpr uint64_t get_segment_page_id(uint64_t page_id) {
return page_id & ((1ull << 48) - 1);
}

/// Searches the LRU queue for `page_id`. On a hit, increments the frame's
/// pin count and moves the frame to the back of the queue.
/// @return `{true, frame id}` on a hit, `{false, INT_MAX}` otherwise.
std::pair<bool, uint64_t> page_in_lru_queue(uint64_t page_id);
/// Searches the FIFO queue for `page_id`. On a hit, increments the frame's
/// pin count and moves the frame from the FIFO queue to the LRU queue.
/// @return `{true, frame id}` on a hit, `{false, INT_MAX}` otherwise.
std::pair<bool, uint64_t> page_in_fifo_queue(uint64_t page_id);
/// Acquires the latch of frame `frame_id`, exclusively or shared.
void lock_frame(uint64_t frame_id, bool exclusive);
/// Decrements the pin count of frame `frame_id` and releases its latch.
void unlock_frame(uint64_t frame_id);
/// Reads the page assigned to frame `frame_id` from its segment file.
void read_frame(uint64_t frame_id);
/// Writes the contents of frame `frame_id` to its segment file.
void flush(uint64_t frame_id);
};


} // namespace buzzdb
Loading