From d2f1ef9f54cd7dd13892aa2ebe2f3f348b01afcb Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Sun, 10 Nov 2024 21:24:09 +0200 Subject: [PATCH] fix(proc): behave correctly on numIdleProcesses: 0 (#142) --- .changeset/funny-plants-taste.md | 5 +++++ agents/src/ipc/proc_pool.ts | 34 +++++++++++++++++++++----------- 2 files changed, 28 insertions(+), 11 deletions(-) create mode 100644 .changeset/funny-plants-taste.md diff --git a/.changeset/funny-plants-taste.md b/.changeset/funny-plants-taste.md new file mode 100644 index 00000000..3be37f8c --- /dev/null +++ b/.changeset/funny-plants-taste.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +fix(proc): behave correctly on numIdleProcesses: 0 diff --git a/agents/src/ipc/proc_pool.ts b/agents/src/ipc/proc_pool.ts index 5f2a3f04..33cb9936 100644 --- a/agents/src/ipc/proc_pool.ts +++ b/agents/src/ipc/proc_pool.ts @@ -17,7 +17,7 @@ export class ProcPool { closed = false; controller = new AbortController(); initMutex = new Mutex(); - procMutex: MultiMutex; + procMutex?: MultiMutex; procUnlock?: () => void; warmedProcQueue = new Queue(); @@ -28,7 +28,9 @@ export class ProcPool { closeTimeout: number, ) { this.agent = agent; - this.procMutex = new MultiMutex(numIdleProcesses); + if (numIdleProcesses > 0) { + this.procMutex = new MultiMutex(numIdleProcesses); + } this.initializeTimeout = initializeTimeout; this.closeTimeout = closeTimeout; } @@ -42,10 +44,18 @@ export class ProcPool { } async launchJob(info: RunningJobInfo) { - const proc = await this.warmedProcQueue.get(); - if (this.procUnlock) { - this.procUnlock(); - this.procUnlock = undefined; + let proc: JobExecutor; + if (this.procMutex) { + proc = await this.warmedProcQueue.get(); + if (this.procUnlock) { + this.procUnlock(); + this.procUnlock = undefined; + } + } else { + proc = new ProcJobExecutor(this.agent, this.initializeTimeout, this.closeTimeout); + this.executors.push(proc); + await proc.start(); + await proc.initialize(); } await proc.launchJob(info); } @@ -89,11 +99,13 @@ export class ProcPool { } async run(signal: AbortSignal) { - while (!signal.aborted) { - this.procUnlock = await this.procMutex.lock(); - const task = this.procWatchTask(); - this.tasks.push(task); - task.finally(() => this.tasks.splice(this.tasks.indexOf(task))); + if (this.procMutex) { + while (!signal.aborted) { + this.procUnlock = await this.procMutex.lock(); + const task = this.procWatchTask(); + this.tasks.push(task); + task.finally(() => this.tasks.splice(this.tasks.indexOf(task))); + } } }