diff --git a/docs/source/user_guide/config.rst b/docs/source/user_guide/config.rst index 6c0b4e6b..4b93e4bf 100644 --- a/docs/source/user_guide/config.rst +++ b/docs/source/user_guide/config.rst @@ -26,7 +26,8 @@ Json config ["MarketName1", "MarketName2", float], # fundamentalVolatility is required in both markets ... ] - } + }, + "numParallel": int (Optional; default 1; only for MultiThreadedRunner), }, "FundamentalPriceShock": { "class": "FundamentalPriceShock", diff --git a/pams/runners/__init__.py b/pams/runners/__init__.py index 71d7498f..27e13416 100644 --- a/pams/runners/__init__.py +++ b/pams/runners/__init__.py @@ -1,2 +1,3 @@ +from .agent_parallel import MultiThreadAgentParallelRuner from .base import Runner from .sequential import SequentialRunner diff --git a/pams/runners/agent_parallel.py b/pams/runners/agent_parallel.py new file mode 100644 index 00000000..1a0fb2f1 --- /dev/null +++ b/pams/runners/agent_parallel.py @@ -0,0 +1,90 @@ +import os +import random +import warnings +from concurrent.futures import ThreadPoolExecutor +from io import TextIOWrapper +from multiprocessing import cpu_count +from typing import Dict +from typing import List +from typing import Optional +from typing import Type +from typing import Union + +from pams.logs.base import Logger +from pams.simulator import Simulator + +from ..order import Cancel +from ..order import Order +from ..session import Session +from .sequential import SequentialRunner + + +class MultiThreadAgentParallelRuner(SequentialRunner): + """Multi Thread Agent Parallel runner class. This is experimental. + + In this runner, only normal agents are parallelized in each steps. + This means that the number of agents that can be parallelized is limited by MAX_NORMAL_ORDERS. + """ + + def __init__( + self, + settings: Union[Dict, TextIOWrapper, os.PathLike, str], + prng: Optional[random.Random] = None, + logger: Optional[Logger] = None, + simulator_class: Type[Simulator] = Simulator, + ): + super().__init__(settings, prng, logger, simulator_class) + warnings.warn( + "MultiThreadRuner is experimental. Future changes may occur disruptively." + ) + self.num_parallel = max(cpu_count() - 1, 1) + + def _setup(self) -> None: + super()._setup() + if "numParallel" in self.settings["simulation"]: + self.num_parallel = self.settings["simulation"]["numParallel"] + max_notmal_orders = max( + session.max_normal_orders for session in self.simulator.sessions + ) + if self.num_parallel > max_notmal_orders: + warnings.warn( + f"When MultiThreadAgentParallelRuner is used, the maximum number of parallel agents" + f" is limited by max_normal_orders ({max_notmal_orders}) evne if numParallel" + f" ({self.num_parallel}) is set to a larger value." + ) + self.thread_pool = ThreadPoolExecutor(max_workers=self.num_parallel) + + def _collect_orders_from_normal_agents( + self, session: Session + ) -> List[List[Union[Order, Cancel]]]: + """collect orders from normal_agents. (Internal method) + orders are corrected until the total number of orders reaches max_normal_orders + + Args: + session (Session): session. + + Returns: + List[List[Union[Order, Cancel]]]: orders lists. + """ + agents = self.simulator.normal_frequency_agents + agents = self._prng.sample(agents, len(agents)) + all_orders: List[List[Union[Order, Cancel]]] = [] + # TODO: currently the original impl is used for order counting. + # See more in the SequentialRunner class. + futures = [] + for agent in agents[: session.max_normal_orders]: + future = self.thread_pool.submit( + agent.submit_orders, self.simulator.markets + ) + futures.append((future, agent)) + for future, agent in futures: + orders = future.result() + if len(orders) > 0: + if not session.with_order_placement: + raise AssertionError("currently order is not accepted") + if sum([order.agent_id != agent.agent_id for order in orders]) > 0: + raise ValueError( + "spoofing order is not allowed. please check agent_id in order" + ) + all_orders.append(orders) + return all_orders diff --git a/tests/pams/runners/test_agent_parallel.py b/tests/pams/runners/test_agent_parallel.py new file mode 100644 index 00000000..b57fb268 --- /dev/null +++ b/tests/pams/runners/test_agent_parallel.py @@ -0,0 +1,98 @@ +import time +from typing import Dict, List, cast +from typing import Type + +from pams.agents.fcn_agent import FCNAgent +from pams.market import Market +from pams.order import Cancel, Order +from pams.runners import MultiThreadAgentParallelRuner +from pams.runners import Runner +from pams.runners.sequential import SequentialRunner +from tests.pams.runners.test_base import TestRunner + + +class TestMultiThreadAgentParallelRuner(TestRunner): + runner_class: Type[Runner] = MultiThreadAgentParallelRuner + default_setting: Dict = { + "simulation": { + "markets": ["Market"], + "agents": ["FCNAgents"], + "sessions": [ + { + "sessionName": 0, + "iterationSteps": 5, + "withOrderPlacement": True, + "withOrderExecution": True, + "withPrint": True, + "events": ["FundamentalPriceShock"], + "maxNormalOrders": 3, + } + ], + "numParallel": 3, + }, + "Market": {"class": "Market", "tickSize": 0.00001, "marketPrice": 300.0}, + "FCNAgents": { + "class": "FCNAgent", + "numAgents": 10, + "markets": ["Market"], + "assetVolume": 50, + "cashAmount": 10000, + "fundamentalWeight": {"expon": [1.0]}, + "chartWeight": {"expon": [0.0]}, + "noiseWeight": {"expon": [1.0]}, + "meanReversionTime": {"uniform": [50, 100]}, + "noiseScale": 0.001, + "timeWindowSize": [100, 200], + "orderMargin": [0.0, 0.1], + }, + "FundamentalPriceShock": { + "class": "FundamentalPriceShock", + "target": "Market", + "triggerTime": 0, + "priceChangeRate": -0.1, + "shockTimeLength": 1, + "enabled": True, + }, + } + + def test_parallel_efficiency(self) -> None: + wait_time = 0.2 # seconds + + class FCNDelayAgent(FCNAgent): + def submit_orders(self, markets: List[Market]) -> List[Order | Cancel]: + time.sleep(wait_time) # Simulate a delay + return super().submit_orders(markets) + + setting = self.default_setting.copy() + setting["FCNAgents"]["class"] = "FCNDelayAgent" # Use the delayed agent + + runner_class_dummy = self.runner_class + self.runner_class = SequentialRunner # Temporarily set to SequentialRunner + sequential_runner = cast( + SequentialRunner, + self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting + ), + ) + self.runner_class = runner_class_dummy + parallel_runner = cast( + self.runner_class, + self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting + ), + ) + + sequential_runner.class_register(cls=FCNDelayAgent) + parallel_runner.class_register(cls=FCNDelayAgent) + start_time = time.time() + sequential_runner.main() + end_time = time.time() + elps_time_sequential = end_time - start_time + start_time = time.time() + parallel_runner.main() + end_time = time.time() + elps_time_parallel = end_time - start_time + assert elps_time_sequential < wait_time * 15 + 1 + assert elps_time_sequential > wait_time * 15 + assert elps_time_parallel < wait_time * 5 + 1 + assert elps_time_parallel > wait_time * 5 diff --git a/tests/pams/runners/test_sequential.py b/tests/pams/runners/test_sequential.py index f11d2cda..7e01c2a2 100644 --- a/tests/pams/runners/test_sequential.py +++ b/tests/pams/runners/test_sequential.py @@ -83,7 +83,7 @@ def test__(self) -> None: "CI2002", "config.json", ) - runner = SequentialRunner(settings=config, prng=random.Random(42)) + runner = self.runner_class(settings=config, prng=random.Random(42)) runner._setup() runner.simulator._update_times_on_markets(markets=runner.simulator.markets) start_time = time.time() @@ -108,7 +108,7 @@ def test_generate_markets(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -148,7 +148,7 @@ def test_generate_markets(self) -> None: "Market": {"extends": "MarketBase"}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -182,7 +182,7 @@ def test_generate_markets(self) -> None: "Market": {"extends": "MarketBase"}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -219,7 +219,7 @@ def test_generate_markets(self) -> None: "Market": {"extends": "MarketBase", "from": 10, "to": 19}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -257,7 +257,7 @@ def test_generate_markets(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -277,7 +277,7 @@ def test_generate_markets(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -295,7 +295,7 @@ def test_generate_markets(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -314,7 +314,7 @@ def test_generate_markets(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -334,7 +334,7 @@ def test_generate_markets(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -349,7 +349,7 @@ def test_generate_markets(self) -> None: "Market": {"class": "Market", "tickSize": 0.01, "outstandingShares": 2000}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -369,7 +369,7 @@ def test_generate_agents(self) -> None: "Agent": {"class": "FCNAgent", "markets": ["Market"]}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -409,7 +409,7 @@ def test_generate_agents(self) -> None: "Agent": {"extends": "AgentBase"}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -438,7 +438,7 @@ def test_generate_agents(self) -> None: "Agent": {"markets": ["Market"]}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -457,7 +457,7 @@ def test_generate_agents(self) -> None: "Agent": {"class": "Market", "markets": ["Market"]}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -477,7 +477,7 @@ def test_generate_agents(self) -> None: "Agent": {"class": "FCNAgent"}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -503,7 +503,7 @@ def test_generate_agents(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -523,7 +523,7 @@ def test_generate_agents(self) -> None: "Agent": {"class": "FCNAgent", "from": 0, "markets": ["Market"]}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -543,7 +543,7 @@ def test_generate_agents(self) -> None: "Agent": {"class": "FCNAgent", "from": 10, "to": 19, "markets": ["Market"]}, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -585,7 +585,7 @@ def test_set_fundamental_correlation(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -606,7 +606,7 @@ def test_set_fundamental_correlation(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -635,7 +635,7 @@ def test_set_fundamental_correlation(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -664,7 +664,7 @@ def test_set_fundamental_correlation(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -692,7 +692,7 @@ def test_set_fundamental_correlation(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -722,7 +722,7 @@ def test_set_fundamental_correlation(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -774,7 +774,7 @@ def test_generate_sessions(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -840,7 +840,7 @@ def test_generate_sessions(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -871,7 +871,7 @@ def test_generate_sessions(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -902,7 +902,7 @@ def test_generate_sessions(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -926,7 +926,7 @@ def test_generate_sessions(self) -> None: } } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -969,7 +969,7 @@ def test_generate_sessions(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -979,7 +979,7 @@ def test_generate_sessions(self) -> None: def test_setup(self) -> None: runner = cast( - SequentialRunner, + self.runner_class, self.test__init__(setting_mode="dict", logger=None, simulator_class=None), ) runner._setup() @@ -1012,7 +1012,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) del setting["simulation"] runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1023,7 +1023,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) del setting["simulation"]["markets"] runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1034,7 +1034,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) del setting["simulation"] runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1045,7 +1045,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) setting["simulation"]["markets"] = {} runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1056,7 +1056,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) setting["simulation"]["markets"] = [10] runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1067,7 +1067,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) del setting["simulation"]["agents"] runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1078,7 +1078,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) setting["simulation"]["agents"] = {} runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1089,7 +1089,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) setting["simulation"]["agents"] = [10] runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1100,7 +1100,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) del setting["simulation"]["sessions"] runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1111,7 +1111,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) setting["simulation"]["sessions"] = {} runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1122,7 +1122,7 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) setting["simulation"]["sessions"] = [10] runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1213,7 +1213,7 @@ def test_collect_orders_from_normal_agents(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1257,7 +1257,7 @@ def dummy_fn(cls: Agent, markets: List[Market]) -> List[Order]: setting["simulation"]["sessions"][0]["withOrderPlacement"] = False # type: ignore runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1312,7 +1312,7 @@ def test_handle_orders(self) -> None: }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1329,7 +1329,7 @@ def test_handle_orders(self) -> None: ) runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1448,7 +1448,7 @@ def __init__(self, market_id: int = 0, agent_id: int = 0): }, } runner = cast( - SequentialRunner, + self.runner_class, self.test__init__( setting_mode="dict", logger=None, simulator_class=None, setting=setting ), @@ -1633,7 +1633,7 @@ def process_market_step_end_log(self, log: MarketStepEndLog) -> None: logger = DummyLogger() runner = cast( - SequentialRunner, + self.runner_class, self.test__init__(setting_mode="dict", logger=logger, simulator_class=None), ) runner._setup() @@ -1687,7 +1687,7 @@ def process_market_step_end_log(self, log: MarketStepEndLog) -> None: logger = DummyLogger() runner = cast( - SequentialRunner, + self.runner_class, self.test__init__(setting_mode="dict", logger=logger, simulator_class=None), ) runner._setup()