Skip to content

Commit

Permalink
Timeout fixes (#82)
Browse files Browse the repository at this point in the history
* Timeout fixes

* fix properties

* fix e2e

* formatting
  • Loading branch information
amshalev authored Aug 2, 2020
1 parent fe0cded commit 3a61f7e
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 51 deletions.
2 changes: 1 addition & 1 deletion azure-kusto-data/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ kustoClient.execute("db", "TableName | limit 1", (err, results) => {
// go to https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties#list-of-clientrequestproperties
let clientRequestProps = new ClientRequestProperties();
const ONE_MINUTE = 1000 * 60;
clientRequestProps.setOption("servertimeout", ONE_MINUTE);
clientRequestProps.setTimeout(ONE_MINUTE);

// having client code provide its own clientRequestId is
// highly recommended. It not only allows the caller to
Expand Down
58 changes: 32 additions & 26 deletions azure-kusto-data/source/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const ConnectionStringBuilder = require("./connectionBuilder");
const ClientRequestProperties = require("./clientRequestProperties");
const pkg = require("../package.json");

const COMMAND_TIMEOUT_IN_MILLISECS = moment.duration(10.5, "minutes").asMilliseconds();
const QUERY_TIMEOUT_IN_MILLISECS = moment.duration(4.5, "minutes").asMilliseconds();
const CLIENT_SERVER_DELTA_IN_MILLISECS = moment.duration(0.5, "minutes").asMilliseconds();

module.exports = class KustoClient {
constructor(kcsb) {
Expand Down Expand Up @@ -57,11 +60,13 @@ module.exports = class KustoClient {
_execute(endpoint, db, query, stream, callback, properties) {
let headers = {};
Object.assign(headers, this.headers);

let payload;
let clientRequestPrefix = "";
let clientRequestId;

let timeout = this._getClientTimeout(endpoint, properties);

if (query != null) {
payload = {
"db": db,
Expand All @@ -72,50 +77,38 @@ module.exports = class KustoClient {
payload.properties = properties.toJson();
clientRequestId = properties.clientRequestId;
}

payload = JSON.stringify(payload);

headers["Content-Type"] = "application/json; charset=utf-8";
clientRequestPrefix = "KNC.execute;";
} else if (stream != null) {
payload = stream;
clientRequestPrefix = "KNC.executeStreamingIngest;";
headers["Content-Encoding"] = "gzip";
headers["Content-Encoding"] = "gzip";
}

headers["x-ms-client-request-id"] = clientRequestId || clientRequestPrefix + `${uuidv4()}`;

let timeout = this._getTimeout(endpoint, properties);

return this.aadHelper.getAuthHeader((err, authHeader) => {
if (err) return callback(err);

headers["Authorization"] = authHeader;

return request({
method: "POST",
url: endpoint,
headers,
body: payload,
gzip: true,
timeout
}, this._getRequestCallback(properties, callback)
);
return this._doRequest(endpoint, headers, payload, timeout, properties, callback);
});
}

_getTimeout(endpoint, properties) {
let timeout = null;
if (properties != null) {
timeout = properties instanceof ClientRequestProperties ? properties.getTimeout()
: properties.timeout;
}

if (timeout == null) {
let timeoutInMinutes = endpoint == this.endpoints.mgmt ? 10.5 : 4.5;
timeout = moment.duration(timeoutInMinutes, "minutes").asMilliseconds();
}
return timeout;
_doRequest(endpoint, headers, payload, timeout, properties, callback) {
return request({
method: "POST",
url: endpoint,
headers,
body: payload,
gzip: true,
timeout
}, this._getRequestCallback(properties, callback)
);
}

_getRequestCallback(properties, callback) {
Expand Down Expand Up @@ -150,4 +143,17 @@ module.exports = class KustoClient {
}
};
}

_getClientTimeout(endpoint, properties) {
let timeout = null;
if (properties != null) {
var serverTimeout = properties instanceof ClientRequestProperties ? properties.getTimeout() : properties.timeout;
if (serverTimeout != null) {
return serverTimeout + CLIENT_SERVER_DELTA_IN_MILLISECS;
}
}

timeout = endpoint == this.endpoints.query ? QUERY_TIMEOUT_IN_MILLISECS : COMMAND_TIMEOUT_IN_MILLISECS;
return timeout;
}
};
16 changes: 16 additions & 0 deletions azure-kusto-data/source/clientRequestProperties.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ module.exports = class ClientRequestProperties {

if (Object.keys(this._options).length !== 0) {
json.Options = this._options;
if (json.Options.servertimeout) {
json.Options.servertimeout = this._msToTimespan(json.Options.servertimeout);
}
}

if (Object.keys(this._parameters).length !== 0) {
Expand All @@ -64,4 +67,17 @@ module.exports = class ClientRequestProperties {
toString() {
return JSON.stringify(this.toJson());
}

_msToTimespan(duration) {
var milliseconds = parseInt((duration % 1000) / 100)
, seconds = parseInt((duration / 1000) % 60)
, minutes = parseInt((duration / (1000 * 60)) % 60)
, hours = parseInt((duration / (1000 * 60 * 60)) % 24);

hours = (hours < 10) ? "0" + hours : hours;
minutes = (minutes < 10) ? "0" + minutes : minutes;
seconds = (seconds < 10) ? "0" + seconds : seconds;

return hours + ":" + minutes + ":" + seconds + "." + milliseconds;
}
};
50 changes: 27 additions & 23 deletions azure-kusto-data/test/clientTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const v2ResponseError = require("./data/response/v2error");
const v1Response = require("./data/response/v1");
const v1_2Response = require("./data/response/v1_2");
const uuidv4 = require("uuid/v4");
const moment = require("moment");

const KustoClient = require("../source/client");
const KustoClientRequestProperties = require("../source/clientRequestProperties");
Expand Down Expand Up @@ -78,45 +79,48 @@ describe("KustoClient", function () {
reqCb(null, { statusCode: 200, request: { path: "/v2/query/" } }, v2Response);
});

it("setTimout for request", function () {
it("setTimout for request", function (done) {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);

let clientRequestProps = new KustoClientRequestProperties();
clientRequestProps.setTimeout(10);
let timeoutMs = moment.duration(2.51, "minutes").asMilliseconds();
clientRequestProps.setTimeout(timeoutMs);
client.aadHelper.getAuthHeader = (callback) => callback(null, "MockToken");
client._doRequest = (endpoint, payload, timeout, properties, callback) => {
assert.equal(properties.toString(), JSON.stringify({ "Options": { "servertimeout": 10 } }));
callback(v2Response);
client._doRequest = (endpoint, headers, payload, timeout, properties, callback) => {
let payloadObj = JSON.parse(payload);
assert.equal(payloadObj.properties.Options.servertimeout, "00:02:30.6");
assert.equal(timeout, timeoutMs + moment.duration(0.5, "minutes").asMilliseconds());
done();
};

client.execute("Database", "Table | count" , () => {}, clientRequestProps);
client.execute("Database", "Table | count", () => { }, clientRequestProps);
});

it("default timeout for query", function () {
it("default timeout for query", function (done) {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);

client.aadHelper.getAuthHeader = (callback) => callback(null, "MockToken");
client._doRequest = (endpoint, payload, timeout, properties, callback) => {
assert.equal(timeout, client._getDefaultQueryTimeout());
callback(v2Response);
client._doRequest = (endpoint, headers, payload, timeout, properties, callback) => {
assert.equal(timeout, moment.duration(4.5, "minutes").asMilliseconds());
done();
};

client.execute("Database", "Table | count" , () => {});
client.execute("Database", "Table | count", () => { });
});

it("default timeout for admin", function () {
it("default timeout for admin", function (done) {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);

client.aadHelper.getAuthHeader = (callback) => callback(null, "MockToken");
client._doRequest = (endpoint, payload, timeout, properties, callback) => {
assert.equal(timeout, client._getDefaultCommandTimeout());
callback(v2Response);
client._doRequest = (endpoint, headers, payload, timeout, properties, callback) => {
assert.equal(timeout, moment.duration(10.5, "minutes").asMilliseconds());
done();
};

client.execute("Database", ".show database DataBase schema" , () => {});
client.execute("Database", ".show database DataBase schema", () => { });
});

it("erred v2 not partial", function (done) {
Expand Down Expand Up @@ -174,20 +178,20 @@ describe("KustoClient", function () {
reqCb(null, { statusCode: 200, request: { path: "/v2/query/" } }, JSON.stringify({}));
});

it("set clientRequestId for request", function () {
it("set clientRequestId for request", function (done) {
let url = "https://cluster.kusto.windows.net";
let client = new KustoClient(url);
const clientRequestId = `MyApp.MyActivity;${uuidv4()}`;

let clientRequestProps = new KustoClientRequestProperties();
clientRequestProps.clientRequestId = clientRequestId;
client.aadHelper.getAuthHeader = (callback) => callback(null, "MockToken");
client._doRequest = (endpoint, payload, timeout, properties, callback) => {
assert.equal(properties.clientRequestId, clientRequestId);
callback(v2Response);
client._doRequest = (endpoint, headers, payload, timeout, properties, callback) => {
assert.equal(headers["x-ms-client-request-id"], clientRequestId);
done();
};

client.execute("Database", "Table | count" , () => {}, clientRequestProps);
client.execute("Database", "Table | count", () => { }, clientRequestProps);
});
});
});
2 changes: 1 addition & 1 deletion azure-kusto-ingest/test/e2eTests/e2eTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const { IngestionProperties, DataFormat } = require("../../source/ingestionPrope
const databaseName = process.env.TEST_DATABASE;
const appId = process.env.APP_ID;
const appKey = process.env.APP_KEY;
const tenantId = process.env.TENANT_Is;
const tenantId = process.env.TENANT_ID;

if(!databaseName || !appId || !appKey || !tenantId){
process.stdout.write("Skip E2E test - Missing env variables");
Expand Down

0 comments on commit 3a61f7e

Please sign in to comment.