Skip to content

Commit

Permalink
Add some edge case scenarios for exec insert streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn committed Jul 11, 2024
1 parent 5367048 commit b198439
Showing 1 changed file with 74 additions and 6 deletions.
80 changes: 74 additions & 6 deletions packages/client-node/__tests__/integration/node_exec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ describe('[Node.js] exec', () => {
})

describe('custom insert streaming with exec', () => {
it('should send an insert stream', async () => {
const tableName = `test_node_exec_insert_stream_${guid()}`
let tableName: string
beforeEach(async () => {
tableName = `test_node_exec_insert_stream_${guid()}`
await createSimpleTable(client, tableName)
})

it('should send an insert stream', async () => {
const stream = Stream.Readable.from(['42,foobar,"[1,2]"'], {
objectMode: false,
})
Expand All @@ -83,18 +86,83 @@ describe('[Node.js] exec', () => {
})
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
await drainStream(execResult.stream)
await checkInsertedValues([
{
id: '42',
name: 'foobar',
sku: [1, 2],
},
])
})

const rs = await client.query({
query: `SELECT * FROM ${tableName}`,
format: 'JSONEachRow',
it('should not fail with an empty stream', async () => {
const stream = new Stream.Readable({
read() {
// required
},
objectMode: false,
})
const execPromise = client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
expect(await rs.json()).toEqual([
// close the empty stream after the request is sent
stream.push(null)
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
const execResult = await execPromise
await drainStream(execResult.stream)
await checkInsertedValues([])
})

it('should not fail with an already closed stream', async () => {
const stream = new Stream.Readable({
read() {
// required
},
objectMode: false,
})
stream.push('42,foobar,"[1,2]"\n')
// close the stream with some values
stream.push(null)
const execResult = await client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
await drainStream(execResult.stream)
await checkInsertedValues([
{
id: '42',
name: 'foobar',
sku: [1, 2],
},
])
})

it('should not fail with an empty and already closed stream', async () => {
const stream = new Stream.Readable({
read() {
// required
},
objectMode: false,
})
// close the empty stream immediately
stream.push(null)
const execResult = await client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
await drainStream(execResult.stream)
await checkInsertedValues([])
})

async function checkInsertedValues<T = unknown>(expected: Array<T>) {
const rs = await client.query({
query: `SELECT * FROM ${tableName}`,
format: 'JSONEachRow',
})
expect(await rs.json()).toEqual(expected)
}
})
})

0 comments on commit b198439

Please sign in to comment.