Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
lalinsky committed Mar 17, 2024
1 parent a18d5eb commit bc1f5e7
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 7 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ project(fpserver)
cmake_policy(SET CMP0071 NEW)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED YES)

set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules)

Expand Down
2 changes: 2 additions & 0 deletions src/fpindex/base_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class BaseSegment {
BaseSegment &operator=(const BaseSegment &) = delete;
virtual ~BaseSegment() = default;

const SegmentInfo &info() const { return info_; }

uint32_t id() const { return info_.id(); }
uint64_t min_oplog_id() const { return info_.min_oplog_id(); }
uint64_t max_oplog_id() const { return info_.max_oplog_id(); }
Expand Down
44 changes: 37 additions & 7 deletions src/fpindex/index.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "fpindex/index.h"

#include <atomic>
#include <unordered_map>

#include "fpindex/logging.h"
Expand All @@ -15,6 +14,11 @@ constexpr size_t MAX_STAGE_SIZE = 1000000;

std::string GenerateSegmentFileName(uint32_t id) { return "segment-" + std::to_string(id) + ".data"; }

IndexSnapshot::IndexSnapshot(std::shared_ptr<IndexInfo> info, std::map<uint32_t, std::shared_ptr<BaseSegment>> segments)
: info_(info), segments_(segments) {}

bool IndexSnapshot::Search(const std::vector<uint32_t>& hashes, std::vector<SearchResult>* results) { return false; }

void Index::Writer() {
while (true) {
std::unique_lock<std::mutex> writer_lock(writer_mutex_);
Expand Down Expand Up @@ -71,14 +75,22 @@ bool Index::Open() {
return false;
}

auto index_info = std::make_shared<IndexInfo>();
for (auto& segment_info : index_info->segments()) {
info_ = std::make_shared<IndexInfo>();
for (auto& segment_info : info_->segments()) {
auto segment = std::make_shared<Segment>(segment_info.id());
if (!segment->Load(nullptr)) {
auto file_name = GenerateSegmentFileName(segment->id());
auto file = dir_->OpenFile(file_name, true);
if (!file) {
LOG_ERROR() << "failed to open file" << QString::fromStdString(file_name);
return false;
}
if (!segment->Load(file)) {
LOG_ERROR() << "failed to load segment" << QString::fromStdString(file_name);
return false;
}
segments_.insert({segment->id(), segment});
}
snapshot_ = std::make_shared<IndexSnapshot>(info_, segments_);

return true;
}
Expand All @@ -100,21 +112,37 @@ bool Index::Close() {

bool Index::IsReady() { return oplog_ && oplog_->IsReady() && writer_thread_; }

bool Index::Search(const std::vector<uint32_t>& hashes, std::vector<SearchResult>* results) { return false; }
bool Index::Search(const std::vector<uint32_t>& hashes, std::vector<SearchResult>* results) {
auto snapshot = snapshot_.load();
return snapshot->Search(hashes, results);
}

void Index::AddSegment(std::shared_ptr<BaseSegment> segment) {
segments_.insert({segment->id(), segment});
for (int i = info_->segments_size() - 1; i >= 0; i--) {
if (info_->segments(i).id() == segment->id()) {
info_->mutable_segments(i)->CopyFrom(segment->info());
return;
}
}
info_->add_segments()->CopyFrom(current_segment_->info());
auto snapshot = std::make_shared<IndexSnapshot>(info_, segments_);
snapshot_.store(snapshot);
}

std::shared_ptr<SegmentBuilder> Index::GetCurrentSegment() {
std::lock_guard<std::mutex> lock(mutex_);

if (!current_segment_) {
current_segment_ = std::make_shared<SegmentBuilder>(0);
segments_.insert({current_segment_->id(), current_segment_});
AddSegment(current_segment_);
}

if (current_segment_->Size() >= MAX_STAGE_SIZE) {
auto next_segment = std::make_shared<SegmentBuilder>(current_segment_->id() + 1);
segments_.insert({next_segment->id(), next_segment});
segments_to_write_.push_back(current_segment_);
current_segment_ = next_segment;
AddSegment(current_segment_);
writer_cv_.notify_one();
}

Expand Down Expand Up @@ -152,6 +180,8 @@ bool Index::Update(IndexUpdate&& update) {
}

current_segment->Update(entries);
AddSegment(current_segment);

return true;
}

Expand Down
16 changes: 16 additions & 0 deletions src/fpindex/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ class BaseSegment;
class SegmentBuilder;
class Oplog;

class IndexSnapshot {
public:
IndexSnapshot(std::shared_ptr<IndexInfo> info, std::map<uint32_t, std::shared_ptr<BaseSegment>> segments);

bool Search(const std::vector<uint32_t> &hashes, std::vector<SearchResult> *results);

private:
std::shared_ptr<IndexInfo> info_;
std::map<uint32_t, std::shared_ptr<BaseSegment>> segments_;
};

class Index {
public:
Index(std::shared_ptr<io::Directory> dir);
Expand All @@ -34,18 +45,23 @@ class Index {
std::shared_ptr<SegmentBuilder> GetCurrentSegment();
void Writer();

void AddSegment(std::shared_ptr<BaseSegment> segment);

std::mutex mutex_;
std::atomic<bool> stop_{false};

std::mutex writer_mutex_;
std::condition_variable writer_cv_;
std::unique_ptr<std::thread> writer_thread_;

std::shared_ptr<IndexInfo> info_;
std::map<uint32_t, std::shared_ptr<BaseSegment>> segments_;

std::deque<std::shared_ptr<SegmentBuilder>> segments_to_write_;
std::shared_ptr<SegmentBuilder> current_segment_;

std::atomic<std::shared_ptr<IndexSnapshot>> snapshot_;

std::shared_ptr<io::Directory> dir_;
std::shared_ptr<io::Database> db_;
std::shared_ptr<Oplog> oplog_;
Expand Down
1 change: 1 addition & 0 deletions src/fpindex/oplog.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <mutex>
#include <optional>
#include <functional>

#include "fpindex/io/database.h"
#include "fpindex/proto/internal.pb.h"
Expand Down

0 comments on commit bc1f5e7

Please sign in to comment.