Skip to content

Commit

Permalink
added address resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
conker84 authored and fbiville committed Nov 3, 2023
1 parent 5e05e48 commit d4b9a1f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Logging;
import org.neo4j.driver.net.ServerAddress;
import org.neo4j.jdbc.Neo4jDriver;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.stream.Collectors;


/**
Expand Down Expand Up @@ -88,11 +91,13 @@ public Connection connect(String url, Properties props) throws SQLException {
builder = setMaxTransactionRetryTime(info, builder);
builder.withLogging(Logging.slf4j());

Config config = builder.build();
AuthToken authToken = getAuthToken(info);
Properties routingContext = getRoutingContext(boltUrl, info);
boltUrl = addRoutingPolicy(boltUrl, routingContext);
List<URI> routingUris = buildRoutingUris(boltUrl, routingContext);
builder = setAddressResolvers(routingUris, builder);

Config config = builder.build();
AuthToken authToken = getAuthToken(info);
driver = getDriver(routingUris, config, authToken, info);
driver.verifyConnectivity();
connection = BoltNeo4jConnectionImpl.newInstance(driver, info, url);
Expand All @@ -110,6 +115,13 @@ public Connection connect(String url, Properties props) throws SQLException {
return connection;
}

private Config.ConfigBuilder setAddressResolvers(List<URI> routingUris, Config.ConfigBuilder builder) {
if (routingUris.size() <= 1) return builder;
return builder.withResolver(serverAddress -> routingUris.stream()
.map(m -> ServerAddress.of(m.getHost(), m.getPort()))
.collect(Collectors.toCollection(LinkedHashSet::new)));
}

protected abstract Driver getDriver(List<URI> routingUris, Config config, AuthToken authToken, Properties info) throws URISyntaxException;

private AuthToken getAuthToken(Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,15 @@ public class BoltRoutingNeo4jDriver extends BoltNeo4jDriverImpl {
public static final String CUSTOM_ROUTING_POLICY_SEPARATOR = "&";

private static final BoltDriverCache cache = new BoltDriverCache(params -> {
for (URI uri : params.getRoutingUris()) {
final Driver driver = GraphDatabase.driver(uri, params.getAuthToken(), params.getConfig());
try {
driver.verifyConnectivity();
return driver;
} catch (ServiceUnavailableException e) {
BoltNeo4jUtils.closeSafely(driver, null);
} catch (Throwable e) {
// for any other errors, we first close the driver and then rethrow the original error out.
BoltNeo4jUtils.closeSafely(driver, null);
throw e;
}
final Driver driver = GraphDatabase.driver(params.getRoutingUris().get(0), params.getAuthToken(), params.getConfig());
try {
driver.verifyConnectivity();
return driver;
} catch (Throwable e) {
// for any other errors, we first close the driver and then rethrow the original error out.
BoltNeo4jUtils.closeSafely(driver, null);
throw e;
}
throw new ServiceUnavailableException( "Failed to discover an available server" );
});

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public Session getSession() {

public URI getURI() {
return this.sidecars.stream().findAny()
.map(instance -> String.format("bolt+routing://%s:%d", instance.getContainerIpAddress(),
.map(instance -> String.format("neo4j://%s:%d", instance.getContainerIpAddress(),
instance.getMappedPort(DEFAULT_BOLT_PORT)))
.map(uri -> {
try {
Expand All @@ -179,7 +179,7 @@ public URI getURI() {

public URI getAllMembersURI() {
try {
return new URI("bolt+routing://" + this.sidecars.stream()
return new URI("neo4j://" + this.sidecars.stream()
.map(instance -> String.format("%s:%d", instance.getContainerIpAddress(),
instance.getMappedPort(DEFAULT_BOLT_PORT)))
.collect(Collectors.joining(",")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ public class BoltRoutingIT {
public static void beforeClass() {
try {
cluster = TestcontainersCausalCluster.create(3, 1, Duration.ofMinutes(4), Collections.emptyMap());
assumeNotNull(cluster);
} catch (Exception ignored) {}
} catch (Exception ignored) {
ignored.printStackTrace();
}
assumeNotNull(cluster);
}

@AfterClass
Expand All @@ -84,7 +86,7 @@ public static void afterClass() {
//@Ignore
@Test public void shouldAccessReadReplicaNodes() throws SQLException {

try (Connection connection = DriverManager.getConnection("jdbc:neo4j:" + cluster.getURI().toString(), NEO4J_USER, NEO4J_PASSWORD)) {
try (Connection connection = DriverManager.getConnection("jdbc:neo4j:" + cluster.getAllMembersURI().toString(), NEO4J_USER, NEO4J_PASSWORD)) {
connection.setReadOnly(true);
try (Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement.executeQuery("match (t:BoltRoutingTest) return count(t) as tot")) {
Expand Down Expand Up @@ -115,7 +117,7 @@ public static void afterClass() {

expectedEx.expect(SQLException.class);

try (Connection connection = DriverManager.getConnection("jdbc:neo4j:" + cluster.getURI().toString(), NEO4J_USER, NEO4J_PASSWORD)) {
try (Connection connection = DriverManager.getConnection("jdbc:neo4j:" + cluster.getAllMembersURI().toString(), NEO4J_USER, NEO4J_PASSWORD)) {
connection.setReadOnly(true);

try (Statement statement = connection.createStatement()) {
Expand All @@ -129,7 +131,7 @@ public static void afterClass() {
//@Ignore
@Test public void shouldUseBookmarkToReadYourOwnWrites() throws SQLException {

try (Connection connection = DriverManager.getConnection("jdbc:neo4j:" + cluster.getURI().toString(), NEO4J_USER, NEO4J_PASSWORD)) {
try (Connection connection = DriverManager.getConnection("jdbc:neo4j:" + cluster.getAllMembersURI().toString(), NEO4J_USER, NEO4J_PASSWORD)) {

connection.setAutoCommit(false);

Expand Down Expand Up @@ -162,7 +164,7 @@ public static void afterClass() {
bookmarks.add(insertAndGetBookmark());

String bookmarksAsString = String.join(",", bookmarks);
try (Connection connection = DriverManager.getConnection("jdbc:neo4j:" + cluster.getURI().toString(), NEO4J_USER, NEO4J_PASSWORD)) {
try (Connection connection = DriverManager.getConnection("jdbc:neo4j:" + cluster.getAllMembersURI().toString(), NEO4J_USER, NEO4J_PASSWORD)) {

connection.setClientInfo(BoltRoutingNeo4jDriver.BOOKMARK, bookmarksAsString);
connection.setReadOnly(true);
Expand All @@ -179,7 +181,7 @@ public static void afterClass() {
}

private String insertAndGetBookmark() throws SQLException {
try (Connection connection = DriverManager.getConnection("jdbc:neo4j:" + cluster.getURI().toString(), NEO4J_USER, NEO4J_PASSWORD)) {
try (Connection connection = DriverManager.getConnection("jdbc:neo4j:" + cluster.getAllMembersURI().toString(), NEO4J_USER, NEO4J_PASSWORD)) {
connection.setAutoCommit(false);

try (Statement statement = connection.createStatement()) {
Expand Down

0 comments on commit d4b9a1f

Please sign in to comment.