Skip to content

Commit

Permalink
feat: support query forwards.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Oct 18, 2024
1 parent 558c070 commit baa3a28
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class ClientSettings {
public static final int DEFAULT_RETRY_ATTEMPTS = 5;
public static final String X_Databend_Query_ID = "X-DATABEND-QUERY-ID";
public static final String X_DATABEND_ROUTE_HINT = "X-DATABEND-ROUTE-HINT";
public static final String X_DATABEND_STICKY_NODE = "X-DATABEND-STICKY-NODE";
public static final String DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE";
public static final String DatabendTenantHeader = "X-DATABEND-TENANT";
private final String host;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ public class DatabendClientV1
private final Map<String, String> additonalHeaders;
// client session
private final AtomicReference<DatabendSession> databendSession;
private String nodeID;
private final AtomicReference<QueryResults> currentResults = new AtomicReference<>(null);
private static final Logger logger = Logger.getLogger(DatabendClientV1.class.getPackage().getName());

private Consumer<DatabendSession> on_session_state_update;

public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer<DatabendSession> on_session_state_update) {
public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer<DatabendSession> on_session_state_update, AtomicReference<String> last_node_id) {
requireNonNull(httpClient, "httpClient is null");
requireNonNull(sql, "sql is null");
requireNonNull(settings, "settings is null");
Expand All @@ -87,11 +88,13 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett
this.maxRetryAttempts = settings.getRetryAttempts();
// need atomic reference since it may get updated when query returned.
this.databendSession = new AtomicReference<>(settings.getSession());
this.nodeID = last_node_id.get();
Request request = buildQueryRequest(query, settings);
boolean completed = this.execute(request);
if (!completed) {
throw new RuntimeException("Query failed to complete");
}
last_node_id.set(this.nodeID);
}

public static List<DiscoveryNode> discoverNodes(OkHttpClient httpClient, ClientSettings settings) {
Expand Down Expand Up @@ -127,8 +130,13 @@ private Request buildQueryRequest(String query, ClientSettings settings) {
if (reqString == null || reqString.isEmpty()) {
throw new IllegalArgumentException("Invalid request: " + req);
}

url = url.newBuilder().encodedPath(QUERY_PATH).build();
Request.Builder builder = prepareRequest(url, this.additonalHeaders);
DatabendSession session = databendSession.get();
if (session != null && session.getNeedSticky()) {
builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, nodeID);
}
return builder.post(okhttp3.RequestBody.create(MEDIA_TYPE_JSON, reqString)).build();
}

Expand Down Expand Up @@ -299,6 +307,7 @@ public boolean execute(Request request) {
}

