Munir/adding actions #63

Open · wants to merge 10 commits into main
22 changes: 22 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,22 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"env": {
"PYTHONPATH": "${workspaceFolder}",
"CUDA_VISIBLE_DEVICES": "0"
},
"args": [ // Pass arguments here
"--fname", "configs/pretrain/vith16_384.yaml"
],
}
]
}
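
For reference, the debugger configuration above is roughly equivalent to the following terminal invocation from the repository root (a sketch: "program": "${file}" debugs whichever file is currently open, so app/main.py is an assumption about the intended entry point):

    PYTHONPATH=. CUDA_VISIBLE_DEVICES=0 python app/main.py --fname configs/pretrain/vith16_384.yaml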
Empty file added app/__init__.py
Empty file.
40 changes: 22 additions & 18 deletions app/main.py
@@ -1,4 +1,4 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# Copyright (c) NeoCybernetica, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
@@ -17,55 +17,59 @@

parser = argparse.ArgumentParser()
parser.add_argument(
'--fname', type=str,
help='name of config file to load',
default='configs.yaml')
"--fname", type=str, help="name of config file to load", default="configs/pretrain/vith16_384.yaml"
)
parser.add_argument(
'--devices', type=str, nargs='+', default=['cuda:0'],
help='which devices to use on local machine')
"--devices",
type=str,
nargs="+",
default=["cuda:0"],
help="which devices to use on local machine",
)


def process_main(rank, fname, world_size, devices):
import os
os.environ['CUDA_VISIBLE_DEVICES'] = str(devices[rank].split(':')[-1])

os.environ["CUDA_VISIBLE_DEVICES"] = str(devices[rank].split(":")[-1])

import logging
from src.utils.logging import get_logger

logger = get_logger(force=True)
if rank == 0:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.ERROR)

logger.info(f'called-params {fname}')
logger.info(f"called-params {fname}")

# Load config
params = None
with open(fname, 'r') as y_file:
with open(fname, "r") as y_file:
params = yaml.load(y_file, Loader=yaml.FullLoader)
logger.info('loaded params...')
logger.info("loaded params...")

# Log config
if rank == 0:
pprint.PrettyPrinter(indent=4).pprint(params)
dump = os.path.join(params['logging']['folder'], 'params-pretrain.yaml')
with open(dump, 'w') as f:
dump = os.path.join(params["logging"]["folder"], "params-pretrain.yaml")
with open(dump, "w") as f:
yaml.dump(params, f)

# Init distributed (access to comm between GPUS on same machine)
world_size, rank = init_distributed(rank_and_world_size=(rank, world_size))
logger.info(f'Running... (rank: {rank}/{world_size})')
logger.info(f"Running... (rank: {rank}/{world_size})")

# Launch the app with loaded config
app_main(params['app'], args=params)
app_main(params["app"], args=params)


if __name__ == '__main__':
if __name__ == "__main__":
args = parser.parse_args()
num_gpus = len(args.devices)
mp.set_start_method('spawn')
mp.set_start_method("spawn")
for rank in range(num_gpus):
mp.Process(
target=process_main,
args=(rank, args.fname, num_gpus, args.devices)
target=process_main, args=(rank, args.fname, num_gpus, args.devices)
).start()
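
Usage note: the __main__ block spawns one process per entry in --devices, and each process pins its own GPU through CUDA_VISIBLE_DEVICES before any CUDA-dependent imports run. For example (a sketch; the device ids are illustrative):

    python app/main.py --fname configs/pretrain/vith16_384.yaml --devices cuda:0 cuda:1

This starts two processes with ranks 0 and 1, and init_distributed resolves the world size to 2.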
77 changes: 42 additions & 35 deletions app/main_distributed.py
@@ -1,4 +1,4 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# Copyright (c) NeoCybernetica, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
@@ -20,32 +20,33 @@

parser = argparse.ArgumentParser()
parser.add_argument(
'--folder', type=str,
help='location to save submitit logs',
default='/fsx-jepa/massran/submitit/')
"--folder",
type=str,
help="location to save submitit logs",
default="/fsx-jepa/massran/submitit/",
)
parser.add_argument(
'--exclude', type=str,
help='nodes to exclude from training',
default=None)
"--exclude", type=str, help="nodes to exclude from training", default=None
)
parser.add_argument(
'--batch-launch', action='store_true',
help='whether fname points to a file to batch-lauch several config files')
"--batch-launch",
action="store_true",
help="whether fname points to a file to batch-lauch several config files",
)
parser.add_argument(
'--fname', type=str,
help='yaml file containing config file names to launch',
default='configs.yaml')
parser.add_argument(
'--partition', type=str,
help='cluster partition to submit jobs on')
parser.add_argument(
'--time', type=int, default=4300,
help='time in minutes to run job')
"--fname",
type=str,
help="yaml file containing config file names to launch",
default="configs.yaml",
)
parser.add_argument("--partition", type=str, help="cluster partition to submit jobs on")
parser.add_argument("--time", type=int, default=4300, help="time in minutes to run job")


class Trainer:

def __init__(self, args_pretrain, load_model=None):
self.app = args_pretrain['app']
self.app = args_pretrain["app"]
self.args_pretrain = args_pretrain
self.load_model = load_model

@@ -54,7 +55,7 @@ def __call__(self):
params = self.args_pretrain
load_model = self.load_model

logger.info('loaded pretrain params...')
logger.info("loaded pretrain params...")
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(params)

@@ -64,7 +65,9 @@ def __call__(self):

def checkpoint(self):
fb_trainer = Trainer(self.args_pretrain, True)
return submitit.helpers.DelayedSubmission(fb_trainer,)
return submitit.helpers.DelayedSubmission(
fb_trainer,
)


def launch_app_with_parsed_args(
@@ -74,19 +77,20 @@ def launch_app_with_parsed_args(
timeout=4300,
nodes=1,
tasks_per_node=1,
exclude_nodes=None
exclude_nodes=None,
):
executor = submitit.AutoExecutor(
folder=os.path.join(submitit_folder, 'job_%j'),
slurm_max_num_timeout=20)
folder=os.path.join(submitit_folder, "job_%j"), slurm_max_num_timeout=20
)
executor.update_parameters(
slurm_partition=partition,
slurm_mem_per_gpu='55G',
slurm_mem_per_gpu="55G",
timeout_min=timeout,
nodes=nodes,
tasks_per_node=tasks_per_node,
cpus_per_task=12,
gpus_per_node=tasks_per_node)
gpus_per_node=tasks_per_node,
)

if args.exclude is not None:
executor.update_parameters(slurm_exclude=args.exclude)
@@ -95,7 +99,9 @@ def launch_app_with_parsed_args(
with executor.batch():
for ap in args_for_pretrain:
fb_trainer = Trainer(ap)
job = executor.submit(fb_trainer,)
job = executor.submit(
fb_trainer,
)
trainers.append(fb_trainer)
jobs.append(job)

@@ -114,7 +120,7 @@ def launch():
# -- config, but actually specifies a list of other config files
# -- to run in a slurm job array
if args.batch_launch:
with open(args.fname, 'r') as y_file:
with open(args.fname, "r") as y_file:
config_fnames = yaml.load(y_file, Loader=yaml.FullLoader)
# ---------------------------------------------------------------------- #

@@ -124,13 +130,13 @@ def launch():
nodes, tasks_per_node = None, None
configs = []
for f in config_fnames:
with open(f, 'r') as y_file:
with open(f, "r") as y_file:
_params = yaml.load(y_file, Loader=yaml.FullLoader)
nodes = int(_params.get('nodes'))
tasks_per_node = int(_params.get('tasks_per_node'))
nodes = int(_params.get("nodes"))
tasks_per_node = int(_params.get("tasks_per_node"))
configs += [_params]
logger.info(f'Loaded {len(configs)} config files')
logger.info(f'Running all jobs with {nodes=} / {tasks_per_node=}')
logger.info(f"Loaded {len(configs)} config files")
logger.info(f"Running all jobs with {nodes=} / {tasks_per_node=}")
# ---------------------------------------------------------------------- #

# ---------------------------------------------------------------------- #
@@ -143,10 +149,11 @@ def launch():
timeout=args.time,
nodes=nodes,
tasks_per_node=tasks_per_node,
exclude_nodes=args.exclude)
exclude_nodes=args.exclude,
)
# ---------------------------------------------------------------------- #


if __name__ == '__main__':
if __name__ == "__main__":
args = parser.parse_args()
launch()
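
The --batch-launch path assumes --fname names a YAML file that is itself a list of config paths, and launch() requires every referenced config to define integer nodes and tasks_per_node fields (it casts both with int(...)). A minimal pre-flight check mirroring that loop (a sketch; check_batch_file is a hypothetical helper, not part of this PR):

    import yaml

    def check_batch_file(batch_fname):
        # Expected layout: the batch file is a YAML list of config paths.
        with open(batch_fname, "r") as y_file:
            config_fnames = yaml.load(y_file, Loader=yaml.FullLoader)
        for f in config_fnames:
            with open(f, "r") as y_file:
                _params = yaml.load(y_file, Loader=yaml.FullLoader)
            # launch() calls int(...) on these, so both keys must be present.
            assert int(_params["nodes"]) >= 1
            assert int(_params["tasks_per_node"]) >= 1
        return config_fnames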
76 changes: 76 additions & 0 deletions app/main_with_actions.py
@@ -0,0 +1,76 @@
# Copyright (c) NeoCybernetica, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
#

import argparse
import pprint
import yaml
import os
import logging
import traceback

from app.scaffold import main as app_main
from src.utils.distributed import init_distributed
from app.vjepa.train_with_actions import main as train # Import the main function from train_with_actions.py


def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--fname",
type=str,
help="name of config file to load",
default="configs/pretrain/vith16_384.yaml",
)
parser.add_argument(
"--devices",
type=str,
nargs="+",
default=["cuda:0"],
help="which devices to use on local machine",
)

args = parser.parse_args()

# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info(f"Called parameters: {args.fname}")

# Load configuration from YAML file
with open(args.fname, "r") as y_file:
params = yaml.load(y_file, Loader=yaml.FullLoader)
logger.info("Loaded configuration parameters.")

# Pretty print the configuration parameters
pprint.PrettyPrinter(indent=4).pprint(params)

# Save the configuration parameters to a YAML file
dump_file = os.path.join(params["logging"]["folder"], "params-pretrain.yaml")
os.makedirs(os.path.dirname(dump_file), exist_ok=True)
with open(dump_file, "w") as f:
yaml.dump(params, f)

# Initialize distributed training (with a single device, world_size and rank resolve to 1 and 0)
num_gpus = len(args.devices)
rank = 0 # this entry point always launches as rank 0 of the local group
world_size, rank = init_distributed(rank_and_world_size=(rank, num_gpus))
logger.info(f"Running... (rank: {rank}/{world_size})")

# Setup environment variables for GPU visibility
os.environ["CUDA_VISIBLE_DEVICES"] = str(args.devices[rank].split(":")[-1])

# Launch the app with loaded config
try:
train(args=params, world_size=world_size, rank=rank)
except Exception:
logger.error(f"An error occurred during training: {traceback.format_exc()}")
raise

if __name__ == "__main__":
main()
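
This entry point imports main from app.vjepa.train_with_actions, which is not part of this diff. From the call site, the assumed interface is roughly (a sketch of the expected signature, not the actual implementation):

    # app/vjepa/train_with_actions.py: interface assumed by main_with_actions.py
    def main(args, world_size=1, rank=0):
        # args: the dict loaded from the YAML config; world_size and rank
        # come from init_distributed in the caller.
        ...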
10 changes: 5 additions & 5 deletions app/scaffold.py
@@ -1,4 +1,4 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# Copyright (c) NeoCybernetica, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
@@ -15,7 +15,7 @@

def main(app, args, resume_preempt=False):

logger.info(f'Running pre-training of app: {app}')
return importlib.import_module(f'app.{app}.train').main(
args=args,
resume_preempt=resume_preempt)
logger.info(f"Running pre-training of app: {app}")
return importlib.import_module(f"app.{app}.train").main(
args=args, resume_preempt=resume_preempt
)
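
scaffold.main dispatches on the app key of the config: importlib resolves the module name f"app.{app}.train" at call time, so app: vjepa in a YAML config routes to app/vjepa/train.py and invokes its main. A minimal illustration (a sketch; the params dict stands in for a loaded config):

    import importlib

    params = {"app": "vjepa"}  # in practice this comes from the YAML config
    module = importlib.import_module(f"app.{params['app']}.train")
    module.main(args=params, resume_preempt=False)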
Empty file added app/vjepa/__init__.py
Empty file.