Skip to content

Commit

Permalink
maintain next epoch number on follower
Browse files Browse the repository at this point in the history
  • Loading branch information
seckcoder committed Aug 17, 2023
1 parent 7fa6b75 commit 1232181
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
12 changes: 11 additions & 1 deletion cloud/replication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ class ReplicationTest : public testing::Test {
void verifyLSMTEqual(ColumnFamilyHandle* h1, ColumnFamilyHandle* h2) {
auto cf1 = static_cast_with_check<ColumnFamilyHandleImpl>(h1)->cfd(),
cf2 = static_cast_with_check<ColumnFamilyHandleImpl>(h2)->cfd();
ASSERT_EQ(cf1->NumberLevels(), cf2->NumberLevels());
ASSERT_EQ(cf1->NumberLevels(), cf2->NumberLevels())
<< h1->GetName() << ", " << h2->GetName();

for (int level = 0; level < cf1->NumberLevels(); level++) {
auto files1 = cf1->current()->storage_info()->LevelFiles(level),
Expand Down Expand Up @@ -1191,10 +1192,18 @@ TEST_P(ReplicationTestWithParam, Stress) {
leaderFull()->TEST_WaitForBackgroundWork());
};

auto verifyNextEpochNumber = [&]() {
for (int i = 0; i < kColumnFamilyCount; i++) {
auto cf1 = leaderCFD(cf(i)), cf2 = followerCFD(cf(i));
ASSERT_EQ(cf1->GetNextEpochNumber(), cf2->GetNextEpochNumber());
}
};

do_writes();

catchUpFollower();
verifyEqual();
verifyNextEpochNumber();

ROCKS_LOG_INFO(info_log_, "reopen leader");

Expand All @@ -1218,6 +1227,7 @@ TEST_P(ReplicationTestWithParam, Stress) {
catchUpFollower();

verifyEqual();
verifyNextEpochNumber();
}

INSTANTIATE_TEST_CASE_P(ReplicationTest, ReplicationTestWithParam,
Expand Down
36 changes: 30 additions & 6 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,8 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
DescribeVersionEdit(e, cfd).c_str());
auto& newFiles = e.GetNewFiles();
bool epoch_recovery_succeeded = true;
std::ostringstream err_oss;
if (!(flags & AR_REPLICATE_EPOCH_NUM)) {
// Epoch number calculation on the fly.
// There are two cases in which we need to calculate epoch number
Expand All @@ -1438,8 +1440,6 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
// 2. compaction which merges files in lower levels to higher
// levels. epoch number = min epoch number of input files.
const auto& deletedFiles = e.GetDeletedFiles();
bool epoch_recovery_succeeded = true;
std::ostringstream err_oss;
if (deletedFiles.empty() && !newFiles.empty()) {
// case 1: flush into L0 files. New files must be level 0

Expand Down Expand Up @@ -1487,6 +1487,17 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
cfd->GetNextEpochNumber(),
first_file->epoch_number);
cfd->SetNextEpochNumber(first_file->epoch_number);
} else {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"[%s] unexpected epoch number: %" PRIu64
" for file: %" PRIu64
" ; max epoch number: %" PRIu64,
cfd->GetName().c_str(),
first_file->epoch_number,
first_file->fd.GetNumber(),
max_epoch_number);
s = Status::Corruption("unexpected epoch number for added file");
break;
}
}

Expand Down Expand Up @@ -1521,11 +1532,24 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
}
}
}

if (!epoch_recovery_succeeded) {
s = Status::Corruption(err_oss.str());
break;
} else if (newFiles.size() > 0) {
// Maintain next epoch number on follower
auto next_epoch_number = cfd->GetNextEpochNumber();
for (auto& p : newFiles) {
auto epoch_number = p.second.epoch_number;
// advance next epoch number. next_epoch_number never goes
// backwards
if (epoch_number != kUnknownEpochNumber &&
(epoch_number >= next_epoch_number)) {
next_epoch_number = epoch_number + 1;
}
}
cfd->SetNextEpochNumber(next_epoch_number);
}

if (!epoch_recovery_succeeded) {
s = Status::Corruption(err_oss.str());
break;
}
}
if (!s.ok()) {
Expand Down

0 comments on commit 1232181

Please sign in to comment.