Skip to content

Commit

Permalink
implement out of order migrations.
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Simons <[email protected]>
  • Loading branch information
michael-simons committed Nov 28, 2024
1 parent 1eb4aa4 commit 92772da
Show file tree
Hide file tree
Showing 16 changed files with 317 additions and 47 deletions.
13 changes: 13 additions & 0 deletions docs/modules/ROOT/pages/usage.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ It applies all locally resolved migrations to the target database and stores the

It returns the last applied version.

That operation does not allow migrations out-of-order by default.
That means if you add version 2 after you already migrated to 5, it will fail with an appropriate error.
You can spot these cases beforehand by validating your chain of migrations (see below).
If you absolutely must however, you can use `withOutOfOrderAllowed` on the config object or the corresponding property in either the Spring Boot starter or the Quarkus extension.
Setting this to true will integrate out-of-order migrations into the chain.

[[usage_common_repair]]
=== Repair

Expand Down Expand Up @@ -677,6 +683,13 @@ A configurable delay that will be applied in between applying two migrations.
- Type: `java.time.Duration`
- Default: `false`

`org.neo4j.migrations.out-of-order`::
A flag to enable out-of-order migrations.

- Type: `java.lang.Boolean`
- Default: `false`


NOTE: Migrations can be disabled by setting `org.neo4j.migrations.enabled` to `false`.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
Expand Down Expand Up @@ -57,10 +58,10 @@ final class ChainBuilder {
* @param discoveredMigrations A list of migrations sorted by {@link Migration#getVersion()}.
* It is not yet known whether those are pending or not.
* @return The full migration chain.
* @see #buildChain(MigrationContext, List, boolean, ChainBuilderMode)
* @see #buildChain(MigrationContext, List, boolean, ChainBuilderMode, boolean)
*/
MigrationChain buildChain(MigrationContext context, List<Migration> discoveredMigrations) {
return buildChain(context, discoveredMigrations, false, ChainBuilderMode.COMPARE);
return buildChain(context, discoveredMigrations, false, ChainBuilderMode.COMPARE, false);
}

/**
Expand All @@ -70,14 +71,14 @@ MigrationChain buildChain(MigrationContext context, List<Migration> discoveredMi
* @param detailedCauses set to {@literal true} to add causes to possible exceptions
* @return The full migration chain.
*/
MigrationChain buildChain(MigrationContext context, List<Migration> discoveredMigrations, boolean detailedCauses, ChainBuilderMode mode) {
MigrationChain buildChain(MigrationContext context, List<Migration> discoveredMigrations, boolean detailedCauses, ChainBuilderMode mode, boolean forceInOrder) {

final Map<MigrationVersion, Element> elements = buildChain0(context, discoveredMigrations, detailedCauses, mode);
final Map<MigrationVersion, Element> elements = buildChain0(context, discoveredMigrations, detailedCauses, mode, forceInOrder);
return new DefaultMigrationChain(context.getConnectionDetails(), elements);
}

@SuppressWarnings("squid:S3776") // Yep, this is a complex validation, but it still fits on one screen
private Map<MigrationVersion, Element> buildChain0(MigrationContext context, List<Migration> discoveredMigrations, boolean detailedCauses, ChainBuilderMode mode) {
private Map<MigrationVersion, Element> buildChain0(MigrationContext context, List<Migration> discoveredMigrations, boolean detailedCauses, ChainBuilderMode mode, boolean forceInOrder) {

Map<MigrationVersion, Element> appliedMigrations =
mode == ChainBuilderMode.LOCAL ? Collections.emptyMap() : getChainOfAppliedMigrations(context);
Expand All @@ -87,41 +88,50 @@ private Map<MigrationVersion, Element> buildChain0(MigrationContext context, Lis
}

final String incompleteMigrationsMessage = "More migrations have been applied to the database than locally resolved.";
Map<MigrationVersion, Element> fullMigrationChain = new LinkedHashMap<>(
discoveredMigrations.size() + appliedMigrations.size());
Map<MigrationVersion, Element> fullMigrationChain = new TreeMap<>(context.getConfig().getVersionComparator());
boolean outOfOrderAllowed = context.getConfig().isOutOfOrder() && !forceInOrder;
int i = 0;
for (Map.Entry<MigrationVersion, Element> entry : appliedMigrations.entrySet()) {
MigrationVersion expectedVersion = entry.getKey();
Optional<String> expectedChecksum = entry.getValue().getChecksum();

Migration newMigration;
try {
newMigration = discoveredMigrations.get(i);
} catch (IndexOutOfBoundsException e) {
if (detailedCauses) {
throw new MigrationsException(incompleteMigrationsMessage, e);
boolean checkNext = true;
while (checkNext) {
checkNext = false;
Migration newMigration;
try {
newMigration = discoveredMigrations.get(i++);
} catch (IndexOutOfBoundsException e) {
if (detailedCauses) {
throw new MigrationsException(incompleteMigrationsMessage, e);
}
throw new MigrationsException(incompleteMigrationsMessage);
}
throw new MigrationsException(incompleteMigrationsMessage);
}

if (!newMigration.getVersion().equals(expectedVersion)) {
if (getNumberOfAppliedMigrations(context) > discoveredMigrations.size()) {
throw new MigrationsException(incompleteMigrationsMessage, new IndexOutOfBoundsException());
if (!newMigration.getVersion().equals(expectedVersion)) {
if (getNumberOfAppliedMigrations(context) > discoveredMigrations.size()) {
throw new MigrationsException(incompleteMigrationsMessage, new IndexOutOfBoundsException());
}
if (outOfOrderAllowed) {
fullMigrationChain.put(newMigration.getVersion(), DefaultMigrationChainElement.pendingElement(newMigration));
checkNext = true;
continue;
} else {
throw new MigrationsException("Unexpected migration at index " + (i - 1) + ": " + Migrations.toString(newMigration) + ".");
}
}
throw new MigrationsException("Unexpected migration at index " + i + ": " + Migrations.toString(newMigration) + ".");
}

if (newMigration.isRepeatable() != expectedVersion.isRepeatable()) {
throw new MigrationsException("State of " + Migrations.toString(newMigration) + " changed from " + (expectedVersion.isRepeatable() ? "repeatable to non-repeatable" : "non-repeatable to repeatable"));
}
if (newMigration.isRepeatable() != expectedVersion.isRepeatable()) {
throw new MigrationsException("State of " + Migrations.toString(newMigration) + " changed from " + (expectedVersion.isRepeatable() ? "repeatable to non-repeatable" : "non-repeatable to repeatable"));
}

if ((context.getConfig().isValidateOnMigrate() || alwaysVerify) && !(matches(expectedChecksum, newMigration) || expectedVersion.isRepeatable())) {
throw new MigrationsException("Checksum of " + Migrations.toString(newMigration) + " changed!");
}
if ((context.getConfig().isValidateOnMigrate() || alwaysVerify) && !(matches(expectedChecksum, newMigration) || expectedVersion.isRepeatable())) {
throw new MigrationsException("Checksum of " + Migrations.toString(newMigration) + " changed!");
}

// This is not a pending migration anymore
fullMigrationChain.put(expectedVersion, entry.getValue());
++i;
// This is not a pending migration anymore
fullMigrationChain.put(expectedVersion, entry.getValue());
}
}

// All remaining migrations are pending
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.neo4j.driver.Values;
import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.summary.SummaryCounters;
import org.neo4j.driver.types.Node;

Expand Down Expand Up @@ -539,7 +538,7 @@ private ValidationResult validate0() {
List<Migration> migrations = this.getMigrations();
Optional<String> targetDatabase = config.getOptionalSchemaDatabase();
try {
MigrationChain migrationChain = new ChainBuilder(true).buildChain(context, migrations, true, ChainBuilderMode.COMPARE);
MigrationChain migrationChain = new ChainBuilder(true).buildChain(context, migrations, true, ChainBuilderMode.COMPARE, true);
int numberOfAppliedMigrations = (int) migrationChain.getElements()
.stream().filter(m -> m.getState() == MigrationState.APPLIED)
.count();
Expand Down Expand Up @@ -721,7 +720,9 @@ private void apply0(List<Migration> migrations) {

StopWatch stopWatch = new StopWatch();
for (Migration migration : new IterableMigrations(config, migrations)) {

if (!chain.isApplied(migration.getVersion().getValue()) && config.getVersionComparator().compare(migration.getVersion(), previousVersion) < 0) {
previousVersion = MigrationVersion.baseline();
}
boolean repeated = false;
Supplier<String> logMessage = () -> String.format("Applied migration %s.", toString(migration));
if (previousVersion != MigrationVersion.baseline() && chain.isApplied(migration.getVersion().getValue())) {
Expand Down Expand Up @@ -766,40 +767,65 @@ private MigrationVersion recordApplication(String neo4jUser, MigrationVersion pr
parameters.put("executionTime", executionTime);
parameters.put(PROPERTY_MIGRATION_TARGET, migrationTarget.orElse(null));

TransactionCallback<ResultSummary> uow;
record ReplacedMigration(long oldRelId, long newMigrationNodeId) {
}

TransactionCallback<Optional<ReplacedMigration>> uow;
if (repeated) {
uow = t -> t.run(
"MATCH (l:__Neo4jMigration) WHERE l.version = $appliedMigration['version'] AND coalesce(l.migrationTarget,'<default>') = coalesce($migrationTarget,'<default>') WITH l "
uow = t -> {
t.run(
"MATCH (l:__Neo4jMigration) WHERE l.version = $appliedMigration['version'] AND coalesce(l.migrationTarget,'<default>') = coalesce($migrationTarget,'<default>') WITH l "
+ "CREATE (l) - [:REPEATED {checksum: $appliedMigration['checksum'], at: datetime({timezone: 'UTC'}), in: duration( {milliseconds: $executionTime} ), by: $installedBy, connectedAs: $neo4jUser}] -> (l)",
parameters).consume();
parameters).consume();
return Optional.empty();
};
} else {
uow = t -> {
String mergeOrMatchAndMaybeCreate;
String mergePreviousMigration;
if (migrationTarget.isPresent()) {
mergeOrMatchAndMaybeCreate = "MERGE (p:__Neo4jMigration {version: $previousVersion, migrationTarget: $migrationTarget}) ";
mergePreviousMigration = "MERGE (p:__Neo4jMigration {version: $previousVersion, migrationTarget: $migrationTarget}) WITH p ";
} else {
Result result = t.run(
"MATCH (p:__Neo4jMigration {version: $previousVersion}) WHERE p.migrationTarget IS NULL RETURN id(p) AS id",
Values.parameters("previousVersion", previousVersion.getValue()));
if (result.hasNext()) {
parameters.put("id", result.single().get("id").asLong());
mergeOrMatchAndMaybeCreate = "MATCH (p) WHERE id(p) = $id WITH p ";
mergePreviousMigration = "MATCH (p) WHERE id(p) = $id WITH p ";
} else {
mergeOrMatchAndMaybeCreate = "CREATE (p:__Neo4jMigration {version: $previousVersion}) ";
mergePreviousMigration = "CREATE (p:__Neo4jMigration {version: $previousVersion}) WITH p ";
}
}

return t.run(
mergeOrMatchAndMaybeCreate
+ "CREATE (c:__Neo4jMigration) SET c = $appliedMigration, c.migrationTarget = $migrationTarget "
+ "MERGE (p) - [:MIGRATED_TO {at: datetime({timezone: 'UTC'}), in: duration( {milliseconds: $executionTime} ), by: $installedBy, connectedAs: $neo4jUser}] -> (c)",
parameters)
.consume();
var createNewMigrationAndPath = """
OPTIONAL MATCH (p) -[om:MIGRATED_TO]-> (ot)
CREATE (c:__Neo4jMigration) SET c = $appliedMigration, c.migrationTarget = $migrationTarget
MERGE (p) - [:MIGRATED_TO {at: datetime({timezone: 'UTC'}), in: duration( {milliseconds: $executionTime} ), by: $installedBy, connectedAs: $neo4jUser}] -> (c)
RETURN id(c) AS insertedId, id(om) AS oldRelId, properties(om), id(ot) AS oldEndId
""";

var result = t.run(mergePreviousMigration + " " + createNewMigrationAndPath, parameters).single();
if (!result.get("oldRelId").isNull()) {
return Optional.of(new ReplacedMigration(result.get("oldRelId").asLong(), result.get("insertedId").asLong()));
} else {
return Optional.empty();
}
};
}

try (Session session = context.getSchemaSession()) {
session.executeWrite(uow);
var f = session.executeWrite(uow);
f.ifPresent(replacedMigration -> {
session.executeWriteWithoutResult(t -> {
t.run("""
MATCH ()-[oldRel]->(oldEnd)
WHERE id(oldRel) = $oldRelId
MATCH (inserted) WHERE id(inserted) = $insertedId
MERGE (inserted) -[r:MIGRATED_TO]-> (oldEnd)
SET r = properties(oldRel)
DELETE (oldRel)
""", Map.of("oldRelId", replacedMigration.oldRelId(), "insertedId", replacedMigration.newMigrationNodeId())).consume();
});
});
}

return appliedMigration.getVersion();
Expand Down
Loading

0 comments on commit 92772da

Please sign in to comment.