Skip to content

Commit

Permalink
feat(query-orchestrator): Debounce updates for table used/touch
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Nov 20, 2023
1 parent a5ff0f7 commit 20ed9b9
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 25 deletions.
75 changes: 75 additions & 0 deletions packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { QueryCache } from './QueryCache';

export abstract class AbstractSetMemoryQueue {
protected readonly queue: Set<string> = new Set();

public constructor(
protected readonly capacity: number,
protected readonly concurrency: number,
) {

}

protected execution: boolean = false;

public addToQueue(item: string) {
this.queue.add(item);

if (this.queue.size > 100) {
console.log('Too large capacity', this.queue.size);

Check warning on line 19 in packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts

View check run for this annotation

Codecov / codecov/patch

packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts#L19

Added line #L19 was not covered by tests
}

this.run().catch(e => console.log(e));
}

public async run(): Promise<void> {
if (this.execution) {
return;
}

this.execution = true;

try {
let toExecute: string[] = [];

do {
for (const item of this.queue) {
toExecute.push(item);
this.queue.delete(item);

if (toExecute.length > this.concurrency) {
break;

Check warning on line 41 in packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts

View check run for this annotation

Codecov / codecov/patch

packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts#L41

Added line #L41 was not covered by tests
}
}

console.log('toExecute', toExecute.length, {
toExecute
});

await Promise.all(toExecute.map(async (item) => this.execute(item)));
toExecute = [];
} while (toExecute.length > 0);
} finally {
this.execution = false;
}
}

abstract execute(item: string): Promise<void>;
}

