From 3f38afe39ee5e1212dcc068af2894c18ab4fc0b2 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Thu, 30 Jan 2025 20:05:22 +0200 Subject: [PATCH 1/7] add CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID and deprecate CUBEJS_SCHEDULED_REFRESH_CONCURRENCY --- packages/cubejs-backend-shared/src/env.ts | 23 +++++++++++++++++++ .../src/core/OptsHandler.ts | 7 ++---- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/packages/cubejs-backend-shared/src/env.ts b/packages/cubejs-backend-shared/src/env.ts index cc5417eac04ab..b644cdaf502a4 100644 --- a/packages/cubejs-backend-shared/src/env.ts +++ b/packages/cubejs-backend-shared/src/env.ts @@ -170,6 +170,29 @@ const variables: Record any> = { // It's true by default for development return process.env.NODE_ENV !== 'production'; }, + scheduledRefreshQueriesPerAppId: () => { + const refreshQueries = get('CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID').asIntPositive(); + + if (refreshQueries) { + return refreshQueries; + } + + console.warn( + 'The CUBEJS_SCHEDULED_REFRESH_CONCURRENCY is deprecated. Please, use the CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID instead.' + ); + return get('CUBEJS_SCHEDULED_REFRESH_CONCURRENCY').asIntPositive(); + }, + refreshWorkerConcurrency: () => get('CUBEJS_REFRESH_WORKER_CONCURRENCY') + .default(5) + .asIntPositive(), + // eslint-disable-next-line consistent-return + scheduledRefreshTimezones: () => { + const timezones = get('CUBEJS_SCHEDULED_REFRESH_TIMEZONES').asString(); + + if (timezones) { + return timezones.split(',').map(t => t.trim()); + } + }, preAggregationsBuilder: () => get('CUBEJS_PRE_AGGREGATIONS_BUILDER').asBool(), gracefulShutdown: () => get('CUBEJS_GRACEFUL_SHUTDOWN') .asIntPositive(), diff --git a/packages/cubejs-server-core/src/core/OptsHandler.ts b/packages/cubejs-server-core/src/core/OptsHandler.ts index 55731d1aa0c9b..3c04db9e39a7b 100644 --- a/packages/cubejs-server-core/src/core/OptsHandler.ts +++ b/packages/cubejs-server-core/src/core/OptsHandler.ts @@ -453,15 +453,12 @@ export class OptsHandler { externalDialectFactory, apiSecret: process.env.CUBEJS_API_SECRET, telemetry: getEnv('telemetry'), - scheduledRefreshTimeZones: - process.env.CUBEJS_SCHEDULED_REFRESH_TIMEZONES && - process.env.CUBEJS_SCHEDULED_REFRESH_TIMEZONES.split(',').map(t => t.trim()), + scheduledRefreshTimeZones: getEnv('scheduledRefreshTimezones'), scheduledRefreshContexts: async () => [null], basePath: '/cubejs-api', dashboardAppPath: 'dashboard-app', dashboardAppPort: 3000, - scheduledRefreshConcurrency: - parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_CONCURRENCY, 10), + scheduledRefreshConcurrency: getEnv('scheduledRefreshQueriesPerAppId'), scheduledRefreshBatchSize: getEnv('scheduledRefreshBatchSize'), preAggregationsSchema: getEnv('preAggregationsSchema') || From 9c5a9f37b517f78ee5251e26a4d23b60d688ae29 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Thu, 30 Jan 2025 20:12:08 +0200 Subject: [PATCH 2/7] =?UTF-8?q?increase=20DefaultConcurrency=20settings=20?= =?UTF-8?q?for=C2=A0drivers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/cubejs-athena-driver/src/AthenaDriver.ts | 2 +- packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts | 2 +- packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts | 2 +- packages/cubejs-firebolt-driver/src/FireboltDriver.ts | 2 +- packages/cubejs-pinot-driver/src/PinotDriver.ts | 2 +- packages/cubejs-redshift-driver/src/RedshiftDriver.ts | 2 +- packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/cubejs-athena-driver/src/AthenaDriver.ts b/packages/cubejs-athena-driver/src/AthenaDriver.ts index fe60a1dd71722..aaea16c04a275 100644 --- a/packages/cubejs-athena-driver/src/AthenaDriver.ts +++ b/packages/cubejs-athena-driver/src/AthenaDriver.ts @@ -76,7 +76,7 @@ export class AthenaDriver extends BaseDriver implements DriverInterface { * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 5; + return 10; } private config: AthenaDriverOptionsInitialized; diff --git a/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts b/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts index 6ef8ba7cd93ad..19a2f037c2e91 100644 --- a/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts +++ b/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts @@ -119,7 +119,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 5; + return 10; } // ClickHouseClient has internal pool of several sockets, no need for generic-pool diff --git a/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts b/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts index d3754207a3d07..d79ca9a0ec1d7 100644 --- a/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts +++ b/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts @@ -147,7 +147,7 @@ export class DatabricksDriver extends JDBCDriver { * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 2; + return 10; } /** diff --git a/packages/cubejs-firebolt-driver/src/FireboltDriver.ts b/packages/cubejs-firebolt-driver/src/FireboltDriver.ts index 10ce708a2d5d8..a9ccc9e4161b3 100644 --- a/packages/cubejs-firebolt-driver/src/FireboltDriver.ts +++ b/packages/cubejs-firebolt-driver/src/FireboltDriver.ts @@ -46,7 +46,7 @@ export class FireboltDriver extends BaseDriver implements DriverInterface { * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 5; + return 10; } private config: FireboltDriverConfiguration; diff --git a/packages/cubejs-pinot-driver/src/PinotDriver.ts b/packages/cubejs-pinot-driver/src/PinotDriver.ts index c9f96de3f02b6..be02426001acf 100644 --- a/packages/cubejs-pinot-driver/src/PinotDriver.ts +++ b/packages/cubejs-pinot-driver/src/PinotDriver.ts @@ -74,7 +74,7 @@ export class PinotDriver extends BaseDriver implements DriverInterface { * Returns default concurrency value. */ public static getDefaultConcurrency() { - return 2; + return 10; } private config: PinotDriverConfiguration; diff --git a/packages/cubejs-redshift-driver/src/RedshiftDriver.ts b/packages/cubejs-redshift-driver/src/RedshiftDriver.ts index ecc39bd31e167..5ce9870cd331d 100644 --- a/packages/cubejs-redshift-driver/src/RedshiftDriver.ts +++ b/packages/cubejs-redshift-driver/src/RedshiftDriver.ts @@ -57,7 +57,7 @@ export class RedshiftDriver extends PostgresDriver * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 4; + return 5; } /** diff --git a/packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts b/packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts index b8422155af569..8561e24b2052d 100644 --- a/packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts +++ b/packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts @@ -195,7 +195,7 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface { * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 5; + return 8; } public static driverEnvVariables() { From a52032c1396aef3d3ec9b9f7c57eaa0660060083 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 31 Jan 2025 17:51:40 +0200 Subject: [PATCH 3/7] =?UTF-8?q?use=20refreshWorkerConcurrency=20only=20for?= =?UTF-8?q?=C2=A0refresh=20workers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/cubejs-server-core/src/core/OptsHandler.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/cubejs-server-core/src/core/OptsHandler.ts b/packages/cubejs-server-core/src/core/OptsHandler.ts index 3c04db9e39a7b..cd963a92ca51f 100644 --- a/packages/cubejs-server-core/src/core/OptsHandler.ts +++ b/packages/cubejs-server-core/src/core/OptsHandler.ts @@ -458,7 +458,9 @@ export class OptsHandler { basePath: '/cubejs-api', dashboardAppPath: 'dashboard-app', dashboardAppPort: 3000, - scheduledRefreshConcurrency: getEnv('scheduledRefreshQueriesPerAppId'), + scheduledRefreshConcurrency: getEnv('refreshWorkerMode') + ? getEnv('refreshWorkerConcurrency') + : getEnv('scheduledRefreshQueriesPerAppId'), scheduledRefreshBatchSize: getEnv('scheduledRefreshBatchSize'), preAggregationsSchema: getEnv('preAggregationsSchema') || From 6740e00731449768a0400bac5b073ad815e53816 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Feb 2025 12:40:57 +0200 Subject: [PATCH 4/7] =?UTF-8?q?correct=20warn=20flow=20for=C2=A0refreshCon?= =?UTF-8?q?currency?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/cubejs-backend-shared/src/env.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/cubejs-backend-shared/src/env.ts b/packages/cubejs-backend-shared/src/env.ts index b644cdaf502a4..2f8760f412c35 100644 --- a/packages/cubejs-backend-shared/src/env.ts +++ b/packages/cubejs-backend-shared/src/env.ts @@ -177,13 +177,17 @@ const variables: Record any> = { return refreshQueries; } - console.warn( - 'The CUBEJS_SCHEDULED_REFRESH_CONCURRENCY is deprecated. Please, use the CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID instead.' - ); - return get('CUBEJS_SCHEDULED_REFRESH_CONCURRENCY').asIntPositive(); + const refreshConcurrency = get('CUBEJS_SCHEDULED_REFRESH_CONCURRENCY').asIntPositive(); + + if (refreshConcurrency) { + console.warn( + 'The CUBEJS_SCHEDULED_REFRESH_CONCURRENCY is deprecated. Please, use the CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID instead.' + ); + } + + return refreshConcurrency; }, refreshWorkerConcurrency: () => get('CUBEJS_REFRESH_WORKER_CONCURRENCY') - .default(5) .asIntPositive(), // eslint-disable-next-line consistent-return scheduledRefreshTimezones: () => { From e04afad0c082efb01d63d87311c96cda9d856c7c Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Feb 2025 12:41:29 +0200 Subject: [PATCH 5/7] correct queueOptionsWrapper flow --- .../cubejs-server-core/src/core/OptsHandler.ts | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/packages/cubejs-server-core/src/core/OptsHandler.ts b/packages/cubejs-server-core/src/core/OptsHandler.ts index cd963a92ca51f..c0d79990d945a 100644 --- a/packages/cubejs-server-core/src/core/OptsHandler.ts +++ b/packages/cubejs-server-core/src/core/OptsHandler.ts @@ -287,6 +287,7 @@ export class OptsHandler { private queueOptionsWrapper( context: RequestContext, queueOptions: unknown | ((dataSource?: string) => QueueOptions), + queueType: 'query' | 'pre-aggs', ): (dataSource?: string) => Promise { return async (dataSource = 'default') => { const options = ( @@ -298,6 +299,14 @@ export class OptsHandler { // concurrency specified in cube.js return options; } else { + const workerConcurrency = getEnv('refreshWorkerConcurrency'); + if (queueType === 'pre-aggs' && workerConcurrency) { + return { + ...options, + concurrency: workerConcurrency, + }; + } + const envConcurrency: number = getEnv('concurrency', { dataSource }); if (envConcurrency) { // concurrency specified in CUBEJS_CONCURRENCY @@ -320,7 +329,7 @@ export class OptsHandler { // no specified concurrency return { ...options, - concurrency: 2, + concurrency: 5, }; } } @@ -458,9 +467,7 @@ export class OptsHandler { basePath: '/cubejs-api', dashboardAppPath: 'dashboard-app', dashboardAppPort: 3000, - scheduledRefreshConcurrency: getEnv('refreshWorkerMode') - ? getEnv('refreshWorkerConcurrency') - : getEnv('scheduledRefreshQueriesPerAppId'), + scheduledRefreshConcurrency: getEnv('scheduledRefreshQueriesPerAppId'), scheduledRefreshBatchSize: getEnv('scheduledRefreshBatchSize'), preAggregationsSchema: getEnv('preAggregationsSchema') || @@ -661,6 +668,7 @@ export class OptsHandler { clone.queryCacheOptions.queueOptions = this.queueOptionsWrapper( context, clone.queryCacheOptions.queueOptions, + 'query' ); // pre-aggs queue options @@ -668,6 +676,7 @@ export class OptsHandler { clone.preAggregationsOptions.queueOptions = this.queueOptionsWrapper( context, clone.preAggregationsOptions.queueOptions, + 'pre-aggs' ); // pre-aggs external refresh flag (force to run pre-aggs build flow first if From 5cd39907aa8b536f87effda79a43a381b07e6afe Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Feb 2025 14:12:14 +0200 Subject: [PATCH 6/7] fix unit tests --- .../cubejs-server-core/test/unit/OptsHandler.test.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/cubejs-server-core/test/unit/OptsHandler.test.ts b/packages/cubejs-server-core/test/unit/OptsHandler.test.ts index 3d37757679c8b..e37dc382b09e1 100644 --- a/packages/cubejs-server-core/test/unit/OptsHandler.test.ts +++ b/packages/cubejs-server-core/test/unit/OptsHandler.test.ts @@ -525,13 +525,13 @@ describe('OptsHandler class', () => { expect(opts.queryCacheOptions.queueOptions).toBeDefined(); expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function'); expect(await opts.queryCacheOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); expect(opts.preAggregationsOptions.queueOptions).toBeDefined(); expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function'); expect(await opts.preAggregationsOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); } ); @@ -555,13 +555,13 @@ describe('OptsHandler class', () => { expect(opts.queryCacheOptions.queueOptions).toBeDefined(); expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function'); expect(await opts.queryCacheOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); expect(opts.preAggregationsOptions.queueOptions).toBeDefined(); expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function'); expect(await opts.preAggregationsOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); } ); @@ -585,13 +585,13 @@ describe('OptsHandler class', () => { expect(opts.queryCacheOptions.queueOptions).toBeDefined(); expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function'); expect(await opts.queryCacheOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); expect(opts.preAggregationsOptions.queueOptions).toBeDefined(); expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function'); expect(await opts.preAggregationsOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); } ); From beaebe18dd8edb267ecaa0a98d3d922c31d09cf4 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Feb 2025 16:15:00 +0200 Subject: [PATCH 7/7] =?UTF-8?q?add=20tests=20for=C2=A0CUBEJS=5FREFRESH=5FW?= =?UTF-8?q?ORKER=5FCONCURRENCY?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../test/unit/OptsHandler.test.ts | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/packages/cubejs-server-core/test/unit/OptsHandler.test.ts b/packages/cubejs-server-core/test/unit/OptsHandler.test.ts index e37dc382b09e1..306b810192ec0 100644 --- a/packages/cubejs-server-core/test/unit/OptsHandler.test.ts +++ b/packages/cubejs-server-core/test/unit/OptsHandler.test.ts @@ -658,6 +658,41 @@ describe('OptsHandler class', () => { } ); + test( + 'must configure queueOptions with empty orchestratorOptions function, ' + + 'with CUBEJS_REFRESH_WORKER_CONCURRENCY, CUBEJS_CONCURRENCY and with default driver concurrency', + async () => { + process.env.CUBEJS_CONCURRENCY = '11'; + process.env.CUBEJS_REFRESH_WORKER_CONCURRENCY = '22'; + process.env.CUBEJS_DB_TYPE = 'postgres'; + + const core = new CubejsServerCoreExposed({ + ...conf, + dbType: undefined, + driverFactory: () => ({ type: process.env.CUBEJS_DB_TYPE }), + orchestratorOptions: () => ({}), + }); + + const opts = ( await core.getOrchestratorApi({})).options; + + expect(opts.queryCacheOptions.queueOptions).toBeDefined(); + expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function'); + expect(await opts.queryCacheOptions.queueOptions()).toEqual({ + concurrency: parseInt(process.env.CUBEJS_CONCURRENCY, 10), + }); + + expect(opts.preAggregationsOptions.queueOptions).toBeDefined(); + expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function'); + expect(await opts.preAggregationsOptions.queueOptions()).toEqual({ + concurrency: parseInt(process.env.CUBEJS_REFRESH_WORKER_CONCURRENCY, 10), + }); + + delete process.env.CUBEJS_CONCURRENCY; + delete process.env.CUBEJS_REFRESH_WORKER_CONCURRENCY; + delete process.env.CUBEJS_DB_TYPE; + } + ); + test( 'multi data source concurrency', async () => {