Skip to content

Commit

Permalink
Add row_count test + refactors on the final results processing + test…
Browse files Browse the repository at this point in the history
… parametrizations (#336)

* add tests for checking keys

* add tests

* fix issues #335 & #332

* parametrize test_runs

* add more tests

* fix subset ordering
  • Loading branch information
danlessa authored Dec 21, 2023
1 parent ebc921d commit 8948757
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 100 deletions.
2 changes: 1 addition & 1 deletion cadCAD/configuration/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def ep_decorator(f, y, var_dict, sub_step, sL, s, _input, **kwargs):
else:
return y, s[y]

return {es: ep_decorator(f, es) for es, f in ep.items()}
return {es: ep_decorator(f, es) for es, f in ep.items()} # type: ignore


def trigger_condition(s, pre_conditions, cond_opp):
Expand Down
46 changes: 39 additions & 7 deletions cadCAD/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from time import time
from typing import Callable, Dict, List, Any, Tuple, Union
from typing import Callable, Dict, List, Any, Tuple, Union, Sequence, Mapping
from tqdm.auto import tqdm

from cadCAD.utils import flatten
Expand Down Expand Up @@ -147,18 +147,50 @@ def get_final_results(simulations: List[StateHistory],
eps,
sessions: List[SessionDict],
remote_threshold: int):

# if list of lists of lists of dicts: do flatten
    # if list of dicts: do not flatten
# else raise error


init: bool = isinstance(simulations, Sequence)
failed_1 = False
failed_2 = False

try:
init: bool = isinstance(simulations, Sequence)
dont_flatten = init & isinstance(simulations[0], Mapping)
do_flatten = not dont_flatten
except:
failed_1 = True
do_flatten = True

try:
do_flatten = init & isinstance(simulations[0], Sequence)
do_flatten &= isinstance(simulations[0][0], Sequence)
do_flatten &= isinstance(simulations[0][0][0], Mapping)
except:
failed_2 = True
do_flatten = False

if failed_1 and failed_2:
raise ValueError('Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])')


flat_timesteps, tensor_fields = [], []
for sim_result, psu, ep in tqdm(list(zip(simulations, psus, eps)),
total=len(simulations),
desc='Flattening results'):
flat_timesteps.append(flatten(sim_result))
if do_flatten:
flat_timesteps.append(flatten(sim_result))
tensor_fields.append(create_tensor_field(psu, ep))

if do_flatten:
flat_simulations = flatten(flat_timesteps)
else:
flat_simulations = simulations

flat_simulations = flatten(flat_timesteps)
if config_amt == 1:
return simulations, tensor_fields, sessions
elif config_amt > 1:
return flat_simulations, tensor_fields, sessions
return flat_simulations, tensor_fields, sessions

final_result = None
original_N = len(configs_as_dicts(self.configs))
Expand Down
61 changes: 29 additions & 32 deletions cadCAD/engine/execution.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Dict, List, Any, Tuple
from typing import Callable, Dict, List, Any, Tuple, Sequence
from pathos.multiprocessing import ProcessPool # type: ignore
from collections import Counter
from cadCAD.types import *
Expand All @@ -11,41 +11,38 @@


def single_proc_exec(
simulation_execs: List[ExecutorFunction],
var_dict_list: List[Parameters],
states_lists: List[StateHistory],
configs_structs: List[StateUpdateBlocks],
env_processes_list: List[EnvProcesses],
Ts: List[TimeSeq],
SimIDs: List[SimulationID],
Ns: List[Run],
ExpIDs: List[int],
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs],
simulation_execs: Sequence[ExecutorFunction],
var_dict_list: Union[Sequence[Parameters], Parameters],
states_lists: Sequence[StateHistory],
configs_structs: Sequence[StateUpdateBlocks],
env_processes_list: Sequence[EnvProcesses],
Ts: Sequence[TimeSeq],
SimIDs: Sequence[SimulationID],
Ns: Sequence[Run],
ExpIDs: Sequence[int],
SubsetIDs: Sequence[SubsetID],
SubsetWindows: Sequence[SubsetWindow],
configured_n: Sequence[N_Runs],
additional_objs=None
):
) -> List:


# HACK for making it run with N_Runs=1
if type(var_dict_list) == list:
var_dict_list = var_dict_list[0]
if not isinstance(var_dict_list, Sequence):
var_dict_list = list([var_dict_list])

print(f'Execution Mode: single_threaded')
raw_params: List[List] = [
raw_params = (
simulation_execs, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, SubsetIDs, SubsetWindows
]
simulation_exec, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window = list(
map(lambda x: x.pop(), raw_params)
)
result = simulation_exec(
var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n, additional_objs
)
return flatten(result)




Ts, SimIDs, Ns, SubsetIDs, SubsetWindows, var_dict_list)

results: List = []
print(f'Execution Mode: single_threaded')
for raw_param in zip(*raw_params):
simulation_exec, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, var_dict = raw_param
result = simulation_exec(
var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n, additional_objs
)
results.append(flatten(result))
return flatten(results)

