Skip to content

Commit

Permalink
Increase migration speed
Browse files Browse the repository at this point in the history
  • Loading branch information
Qiuwen-chen committed Sep 14, 2024
1 parent b7f8db7 commit 2cff478
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 79 deletions.
6 changes: 1 addition & 5 deletions src/common/core/CoreConst.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ static constexpr const int BackupMaxIncrementalPageCount = 1000;
static constexpr const int BackupMaxAllowIncrementalPageCount = 1000000;

#pragma mark - Migrate
static constexpr const double MigrateMaxExpectingDuration = 0.01;
static constexpr const double MigrateMaxInitializeDuration = 0.005;
static constexpr const int MigrationBatchCount = 100;

#pragma mark - Compression
static constexpr const int CompressionBatchCount = 10;
Expand Down Expand Up @@ -234,7 +233,4 @@ WCDBLiteralStringDefine(OperatorCheckIntegrity, "CheckIntegrity");
#pragma mark - Tag
static constexpr const int TagInvalidValue = 0;

#pragma mark - Constraint
static_assert(OperationQueueTimeIntervalForMigration > MigrateMaxExpectingDuration, "");

} // namespace WCDB
69 changes: 11 additions & 58 deletions src/common/core/migration/MigrateHandleOperator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ MigrateHandleOperator::MigrateHandleOperator(InnerHandle* handle)
, m_migratingInfo(nullptr)
, m_migrateStatement(handle->getStatement(DecoratorMigratingHandleStatement))
, m_removeMigratedStatement(handle->getStatement(DecoratorMigratingHandleStatement))
, m_samplePointing(0)
{
}

Expand Down Expand Up @@ -177,25 +176,22 @@ Optional<bool> MigrateHandleOperator::migrateRows(const MigrationInfo* info)
return NullOpt;
}

double timeIntervalWithinTransaction = calculateTimeIntervalWithinTransaction();
SteadyClock beforeTransaction = SteadyClock::now();
Optional<bool> migrated;
if (getHandle()->runTransaction(
[&migrated, &beforeTransaction, &timeIntervalWithinTransaction, this](InnerHandle*) -> bool {
double cost = 0;
if (getHandle()->runTransaction([&migrated, this](InnerHandle* handle) -> bool {
int migratedCount = 0;
do {
migrated = migrateRow();
cost = SteadyClock::timeIntervalSinceSteadyClockToNow(beforeTransaction);
} while (migrated.succeed() && !migrated.value()
&& cost < timeIntervalWithinTransaction);
timeIntervalWithinTransaction = cost;
migratedCount++;
if (handle->checkHasBusyRetry()) {
handle->notifyError(
Error::Code::Notice, "", "Interrupt compression due to busy");
break;
} else if (migratedCount > MigrationBatchCount) {
break;
}
} while (migrated.succeed() && !migrated.value());
return migrated.succeed();
})) {
// update only if succeed
double timeIntervalWholeTranscation
= SteadyClock::timeIntervalSinceSteadyClockToNow(beforeTransaction);
addSample(timeIntervalWithinTransaction, timeIntervalWholeTranscation);

WCTAssert(migrated.succeed());
return migrated;
}
Expand Down Expand Up @@ -227,49 +223,6 @@ void MigrateHandleOperator::finalizeMigrationStatement()
m_removeMigratedStatement->finalize();
}

#pragma mark - Sample
MigrateHandleOperator::Sample::Sample()
: timeIntervalWithinTransaction(0), timeIntervalWholeTransaction(0)
{
}

void MigrateHandleOperator::addSample(double timeIntervalWithinTransaction,
double timeIntervalForWholeTransaction)
{
WCTAssert(timeIntervalWithinTransaction > 0);
WCTAssert(timeIntervalForWholeTransaction > 0);
WCTAssert(m_samplePointing < numberOfSamples);
WCTAssert(timeIntervalForWholeTransaction > timeIntervalWithinTransaction);

Sample& sample = m_samples[m_samplePointing];
sample.timeIntervalWithinTransaction = timeIntervalWithinTransaction;
sample.timeIntervalWholeTransaction = timeIntervalForWholeTransaction;
++m_samplePointing;
if (m_samplePointing >= numberOfSamples) {
m_samplePointing = 0;
}
}

double MigrateHandleOperator::calculateTimeIntervalWithinTransaction() const
{
double totalTimeIntervalWithinTransaction = 0;
double totalTimeIntervalWholeTransaction = 0;
for (const auto& sample : m_samples) {
if (sample.timeIntervalWithinTransaction > 0
&& sample.timeIntervalWholeTransaction > 0) {
totalTimeIntervalWithinTransaction += sample.timeIntervalWithinTransaction;
totalTimeIntervalWholeTransaction += sample.timeIntervalWholeTransaction;
}
}
double timeIntervalWithinTransaction = MigrateMaxExpectingDuration * totalTimeIntervalWithinTransaction
/ totalTimeIntervalWholeTransaction;
if (timeIntervalWithinTransaction > MigrateMaxExpectingDuration
|| timeIntervalWithinTransaction <= 0 || std::isnan(timeIntervalWithinTransaction)) {
timeIntervalWithinTransaction = MigrateMaxInitializeDuration;
}
return timeIntervalWithinTransaction;
}

#pragma mark - Info Initializer
bool MigrateHandleOperator::attachSourceDatabase(const MigrationUserInfo& userInfo)
{
Expand Down
16 changes: 0 additions & 16 deletions src/common/core/migration/MigrateHandleOperator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,6 @@ class MigrateHandleOperator final : public HandleOperator, public Migration::Ste
HandleStatement* m_migrateStatement;
HandleStatement* m_removeMigratedStatement;

#pragma mark - Sample
protected:
void addSample(double timeIntervalWithinTransaction, double timeIntervalForWholeTransaction);
double calculateTimeIntervalWithinTransaction() const;

private:
static constexpr const int numberOfSamples = 10;
struct Sample {
Sample();
double timeIntervalWithinTransaction;
double timeIntervalWholeTransaction;
};
typedef struct Sample Sample;
std::array<Sample, numberOfSamples> m_samples; // FILO
int m_samplePointing;

#pragma mark - Info Initializer
protected:
bool attachSourceDatabase(const MigrationUserInfo& userInfo) override final;
Expand Down

0 comments on commit 2cff478

Please sign in to comment.