Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
danlessa committed Dec 21, 2023
1 parent 3ac3194 commit d7e38cb
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 41 deletions.
46 changes: 39 additions & 7 deletions cadCAD/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from time import time
from typing import Callable, Dict, List, Any, Tuple, Union
from typing import Callable, Dict, List, Any, Tuple, Union, Sequence, Mapping
from tqdm.auto import tqdm

from cadCAD.utils import flatten
Expand Down Expand Up @@ -147,18 +147,50 @@ def get_final_results(simulations: List[StateHistory],
eps,
sessions: List[SessionDict],
remote_threshold: int):

# if list of lists of lists of dicts: do flatten
# if list of dicts: do not flatetn
# else raise error


init: bool = isinstance(simulations, Sequence)
failed_1 = False
failed_2 = False

try:
init: bool = isinstance(simulations, Sequence)
dont_flatten = init & isinstance(simulations[0], Mapping)
do_flatten = not dont_flatten
except:
failed_1 = True
do_flatten = True

try:
do_flatten = init & isinstance(simulations[0], Sequence)
do_flatten &= isinstance(simulations[0][0], Sequence)
do_flatten &= isinstance(simulations[0][0][0], Mapping)
except:
failed_2 = True
do_flatten = False

if failed_1 and failed_2:
raise ValueError('Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])')


flat_timesteps, tensor_fields = [], []
for sim_result, psu, ep in tqdm(list(zip(simulations, psus, eps)),
total=len(simulations),
desc='Flattening results'):
flat_timesteps.append(flatten(sim_result))
if do_flatten:
flat_timesteps.append(flatten(sim_result))
tensor_fields.append(create_tensor_field(psu, ep))

if do_flatten:
flat_simulations = flatten(flat_timesteps)
else:
flat_simulations = simulations

flat_simulations = flatten(flat_timesteps)
if config_amt == 1:
return simulations, tensor_fields, sessions
elif config_amt > 1:
return flat_simulations, tensor_fields, sessions
return flat_simulations, tensor_fields, sessions

final_result = None
original_N = len(configs_as_dicts(self.configs))
Expand Down
63 changes: 31 additions & 32 deletions cadCAD/engine/execution.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Dict, List, Any, Tuple
from typing import Callable, Dict, List, Any, Tuple, Sequence
from pathos.multiprocessing import ProcessPool # type: ignore
from collections import Counter
from cadCAD.types import *
Expand All @@ -11,41 +11,40 @@


def single_proc_exec(
simulation_execs: List[ExecutorFunction],
var_dict_list: List[Parameters],
states_lists: List[StateHistory],
configs_structs: List[StateUpdateBlocks],
env_processes_list: List[EnvProcesses],
Ts: List[TimeSeq],
SimIDs: List[SimulationID],
Ns: List[Run],
ExpIDs: List[int],
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs],
simulation_execs: Sequence[ExecutorFunction],
var_dict_list: Union[Sequence[Parameters], Parameters],
states_lists: Sequence[StateHistory],
configs_structs: Sequence[StateUpdateBlocks],
env_processes_list: Sequence[EnvProcesses],
Ts: Sequence[TimeSeq],
SimIDs: Sequence[SimulationID],
Ns: Sequence[Run],
ExpIDs: Sequence[int],
SubsetIDs: Sequence[SubsetID],
SubsetWindows: Sequence[SubsetWindow],
configured_n: Sequence[N_Runs],
additional_objs=None
):
) -> List:

# HACK for making it run with N_Runs=1
if type(var_dict_list) == list:
var_dict_list = var_dict_list[0]

print(f'Execution Mode: single_threaded')
raw_params: List[List] = [
simulation_execs, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, SubsetIDs, SubsetWindows
]
simulation_exec, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window = list(
map(lambda x: x.pop(), raw_params)
)
result = simulation_exec(
var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n, additional_objs
)
return flatten(result)



if not isinstance(var_dict_list, Sequence):
var_dict_list = list([var_dict_list])

results: List = []
for var_dict in var_dict_list:
print(f'Execution Mode: single_threaded')
raw_params: List[List] = [
simulation_execs, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, SubsetIDs, SubsetWindows
]
simulation_exec, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window = list(
map(lambda x: x.pop(), raw_params)
)
result = simulation_exec(
var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n, additional_objs
)
results.append(flatten(result))
return flatten(results)

def parallelize_simulations(
simulation_execs: List[ExecutorFunction],
Expand Down
2 changes: 1 addition & 1 deletion testing/test_row_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_row_count_single(N_sim, N_sw, N_r, N_t, N_s):
assert len(run_experiment(create_experiments(*args), 'single_proc')) == expected_rows(*args)


@pytest.mark.parametrize("N_sim,N_sw,N_r,N_t,N_s", CONFIG_SIGNATURES_TO_TEST)
@pytest.mark.parametrize("N_sim,N_sw,N_r,N_t,N_s", CONFIG_SIGNATURES_TO_TEST[:-1])
def test_row_count_multi(N_sim, N_sw, N_r, N_t, N_s):
args = (N_sim, N_sw, N_r, N_t, N_s)
assert len(run_experiment(create_experiments(*args), 'multi_proc')) == expected_rows(*args)
Expand Down
2 changes: 1 addition & 1 deletion testing/tests/cadCAD_memory_address.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"memory_address": "0x111d22f20"}
{"memory_address": "0x10fc14ef0"}

0 comments on commit d7e38cb

Please sign in to comment.