def parallelize_simulations(
simulation_execs: List[ExecutorFunction],
Expand Down
8 changes: 4 additions & 4 deletions cadCAD/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TypedDict, Callable, Union, Dict, List, Tuple, Iterator
from typing import TypedDict, Callable, Union, Dict, List, Tuple, Iterable
from collections import deque

State = Dict[str, object]
Expand All @@ -20,18 +20,18 @@ class StateUpdateBlock(TypedDict):
StateUpdateBlocks = List[StateUpdateBlock]

class ConfigurationDict(TypedDict):
T: Iterator # Generator for the timestep variable
T: Iterable # Generator for the timestep variable
N: int # Number of MC Runs
M: Union[Parameters, SweepableParameters] # Parameters / List of Parameter to Sweep

TargetValue = object
EnvProcess: Callable[[State, SweepableParameters, TargetValue], TargetValue]
EnvProcesses = Dict[str, Callable]
TimeSeq = Iterator
TimeSeq = Iterable
SimulationID = int
Run = int
SubsetID = int
SubsetWindow = Iterator
SubsetWindow = Iterable
N_Runs = int

ExecutorFunction = Callable[[Parameters, StateHistory, StateUpdateBlocks, EnvProcesses, TimeSeq, SimulationID, Run, SubsetID, SubsetWindow, N_Runs], object]
Expand Down
67 changes: 67 additions & 0 deletions testing/test_row_count.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from cadCAD.configuration import Experiment
from cadCAD.configuration.utils import config_sim
from cadCAD.engine import Executor, ExecutionContext, ExecutionMode
import pytest


# Parametrizations exercised by every row-count test below. Each tuple is
# (N_simulations, N_sweeps, N_runs, N_timesteps, N_substeps) — the positional
# arguments accepted by create_experiments()/expected_rows().
CONFIG_SIGNATURES_TO_TEST = [(3, 3, 3, 3, 3), (1, 3, 3, 3, 3),
                            (3, 1, 3, 3, 3), (1, 1, 3, 3, 3),
                            (3, 3, 1, 3, 3), (1, 3, 1, 3, 3), (1, 1, 1, 3, 3)]

def run_experiment(exp: Experiment, mode: str):
    """Execute *exp* under the given execution mode and return the raw records.

    The tensor-field and session outputs of the executor are discarded.
    """
    context = ExecutionContext(mode)
    simulation = Executor(exec_context=context, configs=exp.configs)
    records, _, _ = simulation.execute()
    return records


def create_experiments(N_simulations=3, N_sweeps=3, N_runs=3, N_timesteps=3, N_substeps=3) -> Experiment:
    """Build an Experiment of no-op models for row-count testing.

    The same model (empty policies/variables, so state is never changed) is
    appended ``N_simulations`` times. Parameter ``A`` is swept over
    ``N_sweeps`` placeholder values; ``B`` has a single value so it does not
    multiply the sweep count.

    Returns:
        The configured ``Experiment`` instance.
    """
    INITIAL_STATE = {'varA': None}
    PSUBs = [{'policies': {}, 'variables': {}}] * N_substeps
    params = {'A': [None] * N_sweeps,
              'B': [None]}

    SIM_CONFIG = config_sim(
        {
            "N": N_runs,
            "T": range(N_timesteps),
            "M": params,  # Optional
        }
    )

    exp = Experiment()
    # The loop index is irrelevant — each iteration appends an identical model.
    for _ in range(N_simulations):
        exp.append_model(
            sim_configs=SIM_CONFIG,
            initial_state=INITIAL_STATE,
            partial_state_update_blocks=PSUBs
        )
    return exp


def expected_rows(N_simulations, N_sweeps, N_runs, N_timesteps, N_substeps) -> int:
    """Total record count a run should produce.

    Every (simulation, sweep, run) trajectory contributes one record per
    timestep/substep pair plus a single initial-state record.
    """
    trajectories = N_simulations * N_sweeps * N_runs
    rows_per_trajectory = N_timesteps * N_substeps + 1
    return trajectories * rows_per_trajectory



@pytest.mark.parametrize("N_sim,N_sw,N_r,N_t,N_s", CONFIG_SIGNATURES_TO_TEST)
def test_row_count_single(N_sim, N_sw, N_r, N_t, N_s):
    """Single-threaded mode must emit exactly the expected number of rows."""
    signature = (N_sim, N_sw, N_r, N_t, N_s)
    records = run_experiment(create_experiments(*signature), 'single_proc')
    assert len(records) == expected_rows(*signature)


@pytest.mark.parametrize("N_sim,N_sw,N_r,N_t,N_s", CONFIG_SIGNATURES_TO_TEST)
def test_row_count_multi(N_sim, N_sw, N_r, N_t, N_s):
    """Multi-process mode must emit exactly the expected number of rows.

    For the degenerate (1 simulation, 1 sweep, 1 run) configuration the
    executor is expected to raise ValueError, so only the raise is checked
    there — no row-count assertion belongs inside the raises block, since it
    would never be evaluated once the call raises.
    """
    args = (N_sim, N_sw, N_r, N_t, N_s)

    if (N_sim, N_sw, N_r) == (1, 1, 1):
        with pytest.raises(ValueError):
            run_experiment(create_experiments(*args), 'multi_proc')
    else:
        assert len(run_experiment(create_experiments(*args), 'multi_proc')) == expected_rows(*args)

@pytest.mark.parametrize("N_sim,N_sw,N_r,N_t,N_s", CONFIG_SIGNATURES_TO_TEST)
def test_row_count_local(N_sim, N_sw, N_r, N_t, N_s):
    """Local-process mode must emit exactly the expected number of rows."""
    signature = (N_sim, N_sw, N_r, N_t, N_s)
    result = run_experiment(create_experiments(*signature), 'local_proc')
    assert len(result) == expected_rows(*signature)
Loading

0 comments on commit 8948757

Please sign in to comment.