Commit 7fa6b75: replicate epoch number

seckcoder committed Aug 17, 2023
1 parent 2ad9544 commit 7fa6b75
Showing 7 changed files with 237 additions and 31 deletions.
134 changes: 110 additions & 24 deletions cloud/replication_test.cc
@@ -224,6 +224,11 @@ class ReplicationTest : public testing::Test {
DB* currentLeader() const {
return leader_db_.get();
}

DBImpl* leaderFull() const {
return static_cast_with_check<DBImpl>(currentLeader());
}

DB* currentFollower() const {
return follower_db_.get();
}
@@ -265,12 +270,38 @@ class ReplicationTest : public testing::Test {
return keys;
}

// Verify that the current log-structured merge trees of the two CFs are the same.
void verifyLSMTEqual(ColumnFamilyHandle* h1, ColumnFamilyHandle* h2) {
auto cf1 = static_cast_with_check<ColumnFamilyHandleImpl>(h1)->cfd(),
cf2 = static_cast_with_check<ColumnFamilyHandleImpl>(h2)->cfd();
ASSERT_EQ(cf1->NumberLevels(), cf2->NumberLevels());

for (int level = 0; level < cf1->NumberLevels(); level++) {
auto files1 = cf1->current()->storage_info()->LevelFiles(level),
files2 = cf2->current()->storage_info()->LevelFiles(level);
ASSERT_EQ(files1.size(), files2.size())
<< "mismatched number of files at level: " << level
<< " between cf: " << cf1->GetName()
<< " and cf: " << cf2->GetName();
for (size_t i = 0; i < files1.size(); i++) {
auto f1 = files1[i], f2 = files2[i];
ASSERT_EQ(f1->fd.file_size, f2->fd.file_size);
ASSERT_EQ(f1->fd.smallest_seqno, f2->fd.smallest_seqno);
ASSERT_EQ(f1->fd.largest_seqno, f2->fd.largest_seqno);
ASSERT_EQ(f1->epoch_number, f2->epoch_number);
ASSERT_EQ(f1->file_checksum, f2->file_checksum);
ASSERT_EQ(f1->unique_id, f2->unique_id);
}
}
}

void verifyEqual() {
ASSERT_EQ(leader_cfs_.size(), follower_cfs_.size());
auto leader = leader_db_.get(), follower = follower_db_.get();
for (auto& [name, cf1]: leader_cfs_) {
auto cf2 = followerCF(name);
verifyNextLogNumAndReplSeqConsistency(name);
verifyLSMTEqual(cf1.get(), cf2);

auto itrLeader = std::unique_ptr<Iterator>(
leader->NewIterator(ReadOptions(), cf1.get()));
@@ -290,6 +321,7 @@ class ReplicationTest : public testing::Test {

protected:
std::shared_ptr<Logger> info_log_;
bool replicate_epoch_number_{true};
void resetFollowerSequence(int new_seq) {
followerSequence_ = new_seq;
}
@@ -420,6 +452,12 @@ size_t ReplicationTest::catchUpFollower(
MutexLock lock(&log_records_mutex_);
DB::ApplyReplicationLogRecordInfo info;
size_t ret = 0;
unsigned flags = DB::AR_EVICT_OBSOLETE_FILES;
if (replicate_epoch_number_) {
flags |= DB::AR_REPLICATE_EPOCH_NUM;
} else {
flags |= DB::AR_RESET_IF_EPOCH_MISMATCH;
}
for (; followerSequence_ < (int)log_records_.size(); ++followerSequence_) {
if (num_records && ret >= *num_records) {
break;
@@ -430,8 +468,9 @@ size_t ReplicationTest::catchUpFollower(
[this](Slice) {
return ColumnFamilyOptions(follower_db_->GetOptions());
},
allow_new_manifest_writes, &info, DB::AR_EVICT_OBSOLETE_FILES);
allow_new_manifest_writes, &info, flags);
assert(s.ok());
assert(info.mismatched_epoch_num == 0);
++ret;
}
if (info.has_new_manifest_writes) {
@@ -1098,7 +1137,18 @@ TEST_F(ReplicationTest, EvictObsoleteFiles) {
static_cast_with_check<DBImpl>(follower)->TEST_table_cache()->GetUsage());
}

TEST_F(ReplicationTest, Stress) {
class ReplicationTestWithParam : public ReplicationTest,
public testing::WithParamInterface<bool> {
public:
ReplicationTestWithParam()
: ReplicationTest() {}

void SetUp() override {
replicate_epoch_number_ = GetParam();
}
};

TEST_P(ReplicationTestWithParam, Stress) {
std::string val;
auto leader = openLeader();
openFollower();
Expand All @@ -1114,41 +1164,54 @@ TEST_F(ReplicationTest, Stress) {
createColumnFamily(cf(i));
}

auto do_writes = [&](int n) {
auto rand = Random::GetTLSInstance();
while (n > 0) {
auto cfi = rand->Uniform(kColumnFamilyCount);
rocksdb::WriteBatch wb;
for (size_t i = 0; i < 3; ++i) {
--n;
wb.Put(leaderCF(cf(cfi)), std::to_string(rand->Uniform(kMaxKey)),
std::to_string(rand->Next()));
auto do_writes = [&]() {
auto writes_per_thread = [&](int n) {
auto rand = Random::GetTLSInstance();
while (n > 0) {
auto cfi = rand->Uniform(kColumnFamilyCount);
rocksdb::WriteBatch wb;
for (size_t i = 0; i < 3; ++i) {
--n;
wb.Put(leaderCF(cf(cfi)), std::to_string(rand->Uniform(kMaxKey)),
std::to_string(rand->Next()));
}
ASSERT_OK(leader->Write(wo(), &wb));
}
ASSERT_OK(leader->Write(wo(), &wb));
};

std::vector<std::thread> threads;
for (size_t i = 0; i < kThreadCount; ++i) {
threads.emplace_back([&]() { writes_per_thread(kWritesPerThread); });
}
for (auto& t : threads) {
t.join();
}

ASSERT_OK(
leaderFull()->TEST_WaitForBackgroundWork());
};

std::vector<std::thread> threads;
for (size_t i = 0; i < kThreadCount; ++i) {
threads.emplace_back([&]() { do_writes(kWritesPerThread); });
}
for (auto& t : threads) {
t.join();
}
ASSERT_OK(
static_cast_with_check<DBImpl>(leader)->TEST_WaitForBackgroundWork());
do_writes();

catchUpFollower();

verifyEqual();

ROCKS_LOG_INFO(info_log_, "reopen leader");

// Reopen leader
closeLeader();
leader = openLeader();
ASSERT_OK(leader->Flush(FlushOptions()));

// The memtable might not be empty after reopening the leader, since we
// recover the replication log when opening it.
ASSERT_OK(leader->Flush({}));
ASSERT_OK(leaderFull()->TEST_WaitForBackgroundWork());
catchUpFollower();
verifyEqual();

do_writes();

ROCKS_LOG_INFO(info_log_, "reopen follower");

// Reopen follower
closeFollower();
openFollower();
@@ -1157,6 +1220,9 @@
verifyEqual();
}

INSTANTIATE_TEST_CASE_P(ReplicationTest, ReplicationTestWithParam,
::testing::Values(false, true));

TEST_F(ReplicationTest, DeleteRange) {
auto leader = openLeader();
openFollower();
@@ -1201,6 +1267,26 @@ TEST_F(ReplicationTest, DeleteRange) {
verifyEqual();
}

TEST_F(ReplicationTest, EpochNumberSimple) {
auto options = leaderOptions();
options.disable_auto_compactions = true;
auto leader = openLeader();
openFollower();

ASSERT_OK(leader->Put(wo(), "k1", "v1"));
ASSERT_OK(leader->Flush({}));
catchUpFollower();

ASSERT_OK(leader->Put(wo(), "k1", "v2"));
ASSERT_OK(leader->Flush({}));
auto leaderFull = static_cast_with_check<DBImpl>(leader);
ASSERT_OK(leaderFull->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));

catchUpFollower();

verifyEqual();
}

} // namespace ROCKSDB_NAMESPACE

// A black-box test for the cloud wrapper around rocksdb
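For context, a minimal sketch of the follower-side apply loop this file exercises. The flag semantics and the ApplyReplicationLogRecord argument order are inferred from catchUpFollower() above; applyRecords, the records vector, and the header choice are hypothetical scaffolding, not part of this commit.

#include <cassert>
#include <string>
#include <utility>
#include <vector>

#include "rocksdb/db.h"  // assumed umbrella header for this fork's DB API

using namespace ROCKSDB_NAMESPACE;

// Hypothetical helper: apply (replication sequence, record) pairs shipped
// from the leader onto a follower DB.
void applyRecords(
    DB* follower,
    const std::vector<std::pair<std::string, ReplicationLogRecord>>& records,
    bool replicate_epoch_number) {
  unsigned flags = DB::AR_EVICT_OBSOLETE_FILES;
  // Either trust the epoch numbers the leader replicated, or recompute them
  // locally and resync next_epoch_number if a reopen made them diverge.
  flags |= replicate_epoch_number ? DB::AR_REPLICATE_EPOCH_NUM
                                  : DB::AR_RESET_IF_EPOCH_MISMATCH;
  for (const auto& [sequence, record] : records) {
    DB::ApplyReplicationLogRecordInfo info;
    auto s = follower->ApplyReplicationLogRecord(
        record, sequence,
        [&](Slice) { return ColumnFamilyOptions(follower->GetOptions()); },
        /*allow_new_manifest_writes=*/true, &info, flags);
    assert(s.ok());
    assert(info.mismatched_epoch_num == 0);  // expected when epochs replicate
  }
}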
5 changes: 3 additions & 2 deletions db/compaction/compaction_job.cc
@@ -1590,11 +1590,12 @@ Status CompactionJob::FinishCompactionOutputFile(
outputs.UpdateTableProperties();
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
" keys, %" PRIu64 " bytes%s, temperature: %s",
" keys, %" PRIu64 " bytes%s, temperature: %s, epoch number: %" PRIu64,
cfd->GetName().c_str(), job_id_, output_number,
current_entries, meta->fd.file_size,
meta->marked_for_compaction ? " (need compaction)" : "",
temperature_to_string[meta->temperature].c_str());
temperature_to_string[meta->temperature].c_str(),
meta->epoch_number);
}
std::string fname;
FileDescriptor output_fd;
102 changes: 101 additions & 1 deletion db/db_impl/db_impl.cc
@@ -1217,6 +1217,7 @@ std::string DescribeVersionEdit(const VersionEdit& e, ColumnFamilyData* cfd) {
}
first = false;
oss << f.second.fd.GetNumber();
oss << ":" << f.second.epoch_number;
}
oss << "] ";
}
@@ -1424,9 +1425,108 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
autovector<VersionEdit*> el;
el.push_back(&e);
edit_lists.push_back(std::move(el));

ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
DescribeVersionEdit(e, cfd).c_str());
auto& newFiles = e.GetNewFiles();
if (!(flags & AR_REPLICATE_EPOCH_NUM)) {
// Epoch number calculation on the fly.
// There are two cases in which we need to calculate the epoch number
// when applying `kManifestWrite`:
// 1. A flush, which generates L0 files. The epoch number is allocated
// based on each CF's `next_epoch_number`. The L0 files are sorted by
// `largest seqno`.
// 2. A compaction, which merges files from lower levels into higher
// levels. epoch number = min epoch number of the input files.
const auto& deletedFiles = e.GetDeletedFiles();
bool epoch_recovery_succeeded = true;
std::ostringstream err_oss;
if (deletedFiles.empty() && !newFiles.empty()) {
// Case 1: flush into L0 files. New files must all be at level 0.

for (auto& p : newFiles) {
if (p.first != 0) {
epoch_recovery_succeeded = false;
err_oss << "newly flushed file: " << p.first << " is not at L0";
break;
}
}

// sort added files by largest seqno
std::vector<FileMetaData*> added_files;
for (auto& p : newFiles) {
added_files.push_back(&p.second);
}

NewestFirstBySeqNo cmp;
std::sort(added_files.begin(), added_files.end(), cmp);
auto first_file = added_files[0];
// Rewind/advance next_epoch_number. This is necessary if the
// epoch_number mismatches due to a db reopen.
if (first_file->epoch_number != kUnknownEpochNumber &&
first_file->epoch_number != cfd->GetNextEpochNumber() &&
(flags & AR_RESET_IF_EPOCH_MISMATCH)) {
auto max_epoch_number =
cfd->current()->storage_info()->GetMaxEpochNumberOfFiles();
if (first_file->epoch_number < cfd->GetNextEpochNumber() &&
(first_file->epoch_number == max_epoch_number + 1)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] rewind next_epoch_number from: %" PRIu64
" to %" PRIu64,
cfd->GetName().c_str(),
cfd->GetNextEpochNumber(),
max_epoch_number + 1);
cfd->SetNextEpochNumber(max_epoch_number + 1);
} else if (first_file->epoch_number >
cfd->GetNextEpochNumber() &&
(cfd->GetNextEpochNumber() ==
max_epoch_number + 1)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] advance next_epoch_number from: %" PRIu64
" to %" PRIu64,
cfd->GetName().c_str(),
cfd->GetNextEpochNumber(),
first_file->epoch_number);
cfd->SetNextEpochNumber(first_file->epoch_number);
}
}

for (auto meta: added_files) {
auto old_epoch_number = meta->epoch_number;
meta->epoch_number = cfd->NewEpochNumber();
if (old_epoch_number != meta->epoch_number) {
info->mismatched_epoch_num += 1;
}
}
} else if (!deletedFiles.empty() && !newFiles.empty()) {
// Case 2: compaction. Outputs inherit the min epoch number of the inputs.
uint64_t min_input_epoch_number =
std::numeric_limits<uint64_t>::max();
const auto& storage_info = cfd->current()->storage_info();
for (auto [level, file_number] : deletedFiles) {
auto meta = storage_info->GetFileMetaDataByNumber(file_number);
if (!meta) {
epoch_recovery_succeeded = false;
err_oss << "deleted file: " << file_number
<< " at level: " << level << " not found";
break;
}
min_input_epoch_number =
std::min(meta->epoch_number, min_input_epoch_number);
}

for (auto& p: newFiles) {
auto old_epoch_number = p.second.epoch_number;
p.second.epoch_number = min_input_epoch_number;
if (old_epoch_number != p.second.epoch_number) {
info->mismatched_epoch_num += 1;
}
}
}

if (!epoch_recovery_succeeded) {
s = Status::Corruption(err_oss.str());
break;
}
}
}
if (!s.ok()) {
break;
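As a reference for the two recalculation rules described in the comment inside ApplyReplicationLogRecord above, here is a self-contained sketch with toy types; Meta, assignFlushEpochs, and assignCompactionEpochs are illustrative names, not RocksDB's.

#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

// Toy stand-in for FileMetaData.
struct Meta {
  uint64_t largest_seqno;
  uint64_t epoch_number;
};

// Case 1: flush. New L0 files are sorted newest-first by seqno (the
// NewestFirstBySeqNo order in the real code), then each draws the next
// epoch number from the column family's counter (cfd->NewEpochNumber()).
void assignFlushEpochs(std::vector<Meta*>& added, uint64_t& next_epoch) {
  std::sort(added.begin(), added.end(), [](const Meta* a, const Meta* b) {
    return a->largest_seqno > b->largest_seqno;  // newest first
  });
  for (Meta* m : added) {
    m->epoch_number = next_epoch++;
  }
}

// Case 2: compaction. Every output file inherits the minimum epoch number
// across the compaction's input (deleted) files.
void assignCompactionEpochs(const std::vector<Meta>& inputs,
                            std::vector<Meta>& outputs) {
  uint64_t min_epoch = std::numeric_limits<uint64_t>::max();
  for (const Meta& in : inputs) {
    min_epoch = std::min(min_epoch, in.epoch_number);
  }
  for (Meta& out : outputs) {
    out.epoch_number = min_epoch;
  }
}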
4 changes: 2 additions & 2 deletions db/flush_job.cc
@@ -895,9 +895,9 @@ Status FlushJob::WriteLevel0Table() {
NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
static_cast<int>(memtables.size()), &arena));
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started. Epoch number: %" PRIu64,
cfd_->GetName().c_str(), job_context_->job_id,
meta_.fd.GetNumber());
meta_.fd.GetNumber(), meta_.epoch_number);

TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_);
1 change: 1 addition & 0 deletions db/version_edit.h
@@ -504,6 +504,7 @@ class VersionEdit {
// Retrieve the table files added as well as their associated levels.
using NewFiles = std::vector<std::pair<int, FileMetaData>>;
const NewFiles& GetNewFiles() const { return new_files_; }
NewFiles& GetNewFiles() { return new_files_; }

// Retrieve all the compact cursors
using CompactCursors = std::vector<std::pair<int, InternalKey>>;
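The new non-const overload is what lets the apply path in db_impl.cc patch epoch numbers in place on a received edit. Below is a toy illustration of the accessor pair; Edit and FileMeta are stand-ins, not the real classes.

#include <cstdint>
#include <utility>
#include <vector>

struct FileMeta {
  uint64_t file_number;
  uint64_t epoch_number;
};

class Edit {
 public:
  using NewFiles = std::vector<std::pair<int, FileMeta>>;
  // Read-only view for consumers that only inspect the edit.
  const NewFiles& GetNewFiles() const { return new_files_; }
  // Mutable view so a caller can rewrite fields (e.g. epoch numbers) in place.
  NewFiles& GetNewFiles() { return new_files_; }

 private:
  NewFiles new_files_{{0, {10, 0}}, {0, {11, 0}}};
};

int main() {
  Edit e;
  uint64_t min_input_epoch = 7;  // assumed, as in the compaction case
  for (auto& [level, meta] : e.GetNewFiles()) {
    (void)level;
    meta.epoch_number = min_input_epoch;  // overwrite the replicated value
  }
  return 0;
}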
6 changes: 4 additions & 2 deletions db/version_set.cc
@@ -5760,8 +5760,10 @@ Status VersionSet::Recover(
}
ROCKS_LOG_INFO(db_options_->info_log,
"Column family [%s] (ID %" PRIu32
"), log number is %" PRIu64 "\n",
cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
"), log number is %" PRIu64
", next epoch number is %" PRIu64 "\n",
cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber(),
cfd->GetNextEpochNumber());
}
}
