Skip to content

Commit

Permalink
Improve performance by using v8 serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
onigoetz committed Dec 27, 2023
1 parent 7f453ca commit 8ea7a09
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 29 deletions.
42 changes: 19 additions & 23 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,8 @@ import fs from 'node:fs'
import { createRequire } from 'node:module'
import path from 'node:path'
import { fileURLToPath, pathToFileURL } from 'node:url'
import {
MessageChannel,
type TransferListItem,
Worker,
parentPort,
receiveMessageOnPort,
// type-coverage:ignore-next-line -- we can't control
workerData,
} from 'node:worker_threads'
import v8 from 'node:v8'
import { type TransferListItem, Worker, parentPort } from 'node:worker_threads'

import { findUp, isPkgAvailable, tryExtensions } from '@pkgr/core'

Expand All @@ -22,10 +15,11 @@ import type {
MainToWorkerMessage,
Syncify,
ValueOf,
WorkerData,
WorkerToMainMessage,
} from './types.js'

const INT32_BYTES = 4

export * from './types.js'

export const TsRunner = {
Expand Down Expand Up @@ -58,7 +52,10 @@ export const DEFAULT_BUFFER_SIZE = SYNCKIT_BUFFER_SIZE

export const DEFAULT_TIMEOUT = SYNCKIT_TIMEOUT ? +SYNCKIT_TIMEOUT : undefined

export const DEFAULT_WORKER_BUFFER_SIZE = DEFAULT_BUFFER_SIZE || 1024
const DEFAULT_WORKER_BUFFER_KB = 64

export const DEFAULT_WORKER_BUFFER_SIZE =
DEFAULT_BUFFER_SIZE || DEFAULT_WORKER_BUFFER_KB * 1024

/* istanbul ignore next */
export const DEFAULT_EXEC_ARGV = SYNCKIT_EXEC_ARGV?.split(',') || []
Expand Down Expand Up @@ -429,8 +426,6 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
globalShims = DEFAULT_GLOBAL_SHIMS,
}: SynckitOptions = {},
) {
const { port1: mainPort, port2: workerPort } = new MessageChannel()

const {
isTs,
ext,
Expand Down Expand Up @@ -501,8 +496,7 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
: workerPathUrl,
{
eval: useEval,
workerData: { workerPort },
transferList: [workerPort, ...transferList],
transferList,
execArgv: finalExecArgv,
},
)
Expand Down Expand Up @@ -530,8 +524,9 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
result,
error,
properties,
} = (receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage<R> })
.message
} = v8.deserialize(
Buffer.from(sharedBuffer, INT32_BYTES, sharedBufferView[0]),
) as WorkerToMainMessage<R>

/* istanbul ignore if */
if (id !== id2) {
Expand All @@ -556,13 +551,11 @@ export function runAsWorker<
T extends AnyAsyncFn<R> = AnyAsyncFn<R>,
>(fn: T) {
// type-coverage:ignore-next-line -- we can't control
if (!workerData) {
if (!parentPort) {
return
}

const { workerPort } = workerData as WorkerData

parentPort!.on(
parentPort.on(
'message',
({ sharedBuffer, id, args }: MainToWorkerMessage<Parameters<T>>) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
Expand All @@ -574,8 +567,11 @@ export function runAsWorker<
} catch (error: unknown) {
msg = { id, error, properties: extractProperties(error) }
}
workerPort.postMessage(msg)
Atomics.add(sharedBufferView, 0, 1)

const buf = v8.serialize(msg)
buf.copy(Buffer.from(sharedBuffer), INT32_BYTES)

Atomics.store(sharedBufferView, 0, buf.length)
Atomics.notify(sharedBufferView, 0)
})()
},
Expand Down
6 changes: 0 additions & 6 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { MessagePort } from 'node:worker_threads'

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type AnyFn<R = any, T extends any[] = any[]> = (...args: T) => R

Expand Down Expand Up @@ -27,10 +25,6 @@ export interface MainToWorkerMessage<T extends unknown[]> {
args: T
}

export interface WorkerData {
workerPort: MessagePort
}

export interface DataMessage<T> {
result?: T
error?: unknown
Expand Down

0 comments on commit 8ea7a09

Please sign in to comment.