diff --git a/databend-client/src/main/java/com/databend/client/ClientSettings.java b/databend-client/src/main/java/com/databend/client/ClientSettings.java index 2cdb82fa..23f4991c 100644 --- a/databend-client/src/main/java/com/databend/client/ClientSettings.java +++ b/databend-client/src/main/java/com/databend/client/ClientSettings.java @@ -24,6 +24,7 @@ public class ClientSettings { public static final int DEFAULT_RETRY_ATTEMPTS = 5; public static final String X_Databend_Query_ID = "X-DATABEND-QUERY-ID"; public static final String X_DATABEND_ROUTE_HINT = "X-DATABEND-ROUTE-HINT"; + public static final String X_DATABEND_STICKY_NODE = "X-DATABEND-STICKY-NODE"; public static final String DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE"; public static final String DatabendTenantHeader = "X-DATABEND-TENANT"; private final String host; diff --git a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java index ee151767..d620a201 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java +++ b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java @@ -67,12 +67,13 @@ public class DatabendClientV1 private final Map additonalHeaders; // client session private final AtomicReference databendSession; + private String nodeID; private final AtomicReference currentResults = new AtomicReference<>(null); private static final Logger logger = Logger.getLogger(DatabendClientV1.class.getPackage().getName()); private Consumer on_session_state_update; - public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer on_session_state_update) { + public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer on_session_state_update, AtomicReference last_node_id) { requireNonNull(httpClient, "httpClient is null"); requireNonNull(sql, "sql is null"); requireNonNull(settings, "settings is null"); @@ -87,11 +88,13 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett this.maxRetryAttempts = settings.getRetryAttempts(); // need atomic reference since it may get updated when query returned. this.databendSession = new AtomicReference<>(settings.getSession()); + this.nodeID = last_node_id.get(); Request request = buildQueryRequest(query, settings); boolean completed = this.execute(request); if (!completed) { throw new RuntimeException("Query failed to complete"); } + last_node_id.set(this.nodeID); } public static List discoverNodes(OkHttpClient httpClient, ClientSettings settings) { @@ -127,8 +130,13 @@ private Request buildQueryRequest(String query, ClientSettings settings) { if (reqString == null || reqString.isEmpty()) { throw new IllegalArgumentException("Invalid request: " + req); } + url = url.newBuilder().encodedPath(QUERY_PATH).build(); Request.Builder builder = prepareRequest(url, this.additonalHeaders); + DatabendSession session = databendSession.get(); + if (session != null && session.getNeedSticky()) { + builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, nodeID); + } return builder.post(okhttp3.RequestBody.create(MEDIA_TYPE_JSON, reqString)).build(); } @@ -299,6 +307,7 @@ public boolean execute(Request request) { } private void processResponse(Headers headers, QueryResults results) { + nodeID = results.getNodeId(); DatabendSession session = results.getSession(); if (session != null) { databendSession.set(session); @@ -332,6 +341,7 @@ public boolean advance() { HttpUrl url = HttpUrl.get(this.host); url = url.newBuilder().encodedPath(nextUriPath).build(); Request.Builder builder = prepareRequest(url, this.additonalHeaders); + builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, this.nodeID); Request request = builder.get().build(); return executeInternal(request, OptionalLong.of(MAX_MATERIALIZED_JSON_RESPONSE_SIZE)); } diff --git a/databend-client/src/main/java/com/databend/client/DatabendSession.java b/databend-client/src/main/java/com/databend/client/DatabendSession.java index 24178f4d..592c8cbd 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendSession.java +++ b/databend-client/src/main/java/com/databend/client/DatabendSession.java @@ -41,6 +41,7 @@ public class DatabendSession { // txn private String txnState; + private Boolean needSticky; private Map additionalProperties = new HashMap<>(); @@ -48,15 +49,17 @@ public class DatabendSession { public DatabendSession( @JsonProperty("database") String database, @JsonProperty("settings") Map settings, - @JsonProperty("txn_state") String txnState) { + @JsonProperty("txn_state") String txnState, + @JsonProperty("need_sticky") Boolean needSticky) { this.database = database; this.settings = settings; this.txnState = txnState; + this.needSticky = needSticky; } // default public static DatabendSession createDefault() { - return new DatabendSession(DEFAULT_DATABASE, null, null); + return new DatabendSession(DEFAULT_DATABASE, null, null, false); } public static Builder builder() { @@ -79,6 +82,11 @@ public String getTxnState() { return txnState; } + @JsonProperty("need_sticky") + public Boolean getNeedSticky() { + return needSticky; + } + @JsonAnyGetter public Map getAdditionalProperties() { return additionalProperties; @@ -145,7 +153,7 @@ public void setAutoCommit(boolean autoCommit) { } public DatabendSession build() { - return new DatabendSession(database, settings, txnState); + return new DatabendSession(database, settings, txnState, false); } } } diff --git a/databend-client/src/main/java/com/databend/client/QueryResults.java b/databend-client/src/main/java/com/databend/client/QueryResults.java index 30b0a07b..35fe223b 100644 --- a/databend-client/src/main/java/com/databend/client/QueryResults.java +++ b/databend-client/src/main/java/com/databend/client/QueryResults.java @@ -25,6 +25,7 @@ public class QueryResults { private final String queryId; + private final String nodeId; private final String sessionId; private final DatabendSession session; private final List schema; @@ -42,6 +43,7 @@ public class QueryResults { @JsonCreator public QueryResults( @JsonProperty("id") String queryId, + @JsonProperty("node_id") String nodeId, @JsonProperty("session_id") String sessionId, @JsonProperty("session") DatabendSession session, @JsonProperty("schema") List schema, @@ -55,6 +57,7 @@ public QueryResults( @JsonProperty("next_uri") URI nextUri, @JsonProperty("kill_uri") URI killUri) { this.queryId = queryId; + this.nodeId = nodeId; this.sessionId = sessionId; this.session = session; this.schema = schema; @@ -76,6 +79,11 @@ public String getQueryId() { return queryId; } + @JsonProperty + public String getNodeId() { + return nodeId; + } + @JsonProperty public String getSessionId() { return sessionId; diff --git a/databend-client/src/test/java/com/databend/client/TestClientIT.java b/databend-client/src/test/java/com/databend/client/TestClientIT.java index 25b6df5c..cdd3b27d 100644 --- a/databend-client/src/test/java/com/databend/client/TestClientIT.java +++ b/databend-client/src/test/java/com/databend/client/TestClientIT.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import static com.databend.client.ClientSettings.*; @@ -37,7 +38,8 @@ public void testBasicQueryPagination() { OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build(); ClientSettings settings = new ClientSettings(DATABEND_HOST); - DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null); + AtomicReference lastNodeID = new AtomicReference<>(); + DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID); System.out.println(cli.getResults().getData()); Assert.assertEquals(cli.getQuery(), "select 1"); Assert.assertEquals(cli.getSession().getDatabase(), DATABASE); @@ -55,8 +57,10 @@ public void testConnectionRefused() { OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build(); ClientSettings settings = new ClientSettings("http://localhost:13191"); + AtomicReference lastNodeID = new AtomicReference<>(); + try { - DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null); + DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID); cli.getResults(); // This should trigger the connection attempt Assert.fail("Expected exception was not thrown"); } catch (Exception e) { @@ -71,11 +75,12 @@ public void testConnectionRefused() { public void testBasicQueryIDHeader() { OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build(); String expectedUUID = UUID.randomUUID().toString(); + AtomicReference lastNodeID = new AtomicReference<>(); Map additionalHeaders = new HashMap<>(); additionalHeaders.put(X_Databend_Query_ID, expectedUUID); ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS); - DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null); + DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID); Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID); String expectedUUID1 = UUID.randomUUID().toString(); @@ -84,7 +89,7 @@ public void testBasicQueryIDHeader() { ClientSettings settings1 = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders1, null, DEFAULT_RETRY_ATTEMPTS); Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID); // check X_Databend_Query_ID won't change after calling next() - DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null); + DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null, lastNodeID); for (int i = 1; i < 1000; i++) { cli.advance(); Assert.assertEquals(cli1.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID1); diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java index 922d4dbd..4a43abea 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java @@ -78,6 +78,7 @@ public class DatabendConnection implements Connection, FileTransferAPI, Consumer private AtomicReference session = new AtomicReference<>(); private String routeHint = ""; + private AtomicReference lastNodeID = new AtomicReference<>(); private void initializeFileHandler() { if (this.debug()) { @@ -696,12 +697,12 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws throw new SQLException("Error start query: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e); } try { - // route hint is used when transaction occured or when multi-cluster warehouse adopted(CLOUD ONLY) + // route hint is used when transaction occurred or when multi-cluster warehouse adopted(CLOUD ONLY) // on cloud case, we have gateway to handle with route hint, and will not parse URI from route hint. // transaction procedure: // 1. server return session body where txn state is active - // 2. when there is a active transaction, it will route all query to target route hint uri if exists - // 3. if there is not active transaction, it will use load balancing policy to choose a host to execute query + // 2. when there is an active transaction, it will route all query to target route hint uri if exists + // 3. if there is not an active transaction, it will use load balancing policy to choose a host to execute query String query_id = UUID.randomUUID().toString(); String candidateHost = this.driverUri.getUri(query_id).toString(); if (!inActiveTransaction()) { @@ -726,7 +727,7 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws if (this.autoDiscovery) { tryAutoDiscovery(httpClient, s); } - return new DatabendClientV1(httpClient, sql, s, this); + return new DatabendClientV1(httpClient, sql, s, this, lastNodeID); } catch (RuntimeException e1) { e = e1; } catch (Exception e1) {