Skip to content

Commit

Permalink
JSONEachRowWithProgress (#334)
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn authored Oct 5, 2024
1 parent 1c9b28b commit f2781ba
Show file tree
Hide file tree
Showing 15 changed files with 173 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

## New features

- Added `JSONEachRowWithProgress` format support, `ProgressRow` interface, and `isProgressRow` type guard. See [this Node.js example](./examples/node/select_json_each_row_with_progress.ts) for more details. It should work similarly with the Web version.
- (Experimental) Exposed the `parseColumnType` function that takes a string representation of a ClickHouse type (e.g., `FixedString(16)`, `Nullable(Int32)`, etc.) and returns an AST-like object that represents the type. For example:

```ts
Expand Down
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ If something is missing, or you found a mistake in one of these examples, please
- [select_streaming_json_each_row.ts](node/select_streaming_json_each_row.ts) - (Node.js only) streaming JSON\* formats from ClickHouse and processing it with `on('data')` event.
- [select_streaming_json_each_row_for_await.ts](node/select_streaming_json_each_row_for_await.ts) - (Node.js only) similar to [select_streaming_json_each_row.ts](node/select_streaming_json_each_row.ts), but using the `for await` loop syntax.
- [select_streaming_text_line_by_line.ts](node/select_streaming_text_line_by_line.ts) - (Node.js only) streaming text formats from ClickHouse and processing it line by line. In this example, CSV format is used.
- [select_json_each_row_with_progress.ts](node/select_json_each_row_with_progress.ts) - streaming using `JSONEachRowWithProgress` format, checking for the progress rows in the stream.

#### Data types

Expand Down
39 changes: 39 additions & 0 deletions examples/node/select_json_each_row_with_progress.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { createClient } from '@clickhouse/client'
import { isProgressRow } from '@clickhouse/client-common'

/** See the format spec - https://clickhouse.com/docs/en/interfaces/formats#jsoneachrowwithprogress
* When JSONEachRowWithProgress format is used in TypeScript,
* the ResultSet should infer the final row type as `{ row: Data } | ProgressRow`. */
type Data = { number: string }

void (async () => {
const client = createClient()
const rs = await client.query({
query: 'SELECT number FROM system.numbers LIMIT 100',
format: 'JSONEachRowWithProgress',
})

let totalRows = 0
let totalProgressRows = 0

const stream = rs.stream<Data>()
for await (const rows of stream) {
for (const row of rows) {
const decodedRow = row.json()
if (isProgressRow(decodedRow)) {
console.log('Got a progress row:', decodedRow)
totalProgressRows++
} else {
totalRows++
if (totalRows % 100 === 0) {
console.log('Sample row:', decodedRow)
}
}
}
}

console.log('Total rows:', totalRows)
console.log('Total progress rows:', totalProgressRows)

await client.close()
})()
29 changes: 29 additions & 0 deletions packages/client-common/__tests__/unit/clickhouse_types.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { isProgressRow } from '@clickhouse/client-common'

describe('ClickHouse types', () => {
it('should check if a row is progress row', async () => {
const row = {
progress: {
read_rows: '1',
read_bytes: '1',
written_rows: '1',
written_bytes: '1',
total_rows_to_read: '1',
result_rows: '1',
result_bytes: '1',
elapsed_ns: '1',
},
}
expect(isProgressRow(row)).toBeTruthy()
expect(isProgressRow({})).toBeFalsy()
expect(
isProgressRow({
...row,
extra: 'extra',
}),
).toBeFalsy()
expect(isProgressRow(null)).toBeFalsy()
expect(isProgressRow(42)).toBeFalsy()
expect(isProgressRow({ foo: 'bar' })).toBeFalsy()
})
})
16 changes: 16 additions & 0 deletions packages/client-common/src/clickhouse_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,19 @@ export interface WithClickHouseSummary {
export interface WithResponseHeaders {
response_headers: ResponseHeaders
}

/** X-ClickHouse-Summary response header and progress rows from JSONEachRowWithProgress share the same structure */
export interface ProgressRow {
progress: ClickHouseSummary
}

/** Type guard to use with JSONEachRowWithProgress, checking if the emitted row is a progress row.
* @see https://clickhouse.com/docs/en/interfaces/formats#jsoneachrowwithprogress */
export function isProgressRow(row: unknown): row is ProgressRow {
return (
row !== null &&
typeof row === 'object' &&
'progress' in row &&
Object.keys(row).length === 1
)
}
1 change: 1 addition & 0 deletions packages/client-common/src/data_formatter/formatter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const StreamableJSONFormats = [
'JSONCompactEachRowWithNamesAndTypes',
'JSONCompactStringsEachRowWithNames',
'JSONCompactStringsEachRowWithNamesAndTypes',
'JSONEachRowWithProgress',
] as const
export const RecordsJSONFormats = ['JSONObjectEachRow'] as const
export const SingleDocumentJSONFormats = [
Expand Down
3 changes: 3 additions & 0 deletions packages/client-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export {
export { type BaseClickHouseClientConfigOptions } from './config'
export type {
Row,
RowOrProgress,
BaseResultSet,
ResultJSONType,
RowJSONType,
Expand Down Expand Up @@ -51,7 +52,9 @@ export type {
ResponseHeaders,
WithClickHouseSummary,
WithResponseHeaders,
ProgressRow,
} from './clickhouse_types'
export { isProgressRow } from './clickhouse_types'
export {
type ClickHouseSettings,
type MergeTreeSettings,
Expand Down
56 changes: 34 additions & 22 deletions packages/client-common/src/result.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import type { ResponseHeaders, ResponseJSON } from './clickhouse_types'
import type {
ProgressRow,
ResponseHeaders,
ResponseJSON,
} from './clickhouse_types'
import type {
DataFormat,
RawDataFormat,
Expand All @@ -8,6 +12,8 @@ import type {
StreamableJSONDataFormat,
} from './data_formatter'

export type RowOrProgress<T> = { row: T } | ProgressRow

export type ResultStream<Format extends DataFormat | unknown, Stream> =
// JSON*EachRow (except JSONObjectEachRow), CSV, TSV etc.
Format extends StreamableDataFormat
Expand All @@ -22,29 +28,35 @@ export type ResultStream<Format extends DataFormat | unknown, Stream> =
Stream

export type ResultJSONType<T, F extends DataFormat | unknown> =
// JSON*EachRow formats except JSONObjectEachRow
F extends StreamableJSONDataFormat
? T[]
: // JSON formats with known layout { data, meta, statistics, ... }
F extends SingleDocumentJSONFormat
? ResponseJSON<T>
: // JSON formats represented as a Record<string, T>
F extends RecordsJSONFormat
? Record<string, T>
: // CSV, TSV etc. - cannot be represented as JSON
F extends RawDataFormat
? never
: // happens only when Format could not be inferred from a literal
T[] | Record<string, T> | ResponseJSON<T>
// Emits either a { row: T } or an object with progress
F extends 'JSONEachRowWithProgress'
? RowOrProgress<T>[]
: // JSON*EachRow formats except JSONObjectEachRow
F extends StreamableJSONDataFormat
? T[]
: // JSON formats with known layout { data, meta, statistics, ... }
F extends SingleDocumentJSONFormat
? ResponseJSON<T>
: // JSON formats represented as a Record<string, T>
F extends RecordsJSONFormat
? Record<string, T>
: // CSV, TSV etc. - cannot be represented as JSON
F extends RawDataFormat
? never
: // happens only when Format could not be inferred from a literal
T[] | Record<string, T> | ResponseJSON<T>

export type RowJSONType<T, F extends DataFormat | unknown> =
// JSON*EachRow formats
F extends StreamableJSONDataFormat
? T
: // CSV, TSV, non-streamable JSON formats - cannot be streamed as JSON
F extends RawDataFormat | SingleDocumentJSONFormat | RecordsJSONFormat
? never
: T // happens only when Format could not be inferred from a literal
// Emits either a { row: T } or an object with progress
F extends 'JSONEachRowWithProgress'
? RowOrProgress<T>
: // JSON*EachRow formats
F extends StreamableJSONDataFormat
? T
: // CSV, TSV, non-streamable JSON formats - cannot be streamed as JSON
F extends RawDataFormat | SingleDocumentJSONFormat | RecordsJSONFormat
? never
: T // happens only when Format could not be inferred from a literal

export interface Row<
JSONType = unknown,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import type { ResultSet } from '../../src'
import type {
ClickHouseClient as BaseClickHouseClient,
DataFormat,
} from '@clickhouse/client-common'
import { createTableWithFields } from '@test/fixtures/table_with_fields'
import { guid } from '@test/utils'
import type { ClickHouseClient } from '../../src'
import type { ClickHouseClient, ResultSet } from '../../src'
import { createNodeTestClient } from '../utils/node_client'

/* eslint-disable @typescript-eslint/no-unused-expressions */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type ClickHouseClient } from '@clickhouse/client-common'
import { type ClickHouseClient, isProgressRow } from '@clickhouse/client-common'
import { createSimpleTable } from '@test/fixtures/simple_table'
import { assertJsonValues, jsonValues } from '@test/fixtures/test_data'
import { createTestClient, guid } from '@test/utils'
Expand Down Expand Up @@ -231,6 +231,26 @@ describe('[Node.js] stream JSON formats', () => {
})
})

describe('JSONEachRowWithProgress', () => {
it('should work', async () => {
const limit = 2
const expectedProgressRowsCount = 4
const rs = await client.query({
query: `SELECT number FROM system.numbers LIMIT ${limit}`,
format: 'JSONEachRowWithProgress',
clickhouse_settings: {
max_block_size: '1', // reduce the block size, so the progress is reported more frequently
},
})
const rows = await rs.json<{ number: 'string' }>()
expect(rows.length).toEqual(limit + expectedProgressRowsCount)
expect(rows.filter((r) => !isProgressRow(r)) as unknown[]).toEqual([
{ row: { number: '0' } },
{ row: { number: '1' } },
])
})
})

it('does not throw if stream closes prematurely', async () => {
const stream = new Stream.Readable({
objectMode: true,
Expand Down
3 changes: 3 additions & 0 deletions packages/client-node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,7 @@ export {
type ParsedColumnType,
parseColumnType,
SimpleColumnTypes,
type ProgressRow,
isProgressRow,
type RowOrProgress,
} from '@clickhouse/client-common'
2 changes: 1 addition & 1 deletion packages/client-node/src/result_set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class ResultSet<Format extends DataFormat | unknown>
const stream = this.stream<T>()
for await (const rows of stream) {
for (const row of rows) {
result.push(row.json())
result.push(row.json() as T)
}
}
return result as any
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { ClickHouseClient, Row } from '@clickhouse/client-common'
import { isProgressRow } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'
import { genLargeStringsDataset } from '@test/utils/datasets'

Expand Down Expand Up @@ -125,6 +126,24 @@ describe('[Web] SELECT streaming', () => {
])
})

it('should return objects in JSONEachRowWithProgress format', async () => {
const limit = 2
const expectedProgressRowsCount = 4
const rs = await client.query({
query: `SELECT * FROM system.numbers LIMIT ${limit}`,
format: 'JSONEachRowWithProgress',
clickhouse_settings: {
max_block_size: '1', // reduce the block size, so the progress is reported more frequently
},
})
const rows = await rs.json<{ number: string }>()
expect(rows.length).toEqual(limit + expectedProgressRowsCount)
expect(rows.filter((r) => !isProgressRow(r)) as unknown[]).toEqual([
{ row: { number: '0' } },
{ row: { number: '1' } },
])
})

it('returns stream of objects in JSONStringsEachRow format', async () => {
const result = await client.query({
query: 'SELECT number FROM system.numbers LIMIT 5',
Expand Down
3 changes: 3 additions & 0 deletions packages/client-web/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,7 @@ export {
type ParsedColumnType,
parseColumnType,
SimpleColumnTypes,
type ProgressRow,
isProgressRow,
type RowOrProgress,
} from '@clickhouse/client-common'
2 changes: 1 addition & 1 deletion packages/client-web/src/result_set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class ResultSet<Format extends DataFormat | unknown>
break
}
for (const row of value) {
result.push(row.json())
result.push(row.json() as T)
}
}
return result as any
Expand Down

0 comments on commit f2781ba

Please sign in to comment.