Skip to content

Commit

Permalink
fix: this refactors execution to properly use the process pool
Browse files Browse the repository at this point in the history
Prior to this change the process pool was created and destroyed
for every configuration, this would cause the cpu/memory to
thrash and improperly allocate tasks to available CPUs, resulting
in sometimes 25% utilization of available cpu resources

The change corrects this, as well as tackles memory problems,
by writing temporary results to disk and then reading them back
at the end of the simulation.

This is not configurable in this commit, and can also result in
loading too much memory, as it does not include the ability to
progressively or lazily load data into the final dataframe to
complete the simulation.

This commit is a WIP; fixes #351
  • Loading branch information
zcstarr committed Mar 15, 2024
1 parent 5a04e98 commit 850c280
Showing 1 changed file with 59 additions and 48 deletions.
107 changes: 59 additions & 48 deletions cadCAD/engine/execution.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import os
from typing import Callable, Dict, List, Any, Tuple, Sequence
from pathos.multiprocessing import ProcessPool # type: ignore
from pathos.multiprocessing import ProcessPool # type: ignore
from collections import Counter
from cadCAD.types import *
from cadCAD.utils import flatten
import tempfile
import pickle

VarDictType = Dict[str, List[object]]
StatesListsType = List[dict[str, object]]
Expand All @@ -25,15 +28,14 @@ def single_proc_exec(
configured_n: Sequence[N_Runs],
additional_objs=None
) -> List:



if not isinstance(var_dict_list, Sequence):
var_dict_list = list([var_dict_list])

raw_params = (
simulation_execs, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, SubsetIDs, SubsetWindows, var_dict_list)

results: List = []
print(f'Execution Mode: single_threaded')
for raw_param in zip(*raw_params):
Expand All @@ -44,6 +46,46 @@ def single_proc_exec(
results.append(flatten(result))
return flatten(results)


def process_executor(params):
if len_configs_structs > 1:
with ProcessPool(processes=len_configs_structs) as pp:
results = pp.map(
lambda t: t[0](t[1], t[2], t[3], t[4], t[5],
t[6], t[7], t[8], t[9], configured_n), params
)
else:
t = params[0]
results = t[0](t[1], t[2], t[3], t[4], t[5], t[6],
t[7], t[8], t[9], configured_n)
return results


def process_executor(params):
    """Run one simulation and spill its pickled result to a temp file.

    Args:
        params: an 11-tuple of
            (simulation_exec, var_dict, states_list, config,
             env_processes, T, sim_id, N, subset_id, subset_window,
             configured_n) — the callable followed by its arguments.

    Returns:
        The path of the temporary file holding the pickled result
        (a one-element list wrapping the simulation output).
        `file_handler` reads these files back and removes them.
    """
    (simulation_exec, var_dict, states_list, config, env_processes,
     T, sim_id, N, subset_id, subset_window, configured_n) = params

    result = [simulation_exec(
        var_dict, states_list, config, env_processes, T, sim_id, N,
        subset_id, subset_window, configured_n
    )]
    # Write the pickle through the NamedTemporaryFile handle itself
    # rather than reopening the path by name — the original left the
    # NamedTemporaryFile handle open (a descriptor leak). `delete=False`
    # keeps the file on disk after close so the parent can read it back.
    with tempfile.NamedTemporaryFile(delete=False) as f:
        pickle.dump(result, f)
    return f.name


def file_handler(filenames: List[str]) -> List:
    """Load pickled results back from temp files, deleting each file.

    Args:
        filenames: paths produced by `process_executor`, each holding
            one pickled result.

    Returns:
        A list with one loaded result per input file, in input order.
    """
    combined_results = []
    for file_name in filenames:
        # The `with` block closes the handle; the original also called
        # f.close() redundantly and juggled `result = None` / `del result`,
        # which only rebinds the loop-local name and frees nothing extra.
        with open(file_name, 'rb') as f:
            combined_results.append(pickle.load(f))
        os.remove(file_name)  # clean up the temp file once loaded
    return combined_results


def parallelize_simulations(
simulation_execs: List[ExecutorFunction],
var_dict_list: List[Parameters],
Expand All @@ -61,50 +103,19 @@ def parallelize_simulations(
):

print(f'Execution Mode: parallelized')
params = list(
zip(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, SubsetIDs, SubsetWindows
)
)

len_configs_structs = len(configs_structs)
params = [
(sim_exec, var_dict, states_list, config, env_processes,
T, sim_id, N, subset_id, subset_window, configured_n)
for sim_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window in
zip(simulation_execs, var_dict_list, states_lists, configs_structs,
env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows)
]

unique_runs = Counter(SimIDs)
sim_count = max(unique_runs.values())
highest_divisor = int(len_configs_structs / sim_count)
with ProcessPool(maxtasksperchild=1) as pool:
temp_files = pool.map(process_executor, params)

new_configs_structs, new_params = [], []
for count in range(len(params)):
if count == 0:
new_params.append(
params[count: highest_divisor]
)
new_configs_structs.append(
configs_structs[count: highest_divisor]
)
elif count > 0:
new_params.append(
params[count * highest_divisor: (count + 1) * highest_divisor]
)
new_configs_structs.append(
configs_structs[count * highest_divisor: (count + 1) * highest_divisor]
)

def process_executor(params):
if len_configs_structs > 1:
with ProcessPool(processes=len_configs_structs) as pp:
results = pp.map(
lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n), params
)
else:
t = params[0]
results = t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n)
return results

results = flatten(list(map(lambda params: process_executor(params), new_params)))

return results
return flatten(file_handler(temp_files))


def local_simulations(
Expand All @@ -121,15 +132,15 @@ def local_simulations(
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs],
additional_objs=None
):
):
config_amt = len(configs_structs)

if config_amt == 1: # and configured_n != 1
if config_amt == 1: # and configured_n != 1
return single_proc_exec(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs
)
elif config_amt > 1: # and configured_n != 1
elif config_amt > 1: # and configured_n != 1
return parallelize_simulations(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs
Expand Down

0 comments on commit 850c280

Please sign in to comment.