diff --git a/cloud/replication_test.cc b/cloud/replication_test.cc index 74028e09291..e8a3c766f25 100644 --- a/cloud/replication_test.cc +++ b/cloud/replication_test.cc @@ -274,7 +274,8 @@ class ReplicationTest : public testing::Test { void verifyLSMTEqual(ColumnFamilyHandle* h1, ColumnFamilyHandle* h2) { auto cf1 = static_cast_with_check(h1)->cfd(), cf2 = static_cast_with_check(h2)->cfd(); - ASSERT_EQ(cf1->NumberLevels(), cf2->NumberLevels()); + ASSERT_EQ(cf1->NumberLevels(), cf2->NumberLevels()) + << h1->GetName() << ", " << h2->GetName(); for (int level = 0; level < cf1->NumberLevels(); level++) { auto files1 = cf1->current()->storage_info()->LevelFiles(level), @@ -1191,10 +1192,18 @@ TEST_P(ReplicationTestWithParam, Stress) { leaderFull()->TEST_WaitForBackgroundWork()); }; + auto verifyNextEpochNumber = [&]() { + for (int i = 0; i < kColumnFamilyCount; i++) { + auto cf1 = leaderCFD(cf(i)), cf2 = followerCFD(cf(i)); + ASSERT_EQ(cf1->GetNextEpochNumber(), cf2->GetNextEpochNumber()); + } + }; + do_writes(); catchUpFollower(); verifyEqual(); + verifyNextEpochNumber(); ROCKS_LOG_INFO(info_log_, "reopen leader"); @@ -1218,6 +1227,7 @@ TEST_P(ReplicationTestWithParam, Stress) { catchUpFollower(); verifyEqual(); + verifyNextEpochNumber(); } INSTANTIATE_TEST_CASE_P(ReplicationTest, ReplicationTestWithParam, diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 6e37fe68ea4..3bb2b590c60 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1428,6 +1428,8 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record, ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", DescribeVersionEdit(e, cfd).c_str()); auto& newFiles = e.GetNewFiles(); + bool epoch_recovery_succeeded = true; + std::ostringstream err_oss; if (!(flags & AR_REPLICATE_EPOCH_NUM)) { // Epoch number calculation on the fly. // There are two cases in which we need to calculate epoch number @@ -1438,8 +1440,6 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record, // 2. compaction which merges files in lower levels to higher // levels. epoch number = min epoch number of input files. const auto& deletedFiles = e.GetDeletedFiles(); - bool epoch_recovery_succeeded = true; - std::ostringstream err_oss; if (deletedFiles.empty() && !newFiles.empty()) { // case 1: flush into L0 files. New files must be level 0 @@ -1487,6 +1487,17 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record, cfd->GetNextEpochNumber(), first_file->epoch_number); cfd->SetNextEpochNumber(first_file->epoch_number); + } else { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, + "[%s] unexpected epoch number: %" PRIu64 + " for file: %" PRIu64 + " ; max epoch number: %" PRIu64, + cfd->GetName().c_str(), + first_file->epoch_number, + first_file->fd.GetNumber(), + max_epoch_number); + s = Status::Corruption("unexpected epoch number for added file"); + break; } } @@ -1521,11 +1532,24 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record, } } } - - if (!epoch_recovery_succeeded) { - s = Status::Corruption(err_oss.str()); - break; + } else if (newFiles.size() > 0) { + // Maintain next epoch number on follower + auto next_epoch_number = cfd->GetNextEpochNumber(); + for (auto& p : newFiles) { + auto epoch_number = p.second.epoch_number; + // advance next epoch number. next_epoch_number never goes + // backwards + if (epoch_number != kUnknownEpochNumber && + (epoch_number >= next_epoch_number)) { + next_epoch_number = epoch_number + 1; + } } + cfd->SetNextEpochNumber(next_epoch_number); + } + + if (!epoch_recovery_succeeded) { + s = Status::Corruption(err_oss.str()); + break; } } if (!s.ok()) {