export class TableTouchMemoryQueue extends AbstractSetMemoryQueue {
public constructor(
capacity: number,
concurrency: number,
protected readonly queryCache: QueryCache,
protected readonly touchTablePersistTime: number
) {
super(capacity, concurrency);
}

public async execute(tableName: string): Promise<void> {
const key = this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', tableName);
console.log('touch', key);
await this.queryCache.getCacheDriver().set(key, new Date().getTime(), this.touchTablePersistTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { QueryQueue } from './QueryQueue';
import { LargeStreamWarning } from './StreamObjectsCounter';
import { CacheAndQueryDriverType } from './QueryOrchestrator';
import { RedisPool } from './RedisPool';
import { TableTouchMemoryQueue } from './MemoryQueue';

/// Name of the inline table containing the lambda rows.
export const LAMBDA_TABLE_PREFIX = 'lambda';
Expand Down Expand Up @@ -653,7 +654,7 @@ export class PreAggregationLoader {
if (versionEntryByContentVersion && !this.forceBuild) {
const targetTableName = this.targetTableName(versionEntryByContentVersion);
// No need to block here
this.updateLastTouch(targetTableName);
this.preAggregations.updateLastTouch(targetTableName);
return {
targetTableName,
refreshKeyValues: [],
Expand All @@ -672,7 +673,7 @@ export class PreAggregationLoader {
if (versionEntryByStructureVersion) {
const targetTableName = this.targetTableName(versionEntryByStructureVersion);
// No need to block here
this.updateLastTouch(targetTableName);
this.preAggregations.updateLastTouch(targetTableName);
return {
targetTableName,
refreshKeyValues: [],
Expand Down Expand Up @@ -712,7 +713,8 @@ export class PreAggregationLoader {
throw new Error(`Pre-aggregation table is not found for ${this.preAggregation.tableName} after it was successfully created`);
}
const targetTableName = this.targetTableName(lastVersion);
this.updateLastTouch(targetTableName);
// No need to block here
this.preAggregations.updateLastTouch(targetTableName);
return {
targetTableName,
refreshKeyValues: [],
Expand Down Expand Up @@ -743,7 +745,7 @@ export class PreAggregationLoader {
});
});
const targetTableName = this.targetTableName(newVersionEntry);
this.updateLastTouch(targetTableName);
this.preAggregations.updateLastTouch(targetTableName);

Check warning on line 748 in packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

View check run for this annotation

Codecov / codecov/patch

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts#L748

Added line #L748 was not covered by tests
return {
targetTableName,
refreshKeyValues: [],
Expand Down Expand Up @@ -791,7 +793,7 @@ export class PreAggregationLoader {
return mostRecentResult();
}
const targetTableName = this.targetTableName(versionEntry);
this.updateLastTouch(targetTableName);
this.preAggregations.updateLastTouch(targetTableName);
return {
targetTableName,
refreshKeyValues: [],
Expand All @@ -800,14 +802,6 @@ export class PreAggregationLoader {
};
}

private updateLastTouch(tableName: string) {
this.preAggregations.updateLastTouch(tableName).catch(e => {
this.logger('Error on pre-aggregation touch', {
error: (e.stack || e), preAggregation: this.preAggregation, requestId: this.requestId,
});
});
}

protected contentVersion(invalidationKeys) {
const versionArray = [this.preAggregation.structureVersionLoadSql || this.preAggregation.loadSql];
if (this.preAggregation.indexesSql && this.preAggregation.indexesSql.length) {
Expand Down Expand Up @@ -895,7 +889,7 @@ export class PreAggregationLoader {
}

public refresh(newVersionEntry: VersionEntry, invalidationKeys: InvalidationKeys, client) {
this.updateLastTouch(this.targetTableName(newVersionEntry));
this.preAggregations.updateLastTouch(this.targetTableName(newVersionEntry));
let refreshStrategy = this.refreshStoreInSourceStrategy;
if (this.preAggregation.external) {
const readOnly =
Expand Down Expand Up @@ -1971,6 +1965,9 @@ export class PreAggregations {

private readonly getQueueEventsBus: any;

private readonly touchQueue: TableTouchMemoryQueue;


Check failure on line 1970 in packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

View workflow job for this annotation

GitHub Actions / lint

More than 1 blank line not allowed
public constructor(
private readonly redisPrefix: string,
private readonly driverFactory: DriverFactoryByDataSource,
Expand All @@ -1979,10 +1976,10 @@ export class PreAggregations {
options,
) {
this.options = options || {};

this.touchTablePersistTime = options.touchTablePersistTime || getEnv('touchPreAggregationTimeout');
this.touchQueue = new TableTouchMemoryQueue(16_768, 5, this.queryCache, this.touchTablePersistTime);
this.externalDriverFactory = options.externalDriverFactory;
this.structureVersionPersistTime = options.structureVersionPersistTime || 60 * 60 * 24 * 30;
this.touchTablePersistTime = options.touchTablePersistTime || getEnv('touchPreAggregationTimeout');
this.dropPreAggregationsWithoutTouch = options.dropPreAggregationsWithoutTouch || getEnv('dropPreAggregationsWithoutTouch');
this.usedTablePersistTime = options.usedTablePersistTime || getEnv('dbQueryTimeout');
this.externalRefresh = options.externalRefresh;
Expand All @@ -1994,17 +1991,13 @@ export class PreAggregations {
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_USED', tableName);
}

protected tablesTouchRedisKey(tableName) {
// TODO add dataSource?
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', tableName);
}

protected refreshEndReachedKey() {
// TODO add dataSource?
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_REFRESH_END_REACHED', '');
}

public async addTableUsed(tableName) {
console.log('tableUsed', tableName);
return this.queryCache.getCacheDriver().set(this.tablesUsedRedisKey(tableName), true, this.usedTablePersistTime);
}

Expand All @@ -2013,13 +2006,16 @@ export class PreAggregations {
.map(k => k.replace(this.tablesUsedRedisKey(''), ''));
}

public async updateLastTouch(tableName) {
return this.queryCache.getCacheDriver().set(this.tablesTouchRedisKey(tableName), new Date().getTime(), this.touchTablePersistTime);
public updateLastTouch(tableName: string) {
this.touchQueue.addToQueue(tableName);
}

public async tablesTouched() {
return (await this.queryCache.getCacheDriver().keysStartingWith(this.tablesTouchRedisKey('')))
.map(k => k.replace(this.tablesTouchRedisKey(''), ''));
// TODO add dataSource?
const key = this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', '');

Check warning on line 2015 in packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

View check run for this annotation

Codecov / codecov/patch

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts#L2015

Added line #L2015 was not covered by tests

return (await this.queryCache.getCacheDriver().keysStartingWith(key))
.map(k => k.replace(key, ''));

Check warning on line 2018 in packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

View check run for this annotation

Codecov / codecov/patch

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts#L2017-L2018

Added lines #L2017 - L2018 were not covered by tests
}

public async updateRefreshEndReached() {
Expand Down

0 comments on commit 20ed9b9

Please sign in to comment.