private void processResponse(Headers headers, QueryResults results) {
nodeID = results.getNodeId();
DatabendSession session = results.getSession();
if (session != null) {
databendSession.set(session);
Expand Down Expand Up @@ -332,6 +341,7 @@ public boolean advance() {
HttpUrl url = HttpUrl.get(this.host);
url = url.newBuilder().encodedPath(nextUriPath).build();
Request.Builder builder = prepareRequest(url, this.additonalHeaders);
builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, this.nodeID);
Request request = builder.get().build();
return executeInternal(request, OptionalLong.of(MAX_MATERIALIZED_JSON_RESPONSE_SIZE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,25 @@ public class DatabendSession {

// txn
private String txnState;
private Boolean needSticky;

private Map<String, Object> additionalProperties = new HashMap<>();

@JsonCreator
public DatabendSession(
@JsonProperty("database") String database,
@JsonProperty("settings") Map<String, String> settings,
@JsonProperty("txn_state") String txnState) {
@JsonProperty("txn_state") String txnState,
@JsonProperty("need_sticky") Boolean needSticky) {
this.database = database;
this.settings = settings;
this.txnState = txnState;
this.needSticky = needSticky;
}

// default
public static DatabendSession createDefault() {
return new DatabendSession(DEFAULT_DATABASE, null, null);
return new DatabendSession(DEFAULT_DATABASE, null, null, false);
}

public static Builder builder() {
Expand All @@ -79,6 +82,11 @@ public String getTxnState() {
return txnState;
}

@JsonProperty("need_sticky")
public Boolean getNeedSticky() {
return needSticky;
}

@JsonAnyGetter
public Map<String, Object> getAdditionalProperties() {
return additionalProperties;
Expand Down Expand Up @@ -145,7 +153,7 @@ public void setAutoCommit(boolean autoCommit) {
}

public DatabendSession build() {
return new DatabendSession(database, settings, txnState);
return new DatabendSession(database, settings, txnState, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

public class QueryResults {
private final String queryId;
private final String nodeId;
private final String sessionId;
private final DatabendSession session;
private final List<QueryRowField> schema;
Expand All @@ -42,6 +43,7 @@ public class QueryResults {
@JsonCreator
public QueryResults(
@JsonProperty("id") String queryId,
@JsonProperty("node_id") String nodeId,
@JsonProperty("session_id") String sessionId,
@JsonProperty("session") DatabendSession session,
@JsonProperty("schema") List<QueryRowField> schema,
Expand All @@ -55,6 +57,7 @@ public QueryResults(
@JsonProperty("next_uri") URI nextUri,
@JsonProperty("kill_uri") URI killUri) {
this.queryId = queryId;
this.nodeId = nodeId;
this.sessionId = sessionId;
this.session = session;
this.schema = schema;
Expand All @@ -76,6 +79,11 @@ public String getQueryId() {
return queryId;
}

@JsonProperty
public String getNodeId() {
return nodeId;
}

@JsonProperty
public String getSessionId() {
return sessionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static com.databend.client.ClientSettings.*;

Expand All @@ -37,7 +38,8 @@ public void testBasicQueryPagination() {
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();

ClientSettings settings = new ClientSettings(DATABEND_HOST);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null);
AtomicReference<String> lastNodeID = new AtomicReference<>();
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
System.out.println(cli.getResults().getData());
Assert.assertEquals(cli.getQuery(), "select 1");
Assert.assertEquals(cli.getSession().getDatabase(), DATABASE);
Expand All @@ -55,8 +57,10 @@ public void testConnectionRefused() {
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();
ClientSettings settings = new ClientSettings("http://localhost:13191");

AtomicReference<String> lastNodeID = new AtomicReference<>();

try {
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
cli.getResults(); // This should trigger the connection attempt
Assert.fail("Expected exception was not thrown");
} catch (Exception e) {
Expand All @@ -71,11 +75,12 @@ public void testConnectionRefused() {
public void testBasicQueryIDHeader() {
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();
String expectedUUID = UUID.randomUUID().toString();
AtomicReference<String> lastNodeID = new AtomicReference<>();

Map<String, String> additionalHeaders = new HashMap<>();
additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID);

String expectedUUID1 = UUID.randomUUID().toString();
Expand All @@ -84,7 +89,7 @@ public void testBasicQueryIDHeader() {
ClientSettings settings1 = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders1, null, DEFAULT_RETRY_ATTEMPTS);
Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID);
// check X_Databend_Query_ID won't change after calling next()
DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null);
DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null, lastNodeID);
for (int i = 1; i < 1000; i++) {
cli.advance();
Assert.assertEquals(cli1.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class DatabendConnection implements Connection, FileTransferAPI, Consumer
private AtomicReference<DatabendSession> session = new AtomicReference<>();

private String routeHint = "";
private AtomicReference<String> lastNodeID = new AtomicReference<>();

private void initializeFileHandler() {
if (this.debug()) {
Expand Down Expand Up @@ -696,12 +697,12 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws
throw new SQLException("Error start query: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e);
}
try {
// route hint is used when transaction occured or when multi-cluster warehouse adopted(CLOUD ONLY)
// route hint is used when transaction occurred or when multi-cluster warehouse adopted(CLOUD ONLY)
// on cloud case, we have gateway to handle with route hint, and will not parse URI from route hint.
// transaction procedure:
// 1. server return session body where txn state is active
// 2. when there is a active transaction, it will route all query to target route hint uri if exists
// 3. if there is not active transaction, it will use load balancing policy to choose a host to execute query
// 2. when there is an active transaction, it will route all query to target route hint uri if exists
// 3. if there is not an active transaction, it will use load balancing policy to choose a host to execute query
String query_id = UUID.randomUUID().toString();
String candidateHost = this.driverUri.getUri(query_id).toString();
if (!inActiveTransaction()) {
Expand All @@ -726,7 +727,7 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws
if (this.autoDiscovery) {
tryAutoDiscovery(httpClient, s);
}
return new DatabendClientV1(httpClient, sql, s, this);
return new DatabendClientV1(httpClient, sql, s, this, lastNodeID);
} catch (RuntimeException e1) {
e = e1;
} catch (Exception e1) {
Expand Down

0 comments on commit baa3a28

Please sign in to comment.