Skip to content

Commit

Permalink
fix: Enhance detection for CALL IN TRANSACTION. (#1540)
Browse files Browse the repository at this point in the history
Detects now also the `CONCURRENT` keyword.

Fixes #1537, but I really think mixing up a script runner and its
transactional flow with concurrent database managed transaction can be
quite a source of confusion.
  • Loading branch information
michael-simons authored Nov 30, 2024
1 parent fd16d3f commit cd0f323
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ final class DefaultCypherResource implements CypherResource {

private static final Pattern USE_DATABASE_PATTERN = Pattern.compile(USE_DATABASE_EXPRESSION);

private static final Pattern CALL_PATTERN = Pattern.compile("(?ims)(?<!`)([^`\\s*]\\s*+CALL\\s*\\{.*}\\s*IN\\s+TRANSACTIONS)(?!`)");
/**
* The regex is as greedy as I can get it without parsing the statement fully. And as of now, there is no parser
* available that still supports periodic commit AND concurrent transactions with all options the latter provides
* today in latest Neo4j.
*/
@SuppressWarnings("squid:S5852")
private static final Pattern CALL_PATTERN = Pattern.compile("(?ims)(?<!`)([^`\\s*+]\\s*+CALL\\s*+(?:\\(.+?\\)\\s*+)?\\{.*}\\s*+(?<transactionClause>IN(?<concurrency>.+?)TRANSACTIONS)?)(?!`)");
private static final Pattern PATTERN_CALL_CONCURRENCY = Pattern.compile("(?ims)(-\\d+|\\d+)?\\s*CONCURRENT");

private static final Pattern USING_PERIODIC_PATTERN = Pattern.compile("(?ims)(?<!`)(([^`\\s*]|^)\\s*+USING\\s+PERIODIC\\s+COMMIT\\s+)(?!`)");

Expand Down Expand Up @@ -348,10 +355,26 @@ enum TransactionMode {
*/
static TransactionMode getTransactionMode(String statement) {

if (CALL_PATTERN.matcher(statement).find() || USING_PERIODIC_PATTERN.matcher(statement).find()) {
if (USING_PERIODIC_PATTERN.matcher(statement).find()) {
return TransactionMode.IMPLICIT;
}

var callMatcher = CALL_PATTERN.matcher(statement);
if (!callMatcher.find()) {
return TransactionMode.MANAGED;
}

var transactionClause = callMatcher.group("transactionClause");
if (transactionClause == null) {
return TransactionMode.MANAGED;
}

var concurrency = callMatcher.group("concurrency");
if (concurrency.isBlank() || PATTERN_CALL_CONCURRENCY.matcher(concurrency.trim()).matches()) {
return TransactionMode.IMPLICIT;
} else {
throw new MigrationsException("Invalid statement: " + statement);
}
return TransactionMode.MANAGED;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,20 @@ void shouldDetectWrongUseStatements() {
WITH line
CREATE (:`PERSON` {name: line[1], age: toInteger(line[2])})
} IN TRANSACTIONS
""",
"MATCH(n)CALL(n){MATCH (m)-->(n)RETURN m}IN TRANSACTIONS RETURN m",
"""
MATCH (n) \
CALL(n) {
MATCH (m)-->(n)
RETURN m
} IN TRANSACTIONS RETURN m
""",
"MATCH (n) CALL(n) {MATCH (m)-->(n)RETURN m} IN TRANSACTIONS RETURN y",
"MATCH(n)CALL(n){MATCH (m)-->(n)RETURN m}IN -2 CONCURRENT TRANSACTIONS OF 23 ROWS RETURN x",
"MATCH(n)CALL(n){MATCH (m)-->(n)RETURN m}IN-1 CONCURRENT TRANSACTIONS OF 23 ROWS RETURN m",
"MATCH(n)CALL(n){MATCH (m)-->(n)RETURN m}IN 42 CONCURRENT TRANSACTIONS OF 23 ROWS RETURN m",
"MATCH (n) CALL {RETURN 1 AS x} IN concurrent TRANSACTIONS RETURN *"
})
void shouldDetectImplicitTransactionNeeds(String query) {

Expand All @@ -353,10 +366,27 @@ void shouldDetectImplicitTransactionNeeds(String query) {
"CREATE (a:`USING PERIODIC COMMIT `) RETURN a",
"CREATE (a:`USING PERIODIC COMMIT `) RETURN a",
"CREATE (a:`CALL {WITH WHATEVER} IN TRANSACTIONS`) RETURN a",
"CREATE (a: ` CALL {WITH WHATEVER} IN TRANSACTIONS`) RETURN a"
"CREATE (a: ` CALL {WITH WHATEVER} IN TRANSACTIONS`) RETURN a",
"MATCH (n) CALL {blub}concurrent IN TRANSACTIONS RETURN n",
"""
WITH apoc.date.currentTimestamp() AS start
CALL apoc.util.sleep(4000)
WITH start, apoc.date.currentTimestamp() AS end
RETURN datetime({epochmillis: start}) AS start, datetime({epochmillis: end}) AS end;
"""
})
void shouldDetectManagedTransactionNeeds(String query) {

assertThat(DefaultCypherResource.getTransactionMode(query)).isEqualTo(DefaultCypherResource.TransactionMode.MANAGED);
}

@ParameterizedTest
@ValueSource(strings = {
"MATCH (n) CALL {blub} IN concurrently TRANSACTIONS RETURN n"
})
void weEvenMightThrowWhenWeDoHalfwayParsingAnyway(String query) {

assertThatExceptionOfType(MigrationsException.class)
.isThrownBy(() -> DefaultCypherResource.getTransactionMode(query));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -804,17 +804,18 @@ void shouldApplyResourceBasedMigrations(MigrationsConfig.TransactionMode transac
}
}

@ParameterizedTest // GH-719
@ValueSource(strings = {"V000__Use_call_in_tx1", "V000__Use_call_in_tx2"})
void shouldAllowCallInTX(String script) {
@ParameterizedTest // GH-719, GH-1537
@ValueSource(ints = {1, 2, 3, 4})
void shouldAllowCallInTX(int idx) {

try (Session session = driver.session()) {
session.run("with range(1,100) as r unwind r as i create (n:F) return n").consume();
session.run("CREATE (a:Asset)-[:ASSET_TYPE]->(type:AssetType)");
}

Migrations migrations = new Migrations(MigrationsConfig.defaultConfig(), driver);
int appliedMigrations = migrations.apply(
Objects.requireNonNull(MigrationsIT.class.getResource("/manual_resources/" + script + ".cypher"))
Objects.requireNonNull(MigrationsIT.class.getResource("/manual_resources/V000__Use_call_in_tx" + idx + ".cypher"))
);

assertThat(appliedMigrations).isEqualTo(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
MATCH (type:AssetType) MATCH (a:Asset)-[:ASSET_TYPE]->(type)
CALL (a, type) {
CALL apoc.create.setLabels(a, [ type.naam ] ) yield node
} IN 2 CONCURRENT TRANSACTIONS OF 1000 ROWS;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
LOAD CSV WITH HEADERS FROM 'https://data.neo4j.com/importing-cypher/movies.csv' AS row
CALL (row) {
MERGE (m:Movie {movieId: row.movieId})
MERGE (y:Year {year: row.year})
MERGE (m)-[r:RELEASED_IN]->(y)
} IN 2 CONCURRENT TRANSACTIONS OF 10 ROWS ON ERROR CONTINUE REPORT STATUS as status
WITH status
WHERE status.errorMessage IS NOT NULL
RETURN status.transactionId AS transaction, status.committed AS commitStatus, status.errorMessage AS errorMessage

0 comments on commit cd0f323

Please sign in to comment.