diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 0381a13f1..01c0a4bc8 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -61,7 +61,7 @@ b = AgentB( In the Independent Process Mode, we need to start the agent server process on the target machine first. When starting the agent server process, you need to specify a model config file, which contains the models which can be used in the agent server, the IP address and port of the agent server process For example, start two agent server processes on the two different machines with IP `ip_a` and `ip_b`(called `Machine1` and `Machine2` accrodingly). -You can run the following code on `Machine1`.Before running, make sure that the machine has access to all models that used in your application, specifically, you need to put your model config file in `model_config_path_a` and set environment variables such as your model API key correctly in `Machine1`. The example model config file instances are located under `examples/model_configs_template`. +You can run the following code on `Machine1`.Before running, make sure that the machine has access to all models that used in your application, specifically, you need to put your model config file in `model_config_path_a` and set environment variables such as your model API key correctly in `Machine1`. The example model config file instances are located under `examples/model_configs_template`. In addition, your customized agent classes that need to run in the server must be registered in `custom_agent_classes` so that the server can correctly identify these agents. If you only use AgentScope's built-in agents, you can ignore `custom_agent_classes` field. ```python # import some packages @@ -74,6 +74,7 @@ agentscope.init( server = RpcAgentServerLauncher( host="ip_a", port=12001, # choose an available port + custom_agent_classes=[AgentA, AgentB] # register your customized agent classes ) # Start the service @@ -100,6 +101,7 @@ agentscope.init( server = RpcAgentServerLauncher( host="ip_b", port=12002, # choose an available port + custom_agent_classes=[AgentA, AgentB] # register your customized agent classes ) # Start the service @@ -135,50 +137,6 @@ b = AgentB( The above code will deploy `AgentA` on the agent server process of `Machine1` and `AgentB` on the agent server process of `Machine2`. And developers just need to write the application flow in a centralized way in the main process. -#### Advanced Usage of `to_dist` - -All examples described above convert initialized agents into their distributed version through the {func}`to_dist` method, which is equivalent to initialize the agent twice, once in the main process and once in the agent server process. -For agents whose initialization process is time-consuming, the `to_dist` method is inefficient. Therefore, AgentScope also provides a method to convert the Agent instance into its distributed version while initializing it, that is, passing in `to_dist` parameter to the Agent's initialization function. - -In Child Process Mode, just pass `to_dist=True` to the Agent's initialization function. - -```python -# Child Process mode -a = AgentA( - name="A", - # ... - to_dist=True -) -b = AgentB( - name="B", - # ... - to_dist=True -) -``` - -In Independent Process Mode, you need to encapsulate the parameters of the `to_dist()` method in {class}`DistConf` instance and pass it into the `to_dist` field, for example: - -```python -a = AgentA( - name="A", - # ... - to_dist=DistConf( - host="ip_a", - port=12001, - ), -) -b = AgentB( - name="B", - # ... - to_dist=DistConf( - host="ip_b", - port=12002, - ), -) -``` - -Compared with the original `to_dist()` function call, this method just initializes the agent once in the agent server process. - ### Step 2: Orchestrate Distributed Application Flow In AgentScope, the orchestration of distributed application flow is exactly the same as non-distributed programs, and developers can write the entire application flow in a centralized way. @@ -233,9 +191,133 @@ while x is None or x.content == "exit": x = b(x) ``` -### About Implementation +### Advanced Usage + +#### `to_dist` with lower cost + +All examples described above convert initialized agents into their distributed version through the {func}`to_dist` method, which is equivalent to initialize the agent twice, once in the main process and once in the agent server process. +For agents whose initialization process is time-consuming, the `to_dist` method is inefficient. Therefore, AgentScope also provides a method to convert the Agent instance into its distributed version while initializing it, that is, passing in `to_dist` parameter to the Agent's initialization function. + +In Child Process Mode, just pass `to_dist=True` to the Agent's initialization function. + +```python +# Child Process mode +a = AgentA( + name="A", + # ... + to_dist=True +) +b = AgentB( + name="B", + # ... + to_dist=True +) +``` + +In Independent Process Mode, you need to encapsulate the parameters of the `to_dist()` method in {class}`DistConf` instance and pass it into the `to_dist` field, for example: + +```python +a = AgentA( + name="A", + # ... + to_dist=DistConf( + host="ip_a", + port=12001, + ), +) +b = AgentB( + name="B", + # ... + to_dist=DistConf( + host="ip_b", + port=12002, + ), +) +``` + +Compared with the original `to_dist()` function call, this method just initializes the agent once in the agent server process, which reduces the cost of initialization. + +#### Manage your agent server processes + +When running large-scale multi-agent applications, it's common to have multiple Agent Server processes running. To facilitate management of these processes, AgentScope offers management interfaces in the {class}`RpcAgentClient` class. Here's a brief overview of these methods: + +- `is_alive`: This method checks whether the Agent Server process is still running. + + ```python + client = RpcAgentClient(host=server_host, port=server_port) + if client.is_alive(): + do_something() + ``` + +- `stop`: This method stops the Agent Server process. + + ```python + client.stop() + assert(client.is_alive() == False) + ``` + +- `get_agent_list`: This method retrieves a list of JSON format thumbnails of all agents currently running within the Agent Server process. The thumbnail is generated by the `__str__` method of the Agent instance. + + ```python + agent_list = client.get_agent_list() + print(agent_list) # [agent1_info, agent2_info, ...] + ``` + +- `get_agent_memory`: With this method, you can fetch the memory content of an agent specified by its `agent_id`. + + ```python + agent_id = my_agent.agent_id + agent_memory = client.get_agent_memory(agent_id) + print(agent_memory) # [msg1, msg2, ...] + ``` + +- `get_server_info`:This method provides information about the resource utilization of the Agent Server process, including CPU usage, memory consumption. + + ```python + server_info = client.get_server_info() + print(server_info) # { "cpu": xxx, "mem": xxx } + ``` + +- `set_model_configs`: This method set the specific model configs into the agent server, the agent created later can directly use these model configs. + + ```python + agent = MyAgent( # failed because the model config [my_openai] is not found + # ... + model_config_name="my_openai", + to_dist={ + # ... + } + ) + client.set_model_configs([{ # set the model config [my_openai] + "config_name": "my_openai", + "model_type": "openai_chat", + # ... + }]) + agent = MyAgent( # success + # ... + model_config_name="my_openai", + to_dist={ + # ... + } + ) + ``` + +- `delete_agent`: This method deletes an agent specified by its `agent_id`. + + ```python + agent_id = agent.agent_id + ok = client.delete_agent(agent_id) + ``` + +- `delete_all_agent`: This method deletes all agents currently running within the Agent Server process. + + ```python + ok = client.delete_all_agent() + ``` + +## Implementation -#### Actor Model +### Actor Model [The Actor model](https://en.wikipedia.org/wiki/Actor_model) is a widely used programming paradigm in large-scale distributed systems, and it is also applied in the distributed design of the AgentScope platform. @@ -251,10 +333,10 @@ E-->F D-->F ``` -Specifically, `B` and `C` can start execution simultaneously after receiving the message from `A`, and `E` can run immediately without waiting for `A`, `B`, `C,` and `D`. +Specifically, `B` and `C` can start execution simultaneously after receiving the message from `A`, and `E` can run immediately without waiting for `A`, `B`, `C`, and `D`. By implementing each Agent as an Actor, an Agent will automatically wait for its input `Msg` before starting to execute the `reply` method, and multiple Agents can also automatically execute `reply` at the same time if their input messages are ready, which avoids complex parallel control and makes things simple. -#### PlaceHolder +### PlaceHolder Meanwhile, to support centralized application orchestration, AgentScope introduces the concept of {class}`Placeholder`. A Placeholder is a special message that contains the address and port number of the agent that generated the placeholder, which is used to indicate that the output message of the Agent is not ready yet. @@ -265,7 +347,7 @@ A placeholder itself is also a message, and it can be sent to other agents, and About more detailed technical implementation solutions, please refer to our [paper](https://arxiv.org/abs/2402.14034). -#### Agent Server +### Agent Server In agentscope, the agent server provides a running platform for various types of agents. Multiple agents can run in the same agent server and hold independent memory and other local states but they will share the same computation resources. diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index 11b0ef070..11028fc41 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -13,7 +13,7 @@ AgentScope实现了基于Actor模式的智能体分布式部署和并行优化 ## 使用方法 AgentScope中,我们将运行应用流程的进程称为**主进程 (Main Process)**,而所有的智能体都会运行在额外的 **智能体服务器进程 (Agent Server Process)** 中。 -根据主进程域智能体服务器进程之间的关系,AgentScope 为每个 Agent 提供了两种启动模式:**子进程模式 (Child)** 和 **独立进程模式 (Indpendent)**。 +根据主进程与智能体服务器进程之间的关系,AgentScope 为每个 Agent 提供了两种启动模式:**子进程模式 (Child)** 和 **独立进程模式 (Indpendent)**。 子进程模式中,开发者可以从主进程中启动所有的智能体服务器进程,而独立进程模式中,智能体服务器进程相对主进程来说是独立的,需要在对应的机器上启动智能体服务器进程。 上述概念有些复杂,但是不用担心,对于应用开发者而言,仅需将已有的智能体转化为对应的分布式版本,其余操作都和正常的单机版本完全一致。 @@ -59,7 +59,7 @@ b = AgentB( 在独立进程模式中,需要首先在目标机器上启动智能体服务器进程,启动时需要提供该服务器能够使用的模型的配置信息,以及服务器的 IP 和端口号。 例如想要将两个智能体服务进程部署在 IP 分别为 `ip_a` 和 `ip_b` 的机器上(假设这两台机器分别为`Machine1` 和 `Machine2`)。 -你可以在 `Machine1` 上运行如下代码。在运行之前请确保该机器能够正确访问到应用中所使用的所有模型。具体来讲,需要将用到的所有模型的配置信息放置在 `model_config_path_a` 文件中,并检查API key 等环境变量是否正确设置,模型配置文件样例可参考 `examples/model_configs_template`。 +你可以在 `Machine1` 上运行如下代码。在运行之前请确保该机器能够正确访问到应用中所使用的所有模型。具体来讲,需要将用到的所有模型的配置信息放置在 `model_config_path_a` 文件中,并检查API key 等环境变量是否正确设置,模型配置文件样例可参考 `examples/model_configs_template`。除此之外,还要将那些需要在该服务器中运行的自定义 Agent 类在 `custom_agent_classes` 中注册,以便启动的服务器能够正确识别这些自定义的 Agent,如果只是使用 AgentScope 内置的 Agent 类,则不需要填写 `custom_agent_classes`。 ```python # import some packages @@ -72,6 +72,7 @@ agentscope.init( server = RpcAgentServerLauncher( host="ip_a", port=12001, # choose an available port + custom_agent_classes=[AgentA, AgentB] # register your customized agent classes ) # Start the service @@ -98,6 +99,7 @@ agentscope.init( server = RpcAgentServerLauncher( host="ip_b", port=12002, # choose an available port + custom_agent_classes=[AgentA, AgentB] # register your customized agent classes ) # Start the service @@ -133,49 +135,6 @@ b = AgentB( 上述代码将会把 `AgentA` 部署到 `Machine1` 的智能体服务器进程上,并将 `AgentB` 部署到 `Machine2` 的智能体服务器进程上。 开发者在这之后只需要用中心化的方法编排各智能体的交互逻辑即可。 -#### `to_dist` 进阶用法 - -上面介绍的案例都是将一个已经初始化的 Agent 通过 {func}`to_dist` 方法转化为其分布式版本,相当于要执行两次初始化操作,一次在主进程中,一次在智能体进程中。如果 Agent 的初始化过程耗时较长,直接使用 `to_dist` 方法会严重影响运行效率。为此 AgentScope 也提供了在初始化 Agent 实例的同时将其转化为其分布式版本的方法,即在原 Agent 实例初始化时传入 `to_dist` 参数。 - -子进程模式下,只需要在 Agent 初始化函数中传入 `to_dist=True` 即可: - -```python -# Child Process mode -a = AgentA( - name="A", - # ... - to_dist=True -) -b = AgentB( - name="B", - # ... - to_dist=True -) -``` - -独立进程模式下, 则需要将原来 `to_dist()` 函数的参数以 {class}`DistConf` 实例的形式传入 Agent 初始化函数的 `to_dist` 域: - -```python -a = AgentA( - name="A", - # ... - to_dist=DistConf( - host="ip_a", - port=12001, - ), -) -b = AgentB( - name="B", - # ... - to_dist=DistConf( - host="ip_b", - port=12002, - ), -) -``` - -相较于原有的 `to_dist()` 函数调用,该方法只会在智能体进程中初始化一次 Agent,避免了重复初始化现象。 - ### 步骤2: 编排分布式应用流程 在AgentScope中,分布式应用流程的编排和非分布式的程序完全一致,开发者可以用中心化的方式编写全部应用流程。 @@ -230,9 +189,132 @@ while x is None or x.content == "exit": x = b(x) ``` -### 实现原理 +### 进阶用法 + +#### 更低成本的 `to_dist` + +上面介绍的案例都是将一个已经初始化的 Agent 通过 {func}`to_dist` 方法转化为其分布式版本,相当于要执行两次初始化操作,一次在主进程中,一次在智能体进程中。如果 Agent 的初始化过程耗时较长,直接使用 `to_dist` 方法会严重影响运行效率。为此 AgentScope 提供了在初始化 Agent 实例的同时将其转化为其分布式版本的方法,即在原 Agent 实例初始化时传入 `to_dist` 参数。 + +子进程模式下,只需要在 Agent 初始化函数中传入 `to_dist=True` 即可: + +```python +# Child Process mode +a = AgentA( + name="A", + # ... + to_dist=True +) +b = AgentB( + name="B", + # ... + to_dist=True +) +``` + +独立进程模式下, 则需要将原来 `to_dist()` 函数的参数以 {class}`DistConf` 实例的形式传入 Agent 初始化函数的 `to_dist` 域: + +```python +a = AgentA( + name="A", + # ... + to_dist=DistConf( + host="ip_a", + port=12001, + ), +) +b = AgentB( + name="B", + # ... + to_dist=DistConf( + host="ip_b", + port=12002, + ), +) +``` + +相较于原有的 `to_dist()` 函数调用,该方法只会在智能体进程中初始化一次 Agent,避免了重复初始化行为,能够有效减少初始化开销。 + +#### 管理 Agent Server + +在运行大规模多智能体应用时,往往需要启动众多的 Agent Server 进程。为了让使用者能够有效管理这些进程,AgentScope 在 {class}`RpcAgentClient` 中提供了如下管理接口: + +- `is_alive`: 该方法能够判断该 Agent Server 进程是否正在运行。 + + ```python + client = RpcAgentClient(host=server_host, port=server_port) + if client.is_alive(): + do_something() + ``` + +- `stop`: 该方法能够停止连接的 Agent Server 进程。 + + ```python + client.stop() + assert(client.is_alive() == False) + ``` + +- `get_agent_list`: 该方法能够获取该 Agent Server 进程中正在运行的所有 Agent 的 JSON 格式的缩略信息列表,具体展示的缩略信息内容取决于该 Agent 类的 `__str__` 方法。 + + ```python + agent_list = client.get_agent_list() + print(agent_list) # [agent1_info, agent2_info, ...] + ``` + +- `get_agent_memory`: 该方法能够获取指定 `agent_id` 对应 Agent 实例的 memory 内容。 + + ```python + agent_id = my_agent.agent_id + agent_memory = client.get_agent_memory(agent_id) + print(agent_memory) # [msg1, msg2, ...] + ``` + +- `get_server_info`:该方法能够获取该 Agent Server 进程的资源占用情况,包括 CPU 利用率、内存占用。 + + ```python + server_info = client.get_server_info() + print(server_info) # { "cpu": xxx, "mem": xxx } + ``` + +- `set_model_configs`: 该方法可以将指定的模型配置信息设置到 Agent Server 进程中,新创建的 Agent 实例可以直接使用这些模型配置信息。 + + ```python + agent = MyAgent( # 因为找不到 [my_openai] 模型而失败 + # ... + model_config_name="my_openai", + to_dist={ + # ... + } + ) + client.set_model_configs([{ # 新增 [my_openai] 模型配置信息 + "config_name": "my_openai", + "model_type": "openai_chat", + # ... + }]) + agent = MyAgent( # 成功创建 Agent 实例 + # ... + model_config_name="my_openai", + to_dist={ + # ... + } + ) + ``` + +- `delete_agent`: 该方法用于删除指定 `agent_id` 对应的 Agent 实例。 + + ```python + agent_id = agent.agent_id + ok = client.delete_agent(agent_id) + ``` + +- `delete_all_agent`: 该方法可以删除 Agent Server 进程中所有的 Agent 实例。 + + ```python + ok = client.delete_all_agent() + ``` + +## 实现原理 -#### Actor模式 +### Actor模式 [Actor模式](https://en.wikipedia.org/wiki/Actor_model)是大规模分布式系统中广泛使用的编程范式,同时也被应用于AgentScope平台的分布式设计中。 在Actor模型中,一个actor是一个实体,它封装了自己的状态,并且仅通过消息传递与其他actor通信。 @@ -262,7 +344,7 @@ Placeholder 内部包含了该消息产生方的联络方法,可以通过网 关于更加详细的技术实现方案,请参考我们的[论文](https://arxiv.org/abs/2402.14034)。 -#### Agent Server +### Agent Server Agent Server 也就是智能体服务器。在 AgentScope 中,Agent Server 提供了一个让不同 Agent 实例运行的平台。多个不同类型的 Agent 可以运行在同一个 Agent Server 中并保持独立的记忆以及其他本地状态信息,但是他们将共享同一份计算资源。 diff --git a/examples/distributed_debate/distributed_debate.py b/examples/distributed_debate/distributed_debate.py index 93e5aeeae..279c107d7 100644 --- a/examples/distributed_debate/distributed_debate.py +++ b/examples/distributed_debate/distributed_debate.py @@ -81,7 +81,7 @@ def setup_server(parsed_args: argparse.Namespace) -> None: server_launcher = RpcAgentServerLauncher( host=host, port=port, - custom_agents=[UserProxyAgent, DialogAgent], + custom_agent_classes=[UserProxyAgent, DialogAgent], ) server_launcher.launch(in_subprocess=False) server_launcher.wait_until_terminate() diff --git a/examples/distributed_simulation/main.py b/examples/distributed_simulation/main.py index bb26fe533..93f5141a9 100644 --- a/examples/distributed_simulation/main.py +++ b/examples/distributed_simulation/main.py @@ -3,6 +3,7 @@ import argparse import time +import math from concurrent import futures from concurrent.futures import as_completed from loguru import logger @@ -62,7 +63,7 @@ def setup_participant_agent_server(host: str, port: int) -> None: host=host, port=port, max_pool_size=16384, - custom_agents=[Moderator, RandomParticipant, LLMParticipant], + custom_agent_classes=[Moderator, RandomParticipant, LLMParticipant], ) assistant_server_launcher.launch(in_subprocess=False) assistant_server_launcher.wait_until_terminate() @@ -113,7 +114,9 @@ def run_main_process( ) host_num = len(hosts) total_agent_server_num = server_per_host * host_num - participant_per_agent_server = participant_num // total_agent_server_num + participant_per_agent_server = math.ceil( + participant_num / total_agent_server_num, + ) ist = time.time() configs = [] logger.info(f"init {participant_num} {agent_type} participant agents...") diff --git a/examples/distributed_simulation/start_all_server.sh b/examples/distributed_simulation/start_all_server.sh index 1c1f56aea..1fa32fa99 100755 --- a/examples/distributed_simulation/start_all_server.sh +++ b/examples/distributed_simulation/start_all_server.sh @@ -14,7 +14,7 @@ fi participant_server_num=$1 # create files for pid -> .pid +>> .pid # create log dir mkdir -p log diff --git a/examples/distributed_simulation/start_vllm.sh b/examples/distributed_simulation/start_vllm.sh index 11b92498c..c66b748d9 100755 --- a/examples/distributed_simulation/start_vllm.sh +++ b/examples/distributed_simulation/start_vllm.sh @@ -5,7 +5,7 @@ gpu_num=8 model_path="path-to-your-model-dir" base_port=8010 -> .vllm_pid +>> .vllm_pid mkdir -p log for ((i=0; i<8; i++)); do diff --git a/setup.py b/setup.py index 8055476f2..ac1fa06e0 100644 --- a/setup.py +++ b/setup.py @@ -23,6 +23,7 @@ "protobuf==4.25.0", "expiringdict", "dill", + "psutil", ] service_requires = [ diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index 7ae48a5b7..383936472 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -8,6 +8,7 @@ from typing import Union from typing import Any from typing import Type +import json import uuid from loguru import logger @@ -187,9 +188,7 @@ def __init__( """ self.name = name self.memory_config = memory_config - - if sys_prompt is not None: - self.sys_prompt = sys_prompt + self.sys_prompt = sys_prompt # TODO: support to receive a ModelWrapper instance if model_config_name is not None: @@ -217,7 +216,7 @@ def __init__( def generate_agent_id(cls) -> str: """Generate the agent_id of this agent instance""" # TODO: change cls.__name__ into a global unique agent_type - return f"{cls.__name__}_{uuid.uuid4().hex}" + return uuid.uuid4().hex # todo: add a unique agent_type field to distinguish different agent class @classmethod @@ -234,7 +233,7 @@ def get_agent_class(cls, agent_class_name: str) -> Type[AgentBase]: Type[AgentBase]: the AgentBase sub-class. """ if agent_class_name not in cls._registry: - raise ValueError(f"Agent [{agent_class_name}] not found.") + raise ValueError(f"Agent class <{agent_class_name}> not found.") return cls._registry[agent_class_name] # type: ignore[return-value] @classmethod @@ -381,6 +380,20 @@ def _broadcast_to_audience(self, x: dict) -> None: for agent in self._audience: agent.observe(x) + def __str__(self) -> str: + serialized_fields = { + "name": self.name, + "type": self.__class__.__name__, + "sys_prompt": self.sys_prompt, + "agent_id": self.agent_id, + } + if hasattr(self, "model"): + serialized_fields["model"] = { + "model_type": self.model.model_type, + "config_name": self.model.config_name, + } + return json.dumps(serialized_fields, ensure_ascii=False) + @property def agent_id(self) -> str: """The unique id of this agent. diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index ec6cc39ba..8cb942452 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -40,7 +40,7 @@ def __init__( agent_class (`Type[AgentBase]`): the AgentBase subclass of the source agent. agent_configs (`dict`): The args used to - initialize the agent, generated by `_AgentMeta`. + init configs of the agent, generated by `_AgentMeta`. max_pool_size (`int`, defaults to `8192`): Max number of task results that the server can accommodate. max_timeout_seconds (`int`, defaults to `1800`): @@ -80,7 +80,7 @@ def __init__( max_pool_size=max_pool_size, max_timeout_seconds=max_timeout_seconds, local_mode=local_mode, - custom_agents=[agent_class], + custom_agent_classes=[agent_class], studio_url=studio_url, ) if not lazy_launch: @@ -92,7 +92,9 @@ def __init__( agent_id=self.agent_id, ) if not self.connect_existing: - self.client.create_agent(agent_configs) + self.client.create_agent( + agent_configs, + ) def _launch_server(self) -> None: """Launch a rpc server and update the port and the client""" @@ -118,7 +120,7 @@ def reply(self, x: dict = None) -> dict: def observe(self, x: Union[dict, Sequence[dict]]) -> None: if self.client is None: self._launch_server() - self.client.call_func( + self.client.call_agent_func( func_name="_observe", value=serialize(x), # type: ignore[arg-type] ) @@ -156,7 +158,7 @@ def clone_instances( # clone instances without agent server for _ in range(generated_instance_number): - new_agent_id = self.client.call_func("_clone_agent") + new_agent_id = self.client.clone_agent(self.agent_id) generated_instances.append( RpcAgent( name=self.name, diff --git a/src/agentscope/constants.py b/src/agentscope/constants.py index a0298ad0c..22b2f4459 100644 --- a/src/agentscope/constants.py +++ b/src/agentscope/constants.py @@ -45,6 +45,14 @@ # typing Embedding = list[Number] +# rpc + +# set max message size to 32 MB +_DEFAULT_RPC_OPTIONS = [ + ("grpc.max_send_message_length", 32 * 1024 * 1024), + ("grpc.max_receive_message_length", 32 * 1024 * 1024), +] + # enums class ResponseFormat(IntEnum): diff --git a/src/agentscope/exception.py b/src/agentscope/exception.py index 6110fe36e..ac83c772f 100644 --- a/src/agentscope/exception.py +++ b/src/agentscope/exception.py @@ -98,6 +98,9 @@ class ArgumentTypeError(FunctionCallError): """The exception class for argument type error.""" +# - AgentScope Studio Exceptions + + class StudioError(Exception): """The base class for exception raising during interaction with agentscope studio.""" @@ -111,3 +114,46 @@ def __str__(self) -> str: class StudioRegisterError(StudioError): """The exception class for error when registering to agentscope studio.""" + + +# - Agent Server Exceptions + + +class AgentServerError(Exception): + """The exception class for agent server related errors.""" + + host: str + """Hostname of the server.""" + port: int + """Port of the server.""" + message: str + """Error message""" + + def __init__( + self, + host: str, + port: int, + message: str = None, + ) -> None: + """Initialize the exception with the message.""" + self.host = host + self.port = port + self.message = message + + def __str__(self) -> str: + err_msg = f"{self.__class__.__name__}[{self.host}:{self.port}]" + if self.message is not None: + err_msg += f": {self.message}" + return err_msg + + +class AgentServerNotAliveError(AgentServerError): + """The exception class for agent server not alive error.""" + + +class AgentCreationError(AgentServerError): + """The exception class for failing to create agent.""" + + +class AgentCallError(AgentServerError): + """The exception class for failing to call agent.""" diff --git a/src/agentscope/message.py b/src/agentscope/message.py index 23d2dd7a0..d904ded4a 100644 --- a/src/agentscope/message.py +++ b/src/agentscope/message.py @@ -9,6 +9,7 @@ from .rpc import RpcAgentClient, ResponseStub, call_in_thread from .utils.tools import _get_timestamp +from .utils.tools import is_web_accessible class MessageBase(dict): @@ -365,19 +366,32 @@ def update_value(self) -> MessageBase: # retrieve real message from rpc agent server self.__update_task_id() client = RpcAgentClient(self._host, self._port) - result = client.call_func( - func_name="_get", - value=json.dumps({"task_id": self._task_id}), - ) + result = client.update_placeholder(task_id=self._task_id) msg = deserialize(result) - status = msg.pop("__status", "OK") - if status == "ERROR": - raise RuntimeError(msg.content) + self.__update_url(msg) # type: ignore[arg-type] self.update(msg) # the actual value has been updated, not a placeholder any more self._is_placeholder = False return self + def __update_url(self, msg: MessageBase) -> None: + """Update the url field of the message.""" + if hasattr(msg, "url") and msg.url is None: + return + url = msg.url + if isinstance(url, str): + urls = [url] + else: + urls = url + checked_urls = [] + for url in urls: + if not is_web_accessible(url): + client = RpcAgentClient(self._host, self._port) + checked_urls.append(client.download_file(path=url)) + else: + checked_urls.append(url) + msg.url = checked_urls[0] if isinstance(url, str) else checked_urls + def __update_task_id(self) -> None: if self._stub is not None: try: diff --git a/src/agentscope/rpc/rpc_agent.proto b/src/agentscope/rpc/rpc_agent.proto index fe27e0d1e..95893e03f 100644 --- a/src/agentscope/rpc/rpc_agent.proto +++ b/src/agentscope/rpc/rpc_agent.proto @@ -1,13 +1,81 @@ syntax = "proto3"; +import "google/protobuf/empty.proto"; + // Servicer for rpc agent server service RpcAgent { - rpc call_func (RpcMsg) returns (RpcMsg) {} + // check server is alive + rpc is_alive (google.protobuf.Empty) returns (GeneralResponse) {} + + // stop the server + rpc stop (google.protobuf.Empty) returns (GeneralResponse) {} + + // create a new agent on the server + rpc create_agent (CreateAgentRequest) returns (GeneralResponse) {} + + // delete agent from the server + rpc delete_agent (StringMsg) returns (GeneralResponse) {} + + // clear all agent on the server + rpc delete_all_agents (google.protobuf.Empty) returns (GeneralResponse) {} + + // clone an agent with specific agent_id + rpc clone_agent (StringMsg) returns (GeneralResponse) {} + + // get id of all agents on the server as a list + rpc get_agent_list (google.protobuf.Empty) returns (GeneralResponse) {} + + // get the resource utilization information of the server + rpc get_server_info(google.protobuf.Empty) returns (GeneralResponse) {} + + // update the model configs in the server + rpc set_model_configs(StringMsg) returns (GeneralResponse) {} + + // get memory of a specific agent + rpc get_agent_memory (StringMsg) returns (GeneralResponse) {} + + // call funcs of agent running on the server + rpc call_agent_func(RpcMsg) returns (GeneralResponse) {} + + // update value of PlaceholderMessage + rpc update_placeholder(UpdatePlaceholderRequest) returns (GeneralResponse) {} + + // file transfer + rpc download_file(StringMsg) returns (stream ByteMsg) {} } -// Message class for rpc communication +// Message classes for agent server management +message GeneralResponse { + bool ok = 1; + string message = 2; +} + +message CreateAgentRequest { + string agent_id = 1; + bytes agent_init_args = 2; + bytes agent_source_code = 3; +} + +message AgentStatus { + string agent_id = 1; + string status = 2; +} + +message UpdatePlaceholderRequest { + int64 task_id = 1; +} + +message StringMsg { + string value = 1; +} + +message ByteMsg { + bytes data = 1; +} + +// Message class for agent function call message RpcMsg { string value = 1; string target_func = 2; string agent_id = 3; -} \ No newline at end of file +} diff --git a/src/agentscope/rpc/rpc_agent_client.py b/src/agentscope/rpc/rpc_agent_client.py index 189e0895f..5421d4ace 100644 --- a/src/agentscope/rpc/rpc_agent_client.py +++ b/src/agentscope/rpc/rpc_agent_client.py @@ -2,95 +2,342 @@ """ Client of rpc agent server """ import threading -import base64 -from typing import Optional +import json +import os +from typing import Optional, Sequence, Union from loguru import logger try: import dill import grpc from grpc import RpcError - from agentscope.rpc.rpc_agent_pb2 import RpcMsg # pylint: disable=E0611 + from google.protobuf.empty_pb2 import Empty from agentscope.rpc.rpc_agent_pb2_grpc import RpcAgentStub + import agentscope.rpc.rpc_agent_pb2 as agent_pb2 except ImportError as import_error: from agentscope.utils.tools import ImportErrorReporter dill = ImportErrorReporter(import_error, "distribute") grpc = ImportErrorReporter(import_error, "distribute") - RpcMsg = ImportErrorReporter(import_error, "distribute") + agent_pb2 = ImportErrorReporter(import_error, "distribute") RpcAgentStub = ImportErrorReporter(import_error, "distribute") RpcError = ImportError +from agentscope.file_manager import file_manager +from agentscope.utils.tools import generate_id_from_seed +from agentscope.exception import AgentServerNotAliveError +from agentscope.constants import _DEFAULT_RPC_OPTIONS +from agentscope.exception import AgentCallError + class RpcAgentClient: """A client of Rpc agent server""" - def __init__(self, host: str, port: int, agent_id: str = "") -> None: + def __init__( + self, + host: str, + port: int, + agent_id: str = None, + ) -> None: """Init a rpc agent client Args: - host (`str`): the hostname of the rpc agent server which the + host (`str`): The hostname of the rpc agent server which the client is connected. - port (`int`): the port of the rpc agent server which the client + port (`int`): The port of the rpc agent server which the client is connected. - agent_id (`str`): the agent id of the agent being called. + agent_id (`str`): The agent id of the agent being called. + Defaults to None. """ self.host = host self.port = port self.agent_id = agent_id - def call_func( + def call_agent_func( self, func_name: str, value: Optional[str] = None, timeout: int = 300, ) -> str: - """Call the specific function of rpc server. + """Call the specific function of an agent running on the server. Args: - func_name (`str`): the name of the function being called. - x (`str`, optional): the seralized input value. Defaults to None. + func_name (`str`): The name of the function being called. + value (`str`, optional): The serialized function input value. + Defaults to None. + timeout (`int`, optional): The timeout for the RPC call in seconds. + Defaults to 300. Returns: str: serialized return data. """ - with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: - stub = RpcAgentStub(channel) - result_msg = stub.call_func( - RpcMsg( - value=value, - target_func=func_name, - agent_id=self.agent_id, - ), - timeout=timeout, - ) - return result_msg.value + try: + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + result_msg = stub.call_agent_func( + agent_pb2.RpcMsg( + value=value, + target_func=func_name, + agent_id=self.agent_id, + ), + timeout=timeout, + ) + return result_msg.message + except Exception as e: + # check the server and raise a more reasonable error + if not self.is_alive(): + raise AgentServerNotAliveError( + host=self.host, + port=self.port, + message=str(e), + ) from e + raise e + + def is_alive(self) -> bool: + """Check if the agent server is alive. + + Returns: + bool: Indecate whether the server is alive. + """ - def create_agent(self, agent_configs: dict) -> None: - """Create a new agent for this client.""" try: - if self.agent_id is None or len(self.agent_id) == 0: - return - self.call_func( - "_create_agent", - base64.b64encode(dill.dumps(agent_configs)).decode("utf-8"), + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + status = stub.is_alive(Empty(), timeout=5) + if not status.ok: + raise AgentServerNotAliveError( + host=self.host, + port=self.port, + ) + return status.ok + except Exception: + logger.info( + f"Agent server [{self.host}:{self.port}] not alive.", ) + return False + + def stop(self) -> None: + """Stop the agent server.""" + try: + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + logger.info( + f"Stopping agent server at [{self.host}:{self.port}].", + ) + resp = stub.stop(Empty(), timeout=5) + if resp.ok: + logger.info( + f"Agent server at [{self.host}:{self.port}] stopped.", + ) + else: + logger.error( + f"Fail to stop the agent server: {resp.message}", + ) except Exception as e: logger.error( - f"Fail to create agent with id [{self.agent_id}]: {e}", + f"Fail to stop the agent server: {e}", ) - def delete_agent(self) -> None: + def create_agent( + self, + agent_configs: dict, + agent_id: str = None, + ) -> bool: + """Create a new agent for this client. + + Args: + agent_configs (`dict`): Init configs of the agent, generated by + `_AgentMeta`. + agent_id (`str`): agent_id of the created agent. + + Returns: + bool: Indecate whether the creation is successful """ - Delete the agent created by this client. + try: + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + status = stub.create_agent( + agent_pb2.CreateAgentRequest( + agent_id=( + self.agent_id if agent_id is None else agent_id + ), + agent_init_args=dill.dumps(agent_configs), + ), + ) + if not status.ok: + logger.error( + f"Error when creating agent: {status.message}", + ) + return status.ok + except Exception as e: + # check the server and raise a more reasonable error + if not self.is_alive(): + raise AgentServerNotAliveError( + host=self.host, + port=self.port, + message=str(e), + ) from e + raise e + + def delete_agent( + self, + agent_id: str = None, + ) -> bool: + """ + Delete agents with the specific agent_id. + + Args: + agent_id (`str`): id of the agent to be deleted. + + Returns: + bool: Indecate whether the deletion is successful """ + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + status = stub.delete_agent( + agent_pb2.StringMsg(value=agent_id), + ) + if not status.ok: + logger.error(f"Error when deleting agent: {status.message}") + return status.ok + + def delete_all_agent(self) -> bool: + """Delete all agents on the server.""" + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + status = stub.delete_all_agents(Empty()) + if not status.ok: + logger.error(f"Error when delete all agents: {status.message}") + return status.ok + + def clone_agent(self, agent_id: str) -> Optional[str]: + """Clone a new agent instance from the origin instance. + + Args: + agent_id (`str`): The agent_id of the agent to be cloned. + + Returns: + str: The `agent_id` of the generated agent. + """ + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + resp = stub.clone_agent( + agent_pb2.StringMsg(value=agent_id), + ) + if not resp.ok: + logger.error( + f"Error when clone agent [{agent_id}]: {resp.message}", + ) + return None + else: + return resp.message + + def update_placeholder(self, task_id: int) -> str: + """Update the placeholder value. + + Args: + task_id (`int`): `task_id` of the PlaceholderMessage. + + Returns: + bool: Whether the update is successful. + str: Serialized message value. + """ + with grpc.insecure_channel( + f"{self.host}:{self.port}", + options=_DEFAULT_RPC_OPTIONS, + ) as channel: + stub = RpcAgentStub(channel) + resp = stub.update_placeholder( + agent_pb2.UpdatePlaceholderRequest(task_id=task_id), + ) + if not resp.ok: + raise AgentCallError( + host=self.host, + port=self.port, + message=f"Failed to update placeholder: {resp.message}", + ) + return resp.message + + def get_agent_list(self) -> Sequence[dict]: + """ + Get the summary of all agents on the server as a list. + + Returns: + Sequence[str]: list of agent summary information. + """ + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + resp = stub.get_agent_list(Empty()) + if not resp.ok: + logger.error(f"Error when get agent list: {resp.message}") + return [] + return [ + json.loads(agent_str) for agent_str in json.loads(resp.message) + ] + + def get_server_info(self) -> dict: + """Get the agent server resource usage information.""" try: - if self.agent_id is not None and len(self.agent_id) > 0: - self.call_func("_delete_agent", timeout=5) - except Exception: - logger.warning( - f"Fail to delete agent with id [{self.agent_id}]", + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + resp = stub.get_server_info(Empty()) + if not resp.ok: + logger.error(f"Error in get_server_info: {resp.message}") + return {} + return json.loads(resp.message) + except Exception as e: + logger.error(f"Error in get_server_info: {e}") + return {} + + def set_model_configs( + self, + model_configs: Union[dict, list[dict]], + ) -> bool: + """Set the model configs of the server.""" + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + resp = stub.set_model_configs( + agent_pb2.StringMsg(value=json.dumps(model_configs)), ) + if not resp.ok: + logger.error(f"Error in set_model_configs: {resp.message}") + return False + return True + + def get_agent_memory(self, agent_id: str) -> Union[list, dict]: + """Get the memory usage of the specific agent.""" + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + resp = stub.get_agent_memory( + agent_pb2.StringMsg(value=agent_id), + ) + if not resp.ok: + logger.error(f"Error in get_agent_memory: {resp.message}") + return json.loads(resp.message) + + def download_file(self, path: str) -> str: + """Download a file from a remote server to the local machine. + + Args: + path (`str`): The path of the file to be downloaded. Note that + it is the path on the remote server. + + Returns: + `str`: The path of the downloaded file. Note that it is the path + on the local machine. + """ + local_file_name = ( + f"{generate_id_from_seed(path, 5)}_{os.path.basename(path)}" + ) + local_path = os.path.join(file_manager.dir_file, local_file_name) + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + with open(local_path, "wb") as f: + for resp in stub.download_file( + agent_pb2.StringMsg(value=path), + ): + f.write(resp.data) + return local_path class ResponseStub: @@ -122,18 +369,18 @@ def call_in_thread( """Call rpc function in a sub-thread. Args: - client (`RpcAgentClient`): the rpc client. - x (`str`): the value of the reqeust. - func_name (`str`): the name of the function being called. + client (`RpcAgentClient`): The rpc client. + x (`str`): The value of the reqeust. + func_name (`str`): The name of the function being called. Returns: - `ResponseStub`: a stub to get the response. + `ResponseStub`: A stub to get the response. """ stub = ResponseStub() def wrapper() -> None: try: - resp = client.call_func( + resp = client.call_agent_func( func_name=func_name, value=value, ) diff --git a/src/agentscope/rpc/rpc_agent_pb2.py b/src/agentscope/rpc/rpc_agent_pb2.py index 3480a543c..a3fe9ae9e 100644 --- a/src/agentscope/rpc/rpc_agent_pb2.py +++ b/src/agentscope/rpc/rpc_agent_pb2.py @@ -13,8 +13,11 @@ _sym_db = _symbol_database.Default() +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 + + DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x0frpc_agent.proto">\n\x06RpcMsg\x12\r\n\x05value\x18\x01 \x01(\t\x12\x13\n\x0btarget_func\x18\x02 \x01(\t\x12\x10\n\x08\x61gent_id\x18\x03 \x01(\t2+\n\x08RpcAgent\x12\x1f\n\tcall_func\x12\x07.RpcMsg\x1a\x07.RpcMsg"\x00\x62\x06proto3', + b'\n\x0frpc_agent.proto\x1a\x1bgoogle/protobuf/empty.proto".\n\x0fGeneralResponse\x12\n\n\x02ok\x18\x01 \x01(\x08\x12\x0f\n\x07message\x18\x02 \x01(\t"Z\n\x12\x43reateAgentRequest\x12\x10\n\x08\x61gent_id\x18\x01 \x01(\t\x12\x17\n\x0f\x61gent_init_args\x18\x02 \x01(\x0c\x12\x19\n\x11\x61gent_source_code\x18\x03 \x01(\x0c"/\n\x0b\x41gentStatus\x12\x10\n\x08\x61gent_id\x18\x01 \x01(\t\x12\x0e\n\x06status\x18\x02 \x01(\t"+\n\x18UpdatePlaceholderRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\x03"\x1a\n\tStringMsg\x12\r\n\x05value\x18\x01 \x01(\t"\x17\n\x07\x42yteMsg\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c">\n\x06RpcMsg\x12\r\n\x05value\x18\x01 \x01(\t\x12\x13\n\x0btarget_func\x18\x02 \x01(\t\x12\x10\n\x08\x61gent_id\x18\x03 \x01(\t2\xd5\x05\n\x08RpcAgent\x12\x36\n\x08is_alive\x12\x16.google.protobuf.Empty\x1a\x10.GeneralResponse"\x00\x12\x32\n\x04stop\x12\x16.google.protobuf.Empty\x1a\x10.GeneralResponse"\x00\x12\x37\n\x0c\x63reate_agent\x12\x13.CreateAgentRequest\x1a\x10.GeneralResponse"\x00\x12.\n\x0c\x64\x65lete_agent\x12\n.StringMsg\x1a\x10.GeneralResponse"\x00\x12?\n\x11\x64\x65lete_all_agents\x12\x16.google.protobuf.Empty\x1a\x10.GeneralResponse"\x00\x12-\n\x0b\x63lone_agent\x12\n.StringMsg\x1a\x10.GeneralResponse"\x00\x12<\n\x0eget_agent_list\x12\x16.google.protobuf.Empty\x1a\x10.GeneralResponse"\x00\x12=\n\x0fget_server_info\x12\x16.google.protobuf.Empty\x1a\x10.GeneralResponse"\x00\x12\x33\n\x11set_model_configs\x12\n.StringMsg\x1a\x10.GeneralResponse"\x00\x12\x32\n\x10get_agent_memory\x12\n.StringMsg\x1a\x10.GeneralResponse"\x00\x12.\n\x0f\x63\x61ll_agent_func\x12\x07.RpcMsg\x1a\x10.GeneralResponse"\x00\x12\x43\n\x12update_placeholder\x12\x19.UpdatePlaceholderRequest\x1a\x10.GeneralResponse"\x00\x12)\n\rdownload_file\x12\n.StringMsg\x1a\x08.ByteMsg"\x00\x30\x01\x62\x06proto3', ) _globals = globals() @@ -22,8 +25,20 @@ _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "rpc_agent_pb2", _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals["_RPCMSG"]._serialized_start = 19 - _globals["_RPCMSG"]._serialized_end = 81 - _globals["_RPCAGENT"]._serialized_start = 83 - _globals["_RPCAGENT"]._serialized_end = 126 + _globals["_GENERALRESPONSE"]._serialized_start = 48 + _globals["_GENERALRESPONSE"]._serialized_end = 94 + _globals["_CREATEAGENTREQUEST"]._serialized_start = 96 + _globals["_CREATEAGENTREQUEST"]._serialized_end = 186 + _globals["_AGENTSTATUS"]._serialized_start = 188 + _globals["_AGENTSTATUS"]._serialized_end = 235 + _globals["_UPDATEPLACEHOLDERREQUEST"]._serialized_start = 237 + _globals["_UPDATEPLACEHOLDERREQUEST"]._serialized_end = 280 + _globals["_STRINGMSG"]._serialized_start = 282 + _globals["_STRINGMSG"]._serialized_end = 308 + _globals["_BYTEMSG"]._serialized_start = 310 + _globals["_BYTEMSG"]._serialized_end = 333 + _globals["_RPCMSG"]._serialized_start = 335 + _globals["_RPCMSG"]._serialized_end = 397 + _globals["_RPCAGENT"]._serialized_start = 400 + _globals["_RPCAGENT"]._serialized_end = 1125 # @@protoc_insertion_point(module_scope) diff --git a/src/agentscope/rpc/rpc_agent_pb2_grpc.py b/src/agentscope/rpc/rpc_agent_pb2_grpc.py index 4099c7027..0234d55f2 100644 --- a/src/agentscope/rpc/rpc_agent_pb2_grpc.py +++ b/src/agentscope/rpc/rpc_agent_pb2_grpc.py @@ -3,11 +3,15 @@ """Client and server classes corresponding to protobuf-defined services.""" try: import grpc + from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 except ImportError as import_error: from agentscope.utils.tools import ImportErrorReporter grpc = ImportErrorReporter(import_error, "distribute") - + google_dot_protobuf_dot_empty__pb2 = ImportErrorReporter( + import_error, + "distribute", + ) import agentscope.rpc.rpc_agent_pb2 as rpc__agent__pb2 @@ -20,18 +24,150 @@ def __init__(self, channel): Args: channel: A grpc.Channel. """ - self.call_func = channel.unary_unary( - "/RpcAgent/call_func", + self.is_alive = channel.unary_unary( + "/RpcAgent/is_alive", + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.stop = channel.unary_unary( + "/RpcAgent/stop", + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.create_agent = channel.unary_unary( + "/RpcAgent/create_agent", + request_serializer=rpc__agent__pb2.CreateAgentRequest.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.delete_agent = channel.unary_unary( + "/RpcAgent/delete_agent", + request_serializer=rpc__agent__pb2.StringMsg.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.delete_all_agents = channel.unary_unary( + "/RpcAgent/delete_all_agents", + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.clone_agent = channel.unary_unary( + "/RpcAgent/clone_agent", + request_serializer=rpc__agent__pb2.StringMsg.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.get_agent_list = channel.unary_unary( + "/RpcAgent/get_agent_list", + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.get_server_info = channel.unary_unary( + "/RpcAgent/get_server_info", + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.set_model_configs = channel.unary_unary( + "/RpcAgent/set_model_configs", + request_serializer=rpc__agent__pb2.StringMsg.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.get_agent_memory = channel.unary_unary( + "/RpcAgent/get_agent_memory", + request_serializer=rpc__agent__pb2.StringMsg.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.call_agent_func = channel.unary_unary( + "/RpcAgent/call_agent_func", request_serializer=rpc__agent__pb2.RpcMsg.SerializeToString, - response_deserializer=rpc__agent__pb2.RpcMsg.FromString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.update_placeholder = channel.unary_unary( + "/RpcAgent/update_placeholder", + request_serializer=rpc__agent__pb2.UpdatePlaceholderRequest.SerializeToString, + response_deserializer=rpc__agent__pb2.GeneralResponse.FromString, + ) + self.download_file = channel.unary_stream( + "/RpcAgent/download_file", + request_serializer=rpc__agent__pb2.StringMsg.SerializeToString, + response_deserializer=rpc__agent__pb2.ByteMsg.FromString, ) class RpcAgentServicer(object): """Servicer for rpc agent server""" - def call_func(self, request, context): - """Missing associated documentation comment in .proto file.""" + def is_alive(self, request, context): + """check server is alive""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def stop(self, request, context): + """stop the server""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def create_agent(self, request, context): + """create a new agent on the server""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def delete_agent(self, request, context): + """delete agent from the server""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def delete_all_agents(self, request, context): + """clear all agent on the server""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def clone_agent(self, request, context): + """clone an agent with specific agent_id""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def get_agent_list(self, request, context): + """get id of all agents on the server as a list""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def get_server_info(self, request, context): + """get the resource utilization information of the server""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def set_model_configs(self, request, context): + """update the model configs in the server""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def get_agent_memory(self, request, context): + """get memory of a specific agent""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def call_agent_func(self, request, context): + """call funcs of agent running on the server""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def update_placeholder(self, request, context): + """update value of PlaceholderMessage""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def download_file(self, request, context): + """file transfer""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") @@ -39,10 +175,70 @@ def call_func(self, request, context): def add_RpcAgentServicer_to_server(servicer, server): rpc_method_handlers = { - "call_func": grpc.unary_unary_rpc_method_handler( - servicer.call_func, + "is_alive": grpc.unary_unary_rpc_method_handler( + servicer.is_alive, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "stop": grpc.unary_unary_rpc_method_handler( + servicer.stop, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "create_agent": grpc.unary_unary_rpc_method_handler( + servicer.create_agent, + request_deserializer=rpc__agent__pb2.CreateAgentRequest.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "delete_agent": grpc.unary_unary_rpc_method_handler( + servicer.delete_agent, + request_deserializer=rpc__agent__pb2.StringMsg.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "delete_all_agents": grpc.unary_unary_rpc_method_handler( + servicer.delete_all_agents, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "clone_agent": grpc.unary_unary_rpc_method_handler( + servicer.clone_agent, + request_deserializer=rpc__agent__pb2.StringMsg.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "get_agent_list": grpc.unary_unary_rpc_method_handler( + servicer.get_agent_list, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "get_server_info": grpc.unary_unary_rpc_method_handler( + servicer.get_server_info, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "set_model_configs": grpc.unary_unary_rpc_method_handler( + servicer.set_model_configs, + request_deserializer=rpc__agent__pb2.StringMsg.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "get_agent_memory": grpc.unary_unary_rpc_method_handler( + servicer.get_agent_memory, + request_deserializer=rpc__agent__pb2.StringMsg.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "call_agent_func": grpc.unary_unary_rpc_method_handler( + servicer.call_agent_func, request_deserializer=rpc__agent__pb2.RpcMsg.FromString, - response_serializer=rpc__agent__pb2.RpcMsg.SerializeToString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "update_placeholder": grpc.unary_unary_rpc_method_handler( + servicer.update_placeholder, + request_deserializer=rpc__agent__pb2.UpdatePlaceholderRequest.FromString, + response_serializer=rpc__agent__pb2.GeneralResponse.SerializeToString, + ), + "download_file": grpc.unary_stream_rpc_method_handler( + servicer.download_file, + request_deserializer=rpc__agent__pb2.StringMsg.FromString, + response_serializer=rpc__agent__pb2.ByteMsg.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -57,7 +253,7 @@ class RpcAgent(object): """Servicer for rpc agent server""" @staticmethod - def call_func( + def is_alive( request, target, options=(), @@ -72,9 +268,357 @@ def call_func( return grpc.experimental.unary_unary( request, target, - "/RpcAgent/call_func", + "/RpcAgent/is_alive", + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def stop( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/stop", + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def create_agent( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/create_agent", + rpc__agent__pb2.CreateAgentRequest.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def delete_agent( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/delete_agent", + rpc__agent__pb2.StringMsg.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def delete_all_agents( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/delete_all_agents", + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def clone_agent( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/clone_agent", + rpc__agent__pb2.StringMsg.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def get_agent_list( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/get_agent_list", + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def get_server_info( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/get_server_info", + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def set_model_configs( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/set_model_configs", + rpc__agent__pb2.StringMsg.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def get_agent_memory( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/get_agent_memory", + rpc__agent__pb2.StringMsg.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def call_agent_func( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/call_agent_func", rpc__agent__pb2.RpcMsg.SerializeToString, - rpc__agent__pb2.RpcMsg.FromString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def update_placeholder( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/RpcAgent/update_placeholder", + rpc__agent__pb2.UpdatePlaceholderRequest.SerializeToString, + rpc__agent__pb2.GeneralResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def download_file( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_stream( + request, + target, + "/RpcAgent/download_file", + rpc__agent__pb2.StringMsg.SerializeToString, + rpc__agent__pb2.ByteMsg.FromString, options, channel_credentials, insecure, diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index 7a77e4971..9a34f5e34 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -28,6 +28,7 @@ from agentscope.server.servicer import AgentServerServicer from agentscope.agents.agent import AgentBase from agentscope.utils.tools import check_port, generate_id_from_seed +from agentscope.constants import _DEFAULT_RPC_OPTIONS def _setup_agent_server( @@ -40,9 +41,9 @@ def _setup_agent_server( pipe: int = None, local_mode: bool = True, max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, + max_timeout_seconds: int = 7200, studio_url: str = None, - custom_agents: list = None, + custom_agent_classes: list = None, ) -> None: """Setup agent server. @@ -63,16 +64,17 @@ def _setup_agent_server( process has been stopped. pipe (`int`, defaults to `None`): A pipe instance used to pass the actual port of the server. - local_mode (`bool`, defaults to `None`): + local_mode (`bool`, defaults to `True`): Only listen to local requests. max_pool_size (`int`, defaults to `8192`): Max number of agent replies that the server can accommodate. - max_timeout_seconds (`int`, defaults to `1800`): + max_timeout_seconds (`int`, defaults to `7200`): Timeout for agent replies. studio_url (`str`, defaults to `None`): URL of the AgentScope Studio. - custom_agents (`list`, defaults to `None`): - A list of custom agent classes that are not in `agentscope.agents`. + custom_agent_classes (`list`, defaults to `None`): + A list of customized agent classes that are not in + `agentscope.agents`. """ asyncio.run( _setup_agent_server_async( @@ -87,7 +89,7 @@ def _setup_agent_server( max_pool_size=max_pool_size, max_timeout_seconds=max_timeout_seconds, studio_url=studio_url, - custom_agents=custom_agents, + custom_agent_classes=custom_agent_classes, ), ) @@ -102,9 +104,9 @@ async def _setup_agent_server_async( pipe: int = None, local_mode: bool = True, max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, + max_timeout_seconds: int = 7200, studio_url: str = None, - custom_agents: list = None, + custom_agent_classes: list = None, ) -> None: """Setup agent server in an async way. @@ -120,31 +122,30 @@ async def _setup_agent_server_async( start_event (`EventClass`, defaults to `None`): An Event instance used to determine whether the child process has been started. - stop_event (`EventClass`, defaults to `None`): - The stop Event instance used to determine whether the child - process has been stopped. pipe (`int`, defaults to `None`): A pipe instance used to pass the actual port of the server. - local_mode (`bool`, defaults to `None`): + local_mode (`bool`, defaults to `True`): If `True`, only listen to requests from "localhost", otherwise, listen to requests from all hosts. max_pool_size (`int`, defaults to `8192`): The max number of agent reply messages that the server can accommodate. Note that the oldest message will be deleted after exceeding the pool size. - max_timeout_seconds (`int`, defaults to `1800`): + max_timeout_seconds (`int`, defaults to `7200`): Maximum time for reply messages to be cached in the server. Note that expired messages will be deleted. studio_url (`str`, defaults to `None`): URL of the AgentScope Studio. - custom_agents (`list`, defaults to `None`): - A list of custom agent classes that are not in `agentscope.agents`. + custom_agent_classes (`list`, defaults to `None`): + A list of customized agent classes that are not in + `agentscope.agents`. """ from agentscope._init import init_process if init_settings is not None: init_process(**init_settings) servicer = AgentServerServicer( + stop_event=stop_event, host=host, port=port, server_id=server_id, @@ -153,8 +154,8 @@ async def _setup_agent_server_async( max_timeout_seconds=max_timeout_seconds, ) # update agent registry - if custom_agents is not None: - for agent_class in custom_agents: + if custom_agent_classes is not None: + for agent_class in custom_agent_classes: AgentBase.register_agent_class(agent_class=agent_class) async def shutdown_signal_handler() -> None: @@ -162,6 +163,8 @@ async def shutdown_signal_handler() -> None: f"Received shutdown signal. Gracefully stopping the server at " f"[{host}:{port}].", ) + if stop_event is not None: + stop_event.set() await server.stop(grace=5) loop = asyncio.get_running_loop() @@ -178,6 +181,8 @@ async def shutdown_signal_handler() -> None: servicer.port = port server = grpc.aio.server( futures.ThreadPoolExecutor(max_workers=None), + # set max message size to 32 MB + options=_DEFAULT_RPC_OPTIONS, ) add_RpcAgentServicer_to_server(servicer, server) if local_mode: @@ -197,14 +202,12 @@ async def shutdown_signal_handler() -> None: if start_event is not None: pipe.send(port) start_event.set() - while not stop_event.is_set(): - await asyncio.sleep(1) - logger.info( - f"Stopping agent server at [{host}:{port}]", - ) - await server.stop(grace=10.0) - else: - await server.wait_for_termination() + while not stop_event.is_set(): + await asyncio.sleep(1) + logger.info( + f"Stopping agent server at [{host}:{port}]", + ) + await server.stop(grace=10.0) logger.info( f"agent server [{server_id}] at {host}:{port} stopped successfully", ) @@ -218,9 +221,9 @@ def __init__( host: str = "localhost", port: int = None, max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, + max_timeout_seconds: int = 7200, local_mode: bool = False, - custom_agents: list = None, + custom_agent_classes: list = None, server_id: str = None, studio_url: str = None, agent_class: Type[AgentBase] = None, @@ -238,14 +241,14 @@ def __init__( The max number of agent reply messages that the server can accommodate. Note that the oldest message will be deleted after exceeding the pool size. - max_timeout_seconds (`int`, defaults to `1800`): + max_timeout_seconds (`int`, defaults to `7200`): Maximum time for reply messages to be cached in the server. Note that expired messages will be deleted. local_mode (`bool`, defaults to `False`): If `True`, only listen to requests from "localhost", otherwise, listen to requests from all hosts. - custom_agents (`list`, defaults to `None`): - A list of custom agent classes that are not in + custom_agent_classes (`list`, defaults to `None`): + A list of customized agent classes that are not in `agentscope.agents`. server_id (`str`, defaults to `None`): The id of the agent server. If not specified, a random id @@ -265,9 +268,9 @@ def __init__( self.max_timeout_seconds = max_timeout_seconds self.local_mode = local_mode self.server = None - self.stop_event = None self.parent_con = None - self.custom_agents = custom_agents + self.custom_agent_classes = custom_agent_classes + self.stop_event = Event() self.server_id = ( RpcAgentServerLauncher.generate_server_id(self.host, self.port) if server_id is None @@ -298,11 +301,12 @@ def _launch_in_main(self) -> None: _setup_agent_server_async( host=self.host, port=self.port, + stop_event=self.stop_event, server_id=self.server_id, max_pool_size=self.max_pool_size, max_timeout_seconds=self.max_timeout_seconds, local_mode=self.local_mode, - custom_agents=self.custom_agents, + custom_agent_classes=self.custom_agent_classes, studio_url=self.studio_url, ), ) @@ -311,7 +315,6 @@ def _launch_in_sub(self) -> None: """Launch an agent server in sub-process.""" from agentscope._init import _INIT_SETTINGS - self.stop_event = Event() self.parent_con, child_con = Pipe() start_event = Event() server_process = Process( @@ -328,7 +331,7 @@ def _launch_in_sub(self) -> None: "max_timeout_seconds": self.max_timeout_seconds, "local_mode": self.local_mode, "studio_url": self.studio_url, - "custom_agents": self.custom_agents, + "custom_agent_classes": self.custom_agent_classes, }, ) server_process.start() @@ -424,7 +427,7 @@ def as_server() -> None: parser.add_argument( "--max-timeout-seconds", type=int, - default=1800, + default=7200, help=( "max time for agent reply messages to be cached" "in the server. Note that expired messages will be deleted." diff --git a/src/agentscope/server/servicer.py b/src/agentscope/server/servicer.py index 6d23e36ed..9908da6c6 100644 --- a/src/agentscope/server/servicer.py +++ b/src/agentscope/server/servicer.py @@ -1,44 +1,50 @@ # -*- coding: utf-8 -*- """ Server of distributed agent""" +import os import threading -import base64 -import json import traceback +import json from concurrent import futures +from multiprocessing.synchronize import Event as EventClass +from typing import Any from loguru import logger import requests try: import dill + import psutil import grpc from grpc import ServicerContext + from google.protobuf.empty_pb2 import Empty from expiringdict import ExpiringDict - from ..rpc.rpc_agent_pb2 import RpcMsg # pylint: disable=E0611 except ImportError as import_error: from agentscope.utils.tools import ImportErrorReporter dill = ImportErrorReporter(import_error, "distribute") + psutil = ImportErrorReporter(import_error, "distribute") grpc = ImportErrorReporter(import_error, "distribute") ServicerContext = ImportErrorReporter(import_error, "distribute") - ExpiringDict = ImportErrorReporter(import_error, "distribute") - RpcMsg = ImportErrorReporter( # type: ignore[misc] + Empty = ImportErrorReporter( # type: ignore[misc] import_error, "distribute", ) + ExpiringDict = ImportErrorReporter(import_error, "distribute") -from .._runtime import _runtime -from ..studio._client import _studio_client -from ..agents.agent import AgentBase -from ..exception import StudioRegisterError -from ..rpc.rpc_agent_pb2_grpc import RpcAgentServicer -from ..message import ( +import agentscope.rpc.rpc_agent_pb2 as agent_pb2 +from agentscope.agents.agent import AgentBase +from agentscope.models import read_model_configs +from agentscope.studio._client import _studio_client +from agentscope._runtime import _runtime +from agentscope.exception import StudioRegisterError +from agentscope.rpc.rpc_agent_pb2_grpc import RpcAgentServicer +from agentscope.message import ( Msg, PlaceholderMessage, deserialize, ) -def _register_to_studio( +def _register_server_to_studio( studio_url: str, server_id: str, host: str, @@ -56,11 +62,23 @@ def _register_to_studio( raise StudioRegisterError(f"Failed to register server: {resp.text}") +class _AgentError: + """Use this class to represent an error when calling agent funcs.""" + + def __init__(self, agent_id: str, err_msg: str) -> None: + self.agent_id = agent_id + self.err_msg = err_msg + + def __repr__(self) -> str: + return f"Agent[{self.agent_id}] error: {self.err_msg}" + + class AgentServerServicer(RpcAgentServicer): """A Servicer for RPC Agent Server (formerly RpcServerSideWrapper)""" def __init__( self, + stop_event: EventClass, host: str = "localhost", port: int = None, server_id: str = None, @@ -71,6 +89,7 @@ def __init__( """Init the AgentServerServicer. Args: + stop_event (`Event`): Event to stop the server. host (`str`, defaults to "localhost"): Hostname of the rpc agent server. port (`int`, defaults to `None`): @@ -92,7 +111,7 @@ def __init__( self.server_id = server_id self.studio_url = studio_url if studio_url is not None: - _register_to_studio( + _register_server_to_studio( studio_url=studio_url, server_id=server_id, host=host, @@ -109,6 +128,8 @@ def __init__( self.agent_id_lock = threading.Lock() self.task_id_counter = 0 self.agent_pool: dict[str, AgentBase] = {} + self.pid = os.getpid() + self.stop_event = stop_event def get_task_id(self) -> int: """Get the auto-increment task id. @@ -128,66 +149,168 @@ def agent_exists(self, agent_id: str) -> bool: """ return agent_id in self.agent_pool - def check_and_generate_agent( - self, - agent_id: str, - agent_configs: dict, - ) -> None: - """ - Check whether the agent exists, and create new agent instance - for new agent. + def get_agent(self, agent_id: str) -> AgentBase: + """Get the agent by agent id. Args: agent_id (`str`): the agent id. - agent_configs (`dict`): configuration used to initialize the agent, - with three fields (generated in `_AgentMeta`): - .. code-block:: python + Returns: + AgentBase: the agent. + """ + with self.agent_id_lock: + return self.agent_pool.get(agent_id, None) - { - "class_name": {name of the agent} - "args": {args in tuple type to init the agent} - "kwargs": {args in dict type to init the agent} - } + def is_alive( + self, + request: Empty, + context: ServicerContext, + ) -> agent_pb2.GeneralResponse: + """Check whether the server is alive.""" + return agent_pb2.GeneralResponse(ok=True) - """ + def stop( + self, + request: Empty, + context: ServicerContext, + ) -> agent_pb2.GeneralResponse: + """Stop the server.""" + self.stop_event.set() + return agent_pb2.GeneralResponse(ok=True) + + def create_agent( + self, + request: agent_pb2.CreateAgentRequest, + context: ServicerContext, + ) -> agent_pb2.GeneralResponse: + """Create a new agent on the server.""" + agent_id = request.agent_id with self.agent_id_lock: - if agent_id not in self.agent_pool: - agent_class_name = agent_configs["class_name"] - agent_instance = AgentBase.get_agent_class(agent_class_name)( + if agent_id in self.agent_pool: + return agent_pb2.GeneralResponse( + ok=False, + message=f"Agent with agent_id [{agent_id}] already exists", + ) + agent_configs = dill.loads(request.agent_init_args) + if len(request.agent_source_code) > 0: + cls = dill.loads(request.agent_source_code) + cls_name = cls.__name__ + logger.info( + f"Load class [{cls_name}] from uploaded source code.", + ) + else: + cls_name = agent_configs["class_name"] + try: + cls = AgentBase.get_agent_class(cls_name) + except ValueError as e: + err_msg = ( + f"Agent class [{cls_name}] not found: {str(e)}", + ) + logger.error(err_msg) + return agent_pb2.GeneralResponse(ok=False, message=err_msg) + try: + agent_instance = cls( *agent_configs["args"], **agent_configs["kwargs"], ) - agent_instance._agent_id = agent_id # pylint: disable=W0212 - self.agent_pool[agent_id] = agent_instance - logger.info(f"create agent instance [{agent_id}]") + except Exception as e: + err_msg = ( + f"Failed to create agent instance <{cls_name}>: {str(e)}", + ) + logger.error(err_msg) + return agent_pb2.GeneralResponse(ok=False, message=err_msg) + agent_instance._agent_id = agent_id # pylint: disable=W0212 + self.agent_pool[agent_id] = agent_instance + logger.info(f"create agent instance <{cls_name}>[{agent_id}]") + return agent_pb2.GeneralResponse(ok=True) + + def delete_agent( + self, + request: agent_pb2.StringMsg, + context: ServicerContext, + ) -> agent_pb2.GeneralResponse: + """Delete agents from the server. - def check_and_delete_agent(self, agent_id: str) -> None: + Args: + request (`StringMsg`): The `value` field is the agent_id of the + agents to be deleted. """ - Check whether the agent exists, and delete the agent instance - for the agent_id. + aid = request.value + with self.agent_id_lock: + if aid in self.agent_pool: + agent = self.agent_pool.pop(aid) + logger.info( + f"delete agent instance <{agent.__class__.__name__}>" + f"[{aid}]", + ) + return agent_pb2.GeneralResponse(ok=True) + else: + logger.warning( + f"try to delete a non-existent agent [{aid}].", + ) + return agent_pb2.GeneralResponse( + ok=False, + message=f"try to delete a non-existent agent [{aid}].", + ) + + def clone_agent( + self, + request: agent_pb2.StringMsg, + context: ServicerContext, + ) -> agent_pb2.GeneralResponse: + """Clone a new agent instance from the origin instance. Args: - agent_id (`str`): the agent id. + request (`StringMsg`): The `value` field is the agent_id of the + agent to be cloned. + + Returns: + `GeneralResponse`: The agent_id of generated agent. + Empty if clone failed. """ + agent_id = request.value with self.agent_id_lock: - if agent_id in self.agent_pool: - self.agent_pool.pop(agent_id) - logger.info(f"delete agent instance [{agent_id}]") + if agent_id not in self.agent_pool: + logger.error( + f"Try to clone a non-existent agent [{agent_id}].", + ) + return agent_pb2.GeneralResponse( + ok=False, + message=f"Try to clone a non-existent agent [{agent_id}].", + ) + ori_agent = self.agent_pool[agent_id] + new_agent = ori_agent.__class__( + *ori_agent._init_settings["args"], # pylint: disable=W0212 + **ori_agent._init_settings["kwargs"], # pylint: disable=W0212 + ) + with self.agent_id_lock: + self.agent_pool[new_agent.agent_id] = new_agent + return agent_pb2.GeneralResponse(ok=True, message=new_agent.agent_id) - def call_func( # pylint: disable=W0236 + def delete_all_agents( self, - request: RpcMsg, + request: Empty, context: ServicerContext, - ) -> RpcMsg: + ) -> agent_pb2.GeneralResponse: + with self.agent_id_lock: + self.agent_pool.clear() + logger.info( + "Deleting all agent instances on the server", + ) + return agent_pb2.GeneralResponse(ok=True) + + def call_agent_func( # pylint: disable=W0236 + self, + request: agent_pb2.RpcMsg, + context: ServicerContext, + ) -> agent_pb2.GeneralResponse: """Call the specific servicer function.""" + if not self.agent_exists(request.agent_id): + return context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + f"Agent [{request.agent_id}] not exists.", + ) if hasattr(self, request.target_func): - if request.target_func not in ["_create_agent", "_get"]: - if not self.agent_exists(request.agent_id): - return context.abort( - grpc.StatusCode.INVALID_ARGUMENT, - f"Agent [{request.agent_id}] not exists.", - ) return getattr(self, request.target_func)(request) else: # TODO: support other user defined method @@ -197,7 +320,120 @@ def call_func( # pylint: disable=W0236 f"Unsupported method {request.target_func}", ) - def _reply(self, request: RpcMsg) -> RpcMsg: + def update_placeholder( + self, + request: agent_pb2.UpdatePlaceholderRequest, + context: ServicerContext, + ) -> agent_pb2.GeneralResponse: + """Update the value of a placeholder.""" + task_id = request.task_id + while True: + result = self.result_pool.get(task_id) + if isinstance(result, threading.Condition): + with result: + result.wait(timeout=1) + else: + break + if isinstance(result, _AgentError): + return agent_pb2.GeneralResponse( + ok=False, + message=result.err_msg, + ) + else: + return agent_pb2.GeneralResponse( + ok=True, + message=result.serialize(), + ) + + def get_agent_list( + self, + request: Empty, + context: ServicerContext, + ) -> agent_pb2.GeneralResponse: + """Get id of all agents on the server as a list.""" + with self.agent_id_lock: + summaries = [] + for agent in self.agent_pool.values(): + summaries.append(str(agent)) + return agent_pb2.GeneralResponse( + ok=True, + message=json.dumps(summaries), + ) + + def get_server_info( + self, + request: Empty, + context: ServicerContext, + ) -> agent_pb2.GeneralResponse: + """Get the agent server resource usage information.""" + status = {} + status["pid"] = self.pid + status["id"] = self.server_id + process = psutil.Process(self.pid) + status["cpu"] = process.cpu_percent(interval=1) + status["mem"] = process.memory_info().rss / (1024**2) + return agent_pb2.GeneralResponse(ok=True, message=json.dumps(status)) + + def set_model_configs( + self, + request: agent_pb2.StringMsg, + context: ServicerContext, + ) -> agent_pb2.GeneralResponse: + """Set the model configs of the agent server.""" + model_configs = json.loads(request.value) + try: + read_model_configs(model_configs) + except Exception as e: + return agent_pb2.GeneralResponse( + ok=False, + message=str(e), + ) + return agent_pb2.GeneralResponse(ok=True) + + def get_agent_memory( + self, + request: agent_pb2.StringMsg, + context: ServicerContext, + ) -> agent_pb2.StringMsg: + """Get the memory of a specific agent.""" + agent_id = request.value + agent = self.get_agent(agent_id=agent_id) + if agent is None: + return agent_pb2.GeneralResponse( + ok=False, + message="Agent [{agent_id}] has not found", + ) + if agent.memory is None: + return agent_pb2.GeneralResponse( + ok=False, + message="Agent [{agent_id}] has no memory", + ) + return agent_pb2.GeneralResponse( + ok=True, + message=json.dumps(agent.memory.get_memory()), + ) + + def download_file( + self, + request: agent_pb2.StringMsg, + context: ServicerContext, + ) -> Any: + """Download file from local path.""" + filepath = request.value + if not os.path.exists(filepath): + context.abort( + grpc.StatusCode.NOT_FOUND, + f"File {filepath} not found", + ) + + with open(filepath, "rb") as f: + while True: + piece = f.read(1024 * 1024) # send 1MB each time + if not piece: + break + yield agent_pb2.ByteMsg(data=piece) + + def _reply(self, request: agent_pb2.RpcMsg) -> agent_pb2.GeneralResponse: """Call function of RpcAgentService Args: @@ -216,44 +452,21 @@ def _reply(self, request: RpcMsg) -> RpcMsg: task_id = self.get_task_id() self.result_pool[task_id] = threading.Condition() self.executor.submit( - self.process_messages, + self._process_messages, task_id, request.agent_id, msg, # type: ignore[arg-type] ) - return RpcMsg( - value=Msg( # type: ignore[arg-type] - name=self.agent_pool[request.agent_id].name, + return agent_pb2.GeneralResponse( + ok=True, + message=Msg( # type: ignore[arg-type] + name=self.get_agent(request.agent_id).name, content=None, task_id=task_id, ).serialize(), ) - def _get(self, request: RpcMsg) -> RpcMsg: - """Get a reply message with specific task_id. - - Args: - request (`RpcMsg`): - The task id that generated this message, with json format:: - - { - 'task_id': int - } - - Returns: - `RpcMsg`: Concrete values of the specific message (or part of it). - """ - msg = json.loads(request.value) - while True: - result = self.result_pool.get(msg["task_id"]) - if isinstance(result, threading.Condition): - with result: - result.wait(timeout=1) - else: - break - return RpcMsg(value=result.serialize()) - - def _observe(self, request: RpcMsg) -> RpcMsg: + def _observe(self, request: agent_pb2.RpcMsg) -> agent_pb2.GeneralResponse: """Observe function of the original agent. Args: @@ -268,58 +481,9 @@ def _observe(self, request: RpcMsg) -> RpcMsg: if isinstance(msg, PlaceholderMessage): msg.update_value() self.agent_pool[request.agent_id].observe(msgs) - return RpcMsg() - - def _create_agent(self, request: RpcMsg) -> RpcMsg: - """Create a new agent instance with the given agent_id. - - Args: - request (RpcMsg): request message with a `agent_id` field. - """ - self.check_and_generate_agent( - request.agent_id, - agent_configs=( - dill.loads(base64.b64decode(request.value)) - if request.value - else None - ), - ) - return RpcMsg() - - def _clone_agent(self, request: RpcMsg) -> RpcMsg: - """Clone a new agent instance from the origin instance. - - Args: - request (RpcMsg): The `agent_id` field is the agent_id of the - agent to be cloned. + return agent_pb2.GeneralResponse(ok=True) - Returns: - `RpcMsg`: The `value` field contains the agent_id of generated - agent. - """ - agent_id = request.agent_id - with self.agent_id_lock: - if agent_id not in self.agent_pool: - raise ValueError(f"Agent [{agent_id}] not exists") - ori_agent = self.agent_pool[agent_id] - new_agent = ori_agent.__class__( - *ori_agent._init_settings["args"], # pylint: disable=W0212 - **ori_agent._init_settings["kwargs"], # pylint: disable=W0212 - ) - with self.agent_id_lock: - self.agent_pool[new_agent.agent_id] = new_agent - return RpcMsg(value=new_agent.agent_id) # type: ignore[arg-type] - - def _delete_agent(self, request: RpcMsg) -> RpcMsg: - """Delete the agent instance of the specific agent_id. - - Args: - request (RpcMsg): request message with a `agent_id` field. - """ - self.check_and_delete_agent(request.agent_id) - return RpcMsg() - - def process_messages( + def _process_messages( self, task_id: int, agent_id: str, @@ -335,17 +499,13 @@ def process_messages( if isinstance(task_msg, PlaceholderMessage): task_msg.update_value() cond = self.result_pool[task_id] + agent = self.get_agent(agent_id) try: - result = self.agent_pool[agent_id].reply(task_msg) + result = agent.reply(task_msg) self.result_pool[task_id] = result except Exception: error_msg = traceback.format_exc() logger.error(f"Error in agent [{agent_id}]:\n{error_msg}") - self.result_pool[task_id] = Msg( - name="ERROR", - role="assistant", - __status="ERROR", - content=f"Error in agent [{agent_id}]:\n{error_msg}", - ) + self.result_pool[task_id] = _AgentError(agent_id, error_msg) with cond: cond.notify_all() diff --git a/src/agentscope/studio/_app.py b/src/agentscope/studio/_app.py index 60f3de613..35b8538a6 100644 --- a/src/agentscope/studio/_app.py +++ b/src/agentscope/studio/_app.py @@ -27,6 +27,7 @@ from agentscope._runtime import _runtime from agentscope.constants import _DEFAULT_SUBDIR_CODE, _DEFAULT_SUBDIR_INVOKE from agentscope.utils.tools import _is_process_alive, _is_windows +from agentscope.rpc.rpc_agent_client import RpcAgentClient _app = Flask(__name__) @@ -267,6 +268,99 @@ def _register_server() -> Response: return jsonify(status="ok") +@_app.route("/api/servers/all", methods=["GET"]) +def _get_all_servers() -> Response: + """Get all servers.""" + servers = _ServerTable.query.all() + + return jsonify( + [ + { + "id": server.id, + "host": server.host, + "port": server.port, + "create_time": server.create_time.strftime( + "%Y-%m-%d %H:%M:%S", + ), + } + for server in servers + ], + ) + + +@_app.route("/api/servers/status/", methods=["GET"]) +def _get_server_status(server_id: str) -> Response: + server = _ServerTable.query.filter_by(id=server_id).first() + status = RpcAgentClient( + host=server.host, + port=server.port, + ).get_server_info() + if not status or status["id"] != server_id: + return jsonify({"status": "dead"}) + else: + return jsonify( + { + "status": "running", + "cpu": status["cpu"], + "mem": status["mem"], + }, + ) + + +@_app.route("/api/servers/delete", methods=["POST"]) +def _delete_server() -> Response: + server_id = request.json.get("server_id") + stop_server = request.json.get("stop", False) + server = _ServerTable.query.filter_by(id=server_id).first() + if stop_server: + RpcAgentClient(host=server.host, port=server.port).stop() + _ServerTable.query.filter_by(id=server_id).delete() + _db.session.commit() + return jsonify({"status": "ok"}) + + +@_app.route("/api/servers/agent_info/", methods=["GET"]) +def _get_server_agent_info(server_id: str) -> Response: + _app.logger.info(f"Get info of server [{server_id}]") + server = _ServerTable.query.filter_by(id=server_id).first() + agents = RpcAgentClient( + host=server.host, + port=server.port, + ).get_agent_list() + return jsonify(agents) + + +@_app.route("/api/servers/agents/delete", methods=["POST"]) +def _delete_agent() -> Response: + server_id = request.json.get("server_id") + agent_id = request.json.get("agent_id", None) + server = _ServerTable.query.filter_by(id=server_id).first() + # delete all agents if agent_id is None + if agent_id is not None: + ok = RpcAgentClient(host=server.host, port=server.port).delete_agent( + agent_id, + ) + else: + ok = RpcAgentClient( + host=server.host, + port=server.port, + ).delete_all_agent() + return jsonify(status="ok" if ok else "fail") + + +@_app.route("/api/servers/agents/memory", methods=["POST"]) +def _agent_memory() -> Response: + server_id = request.json.get("server_id") + agent_id = request.json.get("agent_id") + server = _ServerTable.query.filter_by(id=server_id).first() + mem = RpcAgentClient(host=server.host, port=server.port).get_agent_memory( + agent_id, + ) + if isinstance(mem, dict): + mem = [mem] + return jsonify(mem) + + @_app.route("/api/messages/push", methods=["POST"]) def _push_message() -> Response: """Receive a message from the agentscope application, and display it on diff --git a/src/agentscope/studio/static/css/server.css b/src/agentscope/studio/static/css/server.css new file mode 100644 index 000000000..285e151f2 --- /dev/null +++ b/src/agentscope/studio/static/css/server.css @@ -0,0 +1,132 @@ +:root { + --server-content-padding-vertical: 20px; + --server-content-padding-horizontal: 30px; + --server-table-height: calc(100% - var(--runs-content-control-panel-height)); +} + + +#server-panel { + display: flex; + height: 100%; + width: 100%; + flex-direction: column; +} + +#server-content { + display: flex; + flex-direction: column; + width: 100%; + padding: var(--server-content-padding-vertical) var(--server-content-padding-horizontal); + box-sizing: border-box; +} + +.collapsed { + display: none; +} + +#server-table { + flex-grow: 1; + overflow-y: auto; + max-height: 30vh; + width: 100%; + background-color: #ffffff; + border: 1px solid var(--border-color); +} + +#agent-memory-content { + display: flex; + flex-direction: row; + height: 200px; + width: 100%; + border: 1px solid var(--border-color); +} + +#agent-memory-table { + display: flex; + flex-direction: column; + width: 350px; + box-sizing: border-box; + border: 0; + background-color: #ffffff; +} + +#agent-memory-raw { + display: flex; + flex-direction: column; + width: calc(100% - 350px); + height: 100%; + box-sizing: border-box; + border-left: 1px solid var(--border-color); +} + +#agent-memory-raw .monaco-editor { + display: flex; + height: 100%; + width: 100%; + min-width: 100%; +} + +.server-section-title-bar { + display: flex; + justify-content: flex-start; + font-size: 20px; + padding: 6px 0px; +} + + +.align-right-btn { + margin-left: auto; +} + +#server-content .icon { + display: flex; + height: 20px; + padding: 4px 15px; + cursor: pointer; +} + +#agent-table { + flex-grow: 1; + overflow-y: auto; + max-height: 30vh; + width: 100%; + background-color: #ffffff; + border: 1px solid var(--border-color); +} + +.status-tag { + display: flex; + flex-direction: row; + align-items: center; + height: 26px; + width: fit-content; + border-radius: 13px; + color: var(--base-color); + box-sizing: border-box; + margin: 0 5px; + padding: 0 14px; +} + +.running.status-tag { + background-color: #ccf4dd; + color: #27ae60; + fill: #27ae60; +} + +.dead.status-tag { + background-color: #f5d5d1; + color: #c0392b; + fill: #c0392b; +} + +.loading.status-tag { + background-color: #e1e1e1; + color: #3d4047; + fill: #3d4047; +} + +.unknown.status-tag { + background-color: #e1e1e1; + color: #3d4047; + fill: #3d4047; +} \ No newline at end of file diff --git a/src/agentscope/studio/static/html/index-guide.html b/src/agentscope/studio/static/html/index-guide.html index 895af6ec2..8d5a2bf83 100644 --- a/src/agentscope/studio/static/html/index-guide.html +++ b/src/agentscope/studio/static/html/index-guide.html @@ -1,23 +1,29 @@

