From a7a53eee7ea566ad6d2a8013b8a139a9c59e4cda Mon Sep 17 00:00:00 2001
From: Konstantin Burkalev
Date: Thu, 6 Feb 2025 12:46:02 +0200
Subject: [PATCH] feat(schema-compiler): Move transpiling to worker threads (under the flag)

---
 .github/workflows/push.yml                    |   3 +
 packages/cubejs-backend-shared/src/env.ts     |   6 ++
 packages/cubejs-schema-compiler/package.json  |   3 +-
 .../src/compiler/CubeSymbols.js               |   4 +-
 .../src/compiler/DataSchemaCompiler.js        | 103 ++++++++++++++----
 .../src/compiler/ErrorReporter.ts             |  16 +++
 .../src/compiler/PrepareCompiler.ts           |   2 +
 .../transpilers/CubePropContextTranspiler.ts  |  11 +-
 .../LightweightNodeCubeDictionary.ts          |  14 +++
 .../transpilers/LightweightSymbolResolver.ts  |  30 ++++++
 .../transpilers/transpiler.interface.ts       |   9 ++
 .../compiler/transpilers/transpiler_worker.ts |  70 ++++++++++++
 yarn.lock                                     |   5 +
 13 files changed, 247 insertions(+), 29 deletions(-)
 create mode 100644 packages/cubejs-schema-compiler/src/compiler/transpilers/LightweightNodeCubeDictionary.ts
 create mode 100644 packages/cubejs-schema-compiler/src/compiler/transpilers/LightweightSymbolResolver.ts
 create mode 100644 packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler_worker.ts

diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml
index 778f2b91cff35..9f28caec0a0f6 100644
--- a/.github/workflows/push.yml
+++ b/.github/workflows/push.yml
@@ -42,8 +42,11 @@ jobs:
       matrix:
         # Current docker version + next LTS
         node-version: [20.x, 22.x]
+        transpile-worker-threads: [false, true]
       fail-fast: false
 
+    env:
+      CUBEJS_TRANSPILATION_WORKER_THREADS: ${{ matrix.transpile-worker-threads }}
     steps:
       - id: get-tag-out
         run: echo "$OUT"
