diff --git a/src/SharedArrayBuffer.d.ts b/src/SharedArrayBuffer.d.ts new file mode 100644 index 000000000..cd41b4e5a --- /dev/null +++ b/src/SharedArrayBuffer.d.ts @@ -0,0 +1,16 @@ +// Adds definitions missing from TypeScript, this feature is available since Node.js 20 + +/** + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer/grow#specifications + */ +interface SharedArrayBuffer { + /** + * returns whether this SharedArrayBuffer can be grow or not. + */ + readonly growable: boolean + + /** + * grows the SharedArrayBuffer to the specified size, in bytes. + */ + grow(newLength: number): undefined +} diff --git a/src/index.ts b/src/index.ts index 2be4cf3db..45f5fe5f7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,9 +5,11 @@ import path from 'node:path' import { fileURLToPath, pathToFileURL } from 'node:url' import v8 from 'node:v8' import { + MessageChannel, type TransferListItem, Worker, parentPort, + receiveMessageOnPort, // type-coverage:ignore-next-line -- we can't control workerData, } from 'node:worker_threads' @@ -59,10 +61,7 @@ export const DEFAULT_BUFFER_SIZE = SYNCKIT_BUFFER_SIZE export const DEFAULT_TIMEOUT = SYNCKIT_TIMEOUT ? +SYNCKIT_TIMEOUT : undefined -const DEFAULT_WORKER_BUFFER_KB = 64 - -export const DEFAULT_WORKER_BUFFER_SIZE = - DEFAULT_BUFFER_SIZE || DEFAULT_WORKER_BUFFER_KB * 1024 +export const DEFAULT_WORKER_BUFFER_SIZE = DEFAULT_BUFFER_SIZE || 1024 /* istanbul ignore next */ export const DEFAULT_EXEC_ARGV = SYNCKIT_EXEC_ARGV?.split(',') || [] @@ -433,6 +432,8 @@ function startWorkerThread>( globalShims = DEFAULT_GLOBAL_SHIMS, }: SynckitOptions = {}, ) { + const { port1: mainPort, port2: workerPort } = new MessageChannel() + const { isTs, ext, @@ -506,12 +507,16 @@ function startWorkerThread>( : workerPathUrl, { eval: useEval, - workerData: { sharedBuffer }, - transferList, + workerData: { workerPort, sharedBuffer }, + transferList: [workerPort, ...transferList], execArgv: finalExecArgv, }, ) + // SharedArrayBuffer is faster to pass a response back, but has a limited size. + // We use this feature only if we can automatically resize the SharedArrayBuffer. + const useBuffer = sharedBuffer.growable + let nextID = 0 const syncFn = (...args: Parameters): R => { @@ -520,7 +525,7 @@ function startWorkerThread>( // Reset SharedArrayBuffer Atomics.store(sharedBufferView, 0, 0) - const msg: MainToWorkerMessage> = { id, args } + const msg: MainToWorkerMessage> = { id, useBuffer, args } worker.postMessage(msg) const status = Atomics.wait(sharedBufferView, 0, 0, timeout) @@ -535,9 +540,12 @@ function startWorkerThread>( result, error, properties, - } = v8.deserialize( - Buffer.from(sharedBuffer, INT32_BYTES, sharedBufferView[0]), - ) as WorkerToMainMessage + } = useBuffer + ? (v8.deserialize( + Buffer.from(sharedBuffer, INT32_BYTES, sharedBufferView[0]), + ) as WorkerToMainMessage) + : (receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage }) + .message /* istanbul ignore if */ if (id !== id2) { @@ -562,15 +570,15 @@ export function runAsWorker< T extends AnyAsyncFn = AnyAsyncFn, >(fn: T) { // type-coverage:ignore-next-line -- we can't control - if (!workerData || !parentPort) { + if (!workerData) { return } - const { sharedBuffer } = workerData as WorkerData + const { sharedBuffer, workerPort } = workerData as WorkerData - parentPort.on( + parentPort!.on( 'message', - ({ id, args }: MainToWorkerMessage>) => { + ({ id, useBuffer, args }: MainToWorkerMessage>) => { // eslint-disable-next-line @typescript-eslint/no-floating-promises ;(async () => { const sharedBufferView = new Int32Array(sharedBuffer) @@ -581,10 +589,20 @@ export function runAsWorker< msg = { id, error, properties: extractProperties(error) } } - const buf = v8.serialize(msg) - buf.copy(Buffer.from(sharedBuffer), INT32_BYTES) + if (useBuffer) { + const buf = v8.serialize(msg) + + if (buf.length > sharedBuffer.byteLength) { + sharedBuffer.grow(buf.length) + } + + buf.copy(Buffer.from(sharedBuffer), INT32_BYTES) + Atomics.store(sharedBufferView, 0, buf.length) + } else { + workerPort.postMessage(msg) + Atomics.store(sharedBufferView, 0, 1) + } - Atomics.store(sharedBufferView, 0, buf.length) Atomics.notify(sharedBufferView, 0) })() }, diff --git a/src/types.ts b/src/types.ts index 6f5d908cd..1d38390d2 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,5 @@ +import { MessagePort } from 'node:worker_threads' + // eslint-disable-next-line @typescript-eslint/no-explicit-any export type AnyFn = (...args: T) => R @@ -21,11 +23,13 @@ export type ValueOf = T[keyof T] export interface MainToWorkerMessage { id: number + useBuffer: boolean args: T } export interface WorkerData { sharedBuffer: SharedArrayBuffer + workerPort: MessagePort } export interface DataMessage {