Skip to content

Commit

Permalink
refactor(interactive): Support customizing ports when deploy interact…
Browse files Browse the repository at this point in the history
…ive in local mode and connect via SDK (#3990)

Interactive service exposes many ports, including `admin_port`,
`query_port(stored_proc_port)`, `cypher_port` and `gremlin_port`. In
previous SDK implementation, we create a driver by only providing the
`admin_endpoint`, hoping for get all other ports via `GET
/v1/service/status`, However, this only works if the SDK runs inside the
service's container. In cases like k8s deployment, the internal port
will be mapped to a large port number which is chosen by the cluster.

In this PR, we let user to declare the endpoints to environment
variables, and connect via SDK.
```bash
export INTERACTIVE_ADMIN_ENDPOINT: http://host:port
export INTERACTIVE_STORED_PROC_ENDPOINT: http://host:port
export INTERACTIVE_CYPHER_ENDPOINT: neo4j://host:port or bolt://host:port
export INTERACTIVE_GREMLIN_ENDPOINT: ws://host:port/gremlin
```

For example, with python sdk, user can connect to the Interactive
service after all environment variable is exported.
```python
from gs_interactive.client.driver import Driver
driver = Driver()
with driver.session() as sess:
   #....
```


The documentaion will be added after the interactive documentation is
refactored with `gsctl`
  • Loading branch information
zhanglei1949 authored Jul 3, 2024
1 parent b395811 commit 70188a6
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,36 +32,90 @@
public class Driver {
private static final Logger logger = Logger.getLogger(Driver.class.getName());

private final String host;
private final int port;
private final String adminUri;
private final String storedProcUri;
private String cypherUri;
private String gremlinUri;
private String host;
private int port;
private Session defaultSession;

public static Driver connect(String host, int port) {
return new Driver(host + ":" + port);
/**
* Connect to the interactive service with the environment variables
* INTERACTIVE_ADMIN_ENDPOINT, INTERACTIVE_STORED_PROC_ENDPOINT, INTERACTIVE_CYPHER_ENDPOINT, INTERACTIVE_GREMLIN_ENDPOINT
* @return The driver object.
*/
public static Driver connect() {
String adminUri = System.getenv("INTERACTIVE_ADMIN_ENDPOINT");
if (adminUri == null) {
throw new IllegalArgumentException("INTERACTIVE_ADMIN_ENDPOINT is not set");
}
String storedProcUri = System.getenv("INTERACTIVE_STORED_PROC_ENDPOINT");
if (storedProcUri == null) {
logger.warning(
"INTERACTIVE_STORED_PROC_ENDPOINT is not set, will try to parse endpoint from"
+ " service_status");
}
String cypherUri = System.getenv("INTERACTIVE_CYPHER_ENDPOINT");
if (cypherUri == null) {
logger.warning(
"INTERACTIVE_CYPHER_ENDPOINT is not set, will try to parse endpoint from"
+ " service_status");
}
String gremlinUri = System.getenv("INTERACTIVE_GREMLIN_ENDPOINT");
if (gremlinUri == null) {
logger.warning(
"INTERACTIVE_GREMLIN_ENDPOINT is not set, will try to parse endpoint from"
+ " service_status");
}
return connect(adminUri, storedProcUri, cypherUri, gremlinUri);
}

/**
* Connect to the interactive service by specifying the URIs of the admin, stored procedure, cypher, and gremlin services.
* @param adminUri The URI of the admin service.
* @param storedProcUri The URI of the stored procedure service.
* @param cypherUri The URI of the cypher service.
* @param gremlinUri The URI of the gremlin service.
* @return The driver object.
*/
public static Driver connect(
String adminUri, String storedProcUri, String cypherUri, String gremlinUri) {
return new Driver(adminUri, storedProcUri, cypherUri, gremlinUri);
}

/**
* Should only be used internally. Users should use method connect() or connect(admin_uri,storedProcUri,cypherUri,gremlinUri)
* which require the URIs of all services, or need all uris exported to the environment variables.
* Connect to the interactive service by only specifying the admin service's uri.
* @param uri The URI of the admin service.
* @return The driver object.
*/
public static Driver connect(String uri) {
return new Driver(uri);
}

private Driver(String uri) {
this.adminUri = uri;
this.storedProcUri = null;
this.cypherUri = null;
this.gremlinUri = null;
// Parse uri
String[] parts = uri.split(":");
if (parts.length != 2) {
throw new IllegalArgumentException("Invalid uri: " + uri);
}
String host = parts[0];
this.port = Integer.parseInt(parts[1]);
if (host.startsWith("http://")) {
this.host = host.substring(7);
} else {
this.host = host;
}
initHostPort();
this.defaultSession = null;
}

private Driver(String host, int port) {
this.host = host;
this.port = port;
private Driver(String adminUri, String storedProcUri, String cypherUri, String gremlinUri) {
this.adminUri = adminUri;
this.storedProcUri = storedProcUri;
this.cypherUri = cypherUri;
this.gremlinUri = gremlinUri;
// Parse uri
if (storedProcUri != null && !storedProcUri.startsWith("http")) {
throw new IllegalArgumentException("Invalid uri: " + storedProcUri);
}
initHostPort();
this.defaultSession = null;
}

/**
Expand All @@ -70,7 +124,11 @@ private Driver(String host, int port) {
* @return
*/
public Session session() {
return DefaultSession.newInstance(this.host, this.port);
if (storedProcUri == null) {
return DefaultSession.newInstance(adminUri);
} else {
return DefaultSession.newInstance(adminUri, storedProcUri);
}
}

public Session getDefaultSession() {
Expand Down Expand Up @@ -101,11 +159,15 @@ public org.neo4j.driver.Session getNeo4jSession() {
}

public String getNeo4jEndpoint() {
if (cypherUri != null) {
return cypherUri;
}
Pair<String, Integer> endpoint = getNeo4jEndpointImpl();
if (endpoint == null) {
return null;
}
return "bolt://" + endpoint.getLeft() + ":" + endpoint.getRight();
cypherUri = "bolt://" + endpoint.getLeft() + ":" + endpoint.getRight();
return cypherUri;
}

/**
Expand All @@ -118,7 +180,7 @@ public Pair<String, Integer> getGremlinEndpoint() {
}

public Client getGremlinClient() {
Pair<String, Integer> endpoint = getGremlinEndpointImpl();
Pair<String, Integer> endpoint = getGremlinEndpoint();
Cluster cluster =
Cluster.build()
.addContactPoint(endpoint.getLeft())
Expand Down Expand Up @@ -161,6 +223,17 @@ private Pair<String, Integer> getNeo4jEndpointImpl() {

// TODO(zhanglei): return null if gremlin is not enabled
private Pair<String, Integer> getGremlinEndpointImpl() {
if (gremlinUri != null) {
// parse host and port from ws://host:port/gremlin
String[] parts = gremlinUri.split(":");
if (parts.length != 3) {
throw new IllegalArgumentException("Invalid uri: " + gremlinUri);
}
String host = parts[1].substring(2);
String portStr = parts[2].split("/")[0];
Integer port = Integer.parseInt(portStr);
return Pair.of(host, port);
}
Session gsSession = getDefaultSession();
Result<ServiceStatus> serviceStatus = gsSession.getServiceStatus();
if (!serviceStatus.isOk()) {
Expand All @@ -173,4 +246,16 @@ private Pair<String, Integer> getGremlinEndpointImpl() {
return Pair.of(host, gremlinPort);
}
}

private void initHostPort() {
if (!adminUri.startsWith("http")) {
throw new IllegalArgumentException("Invalid uri: " + adminUri);
}
String[] parts = adminUri.split(":");
if (parts.length != 3) {
throw new IllegalArgumentException("Invalid uri: " + adminUri);
}
host = parts[1].substring(2);
port = Integer.parseInt(parts[2]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class DefaultSession implements Session {
*
* @param uri should be in the format "http://host:port"
*/
private DefaultSession(String uri) {
private DefaultSession(String uri, String storedProcUri) {
client = new ApiClient();
client.setBasePath(uri);
client.setReadTimeout(DEFAULT_READ_TIMEOUT);
Expand All @@ -71,30 +71,32 @@ private DefaultSession(String uri) {

utilsApi = new UtilsApi(client);

Result<ServiceStatus> status = getServiceStatus();
if (!status.isOk()) {
throw new RuntimeException(
"Failed to connect to the server: " + status.getStatusMessage());
}
// TODO: should construct queryService from a endpoint, not a port
Integer queryPort = status.getValue().getHqpsPort();
if (storedProcUri == null) {
Result<ServiceStatus> status = getServiceStatus();
if (!status.isOk()) {
throw new RuntimeException(
"Failed to connect to the server: " + status.getStatusMessage());
}
// TODO: should construct queryService from a endpoint, not a port
Integer queryPort = status.getValue().getHqpsPort();

// Replace the port with the query port, http:://host:port -> http:://host:queryPort
String queryUri = uri.replaceFirst(":[0-9]+", ":" + queryPort);
System.out.println("Query URI: " + queryUri);
// Replace the port with the query port, http:://host:port -> http:://host:queryPort
storedProcUri = uri.replaceFirst(":[0-9]+", ":" + queryPort);
System.out.println("Query URI: " + storedProcUri);
}
queryClient = new ApiClient();
queryClient.setBasePath(queryUri);
queryClient.setBasePath(storedProcUri);
queryClient.setReadTimeout(DEFAULT_READ_TIMEOUT);
queryClient.setWriteTimeout(DEFAULT_WRITE_TIMEOUT);
queryApi = new QueryServiceApi(queryClient);
}

public static DefaultSession newInstance(String uri) {
return new DefaultSession(uri);
public static DefaultSession newInstance(String adminUri) {
return new DefaultSession(adminUri, null);
}

public static DefaultSession newInstance(String host, int port) {
return new DefaultSession("http://" + host + ":" + port);
public static DefaultSession newInstance(String adminUri, String storedProcUri) {
return new DefaultSession(adminUri, storedProcUri);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ public boolean empty() {

@BeforeAll
public static void beforeClass() {
String interactiveEndpoint = System.getProperty("interactive.endpoint", "localhost:7777");
String interactiveEndpoint =
System.getProperty("interactive.endpoint", "http://localhost:7777");
driver = Driver.connect(interactiveEndpoint);
session = driver.session();
String neo4jEndpoint = driver.getNeo4jEndpoint();
Expand Down Expand Up @@ -564,6 +565,15 @@ public void test10CallCypherProcedureViaNeo4j() {
logger.info("result: " + result.toString());
}

@Test
@Order(16)
public void test11CreateDriver() {
// Create a new driver with all endpoints specified.
// Assume the environment variables are set
Driver driver = Driver.connect();
Session session = driver.session();
}

@AfterAll
public static void afterClass() {
logger.info("clean up");
Expand Down
67 changes: 55 additions & 12 deletions flex/interactive/sdk/python/gs_interactive/client/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# limitations under the License.
#

import os
import sys

from gremlin_python import statics
Expand All @@ -32,22 +33,54 @@


class Driver:
def __init__(self, endpoint: str):
# split uri into host and port
self._endpoint = endpoint
# prepend http:// to self._endpoint
if not self._endpoint.startswith("http://"):
def __init__(self, admin_endpoint: str = None, stored_proc_endpoint : str = None, cypher_endpoint : str = None, gremlin_endpoint : str = None):
"""
Construct a new driver with the given endpoints.
"""
if admin_endpoint is None:
self.read_endpoints_from_env()
else:
self._admin_endpoint = admin_endpoint
self._stored_proc_endpoint = stored_proc_endpoint
self._cypher_endpoint = cypher_endpoint
self._gremlin_endpoint = gremlin_endpoint
self._session = None
self.init_host_and_port()

def init_host_and_port(self):
# prepend http:// to self._admin_endpoint
if not self._admin_endpoint.startswith("http://"):
raise ValueError("Invalid uri, expected format is http://host:port")
host_and_port = self._endpoint[7:]
host_and_port = self._admin_endpoint[7:]
splitted = host_and_port.split(":")
if len(splitted) != 2:
raise ValueError("Invalid uri, expected format is host:port")
self._host = splitted[0]
self._port = int(splitted[1])
self._session = None
def read_endpoints_from_env(self):
"""
Construct a new driver from the endpoints declared in environment variables.
INTERACTIVE_ADMIN_ENDPOINT: http://host:port
INTERACTIVE_STORED_PROC_ENDPOINT: http://host:port
INTERACTIVE_CYPHER_ENDPOINT: neo4j://host:port or bolt://host:port
INTERACTIVE_GREMLIN_ENDPOINT: ws://host:port/gremlin
"""
self._admin_endpoint = os.environ.get("INTERACTIVE_ADMIN_ENDPOINT")
assert self._admin_endpoint is not None, "INTERACTIVE_ADMIN_ENDPOINT is not set"
self._stored_proc_endpoint = os.environ.get("INTERACTIVE_STORED_PROC_ENDPOINT")
if self._stored_proc_endpoint is None:
print("INTERACTIVE_STORED_PROC_ENDPOINT is not set, will try to get it from service status endpoint")
self._cypher_endpoint = os.environ.get("INTERACTIVE_CYPHER_ENDPOINT")
if self._cypher_endpoint is None:
print("INTERACTIVE_CYPHER_ENDPOINT is not set, will try to get it from service status endpoint")
self._gremlin_endpoint = os.environ.get("INTERACTIVE_GREMLIN_ENDPOINT")
if self._gremlin_endpoint is None:
print("INTERACTIVE_GREMLIN_ENDPOINT is not set, will try to get it from service status endpoint")

def session(self) -> Session:
return DefaultSession(self._endpoint)
if self._stored_proc_endpoint is not None:
return DefaultSession(self._admin_endpoint, self._stored_proc_endpoint)
return DefaultSession(self._admin_endpoint)

def getDefaultSession(self) -> Session:
if self._session is None:
Expand All @@ -69,10 +102,15 @@ def get_port(self) -> int:
return self._port

def getNeo4jSessionImpl(self, **config) -> Neo4jSession:
endpoint = self.getNeo4jEndpoint()
return GraphDatabase.driver(endpoint, auth=None).session(**config)
if self._cypher_endpoint is None:
self._cypher_endpoint = self.getNeo4jEndpoint()
return GraphDatabase.driver(self._cypher_endpoint, auth=None).session(**config)

def getNeo4jEndpoint(self) -> str:
"""
Get the bolt endpoint from the service status endpoint.
Only works if the sdk was running in the same pod as the service.
"""
service_status = self.getDefaultSession().get_service_status()
if service_status.is_ok():
bolt_port = service_status.get_value().bolt_port
Expand All @@ -83,11 +121,16 @@ def getNeo4jEndpoint(self) -> str:
)

def getGremlinClientImpl(self):
gremlin_endpoint = self.getGremlinEndpoint()
graph_url = "ws://" + gremlin_endpoint + "/gremlin"
if self._gremlin_endpoint is None:
self._gremlin_endpoint = self.getGremlinEndpoint()
graph_url = "ws://" + self._gremlin_endpoint + "/gremlin"
return Client(graph_url, "g")

def getGremlinEndpoint(self):
"""
Get the gremlin endpoint from the service status endpoint.
Only works if the sdk was running in the same pod as the service.
"""
service_status = self.getDefaultSession().get_service_status()
if service_status.is_ok():
gremlin_port = service_status.get_value().gremlin_port
Expand Down
Loading

0 comments on commit 70188a6

Please sign in to comment.