diff --git a/packages/cubejs-backend-shared/src/env.ts b/packages/cubejs-backend-shared/src/env.ts
index 2f204b1cd4907..51df9ad4f5cfb 100644
--- a/packages/cubejs-backend-shared/src/env.ts
+++ b/packages/cubejs-backend-shared/src/env.ts
@@ -193,6 +193,12 @@ const variables: Record<string, (...args: any) => any> = {
     .default('1')
     .asInt(),
   nativeSqlPlanner: () => get('CUBEJS_TESSERACT_SQL_PLANNER').asBool(),
+  transpilationWorkerThreads: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS')
+    .default('false')
+    .asBoolStrict(),
+  transpilationWorkerThreadsCount: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS_COUNT')
+    .default('0')
+    .asInt(),
 
   /** ****************************************************************
    * Common db options                                               *
diff --git a/packages/cubejs-schema-compiler/package.json b/packages/cubejs-schema-compiler/package.json
index 41d16d32fbeb6..f09d94da01111 100644
--- a/packages/cubejs-schema-compiler/package.json
+++ b/packages/cubejs-schema-compiler/package.json
@@ -54,7 +54,8 @@
     "node-dijkstra": "^2.5.0",
     "ramda": "^0.27.2",
     "syntax-error": "^1.3.0",
-    "uuid": "^8.3.2"
+    "uuid": "^8.3.2",
+    "workerpool": "^9.2.0"
   },
   "devDependencies": {
     "@cubejs-backend/apla-clickhouse": "^1.7.0",
diff --git a/packages/cubejs-schema-compiler/src/compiler/CubeSymbols.js b/packages/cubejs-schema-compiler/src/compiler/CubeSymbols.js
index ee8b0ab6e333f..22c3ae28087f6 100644
--- a/packages/cubejs-schema-compiler/src/compiler/CubeSymbols.js
+++ b/packages/cubejs-schema-compiler/src/compiler/CubeSymbols.js
@@ -8,7 +8,7 @@ import { camelizeCube } from './utils';
 import { BaseQuery } from '../adapter';
 
 const FunctionRegex = /function\s+\w+\(([A-Za-z0-9_,]*)|\(([\s\S]*?)\)\s*=>|\(?(\w+)\)?\s*=>/;
-const CONTEXT_SYMBOLS = {
+export const CONTEXT_SYMBOLS = {
   SECURITY_CONTEXT: 'securityContext',
   // SECURITY_CONTEXT has been deprecated, however security_context (lowecase)
   // is allowed in RBAC policies for query-time attribute matching
@@ -19,7 +19,7 @@ const CONTEXT_SYMBOLS = {
   SQL_UTILS: 'sqlUtils'
 };
 
-const CURRENT_CUBE_CONSTANTS = ['CUBE', 'TABLE'];
+export const CURRENT_CUBE_CONSTANTS = ['CUBE', 'TABLE'];
 
 export class CubeSymbols {
   constructor(evaluateViews) {
diff --git a/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js b/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js
index 0d16153fad2d7..1078c3458e14f 100644
--- a/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js
+++ b/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js
@@ -6,8 +6,9 @@ import { parse } from '@babel/parser';
 import babelGenerator from '@babel/generator';
 import babelTraverse from '@babel/traverse';
 import R from 'ramda';
+import workerpool from 'workerpool';
 
-import { isNativeSupported } from '@cubejs-backend/shared';
+import { getEnv, isNativeSupported } from '@cubejs-backend/shared';
 import { UserError } from './UserError';
 import { ErrorReporter } from './ErrorReporter';
 
@@ -26,6 +27,8 @@ export class DataSchemaCompiler {
     this.preTranspileCubeCompilers = options.preTranspileCubeCompilers || [];
     this.cubeNameCompilers = options.cubeNameCompilers || [];
     this.extensions = options.extensions || {};
+    this.cubeDictionary = options.cubeDictionary;
+    this.cubeSymbols = options.cubeSymbols;
     this.cubeFactory = options.cubeFactory;
     this.filesToCompile = options.filesToCompile;
     this.omitErrors = options.omitErrors;
@@ -38,6 +41,7 @@ export class DataSchemaCompiler {
     this.yamlCompiler = options.yamlCompiler;
     this.yamlCompiler.dataSchemaCompiler = this;
     this.pythonContext = null;
+    this.workerPool = null;
   }
 
   compileObjects(compileServices, objects, errorsReport) {
@@ -87,17 +91,54 @@ export class DataSchemaCompiler {
     const errorsReport = new ErrorReporter(null, [], this.errorReport);
     this.errorsReport = errorsReport;
 
-    // TODO: required in order to get pre transpile compilation work
-    const transpile = () => toCompile.map(f => this.transpileFile(f, errorsReport)).filter(f => !!f);
+    if (getEnv('transpilationWorkerThreads')) {
+      const wc = getEnv('transpilationWorkerThreadsCount');
+      this.workerPool = workerpool.pool(
+        path.join(__dirname, 'transpilers/transpiler_worker'),
+        wc > 0 ? { maxWorkers: wc } : undefined,
+      );
+    }
+
+    const transpile = async () => {
+      let cubeNames;
+      let cubeSymbolsNames;
+
+      if (getEnv('transpilationWorkerThreads')) {
+        cubeNames = Object.keys(this.cubeDictionary.byId);
+        // We need only cubes and all their member names for transpiling.
+        // Cubes don't change during transpiling, but are changed during compilation phase,
+        // so we can prepare them once for every phase.
+        // Communication between main and worker threads uses
+        // the structured clone algorithm (@see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
+        // which doesn't allow passing any function objects, so we need to sanitize the symbols.
+        cubeSymbolsNames = Object.fromEntries(
+          Object.entries(this.cubeSymbols.symbols)
+            .map(
+              ([key, value]) => [key, Object.fromEntries(
+                Object.keys(value).map((k) => [k, true]),
+              )],
+            ),
+        );
+      }
+      const results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { cubeNames, cubeSymbolsNames })));
+      return results.filter(f => !!f);
+    };
 
-    const compilePhase = (compilers) => this.compileCubeFiles(compilers, transpile(), errorsReport);
+    const compilePhase = async (compilers) => this.compileCubeFiles(compilers, await transpile(), errorsReport);
 
     return compilePhase({ cubeCompilers: this.cubeNameCompilers })
       .then(() => compilePhase({ cubeCompilers: this.preTranspileCubeCompilers }))
       .then(() => compilePhase({
         cubeCompilers: this.cubeCompilers,
         contextCompilers: this.contextCompilers,
-      }));
+      }))
+      .finally(() => {
+        // Release the worker threads on failure too, otherwise a rejected phase leaks the pool.
+        if (this.workerPool) {
+          this.workerPool.terminate();
+          this.workerPool = null;
+        }
+      });
   }
 
   compile() {
@@ -113,7 +154,7 @@ export class DataSchemaCompiler {
     return this.compilePromise;
   }
 
-  transpileFile(file, errorsReport) {
+  async transpileFile(file, errorsReport, options) {
     if (R.endsWith('.jinja', file.fileName) ||
       (R.endsWith('.yml', file.fileName) || R.endsWith('.yaml', file.fileName))
       // TODO do Jinja syntax check with jinja compiler
@@ -132,31 +173,47 @@ export class DataSchemaCompiler {
     } else if (R.endsWith('.yml', file.fileName) || R.endsWith('.yaml', file.fileName)) {
       return file;
     } else if (R.endsWith('.js', file.fileName)) {
-      return this.transpileJsFile(file, errorsReport);
+      return this.transpileJsFile(file, errorsReport, options);
     } else {
       return file;
     }
   }
 
-  transpileJsFile(file, errorsReport) {
+  async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbolsNames } = {}) {
     try {
-      const ast = parse(
-        file.content,
-        {
-          sourceFilename: file.fileName,
-          sourceType: 'module',
-          plugins: ['objectRestSpread']
-        },
-      );
+      if (getEnv('transpilationWorkerThreads')) {
+        const data = {
+          fileName: file.fileName,
+          content: file.content,
+          transpilers: this.transpilers.map(t => t.constructor.name),
+          cubeNames,
+          cubeSymbolsNames,
+        };
+
+        const res = await this.workerPool.exec('transpile', [data]);
+        errorsReport.addErrors(res.errors);
+        errorsReport.addWarnings(res.warnings);
+
+        return Object.assign({}, file, { content: res.content });
+      } else {
+        const ast = parse(
+          file.content,
+          {
+            sourceFilename: file.fileName,
+            sourceType: 'module',
+            plugins: ['objectRestSpread'],
+          },
+        );
 
-      this.transpilers.forEach((t) => {
-        errorsReport.inFile(file);
-        babelTraverse(ast, t.traverseObject(errorsReport));
-        errorsReport.exitFile();
-      });
+        this.transpilers.forEach((t) => {
+          errorsReport.inFile(file);
+          babelTraverse(ast, t.traverseObject(errorsReport));
+          errorsReport.exitFile();
+        });
 
-      const content = babelGenerator(ast, {}, file.content).code;
-      return Object.assign({}, file, { content });
+        const content = babelGenerator(ast, {}, file.content).code;
+        return Object.assign({}, file, { content });
+      }
     } catch (e) {
       if (e.toString().indexOf('SyntaxError') !== -1) {
         const line = file.content.split('\n')[e.loc.line - 1];
diff --git a/packages/cubejs-schema-compiler/src/compiler/ErrorReporter.ts b/packages/cubejs-schema-compiler/src/compiler/ErrorReporter.ts
index 8407dd8605453..bf48792f4a3ab 100644
--- a/packages/cubejs-schema-compiler/src/compiler/ErrorReporter.ts
+++ b/packages/cubejs-schema-compiler/src/compiler/ErrorReporter.ts
@@ -137,6 +137,22 @@ export class ErrorReporter {
     }
   }
 
+  public getErrors() {
+    return this.rootReporter().errors;
+  }
+
+  public addErrors(errors: CompilerErrorInterface[]) {
+    this.rootReporter().errors.push(...errors);
+  }
+
+  public getWarnings() {
+    return this.rootReporter().warnings;
+  }
+
+  public addWarnings(warnings: SyntaxErrorInterface[]) {
+    this.rootReporter().warnings.push(...warnings);
+  }
+
   protected rootReporter(): ErrorReporter {
     return this.parent ? this.parent.rootReporter() : this;
   }
diff --git a/packages/cubejs-schema-compiler/src/compiler/PrepareCompiler.ts b/packages/cubejs-schema-compiler/src/compiler/PrepareCompiler.ts
index 446769688f07d..c01d2ff3e680b 100644
--- a/packages/cubejs-schema-compiler/src/compiler/PrepareCompiler.ts
+++ b/packages/cubejs-schema-compiler/src/compiler/PrepareCompiler.ts
@@ -64,6 +64,8 @@ export const prepareCompiler = (repo: SchemaFileRepository, options: PrepareComp
     contextCompilers: [contextEvaluator],
     cubeFactory: cubeSymbols.createCube.bind(cubeSymbols),
     compilerCache,
+    cubeDictionary,
+    cubeSymbols,
     extensions: {
       Funnels,
       RefreshKeys,
diff --git a/packages/cubejs-schema-compiler/src/compiler/transpilers/CubePropContextTranspiler.ts b/packages/cubejs-schema-compiler/src/compiler/transpilers/CubePropContextTranspiler.ts
index a7dcb2e85b26a..74a8aa67bba49 100644
--- a/packages/cubejs-schema-compiler/src/compiler/transpilers/CubePropContextTranspiler.ts
+++ b/packages/cubejs-schema-compiler/src/compiler/transpilers/CubePropContextTranspiler.ts
@@ -2,7 +2,12 @@ import * as t from '@babel/types';
 import R from 'ramda';
 
 import type { NodePath } from '@babel/traverse';
-import type { TranspilerInterface, TraverseObject } from './transpiler.interface';
+import type {
+  TranspilerCubeResolver,
+  TranspilerInterface,
+  TranspilerSymbolResolver,
+  TraverseObject
+} from './transpiler.interface';
 import type { CubeSymbols } from '../CubeSymbols';
 import type { CubeDictionary } from '../CubeDictionary';
 
@@ -39,8 +44,8 @@ transpiledFieldsPatterns?.forEach((r) => {
 
 export class CubePropContextTranspiler implements TranspilerInterface {
   public constructor(
-    protected readonly cubeSymbols: CubeSymbols,
-    protected readonly cubeDictionary: CubeDictionary,
+    protected readonly cubeSymbols: TranspilerSymbolResolver,
+    protected readonly cubeDictionary: TranspilerCubeResolver,
   ) {
   }
 
diff --git a/packages/cubejs-schema-compiler/src/compiler/transpilers/LightweightNodeCubeDictionary.ts b/packages/cubejs-schema-compiler/src/compiler/transpilers/LightweightNodeCubeDictionary.ts
new file mode 100644
index 0000000000000..159dbb8377a7a
--- /dev/null
+++ b/packages/cubejs-schema-compiler/src/compiler/transpilers/LightweightNodeCubeDictionary.ts
@@ -0,0 +1,14 @@
+import type { TranspilerCubeResolver } from './transpiler.interface';
+
+export class LightweightNodeCubeDictionary implements TranspilerCubeResolver {
+  public constructor(private cubeNames: string[] = []) {
+  }
+
+  public resolveCube(name: string): boolean {
+    return this.cubeNames.includes(name);
+  }
+
+  public setCubeNames(cubeNames: string[]): void {
+    this.cubeNames = cubeNames;
+  }
+}
diff --git a/packages/cubejs-schema-compiler/src/compiler/transpilers/LightweightSymbolResolver.ts b/packages/cubejs-schema-compiler/src/compiler/transpilers/LightweightSymbolResolver.ts
new file mode 100644
index 0000000000000..1041a6190c6eb
--- /dev/null
+++ b/packages/cubejs-schema-compiler/src/compiler/transpilers/LightweightSymbolResolver.ts
@@ -0,0 +1,30 @@
+import type { TranspilerSymbolResolver } from './transpiler.interface';
+import { CONTEXT_SYMBOLS, CURRENT_CUBE_CONSTANTS } from '../CubeSymbols';
+
+type CubeSymbols = Record<string, Record<string, boolean>>;
+
+export class LightweightSymbolResolver implements TranspilerSymbolResolver {
+  public constructor(private symbols: CubeSymbols = {}) {
+  }
+
+  public setSymbols(symbols: CubeSymbols) {
+    this.symbols = symbols;
+  }
+
+  public isCurrentCube(name): boolean {
+    return CURRENT_CUBE_CONSTANTS.indexOf(name) >= 0;
+  }
+
+  public resolveSymbol(cubeName, name): any {
+    if (name === 'USER_CONTEXT') {
+      throw new Error('Support for USER_CONTEXT was removed, please migrate to SECURITY_CONTEXT.');
+    }
+
+    if (CONTEXT_SYMBOLS[name]) {
+      return true;
+    }
+
+    const cube = this.symbols[this.isCurrentCube(name) ? cubeName : name];
+    return cube || (this.symbols[cubeName] && this.symbols[cubeName][name]);
+  }
+}
diff --git a/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler.interface.ts b/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler.interface.ts
index 0f15c6f8401a8..803a1c0fa4082 100644
--- a/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler.interface.ts
+++ b/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler.interface.ts
@@ -6,3 +6,12 @@ export type TraverseObject = TraverseOptions;
 export interface TranspilerInterface {
   traverseObject(reporter: ErrorReporter): TraverseObject;
 }
+
+export interface TranspilerSymbolResolver {
+  resolveSymbol(cubeName, name): any;
+  isCurrentCube(name): boolean;
+}
+
+export interface TranspilerCubeResolver {
+  resolveCube(name): boolean;
+}
diff --git a/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler_worker.ts b/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler_worker.ts
new file mode 100644
index 0000000000000..9dbefdf705bbd
--- /dev/null
+++ b/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler_worker.ts
@@ -0,0 +1,70 @@
+import workerpool from 'workerpool';
+import { parse } from '@babel/parser';
+import babelGenerator from '@babel/generator';
+import babelTraverse from '@babel/traverse';
+
+import { ValidationTranspiler } from './ValidationTranspiler';
+import { ImportExportTranspiler } from './ImportExportTranspiler';
+import { CubeCheckDuplicatePropTranspiler } from './CubeCheckDuplicatePropTranspiler';
+import { CubePropContextTranspiler } from './CubePropContextTranspiler';
+import { ErrorReporter } from '../ErrorReporter';
+import { LightweightSymbolResolver } from './LightweightSymbolResolver';
+import { LightweightNodeCubeDictionary } from './LightweightNodeCubeDictionary';
+
+type TransferContent = {
+  fileName: string;
+  content: string;
+  transpilers: string[];
+  cubeNames: string[];
+  cubeSymbolsNames: Record<string, Record<string, boolean>>;
+};
+
+const cubeDictionary = new LightweightNodeCubeDictionary();
+const cubeSymbols = new LightweightSymbolResolver();
+
+const transpilers = {
+  ValidationTranspiler: new ValidationTranspiler(),
+  ImportExportTranspiler: new ImportExportTranspiler(),
+  CubeCheckDuplicatePropTranspiler: new CubeCheckDuplicatePropTranspiler(),
+  CubePropContextTranspiler: new CubePropContextTranspiler(cubeSymbols, cubeDictionary),
+};
+
+const transpile = (data: TransferContent) => {
+  // Use a fresh reporter for every task: a pooled worker handles many files, and a
+  // shared reporter would send back errors and warnings already reported for earlier files.
+  const errorsReport = new ErrorReporter(null, []);
+
+  cubeDictionary.setCubeNames(data.cubeNames);
+  cubeSymbols.setSymbols(data.cubeSymbolsNames);
+
+  const ast = parse(
+    data.content,
+    {
+      sourceFilename: data.fileName,
+      sourceType: 'module',
+      plugins: ['objectRestSpread']
+    },
+  );
+
+  data.transpilers.forEach(transpilerName => {
+    if (transpilers[transpilerName]) {
+      errorsReport.inFile(data);
+      babelTraverse(ast, transpilers[transpilerName].traverseObject(errorsReport));
+      errorsReport.exitFile();
+    } else {
+      throw new Error(`Transpiler ${transpilerName} not supported`);
+    }
+  });
+
+  const content = babelGenerator(ast, {}, data.content).code;
+
+  return {
+    content,
+    errors: errorsReport.getErrors(),
+    warnings: errorsReport.getWarnings()
+  };
+};
+
+workerpool.worker({
+  transpile,
+});
diff --git a/yarn.lock b/yarn.lock
index 724fe49cca325..04fb697d4685e 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -29691,6 +29691,11 @@ wordwrap@^1.0.0:
   resolved "https://registry.yarnpkg.com/wordwrap/-/wordwrap-1.0.0.tgz#27584810891456a4171c8d0226441ade90cbcaeb"
   integrity sha1-J1hIEIkUVqQXHI0CJkQa3pDLyus=
 
+workerpool@^9.2.0:
+  version "9.2.0"
+  resolved "https://registry.yarnpkg.com/workerpool/-/workerpool-9.2.0.tgz#f74427cbb61234708332ed8ab9cbf56dcb1c4371"
+  integrity sha512-PKZqBOCo6CYkVOwAxWxQaSF2Fvb5Iv2fCeTP7buyWI2GiynWr46NcXSgK/idoV6e60dgCBfgYc+Un3HMvmqP8w==
+
 "wrap-ansi-cjs@npm:wrap-ansi@^7.0.0":
   version "7.0.0"
   resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43"