Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Enhance detection for CALL IN TRANSACTION. #1540

Merged
merged 7 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ final class DefaultCypherResource implements CypherResource {

private static final Pattern USE_DATABASE_PATTERN = Pattern.compile(USE_DATABASE_EXPRESSION);

private static final Pattern CALL_PATTERN = Pattern.compile("(?ims)(?<!`)([^`\\s*]\\s*+CALL\\s*\\{.*}\\s*IN\\s+TRANSACTIONS)(?!`)");
/**
* The regex is as greedy as I can get it without parsing the statement fully. And as of now, there is no parser
* available that still supports periodic commit AND concurrent transactions with all options the latter provides
* today in latest Neo4j.
*/
@SuppressWarnings("squid:S5852")
private static final Pattern CALL_PATTERN = Pattern.compile("(?ims)(?<!`)([^`\\s*+]\\s*+CALL\\s*+(?:\\(.+?\\)\\s*+)?\\{.*}\\s*+(?<transactionClause>IN(?<concurrency>.+?)TRANSACTIONS)?)(?!`)");
private static final Pattern PATTERN_CALL_CONCURRENCY = Pattern.compile("(?ims)(-\\d+|\\d+)?\\s*CONCURRENT");

private static final Pattern USING_PERIODIC_PATTERN = Pattern.compile("(?ims)(?<!`)(([^`\\s*]|^)\\s*+USING\\s+PERIODIC\\s+COMMIT\\s+)(?!`)");

Expand Down Expand Up @@ -348,10 +355,26 @@ enum TransactionMode {
*/
static TransactionMode getTransactionMode(String statement) {

if (CALL_PATTERN.matcher(statement).find() || USING_PERIODIC_PATTERN.matcher(statement).find()) {
if (USING_PERIODIC_PATTERN.matcher(statement).find()) {
return TransactionMode.IMPLICIT;
}

var callMatcher = CALL_PATTERN.matcher(statement);
if (!callMatcher.find()) {
return TransactionMode.MANAGED;
}

var transactionClause = callMatcher.group("transactionClause");
if (transactionClause == null) {
return TransactionMode.MANAGED;
}

var concurrency = callMatcher.group("concurrency");
if (concurrency.isBlank() || PATTERN_CALL_CONCURRENCY.matcher(concurrency.trim()).matches()) {
return TransactionMode.IMPLICIT;
} else {
throw new MigrationsException("Invalid statement: " + statement);
}
return TransactionMode.MANAGED;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,20 @@ void shouldDetectWrongUseStatements() {
WITH line
CREATE (:`PERSON` {name: line[1], age: toInteger(line[2])})
} IN TRANSACTIONS
""",
"MATCH(n)CALL(n){MATCH (m)-->(n)RETURN m}IN TRANSACTIONS RETURN m",
"""
MATCH (n) \
CALL(n) {
MATCH (m)-->(n)
RETURN m
} IN TRANSACTIONS RETURN m
""",
"MATCH (n) CALL(n) {MATCH (m)-->(n)RETURN m} IN TRANSACTIONS RETURN y",
"MATCH(n)CALL(n){MATCH (m)-->(n)RETURN m}IN -2 CONCURRENT TRANSACTIONS OF 23 ROWS RETURN x",
"MATCH(n)CALL(n){MATCH (m)-->(n)RETURN m}IN-1 CONCURRENT TRANSACTIONS OF 23 ROWS RETURN m",
"MATCH(n)CALL(n){MATCH (m)-->(n)RETURN m}IN 42 CONCURRENT TRANSACTIONS OF 23 ROWS RETURN m",
"MATCH (n) CALL {RETURN 1 AS x} IN concurrent TRANSACTIONS RETURN *"
})
void shouldDetectImplicitTransactionNeeds(String query) {

Expand All @@ -353,10 +366,27 @@ void shouldDetectImplicitTransactionNeeds(String query) {
"CREATE (a:`USING PERIODIC COMMIT `) RETURN a",
"CREATE (a:`USING PERIODIC COMMIT `) RETURN a",
"CREATE (a:`CALL {WITH WHATEVER} IN TRANSACTIONS`) RETURN a",
"CREATE (a: ` CALL {WITH WHATEVER} IN TRANSACTIONS`) RETURN a"
"CREATE (a: ` CALL {WITH WHATEVER} IN TRANSACTIONS`) RETURN a",
"MATCH (n) CALL {blub}concurrent IN TRANSACTIONS RETURN n",
"""
WITH apoc.date.currentTimestamp() AS start
CALL apoc.util.sleep(4000)
WITH start, apoc.date.currentTimestamp() AS end
RETURN datetime({epochmillis: start}) AS start, datetime({epochmillis: end}) AS end;
"""
})
void shouldDetectManagedTransactionNeeds(String query) {

assertThat(DefaultCypherResource.getTransactionMode(query)).isEqualTo(DefaultCypherResource.TransactionMode.MANAGED);
}

@ParameterizedTest
@ValueSource(strings = {
"MATCH (n) CALL {blub} IN concurrently TRANSACTIONS RETURN n"
})
void weEvenMightThrowWhenWeDoHalfwayParsingAnyway(String query) {

assertThatExceptionOfType(MigrationsException.class)
.isThrownBy(() -> DefaultCypherResource.getTransactionMode(query));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -804,17 +804,18 @@ void shouldApplyResourceBasedMigrations(MigrationsConfig.TransactionMode transac
}
}

@ParameterizedTest // GH-719
@ValueSource(strings = {"V000__Use_call_in_tx1", "V000__Use_call_in_tx2"})
void shouldAllowCallInTX(String script) {
@ParameterizedTest // GH-719, GH-1537
@ValueSource(ints = {1, 2, 3, 4})
void shouldAllowCallInTX(int idx) {

try (Session session = driver.session()) {
session.run("with range(1,100) as r unwind r as i create (n:F) return n").consume();
session.run("CREATE (a:Asset)-[:ASSET_TYPE]->(type:AssetType)");
}

Migrations migrations = new Migrations(MigrationsConfig.defaultConfig(), driver);
int appliedMigrations = migrations.apply(
Objects.requireNonNull(MigrationsIT.class.getResource("/manual_resources/" + script + ".cypher"))
Objects.requireNonNull(MigrationsIT.class.getResource("/manual_resources/V000__Use_call_in_tx" + idx + ".cypher"))
);

assertThat(appliedMigrations).isEqualTo(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
MATCH (type:AssetType) MATCH (a:Asset)-[:ASSET_TYPE]->(type)
CALL (a, type) {
CALL apoc.create.setLabels(a, [ type.naam ] ) yield node
} IN 2 CONCURRENT TRANSACTIONS OF 1000 ROWS;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
LOAD CSV WITH HEADERS FROM 'https://data.neo4j.com/importing-cypher/movies.csv' AS row
CALL (row) {
MERGE (m:Movie {movieId: row.movieId})
MERGE (y:Year {year: row.year})
MERGE (m)-[r:RELEASED_IN]->(y)
} IN 2 CONCURRENT TRANSACTIONS OF 10 ROWS ON ERROR CONTINUE REPORT STATUS as status
WITH status
WHERE status.errorMessage IS NOT NULL
RETURN status.transactionId AS transaction, status.committed AS commitStatus, status.errorMessage AS errorMessage
Loading