Skip to content

Commit

Permalink
feat(schema-compiler): Move transpiling to worker threads (under the …
Browse files Browse the repository at this point in the history
…flag) (#9188)

* move js transpilation to worker threads

* hide worker threads transpile under the flag

* add useful comment

* tune Push CI to test with CUBEJS_WORKER_THREADS_TRANSPILATION=true

* add workerpool pkg

* use workerpool pkg

* add max workers cfg

* update env in ci

* remove obsolete stuff
  • Loading branch information
KSDaemon authored Feb 6, 2025
1 parent 0dbf3bb commit 7451452
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 356 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ jobs:
# Current docker version + next LTS
node-version: [20.x, 22.x]
python-version: [3.11]
transpile-worker-threads: [false, true]
fail-fast: false

env:
CUBEJS_TRANSPILATION_WORKER_THREADS: ${{ matrix.transpile-worker-threads }}
steps:
- id: get-tag-out
run: echo "$OUT"
Expand Down
6 changes: 6 additions & 0 deletions packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ const variables: Record<string, (...args: any) => any> = {
nativeOrchestrator: () => get('CUBEJS_TESSERACT_ORCHESTRATOR')
.default('false')
.asBoolStrict(),
transpilationWorkerThreads: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS')
.default('false')
.asBoolStrict(),
transpilationWorkerThreadsCount: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS_COUNT')
.default('0')
.asInt(),

/** ****************************************************************
* Common db options *
Expand Down
3 changes: 2 additions & 1 deletion packages/cubejs-schema-compiler/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
"node-dijkstra": "^2.5.0",
"ramda": "^0.27.2",
"syntax-error": "^1.3.0",
"uuid": "^8.3.2"
"uuid": "^8.3.2",
"workerpool": "^9.2.0"
},
"devDependencies": {
"@clickhouse/client": "^1.7.0",
Expand Down
4 changes: 2 additions & 2 deletions packages/cubejs-schema-compiler/src/compiler/CubeSymbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { camelizeCube } from './utils';
import { BaseQuery } from '../adapter';

const FunctionRegex = /function\s+\w+\(([A-Za-z0-9_,]*)|\(([\s\S]*?)\)\s*=>|\(?(\w+)\)?\s*=>/;
const CONTEXT_SYMBOLS = {
export const CONTEXT_SYMBOLS = {
SECURITY_CONTEXT: 'securityContext',
// SECURITY_CONTEXT has been deprecated, however security_context (lowercase)
// is allowed in RBAC policies for query-time attribute matching
Expand All @@ -19,7 +19,7 @@ const CONTEXT_SYMBOLS = {
SQL_UTILS: 'sqlUtils'
};

const CURRENT_CUBE_CONSTANTS = ['CUBE', 'TABLE'];
export const CURRENT_CUBE_CONSTANTS = ['CUBE', 'TABLE'];

export class CubeSymbols {
constructor(evaluateViews) {
Expand Down
101 changes: 78 additions & 23 deletions packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import { parse } from '@babel/parser';
import babelGenerator from '@babel/generator';
import babelTraverse from '@babel/traverse';
import R from 'ramda';
import workerpool from 'workerpool';

import { isNativeSupported } from '@cubejs-backend/shared';
import { getEnv, isNativeSupported } from '@cubejs-backend/shared';
import { UserError } from './UserError';
import { ErrorReporter } from './ErrorReporter';

Expand All @@ -28,6 +29,8 @@ export class DataSchemaCompiler {
this.viewCompilationGate = options.viewCompilationGate;
this.cubeNameCompilers = options.cubeNameCompilers || [];
this.extensions = options.extensions || {};
this.cubeDictionary = options.cubeDictionary;
this.cubeSymbols = options.cubeSymbols;
this.cubeFactory = options.cubeFactory;
this.filesToCompile = options.filesToCompile;
this.omitErrors = options.omitErrors;
Expand All @@ -40,6 +43,7 @@ export class DataSchemaCompiler {
this.yamlCompiler = options.yamlCompiler;
this.yamlCompiler.dataSchemaCompiler = this;
this.pythonContext = null;
this.workerPool = null;
}

compileObjects(compileServices, objects, errorsReport) {
Expand Down Expand Up @@ -89,10 +93,40 @@ export class DataSchemaCompiler {
const errorsReport = new ErrorReporter(null, [], this.errorReport);
this.errorsReport = errorsReport;

// TODO: required in order to get pre transpile compilation work
const transpile = () => toCompile.map(f => this.transpileFile(f, errorsReport)).filter(f => !!f);
if (getEnv('transpilationWorkerThreads')) {
const wc = getEnv('transpilationWorkerThreadsCount');
this.workerPool = workerpool.pool(
path.join(__dirname, 'transpilers/transpiler_worker'),
wc > 0 ? { maxWorkers: wc } : undefined,
);
}

const transpile = async () => {
let cubeNames;
let cubeSymbolsNames;

if (getEnv('transpilationWorkerThreads')) {
cubeNames = Object.keys(this.cubeDictionary.byId);
// We only need the cube names and each cube's member names for transpiling.
// Cubes don't change during transpiling, but they do change during the compilation phase,
// so we prepare them once for every phase.
// Communication between the main and worker threads uses
// the structured clone algorithm (@see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm),
// which doesn't allow passing any function objects, so we need to sanitize the symbols.
cubeSymbolsNames = Object.fromEntries(
Object.entries(this.cubeSymbols.symbols)
.map(
([key, value]) => [key, Object.fromEntries(
Object.keys(value).map((k) => [k, true]),
)],
),
);
}
const results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { cubeNames, cubeSymbolsNames })));
return results.filter(f => !!f);
};

const compilePhase = (compilers) => this.compileCubeFiles(compilers, transpile(), errorsReport);
const compilePhase = async (compilers) => this.compileCubeFiles(compilers, await transpile(), errorsReport);

return compilePhase({ cubeCompilers: this.cubeNameCompilers })
.then(() => compilePhase({ cubeCompilers: this.preTranspileCubeCompilers.concat([this.viewCompilationGate]) }))
Expand All @@ -102,7 +136,12 @@ export class DataSchemaCompiler {
.then(() => compilePhase({
cubeCompilers: this.cubeCompilers,
contextCompilers: this.contextCompilers,
}));
}))
.then(() => {
if (this.workerPool) {
this.workerPool.terminate();
}
});
}

compile() {
Expand All @@ -118,7 +157,7 @@ export class DataSchemaCompiler {
return this.compilePromise;
}

transpileFile(file, errorsReport) {
async transpileFile(file, errorsReport, options) {
if (R.endsWith('.jinja', file.fileName) ||
(R.endsWith('.yml', file.fileName) || R.endsWith('.yaml', file.fileName))
// TODO do Jinja syntax check with jinja compiler
Expand All @@ -137,31 +176,47 @@ export class DataSchemaCompiler {
} else if (R.endsWith('.yml', file.fileName) || R.endsWith('.yaml', file.fileName)) {
return file;
} else if (R.endsWith('.js', file.fileName)) {
return this.transpileJsFile(file, errorsReport);
return this.transpileJsFile(file, errorsReport, options);
} else {
return file;
}
}

transpileJsFile(file, errorsReport) {
async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbolsNames }) {
try {
const ast = parse(
file.content,
{
sourceFilename: file.fileName,
sourceType: 'module',
plugins: ['objectRestSpread']
},
);
if (getEnv('transpilationWorkerThreads')) {
const data = {
fileName: file.fileName,
content: file.content,
transpilers: this.transpilers.map(t => t.constructor.name),
cubeNames,
cubeSymbolsNames,
};

const res = await this.workerPool.exec('transpile', [data]);
errorsReport.addErrors(res.errors);
errorsReport.addWarnings(res.warnings);

return Object.assign({}, file, { content: res.content });
} else {
const ast = parse(
file.content,
{
sourceFilename: file.fileName,
sourceType: 'module',
plugins: ['objectRestSpread'],
},
);

this.transpilers.forEach((t) => {
errorsReport.inFile(file);
babelTraverse(ast, t.traverseObject(errorsReport));
errorsReport.exitFile();
});
this.transpilers.forEach((t) => {
errorsReport.inFile(file);
babelTraverse(ast, t.traverseObject(errorsReport));
errorsReport.exitFile();
});

const content = babelGenerator(ast, {}, file.content).code;
return Object.assign({}, file, { content });
const content = babelGenerator(ast, {}, file.content).code;
return Object.assign({}, file, { content });
}
} catch (e) {
if (e.toString().indexOf('SyntaxError') !== -1) {
const line = file.content.split('\n')[e.loc.line - 1];
Expand Down
16 changes: 16 additions & 0 deletions packages/cubejs-schema-compiler/src/compiler/ErrorReporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,22 @@ export class ErrorReporter {
}
}

/**
 * Returns the error list stored on the root reporter of this reporter chain.
 * The array is returned as-is (not copied).
 */
public getErrors() {
  const root = this.rootReporter();
  return root.errors;
}

/**
 * Appends the given errors to the error list stored on the root reporter.
 *
 * @param errors Errors to append, in order.
 */
public addErrors(errors: CompilerErrorInterface[]) {
  const root = this.rootReporter();
  root.errors.push(...errors);
}

/**
 * Returns the warning list stored on the root reporter of this reporter chain.
 * The array is returned as-is (not copied).
 */
public getWarnings() {
  const root = this.rootReporter();
  return root.warnings;
}

/**
 * Appends the given warnings to the warning list stored on the root reporter.
 *
 * @param warnings Warnings to append, in order.
 */
public addWarnings(warnings: SyntaxErrorInterface[]) {
  const root = this.rootReporter();
  root.warnings.push(...warnings);
}

/**
 * Walks up the `parent` links and returns the top-most reporter;
 * a reporter without a parent is its own root.
 */
protected rootReporter(): ErrorReporter {
  if (this.parent) {
    return this.parent.rootReporter();
  }

  return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ export const prepareCompiler = (repo: SchemaFileRepository, options: PrepareComp
contextCompilers: [contextEvaluator],
cubeFactory: cubeSymbols.createCube.bind(cubeSymbols),
compilerCache,
cubeDictionary,
cubeSymbols,
extensions: {
Funnels,
RefreshKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import * as t from '@babel/types';
import R from 'ramda';

import type { NodePath } from '@babel/traverse';
import type { TranspilerInterface, TraverseObject } from './transpiler.interface';
import {
TranspilerCubeResolver,
TranspilerInterface,
TranspilerSymbolResolver,
TraverseObject
} from './transpiler.interface';
import type { CubeSymbols } from '../CubeSymbols';
import type { CubeDictionary } from '../CubeDictionary';

Expand Down Expand Up @@ -39,9 +44,9 @@ transpiledFieldsPatterns?.forEach((r) => {

export class CubePropContextTranspiler implements TranspilerInterface {
public constructor(
protected readonly cubeSymbols: CubeSymbols,
protected readonly cubeDictionary: CubeDictionary,
protected readonly viewCompiler: CubeSymbols,
protected readonly cubeSymbols: TranspilerSymbolResolver,
protected readonly cubeDictionary: TranspilerCubeResolver,
protected readonly viewCompiler: TranspilerSymbolResolver,
) {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { TranspilerCubeResolver } from './transpiler.interface';

/**
 * Minimal cube dictionary that only knows cube names.
 *
 * Names are kept in a `Set` so that `resolveCube` is a constant-time
 * membership check instead of a linear scan over the name list on
 * every lookup.
 */
export class LightweightNodeCubeDictionary implements TranspilerCubeResolver {
  // Snapshot of the known cube names; replaced wholesale by `setCubeNames`.
  private cubeNameSet: Set<string>;

  /**
   * @param cubeNames Initial list of known cube names (defaults to none).
   */
  public constructor(cubeNames: string[] = []) {
    this.cubeNameSet = new Set(cubeNames);
  }

  /**
   * @param name Cube name to look up.
   * @returns `true` when `name` is one of the known cube names.
   */
  public resolveCube(name: string): boolean {
    return this.cubeNameSet.has(name);
  }

  /**
   * Replaces the set of known cube names.
   *
   * @param cubeNames New list of cube names; previous names are discarded.
   */
  public setCubeNames(cubeNames: string[]): void {
    this.cubeNameSet = new Set(cubeNames);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { TranspilerSymbolResolver } from './transpiler.interface';
import { CONTEXT_SYMBOLS, CURRENT_CUBE_CONSTANTS } from '../CubeSymbols';

// Map of cube name -> map of member name -> `true`.
type CubeSymbols = Record<string, Record<string, boolean>>;

/**
 * Symbol resolver backed by a plain name map (no functions), so that the
 * data it works on can be replaced via `setSymbols` with a simple object.
 */
export class LightweightSymbolResolver implements TranspilerSymbolResolver {
  public constructor(private symbols: CubeSymbols = {}) {
  }

  /**
   * Replaces the whole symbol map used for lookups.
   */
  public setSymbols(symbols: CubeSymbols) {
    this.symbols = symbols;
  }

  /**
   * Tells whether `name` is one of the constants listed in
   * `CURRENT_CUBE_CONSTANTS`.
   */
  public isCurrentCube(name): boolean {
    const position = CURRENT_CUBE_CONSTANTS.indexOf(name);
    return position >= 0;
  }

  /**
   * Resolves `name` in the context of `cubeName`.
   *
   * Lookup order: the removed `USER_CONTEXT` name throws; names present in
   * `CONTEXT_SYMBOLS` resolve to `true`; then a cube entry is looked up
   * (the cube `cubeName` itself when `name` is a current-cube constant,
   * otherwise the cube called `name`); finally `name` is looked up as a
   * member of `cubeName`.
   *
   * @throws Error when `name` is `USER_CONTEXT`.
   */
  public resolveSymbol(cubeName, name): any {
    if (name === 'USER_CONTEXT') {
      throw new Error('Support for USER_CONTEXT was removed, please migrate to SECURITY_CONTEXT.');
    }

    if (CONTEXT_SYMBOLS[name]) {
      return true;
    }

    const cubeKey = this.isCurrentCube(name) ? cubeName : name;
    const cubeEntry = this.symbols[cubeKey];
    if (cubeEntry) {
      return cubeEntry;
    }

    // Fall back to treating `name` as a member of the current cube.
    const ownerEntry = this.symbols[cubeName];
    return ownerEntry && ownerEntry[name];
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@ export type TraverseObject = TraverseOptions;
/**
 * Contract implemented by every transpiler: given an error reporter, produce
 * the traverse options object that is handed to `babelTraverse` for a file's AST.
 */
export interface TranspilerInterface {
traverseObject(reporter: ErrorReporter): TraverseObject;
}

/**
 * Symbol-lookup contract a transpiler depends on (see the
 * `CubePropContextTranspiler` constructor), so that a lightweight
 * implementation such as `LightweightSymbolResolver` can be supplied.
 */
export interface TranspilerSymbolResolver {
// Resolves `name` in the context of cube `cubeName`; the result type is
// implementation-defined (declared as `any`).
resolveSymbol(cubeName, name): any;
// Tells whether `name` refers to the current cube.
isCurrentCube(name): boolean;
}

/**
 * Cube-lookup contract a transpiler depends on (see the
 * `CubePropContextTranspiler` constructor); implemented by
 * `LightweightNodeCubeDictionary`.
 */
export interface TranspilerCubeResolver {
// Returns whether a cube called `name` is known.
resolveCube(name): boolean;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import workerpool from 'workerpool';
import { parse } from '@babel/parser';
import babelGenerator from '@babel/generator';
import babelTraverse from '@babel/traverse';

import { ValidationTranspiler } from './ValidationTranspiler';
import { ImportExportTranspiler } from './ImportExportTranspiler';
import { CubeCheckDuplicatePropTranspiler } from './CubeCheckDuplicatePropTranspiler';
import { CubePropContextTranspiler } from './CubePropContextTranspiler';
import { ErrorReporter } from '../ErrorReporter';
import { LightweightSymbolResolver } from './LightweightSymbolResolver';
import { LightweightNodeCubeDictionary } from './LightweightNodeCubeDictionary';

/**
 * Payload received by the worker's `transpile` function for a single file.
 */
type TransferContent = {
// File name, passed to the parser as `sourceFilename`.
fileName: string;
// Source text of the file to parse and transpile.
content: string;
// Names of the transpilers to run, in order; each must be a key of the
// worker's `transpilers` map.
transpilers: string[];
// Cube names handed to the lightweight cube dictionary.
cubeNames: string[];
// Cube name -> member name -> true, handed to the lightweight symbol resolver.
cubeSymbolsNames: Record<string, Record<string, boolean>>;
};

// Lightweight resolvers shared by all calls handled by this worker; their
// contents are replaced at the start of every `transpile` call.
const cubeDictionary = new LightweightNodeCubeDictionary();
const cubeSymbols = new LightweightSymbolResolver();

// Transpiler instances addressable by the names sent in `TransferContent.transpilers`.
const transpilers = {
  ValidationTranspiler: new ValidationTranspiler(),
  ImportExportTranspiler: new ImportExportTranspiler(),
  CubeCheckDuplicatePropTranspiler: new CubeCheckDuplicatePropTranspiler(),
  CubePropContextTranspiler: new CubePropContextTranspiler(cubeSymbols, cubeDictionary, cubeSymbols),
};

/**
 * Parses one file, runs the requested transpilers over its AST and
 * regenerates the source.
 *
 * @param data File content plus the cube/symbol names needed for resolution.
 * @returns The generated code together with the errors and warnings reported
 *   while transpiling THIS file only.
 * @throws Error when `data.transpilers` names a transpiler that is not
 *   registered in this worker; parser errors propagate unchanged.
 */
const transpile = (data: TransferContent) => {
  cubeDictionary.setCubeNames(data.cubeNames);
  cubeSymbols.setSymbols(data.cubeSymbolsNames);

  // A fresh reporter per call. A worker handles many files, and a shared
  // reporter would keep every error/warning from earlier files and return
  // them again with each later result, so the caller (which appends the
  // returned lists to its own reporter) would see duplicates.
  const errorsReport = new ErrorReporter(null, []);

  const ast = parse(
    data.content,
    {
      sourceFilename: data.fileName,
      sourceType: 'module',
      plugins: ['objectRestSpread']
    },
  );

  data.transpilers.forEach(transpilerName => {
    const transpiler = transpilers[transpilerName];
    if (!transpiler) {
      throw new Error(`Transpiler ${transpilerName} not supported`);
    }

    errorsReport.inFile(data);
    babelTraverse(ast, transpiler.traverseObject(errorsReport));
    errorsReport.exitFile();
  });

  const content = babelGenerator(ast, {}, data.content).code;

  return {
    content,
    errors: errorsReport.getErrors(),
    warnings: errorsReport.getWarnings()
  };
};

// Register `transpile` with workerpool so the pool owner can invoke it by
// name (the main thread calls `workerPool.exec('transpile', [data])`).
workerpool.worker({
transpile,
});
Loading

0 comments on commit 7451452

Please sign in to comment.