From f2781baf5d96b934b4ab989f14336b3928f1130a Mon Sep 17 00:00:00 2001 From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com> Date: Sat, 5 Oct 2024 16:38:25 +0200 Subject: [PATCH] JSONEachRowWithProgress (#334) --- CHANGELOG.md | 1 + examples/README.md | 1 + .../select_json_each_row_with_progress.ts | 39 +++++++++++++ .../__tests__/unit/clickhouse_types.test.ts | 29 ++++++++++ .../client-common/src/clickhouse_types.ts | 16 ++++++ .../src/data_formatter/formatter.ts | 1 + packages/client-common/src/index.ts | 3 + packages/client-common/src/result.ts | 56 +++++++++++-------- .../node_query_format_types.test.ts | 3 +- .../node_stream_json_formats.test.ts | 22 +++++++- packages/client-node/src/index.ts | 3 + packages/client-node/src/result_set.ts | 2 +- .../integration/web_select_streaming.test.ts | 19 +++++++ packages/client-web/src/index.ts | 3 + packages/client-web/src/result_set.ts | 2 +- 15 files changed, 173 insertions(+), 27 deletions(-) create mode 100644 examples/node/select_json_each_row_with_progress.ts create mode 100644 packages/client-common/__tests__/unit/clickhouse_types.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 0dd46f8f..1ef56960 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ## New features +- Added `JSONEachRowWithProgress` format support, `ProgressRow` interface, and `isProgressRow` type guard. See [this Node.js example](./examples/node/select_json_each_row_with_progress.ts) for more details. It should work similarly with the Web version. - (Experimental) Exposed the `parseColumnType` function that takes a string representation of a ClickHouse type (e.g., `FixedString(16)`, `Nullable(Int32)`, etc.) and returns an AST-like object that represents the type. For example: ```ts diff --git a/examples/README.md b/examples/README.md index 50baa1f5..225eff8a 100644 --- a/examples/README.md +++ b/examples/README.md @@ -57,6 +57,7 @@ If something is missing, or you found a mistake in one of these examples, please - [select_streaming_json_each_row.ts](node/select_streaming_json_each_row.ts) - (Node.js only) streaming JSON\* formats from ClickHouse and processing it with `on('data')` event. - [select_streaming_json_each_row_for_await.ts](node/select_streaming_json_each_row_for_await.ts) - (Node.js only) similar to [select_streaming_json_each_row.ts](node/select_streaming_json_each_row.ts), but using the `for await` loop syntax. - [select_streaming_text_line_by_line.ts](node/select_streaming_text_line_by_line.ts) - (Node.js only) streaming text formats from ClickHouse and processing it line by line. In this example, CSV format is used. +- [select_json_each_row_with_progress.ts](node/select_json_each_row_with_progress.ts) - streaming using `JSONEachRowWithProgress` format, checking for the progress rows in the stream. #### Data types diff --git a/examples/node/select_json_each_row_with_progress.ts b/examples/node/select_json_each_row_with_progress.ts new file mode 100644 index 00000000..90664368 --- /dev/null +++ b/examples/node/select_json_each_row_with_progress.ts @@ -0,0 +1,39 @@ +import { createClient } from '@clickhouse/client' +import { isProgressRow } from '@clickhouse/client-common' + +/** See the format spec - https://clickhouse.com/docs/en/interfaces/formats#jsoneachrowwithprogress + * When JSONEachRowWithProgress format is used in TypeScript, + * the ResultSet should infer the final row type as `{ row: Data } | ProgressRow`. */ +type Data = { number: string } + +void (async () => { + const client = createClient() + const rs = await client.query({ + query: 'SELECT number FROM system.numbers LIMIT 100', + format: 'JSONEachRowWithProgress', + }) + + let totalRows = 0 + let totalProgressRows = 0 + + const stream = rs.stream() + for await (const rows of stream) { + for (const row of rows) { + const decodedRow = row.json() + if (isProgressRow(decodedRow)) { + console.log('Got a progress row:', decodedRow) + totalProgressRows++ + } else { + totalRows++ + if (totalRows % 100 === 0) { + console.log('Sample row:', decodedRow) + } + } + } + } + + console.log('Total rows:', totalRows) + console.log('Total progress rows:', totalProgressRows) + + await client.close() +})() diff --git a/packages/client-common/__tests__/unit/clickhouse_types.test.ts b/packages/client-common/__tests__/unit/clickhouse_types.test.ts new file mode 100644 index 00000000..96b43fff --- /dev/null +++ b/packages/client-common/__tests__/unit/clickhouse_types.test.ts @@ -0,0 +1,29 @@ +import { isProgressRow } from '@clickhouse/client-common' + +describe('ClickHouse types', () => { + it('should check if a row is progress row', async () => { + const row = { + progress: { + read_rows: '1', + read_bytes: '1', + written_rows: '1', + written_bytes: '1', + total_rows_to_read: '1', + result_rows: '1', + result_bytes: '1', + elapsed_ns: '1', + }, + } + expect(isProgressRow(row)).toBeTruthy() + expect(isProgressRow({})).toBeFalsy() + expect( + isProgressRow({ + ...row, + extra: 'extra', + }), + ).toBeFalsy() + expect(isProgressRow(null)).toBeFalsy() + expect(isProgressRow(42)).toBeFalsy() + expect(isProgressRow({ foo: 'bar' })).toBeFalsy() + }) +}) diff --git a/packages/client-common/src/clickhouse_types.ts b/packages/client-common/src/clickhouse_types.ts index c436270f..846888b9 100644 --- a/packages/client-common/src/clickhouse_types.ts +++ b/packages/client-common/src/clickhouse_types.ts @@ -40,3 +40,19 @@ export interface WithClickHouseSummary { export interface WithResponseHeaders { response_headers: ResponseHeaders } + +/** X-ClickHouse-Summary response header and progress rows from JSONEachRowWithProgress share the same structure */ +export interface ProgressRow { + progress: ClickHouseSummary +} + +/** Type guard to use with JSONEachRowWithProgress, checking if the emitted row is a progress row. + * @see https://clickhouse.com/docs/en/interfaces/formats#jsoneachrowwithprogress */ +export function isProgressRow(row: unknown): row is ProgressRow { + return ( + row !== null && + typeof row === 'object' && + 'progress' in row && + Object.keys(row).length === 1 + ) +} diff --git a/packages/client-common/src/data_formatter/formatter.ts b/packages/client-common/src/data_formatter/formatter.ts index f4b92830..c6aa7389 100644 --- a/packages/client-common/src/data_formatter/formatter.ts +++ b/packages/client-common/src/data_formatter/formatter.ts @@ -7,6 +7,7 @@ export const StreamableJSONFormats = [ 'JSONCompactEachRowWithNamesAndTypes', 'JSONCompactStringsEachRowWithNames', 'JSONCompactStringsEachRowWithNamesAndTypes', + 'JSONEachRowWithProgress', ] as const export const RecordsJSONFormats = ['JSONObjectEachRow'] as const export const SingleDocumentJSONFormats = [ diff --git a/packages/client-common/src/index.ts b/packages/client-common/src/index.ts index b7eb270a..3bbc2621 100644 --- a/packages/client-common/src/index.ts +++ b/packages/client-common/src/index.ts @@ -16,6 +16,7 @@ export { export { type BaseClickHouseClientConfigOptions } from './config' export type { Row, + RowOrProgress, BaseResultSet, ResultJSONType, RowJSONType, @@ -51,7 +52,9 @@ export type { ResponseHeaders, WithClickHouseSummary, WithResponseHeaders, + ProgressRow, } from './clickhouse_types' +export { isProgressRow } from './clickhouse_types' export { type ClickHouseSettings, type MergeTreeSettings, diff --git a/packages/client-common/src/result.ts b/packages/client-common/src/result.ts index efdfa077..e1eb14ea 100644 --- a/packages/client-common/src/result.ts +++ b/packages/client-common/src/result.ts @@ -1,4 +1,8 @@ -import type { ResponseHeaders, ResponseJSON } from './clickhouse_types' +import type { + ProgressRow, + ResponseHeaders, + ResponseJSON, +} from './clickhouse_types' import type { DataFormat, RawDataFormat, @@ -8,6 +12,8 @@ import type { StreamableJSONDataFormat, } from './data_formatter' +export type RowOrProgress = { row: T } | ProgressRow + export type ResultStream = // JSON*EachRow (except JSONObjectEachRow), CSV, TSV etc. Format extends StreamableDataFormat @@ -22,29 +28,35 @@ export type ResultStream = Stream export type ResultJSONType = - // JSON*EachRow formats except JSONObjectEachRow - F extends StreamableJSONDataFormat - ? T[] - : // JSON formats with known layout { data, meta, statistics, ... } - F extends SingleDocumentJSONFormat - ? ResponseJSON - : // JSON formats represented as a Record - F extends RecordsJSONFormat - ? Record - : // CSV, TSV etc. - cannot be represented as JSON - F extends RawDataFormat - ? never - : // happens only when Format could not be inferred from a literal - T[] | Record | ResponseJSON + // Emits either a { row: T } or an object with progress + F extends 'JSONEachRowWithProgress' + ? RowOrProgress[] + : // JSON*EachRow formats except JSONObjectEachRow + F extends StreamableJSONDataFormat + ? T[] + : // JSON formats with known layout { data, meta, statistics, ... } + F extends SingleDocumentJSONFormat + ? ResponseJSON + : // JSON formats represented as a Record + F extends RecordsJSONFormat + ? Record + : // CSV, TSV etc. - cannot be represented as JSON + F extends RawDataFormat + ? never + : // happens only when Format could not be inferred from a literal + T[] | Record | ResponseJSON export type RowJSONType = - // JSON*EachRow formats - F extends StreamableJSONDataFormat - ? T - : // CSV, TSV, non-streamable JSON formats - cannot be streamed as JSON - F extends RawDataFormat | SingleDocumentJSONFormat | RecordsJSONFormat - ? never - : T // happens only when Format could not be inferred from a literal + // Emits either a { row: T } or an object with progress + F extends 'JSONEachRowWithProgress' + ? RowOrProgress + : // JSON*EachRow formats + F extends StreamableJSONDataFormat + ? T + : // CSV, TSV, non-streamable JSON formats - cannot be streamed as JSON + F extends RawDataFormat | SingleDocumentJSONFormat | RecordsJSONFormat + ? never + : T // happens only when Format could not be inferred from a literal export interface Row< JSONType = unknown, diff --git a/packages/client-node/__tests__/integration/node_query_format_types.test.ts b/packages/client-node/__tests__/integration/node_query_format_types.test.ts index e19a33ff..d9fc63ed 100644 --- a/packages/client-node/__tests__/integration/node_query_format_types.test.ts +++ b/packages/client-node/__tests__/integration/node_query_format_types.test.ts @@ -1,11 +1,10 @@ -import type { ResultSet } from '../../src' import type { ClickHouseClient as BaseClickHouseClient, DataFormat, } from '@clickhouse/client-common' import { createTableWithFields } from '@test/fixtures/table_with_fields' import { guid } from '@test/utils' -import type { ClickHouseClient } from '../../src' +import type { ClickHouseClient, ResultSet } from '../../src' import { createNodeTestClient } from '../utils/node_client' /* eslint-disable @typescript-eslint/no-unused-expressions */ diff --git a/packages/client-node/__tests__/integration/node_stream_json_formats.test.ts b/packages/client-node/__tests__/integration/node_stream_json_formats.test.ts index c196e2e2..802c878d 100644 --- a/packages/client-node/__tests__/integration/node_stream_json_formats.test.ts +++ b/packages/client-node/__tests__/integration/node_stream_json_formats.test.ts @@ -1,4 +1,4 @@ -import { type ClickHouseClient } from '@clickhouse/client-common' +import { type ClickHouseClient, isProgressRow } from '@clickhouse/client-common' import { createSimpleTable } from '@test/fixtures/simple_table' import { assertJsonValues, jsonValues } from '@test/fixtures/test_data' import { createTestClient, guid } from '@test/utils' @@ -231,6 +231,26 @@ describe('[Node.js] stream JSON formats', () => { }) }) + describe('JSONEachRowWithProgress', () => { + it('should work', async () => { + const limit = 2 + const expectedProgressRowsCount = 4 + const rs = await client.query({ + query: `SELECT number FROM system.numbers LIMIT ${limit}`, + format: 'JSONEachRowWithProgress', + clickhouse_settings: { + max_block_size: '1', // reduce the block size, so the progress is reported more frequently + }, + }) + const rows = await rs.json<{ number: 'string' }>() + expect(rows.length).toEqual(limit + expectedProgressRowsCount) + expect(rows.filter((r) => !isProgressRow(r)) as unknown[]).toEqual([ + { row: { number: '0' } }, + { row: { number: '1' } }, + ]) + }) + }) + it('does not throw if stream closes prematurely', async () => { const stream = new Stream.Readable({ objectMode: true, diff --git a/packages/client-node/src/index.ts b/packages/client-node/src/index.ts index 60786892..68d4a300 100644 --- a/packages/client-node/src/index.ts +++ b/packages/client-node/src/index.ts @@ -60,4 +60,7 @@ export { type ParsedColumnType, parseColumnType, SimpleColumnTypes, + type ProgressRow, + isProgressRow, + type RowOrProgress, } from '@clickhouse/client-common' diff --git a/packages/client-node/src/result_set.ts b/packages/client-node/src/result_set.ts index 43aceb28..d05b275d 100644 --- a/packages/client-node/src/result_set.ts +++ b/packages/client-node/src/result_set.ts @@ -85,7 +85,7 @@ export class ResultSet const stream = this.stream() for await (const rows of stream) { for (const row of rows) { - result.push(row.json()) + result.push(row.json() as T) } } return result as any diff --git a/packages/client-web/__tests__/integration/web_select_streaming.test.ts b/packages/client-web/__tests__/integration/web_select_streaming.test.ts index 66cf2934..8cc00e5e 100644 --- a/packages/client-web/__tests__/integration/web_select_streaming.test.ts +++ b/packages/client-web/__tests__/integration/web_select_streaming.test.ts @@ -1,4 +1,5 @@ import type { ClickHouseClient, Row } from '@clickhouse/client-common' +import { isProgressRow } from '@clickhouse/client-common' import { createTestClient } from '@test/utils' import { genLargeStringsDataset } from '@test/utils/datasets' @@ -125,6 +126,24 @@ describe('[Web] SELECT streaming', () => { ]) }) + it('should return objects in JSONEachRowWithProgress format', async () => { + const limit = 2 + const expectedProgressRowsCount = 4 + const rs = await client.query({ + query: `SELECT * FROM system.numbers LIMIT ${limit}`, + format: 'JSONEachRowWithProgress', + clickhouse_settings: { + max_block_size: '1', // reduce the block size, so the progress is reported more frequently + }, + }) + const rows = await rs.json<{ number: string }>() + expect(rows.length).toEqual(limit + expectedProgressRowsCount) + expect(rows.filter((r) => !isProgressRow(r)) as unknown[]).toEqual([ + { row: { number: '0' } }, + { row: { number: '1' } }, + ]) + }) + it('returns stream of objects in JSONStringsEachRow format', async () => { const result = await client.query({ query: 'SELECT number FROM system.numbers LIMIT 5', diff --git a/packages/client-web/src/index.ts b/packages/client-web/src/index.ts index 791a74a6..27503be7 100644 --- a/packages/client-web/src/index.ts +++ b/packages/client-web/src/index.ts @@ -59,4 +59,7 @@ export { type ParsedColumnType, parseColumnType, SimpleColumnTypes, + type ProgressRow, + isProgressRow, + type RowOrProgress, } from '@clickhouse/client-common' diff --git a/packages/client-web/src/result_set.ts b/packages/client-web/src/result_set.ts index fc744812..ecf6cfbb 100644 --- a/packages/client-web/src/result_set.ts +++ b/packages/client-web/src/result_set.ts @@ -50,7 +50,7 @@ export class ResultSet break } for (const row of value) { - result.push(row.json()) + result.push(row.json() as T) } } return result as any