diff --git a/README.md b/README.md index d2524c38a..bc5a1e2f4 100644 --- a/README.md +++ b/README.md @@ -120,11 +120,12 @@ export interface GlobalShim { ### Envs -1. `SYNCKIT_BUFFER_SIZE`: `bufferSize` to create `SharedArrayBuffer` for `worker_threads` (default as `1024`) -2. `SYNCKIT_TIMEOUT`: `timeout` for performing the async job (no default) -3. `SYNCKIT_EXEC_ARGV`: List of node CLI options passed to the worker, split with comma `,`. (default as `[]`), see also [`node` docs](https://nodejs.org/api/worker_threads.html) -4. `SYNCKIT_TS_RUNNER`: Which TypeScript runner to be used, it could be very useful for development, could be `'ts-node' | 'esbuild-register' | 'esbuild-runner' | 'swc' | 'tsx'`, `'ts-node'` is used by default, make sure you have installed them already -5. `SYNCKIT_GLOBAL_SHIMS`: Whether to enable the default `DEFAULT_GLOBAL_SHIMS_PRESET` as `globalShims` +1. `SYNCKIT_BUFFER_SIZE`: Initial `bufferSize` for `SharedArrayBuffer` to transfer data from `worker_threads` (default as `1024` / 1KB) +2. `SYNCKIT_MAX_BUFFER_SIZE`: Maximum `bufferSize` for `SharedArrayBuffer` to transfer data from `worker_threads` (default as `1024 * 1024` / 1MB) +3. `SYNCKIT_TIMEOUT`: `timeout` for performing the async job (no default) +4. `SYNCKIT_EXEC_ARGV`: List of node CLI options passed to the worker, split with comma `,`. (default as `[]`), see also [`node` docs](https://nodejs.org/api/worker_threads.html) +5. `SYNCKIT_TS_RUNNER`: Which TypeScript runner to be used, it could be very useful for development, could be `'ts-node' | 'esbuild-register' | 'esbuild-runner' | 'swc' | 'tsx'`, `'ts-node'` is used by default, make sure you have installed them already +6. `SYNCKIT_GLOBAL_SHIMS`: Whether to enable the default `DEFAULT_GLOBAL_SHIMS_PRESET` as `globalShims` ### TypeScript diff --git a/src/SharedArrayBuffer.d.ts b/src/SharedArrayBuffer.d.ts new file mode 100644 index 000000000..c487e060e --- /dev/null +++ b/src/SharedArrayBuffer.d.ts @@ -0,0 +1,33 @@ +// Adds definitions missing from TypeScript, this feature is available since Node.js 20 + +interface SharedArrayBufferOptions { + maxByteLength?: number +} + +interface SharedArrayBufferConstructor { + readonly prototype: SharedArrayBuffer + new ( + byteLength: number, + options?: SharedArrayBufferOptions, + ): SharedArrayBuffer +} + +/** + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer/grow#specifications + */ +interface SharedArrayBuffer { + /** + * returns whether this SharedArrayBuffer can be grow or not. + */ + readonly growable?: boolean + + /** + * returns the maximum length (in bytes) that this SharedArrayBuffer can be grown to. + */ + readonly maxByteLength?: number + + /** + * grows the SharedArrayBuffer to the specified size, in bytes. + */ + grow?(newLength: number): void +} diff --git a/src/index.ts b/src/index.ts index 359a3c0d1..977f77231 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,8 +3,10 @@ import fs from 'node:fs' import { createRequire } from 'node:module' import path from 'node:path' import { fileURLToPath, pathToFileURL } from 'node:url' +import v8 from 'node:v8' import { MessageChannel, + type MessagePort, type TransferListItem, Worker, parentPort, @@ -26,6 +28,8 @@ import type { WorkerToMainMessage, } from './types.js' +const INT32_BYTES = 4 + export * from './types.js' export const TsRunner = { @@ -45,6 +49,7 @@ export type TsRunner = ValueOf<typeof TsRunner> const { SYNCKIT_BUFFER_SIZE, + SYNCKIT_MAX_BUFFER_SIZE, SYNCKIT_TIMEOUT, SYNCKIT_EXEC_ARGV, SYNCKIT_TS_RUNNER, @@ -56,10 +61,17 @@ export const DEFAULT_BUFFER_SIZE = SYNCKIT_BUFFER_SIZE ? +SYNCKIT_BUFFER_SIZE : undefined -export const DEFAULT_TIMEOUT = SYNCKIT_TIMEOUT ? +SYNCKIT_TIMEOUT : undefined - export const DEFAULT_WORKER_BUFFER_SIZE = DEFAULT_BUFFER_SIZE || 1024 +export const DEFAULT_MAX_BUFFER_SIZE = SYNCKIT_MAX_BUFFER_SIZE + ? +SYNCKIT_MAX_BUFFER_SIZE + : undefined + +export const DEFAULT_MAX_WORKER_BUFFER_SIZE = + DEFAULT_MAX_BUFFER_SIZE || 1024 * 1024 + +export const DEFAULT_TIMEOUT = SYNCKIT_TIMEOUT ? +SYNCKIT_TIMEOUT : undefined + /* istanbul ignore next */ export const DEFAULT_EXEC_ARGV = SYNCKIT_EXEC_ARGV?.split(',') || [] @@ -484,6 +496,11 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>( const useEval = isTs ? !tsUseEsm : !jsUseEsm && useGlobals + const sharedBuffer = new SharedArrayBuffer(bufferSize, { + maxByteLength: DEFAULT_MAX_WORKER_BUFFER_SIZE, + }) + const sharedBufferView = new Int32Array(sharedBuffer, 0, 1) + const worker = new Worker( (jsUseEsm && useGlobals) || (tsUseEsm && finalTsRunner === TsRunner.TsNode) ? dataUrl( @@ -501,7 +518,7 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>( : workerPathUrl, { eval: useEval, - workerData: { workerPort }, + workerData: { workerPort, sharedBuffer }, transferList: [workerPort, ...transferList], execArgv: finalExecArgv, }, @@ -512,10 +529,10 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>( const syncFn = (...args: Parameters<T>): R => { const id = nextID++ - const sharedBuffer = new SharedArrayBuffer(bufferSize) - const sharedBufferView = new Int32Array(sharedBuffer) + // Reset SharedArrayBuffer + Atomics.store(sharedBufferView, 0, 0) - const msg: MainToWorkerMessage<Parameters<T>> = { sharedBuffer, id, args } + const msg: MainToWorkerMessage<Parameters<T>> = { id, args } worker.postMessage(msg) const status = Atomics.wait(sharedBufferView, 0, 0, timeout) @@ -525,13 +542,19 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>( throw new Error('Internal error: Atomics.wait() failed: ' + status) } + const useBuffer = sharedBufferView[0] !== -1 + const { id: id2, result, error, properties, - } = (receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage<R> }) - .message + } = useBuffer + ? (v8.deserialize( + new Uint8Array(sharedBuffer, INT32_BYTES, sharedBufferView[0]), + ) as WorkerToMainMessage<R>) + : (receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage<R> }) + .message /* istanbul ignore if */ if (id !== id2) { @@ -550,6 +573,43 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>( return syncFn } +/* istanbul ignore next */ +function sendResponse<R>( + msg: WorkerToMainMessage<R>, + sharedBuffer: SharedArrayBuffer, + workerPort: MessagePort, +) { + // We try as much as possible to use the SharedArrayBuffer to send back the response + // But due to the size of the response it might not be possible + const buf = v8.serialize(msg) + let useBuffer = true + + const expectedBufferLength = buf.length + INT32_BYTES + + if (expectedBufferLength > sharedBuffer.byteLength) { + if ( + sharedBuffer.growable && + expectedBufferLength <= (sharedBuffer.maxByteLength || 0) + ) { + sharedBuffer.grow!(expectedBufferLength) + } else { + useBuffer = false + } + } + + const sharedBufferView = new Int32Array(sharedBuffer, 0, 1) + + if (useBuffer) { + buf.copy(new Uint8Array(sharedBuffer, INT32_BYTES, buf.length)) + Atomics.store(sharedBufferView, 0, buf.length) + } else { + workerPort.postMessage(msg) + Atomics.store(sharedBufferView, 0, -1) + } + + Atomics.notify(sharedBufferView, 0) +} + /* istanbul ignore next */ export function runAsWorker< R = unknown, @@ -560,23 +620,21 @@ export function runAsWorker< return } - const { workerPort } = workerData as WorkerData + const { sharedBuffer, workerPort } = workerData as WorkerData parentPort!.on( 'message', - ({ sharedBuffer, id, args }: MainToWorkerMessage<Parameters<T>>) => { + ({ id, args }: MainToWorkerMessage<Parameters<T>>) => { // eslint-disable-next-line @typescript-eslint/no-floating-promises ;(async () => { - const sharedBufferView = new Int32Array(sharedBuffer) let msg: WorkerToMainMessage<R> try { msg = { id, result: await fn(...args) } } catch (error: unknown) { msg = { id, error, properties: extractProperties(error) } } - workerPort.postMessage(msg) - Atomics.add(sharedBufferView, 0, 1) - Atomics.notify(sharedBufferView, 0) + + sendResponse(msg, sharedBuffer, workerPort) })() }, ) diff --git a/src/types.ts b/src/types.ts index 806202523..9548a341e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -22,12 +22,12 @@ export type PromiseType<T extends AnyPromise> = T extends Promise<infer R> export type ValueOf<T> = T[keyof T] export interface MainToWorkerMessage<T extends unknown[]> { - sharedBuffer: SharedArrayBuffer id: number args: T } export interface WorkerData { + sharedBuffer: SharedArrayBuffer workerPort: MessagePort } diff --git a/test/fn.spec.ts b/test/fn.spec.ts index b94540cd6..8d6449e88 100644 --- a/test/fn.spec.ts +++ b/test/fn.spec.ts @@ -14,6 +14,7 @@ beforeEach(() => { jest.resetModules() delete process.env.SYNCKIT_BUFFER_SIZE + delete process.env.SYNCKIT_MAX_BUFFER_SIZE delete process.env.SYNCKIT_GLOBAL_SHIMS if (SYNCKIT_TIMEOUT) { @@ -106,6 +107,29 @@ test('timeout', async () => { ) }) +test('works with responses bigger than the SharedArrayBuffer', async () => { + process.env.SYNCKIT_MAX_BUFFER_SIZE = `${1024 * 1024}` + + const { createSyncFn } = await import('synckit') + const syncFn = createSyncFn<AsyncWorkerFn<string>>(workerCjsPath) + + const longString = 'x'.repeat(10 * 1024 * 1024) + + expect(syncFn(longString)).toBe(longString) +}) + +test('works with responses that require the SharedArrayBuffer to grow', async () => { + process.env.SYNCKIT_BUFFER_SIZE = `1024` + process.env.SYNCKIT_MAX_BUFFER_SIZE = `${1024 * 1024}` + + const { createSyncFn } = await import('synckit') + const syncFn = createSyncFn<AsyncWorkerFn<string>>(workerCjsPath) + + const longString = 'x'.repeat(1024 * 2) + + expect(syncFn(longString)).toBe(longString) +}) + test('globalShims env', async () => { process.env.SYNCKIT_GLOBAL_SHIMS = '1'