Skip to content

Commit

Permalink
Async agency
Browse files Browse the repository at this point in the history
  • Loading branch information
VRSEN committed Jan 23, 2024
1 parent df10db6 commit ed4535b
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 22 deletions.
2 changes: 1 addition & 1 deletion agency_swarm/agency/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .agency import Agency
from .agency import Agency
82 changes: 69 additions & 13 deletions agency_swarm/agency/agency.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import uuid
from enum import Enum
from typing import List, TypedDict, Callable, Any, Dict
from typing import List, TypedDict, Callable, Any, Dict, Literal

from pydantic import Field, field_validator
from rich.console import Console
Expand All @@ -20,13 +20,19 @@ class SettingsCallbacks(TypedDict):
load: Callable[[], List[Dict]]
save: Callable[[List[Dict]], Any]


class ThreadsCallbacks(TypedDict):
load: Callable[[], Dict]
save: Callable[[Dict], Any]


class Agency:
ThreadType = Thread
send_message_tool_description = """Use this tool to facilitate direct, synchronous communication between specialized agents within your agency. When you send a message using this tool, you receive a response exclusively from the designated recipient agent. To continue the dialogue, invoke this tool again with the desired recipient and your follow-up message. Remember, communication here is synchronous; the recipient agent won't perform any tasks post-response. You are responsible for relaying the recipient agent's responses back to the user, as they do not have direct access to these replies. Keep engaging with the tool for continuous interaction until the task is fully resolved."""
send_message_tool_description_async = """Use this tool to facilitate direct, asynchronous communication between specialized agents within your agency. When you send a message using this tool, you initiate the task with the recepient agent. To check the status of the task and recieve a response, please invoke the 'GetResponse' tool with the same recipient agent. You are responsible for relaying the recipient agent's responses back to the user, as they do not have direct access to these replies. Remember that you can't check the status yourself later, user needs to tell you when to do so. Keep engaging with this tool until the task is fully resolved."""

def __init__(self, agency_chart: List, shared_instructions: str = "", shared_files: List = None,
async_mode: Literal['threading'] = None,
settings_callbacks: SettingsCallbacks = None, threads_callbacks: ThreadsCallbacks = None):
"""
Initializes the Agency object, setting up agents, threads, and core functionalities.
Expand All @@ -40,6 +46,11 @@ def __init__(self, agency_chart: List, shared_instructions: str = "", shared_fil
This constructor initializes various components of the Agency, including CEO, agents, threads, and user interactions. It parses the agency chart to set up the organizational structure and initializes the messaging tools, agents, and threads necessary for the operation of the agency. Additionally, it prepares a main thread for user interactions.
"""
self.async_mode = async_mode
if self.async_mode == "threading":
from agency_swarm.threads.thread_async import ThreadAsync
self.ThreadType = ThreadAsync

