Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: rename num_cpus to num_parallel for run_parallel #147

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/tbp/monty/frameworks/config_utils/cmd_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ def create_cmd_parser_parallel(all_configs):
choices=list(all_configs.keys()),
)
parser.add_argument(
"-n", "--num_cpus", default=16, help="How many cpus to use", type=int
"-n",
"--num_parallel",
default=16,
help="How many episodes to run in parallel",
type=int,
)
parser.add_argument(
"-q",
Expand Down
36 changes: 20 additions & 16 deletions src/tbp/monty/frameworks/run_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@
"""
Just like run.py, but run episodes in parallel. Running in parallel is as simple as

`python run_parallel.py -e my_exp -n ${NUM_CPUS}`
`python run_parallel.py -e my_exp -n ${NUM_PARALLEL}`

Assumptions and notes:
--- There are some differences between training and testing in parallel. At train time,
we parallelize across objects, but episodes with the same object are run in serial.
In this case it is best to set num_cpus to num_objects in your dataset if possible.
At test time, we separate all (object, pose) combos into separate jobs and run them
in parallel. In this case, the total_n_jobs is n_objects * n_poses, and the
more cpus the better (assuming you won't have more than total_n_jobs available).
In this case it is best to set num_parallel to num_objects in your dataset if
possible. At test time, we separate all (object, pose) combos into separate jobs and
run them in parallel. In this case, the total_n_jobs is n_objects * n_poses, and
the more parallelism the better (assuming you won't have more than total_n_jobs
available).
--- Only certain experiment classes are supported for training. Right now the focus is
on SupervisedPreTraining. Some classes like ObjectRecognition are inherently
not parallelizable because each episode depends on results from the previous.
Expand Down Expand Up @@ -301,12 +302,12 @@ def post_parallel_train(configs, base_dir):


def run_episodes_parallel(
configs, num_cpus, experiment_name, train=True, is_unittest=False
configs, num_parallel, experiment_name, train=True, is_unittest=False
):
exp_type = "training" if train else "evaluation"
print(
f"-------- Running {exp_type} experiment {experiment_name}"
f" with {num_cpus} cpus --------"
f" with {num_parallel} episodes in parallel --------"
)
start_time = time.time()
if configs[0]["logging_config"]["log_parallel_wandb"]:
Expand All @@ -326,7 +327,7 @@ def run_episodes_parallel(
for config in configs:
run_fn(config)
else:
with mp.Pool(num_cpus) as p:
with mp.Pool(num_parallel) as p:
if train:
# NOTE: since we don't use wandb logging for training right now
# it is also not covered here. Might want to add that in the future.
Expand All @@ -349,7 +350,7 @@ def run_episodes_parallel(
overall_stats["overall/parallel_run_time"] = (
time.time() - start_time
)
overall_stats["overall/num_processes"] = num_cpus
overall_stats["overall/num_processes"] = num_parallel
run.log(overall_stats)
else:
p.map(single_evaluate, configs)
Expand Down Expand Up @@ -380,7 +381,10 @@ def run_episodes_parallel(
else:
print(f"No csv table found at {csv_path} to log to wandb")

print(f"Total time for {len(configs)} using {num_cpus} cpus: {total_time}")
print(
f"Total time for {len(configs)} running {num_parallel} episodes in parallel: "
f"{total_time}"
)
if configs[0]["logging_config"]["log_parallel_wandb"]:
run.finish()

Expand All @@ -389,7 +393,7 @@ def run_episodes_parallel(
# Keep a record of how long everything takes
with open(os.path.join(base_dir, "parallel_log.txt"), "w") as f:
f.write(f"experiment: {experiment_name}\n")
f.write(f"num_cpus: {num_cpus}\n")
f.write(f"num_parallel: {num_parallel}\n")
f.write(f"total_time: {total_time}")


Expand Down Expand Up @@ -512,21 +516,21 @@ def main(
all_configs=None,
exp=None,
experiment=None,
num_cpus=None,
num_parallel=None,
quiet_habitat_logs=True,
print_cfg=False,
is_unittest=False,
):
# Handle args passed directly (only used by unittest) or command line (normal)
if experiment:
assert num_cpus, "missing arg num_cpus"
assert num_parallel, "missing arg num_parallel"
assert exp, "missing arg exp"

else:
cmd_parser = create_cmd_parser_parallel(all_configs=all_configs)
cmd_args = cmd_parser.parse_args()
experiment = cmd_args.experiment
num_cpus = cmd_args.num_cpus
num_parallel = cmd_args.num_parallel
quiet_habitat_logs = cmd_args.quiet_habitat_logs
print_cfg = cmd_args.print_cfg
is_unittest = False
Expand Down Expand Up @@ -556,7 +560,7 @@ def main(
else:
run_episodes_parallel(
train_configs,
num_cpus,
num_parallel,
experiment,
train=True,
is_unittest=is_unittest,
Expand All @@ -575,7 +579,7 @@ def main(
else:
run_episodes_parallel(
eval_configs,
num_cpus,
num_parallel,
experiment,
train=False,
is_unittest=is_unittest,
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/run_parallel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def test_parallel_runs_n_epochs_lt(self):
run_parallel(
exp=self.supervised_pre_training,
experiment="unittest_supervised_pre_training",
num_cpus=1,
num_parallel=1,
quiet_habitat_logs=True,
print_cfg=False,
is_unittest=True,
Expand Down Expand Up @@ -251,7 +251,7 @@ def test_parallel_runs_n_epochs_lt(self):
run_parallel(
exp=self.eval_config,
experiment="unittest_eval_eq",
num_cpus=1,
num_parallel=1,
quiet_habitat_logs=True,
print_cfg=False,
is_unittest=True,
Expand Down Expand Up @@ -302,7 +302,7 @@ def test_parallel_runs_n_epochs_lt(self):
run_parallel(
exp=self.eval_config_lt,
experiment="unittest_eval_lt",
num_cpus=1,
num_parallel=1,
quiet_habitat_logs=True,
print_cfg=False,
is_unittest=True,
Expand Down Expand Up @@ -342,7 +342,7 @@ def test_parallel_runs_n_epochs_lt(self):
run_parallel(
exp=self.eval_config_gt,
experiment="unittest_eval_gt",
num_cpus=1,
num_parallel=1,
quiet_habitat_logs=True,
print_cfg=False,
is_unittest=True,
Expand Down
Loading