diff --git a/src/pipeline.ts b/src/pipeline.ts
index fe8d6160..29364ace 100644
--- a/src/pipeline.ts
+++ b/src/pipeline.ts
@@ -17,7 +17,8 @@ import {
   MessageCollector,
   type VoiceBasedChannel,
 } from "discord.js";
-import { SynthesisWorkerPool } from "./synthesis";
+import { synthesizer } from "./synthesis";
+import { type SynthesizerEvents } from "./synthesis/synthesizer";
 
 export interface StateOptions
   extends CreateVoiceConnectionOptions,
@@ -36,7 +37,6 @@ export default class Pipeline extends EventEmitter {
   public readonly player: AudioPlayer;
   private readonly collector: MessageCollector;
   private audioQueue: AudioResource[] = [];
-  private readonly pool?: SynthesisWorkerPool;
 
   constructor(public readonly channel: VoiceBasedChannel) {
     super();
@@ -53,16 +53,6 @@ export default class Pipeline extends EventEmitter {
       filter: (message) => !message.author.bot,
     });
 
-    if (process.env.DICTIONARY && process.env.MODEL) {
-      this.pool = new SynthesisWorkerPool(
-        process.env.DICTIONARY,
-        process.env.MODEL,
-      );
-      this.pool.on("synthesis", (resource: AudioResource) => {
-        this.emit("synthesis", resource);
-      });
-    }
-
     this.init();
   }
 
@@ -88,6 +78,15 @@ export default class Pipeline extends EventEmitter {
       "collect",
       (message) => void this.emit("message", message),
     );