Welcome to AgentScope Studio

- AgentScope Studio is a web-based platform for monitoring, managing and developing multi-agent applications in AgentScope! Feel free to explore the platform and its features. + AgentScope Studio is a web-based platform for monitoring, managing and + developing multi-agent applications in AgentScope! Feel free to explore + the platform and its features.

Start to explore AgentScope Studio with

-
+
Dashboard
-
Monitor and manage your - AgentScope running instances. +
+ Monitor and manage your AgentScope running instances.
-
+
Workstation
-
Develop your multi-agent - applications by dragging. +
+ Develop your multi-agent applications by dragging.
@@ -26,9 +32,14 @@

Start to explore AgentScope Studio with

Gallery
Coming soon ...
-
-
Server Management
-
Coming soon ...
+
+
Server Manager
+
+ Manage your agent servers (In development). +
-
\ No newline at end of file +
diff --git a/src/agentscope/studio/static/html/server.html b/src/agentscope/studio/static/html/server.html index 533eab1f5..be0367a5d 100644 --- a/src/agentscope/studio/static/html/server.html +++ b/src/agentscope/studio/static/html/server.html @@ -1 +1,57 @@ -

Coming soon ...

\ No newline at end of file +
+
+ Server Manager +
+
+
+
+ Servers + + +
+
+
+ + +
+
diff --git a/src/agentscope/studio/static/js/index.js b/src/agentscope/studio/static/js/index.js index ac1355dd5..f4c1a64fb 100644 --- a/src/agentscope/studio/static/js/index.js +++ b/src/agentscope/studio/static/js/index.js @@ -30,6 +30,9 @@ function initializeTabPageByUrl(pageUrl) { script.src = "static/js/workstation_iframe.js"; document.head.appendChild(script); break; + case "static/html/server.html": + initializeServerPage(); + break; } } @@ -55,6 +58,9 @@ function loadTabPage(pageUrl, javascriptUrl) { activeExpanded = false; } + // Load the page content + document.getElementById("content").innerHTML = html; + // Load the javascript file if (javascriptUrl && !isScriptLoaded(javascriptUrl)) { let script = document.createElement("script"); @@ -70,9 +76,6 @@ function loadTabPage(pageUrl, javascriptUrl) { initializeTabPageByUrl(pageUrl); } - // Load the page content - document.getElementById("content").innerHTML = html; - // switch selected status of the tab buttons switch (pageUrl) { case "static/html/dashboard.html": diff --git a/src/agentscope/studio/static/js/server.js b/src/agentscope/studio/static/js/server.js new file mode 100644 index 000000000..65c599f62 --- /dev/null +++ b/src/agentscope/studio/static/js/server.js @@ -0,0 +1,483 @@ +var serversTable; +var agentsTable; +var agentMemoryTable; +var curServerId; +var curAgentId; +var messageEditor; + +// Sever table functions + +function deleteServer(row) { + fetch("/api/servers/delete", { + method: "POST", + headers: { + "Content-Type": "application/json; charset=utf-8", + }, + body: JSON.stringify({ + server_id: row.getData().id, + stop: row.getData().status == "running", + }), + }) + .then((response) => { + if (!response.ok) { + throw new Error("Failed to delete server"); + } + return response.json(); + }) + .then((data) => { + row.delete(); + }); +} + +function deleteServerBtn(e, cell) { + const serverId = cell.getData().id; + if (confirm(`Are you sure to stop server ${serverId} ?`)) { + deleteServer(cell.getRow()); + } +} + +function newServer() { + console.log("new Server"); +} + +function flushServerTable(data) { + if (serversTable) { + serversTable.setData(data); + console.log("Flush Server Table"); + } else { + console.error("Server Table is not initialized."); + } +} + +function deleteDeadServer() { + let deadServerIds = []; + if (serversTable) { + let rows = serversTable.getRows(); + for (let i = 0; i < rows.length; i++) { + let row = rows[i]; + if (row.getData().status == "dead") { + deadServerIds.push(row.getData().id); + deleteServer(row); + } + } + } else { + console.error("Server Table is not initialized."); + } + console.log("Delete dead servers: ", deadServerIds); + return deadServerIds; +} + +function deleteIcon(cell, formatterParams, onRendered) { + return ''; +} + +function getServerStatus(cell, formatterParams, onRendered) { + cell.getElement().innerHTML = + '
loading
'; + + const serverId = cell.getRow().getData().id; + + fetch(`/api/servers/status/${serverId}`) + .then((response) => { + if (!response.ok) { + throw new Error("Failed to get server status"); + } + return response.json(); + }) + .then((data) => { + let row = cell.getRow(); + if (data.status == "running") { + cell.getElement().innerHTML = + '
running
'; + row.update({ + status: data.status, + cpu: data.cpu, + mem: data.mem, + }); + } else { + cell.getElement().innerHTML = + '
dead
'; + row.update({ + status: "dead", + }); + } + }) + .catch((error) => { + console.error("Error fetching server status:", error); + cell.getElement().innerHTML = + '
unknown
'; + }); +} + +function cpuUsage(cell, formatterParams, onRendered) { + const cpu = cell.getData().cpu; + if (!cpu && cpu !== 0) { + return '
unknown
'; + } else { + return `
${cell.getData().cpu} %
`; + } +} + +function memoryUsage(cell, formatterParams, onRendered) { + if (cell.getData().mem) { + return `
${cell + .getData() + .mem.toFixed(2)} MB
`; + } else { + return '
unknown
'; + } +} + +function getServerTableData(callback) { + fetch("/api/servers/all") + .then((response) => { + if (!response.ok) { + throw new Error("Failed to fetch servers data"); + } + return response.json(); + }) + .then((data) => { + callback(data); + }); +} + +function initServerTable(data) { + serversTable = new Tabulator("#server-table", { + data: data, + columns: [ + { + title: "", + field: "status", + vertAlign: "middle", + visible: false, + }, + { + title: "ID", + field: "id", + vertAlign: "middle", + }, + { + title: "Host", + field: "host", + vertAlign: "middle", + }, + { + title: "Port", + field: "port", + vertAlign: "middle", + }, + { + title: "Created Time", + field: "create_time", + vertAlign: "middle", + }, + { + title: "Status", + vertAlign: "middle", + formatter: getServerStatus, + }, + { + title: "CPU Usage", + field: "cpu", + vertAlign: "middle", + formatter: cpuUsage, + }, + { + title: "Memory Usage", + field: "mem", + vertAlign: "middle", + formatter: memoryUsage, + }, + { + title: "Delete", + formatter: deleteIcon, + width: 75, + vertAlign: "middle", + cellClick: deleteServerBtn, + }, + ], + layout: "fitColumns", + }); + serversTable.on("rowClick", function (e, row) { + if (row.getData().status != "running") { + return; + } + if (e.target.classList.contains("cell-btn")) { + return; + } + loadAgentDetails(row.getData()); + }); +} + +// Agent table functions + +function getAgentTableData(serverId, callback) { + fetch(`/api/servers/agent_info/${serverId}`) + .then((response) => { + if (!response.ok) { + throw new Error("Failed to fetch agents data"); + } + return response.json(); + }) + .then((data) => { + callback(serverId, data); + }); +} + +function flushAgentTable(serverId, data) { + if (agentsTable) { + agentsTable.setData(data); + console.log("Flush Agent Table"); + } else { + console.error("Agent Table is not initialized."); + } +} + +function deleteAllAgent() { + let serverId = curServerId; + if (agentsTable) { + if (confirm(`Are you sure to delete all agent on ${serverId} ?`)) { + fetch(`/api/servers/agents/delete`, { + method: "POST", + headers: { + "Content-Type": "application/json; charset=utf-8", + }, + body: JSON.stringify({ + server_id: serverId, + }), + }) + .then((response) => { + return response.json(); + }) + .then((data) => { + agentsTable.clearData(); + }) + .catch((error) => { + console.error("Error when deleting all agent:", error); + }); + } + } else { + console.error("Agent Table is not initialized."); + } +} + +function initAgentTable(serverId, data) { + curServerId = serverId; + agentsTable = new Tabulator("#agent-table", { + data: data, + columns: [ + { + title: "ID", + field: "agent_id", + vertAlign: "middle", + }, + { + title: "Name", + field: "name", + vertAlign: "middle", + }, + { + title: "Class", + field: "type", + vertAlign: "middle", + }, + { + title: "System prompt", + field: "sys_prompt", + vertAlign: "middle", + }, + { + title: "Model", + field: "model", + vertAlign: "middle", + formatter: function (cell, formatterParams, onRendered) { + if (cell.getData().model == null) { + return `
None
`; + } + return `
[${ + cell.getData().model.model_type + }]: ${cell.getData().model.config_name}
`; + }, + }, + { + title: "Delete", + formatter: deleteIcon, + width: 75, + hozAlign: "center", + vertAlign: "middle", + cellClick: function (e, cell) { + if ( + confirm( + `Are you sure to delete agent ${ + cell.getData().id + } ?` + ) + ) { + fetch(`/api/servers/agents/delete`, { + method: "POST", + headers: { + "Content-Type": + "application/json; charset=utf-8", + }, + body: JSON.stringify({ + agent_id: cell.getData().agent_id, + server_id: serverId, + }), + }) + .then((response) => { + return response.json(); + }) + .then((data) => { + cell.getRow().delete(); + }) + .catch((error) => { + console.error( + "Error when deleting agent:", + error + ); + }); + } + }, + }, + ], + layout: "fitColumns", + }); + agentsTable.on("rowClick", function (e, row) { + if (e.target.classList.contains("cell-btn")) { + return; + } + loadAgentMemory(serverId, row.getData().agent_id, row.getData().name); + }); +} + +function loadAgentDetails(serverData) { + var serverDetail = document.getElementById("server-detail"); + var serverDetailTitle = serverDetail.querySelector(".server-section-title"); + serverDetailTitle.textContent = `Agents on (${serverData.host}:${serverData.port})[${serverData.id}]`; + serverDetail.classList.remove("collapsed"); + var agentMemory = document.getElementById("agent-memory"); + if (!agentMemory.classList.contains("collapsed")) { + agentMemory.classList.add("collapsed"); + } + getAgentTableData(serverData.id, initAgentTable); +} + +// agent memory functions + +function showMessage(message) { + if (messageEditor) { + messageEditor.setValue(JSON.stringify(message, null, 2)); + } else { + console.error("Message Editor is not initialized."); + } +} + +function loadAgentMemory(serverId, agentId, agentName) { + var agentMemory = document.getElementById("agent-memory"); + var agentMemoryTitle = agentMemory.querySelector(".server-section-title"); + agentMemoryTitle.textContent = `Memory of (${agentName})[${agentId}]`; + agentMemory.classList.remove("collapsed"); + getAgentMemoryData(serverId, agentId, initAgentMemoryTable); +} + +function getAgentMemoryData(serverId, agentId, callback) { + fetch(`/api/servers/agents/memory`, { + method: "POST", + headers: { + "Content-Type": "application/json; charset=utf-8", + }, + body: JSON.stringify({ + server_id: serverId, + agent_id: agentId, + }), + }) + .then((response) => { + if (!response.ok) { + throw new Error("Failed to fetch agent memory data"); + } + return response.json(); + }) + .then((data) => { + // Update the agent memory table with the fetched data + callback(agentId, data); + }); +} + +function initAgentMemoryTable(agentId, memoryData) { + agentMemoryTable = new Tabulator("#agent-memory-table", { + data: memoryData, + columns: [ + { + title: "Name", + field: "name", + vertAlign: "middle", + }, + { + title: "Role", + field: "role", + vertAlign: "middle", + }, + ], + layout: "fitColumns", + }); + agentMemoryTable.on("rowClick", function (e, row) { + showMessage(row.getData()); + }); + require.config({ + paths: { + vs: "https://cdn.jsdelivr.net/npm/monaco-editor@latest/min/vs", + }, + }); + require(["vs/editor/editor.main"], function () { + if (messageEditor) { + messageEditor.dispose(); + messageEditor = null; + } + messageEditor = monaco.editor.create( + document.getElementById("agent-memory-raw"), + { + language: "json", + theme: "vs-light", + minimap: { + enabled: false, + }, + scrollBeyondLastLine: false, + readOnly: true, + } + ); + }); +} + +function flushAgentMemoryTable(agentId, data) { + if (agentMemoryTable) { + agentMemoryTable.setData(data); + console.log("Flush Agent Memory Table"); + } else { + console.error("Agent Memory Table is not initialized."); + } +} + +// Initialize the server page with a table of servers +function initializeServerPage() { + // init servers + console.log("init server manager script"); + getServerTableData(initServerTable); + let serverflushBtn = document.getElementById("flush-server-btn"); + serverflushBtn.onclick = function () { + getServerTableData(flushServerTable); + }; + let deleteDeadServerBtn = document.getElementById("delete-dead-server-btn"); + deleteDeadServerBtn.onclick = deleteDeadServer; + let agentflushBtn = document.getElementById("flush-agent-btn"); + agentflushBtn.onclick = function () { + let serverId = curServerId; + getAgentTableData(serverId, flushAgentTable); + }; + let deleteAllAgentBtn = document.getElementById("delete-all-agent-btn"); + deleteAllAgentBtn.onclick = deleteAllAgent; + window.addEventListener("resize", () => { + if (messageEditor) { + messageEditor.layout(); + } + }); +} diff --git a/src/agentscope/studio/static/svg/circle-add.svg b/src/agentscope/studio/static/svg/circle-add.svg new file mode 100644 index 000000000..e635a2f77 --- /dev/null +++ b/src/agentscope/studio/static/svg/circle-add.svg @@ -0,0 +1,12 @@ + + + \ No newline at end of file diff --git a/src/agentscope/studio/static/svg/circle-delete.svg b/src/agentscope/studio/static/svg/circle-delete.svg new file mode 100644 index 000000000..afa63ca46 --- /dev/null +++ b/src/agentscope/studio/static/svg/circle-delete.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/src/agentscope/studio/static/svg/flush.svg b/src/agentscope/studio/static/svg/flush.svg new file mode 100644 index 000000000..0a41bd5e6 --- /dev/null +++ b/src/agentscope/studio/static/svg/flush.svg @@ -0,0 +1,13 @@ + + + \ No newline at end of file diff --git a/src/agentscope/studio/static/svg/trash-bin.svg b/src/agentscope/studio/static/svg/trash-bin.svg new file mode 100644 index 000000000..e205f45ac --- /dev/null +++ b/src/agentscope/studio/static/svg/trash-bin.svg @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/src/agentscope/studio/templates/index.html b/src/agentscope/studio/templates/index.html index 0d5cb74f7..3ba6334e1 100644 --- a/src/agentscope/studio/templates/index.html +++ b/src/agentscope/studio/templates/index.html @@ -13,6 +13,8 @@ href="{{ url_for('static', filename='css/index.css') }}"> + @@ -77,12 +79,12 @@
diff --git a/src/agentscope/utils/tools.py b/src/agentscope/utils/tools.py index fb958632a..e267e86bc 100644 --- a/src/agentscope/utils/tools.py +++ b/src/agentscope/utils/tools.py @@ -252,6 +252,21 @@ def generate_id_from_seed(seed: str, length: int = 8) -> str: return "".join(id_chars) +def is_web_accessible(url: str) -> bool: + """Whether the url is accessible from the Web. + + Args: + url (`str`): + The url to check. + + Note: + This function is not perfect, it only checks if the URL starts with + common web protocols, e.g., http, https, ftp, oss. + """ + parsed_url = urlparse(url) + return parsed_url.scheme in ["http", "https", "ftp", "oss"] + + def _is_json_serializable(obj: Any) -> bool: """Check if the given object is json serializable.""" try: diff --git a/tests/agent_test.py b/tests/agent_test.py index 4a120df1a..8353107fa 100644 --- a/tests/agent_test.py +++ b/tests/agent_test.py @@ -72,10 +72,8 @@ def test_agent_init(self) -> None: {"sys_prompt": "Hello", "attribute_2": "Bye"}, ) self.assertNotEqual(a1.agent_id, a2.agent_id) - self.assertTrue(a1.agent_id.startswith("TestAgent")) - self.assertTrue(a2.agent_id.startswith("TestAgent")) a3 = TestAgentCopy("c") - self.assertTrue(a3.agent_id.startswith("TestAgentCopy")) + self.assertNotEqual(a3.agent_id, a2.agent_id) a4 = TestAgent( "d", ) diff --git a/tests/data/image.png b/tests/data/image.png new file mode 100644 index 000000000..2a96d01c9 Binary files /dev/null and b/tests/data/image.png differ diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index 321087a44..c55254479 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -3,13 +3,13 @@ Unit tests for rpc agent classes """ import unittest -import time import os +import time import shutil from loguru import logger import agentscope -from agentscope.agents import AgentBase, DistConf +from agentscope.agents import AgentBase, DistConf, DialogAgent from agentscope.server import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.message import PlaceholderMessage @@ -17,6 +17,8 @@ from agentscope.msghub import msghub from agentscope.pipelines import sequentialpipeline from agentscope.utils import MonitorFactory, QuotaExceededError +from agentscope.rpc.rpc_agent_client import RpcAgentClient +from agentscope.exception import AgentCallError class DemoRpcAgent(AgentBase): @@ -152,6 +154,25 @@ def reply(self, x: dict = None) -> dict: raise RuntimeError("Demo Error") +class FileAgent(AgentBase): + """An agent returns a file""" + + def reply(self, x: dict = None) -> dict: + image_path = os.path.abspath( + os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "data", + "image.png", + ), + ) + return Msg( + name=self.name, + role="assistant", + content="Image", + url=image_path, + ) + + class BasicRpcAgentTest(unittest.TestCase): "Test cases for Rpc Agent" @@ -227,9 +248,11 @@ def test_connect_to_an_existing_rpc_server(self) -> None: host="127.0.0.1", port=12010, local_mode=False, - custom_agents=[DemoRpcAgent], + custom_agent_classes=[DemoRpcAgent], ) launcher.launch() + client = RpcAgentClient(host=launcher.host, port=launcher.port) + self.assertTrue(client.is_alive()) # pylint: disable=W0212 agent_a = DemoRpcAgent( name="a", ).to_dist( @@ -426,7 +449,7 @@ def test_multi_agent_in_same_server(self) -> None: host="127.0.0.1", port=12010, local_mode=False, - custom_agents=[DemoRpcAgentWithMemory], + custom_agent_classes=[DemoRpcAgentWithMemory], ) launcher.launch() # although agent1 and agent2 connect to the same server @@ -471,7 +494,7 @@ def test_multi_agent_in_same_server(self) -> None: res4 = agent2(msg4) self.assertEqual(res4.content["mem_size"], 3) # delete existing agent - agent2.client.delete_agent() + agent2.client.delete_agent(agent2.agent_id) msg2 = Msg(name="System", content="First Msg for agent2") res2 = agent2(msg2) self.assertRaises(ValueError, res2.__getattr__, "content") @@ -501,14 +524,6 @@ def test_clone_instances(self) -> None: self.assertEqual(len(agents), 2) agent1 = agents[0] agent2 = agents[1] - self.assertTrue(agent1.agent_id.startswith("DemoRpcAgentWithMemory")) - self.assertTrue(agent2.agent_id.startswith("DemoRpcAgentWithMemory")) - self.assertTrue( - agent1.client.agent_id.startswith("DemoRpcAgentWithMemory"), - ) - self.assertTrue( - agent2.client.agent_id.startswith("DemoRpcAgentWithMemory"), - ) self.assertNotEqual(agent1.agent_id, agent2.agent_id) self.assertEqual(agent1.agent_id, agent1.client.agent_id) self.assertEqual(agent2.agent_id, agent2.client.agent_id) @@ -545,7 +560,7 @@ def test_error_handling(self) -> None: """Test error handling""" agent = DemoErrorAgent(name="a").to_dist() x = agent() - self.assertRaises(RuntimeError, x.__getattr__, "content") + self.assertRaises(AgentCallError, x.__getattr__, "content") def test_agent_nesting(self) -> None: """Test agent nesting""" @@ -555,14 +570,14 @@ def test_agent_nesting(self) -> None: host=host, port=12010, local_mode=False, - custom_agents=[DemoGatherAgent, DemoGeneratorAgent], + custom_agent_classes=[DemoGatherAgent, DemoGeneratorAgent], ) launcher2 = RpcAgentServerLauncher( # choose port automatically host=host, port=12011, local_mode=False, - custom_agents=[DemoGatherAgent, DemoGeneratorAgent], + custom_agent_classes=[DemoGatherAgent, DemoGeneratorAgent], ) launcher1.launch() launcher2.launch() @@ -606,3 +621,112 @@ def test_agent_nesting(self) -> None: self.assertTrue(0.5 < r2.content["time"] < 2) launcher1.shutdown() launcher2.shutdown() + + def test_agent_server_management_funcs(self) -> None: + """Test agent server management functions""" + launcher = RpcAgentServerLauncher( + host="localhost", + port=12010, + local_mode=False, + custom_agent_classes=[DemoRpcAgentWithMemory, FileAgent], + ) + launcher.launch() + client = RpcAgentClient(host="localhost", port=launcher.port) + agent_lists = client.get_agent_list() + self.assertEqual(len(agent_lists), 0) + memory_agent = DemoRpcAgentWithMemory( + name="demo", + to_dist={ + "host": "localhost", + "port": launcher.port, + }, + ) + resp = memory_agent(Msg(name="test", content="first msg", role="user")) + resp.update_value() + memory = client.get_agent_memory(memory_agent.agent_id) + self.assertEqual(len(memory), 2) + self.assertEqual(memory[0]["content"], "first msg") + self.assertEqual(memory[1]["content"]["mem_size"], 1) + agent_lists = client.get_agent_list() + self.assertEqual(len(agent_lists), 1) + self.assertEqual(agent_lists[0]["agent_id"], memory_agent.agent_id) + agent_info = agent_lists[0] + logger.info(agent_info) + server_info = client.get_server_info() + logger.info(server_info) + self.assertTrue("pid" in server_info) + self.assertTrue("id" in server_info) + self.assertTrue("cpu" in server_info) + self.assertTrue("mem" in server_info) + # test download file + file_agent = FileAgent("File").to_dist( + host="localhost", + port=launcher.port, + ) + file = file_agent() + remote_file_path = os.path.abspath( + os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "data", + "image.png", + ), + ) + local_file_path = file.url + self.assertNotEqual(remote_file_path, local_file_path) + with open(remote_file_path, "rb") as rf: + remote_content = rf.read() + with open(local_file_path, "rb") as lf: + local_content = lf.read() + self.assertEqual(remote_content, local_content) + agent_lists = client.get_agent_list() + self.assertEqual(len(agent_lists), 2) + # model not exists error + self.assertRaises( + Exception, + DialogAgent, + name="dialogue", + sys_prompt="You are a helful assistant.", + model_config_name="my_openai", + to_dist={ + "host": "localhost", + "port": launcher.port, + }, + ) + # set model configs + client.set_model_configs( + [ + { + "model_type": "openai_chat", + "config_name": "my_openai", + "model_name": "gpt-3.5-turbo", + "api_key": "xxx", + "organization": "xxx", + "generate_args": { + "temperature": 0.5, + }, + }, + { + "model_type": "post_api_chat", + "config_name": "my_postapi", + "api_url": "https://xxx", + "headers": {}, + }, + ], + ) + # create agent after set model configs + dia_agent = DialogAgent( # pylint: disable=E1123 + name="dialogue", + sys_prompt="You are a helful assistant.", + model_config_name="my_openai", + to_dist={ + "host": "localhost", + "port": launcher.port, + }, + ) + self.assertIsNotNone(dia_agent) + self.assertTrue(client.delete_all_agent()) + self.assertEqual(len(client.get_agent_list()), 0) + # client.stop() + # time.sleep(1) + # self.assertFalse(client.is_alive()) + launcher.shutdown()