Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance by using v8 serialization #150

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,12 @@ export interface GlobalShim {

### Envs

1. `SYNCKIT_BUFFER_SIZE`: `bufferSize` to create `SharedArrayBuffer` for `worker_threads` (default as `1024`)
2. `SYNCKIT_TIMEOUT`: `timeout` for performing the async job (no default)
3. `SYNCKIT_EXEC_ARGV`: List of node CLI options passed to the worker, split with comma `,`. (default as `[]`), see also [`node` docs](https://nodejs.org/api/worker_threads.html)
4. `SYNCKIT_TS_RUNNER`: Which TypeScript runner to be used, it could be very useful for development, could be `'ts-node' | 'esbuild-register' | 'esbuild-runner' | 'swc' | 'tsx'`, `'ts-node'` is used by default, make sure you have installed them already
5. `SYNCKIT_GLOBAL_SHIMS`: Whether to enable the default `DEFAULT_GLOBAL_SHIMS_PRESET` as `globalShims`
1. `SYNCKIT_BUFFER_SIZE`: Initial `bufferSize` for `SharedArrayBuffer` to transfer data from `worker_threads` (default as `1024` / 1KB)
2. `SYNCKIT_MAX_BUFFER_SIZE`: Maximum `bufferSize` for `SharedArrayBuffer` to transfer data from `worker_threads` (default as `1024 * 1024` / 1MB)
3. `SYNCKIT_TIMEOUT`: `timeout` for performing the async job (no default)
4. `SYNCKIT_EXEC_ARGV`: List of node CLI options passed to the worker, split with comma `,`. (default as `[]`), see also [`node` docs](https://nodejs.org/api/worker_threads.html)
5. `SYNCKIT_TS_RUNNER`: Which TypeScript runner to be used, it could be very useful for development, could be `'ts-node' | 'esbuild-register' | 'esbuild-runner' | 'swc' | 'tsx'`, `'ts-node'` is used by default, make sure you have installed them already
6. `SYNCKIT_GLOBAL_SHIMS`: Whether to enable the default `DEFAULT_GLOBAL_SHIMS_PRESET` as `globalShims`

### TypeScript

Expand Down
33 changes: 33 additions & 0 deletions src/SharedArrayBuffer.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Adds definitions missing from TypeScript, this feature is available since Node.js 20

interface SharedArrayBufferOptions {
maxByteLength?: number
}

interface SharedArrayBufferConstructor {
readonly prototype: SharedArrayBuffer
new (
byteLength: number,
options?: SharedArrayBufferOptions,
): SharedArrayBuffer
}

/**
* https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer/grow#specifications
*/
interface SharedArrayBuffer {
/**
* returns whether this SharedArrayBuffer can be grow or not.
*/
readonly growable?: boolean

/**
* returns the maximum length (in bytes) that this SharedArrayBuffer can be grown to.
*/
readonly maxByteLength?: number

/**
* grows the SharedArrayBuffer to the specified size, in bytes.
*/
grow?(newLength: number): void
}
86 changes: 72 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import fs from 'node:fs'
import { createRequire } from 'node:module'
import path from 'node:path'
import { fileURLToPath, pathToFileURL } from 'node:url'
import v8 from 'node:v8'
import {
MessageChannel,
type MessagePort,
type TransferListItem,
Worker,
parentPort,
Expand All @@ -26,6 +28,8 @@ import type {
WorkerToMainMessage,
} from './types.js'

const INT32_BYTES = 4

export * from './types.js'

export const TsRunner = {
Expand All @@ -45,6 +49,7 @@ export type TsRunner = ValueOf<typeof TsRunner>

const {
SYNCKIT_BUFFER_SIZE,
SYNCKIT_MAX_BUFFER_SIZE,
SYNCKIT_TIMEOUT,
SYNCKIT_EXEC_ARGV,
SYNCKIT_TS_RUNNER,
Expand All @@ -56,10 +61,17 @@ export const DEFAULT_BUFFER_SIZE = SYNCKIT_BUFFER_SIZE
? +SYNCKIT_BUFFER_SIZE
: undefined

export const DEFAULT_TIMEOUT = SYNCKIT_TIMEOUT ? +SYNCKIT_TIMEOUT : undefined

export const DEFAULT_WORKER_BUFFER_SIZE = DEFAULT_BUFFER_SIZE || 1024

export const DEFAULT_MAX_BUFFER_SIZE = SYNCKIT_MAX_BUFFER_SIZE
? +SYNCKIT_MAX_BUFFER_SIZE
: undefined

export const DEFAULT_MAX_WORKER_BUFFER_SIZE =
DEFAULT_MAX_BUFFER_SIZE || 1024 * 1024

export const DEFAULT_TIMEOUT = SYNCKIT_TIMEOUT ? +SYNCKIT_TIMEOUT : undefined

/* istanbul ignore next */
export const DEFAULT_EXEC_ARGV = SYNCKIT_EXEC_ARGV?.split(',') || []

Expand Down Expand Up @@ -484,6 +496,11 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(

const useEval = isTs ? !tsUseEsm : !jsUseEsm && useGlobals

const sharedBuffer = new SharedArrayBuffer(bufferSize, {
maxByteLength: DEFAULT_MAX_WORKER_BUFFER_SIZE,
})
const sharedBufferView = new Int32Array(sharedBuffer, 0, 1)

const worker = new Worker(
(jsUseEsm && useGlobals) || (tsUseEsm && finalTsRunner === TsRunner.TsNode)
? dataUrl(
Expand All @@ -501,7 +518,7 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
: workerPathUrl,
{
eval: useEval,
workerData: { workerPort },
workerData: { workerPort, sharedBuffer },
transferList: [workerPort, ...transferList],
execArgv: finalExecArgv,
},
Expand All @@ -512,10 +529,10 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
const syncFn = (...args: Parameters<T>): R => {
const id = nextID++

const sharedBuffer = new SharedArrayBuffer(bufferSize)
const sharedBufferView = new Int32Array(sharedBuffer)
// Reset SharedArrayBuffer
Atomics.store(sharedBufferView, 0, 0)

const msg: MainToWorkerMessage<Parameters<T>> = { sharedBuffer, id, args }
const msg: MainToWorkerMessage<Parameters<T>> = { id, args }
worker.postMessage(msg)

const status = Atomics.wait(sharedBufferView, 0, 0, timeout)
Expand All @@ -525,13 +542,19 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
throw new Error('Internal error: Atomics.wait() failed: ' + status)
}

const useBuffer = sharedBufferView[0] !== -1

const {
id: id2,
result,
error,
properties,
} = (receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage<R> })
.message
} = useBuffer
? (v8.deserialize(
new Uint8Array(sharedBuffer, INT32_BYTES, sharedBufferView[0]),
) as WorkerToMainMessage<R>)
: (receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage<R> })
.message

/* istanbul ignore if */
if (id !== id2) {
Expand All @@ -550,6 +573,43 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
return syncFn
}

/* istanbul ignore next */
function sendResponse<R>(
msg: WorkerToMainMessage<R>,
sharedBuffer: SharedArrayBuffer,
workerPort: MessagePort,
) {
// We try as much as possible to use the SharedArrayBuffer to send back the response
// But due to the size of the response it might not be possible
const buf = v8.serialize(msg)
let useBuffer = true

const expectedBufferLength = buf.length + INT32_BYTES

if (expectedBufferLength > sharedBuffer.byteLength) {
if (
sharedBuffer.growable &&
expectedBufferLength <= (sharedBuffer.maxByteLength || 0)
) {
sharedBuffer.grow!(expectedBufferLength)
} else {
useBuffer = false
}
}

const sharedBufferView = new Int32Array(sharedBuffer, 0, 1)

if (useBuffer) {
buf.copy(new Uint8Array(sharedBuffer, INT32_BYTES, buf.length))
Atomics.store(sharedBufferView, 0, buf.length)
} else {
workerPort.postMessage(msg)
Atomics.store(sharedBufferView, 0, -1)
}

Atomics.notify(sharedBufferView, 0)
}

/* istanbul ignore next */
export function runAsWorker<
R = unknown,
Expand All @@ -560,23 +620,21 @@ export function runAsWorker<
return
}

const { workerPort } = workerData as WorkerData
const { sharedBuffer, workerPort } = workerData as WorkerData

parentPort!.on(
'message',
({ sharedBuffer, id, args }: MainToWorkerMessage<Parameters<T>>) => {
({ id, args }: MainToWorkerMessage<Parameters<T>>) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
;(async () => {
const sharedBufferView = new Int32Array(sharedBuffer)
let msg: WorkerToMainMessage<R>
try {
msg = { id, result: await fn(...args) }
} catch (error: unknown) {
msg = { id, error, properties: extractProperties(error) }
}
workerPort.postMessage(msg)
Atomics.add(sharedBufferView, 0, 1)
Atomics.notify(sharedBufferView, 0)

sendResponse(msg, sharedBuffer, workerPort)
})()
},
)
Expand Down
2 changes: 1 addition & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ export type PromiseType<T extends AnyPromise> = T extends Promise<infer R>
export type ValueOf<T> = T[keyof T]

export interface MainToWorkerMessage<T extends unknown[]> {
sharedBuffer: SharedArrayBuffer
id: number
args: T
}

export interface WorkerData {
sharedBuffer: SharedArrayBuffer
workerPort: MessagePort
}

Expand Down
24 changes: 24 additions & 0 deletions test/fn.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ beforeEach(() => {
jest.resetModules()

delete process.env.SYNCKIT_BUFFER_SIZE
delete process.env.SYNCKIT_MAX_BUFFER_SIZE
delete process.env.SYNCKIT_GLOBAL_SHIMS

if (SYNCKIT_TIMEOUT) {
Expand Down Expand Up @@ -106,6 +107,29 @@ test('timeout', async () => {
)
})

test('works with responses bigger than the SharedArrayBuffer', async () => {
process.env.SYNCKIT_MAX_BUFFER_SIZE = `${1024 * 1024}`

const { createSyncFn } = await import('synckit')
const syncFn = createSyncFn<AsyncWorkerFn<string>>(workerCjsPath)

const longString = 'x'.repeat(10 * 1024 * 1024)

expect(syncFn(longString)).toBe(longString)
})

test('works with responses that require the SharedArrayBuffer to grow', async () => {
process.env.SYNCKIT_BUFFER_SIZE = `1024`
process.env.SYNCKIT_MAX_BUFFER_SIZE = `${1024 * 1024}`

const { createSyncFn } = await import('synckit')
const syncFn = createSyncFn<AsyncWorkerFn<string>>(workerCjsPath)

const longString = 'x'.repeat(1024 * 2)

expect(syncFn(longString)).toBe(longString)
})

test('globalShims env', async () => {
process.env.SYNCKIT_GLOBAL_SHIMS = '1'

Expand Down