Skip to content

Commit

Permalink
chore(cubestore): Upgrade DF: backport rolling window implementation …
Browse files Browse the repository at this point in the history
…and allow multiple ClusterSend nodes within plan to support multi-stage aggregations
  • Loading branch information
paveltiunov committed Feb 9, 2025
1 parent d4cc975 commit 45bf74e
Show file tree
Hide file tree
Showing 13 changed files with 3,895 additions and 460 deletions.
3 changes: 3 additions & 0 deletions packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,9 @@ const variables: Record<string, (...args: any) => any> = {
cubeStoreNoHeartBeatTimeout: () => get('CUBEJS_CUBESTORE_NO_HEART_BEAT_TIMEOUT')
.default('30')
.asInt(),
cubeStoreRollingWindowJoin: () => get('CUBEJS_CUBESTORE_ROLLING_WINDOW_JOIN')
.default('false')
.asBoolStrict(),

allowUngroupedWithoutPrimaryKey: () => get('CUBEJS_ALLOW_UNGROUPED_WITHOUT_PRIMARY_KEY')
.default(get('CUBESQL_SQL_PUSH_DOWN').default('true').asString())
Expand Down
17 changes: 15 additions & 2 deletions packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import moment from 'moment-timezone';
import { parseSqlInterval } from '@cubejs-backend/shared';
import { parseSqlInterval, getEnv } from '@cubejs-backend/shared';
import { BaseQuery } from './BaseQuery';
import { BaseFilter } from './BaseFilter';
import { BaseMeasure } from './BaseMeasure';
Expand Down Expand Up @@ -30,6 +30,13 @@ type RollingWindow = {
};

export class CubeStoreQuery extends BaseQuery {
private readonly cubeStoreRollingWindowJoin: boolean;

public constructor(compilers, options) {
super(compilers, options);
this.cubeStoreRollingWindowJoin = getEnv('cubeStoreRollingWindowJoin');
}

public newFilter(filter) {
return new CubeStoreFilter(this, filter);
}
Expand All @@ -55,10 +62,16 @@ export class CubeStoreQuery extends BaseQuery {
}

public subtractInterval(date: string, interval: string) {
if (this.cubeStoreRollingWindowJoin) {
return super.subtractInterval(date, interval);
}
return `DATE_SUB(${date}, INTERVAL ${this.formatInterval(interval)})`;
}

public addInterval(date: string, interval: string) {
if (this.cubeStoreRollingWindowJoin) {
return super.addInterval(date, interval);
}
return `DATE_ADD(${date}, INTERVAL ${this.formatInterval(interval)})`;
}

Expand Down Expand Up @@ -179,7 +192,7 @@ export class CubeStoreQuery extends BaseQuery {
cumulativeMeasures: Array<[boolean, BaseMeasure]>,
preAggregationForQuery: any
) {
if (!cumulativeMeasures.length) {
if (this.cubeStoreRollingWindowJoin || !cumulativeMeasures.length) {
return super.regularAndTimeSeriesRollupQuery(regularMeasures, multipliedMeasures, cumulativeMeasures, preAggregationForQuery);
}
const cumulativeMeasuresWithoutMultiplied = cumulativeMeasures.map(([_, measure]) => measure);
Expand Down
2 changes: 2 additions & 0 deletions rust/cubestore/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 45bf74e

Please sign in to comment.