Skip to content

Commit

Permalink
Only use sharedArrayBuffer if it is growable
Browse files Browse the repository at this point in the history
  • Loading branch information
onigoetz committed Dec 29, 2023
1 parent c00ac04 commit 555c4a0
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 17 deletions.
16 changes: 16 additions & 0 deletions src/SharedArrayBuffer.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Adds definitions missing from TypeScript, this feature is available since Node.js 20

/**
* https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer/grow#specifications
*/
interface SharedArrayBuffer {
/**
* returns whether this SharedArrayBuffer can be grow or not.
*/
readonly growable: boolean

/**
* grows the SharedArrayBuffer to the specified size, in bytes.
*/
grow(newLength: number): undefined
}
52 changes: 35 additions & 17 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import path from 'node:path'
import { fileURLToPath, pathToFileURL } from 'node:url'
import v8 from 'node:v8'
import {
MessageChannel,
type TransferListItem,
Worker,
parentPort,
receiveMessageOnPort,
// type-coverage:ignore-next-line -- we can't control
workerData,
} from 'node:worker_threads'
Expand Down Expand Up @@ -59,10 +61,7 @@ export const DEFAULT_BUFFER_SIZE = SYNCKIT_BUFFER_SIZE

export const DEFAULT_TIMEOUT = SYNCKIT_TIMEOUT ? +SYNCKIT_TIMEOUT : undefined

const DEFAULT_WORKER_BUFFER_KB = 64

export const DEFAULT_WORKER_BUFFER_SIZE =
DEFAULT_BUFFER_SIZE || DEFAULT_WORKER_BUFFER_KB * 1024
export const DEFAULT_WORKER_BUFFER_SIZE = DEFAULT_BUFFER_SIZE || 1024

/* istanbul ignore next */
export const DEFAULT_EXEC_ARGV = SYNCKIT_EXEC_ARGV?.split(',') || []
Expand Down Expand Up @@ -433,6 +432,8 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
globalShims = DEFAULT_GLOBAL_SHIMS,
}: SynckitOptions = {},
) {
const { port1: mainPort, port2: workerPort } = new MessageChannel()

const {
isTs,
ext,
Expand Down Expand Up @@ -506,12 +507,16 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
: workerPathUrl,
{
eval: useEval,
workerData: { sharedBuffer },
transferList,
workerData: { workerPort, sharedBuffer },
transferList: [workerPort, ...transferList],
execArgv: finalExecArgv,
},
)

// SharedArrayBuffer is faster to pass a response back, but has a limited size.
// We use this feature only if we can automatically resize the SharedArrayBuffer.
const useBuffer = sharedBuffer.growable

let nextID = 0

const syncFn = (...args: Parameters<T>): R => {
Expand All @@ -520,7 +525,7 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
// Reset SharedArrayBuffer
Atomics.store(sharedBufferView, 0, 0)

const msg: MainToWorkerMessage<Parameters<T>> = { id, args }
const msg: MainToWorkerMessage<Parameters<T>> = { id, useBuffer, args }
worker.postMessage(msg)

const status = Atomics.wait(sharedBufferView, 0, 0, timeout)
Expand All @@ -535,9 +540,12 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
result,
error,
properties,
} = v8.deserialize(
Buffer.from(sharedBuffer, INT32_BYTES, sharedBufferView[0]),
) as WorkerToMainMessage<R>
} = useBuffer
? (v8.deserialize(
Buffer.from(sharedBuffer, INT32_BYTES, sharedBufferView[0]),
) as WorkerToMainMessage<R>)
: (receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage<R> })
.message

/* istanbul ignore if */
if (id !== id2) {
Expand All @@ -562,15 +570,15 @@ export function runAsWorker<
T extends AnyAsyncFn<R> = AnyAsyncFn<R>,
>(fn: T) {
// type-coverage:ignore-next-line -- we can't control
if (!workerData || !parentPort) {
if (!workerData) {
return
}

const { sharedBuffer } = workerData as WorkerData
const { sharedBuffer, workerPort } = workerData as WorkerData

parentPort.on(
parentPort!.on(
'message',
({ id, args }: MainToWorkerMessage<Parameters<T>>) => {
({ id, useBuffer, args }: MainToWorkerMessage<Parameters<T>>) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
;(async () => {
const sharedBufferView = new Int32Array(sharedBuffer)
Expand All @@ -581,10 +589,20 @@ export function runAsWorker<
msg = { id, error, properties: extractProperties(error) }
}

const buf = v8.serialize(msg)
buf.copy(Buffer.from(sharedBuffer), INT32_BYTES)
if (useBuffer) {
const buf = v8.serialize(msg)

if (buf.length > sharedBuffer.byteLength) {
sharedBuffer.grow(buf.length)
}

buf.copy(Buffer.from(sharedBuffer), INT32_BYTES)
Atomics.store(sharedBufferView, 0, buf.length)
} else {
workerPort.postMessage(msg)
Atomics.store(sharedBufferView, 0, 1)
}

Atomics.store(sharedBufferView, 0, buf.length)
Atomics.notify(sharedBufferView, 0)
})()
},
Expand Down
4 changes: 4 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { MessagePort } from 'node:worker_threads'

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type AnyFn<R = any, T extends any[] = any[]> = (...args: T) => R

Expand All @@ -21,11 +23,13 @@ export type ValueOf<T> = T[keyof T]

export interface MainToWorkerMessage<T extends unknown[]> {
id: number
useBuffer: boolean
args: T
}

export interface WorkerData {
sharedBuffer: SharedArrayBuffer
workerPort: MessagePort
}

export interface DataMessage<T> {
Expand Down

0 comments on commit 555c4a0

Please sign in to comment.