From 48dd6884ed5df502a194f0671a6332dc5c77d3d4 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 15 Dec 2023 17:43:37 +0100 Subject: [PATCH] fix(clickhouse-driver): Correct parsing for DateTime64, fix #7537 --- .github/actions/integration/clickhouse.sh | 6 +-- .../cubejs-clickhouse-driver/package.json | 2 + .../src/ClickHouseDriver.ts | 6 ++- .../src/HydrationStream.ts | 5 ++- .../test/ClickHouseDriver.test.ts | 44 ++++++++----------- .../clickhouse/ClickHouseDbRunner.js | 4 +- .../src/db/clickhouse.ts | 26 +++++++++++ 7 files changed, 61 insertions(+), 32 deletions(-) create mode 100644 packages/cubejs-testing-shared/src/db/clickhouse.ts diff --git a/.github/actions/integration/clickhouse.sh b/.github/actions/integration/clickhouse.sh index 8900415d690f2..ee1e968df37e3 100755 --- a/.github/actions/integration/clickhouse.sh +++ b/.github/actions/integration/clickhouse.sh @@ -4,21 +4,21 @@ set -eo pipefail # Debug log for test containers export DEBUG=testcontainers -export TEST_CLICKHOUSE_VERSION=21.1.2 +export TEST_CLICKHOUSE_VERSION=23.11 echo "::group::Clickhouse ${TEST_CLICKHOUSE_VERSION}"; docker pull yandex/clickhouse-server:${TEST_CLICKHOUSE_VERSION} yarn lerna run --concurrency 1 --stream --no-prefix integration:clickhouse echo "::endgroup::" -export TEST_CLICKHOUSE_VERSION=20.6 +export TEST_CLICKHOUSE_VERSION=22.8 echo "::group::Clickhouse ${TEST_CLICKHOUSE_VERSION}"; docker pull yandex/clickhouse-server:${TEST_CLICKHOUSE_VERSION} yarn lerna run --concurrency 1 --stream --no-prefix integration:clickhouse echo "::endgroup::" -export TEST_CLICKHOUSE_VERSION=19 +export TEST_CLICKHOUSE_VERSION=21.8 echo "::group::Clickhouse ${TEST_CLICKHOUSE_VERSION}"; docker pull yandex/clickhouse-server:${TEST_CLICKHOUSE_VERSION} diff --git a/packages/cubejs-clickhouse-driver/package.json b/packages/cubejs-clickhouse-driver/package.json index fa1b0428720af..0c9e0096c543d 100644 --- a/packages/cubejs-clickhouse-driver/package.json +++ b/packages/cubejs-clickhouse-driver/package.json @@ -31,11 +31,13 @@ "@cubejs-backend/base-driver": "^0.34.33", "@cubejs-backend/shared": "^0.34.33", "generic-pool": "^3.6.0", + "moment": "^2.24.0", "sqlstring": "^2.3.1", "uuid": "^8.3.2" }, "license": "Apache-2.0", "devDependencies": { + "@cubejs-backend/testing-shared": "^0.34.35", "@cubejs-backend/linter": "^0.34.25", "@types/jest": "^27", "jest": "27", diff --git a/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts b/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts index 0134fff718192..c9cf04e60bcb2 100644 --- a/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts +++ b/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts @@ -21,6 +21,8 @@ import { import genericPool, { Pool } from 'generic-pool'; import { v4 as uuidv4 } from 'uuid'; import sqlstring from 'sqlstring'; +import * as moment from 'moment'; + import { HydrationStream } from './HydrationStream'; const ClickHouse = require('@apla/clickhouse'); @@ -239,8 +241,10 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { const value = row[field]; if (value !== null) { const meta = res.meta.find((m: any) => m.name === field); - if (meta.type.includes('DateTime')) { + if (meta.type === 'DateTime') { row[field] = `${value.substring(0, 10)}T${value.substring(11, 22)}.000`; + } else if (meta.type.includes('DateTime64')) { + row[field] = moment.utc(value).format(moment.HTML5_FMT.DATETIME_LOCAL_MS); } else if (meta.type.includes('Date')) { row[field] = `${value}T00:00:00.000`; } else if (meta.type.includes('Int') || meta.type.includes('Float') || meta.type.includes('Decimal')) { diff --git a/packages/cubejs-clickhouse-driver/src/HydrationStream.ts b/packages/cubejs-clickhouse-driver/src/HydrationStream.ts index 68dadc7e31680..853460d12d135 100644 --- a/packages/cubejs-clickhouse-driver/src/HydrationStream.ts +++ b/packages/cubejs-clickhouse-driver/src/HydrationStream.ts @@ -1,5 +1,6 @@ /* eslint-disable no-restricted-syntax */ import stream, { TransformCallback } from 'stream'; +import * as moment from 'moment/moment'; export type HydrationMap = Record; @@ -11,8 +12,10 @@ export class HydrationStream extends stream.Transform { for (const [index, value] of Object.entries(row)) { if (value !== null) { const metaForField = meta[index]; - if (metaForField.type.includes('DateTime')) { + if (metaForField.type === 'DateTime') { row[index] = `${value.substring(0, 10)}T${value.substring(11, 22)}.000`; + } else if (metaForField.type.includes('DateTime64')) { + row[index] = moment.utc(value).format(moment.HTML5_FMT.DATETIME_LOCAL_MS); } else if (metaForField.type.includes('Date')) { row[index] = `${value}T00:00:00.000`; } else if (metaForField.type.includes('Int') diff --git a/packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts b/packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts index d818e7ac04a6a..a55fd5fed29d6 100644 --- a/packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts +++ b/packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts @@ -1,10 +1,13 @@ import { GenericContainer } from 'testcontainers'; import { ClickHouseDriver } from '../src/ClickHouseDriver'; +import { ClickhouseDBRunner } from '@cubejs-backend/testing-shared/dist/src/db/clickhouse'; const streamToArray = require('stream-to-array'); describe('ClickHouseDriver', () => { + jest.setTimeout(20 * 1000); + let container: any; let config: any; @@ -20,13 +23,7 @@ describe('ClickHouseDriver', () => { // eslint-disable-next-line func-names beforeAll(async () => { - jest.setTimeout(20 * 1000); - - const version = process.env.TEST_CLICKHOUSE_VERSION || 'latest'; - - container = await new GenericContainer(`yandex/clickhouse-server:${version}`) - .withExposedPorts(8123) - .start(); + container = await ClickhouseDBRunner.startContainer({}); config = { host: 'localhost', @@ -35,13 +32,12 @@ describe('ClickHouseDriver', () => { await doWithDriver(async (driver) => { await driver.createSchemaIfNotExists('test'); - // Unsupported in old servers - // datetime64 DateTime64, await driver.query( ` CREATE TABLE test.types_test ( date Date, datetime DateTime, + datetime64 DateTime64(3, 'UTC'), int8 Int8, int16 Int16, int32 Int32, @@ -60,17 +56,17 @@ describe('ClickHouseDriver', () => { [] ); - await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [ - '2020-01-01', '2020-01-01 00:00:00', 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.01, 1.01, 1.01 + await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [ + '2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00.000', 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.01, 1.01, 1.01 ]); - await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [ - '2020-01-02', '2020-01-02 00:00:00', 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2.02, 2.02, 2.02 + await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [ + '2020-01-02', '2020-01-02 00:00:00', '2020-01-02 00:00:00.123', 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2.02, 2.02, 2.02 ]); - await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [ - '2020-01-03', '2020-01-03 00:00:00', 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3.03, 3.03, 3.03 + await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [ + '2020-01-03', '2020-01-03 00:00:00', '2020-01-03 00:00:00.234', 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3.03, 3.03, 3.03 ]); }); - }); + }, 30 * 1000); // eslint-disable-next-line func-names afterAll(async () => { @@ -83,7 +79,7 @@ describe('ClickHouseDriver', () => { if (container) { await container.stop(); } - }); + }, 30 * 1000); it('should construct', async () => { await doWithDriver(async () => { @@ -160,8 +156,7 @@ describe('ClickHouseDriver', () => { expect(values).toEqual([{ date: '2020-01-01T00:00:00.000', datetime: '2020-01-01T00:00:00.000', - // Unsupported in old servers - // datetime64: '2020-01-01T00:00:00.00.000', + datetime64: '2020-01-01T00:00:00.000', int8: '1', int16: '1', int32: '1', @@ -242,7 +237,7 @@ describe('ClickHouseDriver', () => { }); }); - it('stream', async () => { + fit('stream', async () => { await doWithDriver(async (driver) => { const tableData = await driver.stream('SELECT * FROM test.types_test ORDER BY int8', [], { highWaterMark: 100, @@ -252,8 +247,7 @@ describe('ClickHouseDriver', () => { expect(tableData.types).toEqual([ { name: 'date', type: 'date' }, { name: 'datetime', type: 'timestamp' }, - // Unsupported in old servers - // { name: 'datetime64', type: 'timestamp' }, + { name: 'datetime64', type: 'timestamp' }, { name: 'int8', type: 'int' }, { name: 'int16', type: 'int' }, { name: 'int32', type: 'int' }, @@ -269,9 +263,9 @@ describe('ClickHouseDriver', () => { { name: 'decimal128', type: 'decimal' }, ]); expect(await streamToArray(tableData.rowStream)).toEqual([ - ['2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1.01', '1.01', '1.01'], - ['2020-01-02T00:00:00.000', '2020-01-02T00:00:00.000', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2.02', '2.02', '2.02'], - ['2020-01-03T00:00:00.000', '2020-01-03T00:00:00.000', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3.03', '3.03', '3.03'], + ['2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1.01', '1.01', '1.01'], + ['2020-01-02T00:00:00.000', '2020-01-02T00:00:00.000', '2020-01-02T00:00:00.123', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2.02', '2.02', '2.02'], + ['2020-01-03T00:00:00.000', '2020-01-03T00:00:00.000', '2020-01-03T00:00:00.234', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3.03', '3.03', '3.03'], ]); } finally { // @ts-ignore diff --git a/packages/cubejs-schema-compiler/test/integration/clickhouse/ClickHouseDbRunner.js b/packages/cubejs-schema-compiler/test/integration/clickhouse/ClickHouseDbRunner.js index 169673e4c28ce..164a04fe97bd5 100644 --- a/packages/cubejs-schema-compiler/test/integration/clickhouse/ClickHouseDbRunner.js +++ b/packages/cubejs-schema-compiler/test/integration/clickhouse/ClickHouseDbRunner.js @@ -68,9 +68,9 @@ export class ClickHouseDbRunner { testQueries = async (queries, prepareDataSet) => { if (!this.container && !process.env.TEST_CLICKHOUSE_HOST) { - const version = process.env.TEST_CLICKHOUSE_VERSION || '21.1.2'; + const version = process.env.TEST_CLICKHOUSE_VERSION || '23.11'; - this.container = await new GenericContainer(`yandex/clickhouse-server:${version}`) + this.container = await new GenericContainer(`clickhouse/clickhouse-server:${version}`) .withExposedPorts(8123) .start(); } diff --git a/packages/cubejs-testing-shared/src/db/clickhouse.ts b/packages/cubejs-testing-shared/src/db/clickhouse.ts new file mode 100644 index 0000000000000..bbbdb719580de --- /dev/null +++ b/packages/cubejs-testing-shared/src/db/clickhouse.ts @@ -0,0 +1,26 @@ +import { GenericContainer, Wait } from 'testcontainers'; + +import { DbRunnerAbstract, DBRunnerContainerOptions } from './db-runner.abstract'; + +type ClickhouseStartOptions = DBRunnerContainerOptions & { + version?: string, +}; + +export class ClickhouseDBRunner extends DbRunnerAbstract { + public static startContainer(options: ClickhouseStartOptions) { + const version = process.env.TEST_CLICKHOUSE_VERSION || options.version || '23.11'; + + const container = new GenericContainer(`clickhouse/clickhouse-server:${version}`) + .withExposedPorts(8123) + .withStartupTimeout(10 * 1000); + + if (options.volumes) { + // eslint-disable-next-line no-restricted-syntax + for (const { source, target, bindMode } of options.volumes) { + container.withBindMount(source, target, bindMode); + } + } + + return container.start(); + } +}