+    // Keep a reference to the listener: `synthesizer` is shared by every
+    // pipeline, so it must be detached again when this pipeline goes away.
+    const onSynthesis = (
+      ...[resource, message]: SynthesizerEvents["synthesis"]
+    ): void => {
+      if (message.channelId !== this.channel.id) return;
+      this.emit("synthesis", resource);
+    };
+    synthesizer.on("synthesis", onSynthesis);
 
     this.on("ready", () => {
       this.play();
@@ -99,11 +98,9 @@ export default class Pipeline extends EventEmitter {
       Pipeline.#cache.delete(this.channel.guild.id);
+      // NOTE(review): assumes this handler is the pipeline's teardown path.
+      synthesizer.off("synthesis", onSynthesis);
     });
     this.on("message", (message) => {
-      this.pool?.dispatchSynthesis(
-        message.cleanContent.length > 200
-          ? `${message.cleanContent.slice(0, 196)} 以下略`
-          : message.cleanContent,
-      );
+      synthesizer.dispatchSynthesis(message);
     });
     this.on("synthesis", (audio) => {
       this.audioQueue.push(audio);
diff --git a/src/synthesis/index.ts b/src/synthesis/index.ts
index 58d1114d..f8ec8c78 100644
--- a/src/synthesis/index.ts
+++ b/src/synthesis/index.ts
@@ -1,77 +1,15 @@
-import { Readable } from "node:stream";
-import { StreamType, createAudioResource } from "@discordjs/voice";
-import { AltJTalkConfig } from "node-altjtalk-binding";
-import { Result, Task } from "./common";
-import WorkerPool from "./worker-pool";
+import { Synthesizer } from "./synthesizer";
+import WorkerSynthesizer from "./worker-synthesizer";
 
-export class SynthesisWorkerPool extends WorkerPool<
-  Task,
-  Result,
-  AltJTalkConfig,
-  object
-> {
-  constructor(dictionary: string, model: string) {
-    super(
-      new URL("task", import.meta.url),
-      {
-        dictionary,
-        model,
-      },
-      process.env.NUM_THREADS ? Number(process.env.NUM_THREADS) : 1,
-    );
-    this.on("data", ({ data }: Result) => {
-      const resource = createAudioResource(new SynthesizedSoundStream(data), {
-        inputType: StreamType.Raw,
-      });
-      this.emit("synthesis", resource);
-    });
-  }
-
-  public dispatchSynthesis(inputText: string) {
-    this.dispatchTask(
-      {
-        inputText,
-        option: {
-          samplingFrequency: 48000,
-        },
-      },
-      {},
-    );
-  }
-}
-
-class SynthesizedSoundStream extends Readable {
-  private pos: number = 0;
-  private buf: Int16Array | null;
-  constructor(buf: Int16Array) {
-    super();
-    this.buf = buf;
-  }
-  _read(size: number = ((48000 * 2 * 2) / 1000) * 20) {
-    if (!this.buf) {
-      throw new Error("Stream ended");
-    }
-
-    const offset = this.pos;
-    let end = Math.ceil(size / 4);
-    if (end + offset > this.buf.length) {
-      end = this.buf.length - offset;
-    }
-    const buf = Buffer.alloc(end * 4);
-    const dst = new Int16Array(buf.buffer);
-    for (let i = 0; i < end; ++i) {
-      const elem = this.buf[i + offset];
-      dst[i * 2] = elem;
-      dst[i * 2 + 1] = elem;
-    }
-    this.push(buf);
-    this.pos += end;
-    if (this.pos == this.buf.length) {
-      this.buf = null;
-      this.push(null);
-    }
-  }
-  _destroy() {
-    this.buf = null;
-  }
+if (!process.env.DICTIONARY || !process.env.MODEL) {
+  throw new Error("Dictionary and model must be specified.");
 }
+
+/**
+ * Synthesizer instance exported for the whole process. Importing this module
+ * throws above unless both DICTIONARY and MODEL are set in the environment.
+ */
+export const synthesizer: Synthesizer = new WorkerSynthesizer(
+  process.env.DICTIONARY,
+  process.env.MODEL,
+);
diff --git a/src/synthesis/options.ts b/src/synthesis/options.ts
new file mode 100644
index 00000000..943e0f34
--- /dev/null
+++ b/src/synthesis/options.ts
@@ -0,0 +1,10 @@
+import { Message } from "discord.js";
+import { SynthesisOption } from "node-altjtalk-binding";
+
+/**
+ * Builds the synthesis options for a message. The message is currently
+ * ignored and an empty option object is returned.
+ */
+export function createSynthesisOption(_: Message): SynthesisOption {
+  return {};
+}
diff --git a/src/synthesis/stream.ts b/src/synthesis/stream.ts
new file mode 100644
index 00000000..02b99155
--- /dev/null
+++ b/src/synthesis/stream.ts
@@ -0,0 +1,53 @@
+import { Readable } from "node:stream";
+
+/**
+ * Readable stream over a buffer of 16-bit samples. Every source sample is
+ * written twice in a row (presumably mono to interleaved stereo — confirm
+ * against the synthesis output format).
+ */
+export default class SynthesizedSoundStream extends Readable {
+  private pos: number = 0;
+  private buf: Int16Array | null;
+  constructor(buf: Int16Array) {
+    super();
+    this.buf = buf;
+  }
+  /**
+   * Pushes the next chunk and ends the stream once the buffer is exhausted.
+   *
+   * @param size Requested chunk size in bytes; each source sample produces
+   *   four output bytes.
+   */
+  _read(size: number = ((48000 * 2 * 2) / 1000) * 20) {
+    if (!this.buf) {
+      throw new Error("Stream ended");
+    }
+
+    const offset = this.pos;
+    let end = Math.ceil(size / 4);
+    if (end + offset > this.buf.length) {
+      end = this.buf.length - offset;
+    }
+    const buf = Buffer.alloc(end * 4);
+    const dst = new Int16Array(buf.buffer);
+    for (let i = 0; i < end; ++i) {
+      const elem = this.buf[i + offset];
+      dst[i * 2] = elem;
+      dst[i * 2 + 1] = elem;
+    }
+    this.push(buf);
+    this.pos += end;
+    if (this.pos === this.buf.length) {
+      this.buf = null;
+      this.push(null);
+    }
+  }
+  /**
+   * Releases the sample buffer. The callback must be invoked, otherwise the
+   * stream never finishes destroying and "close" is never emitted.
+   */
+  _destroy(error: Error | null, callback: (error?: Error | null) => void) {
+    this.buf = null;
+    callback(error);
+  }
+}
diff --git a/src/synthesis/synthesizer.ts b/src/synthesis/synthesizer.ts
new file mode 100644
index 00000000..deac56cb
--- /dev/null
+++ b/src/synthesis/synthesizer.ts
@@ -0,0 +1,21 @@
+import { AudioResource } from "@discordjs/voice";
+import { Message } from "discord.js";
+
+/** Turns messages into audio and reports results via "synthesis" events. */
+export interface Synthesizer {
+  /** Queues `message` for synthesis. */
+  dispatchSynthesis(message: Message): void;
+  on<K extends keyof SynthesizerEvents>(
+    event: K,
+    listener: (...args: SynthesizerEvents[K]) => void,
+  ): this;
+  off<K extends keyof SynthesizerEvents>(
+    event: K,
+    listener: (...args: SynthesizerEvents[K]) => void,
+  ): this;
+}
+
+/** Maps each synthesizer event to the argument tuple its listeners receive. */
+export interface SynthesizerEvents {
+  synthesis: [resource: AudioResource, message: Message];
+}
diff --git a/src/synthesis/worker-pool.ts b/src/synthesis/worker-pool.ts
index 56d63922..b28ed420 100644
--- a/src/synthesis/worker-pool.ts
+++ b/src/synthesis/worker-pool.ts
@@ -1,3 +1,4 @@
+/* eslint-disable @typescript-eslint/no-unsafe-declaration-merging */
 // Reference: https://nodejs.org/api/async_context.html#using-asyncresource-for-a-worker-thread-pool
 
 import { EventEmitter } from "node:events";
@@ -56,7 +57,7 @@ export default class WorkerPool<
       const prop = this.workerInfo.get(worker.threadId);
       if (!prop) return;
 
-      this.emit("data", result);
+      this.emit("data", result, prop);
 
       this.workerInfo.delete(worker.threadId);
       this.freeWorkers.push(worker);
@@ -77,7 +78,7 @@ export default class WorkerPool<
     this.emit(kWorkerFreedEvent);
   }
 
-  protected dispatchTask(task: Task, prop: TaskProperty) {
+  public dispatchTask(task: Task, prop: TaskProperty) {
     const worker = this.freeWorkers.pop();
     if (!worker) {
       // No free threads, wait until a worker thread becomes free.
@@ -93,3 +94,37 @@ export default class WorkerPool<
     for (const worker of this.workers) await worker.terminate();
   }
 }
+
+/** Typed event-emitter signatures merged into the WorkerPool class. */
+export default interface WorkerPool<
+  // eslint-disable-next-line @typescript-eslint/no-unused-vars
+  Task,
+  Result,
+  // eslint-disable-next-line @typescript-eslint/no-unused-vars
+  WorkerData,
+  TaskProperty extends object,
+> {
+  on<K extends keyof WorkerPoolEvents<Result, TaskProperty>>(
+    event: K,
+    listener: (...args: WorkerPoolEvents<Result, TaskProperty>[K]) => void,
+  ): this;
+  once<K extends keyof WorkerPoolEvents<Result, TaskProperty>>(
+    event: K,
+    listener: (...args: WorkerPoolEvents<Result, TaskProperty>[K]) => void,
+  ): this;
+  off<K extends keyof WorkerPoolEvents<Result, TaskProperty>>(
+    event: K,
+    listener: (...args: WorkerPoolEvents<Result, TaskProperty>[K]) => void,
+  ): this;
+  emit<K extends keyof WorkerPoolEvents<Result, TaskProperty>>(
+    event: K,
+    ...args: WorkerPoolEvents<Result, TaskProperty>[K]
+  ): boolean;
+}
+
+/** Maps each WorkerPool event to the argument tuple its listeners receive. */
+export interface WorkerPoolEvents<Result, TaskProperty> {
+  data: [result: Result, prop: TaskProperty];
+  error: [error: unknown];
+  [kWorkerFreedEvent]: [];
+}
diff --git a/src/synthesis/worker-synthesizer.ts b/src/synthesis/worker-synthesizer.ts
new file mode 100644
index 00000000..384e27e7
--- /dev/null
+++ b/src/synthesis/worker-synthesizer.ts
@@ -0,0 +1,90 @@
+/* eslint-disable @typescript-eslint/no-unsafe-declaration-merging */
+import EventEmitter from "events";
+import { StreamType, createAudioResource } from "@discordjs/voice";
+import { Message } from "discord.js";
+import { AltJTalkConfig } from "node-altjtalk-binding";
+import { Result, Task } from "./common";
+import { createSynthesisOption } from "./options";
+import SynthesizedSoundStream from "./stream";
+import { Synthesizer, SynthesizerEvents } from "./synthesizer";
+import WorkerPool from "./worker-pool";
+
+/**
+ * Resolves the worker count from NUM_THREADS. Anything that is not a
+ * positive integer (unset, empty, non-numeric, zero, negative) yields 1.
+ */
+function resolveThreadCount(raw: string | undefined): number {
+  const count = Number(raw);
+  return Number.isInteger(count) && count > 0 ? count : 1;
+}
+
+/** Synthesizer implementation that runs synthesis tasks on a WorkerPool. */
+export default class WorkerSynthesizer
+  extends EventEmitter
+  implements Synthesizer
+{
+  workerPool: WorkerPool<Task, Result, AltJTalkConfig, Message>;
+
+  constructor(dictionary: string, model: string) {
+    super();
+    this.workerPool = new WorkerPool(
+      new URL("task", import.meta.url),
+      {
+        dictionary,
+        model,
+      },
+      resolveThreadCount(process.env.NUM_THREADS),
+    );
+    // The second "data" argument is the task property passed to
+    // dispatchTask below, i.e. the originating message.
+    this.workerPool.on("data", ({ data }: Result, message) => {
+      const resource = createAudioResource(new SynthesizedSoundStream(data), {
+        inputType: StreamType.Raw,
+      });
+      this.emit("synthesis", resource, message);
+    });
+  }
+
+  /**
+   * Queues `message` for synthesis. Content longer than 200 UTF-16 code
+   * units is cut to its first 196 and suffixed with " 以下略".
+   */
+  public dispatchSynthesis(message: Message) {
+    const inputText =
+      message.cleanContent.length > 200
+        ? `${message.cleanContent.slice(0, 196)} 以下略`
+        : message.cleanContent;
+    const option = createSynthesisOption(message);
+
+    this.workerPool.dispatchTask(
+      {
+        inputText,
+        option: {
+          ...option,
+          samplingFrequency: 48000,
+        },
+      },
+      message,
+    );
+  }
+}
+
+/** Typed event-emitter signatures merged into the WorkerSynthesizer class. */
+export default interface WorkerSynthesizer {
+  on<K extends keyof SynthesizerEvents>(
+    event: K,
+    listener: (...args: SynthesizerEvents[K]) => void,
+  ): this;
+  once<K extends keyof SynthesizerEvents>(
+    event: K,
+    listener: (...args: SynthesizerEvents[K]) => void,
+  ): this;
+  off<K extends keyof SynthesizerEvents>(
+    event: K,
+    listener: (...args: SynthesizerEvents[K]) => void,
+  ): this;
+  emit<K extends keyof SynthesizerEvents>(
+    event: K,
+    ...args: SynthesizerEvents[K]
+  ): boolean;
+}