From 333b20d3277200bfc9dac778e9abe97f618d74c2 Mon Sep 17 00:00:00 2001
From: Tristan Slominski
Date: Tue, 21 Jan 2025 16:02:05 -0600
Subject: [PATCH 1/2] refactor!: rename num_cpus to num_parallel for
 run_parallel

---
 .../frameworks/config_utils/cmd_parser.py |  6 +++++-
 src/tbp/monty/frameworks/run_parallel.py  | 22 +++++++++++-----------
 tests/unit/run_parallel_test.py           |  8 ++++----
 3 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/src/tbp/monty/frameworks/config_utils/cmd_parser.py b/src/tbp/monty/frameworks/config_utils/cmd_parser.py
index 38922d5f..77b6633d 100644
--- a/src/tbp/monty/frameworks/config_utils/cmd_parser.py
+++ b/src/tbp/monty/frameworks/config_utils/cmd_parser.py
@@ -121,7 +121,11 @@ def create_cmd_parser_parallel(all_configs):
         choices=list(all_configs.keys()),
     )
     parser.add_argument(
-        "-n", "--num_cpus", default=16, help="How many cpus to use", type=int
+        "-n",
+        "--num_parallel",
+        default=16,
+        help="How many episodes to run in parallel",
+        type=int,
     )
     parser.add_argument(
         "-q",
diff --git a/src/tbp/monty/frameworks/run_parallel.py b/src/tbp/monty/frameworks/run_parallel.py
index 4c49103a..ebb6f98c 100644
--- a/src/tbp/monty/frameworks/run_parallel.py
+++ b/src/tbp/monty/frameworks/run_parallel.py
@@ -301,12 +301,12 @@ def post_parallel_train(configs, base_dir):
 
 
 def run_episodes_parallel(
-    configs, num_cpus, experiment_name, train=True, is_unittest=False
+    configs, num_parallel, experiment_name, train=True, is_unittest=False
 ):
     exp_type = "training" if train else "evaluation"
     print(
         f"-------- Running {exp_type} experiment {experiment_name}"
-        f" with {num_cpus} cpus --------"
+        f" with {num_parallel} episodes in parallel --------"
     )
     start_time = time.time()
     if configs[0]["logging_config"]["log_parallel_wandb"]:
@@ -326,7 +326,7 @@ def run_episodes_parallel(
         for config in configs:
             run_fn(config)
     else:
-        with mp.Pool(num_cpus) as p:
+        with mp.Pool(num_parallel) as p:
             if train:
                 # NOTE: since we don't use wandb logging for training right now
                 # it is also not covered here. Might want to add that in the future.
@@ -349,7 +349,7 @@ def run_episodes_parallel(
                     overall_stats["overall/parallel_run_time"] = (
                         time.time() - start_time
                     )
-                    overall_stats["overall/num_processes"] = num_cpus
+                    overall_stats["overall/num_processes"] = num_parallel
                     run.log(overall_stats)
             else:
                 p.map(single_evaluate, configs)
@@ -380,7 +380,7 @@ def run_episodes_parallel(
         else:
             print(f"No csv table found at {csv_path} to log to wandb")
 
-    print(f"Total time for {len(configs)} using {num_cpus} cpus: {total_time}")
+    print(f"Total time for {len(configs)} using {num_parallel} cpus: {total_time}")
 
     if configs[0]["logging_config"]["log_parallel_wandb"]:
         run.finish()
@@ -389,7 +389,7 @@ def run_episodes_parallel(
 
     # Keep a record of how long everything takes
    with open(os.path.join(base_dir, "parallel_log.txt"), "w") as f:
         f.write(f"experiment: {experiment_name}\n")
-        f.write(f"num_cpus: {num_cpus}\n")
+        f.write(f"num_cpus: {num_parallel}\n")
         f.write(f"total_time: {total_time}")
@@ -512,21 +512,21 @@
 def main(
     all_configs=None,
     exp=None,
     experiment=None,
-    num_cpus=None,
+    num_parallel=None,
     quiet_habitat_logs=True,
     print_cfg=False,
     is_unittest=False,
 ):
     # Handle args passed directly (only used by unittest) or command line (normal)
     if experiment:
-        assert num_cpus, "missing arg num_cpus"
+        assert num_parallel, "missing arg num_parallel"
         assert exp, "missing arg exp"
     else:
         cmd_parser = create_cmd_parser_parallel(all_configs=all_configs)
         cmd_args = cmd_parser.parse_args()
         experiment = cmd_args.experiment
-        num_cpus = cmd_args.num_cpus
+        num_parallel = cmd_args.num_parallel
         quiet_habitat_logs = cmd_args.quiet_habitat_logs
         print_cfg = cmd_args.print_cfg
         is_unittest = False
@@ -556,7 +556,7 @@ def main(
     else:
         run_episodes_parallel(
             train_configs,
-            num_cpus,
+            num_parallel,
             experiment,
             train=True,
             is_unittest=is_unittest,
@@ -575,7 +575,7 @@ def main(
     else:
         run_episodes_parallel(
             eval_configs,
-            num_cpus,
+            num_parallel,
             experiment,
             train=False,
             is_unittest=is_unittest,
diff --git a/tests/unit/run_parallel_test.py b/tests/unit/run_parallel_test.py
index 86016771..dd2eaa67 100644
--- a/tests/unit/run_parallel_test.py
+++ b/tests/unit/run_parallel_test.py
@@ -183,7 +183,7 @@ def test_parallel_runs_n_epochs_lt(self):
         run_parallel(
             exp=self.supervised_pre_training,
             experiment="unittest_supervised_pre_training",
-            num_cpus=1,
+            num_parallel=1,
             quiet_habitat_logs=True,
             print_cfg=False,
             is_unittest=True,
@@ -251,7 +251,7 @@ def test_parallel_runs_n_epochs_lt(self):
         run_parallel(
             exp=self.eval_config,
             experiment="unittest_eval_eq",
-            num_cpus=1,
+            num_parallel=1,
             quiet_habitat_logs=True,
             print_cfg=False,
             is_unittest=True,
@@ -302,7 +302,7 @@ def test_parallel_runs_n_epochs_lt(self):
         run_parallel(
             exp=self.eval_config_lt,
             experiment="unittest_eval_lt",
-            num_cpus=1,
+            num_parallel=1,
             quiet_habitat_logs=True,
             print_cfg=False,
             is_unittest=True,
@@ -342,7 +342,7 @@ def test_parallel_runs_n_epochs_lt(self):
         run_parallel(
             exp=self.eval_config_gt,
             experiment="unittest_eval_gt",
-            num_cpus=1,
+            num_parallel=1,
             quiet_habitat_logs=True,
             print_cfg=False,
             is_unittest=True,

From 1787525dfcdd5e8b5db6bc84dc0a0bb92f5b0cd1 Mon Sep 17 00:00:00 2001
From: Tristan Slominski
Date: Tue, 21 Jan 2025 16:13:32 -0600
Subject: [PATCH 2/2] refactor!: rename num_cpus to num_parallel in logs and
 docs

---
 src/tbp/monty/frameworks/run_parallel.py | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/src/tbp/monty/frameworks/run_parallel.py b/src/tbp/monty/frameworks/run_parallel.py
index ebb6f98c..986b8ecc 100644
--- a/src/tbp/monty/frameworks/run_parallel.py
+++ b/src/tbp/monty/frameworks/run_parallel.py
@@ -40,15 +40,16 @@
 """
 Just like run.py, but run episodes in parallel. Running in parallel is as simple as
-`python run_parallel.py -e my_exp -n ${NUM_CPUS}`
+`python run_parallel.py -e my_exp -n ${NUM_PARALLEL}`
 
 Assumptions and notes:
 --- There are some differences between training and testing in parallel. At train
     time, we parallelize across objects, but episodes with the same object are run
     in serial.
-    In this case it is best to set num_cpus to num_objects in your dataset if possible.
-    At test time, we separate all (object, pose) combos into separate jobs and run them
-    in parallel. In this case, the total_n_jobs is n_objects * n_poses, and the
-    more cpus the better (assuming you won't have more than total_n_jobs oavialable).
+    In this case it is best to set num_parallel to num_objects in your dataset if
+    possible. At test time, we separate all (object, pose) combos into separate jobs and
+    run them in parallel. In this case, the total_n_jobs is n_objects * n_poses, and
+    the more parallelism the better (assuming you won't have more than total_n_jobs
+    available).
 --- Only certain experiment classes are supported for training. Right now the focus is
     on SupervisedPreTraning. Some classes like ObjectRecognition are inherently not
     parallelizable because each episode depends on results from the previous.
@@ -380,7 +381,10 @@ def run_episodes_parallel(
         else:
             print(f"No csv table found at {csv_path} to log to wandb")
 
-    print(f"Total time for {len(configs)} using {num_parallel} cpus: {total_time}")
+    print(
+        f"Total time for {len(configs)} configs running {num_parallel} episodes"
+        f" in parallel: {total_time}"
+    )
 
     if configs[0]["logging_config"]["log_parallel_wandb"]:
         run.finish()
@@ -389,7 +393,7 @@ def run_episodes_parallel(
 
     # Keep a record of how long everything takes
     with open(os.path.join(base_dir, "parallel_log.txt"), "w") as f:
         f.write(f"experiment: {experiment_name}\n")
-        f.write(f"num_cpus: {num_parallel}\n")
+        f.write(f"num_parallel: {num_parallel}\n")
         f.write(f"total_time: {total_time}")
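
A usage sketch with the renamed flag (illustrative only: the experiment names and
worker counts below are hypothetical; the `-e`/`-n` flags, the default of 16, and
the sizing guidance come from the parser and docstring changes above):

    # Train time parallelizes across objects, and episodes with the same object
    # run serially, so num_parallel beyond the object count buys nothing.
    # For a dataset of 10 objects:
    python run_parallel.py -e my_train_exp -n 10

    # Test time splits work into (object, pose) jobs, so total_n_jobs is
    # n_objects * n_poses, and higher values of -n help up to that bound:
    python run_parallel.py -e my_eval_exp -n 32

The name passed to `-e` must be a key in `all_configs`
(`choices=list(all_configs.keys())` in `create_cmd_parser_parallel`).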