self.ceo = None
self.agents = []
self.agents_and_threads = {}
Expand All @@ -55,7 +66,7 @@ def __init__(self, agency_chart: List, shared_instructions: str = "", shared_fil
self.shared_instructions = shared_instructions

self._parse_agency_chart(agency_chart)
self._create_send_message_tools()
self._create_special_tools()
self._init_agents()
self._init_threads()

Expand Down Expand Up @@ -115,9 +126,10 @@ def _init_threads(self):

for agent_name, threads in self.agents_and_threads.items():
for other_agent, items in threads.items():
self.agents_and_threads[agent_name][other_agent] = Thread(self.get_agent_by_name(items["agent"]),
self.get_agent_by_name(
items["recipient_agent"]))
self.agents_and_threads[agent_name][other_agent] = self.ThreadType(
self.get_agent_by_name(items["agent"]),
self.get_agent_by_name(
items["recipient_agent"]))

if agent_name in loaded_thread_ids and other_agent in loaded_thread_ids[agent_name]:
self.agents_and_threads[agent_name][other_agent].id = loaded_thread_ids[agent_name][other_agent]
Expand Down Expand Up @@ -366,7 +378,7 @@ def _read_instructions(self, path):
def plot_agency_chart(self):
pass

def _create_send_message_tools(self):
def _create_special_tools(self):
"""
Creates and assigns 'SendMessage' tools to each agent based on the agency's structure.
Expand All @@ -381,6 +393,8 @@ def _create_send_message_tools(self):
recipient_agents = self.get_agents_by_names(recipient_names)
agent = self.get_agent_by_name(agent_name)
agent.add_tool(self._create_send_message_tool(agent, recipient_agents))
if self.async_mode:
agent.add_tool(self._create_get_response_tool(agent, recipient_agents))

def _create_send_message_tool(self, agent: Agent, recipient_agents: List[Agent]):
"""
Expand All @@ -407,7 +421,6 @@ def _create_send_message_tool(self, agent: Agent, recipient_agents: List[Agent])
outer_self = self

class SendMessage(BaseTool):
"""Use this tool to facilitate direct, synchronous communication between specialized agents within your agency. When you send a message using this tool, you receive a response exclusively from the designated recipient agent. To continue the dialogue, invoke this tool again with the desired recipient and your follow-up message. Remember, communication here is synchronous; the recipient agent won't perform any tasks post-response. You are responsible for relaying the recipient agent's responses back to the user, as they do not have direct access to these replies. Keep engaging with the tool for continuous interaction until the task is fully resolved."""
instructions: str = Field(...,
description="Please repeat your instructions step-by-step, including both completed "
"and the following next steps that you need to perfrom. For multi-step complex tasks, first break them down "
Expand Down Expand Up @@ -439,19 +452,62 @@ def check_caller_agent_name(cls, value):
def run(self):
thread = outer_self.agents_and_threads[self.caller_agent_name][self.recipient.value]

gen = thread.get_completion(message=self.message, message_files=self.message_files)
try:
while True:
yield next(gen)
except StopIteration as e:
message = e.value
if not outer_self.async_mode:
gen = thread.get_completion(message=self.message, message_files=self.message_files)
try:
while True:
yield next(gen)
except StopIteration as e:
message = e.value
else:
message = thread.get_completion_async(message=self.message, message_files=self.message_files)

return message or ""

SendMessage.caller_agent = agent
if self.async_mode:
SendMessage.__doc__ = self.send_message_tool_description_async
else:
SendMessage.__doc__ = self.send_message_tool_description

return SendMessage

def _create_get_response_tool(self, agent: Agent, recipient_agents: List[Agent]):
"""
Creates a CheckStatus tool to enable an agent to check the status of a task with a specified recipient agent.
"""
recipient_names = [agent.name for agent in recipient_agents]
recipients = Enum("recipient", {name: name for name in recipient_names})

outer_self = self

class GetResponse(BaseTool):
"""This tool allows you to check the status of a task or get a response from a specified recipient agent, if the task has been completed. You must always use 'SendMessage' tool first."""
recipient: recipients = Field(..., description=f"Recipient agent that you want to check the status of. Valid recipients are: {recipient_names}")
caller_agent_name: str = Field(default=agent.name,
description="The agent calling this tool. Defaults to your name. Do not change it.")

@field_validator('recipient')
def check_recipient(cls, value):
if value.value not in recipient_names:
raise ValueError(f"Recipient {value} is not valid. Valid recipients are: {recipient_names}")
return value

@field_validator('caller_agent_name')
def check_caller_agent_name(cls, value):
if value != agent.name:
raise ValueError(f"Caller agent name must be {agent.name}.")
return value

def run(self):
thread = outer_self.agents_and_threads[self.caller_agent_name][self.recipient.value]

return thread.check_status()

GetResponse.caller_agent = agent

return GetResponse

def get_recipient_names(self):
"""
Retrieves the names of all agents in the agency.
Expand Down
53 changes: 53 additions & 0 deletions agency_swarm/threads/thread_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from agency_swarm.threads import Thread
import threading
from typing import Literal
from agency_swarm.agents import Agent
from agency_swarm.messages import MessageOutput
from agency_swarm.user import User
from agency_swarm.util.oai import get_openai_client


class ThreadAsync(Thread):
def __init__(self, agent: Literal[Agent, User], recipient_agent: Agent):
super().__init__(agent, recipient_agent)
self.pythread = None
self.response = None

def worker(self, message: str, message_files=None):
gen = super().get_completion(message=message, message_files=message_files,
yield_messages=False) # yielding is not supported in async mode
while True:
try:
next(gen)
except StopIteration as e:
self.response = f"""{self.recipient_agent.name} Response: '{e.value}'"""
break

return

def get_completion_async(self, message: str, message_files=None):
if self.pythread and self.pythread.is_alive():
return "System Notification: 'Agent is busy, so your message was not recived. Please always use 'GetResponse' tool to check for status first, before using 'SendMessage' tool again for the same agent.'"
elif self.pythread and not self.pythread.is_alive():
self.pythread.join()
self.pythread = None
return self.response

self.response = None

self.pythread = threading.Thread(target=self.worker,
args=(message, message_files))

self.pythread.start()

return "System Notification: 'Task has started. Please notify the user that they can tell you to check the status later. You can do this with the 'GetResponse' tool, but don't mention this tool to the user. "

def check_status(self):
if self.pythread and self.pythread.is_alive():
return "System Notification: 'Agent is busy. Please tell the user that they need to wait and ask you to check for status again later.'"
elif self.pythread and not self.pythread.is_alive():
self.pythread.join()
self.pythread = None
return self.response
else:
return "System Notification: 'Agent is available. Please use 'SendMessage' tool to send a message.'"
Loading

0 comments on commit ed4535b

Please sign in to comment.