Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP #109

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion docs/source/user_guide/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ Json config
["MarketName1", "MarketName2", float], # fundamentalVolatility is required in both markets
...
]
}
},
"numParallel": int (Optional; default 1; only for MultiThreadedRunner),
},
"FundamentalPriceShock": {
"class": "FundamentalPriceShock",
Expand Down
2 changes: 2 additions & 0 deletions pams/runners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
from .agent_parallel import MultiProcessAgentParallelRuner
from .agent_parallel import MultiThreadAgentParallelRuner
from .base import Runner
from .sequential import SequentialRunner
110 changes: 110 additions & 0 deletions pams/runners/agent_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import os
import random
import warnings
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from io import TextIOWrapper
from multiprocessing import cpu_count
from typing import Dict
from typing import List
from typing import Optional
from typing import Type
from typing import Union

from pams.logs.base import Logger
from pams.simulator import Simulator

from ..order import Cancel
from ..order import Order
from ..session import Session
from .sequential import SequentialRunner


class MultiThreadAgentParallelRuner(SequentialRunner):
"""Multi Thread Agent Parallel runner class. This is experimental.

In this runner, only normal agents are parallelized in each steps.
This means that the number of agents that can be parallelized is limited by MAX_NORMAL_ORDERS.
"""

_parallel_pool_provider: Union[
Type[ThreadPoolExecutor], Type[ProcessPoolExecutor]
] = ThreadPoolExecutor

def __init__(
self,
settings: Union[Dict, TextIOWrapper, os.PathLike, str],
prng: Optional[random.Random] = None,
logger: Optional[Logger] = None,
simulator_class: Type[Simulator] = Simulator,
):
super().__init__(settings, prng, logger, simulator_class)
warnings.warn(
f"{self.__class__.__name__} is experimental. Future changes may occur disruptively."
)
self.num_parallel = max(cpu_count() - 1, 1)

def _setup(self) -> None:
super()._setup()
if "numParallel" in self.settings["simulation"]:
self.num_parallel = self.settings["simulation"]["numParallel"]
max_normal_orders = max(
session.max_normal_orders for session in self.simulator.sessions
)
if self.num_parallel > max_normal_orders:
warnings.warn(
f"When {self.__class__.__name__} is used, the maximum number of parallel agents"
f" is limited by max_normal_orders ({max_normal_orders}) evne if numParallel"
f" ({self.num_parallel}) is set to a larger value."
)
self.thread_pool = self._parallel_pool_provider(max_workers=self.num_parallel)

def _collect_orders_from_normal_agents(
self, session: Session
) -> List[List[Union[Order, Cancel]]]:
"""collect orders from normal_agents. (Internal method)
orders are corrected until the total number of orders reaches max_normal_orders

Args:
session (Session): session.

Returns:
List[List[Union[Order, Cancel]]]: orders lists.
"""
agents = self.simulator.normal_frequency_agents
agents = self._prng.sample(agents, len(agents))
all_orders: List[List[Union[Order, Cancel]]] = []
# TODO: currently the original impl is used for order counting.
# See more in the SequentialRunner class.
futures = []
for agent in agents[: session.max_normal_orders]:
future = self.thread_pool.submit(
agent.submit_orders, self.simulator.markets
)
futures.append((future, agent))
for future, agent in futures:
orders = future.result()
if len(orders) > 0:
if not session.with_order_placement:
raise AssertionError("currently order is not accepted")
if sum([order.agent_id != agent.agent_id for order in orders]) > 0:
raise ValueError(
"spoofing order is not allowed. please check agent_id in order"
)
all_orders.append(orders)
return all_orders


class MultiProcessAgentParallelRuner(MultiThreadAgentParallelRuner):
"""Multi Process Agent Parallel runner class. This is experimental.

In this runner, only normal agents are parallelized in each steps.
This means that the number of agents that can be parallelized is limited by MAX_NORMAL_ORDERS.

Note: When you are using MultiProcessAgentParallelRuner, the definition of self-defined classes are saved to a diffenrent fils.
This is because of the limitation of python multiprocessing.
"""

_parallel_pool_provider: Union[
Type[ThreadPoolExecutor], Type[ProcessPoolExecutor]
] = ProcessPoolExecutor
80 changes: 80 additions & 0 deletions tests/pams/runners/dummy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import time
from typing import List
from typing import Union

from pams.agents.fcn_agent import FCNAgent
from pams.logs import CancelLog
from pams.logs import ExecutionLog
from pams.logs import Logger
from pams.logs import MarketStepBeginLog
from pams.logs import MarketStepEndLog
from pams.logs import OrderLog
from pams.logs import SessionBeginLog
from pams.logs import SessionEndLog
from pams.logs import SimulationBeginLog
from pams.logs import SimulationEndLog
from pams.market import Market
from pams.order import Cancel
from pams.order import Order

wait_time = 0.2 # seconds


class FCNDelayAgent(FCNAgent):
def submit_orders(self, markets: List[Market]) -> List[Union[Order, Cancel]]:
time.sleep(wait_time) # Simulate a delay
return super().submit_orders(markets)


class DummyLogger(Logger):
def __init__(self) -> None:
super().__init__()
self.n_market_step_begin = 0
self.n_market_end_begin = 0

def process_market_step_begin_log(self, log: MarketStepBeginLog) -> None:
self.n_market_step_begin += 1

def process_market_step_end_log(self, log: MarketStepEndLog) -> None:
self.n_market_end_begin += 1


class DummyLogger2(Logger):
def __init__(self) -> None:
super().__init__()
self.n_order_log = 0
self.n_cancel_log = 0
self.n_execution_log = 0
self.n_simulation_begin_log = 0
self.n_simulation_end_log = 0
self.n_session_begin_log = 0
self.n_session_end_log = 0
self.n_market_step_begin = 0
self.n_market_step_end = 0

def process_order_log(self, log: OrderLog) -> None:
self.n_order_log += 1

def process_cancel_log(self, log: CancelLog) -> None:
self.n_cancel_log += 1

def process_execution_log(self, log: ExecutionLog) -> None:
self.n_execution_log += 1

def process_simulation_begin_log(self, log: SimulationBeginLog) -> None:
self.n_simulation_begin_log += 1

def process_simulation_end_log(self, log: SimulationEndLog) -> None:
self.n_simulation_end_log += 1

def process_session_begin_log(self, log: SessionBeginLog) -> None:
self.n_session_begin_log += 1

def process_session_end_log(self, log: SessionEndLog) -> None:
self.n_session_end_log += 1

def process_market_step_begin_log(self, log: MarketStepBeginLog) -> None:
self.n_market_step_begin += 1

def process_market_step_end_log(self, log: MarketStepEndLog) -> None:
self.n_market_step_end += 1
111 changes: 111 additions & 0 deletions tests/pams/runners/test_agent_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import copy
import time
from typing import Dict
from typing import Type

import pytest

from pams.runners import MultiProcessAgentParallelRuner
from pams.runners import MultiThreadAgentParallelRuner
from pams.runners.sequential import SequentialRunner
from tests.pams.runners.test_sequential import TestSequentialRunner

from .dummy import FCNDelayAgent
from .dummy import wait_time


class TestMultiThreadAgentParallelRuner(TestSequentialRunner):
runner_class: Type[SequentialRunner] = MultiThreadAgentParallelRuner
default_setting: Dict = {
"simulation": {
"markets": ["Market"],
"agents": ["FCNAgents"],
"sessions": [
{
"sessionName": 0,
"iterationSteps": 5,
"withOrderPlacement": True,
"withOrderExecution": True,
"withPrint": True,
"events": ["FundamentalPriceShock"],
"maxNormalOrders": 3,
}
],
"numParallel": 3,
},
"Market": {"class": "Market", "tickSize": 0.00001, "marketPrice": 300.0},
"FCNAgents": {
"class": "FCNAgent",
"numAgents": 10,
"markets": ["Market"],
"assetVolume": 50,
"cashAmount": 10000,
"fundamentalWeight": {"expon": [1.0]},
"chartWeight": {"expon": [0.0]},
"noiseWeight": {"expon": [1.0]},
"meanReversionTime": {"uniform": [50, 100]},
"noiseScale": 0.001,
"timeWindowSize": [100, 200],
"orderMargin": [0.0, 0.1],
},
"FundamentalPriceShock": {
"class": "FundamentalPriceShock",
"target": "Market",
"triggerTime": 0,
"priceChangeRate": -0.1,
"shockTimeLength": 1,
"enabled": True,
},
}

def test_parallel_efficiency(self) -> None:

setting = copy.deepcopy(self.default_setting)
setting["FCNAgents"]["class"] = "FCNDelayAgent" # Use the delayed agent

runner_class_dummy = self.runner_class
self.runner_class = SequentialRunner # Temporarily set to SequentialRunner
sequential_runner = self.test__init__(
setting_mode="dict", logger=None, simulator_class=None, setting=setting
)
self.runner_class = runner_class_dummy
parallel_runner = self.test__init__(
setting_mode="dict", logger=None, simulator_class=None, setting=setting
)

sequential_runner.class_register(cls=FCNDelayAgent)
parallel_runner.class_register(cls=FCNDelayAgent)
start_time = time.time()
sequential_runner.main()
end_time = time.time()
elps_time_sequential = end_time - start_time
start_time = time.time()
parallel_runner.main()
end_time = time.time()
elps_time_parallel = end_time - start_time
overhead_time = elps_time_sequential - wait_time * 15
assert elps_time_sequential < wait_time * 15 + overhead_time + 1
assert elps_time_sequential > wait_time * 15
assert elps_time_parallel < wait_time * 5 + overhead_time + 1
assert elps_time_parallel > wait_time * 5

def test_parallel_thread_warning(self) -> None:
settings = copy.deepcopy(self.default_setting)
settings["simulation"]["numParallel"] = 5
runner = self.test__init__(
setting_mode="dict", logger=None, simulator_class=None, setting=settings
)
with pytest.warns(UserWarning, match="is set to a larger value."):
runner.main()


class TestMultiProcessAgentParallelRuner(TestMultiThreadAgentParallelRuner):
runner_class: Type[SequentialRunner] = MultiProcessAgentParallelRuner

def test_collect_orders_from_normal_agents(self) -> None:
# return super().test_collect_orders_from_normal_agents()
pass

def test_collect_orders_from_normal_agents_error_1(self) -> None:
# return super().test_collect_orders_from_normal_agents_error_1()
pass
Loading
Loading