Skip to content

Commit

Permalink
support agent.say in before_llm_cb (#1460)
Browse files Browse the repository at this point in the history
  • Loading branch information
longcw authored Feb 10, 2025
1 parent 117abd2 commit 87d4232
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 30 deletions.
5 changes: 5 additions & 0 deletions .changeset/warm-seahorses-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-agents": patch
---

support agent.say inside the before_llm_cb
84 changes: 54 additions & 30 deletions livekit-agents/livekit/agents/pipeline/pipeline_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ async def say(

if self._playing_speech and not self._playing_speech.nested_speech_done:
self._playing_speech.add_nested_speech(new_handle)
elif self._speech_q:
self._speech_q[0].add_nested_speech(new_handle)
else:
self._add_speech_for_playout(new_handle)

Expand Down Expand Up @@ -776,6 +778,47 @@ async def _synthesize_answer_task(
SpeechDataContextVar.reset(tk)

async def _play_speech(self, speech_handle: SpeechHandle) -> None:
fnc_done_fut = asyncio.Future[None]()
playing_lock = asyncio.Lock()
nested_speech_played = asyncio.Event()

async def _play_nested_speech():
speech_handle._nested_speech_done_fut = asyncio.Future[None]()
while not speech_handle.nested_speech_done:
nesting_changed = asyncio.create_task(
speech_handle.nested_speech_changed.wait()
)
nesting_done_fut: asyncio.Future = speech_handle._nested_speech_done_fut
await asyncio.wait(
[nesting_changed, nesting_done_fut, fnc_done_fut],
return_when=asyncio.FIRST_COMPLETED,
)
if not nesting_changed.done():
nesting_changed.cancel()

while speech_handle.nested_speech_handles:
nested_speech_played.clear()
speech = speech_handle.nested_speech_handles[0]
if speech_handle.nested_speech_done:
# in case tool speech is added after nested speech done
speech.cancel(cancel_nested=True)
speech_handle.nested_speech_handles.pop(0)
continue

async with playing_lock:
self._playing_speech = speech
await self._play_speech(speech)
speech_handle.nested_speech_handles.pop(0)
self._playing_speech = speech_handle

nested_speech_played.set()
speech_handle.nested_speech_changed.clear()
# break if the function calls task is done
if fnc_done_fut.done():
speech_handle.mark_nested_speech_done()

nested_speech_task = asyncio.create_task(_play_nested_speech())

try:
await speech_handle.wait_for_initialization()
except asyncio.CancelledError:
Expand All @@ -789,6 +832,11 @@ async def _play_speech(self, speech_handle: SpeechHandle) -> None:

user_question = speech_handle.user_question

# wait for all pre-added nested speech to be played
while speech_handle.nested_speech_handles:
await nested_speech_played.wait()

await playing_lock.acquire()
play_handle = synthesis_handle.play()
join_fut = play_handle.join()

Expand Down Expand Up @@ -884,6 +932,7 @@ def _commit_user_question_if_needed() -> None:
"speech_id": speech_handle.id,
},
)
playing_lock.release()

@utils.log_exceptions(logger=logger)
async def _execute_function_calls() -> None:
Expand Down Expand Up @@ -1010,40 +1059,15 @@ async def _execute_function_calls() -> None:
_CallContextVar.reset(tk)

if not is_using_tools:
# skip the function calls execution
fnc_done_fut.set_result(None)
await nested_speech_task
speech_handle._set_done()
return

speech_handle._nested_speech_done_fut = asyncio.Future[None]()
fnc_task = asyncio.create_task(_execute_function_calls())
while not speech_handle.nested_speech_done:
nesting_changed = asyncio.create_task(
speech_handle.nested_speech_changed.wait()
)
nesting_done_fut: asyncio.Future = speech_handle._nested_speech_done_fut
await asyncio.wait(
[nesting_changed, fnc_task, nesting_done_fut],
return_when=asyncio.FIRST_COMPLETED,
)
if not nesting_changed.done():
nesting_changed.cancel()

while speech_handle.nested_speech_handles:
speech = speech_handle.nested_speech_handles[0]
if speech_handle.nested_speech_done:
# in case tool speech is added after nested speech done
speech.cancel(cancel_nested=True)
speech_handle.nested_speech_handles.pop(0)
continue

self._playing_speech = speech
await self._play_speech(speech)
speech_handle.nested_speech_handles.pop(0)
self._playing_speech = speech_handle

speech_handle.nested_speech_changed.clear()
# break if the function calls task is done
if fnc_task.done():
speech_handle.mark_nested_speech_done()
fnc_task.add_done_callback(lambda _: fnc_done_fut.set_result(None))
await nested_speech_task

if not fnc_task.done():
logger.debug(
Expand Down

0 comments on commit 87d4232

Please sign in to comment.