Skip to content

Commit

Permalink
Merge pull request #7 from discordjs-japan/threadpool-improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
cm-ayf authored Oct 22, 2023
2 parents ed61c46 + 85e1ece commit 3492d6d
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 94 deletions.
23 changes: 6 additions & 17 deletions src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
MessageCollector,
type VoiceBasedChannel,
} from "discord.js";
import { SynthesisWorkerPool } from "./synthesis";
import { synthesizer } from "./synthesis";

export interface StateOptions
extends CreateVoiceConnectionOptions,
Expand All @@ -36,7 +36,6 @@ export default class Pipeline extends EventEmitter {
public readonly player: AudioPlayer;
private readonly collector: MessageCollector;
private audioQueue: AudioResource[] = [];
private readonly pool?: SynthesisWorkerPool;

constructor(public readonly channel: VoiceBasedChannel) {
super();
Expand All @@ -53,16 +52,6 @@ export default class Pipeline extends EventEmitter {
filter: (message) => !message.author.bot,
});

if (process.env.DICTIONARY && process.env.MODEL) {
this.pool = new SynthesisWorkerPool(
process.env.DICTIONARY,
process.env.MODEL,
);
this.pool.on("synthesis", (resource: AudioResource) => {
this.emit("synthesis", resource);
});
}

this.init();
}

Expand All @@ -88,6 +77,10 @@ export default class Pipeline extends EventEmitter {
"collect",
(message) => void this.emit("message", message),
);
synthesizer.on("synthesis", (resource, message) => {
if (message.channelId !== this.channel.id) return;
this.emit("synthesis", resource);
});

