Skip to content

Commit

Permalink
refactor: make qc test executor multithreaded
Browse files Browse the repository at this point in the history
  • Loading branch information
aqrln committed Feb 10, 2025
1 parent 81f90f4 commit 6847763
Show file tree
Hide file tree
Showing 11 changed files with 693 additions and 405 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-query-compiler-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,4 @@ jobs:
- name: "Run tests"
env:
PARTITION: ${{ matrix.partition }}
run: cargo nextest run --package query-engine-tests --test-threads=1 --partition hash:"$PARTITION"
run: cargo nextest run --package query-engine-tests --partition hash:"$PARTITION"
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ test-pg-wasm: dev-pg-wasm test-qe-st
dev-pg-qc: start-pg-js build-qc-wasm build-driver-adapters-kit-qc
cp $(CONFIG_PATH)/pg-qc $(CONFIG_FILE)

test-pg-qc: dev-pg-qc test-qe-st
test-pg-qc: dev-pg-qc test-qe

test-driver-adapter-pg: test-pg-js
test-driver-adapter-pg-wasm: test-pg-wasm
Expand Down
4 changes: 2 additions & 2 deletions libs/driver-adapters/executor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
"description": "",
"private": true,
"scripts": {
"build": "tsup ./src/testd-qe.ts ./src/testd-qc.ts ./src/demo-se.ts ./src/bench.ts --format esm --dts",
"build": "tsup ./src/testd-qe.ts ./src/qc-test-runner/runner.ts ./src/qc-test-runner/worker.ts ./src/demo-se.ts ./src/bench.ts --format esm --dts",
"build:qe": "tsup ./src/testd-qe.ts ./src/bench.ts --format esm --dts",
"build:qc": "tsup ./src/testd-qc.ts --format esm --dts",
"build:qc": "tsup ./src/qc-test-runner/runner.ts ./src/qc-test-runner/worker.ts --format esm --dts",
"test:qe": "node --import tsx ./src/testd-qe.ts",
"test:qc": "node --import tsx ./src/testd-qc.ts",
"demo:se": "node --import tsx ./src/demo-se.ts",
Expand Down
2 changes: 1 addition & 1 deletion libs/driver-adapters/executor/script/testd-qc.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/usr/bin/env bash
node "$(dirname "${BASH_SOURCE[0]}")/../dist/testd-qc.js"
node "$(dirname "${BASH_SOURCE[0]}")/../dist/qc-test-runner/runner.js"
19 changes: 19 additions & 0 deletions libs/driver-adapters/executor/src/panic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@ export function setupDefaultPanicHandler() {
}
}

export function withLocalPanicHandler<T>(fn: () => T): T {
const previousHandler = global.PRISMA_WASM_PANIC_REGISTRY.set_message
let panic: string | undefined = undefined

global.PRISMA_WASM_PANIC_REGISTRY.set_message = (message) => {
panic = message
}

try {
return fn()
} finally {
global.PRISMA_WASM_PANIC_REGISTRY.set_message = previousHandler

if (panic) {
throw new PanicError(panic)
}
}
}

export class PanicError extends Error {
constructor(message: string) {
super('Panic in WASM module: ' + message)
Expand Down
202 changes: 202 additions & 0 deletions libs/driver-adapters/executor/src/qc-test-runner/runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import * as events from 'node:events'
import * as path from 'node:path'
import * as readline from 'node:readline'
import * as util from 'node:util'
import { Worker, MessageChannel } from 'node:worker_threads'
import * as S from '@effect/schema/Schema'

import { Env, jsonRpc } from '../types'
import { assertNever, debug, err } from '../utils'
import { SchemaId } from '../types/jsonRpc'
import type { Message } from './worker'

async function main(): Promise<void> {
const env = S.decodeUnknownSync(Env)(process.env)
console.log('[env]', env)

const iface = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
})

iface.on('line', async (line) => {
try {
const request = S.decodeSync(jsonRpc.RequestFromString)(line)
debug(`Got a request: ${line}`)

try {
const response = await handleRequest(request, env)
respondOk(request.id, response)
} catch (err) {
debug('[nodejs] Error from request handler: ', err)
respondErr(request.id, {
code: 1,
message: err.stack ?? err.toString(),
})
}
} catch (err) {
debug('Received non-json line: ', line)
console.error(err)
}
})
}

const state: Record<
SchemaId,
{
worker: Worker
health: { status: 'running' } | { status: 'terminated'; error: Error }
}
> = {}

async function handleRequest(
{ method, params }: jsonRpc.Request,
env: Env,
): Promise<unknown> {
const schemaState = state[params.schemaId]

if (schemaState?.health.status === 'terminated') {
throw schemaState.health.error
}

if (method !== 'initializeSchema') {
if (schemaState === undefined) {
throw new Error(
`Schema with id ${params.schemaId} is not initialized. Please call 'initializeSchema' first.`,
)
}
}

switch (method) {
case 'initializeSchema': {
debug('Got `initializeSchema`', params)

state[params.schemaId] = {
worker: new Worker(path.join(import.meta.dirname, 'worker.js')),
health: { status: 'running' },
}

const schemaState = state[params.schemaId]

schemaState.worker.on('error', (error) => {
console.error('Worker error:', error)
schemaState.health = { status: 'terminated', error }
})

return await messageWorker(schemaState.worker, {
type: 'initializeSchema',
params,
env,
})
}

case 'query': {
debug('Got `query`', util.inspect(params, false, null, true))

return await messageWorker(schemaState.worker, {
type: 'query',
params,
})
}

case 'startTx': {
debug('Got `startTx`', params)

return await messageWorker(schemaState.worker, {
type: 'startTx',
params,
})
}

case 'commitTx': {
debug('Got `commitTx`', params)

return await messageWorker(schemaState.worker, {
type: 'commitTx',
params,
})
}

case 'rollbackTx': {
debug('Got `rollbackTx`', params)

return await messageWorker(schemaState.worker, {
type: 'rollbackTx',
params,
})
}

case 'teardown': {
debug('Got `teardown`', params)

await messageWorker(schemaState.worker, {
type: 'teardown',
})

await schemaState.worker.terminate()

return {}
}

case 'getLogs': {
debug('Got `getLogs`', params)

return await messageWorker(schemaState.worker, {
type: 'getLogs',
})
}

default: {
assertNever(method, `Unknown method: \`${method}\``)
}
}
}

function respondErr(requestId: number, error: jsonRpc.RpcError) {
const msg: jsonRpc.ErrResponse = {
jsonrpc: '2.0',
id: requestId,
error,
}
console.log(JSON.stringify(msg))
}

function respondOk(requestId: number, payload: unknown) {
const msg: jsonRpc.OkResponse = {
jsonrpc: '2.0',
id: requestId,
result: payload,
}
console.log(JSON.stringify(msg))
}

type MessageWithoutResponsePort = {
[K in Message['type']]: Omit<Extract<Message, { type: K }>, 'responsePort'>
}[Message['type']]

async function messageWorker(
worker: Worker,
message: MessageWithoutResponsePort,
): Promise<unknown> {
const { port1, port2 } = new MessageChannel()
const responsePromise = events.once(port1, 'message')

worker.postMessage(
{
...message,
responsePort: port2,
},
[port2],
)

const [response] = await responsePromise

if (response instanceof Error) {
throw response
}

return response
}

main().catch(err)
Loading

0 comments on commit 6847763

Please sign in to comment.