Skip to content

Commit

Permalink
added multiprocessing parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
masanorihirano committed Oct 23, 2024
1 parent d626bdb commit b7aaa3e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 15 deletions.
1 change: 1 addition & 0 deletions pams/runners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .agent_parallel import MultiProcessAgentParallelRuner
from .agent_parallel import MultiThreadAgentParallelRuner
from .base import Runner
from .sequential import SequentialRunner
23 changes: 20 additions & 3 deletions pams/runners/agent_parallel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import random
import warnings
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from io import TextIOWrapper
from multiprocessing import cpu_count
Expand All @@ -26,6 +27,10 @@ class MultiThreadAgentParallelRuner(SequentialRunner):
This means that the number of agents that can be parallelized is limited by MAX_NORMAL_ORDERS.
"""

_parallel_pool_provider: Union[
Type[ThreadPoolExecutor], Type[ProcessPoolExecutor]
] = ThreadPoolExecutor

def __init__(
self,
settings: Union[Dict, TextIOWrapper, os.PathLike, str],
Expand All @@ -35,7 +40,7 @@ def __init__(
):
super().__init__(settings, prng, logger, simulator_class)
warnings.warn(
"MultiThreadRuner is experimental. Future changes may occur disruptively."
f"{self.__class__.__name__} is experimental. Future changes may occur disruptively."
)
self.num_parallel = max(cpu_count() - 1, 1)

Expand All @@ -48,11 +53,11 @@ def _setup(self) -> None:
)
if self.num_parallel > max_notmal_orders:
warnings.warn(
f"When MultiThreadAgentParallelRuner is used, the maximum number of parallel agents"
f"When {self.__class__.__name__} is used, the maximum number of parallel agents"
f" is limited by max_normal_orders ({max_notmal_orders}) evne if numParallel"
f" ({self.num_parallel}) is set to a larger value."
)
self.thread_pool = ThreadPoolExecutor(max_workers=self.num_parallel)
self.thread_pool = self._parallel_pool_provider(max_workers=self.num_parallel)

def _collect_orders_from_normal_agents(
self, session: Session
Expand Down Expand Up @@ -88,3 +93,15 @@ def _collect_orders_from_normal_agents(
)
all_orders.append(orders)
return all_orders


class MultiProcessAgentParallelRuner(MultiThreadAgentParallelRuner):
"""Multi Process Agent Parallel runner class. This is experimental.
In this runner, only normal agents are parallelized in each steps.
This means that the number of agents that can be parallelized is limited by MAX_NORMAL_ORDERS.
"""

_parallel_pool_provider: Union[
Type[ThreadPoolExecutor], Type[ProcessPoolExecutor]
] = ProcessPoolExecutor
15 changes: 15 additions & 0 deletions tests/pams/runners/delay_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import time
from typing import List

from pams.agents.fcn_agent import FCNAgent
from pams.market import Market
from pams.order import Cancel
from pams.order import Order

wait_time = 0.2 # seconds


class FCNDelayAgent(FCNAgent):
def submit_orders(self, markets: List[Market]) -> List[Order | Cancel]:
time.sleep(wait_time) # Simulate a delay
return super().submit_orders(markets)
22 changes: 10 additions & 12 deletions tests/pams/runners/test_agent_parallel.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import copy
import time
from typing import Dict
from typing import List
from typing import Type
from typing import cast

from pams.agents.fcn_agent import FCNAgent
from pams.market import Market
from pams.order import Cancel
from pams.order import Order
from pams.runners import MultiProcessAgentParallelRuner
from pams.runners import MultiThreadAgentParallelRuner
from pams.runners import Runner
from pams.runners.sequential import SequentialRunner
from tests.pams.runners.test_base import TestRunner

from .delay_agent import FCNDelayAgent
from .delay_agent import wait_time


class TestMultiThreadAgentParallelRuner(TestRunner):
runner_class: Type[Runner] = MultiThreadAgentParallelRuner
Expand Down Expand Up @@ -59,14 +59,8 @@ class TestMultiThreadAgentParallelRuner(TestRunner):
}

def test_parallel_efficiency(self) -> None:
wait_time = 0.2 # seconds

class FCNDelayAgent(FCNAgent):
def submit_orders(self, markets: List[Market]) -> List[Order | Cancel]:
time.sleep(wait_time) # Simulate a delay
return super().submit_orders(markets)

setting = self.default_setting.copy()
setting = copy.deepcopy(self.default_setting)
setting["FCNAgents"]["class"] = "FCNDelayAgent" # Use the delayed agent

runner_class_dummy = self.runner_class
Expand Down Expand Up @@ -99,3 +93,7 @@ def submit_orders(self, markets: List[Market]) -> List[Order | Cancel]:
assert elps_time_sequential > wait_time * 15
assert elps_time_parallel < wait_time * 5 + 1
assert elps_time_parallel > wait_time * 5


class TestMultiProcessAgentParallelRuner(TestMultiThreadAgentParallelRuner):
runner_class: Type[Runner] = MultiProcessAgentParallelRuner

0 comments on commit b7aaa3e

Please sign in to comment.