Skip to content

Commit

Permalink
fix: try updateClientSession on each response.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Apr 17, 2024
1 parent 23ec0bf commit 141e98b
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.function.Consumer;

import static com.databend.client.JsonCodec.jsonCodec;
import static com.google.common.base.MoreObjects.firstNonNull;
Expand Down Expand Up @@ -69,13 +70,16 @@ public class DatabendClientV1
private final AtomicReference<QueryResults> currentResults = new AtomicReference<>(null);
private static final Logger logger = Logger.getLogger(DatabendClientV1.class.getPackage().getName());

public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings) {
private Consumer<DatabendSession> on_session_state_update;

public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer<DatabendSession> on_session_state_update) {
requireNonNull(httpClient, "httpClient is null");
requireNonNull(sql, "sql is null");
requireNonNull(settings, "settings is null");
requireNonNull(settings.getHost(), "settings.host is null");
this.httpClient = httpClient;
this.query = sql;
this.on_session_state_update = on_session_state_update;
this.host = settings.getHost();
this.paginationOptions = settings.getPaginationOptions();
this.requestTimeoutSecs = settings.getQueryTimeoutSecs();
Expand Down Expand Up @@ -202,12 +206,16 @@ public boolean execute(Request request) {
}

private void processResponse(Headers headers, QueryResults results) {
if (results.getSession() != null) {
databendSession.set(results.getSession());
DatabendSession session = results.getSession();
if (session != null) {
databendSession.set(session);
this.on_session_state_update.accept(session);
}
if (results.getQueryId() != null && this.additonalHeaders.get(ClientSettings.X_Databend_Query_ID) == null) {
this.additonalHeaders.put(ClientSettings.X_Databend_Query_ID, results.getQueryId());
}
if (this.on_session_state_update != null) {
}
currentResults.set(results);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void testBasicQueryPagination() {
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();

ClientSettings settings = new ClientSettings(DATABEND_HOST);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null);
System.out.println(cli.getResults().getData());
Assert.assertEquals(cli.getQuery(), "select 1");
Assert.assertEquals(cli.getSession().getDatabase(), DATABASE);
Expand All @@ -57,15 +57,15 @@ public void testBasicQueryIDHeader() {
Map<String, String> additionalHeaders = new HashMap<>();
additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null);
Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID);

String expectedUUID1 = UUID.randomUUID().toString();
Map<String, String> additionalHeaders1 = new HashMap<>();
additionalHeaders1.put(X_Databend_Query_ID, expectedUUID1);
ClientSettings settings1 = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders1, null, DEFAULT_RETRY_ATTEMPTS);
// check X_Databend_Query_ID won't change after calling next()
DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1);
DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings, null);
for (int i = 1; i < 1000; i++) {
cli.advance();
Assert.assertEquals(cli1.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
Expand All @@ -49,7 +50,7 @@
import static java.util.Collections.newSetFromMap;
import static java.util.Objects.requireNonNull;

public class DatabendConnection implements Connection, FileTransferAPI {
public class DatabendConnection implements Connection, FileTransferAPI, Consumer<DatabendSession> {
private static final Logger logger = Logger.getLogger(DatabendConnection.class.getPackage().getName());
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean autoCommit = new AtomicBoolean(true);
Expand Down Expand Up @@ -560,9 +561,12 @@ public void PingDatabendClientV1() throws SQLException {
}
}

public void accept(DatabendSession session) {
setSession(session);
}

DatabendClient startQuery(String sql) throws SQLException {
return new DatabendClientV1(httpClient, sql, makeClientSettings());
return new DatabendClientV1(httpClient, sql, makeClientSettings(), this);
}

DatabendClient startQuery(String sql, StageAttachment attach) throws SQLException {
Expand All @@ -578,7 +582,7 @@ DatabendClient startQuery(String sql, StageAttachment attach) throws SQLExceptio
setAdditionalHeaders(additionalHeaders).
setStageAttachment(attach).
build();
return new DatabendClientV1(httpClient, sql, s);
return new DatabendClientV1(httpClient, sql, s, this);
}

private ClientSettings makeClientSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ private void updateClientSession(QueryResults q) {
}

final boolean internalExecute(String sql, StageAttachment attachment) throws SQLException {
// System.out.println(sql);
clearCurrentResults();
checkOpen();
DatabendClient client = null;
Expand All @@ -192,7 +193,6 @@ final boolean internalExecute(String sql, StageAttachment attachment) throws SQL
throw resultsException(client.getResults(), sql);
}
}
updateClientSession(client.getResults());
if (isQueryStatement(sql)) {
currentUpdateCount = -1;// Always -1 when returning a ResultSet with query statement
} else {
Expand Down

0 comments on commit 141e98b

Please sign in to comment.