Skip to content

Commit

Permalink
Allow to disable stream decompression for exec (#300)
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn authored Aug 22, 2024
1 parent 442392c commit 43751b0
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 52 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 1.5.0 (Node.js)

## New features

- It is now possible to disable the automatic decompression of the response stream with the `exec` method. See `ExecParams.decompress_response_stream` for more details. ([#298](https://github.com/ClickHouse/clickhouse-js/issues/298)).

# 1.4.1 (Node.js, Web)

## Improvements
Expand Down
41 changes: 29 additions & 12 deletions packages/client-common/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,17 @@ export type ExecParams = BaseQueryParams & {
* If {@link ExecParamsWithValues.values} are defined, the query is sent as a request parameter,
* and the values are sent in the request body instead. */
query: string
/** If set to `false`, the client _will not_ decompress the response stream, even if the response compression
* was requested by the client via the {@link BaseClickHouseClientConfigOptions.compression.response } setting.
* This could be useful if the response stream is passed to another application as-is,
* and the decompression is handled there.
* @note 1) Node.js only. This setting will have no effect on the Web version.
* @note 2) In case of an error, the stream will be decompressed anyway, regardless of this setting.
* @default true */
decompress_response_stream?: boolean
}
export type ExecParamsWithValues<Stream> = ExecParams & {
/** If you have a custom INSERT statement to run with `exec`,
* the data from this stream will be inserted.
/** If you have a custom INSERT statement to run with `exec`, the data from this stream will be inserted.
*
* NB: the data in the stream is expected to be serialized accordingly to the FORMAT clause
* used in {@link ExecParams.query} in this case.
Expand Down Expand Up @@ -170,11 +177,12 @@ export class ClickHouseClient<Stream = unknown> {
}

/**
* Used for most statements that can have a response, such as SELECT.
* FORMAT clause should be specified separately via {@link QueryParams.format} (default is JSON)
* Consider using {@link ClickHouseClient.insert} for data insertion,
* or {@link ClickHouseClient.command} for DDLs.
* Used for most statements that can have a response, such as `SELECT`.
* FORMAT clause should be specified separately via {@link QueryParams.format} (default is `JSON`).
* Consider using {@link ClickHouseClient.insert} for data insertion, or {@link ClickHouseClient.command} for DDLs.
* Returns an implementation of {@link BaseResultSet}.
*
* See {@link DataFormat} for the formats supported by the client.
*/
async query<Format extends DataFormat = 'JSON'>(
params: QueryParamsWithFormat<Format>,
Expand Down Expand Up @@ -211,7 +219,9 @@ export class ClickHouseClient<Stream = unknown> {
* when the format clause is not applicable, or when you are not interested in the response at all.
* Response stream is destroyed immediately as we do not expect useful information there.
* Examples of such statements are DDLs or custom inserts.
* If you are interested in the response data, consider using {@link ClickHouseClient.exec}
*
* @note if you have a custom query that does not work with {@link ClickHouseClient.query},
* and you are interested in the response data, consider using {@link ClickHouseClient.exec}.
*/
async command(params: CommandParams): Promise<CommandResult> {
const query = removeTrailingSemi(params.query.trim())
Expand All @@ -222,18 +232,23 @@ export class ClickHouseClient<Stream = unknown> {
}

/**
* Similar to {@link ClickHouseClient.command}, but for the cases where the output is expected,
* but format clause is not applicable. The caller of this method is expected to consume the stream,
* otherwise, the request will eventually be timed out.
* Similar to {@link ClickHouseClient.command}, but for the cases where the output _is expected_,
* but format clause is not applicable. The caller of this method _must_ consume the stream,
* as the underlying socket will not be released until then, and the request will eventually be timed out.
*
* @note it is not intended to use this method to execute the DDLs, such as `CREATE TABLE` or similar;
* use {@link ClickHouseClient.command} instead.
*/
async exec(
params: ExecParams | ExecParamsWithValues<Stream>,
): Promise<ExecResult<Stream>> {
const query = removeTrailingSemi(params.query.trim())
const values = 'values' in params ? params.values : undefined
const decompress_response_stream = params.decompress_response_stream ?? true
return await this.connection.exec({
query,
values,
decompress_response_stream,
...this.withClientQueryParams(params),
})
}
Expand All @@ -242,8 +257,10 @@ export class ClickHouseClient<Stream = unknown> {
* The primary method for data insertion. It is recommended to avoid arrays in case of large inserts
* to reduce application memory consumption and consider streaming for most of such use cases.
* As the insert operation does not provide any output, the response stream is immediately destroyed.
* In case of a custom insert operation, such as, for example, INSERT FROM SELECT,
* consider using {@link ClickHouseClient.command}, passing the entire raw query there (including FORMAT clause).
*
* @note in case of a custom insert operation (e.g., `INSERT FROM SELECT`),
* consider using {@link ClickHouseClient.command}, passing the entire raw query there
* (including the `FORMAT` clause).
*/
async insert<T>(params: InsertParams<Stream, T>): Promise<InsertResult> {
if (Array.isArray(params.values) && params.values.length === 0) {
Expand Down
1 change: 1 addition & 0 deletions packages/client-common/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export interface ConnInsertParams<Stream> extends ConnBaseQueryParams {

export interface ConnExecParams<Stream> extends ConnBaseQueryParams {
values?: Stream
decompress_response_stream?: boolean
}

export interface ConnBaseResult extends WithResponseHeaders {
Expand Down
12 changes: 6 additions & 6 deletions packages/client-common/src/utils/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ export type HttpHeaders = Record<string, HttpHeader | undefined>

export function withCompressionHeaders({
headers,
compress_request,
decompress_response,
enable_request_compression,
enable_response_compression,
}: {
headers: HttpHeaders
compress_request: boolean | undefined
decompress_response: boolean | undefined
enable_request_compression: boolean | undefined
enable_response_compression: boolean | undefined
}): Record<string, string> {
return {
...headers,
...(decompress_response ? { 'Accept-Encoding': 'gzip' } : {}),
...(compress_request ? { 'Content-Encoding': 'gzip' } : {}),
...(enable_response_compression ? { 'Accept-Encoding': 'gzip' } : {}),
...(enable_request_compression ? { 'Content-Encoding': 'gzip' } : {}),
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/client-common/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '1.4.1'
export default '1.5.0'
46 changes: 43 additions & 3 deletions packages/client-node/__tests__/integration/node_exec.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { ClickHouseClient } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'
import { guid } from '@test/utils'
import { createSimpleTable } from '@test/fixtures/simple_table'
import { createTestClient, guid } from '@test/utils'
import Stream from 'stream'
import { getAsText } from '../../src/utils'
import Zlib from 'zlib'
import { drainStream, ResultSet } from '../../src'
import { getAsText } from '../../src/utils'

describe('[Node.js] exec', () => {
let client: ClickHouseClient<Stream.Readable>
Expand Down Expand Up @@ -165,4 +165,44 @@ describe('[Node.js] exec', () => {
expect(await rs.json()).toEqual(expected)
}
})

describe('disabled stream decompression', () => {
beforeEach(() => {
client = createTestClient({
compression: {
response: true,
},
})
})

it('should get a compressed response stream without decompressing it', async () => {
const result = await client.exec({
query: 'SELECT 42 AS result FORMAT JSONEachRow',
decompress_response_stream: false,
})
const text = await getAsText(decompress(result.stream))
expect(text).toEqual('{"result":42}\n')
})

it('should force decompress in case of an error', async () => {
await expectAsync(
client.exec({
query: 'invalid',
decompress_response_stream: false,
}),
).toBeRejectedWith(
jasmine.objectContaining({
message: jasmine.stringContaining('Syntax error'),
}),
)
})

function decompress(stream: Stream.Readable) {
return Stream.pipeline(stream, Zlib.createGunzip(), (err) => {
if (err) {
console.error(err)
}
})
}
})
})
69 changes: 49 additions & 20 deletions packages/client-node/src/connection/node_base_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import type {
LogWriter,
ResponseHeaders,
} from '@clickhouse/client-common'
import { sleep } from '@clickhouse/client-common'
import {
isSuccessfulResponse,
parseError,
sleep,
toSearchParams,
transformUrl,
withHttpSettings,
Expand Down Expand Up @@ -63,8 +63,10 @@ export interface RequestParams {
body?: string | Stream.Readable
// provided by the user and wrapped around internally
abort_signal: AbortSignal
decompress_response?: boolean
compress_request?: boolean
enable_response_compression?: boolean
enable_request_compression?: boolean
// if there are compression headers, attempt to decompress it
try_decompress_response_stream?: boolean
parse_summary?: boolean
}

Expand All @@ -73,7 +75,6 @@ export abstract class NodeBaseConnection
{
protected readonly defaultAuthHeader: string
protected readonly defaultHeaders: Http.OutgoingHttpHeaders
protected readonly additionalHTTPHeaders: Record<string, string>

private readonly logger: LogWriter
private readonly knownSockets = new WeakMap<net.Socket, SocketInfo>()
Expand All @@ -83,12 +84,11 @@ export abstract class NodeBaseConnection
protected readonly params: NodeConnectionParams,
protected readonly agent: Http.Agent,
) {
this.additionalHTTPHeaders = params.http_headers ?? {}
this.defaultAuthHeader = `Basic ${Buffer.from(
`${params.username}:${params.password}`,
).toString('base64')}`
this.defaultHeaders = {
...this.additionalHTTPHeaders,
...(params.http_headers ?? {}),
// KeepAlive agent for some reason does not set this on its own
Connection: this.params.keep_alive.enabled ? 'keep-alive' : 'close',
'User-Agent': getUserAgent(this.params.application_id),
Expand Down Expand Up @@ -137,21 +137,23 @@ export abstract class NodeBaseConnection
)
const searchParams = toSearchParams({
database: this.params.database,
clickhouse_settings,
query_params: params.query_params,
session_id: params.session_id,
clickhouse_settings,
query_id,
})
const decompressResponse = clickhouse_settings.enable_http_compression === 1
const { controller, controllerCleanup } = this.getAbortController(params)
// allows to enforce the compression via the settings even if the client instance has it disabled
const enableResponseCompression =
clickhouse_settings.enable_http_compression === 1
try {
const { stream, response_headers } = await this.request(
{
method: 'POST',
url: transformUrl({ url: this.params.url, searchParams }),
body: params.query,
abort_signal: controller.signal,
decompress_response: decompressResponse,
enable_response_compression: enableResponseCompression,
headers: this.buildRequestHeaders(params),
},
'Query',
Expand All @@ -170,7 +172,7 @@ export abstract class NodeBaseConnection
search_params: searchParams,
err: err as Error,
extra_args: {
decompress_response: decompressResponse,
decompress_response: enableResponseCompression,
clickhouse_settings,
},
})
Expand Down Expand Up @@ -200,7 +202,7 @@ export abstract class NodeBaseConnection
url: transformUrl({ url: this.params.url, searchParams }),
body: params.values,
abort_signal: controller.signal,
compress_request: this.params.compression.compress_request,
enable_request_compression: this.params.compression.compress_request,
parse_summary: true,
headers: this.buildRequestHeaders(params),
},
Expand Down Expand Up @@ -371,16 +373,28 @@ export abstract class NodeBaseConnection
): Promise<ConnExecResult<Stream.Readable>> {
const query_id = this.getQueryId(params.query_id)
const sendQueryInParams = params.values !== undefined
const clickhouse_settings = withHttpSettings(
params.clickhouse_settings,
this.params.compression.decompress_response,
)
const toSearchParamsOptions = {
query: sendQueryInParams ? params.query : undefined,
database: this.params.database,
clickhouse_settings: params.clickhouse_settings,
query_params: params.query_params,
session_id: params.session_id,
clickhouse_settings,
query_id,
}
const searchParams = toSearchParams(toSearchParamsOptions)
const { controller, controllerCleanup } = this.getAbortController(params)
const tryDecompressResponseStream =
params.op === 'Exec'
? // allows to disable stream decompression for the `Exec` operation only
params.decompress_response_stream ??
this.params.compression.decompress_response
: // there is nothing useful in the response stream for the `Command` operation,
// and it is immediately destroyed; never decompress it
false
try {
const { stream, summary, response_headers } = await this.request(
{
Expand All @@ -389,6 +403,10 @@ export abstract class NodeBaseConnection
body: sendQueryInParams ? params.values : params.query,
abort_signal: controller.signal,
parse_summary: true,
enable_request_compression: this.params.compression.compress_request,
enable_response_compression:
this.params.compression.decompress_response,
try_decompress_response_stream: tryDecompressResponseStream,
headers: this.buildRequestHeaders(params),
},
params.op,
Expand Down Expand Up @@ -438,20 +456,30 @@ export abstract class NodeBaseConnection
): Promise<void> => {
this.logResponse(op, request, params, _response, start)

const decompressionResult = decompressResponse(_response)
if (isDecompressionError(decompressionResult)) {
return reject(decompressionResult.error)
let responseStream: Stream.Readable
const tryDecompressResponseStream =
params.try_decompress_response_stream ?? true
// even if the stream decompression is disabled, we have to decompress it in case of an error
const isFailedResponse = !isSuccessfulResponse(_response.statusCode)
if (tryDecompressResponseStream || isFailedResponse) {
const decompressionResult = decompressResponse(_response)
if (isDecompressionError(decompressionResult)) {
return reject(decompressionResult.error)
}
responseStream = decompressionResult.response
} else {
responseStream = _response
}
if (isSuccessfulResponse(_response.statusCode)) {
if (isFailedResponse) {
reject(parseError(await getAsText(responseStream)))
} else {
return resolve({
stream: decompressionResult.response,
stream: responseStream,
summary: params.parse_summary
? this.parseSummary(op, _response)
: undefined,
response_headers: { ..._response.headers },
})
} else {
reject(parseError(await getAsText(decompressionResult.response)))
}
}

Expand Down Expand Up @@ -492,7 +520,7 @@ export abstract class NodeBaseConnection
}
}

if (params.compress_request) {
if (params.enable_request_compression) {
Stream.pipeline(bodyStream, Zlib.createGzip(), request, callback)
} else {
Stream.pipeline(bodyStream, request, callback)
Expand Down Expand Up @@ -626,4 +654,5 @@ interface SocketInfo {
type RunExecParams = ConnBaseQueryParams & {
op: 'Exec' | 'Command'
values?: ConnExecParams<Stream.Readable>['values']
decompress_response_stream?: boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ export class NodeCustomAgentConnection extends NodeBaseConnection {
protected createClientRequest(params: RequestParams): Http.ClientRequest {
const headers = withCompressionHeaders({
headers: params.headers,
compress_request: params.compress_request,
decompress_response: params.decompress_response,
enable_request_compression: params.enable_request_compression,
enable_response_compression: params.enable_response_compression,
})
return Http.request(params.url, {
method: params.method,
Expand Down
4 changes: 2 additions & 2 deletions packages/client-node/src/connection/node_http_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ export class NodeHttpConnection extends NodeBaseConnection {
protected createClientRequest(params: RequestParams): Http.ClientRequest {
const headers = withCompressionHeaders({
headers: params.headers,
compress_request: params.compress_request,
decompress_response: params.decompress_response,
enable_request_compression: params.enable_request_compression,
enable_response_compression: params.enable_response_compression,
})
return Http.request(params.url, {
method: params.method,
Expand Down
Loading

0 comments on commit 43751b0

Please sign in to comment.