Fix web impl streaming line breaks with large rows #333

Merged · 7 commits · Oct 5, 2024
Changes from all commits
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# 1.7.0 (Common, Node.js, Web)

## Bug fixes

- (Web only) Fixed an issue where streaming large datasets could produce corrupted results. See [#333](https://github.com/ClickHouse/clickhouse-js/pull/333) (PR) for more details.

## New features

- (Experimental) Exposed the `parseColumnType` function that takes a string representation of a ClickHouse type (e.g., `FixedString(16)`, `Nullable(Int32)`, etc.) and returns an AST-like object that represents the type. For example:

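A minimal illustrative sketch of a `parseColumnType` call; the import path and the exact shape of the returned object are assumptions here, not the library's documented output:

```ts
import { parseColumnType } from '@clickhouse/client-common' // assumed import path

// Parse a string representation of a ClickHouse type into an AST-like object.
const parsed = parseColumnType('Nullable(FixedString(16))')
// Illustrative output shape only; the real AST node names may differ:
// { type: 'Nullable', value: { type: 'FixedString', sizeBytes: 16 } }
console.log(parsed)
```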
37 changes: 37 additions & 0 deletions packages/client-common/__tests__/utils/datasets.ts
@@ -0,0 +1,37 @@
import type { ClickHouseClient } from '@clickhouse/client-common'
import { fakerRU } from '@faker-js/faker'
import { createTableWithFields } from '@test/fixtures/table_with_fields'

export async function genLargeStringsDataset<Stream = unknown>(
client: ClickHouseClient<Stream>,
{
rows,
words,
}: {
rows: number
words: number
},
): Promise<{
table: string
values: { id: number; sentence: string; timestamp: string }[]
}> {
const table = await createTableWithFields(
client as ClickHouseClient,
`sentence String, timestamp String`,
)
const values = [...new Array(rows)].map((_, id) => ({
id,
// non-ASCII (multi-byte) symbols make it easier to trigger the incorrect behavior, since they can be split across chunk boundaries
sentence: fakerRU.lorem.sentence(words),
timestamp: new Date().toISOString(),
}))
await client.insert({
table,
values,
format: 'JSONEachRow',
})
return {
table,
values,
}
}
@@ -3,10 +3,9 @@ import {
type ClickHouseClient,
type ClickHouseSettings,
} from '@clickhouse/client-common'
import { fakerRU } from '@faker-js/faker'
import { createSimpleTable } from '@test/fixtures/simple_table'
import { createTableWithFields } from '@test/fixtures/table_with_fields'
import { createTestClient, guid } from '@test/utils'
import { genLargeStringsDataset } from '@test/utils/datasets'
import { tableFromIPC } from 'apache-arrow'
import { Buffer } from 'buffer'
import Fs from 'fs'
@@ -152,40 +151,9 @@ describe('[Node.js] streaming e2e', () => {
// Here we generate a large enough dataset to break into multiple chunks while streaming,
// effectively testing the implementation of incomplete rows handling
describe('should correctly process multiple chunks', () => {
async function generateData({
rows,
words,
}: {
rows: number
words: number
}): Promise<{
table: string
values: { id: number; sentence: string; timestamp: string }[]
}> {
const table = await createTableWithFields(
client as ClickHouseClient,
`sentence String, timestamp String`,
)
const values = [...new Array(rows)].map((_, id) => ({
id,
// it seems that it is easier to trigger an incorrect behavior with non-ASCII symbols
sentence: fakerRU.lorem.sentence(words),
timestamp: new Date().toISOString(),
}))
await client.insert({
table,
values,
format: 'JSONEachRow',
})
return {
table,
values,
}
}

describe('large amount of rows', () => {
it('should work with .json()', async () => {
const { table, values } = await generateData({
const { table, values } = await genLargeStringsDataset(client, {
rows: 10000,
words: 10,
})
@@ -199,7 +167,7 @@
})

it('should work with .stream()', async () => {
const { table, values } = await generateData({
const { table, values } = await genLargeStringsDataset(client, {
rows: 10000,
words: 10,
})
@@ -222,7 +190,7 @@

describe("rows that don't fit into a single chunk", () => {
it('should work with .json()', async () => {
const { table, values } = await generateData({
const { table, values } = await genLargeStringsDataset(client, {
rows: 5,
words: 10000,
})
@@ -236,7 +204,7 @@
})

it('should work with .stream()', async () => {
const { table, values } = await generateData({
const { table, values } = await genLargeStringsDataset(client, {
rows: 5,
words: 10000,
})
@@ -1,13 +1,21 @@
import type { ClickHouseClient, Row } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'
import { genLargeStringsDataset } from '@test/utils/datasets'

describe('[Web] SELECT streaming', () => {
let client: ClickHouseClient<ReadableStream<Row[]>>
afterEach(async () => {
await client.close()
})
beforeEach(async () => {
client = createTestClient()
client = createTestClient({
// It is required to disable keep-alive to allow for larger inserts
// https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
// If contentLength is non-null and httpRequest’s keepalive is true, then:
// <...>
// If the sum of contentLength and inflightKeepaliveBytes is greater than 64 kibibytes, then return a network error.
keep_alive: { enabled: false },
})
})

describe('consume the response only once', () => {
@@ -199,6 +207,75 @@ describe('[Web] SELECT streaming', () => {
])
})
})

// See https://github.com/ClickHouse/clickhouse-js/issues/171 for more details
// Here we generate a large enough dataset to break into multiple chunks while streaming,
// effectively testing the implementation of incomplete rows handling
describe('should correctly process multiple chunks', () => {
describe('large amount of rows', () => {
it('should work with .json()', async () => {
const { table, values } = await genLargeStringsDataset(client, {
rows: 10000,
words: 10,
})
const result = await client
.query({
query: `SELECT * FROM ${table} ORDER BY id ASC`,
format: 'JSONEachRow',
})
.then((r) => r.json())
expect(result).toEqual(values)
})

it('should work with .stream()', async () => {
const { table, values } = await genLargeStringsDataset(client, {
rows: 10000,
words: 10,
})
const stream = await client
.query({
query: `SELECT * FROM ${table} ORDER BY id ASC`,
format: 'JSONEachRow',
})
.then((r) => r.stream())

const result = await rowsJsonValues(stream)
expect(result).toEqual(values)
})
})

describe("rows that don't fit into a single chunk", () => {
it('should work with .json()', async () => {
const { table, values } = await genLargeStringsDataset(client, {
rows: 5,
words: 10000,
})
const result = await client
.query({
query: `SELECT * FROM ${table} ORDER BY id ASC`,
format: 'JSONEachRow',
})
.then((r) => r.json())
expect(result).toEqual(values)
})

it('should work with .stream()', async () => {
const { table, values } = await genLargeStringsDataset(client, {
rows: 5,
words: 10000,
})
const stream = await client
.query({
query: `SELECT * FROM ${table} ORDER BY id ASC`,
format: 'JSONEachRow',
})
.then((r) => r.stream())

const result = await rowsJsonValues(stream)
expect(result).toEqual(values)
})
})
})
})

async function rowsJsonValues<T = unknown>(
67 changes: 48 additions & 19 deletions packages/client-web/src/result_set.ts
@@ -9,10 +9,12 @@ import type {
import {
isNotStreamableJSONFamily,
isStreamableJSONFamily,
validateStreamFormat,
} from '@clickhouse/client-common'
import { validateStreamFormat } from '@clickhouse/client-common'
import { getAsText } from './utils'

const NEWLINE = 0x0a as const

export class ResultSet<Format extends DataFormat | unknown>
implements BaseResultSet<ReadableStream<Row[]>, Format>
{
@@ -67,40 +69,67 @@
this.markAsConsumed()
validateStreamFormat(this.format)

let decodedChunk = ''
let incompleteChunks: Uint8Array[] = []
let totalIncompleteLength = 0
const decoder = new TextDecoder('utf-8')
const transform = new TransformStream({
start() {
//
},
transform: (chunk, controller) => {
transform: (chunk: Uint8Array, controller) => {
if (chunk === null) {
controller.terminate()
}
decodedChunk += decoder.decode(chunk)
const rows: Row[] = []
// eslint-disable-next-line no-constant-condition
while (true) {
const idx = decodedChunk.indexOf('\n')
if (idx !== -1) {
const text = decodedChunk.slice(0, idx)
decodedChunk = decodedChunk.slice(idx + 1)
let idx: number
let lastIdx = 0
do {
// an unescaped newline character denotes the end of a row
idx = chunk.indexOf(NEWLINE, lastIdx)
// there is no complete row in the rest of the current chunk
if (idx === -1) {
// to be processed during the next transform iteration
const incompleteChunk = chunk.slice(lastIdx)
incompleteChunks.push(incompleteChunk)
totalIncompleteLength += incompleteChunk.length
// send the extracted rows to the consumer, if any
if (rows.length > 0) {
controller.enqueue(rows)
}
} else {
let text: string
if (incompleteChunks.length > 0) {
const completeRowBytes = new Uint8Array(
totalIncompleteLength + idx,
)

// using the incomplete chunks from the previous iterations
let offset = 0
incompleteChunks.forEach((incompleteChunk) => {
completeRowBytes.set(incompleteChunk, offset)
offset += incompleteChunk.length
})
// finalize the row with the current chunk slice that ends with a newline
const finalChunk = chunk.slice(0, idx)
completeRowBytes.set(finalChunk, offset)

// reset the incomplete chunks
incompleteChunks = []
totalIncompleteLength = 0

text = decoder.decode(completeRowBytes)
} else {
text = decoder.decode(chunk.slice(lastIdx, idx))
}
rows.push({
text,
json<T>(): T {
return JSON.parse(text)
},
})
} else {
if (rows.length) {
controller.enqueue(rows)
}
break
lastIdx = idx + 1 // skipping newline character
}
}
},
flush() {
decodedChunk = ''
} while (idx !== -1)
},
})

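For context, here is a minimal sketch of how a consumer could drain the `ReadableStream<Row[]>` that this `ResultSet` produces, roughly what a helper such as `rowsJsonValues` in the web test above does; the helper name and its exact signature are illustrative:

```ts
import type { Row } from '@clickhouse/client-common'

// Read the stream batch by batch (each batch is a Row[]) and parse every row as JSON.
async function collectJsonRows<T = unknown>(
  stream: ReadableStream<Row[]>,
): Promise<T[]> {
  const result: T[] = []
  const reader = stream.getReader()
  while (true) {
    const { done, value } = await reader.read()
    if (done || value === undefined) break
    for (const row of value) {
      result.push(row.json<T>())
    }
  }
  return result
}
```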