this.on("ready", () => {
this.play();
Expand All @@ -99,11 +92,7 @@ export default class Pipeline extends EventEmitter {
Pipeline.#cache.delete(this.channel.guild.id);
});
this.on("message", (message) => {
this.pool?.dispatchSynthesis(
message.cleanContent.length > 200
? `${message.cleanContent.slice(0, 196)} 以下略`
: message.cleanContent,
);
synthesizer.dispatchSynthesis(message);
});
this.on("synthesis", (audio) => {
this.audioQueue.push(audio);
Expand Down
83 changes: 8 additions & 75 deletions src/synthesis/index.ts
Original file line number Diff line number Diff line change
@@ -1,77 +1,10 @@
import { Readable } from "node:stream";
import { StreamType, createAudioResource } from "@discordjs/voice";
import { AltJTalkConfig } from "node-altjtalk-binding";
import { Result, Task } from "./common";
import WorkerPool from "./worker-pool";
import { Synthesizer } from "./synthesizer";
import WorkerSynthesizer from "./worker-synthesizer";

export class SynthesisWorkerPool extends WorkerPool<
Task,
Result,
AltJTalkConfig,
object
> {
constructor(dictionary: string, model: string) {
super(
new URL("task", import.meta.url),
{
dictionary,
model,
},
process.env.NUM_THREADS ? Number(process.env.NUM_THREADS) : 1,
);
this.on("data", ({ data }: Result) => {
const resource = createAudioResource(new SynthesizedSoundStream(data), {
inputType: StreamType.Raw,
});
this.emit("synthesis", resource);
});
}

public dispatchSynthesis(inputText: string) {
this.dispatchTask(
{
inputText,
option: {
samplingFrequency: 48000,
},
},
{},
);
}
}

class SynthesizedSoundStream extends Readable {
private pos: number = 0;
private buf: Int16Array | null;
constructor(buf: Int16Array) {
super();
this.buf = buf;
}
_read(size: number = ((48000 * 2 * 2) / 1000) * 20) {
if (!this.buf) {
throw new Error("Stream ended");
}

const offset = this.pos;
let end = Math.ceil(size / 4);
if (end + offset > this.buf.length) {
end = this.buf.length - offset;
}
const buf = Buffer.alloc(end * 4);
const dst = new Int16Array(buf.buffer);
for (let i = 0; i < end; ++i) {
const elem = this.buf[i + offset];
dst[i * 2] = elem;
dst[i * 2 + 1] = elem;
}
this.push(buf);
this.pos += end;
if (this.pos == this.buf.length) {
this.buf = null;
this.push(null);
}
}
_destroy() {
this.buf = null;
}
if (!process.env.DICTIONARY || !process.env.MODEL) {
throw new Error("Dictionary and model must be specified.");
}
export const synthesizer: Synthesizer = new WorkerSynthesizer(
process.env.DICTIONARY,
process.env.MODEL,
);
6 changes: 6 additions & 0 deletions src/synthesis/options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Message } from "discord.js";
import { SynthesisOption } from "node-altjtalk-binding";

export function createSynthesisOption(_: Message): SynthesisOption {
return {};
}
37 changes: 37 additions & 0 deletions src/synthesis/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { Readable } from "node:stream";

export default class SynthesizedSoundStream extends Readable {
private pos: number = 0;
private buf: Int16Array | null;
constructor(buf: Int16Array) {
super();
this.buf = buf;
}
_read(size: number = ((48000 * 2 * 2) / 1000) * 20) {
if (!this.buf) {
throw new Error("Stream ended");
}

const offset = this.pos;
let end = Math.ceil(size / 4);
if (end + offset > this.buf.length) {
end = this.buf.length - offset;
}
const buf = Buffer.alloc(end * 4);
const dst = new Int16Array(buf.buffer);
for (let i = 0; i < end; ++i) {
const elem = this.buf[i + offset];
dst[i * 2] = elem;
dst[i * 2 + 1] = elem;
}
this.push(buf);
this.pos += end;
if (this.pos == this.buf.length) {
this.buf = null;
this.push(null);
}
}
_destroy() {
this.buf = null;
}
}
14 changes: 14 additions & 0 deletions src/synthesis/synthesizer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { AudioResource } from "@discordjs/voice";
import { Message } from "discord.js";

export interface Synthesizer {
dispatchSynthesis(message: Message): void;
on<K extends keyof SynthesizerEvents>(
event: K,
listener: (...args: SynthesizerEvents[K]) => void,
): this;
}

export interface SynthesizerEvents {
synthesis: [resource: AudioResource, message: Message];
}
37 changes: 35 additions & 2 deletions src/synthesis/worker-pool.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-unsafe-declaration-merging */
// Reference: https://nodejs.org/api/async_context.html#using-asyncresource-for-a-worker-thread-pool

import { EventEmitter } from "node:events";
Expand Down Expand Up @@ -56,7 +57,7 @@ export default class WorkerPool<
const prop = this.workerInfo.get(worker.threadId);
if (!prop) return;

this.emit("data", result);
this.emit("data", result, prop);
this.workerInfo.delete(worker.threadId);

this.freeWorkers.push(worker);
Expand All @@ -77,7 +78,7 @@ export default class WorkerPool<
this.emit(kWorkerFreedEvent);
}

protected dispatchTask(task: Task, prop: TaskProperty) {
public dispatchTask(task: Task, prop: TaskProperty) {
const worker = this.freeWorkers.pop();
if (!worker) {
// No free threads, wait until a worker thread becomes free.
Expand All @@ -93,3 +94,35 @@ export default class WorkerPool<
for (const worker of this.workers) await worker.terminate();
}
}

export default interface WorkerPool<
// eslint-disable-next-line @typescript-eslint/no-unused-vars
Task,
Result,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
WorkerData,
TaskProperty extends object,
> {
on<K extends keyof WorkerPoolEvents<Result, TaskProperty>>(
event: K,
listener: (...args: WorkerPoolEvents<Result, TaskProperty>[K]) => void,
): this;
once<K extends keyof WorkerPoolEvents<Result, TaskProperty>>(
event: K,
listener: (...args: WorkerPoolEvents<Result, TaskProperty>[K]) => void,
): this;
off<K extends keyof WorkerPoolEvents<Result, TaskProperty>>(
event: K,
listener: (...args: WorkerPoolEvents<Result, TaskProperty>[K]) => void,
): this;
emit<K extends keyof WorkerPoolEvents<Result, TaskProperty>>(
event: K,
...args: WorkerPoolEvents<Result, TaskProperty>[K]
): boolean;
}

export interface WorkerPoolEvents<Result, TaskProperty> {
data: [result: Result, prop: TaskProperty];
error: [error: unknown];
[kWorkerFreedEvent]: [];
}
73 changes: 73 additions & 0 deletions src/synthesis/worker-synthesizer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/* eslint-disable @typescript-eslint/no-unsafe-declaration-merging */
import EventEmitter from "events";
import { StreamType, createAudioResource } from "@discordjs/voice";
import { Message } from "discord.js";
import { AltJTalkConfig } from "node-altjtalk-binding";
import { Result, Task } from "./common";
import { createSynthesisOption } from "./options";
import SynthesizedSoundStream from "./stream";
import { Synthesizer, SynthesizerEvents } from "./synthesizer";
import WorkerPool from "./worker-pool";

export default class WorkerSynthesizer
extends EventEmitter
implements Synthesizer
{
workerPool: WorkerPool<Task, Result, AltJTalkConfig, Message>;

constructor(dictionary: string, model: string) {
super();
this.workerPool = new WorkerPool(
new URL("task", import.meta.url),
{
dictionary,
model,
},
process.env.NUM_THREADS ? Number(process.env.NUM_THREADS) : 1,
);
this.workerPool.on("data", ({ data }: Result, message) => {
const resource = createAudioResource(new SynthesizedSoundStream(data), {
inputType: StreamType.Raw,
});
this.emit("synthesis", resource, message);
});
}

public dispatchSynthesis(message: Message) {
const inputText =
message.cleanContent.length > 200
? `${message.cleanContent.slice(0, 196)} 以下略`
: message.cleanContent;
const option = createSynthesisOption(message);

this.workerPool.dispatchTask(
{
inputText,
option: {
...option,
samplingFrequency: 48000,
},
},
message,
);
}
}

export default interface WorkerSynthesizer {
on<K extends keyof SynthesizerEvents>(
event: K,
listener: (...args: SynthesizerEvents[K]) => void,
): this;
once<K extends keyof SynthesizerEvents>(
event: K,
listener: (...args: SynthesizerEvents[K]) => void,
): this;
off<K extends keyof SynthesizerEvents>(
event: K,
listener: (...args: SynthesizerEvents[K]) => void,
): this;
emit<K extends keyof SynthesizerEvents>(
event: K,
...args: SynthesizerEvents[K]
): boolean;
}

0 comments on commit 3492d6d

Please sign in to comment.