diff --git a/docs/source/user_guide/config.rst b/docs/source/user_guide/config.rst index 6c0b4e6b..4b93e4bf 100644 --- a/docs/source/user_guide/config.rst +++ b/docs/source/user_guide/config.rst @@ -26,7 +26,8 @@ Json config ["MarketName1", "MarketName2", float], # fundamentalVolatility is required in both markets ... ] - } + }, + "numParallel": int (Optional; default 1; only for MultiThreadedRunner), }, "FundamentalPriceShock": { "class": "FundamentalPriceShock", diff --git a/pams/runners/__init__.py b/pams/runners/__init__.py index 71d7498f..5ec622f2 100644 --- a/pams/runners/__init__.py +++ b/pams/runners/__init__.py @@ -1,2 +1,4 @@ +from .agent_parallel import MultiProcessAgentParallelRuner +from .agent_parallel import MultiThreadAgentParallelRuner from .base import Runner from .sequential import SequentialRunner diff --git a/pams/runners/agent_parallel.py b/pams/runners/agent_parallel.py new file mode 100644 index 00000000..ff611bfc --- /dev/null +++ b/pams/runners/agent_parallel.py @@ -0,0 +1,110 @@ +import os +import random +import warnings +from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ThreadPoolExecutor +from io import TextIOWrapper +from multiprocessing import cpu_count +from typing import Dict +from typing import List +from typing import Optional +from typing import Type +from typing import Union + +from pams.logs.base import Logger +from pams.simulator import Simulator + +from ..order import Cancel +from ..order import Order +from ..session import Session +from .sequential import SequentialRunner + + +class MultiThreadAgentParallelRuner(SequentialRunner): + """Multi Thread Agent Parallel runner class. This is experimental. + + In this runner, only normal agents are parallelized in each steps. + This means that the number of agents that can be parallelized is limited by MAX_NORMAL_ORDERS. + """ + + _parallel_pool_provider: Union[ + Type[ThreadPoolExecutor], Type[ProcessPoolExecutor] + ] = ThreadPoolExecutor + + def __init__( + self, + settings: Union[Dict, TextIOWrapper, os.PathLike, str], + prng: Optional[random.Random] = None, + logger: Optional[Logger] = None, + simulator_class: Type[Simulator] = Simulator, + ): + super().__init__(settings, prng, logger, simulator_class) + warnings.warn( + f"{self.__class__.__name__} is experimental. Future changes may occur disruptively." + ) + self.num_parallel = max(cpu_count() - 1, 1) + + def _setup(self) -> None: + super()._setup() + if "numParallel" in self.settings["simulation"]: + self.num_parallel = self.settings["simulation"]["numParallel"] + max_normal_orders = max( + session.max_normal_orders for session in self.simulator.sessions + ) + if self.num_parallel > max_normal_orders: + warnings.warn( + f"When {self.__class__.__name__} is used, the maximum number of parallel agents" + f" is limited by max_normal_orders ({max_normal_orders}) evne if numParallel" + f" ({self.num_parallel}) is set to a larger value." + ) + self.thread_pool = self._parallel_pool_provider(max_workers=self.num_parallel) + + def _collect_orders_from_normal_agents( + self, session: Session + ) -> List[List[Union[Order, Cancel]]]: + """collect orders from normal_agents. (Internal method) + orders are corrected until the total number of orders reaches max_normal_orders + + Args: + session (Session): session. + + Returns: + List[List[Union[Order, Cancel]]]: orders lists. + """ + agents = self.simulator.normal_frequency_agents + agents = self._prng.sample(agents, len(agents)) + all_orders: List[List[Union[Order, Cancel]]] = [] + # TODO: currently the original impl is used for order counting. + # See more in the SequentialRunner class. + futures = [] + for agent in agents[: session.max_normal_orders]: + future = self.thread_pool.submit( + agent.submit_orders, self.simulator.markets + ) + futures.append((future, agent)) + for future, agent in futures: + orders = future.result() + if len(orders) > 0: + if not session.with_order_placement: + raise AssertionError("currently order is not accepted") + if sum([order.agent_id != agent.agent_id for order in orders]) > 0: + raise ValueError( + "spoofing order is not allowed. please check agent_id in order" + ) + all_orders.append(orders) + return all_orders + + +class MultiProcessAgentParallelRuner(MultiThreadAgentParallelRuner): + """Multi Process Agent Parallel runner class. This is experimental. + + In this runner, only normal agents are parallelized in each steps. + This means that the number of agents that can be parallelized is limited by MAX_NORMAL_ORDERS. + + Note: When you are using MultiProcessAgentParallelRuner, the definition of self-defined classes are saved to a diffenrent fils. + This is because of the limitation of python multiprocessing. + """ + + _parallel_pool_provider: Union[ + Type[ThreadPoolExecutor], Type[ProcessPoolExecutor] + ] = ProcessPoolExecutor diff --git a/tests/pams/runners/dummy.py b/tests/pams/runners/dummy.py new file mode 100644 index 00000000..7cda1fa3 --- /dev/null +++ b/tests/pams/runners/dummy.py @@ -0,0 +1,80 @@ +import time +from typing import List +from typing import Union + +from pams.agents.fcn_agent import FCNAgent +from pams.logs import CancelLog +from pams.logs import ExecutionLog +from pams.logs import Logger +from pams.logs import MarketStepBeginLog +from pams.logs import MarketStepEndLog +from pams.logs import OrderLog +from pams.logs import SessionBeginLog +from pams.logs import SessionEndLog +from pams.logs import SimulationBeginLog +from pams.logs import SimulationEndLog +from pams.market import Market +from pams.order import Cancel +from pams.order import Order + +wait_time = 0.2 # seconds + + +class FCNDelayAgent(FCNAgent): + def submit_orders(self, markets: List[Market]) -> List[Union[Order, Cancel]]: + time.sleep(wait_time) # Simulate a delay + return super().submit_orders(markets) + + +class DummyLogger(Logger): + def __init__(self) -> None: + super().__init__() + self.n_market_step_begin = 0 + self.n_market_end_begin = 0 + + def process_market_step_begin_log(self, log: MarketStepBeginLog) -> None: + self.n_market_step_begin += 1 + + def process_market_step_end_log(self, log: MarketStepEndLog) -> None: + self.n_market_end_begin += 1 + + +class DummyLogger2(Logger): + def __init__(self) -> None: + super().__init__() + self.n_order_log = 0 + self.n_cancel_log = 0 + self.n_execution_log = 0 + self.n_simulation_begin_log = 0 + self.n_simulation_end_log = 0 + self.n_session_begin_log = 0 + self.n_session_end_log = 0 + self.n_market_step_begin = 0 + self.n_market_step_end = 0 + + def process_order_log(self, log: OrderLog) -> None: + self.n_order_log += 1 + + def process_cancel_log(self, log: CancelLog) -> None: + self.n_cancel_log += 1 + + def process_execution_log(self, log: ExecutionLog) -> None: + self.n_execution_log += 1 + + def process_simulation_begin_log(self, log: SimulationBeginLog) -> None: + self.n_simulation_begin_log += 1 + + def process_simulation_end_log(self, log: SimulationEndLog) -> None: + self.n_simulation_end_log += 1 + + def process_session_begin_log(self, log: SessionBeginLog) -> None: + self.n_session_begin_log += 1 + + def process_session_end_log(self, log: SessionEndLog) -> None: + self.n_session_end_log += 1 + + def process_market_step_begin_log(self, log: MarketStepBeginLog) -> None: + self.n_market_step_begin += 1 + + def process_market_step_end_log(self, log: MarketStepEndLog) -> None: + self.n_market_step_end += 1 diff --git a/tests/pams/runners/test_agent_parallel.py b/tests/pams/runners/test_agent_parallel.py new file mode 100644 index 00000000..2f632d41 --- /dev/null +++ b/tests/pams/runners/test_agent_parallel.py @@ -0,0 +1,111 @@ +import copy +import time +from typing import Dict +from typing import Type + +import pytest + +from pams.runners import MultiProcessAgentParallelRuner +from pams.runners import MultiThreadAgentParallelRuner +from pams.runners.sequential import SequentialRunner +from tests.pams.runners.test_sequential import TestSequentialRunner + +from .dummy import FCNDelayAgent +from .dummy import wait_time + + +class TestMultiThreadAgentParallelRuner(TestSequentialRunner): + runner_class: Type[SequentialRunner] = MultiThreadAgentParallelRuner + default_setting: Dict = { + "simulation": { + "markets": ["Market"], + "agents": ["FCNAgents"], + "sessions": [ + { + "sessionName": 0, + "iterationSteps": 5, + "withOrderPlacement": True, + "withOrderExecution": True, + "withPrint": True, + "events": ["FundamentalPriceShock"], + "maxNormalOrders": 3, + } + ], + "numParallel": 3, + }, + "Market": {"class": "Market", "tickSize": 0.00001, "marketPrice": 300.0}, + "FCNAgents": { + "class": "FCNAgent", + "numAgents": 10, + "markets": ["Market"], + "assetVolume": 50, + "cashAmount": 10000, + "fundamentalWeight": {"expon": [1.0]}, + "chartWeight": {"expon": [0.0]}, + "noiseWeight": {"expon": [1.0]}, + "meanReversionTime": {"uniform": [50, 100]}, + "noiseScale": 0.001, + "timeWindowSize": [100, 200], + "orderMargin": [0.0, 0.1], + }, + "FundamentalPriceShock": { + "class": "FundamentalPriceShock", + "target": "Market", + "triggerTime": 0, + "priceChangeRate": -0.1, + "shockTimeLength": 1, + "enabled": True, + }, + } + + def test_parallel_efficiency(self) -> None: + + setting = copy.deepcopy(self.default_setting) + setting["FCNAgents"]["class"] = "FCNDelayAgent" # Use the delayed agent + + runner_class_dummy = self.runner_class + self.runner_class = SequentialRunner # Temporarily set to SequentialRunner + sequential_runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting + ) + self.runner_class = runner_class_dummy + parallel_runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting + ) + + sequential_runner.class_register(cls=FCNDelayAgent) + parallel_runner.class_register(cls=FCNDelayAgent) + start_time = time.time() + sequential_runner.main() + end_time = time.time() + elps_time_sequential = end_time - start_time + start_time = time.time() + parallel_runner.main() + end_time = time.time() + elps_time_parallel = end_time - start_time + overhead_time = elps_time_sequential - wait_time * 15 + assert elps_time_sequential < wait_time * 15 + overhead_time + 1 + assert elps_time_sequential > wait_time * 15 + assert elps_time_parallel < wait_time * 5 + overhead_time + 1 + assert elps_time_parallel > wait_time * 5 + + def test_parallel_thread_warning(self) -> None: + settings = copy.deepcopy(self.default_setting) + settings["simulation"]["numParallel"] = 5 + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=settings + ) + with pytest.warns(UserWarning, match="is set to a larger value."): + runner.main() + + +class TestMultiProcessAgentParallelRuner(TestMultiThreadAgentParallelRuner): + runner_class: Type[SequentialRunner] = MultiProcessAgentParallelRuner + + def test_collect_orders_from_normal_agents(self) -> None: + # return super().test_collect_orders_from_normal_agents() + pass + + def test_collect_orders_from_normal_agents_error_1(self) -> None: + # return super().test_collect_orders_from_normal_agents_error_1() + pass diff --git a/tests/pams/runners/test_sequential.py b/tests/pams/runners/test_sequential.py index f11d2cda..a7f4bfa7 100644 --- a/tests/pams/runners/test_sequential.py +++ b/tests/pams/runners/test_sequential.py @@ -6,7 +6,6 @@ from typing import List from typing import Type from typing import Union -from typing import cast from unittest import mock import pytest @@ -17,23 +16,15 @@ from pams import Market from pams import Order from pams.agents import Agent -from pams.logs import CancelLog -from pams.logs import ExecutionLog -from pams.logs import Logger -from pams.logs import MarketStepBeginLog -from pams.logs import MarketStepEndLog -from pams.logs import OrderLog -from pams.logs import SessionBeginLog -from pams.logs import SessionEndLog -from pams.logs import SimulationBeginLog -from pams.logs import SimulationEndLog -from pams.runners import Runner from pams.runners import SequentialRunner from tests.pams.runners.test_base import TestRunner +from .dummy import DummyLogger +from .dummy import DummyLogger2 + class TestSequentialRunner(TestRunner): - runner_class: Type[Runner] = SequentialRunner + runner_class: Type[SequentialRunner] = SequentialRunner default_setting: Dict = { "simulation": { "markets": ["Market"], @@ -83,7 +74,7 @@ def test__(self) -> None: "CI2002", "config.json", ) - runner = SequentialRunner(settings=config, prng=random.Random(42)) + runner = self.runner_class(settings=config, prng=random.Random(42)) runner._setup() runner.simulator._update_times_on_markets(markets=runner.simulator.markets) start_time = time.time() @@ -92,9 +83,9 @@ def test__(self) -> None: session=runner.simulator.sessions[0] ) end_time = time.time() - time_per_step = (end_time - start_time) / 1000 + time_per_step = (end_time - start_time) / 10000 print("time/step", time_per_step) - assert time_per_step < 0.003 + assert time_per_step < 0.0003 def test_generate_markets(self) -> None: setting = { @@ -107,11 +98,8 @@ def test_generate_markets(self) -> None: "outstandingShares": 2000, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_markets(market_type_names=["market"]) @@ -147,11 +135,8 @@ def test_generate_markets(self) -> None: }, "Market": {"extends": "MarketBase"}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) assert len(runner.simulator.markets) == 1 @@ -181,11 +166,8 @@ def test_generate_markets(self) -> None: }, "Market": {"extends": "MarketBase"}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) assert len(runner.simulator.markets) == 10 @@ -218,11 +200,8 @@ def test_generate_markets(self) -> None: }, "Market": {"extends": "MarketBase", "from": 10, "to": 19}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) assert len(runner.simulator.markets) == 10 @@ -256,11 +235,8 @@ def test_generate_markets(self) -> None: "outstandingShares": 2000, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_markets(market_type_names=["Market"]) @@ -276,11 +252,8 @@ def test_generate_markets(self) -> None: "outstandingShares": 2000, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_markets(market_type_names=["Market"]) @@ -294,11 +267,8 @@ def test_generate_markets(self) -> None: "outstandingShares": 2000, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_markets(market_type_names=["Market"]) @@ -313,11 +283,8 @@ def test_generate_markets(self) -> None: "outstandingShares": 2000, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_markets(market_type_names=["Market"]) @@ -333,11 +300,8 @@ def test_generate_markets(self) -> None: "fundamentalVolatility": 0.2, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) assert runner.simulator.fundamentals.prices == {0: [300.0]} @@ -348,11 +312,8 @@ def test_generate_markets(self) -> None: "simulation": {"markets": ["Market"]}, "Market": {"class": "Market", "tickSize": 0.01, "outstandingShares": 2000}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_markets(market_type_names=["Market"]) @@ -368,11 +329,8 @@ def test_generate_agents(self) -> None: }, "Agent": {"class": "FCNAgent", "markets": ["Market"]}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) with pytest.raises(ValueError): @@ -408,11 +366,8 @@ def test_generate_agents(self) -> None: }, "Agent": {"extends": "AgentBase"}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) runner._generate_agents(agent_type_names=["Agent"]) @@ -437,11 +392,8 @@ def test_generate_agents(self) -> None: }, "Agent": {"markets": ["Market"]}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) with pytest.raises(ValueError): @@ -456,11 +408,8 @@ def test_generate_agents(self) -> None: }, "Agent": {"class": "Market", "markets": ["Market"]}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) with pytest.raises(ValueError): @@ -476,11 +425,8 @@ def test_generate_agents(self) -> None: }, "Agent": {"class": "FCNAgent"}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) with pytest.raises(ValueError): @@ -502,11 +448,8 @@ def test_generate_agents(self) -> None: "markets": ["Market"], }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) with pytest.raises(ValueError): @@ -522,11 +465,8 @@ def test_generate_agents(self) -> None: }, "Agent": {"class": "FCNAgent", "from": 0, "markets": ["Market"]}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) with pytest.raises(ValueError): @@ -542,11 +482,8 @@ def test_generate_agents(self) -> None: }, "Agent": {"class": "FCNAgent", "from": 10, "to": 19, "markets": ["Market"]}, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) runner._generate_agents(agent_type_names=["Agent"]) @@ -584,11 +521,8 @@ def test_set_fundamental_correlation(self) -> None: "fundamentalVolatility": 0.1, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) runner._set_fundamental_correlation() @@ -605,11 +539,8 @@ def test_set_fundamental_correlation(self) -> None: "fundamentalVolatility": 0.1, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) runner._set_fundamental_correlation() @@ -634,11 +565,8 @@ def test_set_fundamental_correlation(self) -> None: "fundamentalVolatility": 0.1, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) with pytest.raises(NotImplementedError): @@ -663,11 +591,8 @@ def test_set_fundamental_correlation(self) -> None: "fundamentalVolatility": 0.1, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) with pytest.raises(ValueError): @@ -691,11 +616,8 @@ def test_set_fundamental_correlation(self) -> None: "outstandingShares": 2000, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) with pytest.raises(ValueError): @@ -721,11 +643,8 @@ def test_set_fundamental_correlation(self) -> None: "fundamentalVolatility": 0.1, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_markets(market_type_names=["Market"]) runner._set_fundamental_correlation() @@ -773,11 +692,8 @@ def test_generate_sessions(self) -> None: "enabled": True, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._generate_sessions() assert len(runner.simulator.sessions) == 2 @@ -839,11 +755,8 @@ def test_generate_sessions(self) -> None: "enabled": True, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_sessions() @@ -870,11 +783,8 @@ def test_generate_sessions(self) -> None: "enabled": True, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_sessions() @@ -901,11 +811,8 @@ def test_generate_sessions(self) -> None: "enabled": True, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_sessions() @@ -925,11 +832,8 @@ def test_generate_sessions(self) -> None: ] } } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_sessions() @@ -968,19 +872,15 @@ def test_generate_sessions(self) -> None: "enabled": True, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._generate_sessions() def test_setup(self) -> None: - runner = cast( - SequentialRunner, - self.test__init__(setting_mode="dict", logger=None, simulator_class=None), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None ) runner._setup() assert len(runner.simulator.agents) == 10 @@ -1011,121 +911,88 @@ def test_setup(self) -> None: setting = copy.deepcopy(self.default_setting) del setting["simulation"] - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() setting = copy.deepcopy(self.default_setting) del setting["simulation"]["markets"] - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() setting = copy.deepcopy(self.default_setting) del setting["simulation"] - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() setting = copy.deepcopy(self.default_setting) setting["simulation"]["markets"] = {} - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() setting = copy.deepcopy(self.default_setting) setting["simulation"]["markets"] = [10] - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() setting = copy.deepcopy(self.default_setting) del setting["simulation"]["agents"] - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() setting = copy.deepcopy(self.default_setting) setting["simulation"]["agents"] = {} - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() setting = copy.deepcopy(self.default_setting) setting["simulation"]["agents"] = [10] - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() setting = copy.deepcopy(self.default_setting) del setting["simulation"]["sessions"] - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() setting = copy.deepcopy(self.default_setting) setting["simulation"]["sessions"] = {} - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() setting = copy.deepcopy(self.default_setting) setting["simulation"]["sessions"] = [10] - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) with pytest.raises(ValueError): runner._setup() @@ -1212,11 +1079,8 @@ def test_collect_orders_from_normal_agents(self) -> None: "orderThresholdPrice": 1.0, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._setup() @@ -1256,11 +1120,8 @@ def dummy_fn(cls: Agent, markets: List[Market]) -> List[Order]: ) setting["simulation"]["sessions"][0]["withOrderPlacement"] = False # type: ignore - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._setup() dummy_order = Order( @@ -1311,11 +1172,8 @@ def test_handle_orders(self) -> None: "orderMargin": [0.0, 0.1], }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._setup() runner.simulator.markets[0]._update_time(next_fundamental_price=200.0) @@ -1328,11 +1186,8 @@ def test_handle_orders(self) -> None: session=runner.simulator.sessions[0], local_orders=local_orders ) - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._setup() runner.simulator.markets[0]._update_time(next_fundamental_price=200.0) @@ -1447,11 +1302,8 @@ def __init__(self, market_id: int = 0, agent_id: int = 0): "orderThresholdPrice": 1.0, }, } - runner = cast( - SequentialRunner, - self.test__init__( - setting_mode="dict", logger=None, simulator_class=None, setting=setting - ), + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting ) runner._setup() runner.simulator.markets[0]._is_running = True @@ -1619,22 +1471,10 @@ def dummy_fn3(cls: Agent, markets: List[Market]) -> List[Order]: ) def test_iterate_market_update(self) -> None: - class DummyLogger(Logger): - def __init__(self) -> None: - super().__init__() - self.n_market_step_begin = 0 - self.n_market_end_begin = 0 - - def process_market_step_begin_log(self, log: MarketStepBeginLog) -> None: - self.n_market_step_begin += 1 - - def process_market_step_end_log(self, log: MarketStepEndLog) -> None: - self.n_market_end_begin += 1 logger = DummyLogger() - runner = cast( - SequentialRunner, - self.test__init__(setting_mode="dict", logger=logger, simulator_class=None), + runner = self.test__init__( + setting_mode="dict", logger=logger, simulator_class=None ) runner._setup() runner.simulator._update_times_on_markets(runner.simulator.markets) @@ -1645,50 +1485,9 @@ def process_market_step_end_log(self, log: MarketStepEndLog) -> None: assert logger.n_market_end_begin == runner.simulator.sessions[0].iteration_steps def test_run(self) -> None: - class DummyLogger(Logger): - def __init__(self) -> None: - super().__init__() - self.n_order_log = 0 - self.n_cancel_log = 0 - self.n_execution_log = 0 - self.n_simulation_begin_log = 0 - self.n_simulation_end_log = 0 - self.n_session_begin_log = 0 - self.n_session_end_log = 0 - self.n_market_step_begin = 0 - self.n_market_step_end = 0 - - def process_order_log(self, log: OrderLog) -> None: - self.n_order_log += 1 - - def process_cancel_log(self, log: CancelLog) -> None: - self.n_cancel_log += 1 - - def process_execution_log(self, log: ExecutionLog) -> None: - self.n_execution_log += 1 - - def process_simulation_begin_log(self, log: SimulationBeginLog) -> None: - self.n_simulation_begin_log += 1 - - def process_simulation_end_log(self, log: SimulationEndLog) -> None: - self.n_simulation_end_log += 1 - - def process_session_begin_log(self, log: SessionBeginLog) -> None: - self.n_session_begin_log += 1 - - def process_session_end_log(self, log: SessionEndLog) -> None: - self.n_session_end_log += 1 - - def process_market_step_begin_log(self, log: MarketStepBeginLog) -> None: - self.n_market_step_begin += 1 - - def process_market_step_end_log(self, log: MarketStepEndLog) -> None: - self.n_market_step_end += 1 - - logger = DummyLogger() - runner = cast( - SequentialRunner, - self.test__init__(setting_mode="dict", logger=logger, simulator_class=None), + logger = DummyLogger2() + runner = self.test__init__( + setting_mode="dict", logger=logger, simulator_class=None ) runner._setup() runner._run() @@ -1702,3 +1501,49 @@ def process_market_step_end_log(self, log: MarketStepEndLog) -> None: assert logger.n_market_step_end == sum( [session.iteration_steps for session in runner.simulator.sessions] ) + + def test_collect_orders_from_normal_agents_error_1(self) -> None: + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None + ) + runner._setup() + + dummy_order = Order( + agent_id=100, + market_id=2, + is_buy=True, + kind=LIMIT_ORDER, + volume=1, + price=300.0, + ) + + with mock.patch( + "pams.agents.fcn_agent.FCNAgent.submit_orders", return_value=[dummy_order] + ): + with pytest.raises(ValueError): + _ = runner._collect_orders_from_normal_agents( + session=runner.simulator.sessions[0] + ) + + setting = copy.deepcopy(self.default_setting) + setting["simulation"]["sessions"][0]["withOrderPlacement"] = False # type: ignore + runner = self.test__init__( + setting_mode="dict", logger=None, simulator_class=None, setting=setting + ) + runner._setup() + dummy_order = Order( + agent_id=100, + market_id=2, + is_buy=True, + kind=LIMIT_ORDER, + volume=1, + price=300.0, + ) + + with mock.patch( + "pams.agents.fcn_agent.FCNAgent.submit_orders", return_value=[dummy_order] + ): + with pytest.raises(AssertionError): + _ = runner._collect_orders_from_normal_agents( + session=runner.simulator.sessions[0] + )