Skip to content

Commit

Permalink
feat(druid-driver): Support type receiving for new Druid versions
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Nov 14, 2023
1 parent 5d1ad88 commit 52fb634
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/actions/integration/druid.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export DEBUG=testcontainers

export TEST_POSTGRES_VERSION=13
export TEST_ZOOKEEPER_VERSION=3.5
export TEST_DRUID_VERSION=0.19.0
export TEST_DRUID_VERSION=27.0.0

echo "::group::Druid ${TEST_DRUID_VERSION}";

Expand Down
10 changes: 5 additions & 5 deletions packages/cubejs-druid-driver/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ services:
restart: always

coordinator:
image: apache/druid:${TEST_DRUID_VERSION:-0.19.0}
image: apache/druid:${TEST_DRUID_VERSION:-27.0.0}
container_name: coordinator
volumes:
- ./storage:/opt/data
Expand All @@ -56,7 +56,7 @@ services:
start_period: 60s

broker:
image: apache/druid:${TEST_DRUID_VERSION:-0.19.0}
image: apache/druid:${TEST_DRUID_VERSION:-27.0.0}
container_name: broker
volumes:
- broker_var:/opt/druid/var
Expand All @@ -79,7 +79,7 @@ services:
start_period: 60s

historical:
image: apache/druid:${TEST_DRUID_VERSION:-0.19.0}
image: apache/druid:${TEST_DRUID_VERSION:-27.0.0}
container_name: historical
volumes:
- ./storage:/opt/data
Expand All @@ -103,7 +103,7 @@ services:
start_period: 60s

middlemanager:
image: apache/druid:${TEST_DRUID_VERSION:-0.19.0}
image: apache/druid:${TEST_DRUID_VERSION:-27.0.0}
container_name: middlemanager
volumes:
- ./storage:/opt/data
Expand All @@ -127,7 +127,7 @@ services:
start_period: 60s

router:
image: apache/druid:${TEST_DRUID_VERSION:-0.19.0}
image: apache/druid:${TEST_DRUID_VERSION:-27.0.0}
container_name: router
volumes:
- router_var:/opt/druid/var
Expand Down
21 changes: 18 additions & 3 deletions packages/cubejs-druid-driver/src/DruidClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Axios, { AxiosRequestConfig } from 'axios';
import { DownloadQueryResultsResult } from '@cubejs-backend/base-driver';

export type DruidClientBaseConfiguration = {
user?: string,
Expand Down Expand Up @@ -41,11 +42,11 @@ export class DruidClient {
});
}

public async query<R = unknown>(query: string, parameters: { type: string, value: unknown }[]): Promise<R[]> {
public async query<R = unknown>(query: string, parameters: { type: string, value: unknown }[]): Promise<{ rows: R[], columns: Record<string, { sqlType: string }> | null }> {
let cancelled = false;
const cancelObj: any = {};

const promise: Promise<R[]> & { cancel?: () => void } = (async () => {
const promise: Promise<{ rows: R[], columns: any }> & { cancel?: () => void } = (async () => {
cancelObj.cancel = async () => {
cancelled = true;
};
Expand All @@ -57,6 +58,8 @@ export class DruidClient {
data: {
query,
parameters,
header: true,
sqlTypesHeader: true,
resultFormat: 'object',
},
});
Expand All @@ -67,7 +70,19 @@ export class DruidClient {
throw new Error('Query cancelled');
}

return response.data;
if (response.headers['x-druid-sql-header-included']) {
const [columns, ...rows] = response.data;

return {
columns,
rows
};
} else {
return {
columns: null,
rows: response.data,
};
}
} catch (e: any) {
if (cancelled) {
throw new Error('Query cancelled');
Expand Down
33 changes: 31 additions & 2 deletions packages/cubejs-druid-driver/src/DruidDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ import {
getEnv,
assertDataSource,
} from '@cubejs-backend/shared';
import { BaseDriver, TableQueryResult } from '@cubejs-backend/base-driver';
import {
BaseDriver,
DownloadQueryResultsOptions,
DownloadQueryResultsResult,
TableQueryResult, TableStructure,
} from '@cubejs-backend/base-driver';
import { DruidClient, DruidClientBaseConfiguration, DruidClientConfiguration } from './DruidClient';
import { DruidQuery } from './DruidQuery';

Expand Down Expand Up @@ -105,7 +110,8 @@ export class DruidDriver extends BaseDriver {
}

public async query<R = unknown>(query: string, values: unknown[] = []): Promise<Array<R>> {
return this.client.query(query, this.normalizeQueryValues(values));
const result = await this.client.query<R>(query, this.normalizeQueryValues(values));
return result.rows;
}

public informationSchemaQuery() {
Expand All @@ -130,6 +136,29 @@ export class DruidDriver extends BaseDriver {
]);
}

public async downloadQueryResults(query: string, values: unknown[], _options: DownloadQueryResultsOptions): Promise<DownloadQueryResultsResult> {
const { rows, columns } = await this.client.query<any>(query, this.normalizeQueryValues(values));
if (!columns) {
throw new Error(
'You are using an old version of Druid. Unable to detect column types for pre-aggregation on empty values in readOnly mode.'
);
}

const types: TableStructure = [];

for (const [name, meta] of Object.entries(columns)) {
types.push({
name,
type: this.toGenericType(meta.sqlType.toLowerCase()),
});
}

return {
rows,
types,
};
}

protected normalizeQueryValues(values: unknown[]) {
return values.map((value) => ({
value,
Expand Down
28 changes: 27 additions & 1 deletion packages/cubejs-druid-driver/test/druid-driver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import path from 'path';
import { DruidDriver, DruidDriverConfiguration } from '../src/DruidDriver';

describe('DruidDriver', () => {
let env: StartedDockerComposeEnvironment|null = null;
let env: StartedDockerComposeEnvironment | null = null;
let config: DruidDriverConfiguration;

const doWithDriver = async (callback: (driver: DruidDriver) => Promise<any>) => {
Expand All @@ -23,6 +23,8 @@ describe('DruidDriver', () => {

config = {
url: `http://${host}:${port}`,
user: 'admin',
password: 'password1',
};

return;
Expand Down Expand Up @@ -85,4 +87,28 @@ describe('DruidDriver', () => {
}]);
});
});

it('downloadQueryResults', async () => {
jest.setTimeout(10 * 1000);

return doWithDriver(async (driver) => {
const result = await driver.downloadQueryResults(
'SELECT 1 as id, true as finished, \'netherlands\' as country, CAST(\'2020-01-01T01:01:01.111Z\' as timestamp) as created UNION ALL SELECT 2 as id, false as finished, \'spain\' as country, CAST(\'2020-01-01T01:01:01.111Z\' as timestamp) as created',
[],
{ highWaterMark: 1 }
);
expect(result).toEqual({
rows: [
{ country: 'netherlands', created: '2020-01-01T01:01:01.111Z', finished: true, id: 1 },
{ country: 'spain', created: '2020-01-01T01:01:01.111Z', finished: false, id: 2 }
],
types: [
{ name: 'id', type: 'int' },
{ name: 'finished', type: 'boolean' },
{ name: 'country', type: 'text' },
{ name: 'created', type: 'timestamp' }
]
});
});
});
});

0 comments on commit 52fb634

Please sign in to comment.