diff --git a/python/pylibwholegraph/examples/node_classfication.py b/python/pylibwholegraph/examples/node_classfication.py
index c007ff096..5994d5ece 100644
--- a/python/pylibwholegraph/examples/node_classfication.py
+++ b/python/pylibwholegraph/examples/node_classfication.py
@@ -14,29 +14,26 @@
 import datetime
 import os
 import time
-from optparse import OptionParser
+import argparse
 
 import apex
 import torch
 from apex.parallel import DistributedDataParallel as DDP
-
 import pylibwholegraph.torch as wgth
 
-parser = OptionParser()
-wgth.add_distributed_launch_options(parser)
-wgth.add_training_options(parser)
-wgth.add_common_graph_options(parser)
-wgth.add_common_model_options(parser)
-wgth.add_common_sampler_options(parser)
-wgth.add_node_classfication_options(parser)
-wgth.add_dataloader_options(parser)
-parser.add_option(
+argparser = argparse.ArgumentParser()
+wgth.add_distributed_launch_options(argparser)
+wgth.add_training_options(argparser)
+wgth.add_common_graph_options(argparser)
+wgth.add_common_model_options(argparser)
+wgth.add_common_sampler_options(argparser)
+wgth.add_node_classfication_options(argparser)
+wgth.add_dataloader_options(argparser)
+argparser.add_argument(
     "--fp16_embedding", action="store_true", dest="fp16_mbedding",
     default=False, help="Whether to use fp16 embedding"
 )
-
-
-(options, args) = parser.parse_args()
+args = argparser.parse_args()
 
 
 def valid_test(dataloader, model, name):
@@ -68,7 +65,7 @@ def valid(valid_dataloader, model):
 
 
 def test(test_dataset, model):
-    test_dataloader = wgth.get_valid_test_dataloader(test_dataset, options.batchsize)
+    test_dataloader = wgth.get_valid_test_dataloader(test_dataset, args.batchsize)
     valid_test(test_dataloader, model, "TEST")
 
 
@@ -77,19 +74,19 @@ def train(train_data, valid_data, model, optimizer, wm_optimizer, global_comm):
     print("start training...")
     train_dataloader = wgth.get_train_dataloader(
         train_data,
-        options.batchsize,
+        args.batchsize,
         replica_id=wgth.get_rank(),
         num_replicas=wgth.get_world_size(),
-        num_workers=options.dataloaderworkers,
+        num_workers=args.dataloaderworkers,
     )
-    valid_dataloader = wgth.get_valid_test_dataloader(valid_data, options.batchsize)
+    valid_dataloader = wgth.get_valid_test_dataloader(valid_data, args.batchsize)
     valid(valid_dataloader, model)
 
     train_step = 0
     epoch = 0
     loss_fcn = torch.nn.CrossEntropyLoss()
     train_start_time = time.time()
-    while epoch < options.epochs:
+    while epoch < args.epochs:
         for i, (idx, label) in enumerate(train_dataloader):
             label = torch.reshape(label, (-1,)).cuda()
             optimizer.zero_grad()
@@ -99,7 +96,7 @@ def train(train_data, valid_data, model, optimizer, wm_optimizer, global_comm):
             loss.backward()
             optimizer.step()
             if wm_optimizer is not None:
-                wm_optimizer.step(options.lr * 0.1)
+                wm_optimizer.step(args.lr * 0.1)
             if wgth.get_rank() == 0 and train_step % 100 == 0:
                 print(
                     "[%s] [LOSS] step=%d, loss=%f"
@@ -121,7 +118,7 @@ def train(train_data, valid_data, model, optimizer, wm_optimizer, global_comm):
         )
         print(
             "[EPOCH_TIME] %.2f seconds."
-            % ((train_end_time - train_start_time) / options.epochs,)
+            % ((train_end_time - train_start_time) / args.epochs,)
         )
     valid(valid_dataloader, model)
 
@@ -135,11 +132,11 @@ def main_func():
         wgth.get_local_size(),
     )
 
-    if options.use_cpp_ext:
+    if args.use_cpp_ext:
         wgth.compile_cpp_extension()
 
     train_ds, valid_ds, test_ds = wgth.create_node_claffication_datasets(
-        options.pickle_data_path
+        args.pickle_data_path
     )
 
     graph_structure = wgth.GraphStructure()
@@ -152,70 +149,70 @@ def main_func():
         graph_comm = global_comm
         graph_structure_wholememory_type = "continuous"
         graph_structure_wholememory_location = "cuda"
-        if not options.use_global_embedding:
-            options.use_global_embedding = True
+        if not args.use_global_embedding:
+            args.use_global_embedding = True
             print("Changing to using global communicator for embedding...")
-        if options.embedding_memory_type == "chunked":
+        if args.embedding_memory_type == "chunked":
             print("Changing to continuous wholememory for embedding...")
-            options.embedding_memory_type = "continuous"
+            args.embedding_memory_type = "continuous"
 
     csr_row_ptr_wm_tensor = wgth.create_wholememory_tensor_from_filelist(
         graph_comm,
         graph_structure_wholememory_type,
         graph_structure_wholememory_location,
-        os.path.join(options.root_dir, "homograph_csr_row_ptr"),
+        os.path.join(args.root_dir, "homograph_csr_row_ptr"),
         torch.int64,
     )
     csr_col_ind_wm_tensor = wgth.create_wholememory_tensor_from_filelist(
         graph_comm,
         graph_structure_wholememory_type,
         graph_structure_wholememory_location,
-        os.path.join(options.root_dir, "homograph_csr_col_idx"),
+        os.path.join(args.root_dir, "homograph_csr_col_idx"),
         torch.int,
     )
     graph_structure.set_csr_graph(csr_row_ptr_wm_tensor, csr_col_ind_wm_tensor)
 
-    feature_comm = global_comm if options.use_global_embedding else local_comm
+    feature_comm = global_comm if args.use_global_embedding else local_comm
 
-    embedding_wholememory_type = options.embedding_memory_type
+    embedding_wholememory_type = args.embedding_memory_type
     embedding_wholememory_location = (
-        "cpu" if options.cache_type != "none" or options.cache_ratio == 0.0 else "cuda"
+        "cpu" if args.cache_type != "none" or args.cache_ratio == 0.0 else "cuda"
     )
-    if options.cache_ratio == 0.0:
-        options.cache_type = "none"
-    access_type = "readonly" if options.train_embedding is False else "readwrite"
+    if args.cache_ratio == 0.0:
+        args.cache_type = "none"
+    access_type = "readonly" if args.train_embedding is False else "readwrite"
     if wgth.get_rank() == 0:
         print(
             f"graph_structure: type={graph_structure_wholememory_type}, "
             f"location={graph_structure_wholememory_location}\n"
             f"embedding: type={embedding_wholememory_type}, location={embedding_wholememory_location}, "
-            f"cache_type={options.cache_type}, cache_ratio={options.cache_ratio}, "
-            f"trainable={options.train_embedding}"
+            f"cache_type={args.cache_type}, cache_ratio={args.cache_ratio}, "
+            f"trainable={args.train_embedding}"
         )
     cache_policy = wgth.create_builtin_cache_policy(
-        options.cache_type,
+        args.cache_type,
         embedding_wholememory_type,
         embedding_wholememory_location,
        access_type,
-        options.cache_ratio,
+        args.cache_ratio,
     )
 
     wm_optimizer = (
         None
-        if options.train_embedding is False
+        if args.train_embedding is False
         else wgth.create_wholememory_optimizer("adam", {})
     )
 
-    embedding_dtype = torch.float32 if not options.fp16_mbedding else torch.float16
+    embedding_dtype = torch.float32 if not args.fp16_mbedding else torch.float16
 
     if wm_optimizer is None:
         node_feat_wm_embedding = wgth.create_embedding_from_filelist(
             feature_comm,
             embedding_wholememory_type,
             embedding_wholememory_location,
-            os.path.join(options.root_dir, "node_feat.bin"),
+            os.path.join(args.root_dir, "node_feat.bin"),
             embedding_dtype,
-            options.feat_dim,
+            args.feat_dim,
             optimizer=wm_optimizer,
             cache_policy=cache_policy,
         )
@@ -225,16 +222,16 @@ def main_func():
             embedding_wholememory_type,
             embedding_wholememory_location,
             embedding_dtype,
-            [graph_structure.node_count, options.feat_dim],
+            [graph_structure.node_count, args.feat_dim],
             optimizer=wm_optimizer,
             cache_policy=cache_policy,
             random_init=True,
         )
-    wgth.set_framework(options.framework)
-    model = wgth.HomoGNNModel(graph_structure, node_feat_wm_embedding, options)
+    wgth.set_framework(args.framework)
+    model = wgth.HomoGNNModel(graph_structure, node_feat_wm_embedding, args)
     model.cuda()
     model = DDP(model, delay_allreduce=True)
-    optimizer = apex.optimizers.FusedAdam(model.parameters(), lr=options.lr)
+    optimizer = apex.optimizers.FusedAdam(model.parameters(), lr=args.lr)
 
     train(train_ds, valid_ds, model, optimizer, wm_optimizer, global_comm)
     test(test_ds, model)
@@ -243,4 +240,4 @@
 
 
 if __name__ == "__main__":
-    wgth.distributed_launch(options, main_func)
+    wgth.distributed_launch(args, main_func)
diff --git a/python/pylibwholegraph/pylibwholegraph/torch/common_options.py b/python/pylibwholegraph/pylibwholegraph/torch/common_options.py
index 9a4723629..7697d8b5d 100644
--- a/python/pylibwholegraph/pylibwholegraph/torch/common_options.py
+++ b/python/pylibwholegraph/pylibwholegraph/torch/common_options.py
@@ -11,51 +11,51 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from optparse import OptionParser
+from argparse import ArgumentParser
 
 
-def add_training_options(parser: OptionParser):
-    parser.add_option(
-        "-e", "--epochs", type="int", dest="epochs", default=24, help="number of epochs"
+def add_training_options(argparser: ArgumentParser):
+    argparser.add_argument(
+        "-e", "--epochs", type=int, dest="epochs", default=24, help="number of epochs"
     )
-    parser.add_option(
+    argparser.add_argument(
         "-b",
         "--batchsize",
-        type="int",
+        type=int,
         dest="batchsize",
         default=1024,
         help="batch size",
     )
-    parser.add_option(
-        "--lr", type="float", dest="lr", default=0.003, help="learning rate"
+    argparser.add_argument(
+        "--lr", type=float, dest="lr", default=0.003, help="learning rate"
     )
-    parser.add_option(
+    argparser.add_argument(
         "--embedding-memory-type",
         dest="embedding_memory_type",
         default="chunked",
         help="Embedding memory type, should be: continuous, chunked or distributed",
     )
-    parser.add_option(
+    argparser.add_argument(
         "--cache-type",
         dest="cache_type",
         default="none",
         help="Embedding cache type, should be: none, local_device, local_node or all_devices",
     )
-    parser.add_option(
+    argparser.add_argument(
         "--cache-ratio",
-        type="float",
+        type=float,
         dest="cache_ratio",
         default=0.5,
         help="cache ratio",
     )
-    parser.add_option(
+    argparser.add_argument(
         "--use-cpp-ext",
         action="store_true",
         dest="use_cpp_ext",
         default=False,
         help="Whether to use cpp extension for pytorch"
     )
-    parser.add_option(
+    argparser.add_argument(
         "--train-embedding",
         action="store_true",
         dest="train_embedding",
@@ -64,96 +64,97 @@ def add_training_options(parser: OptionParser):
     )
 
 
-def add_common_graph_options(parser: OptionParser):
-    parser.add_option(
+def add_common_graph_options(argparser: ArgumentParser):
+    argparser.add_argument(
         "-r",
         "--root-dir",
         dest="root_dir",
         default="dataset",
         help="graph dataset root directory.",
     )
-    parser.add_option(
+    argparser.add_argument(
         "--use-global-embedding",
         action="store_true",
         dest="use_global_embedding",
         default=False,
         help="Store embedding across all ranks or only in local node.",
     )
-    parser.add_option(
+    argparser.add_argument(
         "--feat-dim",
-        type="int",
+        type=int,
         dest="feat_dim",
         default=100,
         help="default feature dim",
     )
 
 
-def add_common_model_options(parser: OptionParser):
-    parser.add_option(
-        "--hiddensize", type="int", dest="hiddensize", default=256, help="hidden size"
+def add_common_model_options(argparser: ArgumentParser):
+    argparser.add_argument(
+        "--hiddensize", type=int, dest="hiddensize", default=256, help="hidden size"
     )
-    parser.add_option(
-        "-l", "--layernum", type="int", dest="layernum", default=3, help="layer number"
+    argparser.add_argument(
+        "-l", "--layernum", type=int, dest="layernum", default=3, help="layer number"
     )
-    parser.add_option(
+    argparser.add_argument(
         "-m",
         "--model",
         dest="model",
         default="sage",
         help="model type, valid values are: sage, gcn, gat",
     )
-    parser.add_option(
+    argparser.add_argument(
         "-f",
         "--framework",
         dest="framework",
         default="cugraph",
         help="framework type, valid values are: dgl, pyg, wg, cugraph",
     )
-    parser.add_option("--heads", type="int", dest="heads", default=4, help="num heads")
-    parser.add_option(
-        "-d", "--dropout", type="float", dest="dropout", default=0.5, help="dropout"
+    argparser.add_argument("--heads", type=int, dest="heads", default=4, help="num heads")
+    argparser.add_argument(
+        "-d", "--dropout", type=float, dest="dropout", default=0.5, help="dropout"
     )
 
 
-def add_common_sampler_options(parser: OptionParser):
-    parser.add_option(
+def add_common_sampler_options(argparser: ArgumentParser):
+    argparser.add_argument(
         "-n",
         "--neighbors",
         dest="neighbors",
         default="30,30,30",
         help="train neighboor sample count",
     )
-    parser.add_option(
+    argparser.add_argument(
         "-s",
         "--inferencesample",
+        type=int,
         dest="inferencesample",
         default="30",
         help="inference sample count, -1 is all",
     )
 
 
-def add_node_classfication_options(parser: OptionParser):
-    parser.add_option(
+def add_node_classfication_options(argparser: ArgumentParser):
+    argparser.add_argument(
         "-c",
         "--classnum",
-        type="int",
+        type=int,
         dest="classnum",
         default=172,
         help="class number",
     )
 
 
-def add_dataloader_options(parser: OptionParser):
-    parser.add_option(
+def add_dataloader_options(argparser: ArgumentParser):
+    argparser.add_argument(
         "--pickle-data-path",
         dest="pickle_data_path",
         default="",
         help="training data file path, should be pickled dict",
     )
-    parser.add_option(
+    argparser.add_argument(
         "-w",
         "--dataloaderworkers",
-        type="int",
+        type=int,
         dest="dataloaderworkers",
         default=0,
         help="number of workers for dataloader",
diff --git a/python/pylibwholegraph/pylibwholegraph/torch/distributed_launch.py b/python/pylibwholegraph/pylibwholegraph/torch/distributed_launch.py
index 3585b847d..8fbcbfa4d 100644
--- a/python/pylibwholegraph/pylibwholegraph/torch/distributed_launch.py
+++ b/python/pylibwholegraph/pylibwholegraph/torch/distributed_launch.py
@@ -12,7 +12,7 @@
 # limitations under the License.
 
 import os
-from optparse import OptionParser
+from argparse import ArgumentParser
 
 
 class DistributedConfig(object):
@@ -81,45 +81,45 @@ def is_main_process():
     return get_rank() == 0
 
 
-def add_distributed_launch_options(parser: OptionParser):
-    parser.add_option(
+def add_distributed_launch_options(parser: ArgumentParser):
+    parser.add_argument(
         "--launch-agent",
         dest="launch_agent",
         default="mpi",
         help="launch agent used, mpi, pytorch or spawn",
     )
     # command line flags
-    parser.add_option(
+    parser.add_argument(
         "--rank", dest="rank", type=int, default=-1, help="command line flag for rank"
     )
-    parser.add_option(
+    parser.add_argument(
         "--world-size",
         dest="world_size",
         type=int,
         default=-1,
         help="command line flag for world_size",
     )
-    parser.add_option(
+    parser.add_argument(
         "--local-rank",
         dest="local_rank",
         type=int,
         default=-1,
         help="command line flag for local_rank",
     )
-    parser.add_option(
+    parser.add_argument(
         "--local-size",
         dest="local_size",
         type=int,
         default=-1,
         help="command line flag for local_size",
     )
-    parser.add_option(
+    parser.add_argument(
         "--master-addr",
         dest="master_addr",
         default="",
         help="command line flag for master_addr",
     )
-    parser.add_option(
+    parser.add_argument(
         "--master-port",
         dest="master_port",
         type=int,
@@ -127,37 +127,37 @@ def add_distributed_launch_options(parser: OptionParser):
         help="command line flag for master_port",
     )
     # environment variable names
-    parser.add_option(
+    parser.add_argument(
         "--launch-env-name-world-rank",
         dest="launch_env_name_world_rank",
         default="RANK",
         help="environment variable name for world rank",
     )
-    parser.add_option(
+    parser.add_argument(
         "--launch-env-name-world-size",
         dest="launch_env_name_world_size",
         default="WORLD_SIZE",
         help="environment variable name for world size",
     )
-    parser.add_option(
+    parser.add_argument(
         "--launch-env-name-local-rank",
         dest="launch_env_name_local_rank",
         default="LOCAL_RANK",
         help="environment variable name for local rank",
     )
-    parser.add_option(
+    parser.add_argument(
         "--launch-env-name-local-size",
         dest="launch_env_name_local_size",
         default="LOCAL_WORLD_SIZE",
         help="environment variable name for local size",
     )
-    parser.add_option(
+    parser.add_argument(
         "--launch-env-name-master-addr",
         dest="launch_env_name_master_addr",
         default="MASTER_ADDR",
         help="environment variable name for master_addr",
     )
-    parser.add_option(
+    parser.add_argument(
         "--launch-env-name-master-port",
         dest="launch_env_name_master_port",
         default="MASTER_PORT",
@@ -187,7 +187,7 @@ def get_value_from_option_and_env(
     return option_value
 
 
-def distributed_launch_mpi(options, main_func):
+def distributed_launch_mpi(args, main_func):
     from mpi4py import MPI
 
     mpi_communicator = MPI.COMM_WORLD
@@ -199,11 +199,11 @@ def distributed_launch_mpi(options, main_func):
     distributed_config.local_rank = shared_mpi_communicator.Get_rank()
     distributed_config.local_size = shared_mpi_communicator.Get_size()
     distributed_config.master_addr = get_value_from_option_and_env(
-        options.master_addr, options.launch_env_name_master_addr, "", "localhost"
+        args.master_addr, args.launch_env_name_master_addr, "", "localhost"
     )
     distributed_config.master_port = int(
         get_value_from_option_and_env(
-            options.master_port, options.launch_env_name_master_port, -1, 12335
+            args.master_port, args.launch_env_name_master_port, -1, 12335
         )
     )
 
@@ -216,33 +216,33 @@ def distributed_launch_mpi(options, main_func):
 
 
 def distributed_launch_pytorch(
-    options,
+    args,
     main_func,
 ):
     global distributed_config
     distributed_config.rank = int(
-        get_value_from_env(options.launch_env_name_world_rank)
+        get_value_from_env(args.launch_env_name_world_rank)
     )
     distributed_config.world_size = int(
-        get_value_from_env(options.launch_env_name_world_size)
+        get_value_from_env(args.launch_env_name_world_size)
     )
     distributed_config.local_rank = int(
         get_value_from_option_and_env(
-            options.local_rank, options.launch_env_name_local_rank, -1
+            args.local_rank, args.launch_env_name_local_rank, -1
         )
     )
     assert distributed_config.local_rank >= 0
     distributed_config.local_size = int(
         get_value_from_option_and_env(
-            options.local_size, options.launch_env_name_local_size, -1
+            args.local_size, args.launch_env_name_local_size, -1
         )
     )
     assert distributed_config.local_size > 0
     distributed_config.master_addr = get_value_from_env(
-        options.launch_env_name_master_addr
+        args.launch_env_name_master_addr
     )
     distributed_config.master_port = int(
-        get_value_from_env(options.launch_env_name_master_port)
+        get_value_from_env(args.launch_env_name_master_port)
     )
 
     main_func()
@@ -268,30 +268,30 @@ def main_spawn_routine(local_rank, main_func, distributed_config_input):
     main_func()
 
 
-def distributed_launch_spawn(options, main_func):
+def distributed_launch_spawn(args, main_func):
     global distributed_config
     distributed_config.rank = int(
         get_value_from_option_and_env(
-            options.rank, options.launch_env_name_world_rank, -1, 0
+            args.rank, args.launch_env_name_world_rank, -1, 0
         )
     )
     distributed_config.world_size = int(
         get_value_from_option_and_env(
-            options.world_size, options.launch_env_name_world_size, -1, 1
+            args.world_size, args.launch_env_name_world_size, -1, 1
         )
     )
     distributed_config.local_rank = 0
     distributed_config.local_size = int(
         get_value_from_option_and_env(
-            options.local_size, options.launch_env_name_local_size, -1, 1
+            args.local_size, args.launch_env_name_local_size, -1, 1
         )
     )
     distributed_config.master_addr = get_value_from_option_and_env(
-        options.master_addr, options.launch_env_name_master_addr, "", "localhost"
+        args.master_addr, args.launch_env_name_master_addr, "", "localhost"
    )
     distributed_config.master_port = int(
         get_value_from_option_and_env(
-            options.master_port, options.launch_env_name_master_port, -1, 12335
+            args.master_port, args.launch_env_name_master_port, -1, 12335
         )
     )
 
@@ -307,25 +307,25 @@ def distributed_launch_spawn(options, main_func):
     main_spawn_routine(0, main_func, distributed_config)
 
 
-def distributed_launch(options, main_func):
+def distributed_launch(args, main_func):
     assert (
-        options.launch_agent == "mpi"
-        or options.launch_agent == "pytorch"
-        or options.launch_agent == "spawn"
+        args.launch_agent == "mpi"
+        or args.launch_agent == "pytorch"
+        or args.launch_agent == "spawn"
     )
-    if options.launch_agent == "mpi":
+    if args.launch_agent == "mpi":
         # use MPI to launch multiprocess
         # when using MPI, command is like:
         # mpirun python [train_script.py]
-        distributed_launch_mpi(options, main_func)
-    elif options.launch_agent == "pytorch":
+        distributed_launch_mpi(args, main_func)
+    elif args.launch_agent == "pytorch":
         # use pytorch DDP to launch multiprocess
         # when using pytorch DDP, assume two nodes with 8 GPU each, command is like:
         # on node1: python -m torch.distributed.run --nproc_per_node=8 --nnodes=2 --node_rank=0 --master_addr=node1
         # --master_port=12335 [train_script.py] --launch_agent=pytorch
         # on node2: python -m torch.distributed.run --nproc_per_node=8 --nnodes=2 --node_rank=1 --master_addr=node1
         # --master_port=12335 [train_script.py] --launch_agent=pytorch
-        distributed_launch_pytorch(options, main_func)
+        distributed_launch_pytorch(args, main_func)
     else:
         # cluster scheduler
         # when using spawn to create multiprocess for each node, assume two nodes with 8 GPU each, command is like:
@@ -333,4 +333,4 @@ def distributed_launch(options, main_func):
         # --local_size=8 --rank=0 --world_size=2
         # on node2: python [train_script.py] --launch_agent=spawn --master_addr=node1 --master_port=12335
         # --local_size=8 --rank=1 --world_size=2
-        distributed_launch_spawn(options, main_func)
+        distributed_launch_spawn(args, main_func)
diff --git a/python/pylibwholegraph/pylibwholegraph/torch/gnn_model.py b/python/pylibwholegraph/pylibwholegraph/torch/gnn_model.py
index 7f23ee03c..709060c4f 100644
--- a/python/pylibwholegraph/pylibwholegraph/torch/gnn_model.py
+++ b/python/pylibwholegraph/pylibwholegraph/torch/gnn_model.py
@@ -193,31 +193,31 @@ def __init__(
         self,
         graph_structure: GraphStructure,
         node_embedding: WholeMemoryEmbedding,
-        options,
+        args,
     ):
         super().__init__()
-        hidden_feat_dim = options.hiddensize
+        hidden_feat_dim = args.hiddensize
         self.graph_structure = graph_structure
         self.node_embedding = node_embedding
-        self.num_layer = options.layernum
-        self.hidden_feat_dim = options.hiddensize
-        num_head = options.heads if (options.model == "gat") else 1
+        self.num_layer = args.layernum
+        self.hidden_feat_dim = args.hiddensize
+        num_head = args.heads if (args.model == "gat") else 1
         assert hidden_feat_dim % num_head == 0
         in_feat_dim = self.node_embedding.shape[1]
         self.gnn_layers = create_gnn_layers(
             in_feat_dim,
             hidden_feat_dim,
-            options.classnum,
-            options.layernum,
+            args.classnum,
+            args.layernum,
             num_head,
-            options.model,
+            args.model,
         )
-        self.mean_output = True if options.model == "gat" else False
-        self.add_self_loop = True if options.model == "gat" else False
+        self.mean_output = True if args.model == "gat" else False
+        self.add_self_loop = True if args.model == "gat" else False
         self.gather_fn = WholeMemoryEmbeddingModule(self.node_embedding)
-        self.dropout = options.dropout
-        self.max_neighbors = parse_max_neighbors(options.layernum, options.neighbors)
-        self.max_inference_neighbors = parse_max_neighbors(options.layernum, options.inferencesample)
+        self.dropout = args.dropout
+        self.max_neighbors = parse_max_neighbors(args.layernum, args.neighbors)
+        self.max_inference_neighbors = parse_max_neighbors(args.layernum, args.inferencesample)
 
     def forward(self, ids):
         global framework_name
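The whole patch is one mechanical substitution: OptionParser.add_option becomes ArgumentParser.add_argument, optparse's string type names (type="int", type="float") become the callables int/float, and the (options, args) pair returned by optparse's parse_args() is replaced by a single argparse.Namespace, which is why every options.* access is renamed to args.*. The standalone sketch below is illustrative only (it is not part of the patch; the flag names are borrowed from the options above) and can be run on its own to confirm the pattern:

    # Minimal sketch of the optparse -> argparse pattern applied in this diff.
    # Not part of pylibwholegraph; flag names are borrowed purely for illustration.
    import argparse

    argparser = argparse.ArgumentParser()
    # optparse: parser.add_option("-e", "--epochs", type="int", default=24)
    # argparse: type takes a callable (int), not the string "int".
    argparser.add_argument("-e", "--epochs", type=int, dest="epochs", default=24,
                           help="number of epochs")
    argparser.add_argument("--cache-ratio", type=float, dest="cache_ratio",
                           default=0.5, help="cache ratio")
    # store_true flags carry over unchanged.
    argparser.add_argument("--use-cpp-ext", action="store_true", dest="use_cpp_ext",
                           default=False, help="Whether to use cpp extension for pytorch")

    # optparse: (options, args) = parser.parse_args()
    # argparse: a single Namespace, hence the options -> args rename everywhere.
    args = argparser.parse_args(["--epochs", "4", "--use-cpp-ext"])
    assert args.epochs == 4 and args.use_cpp_ext and args.cache_ratio == 0.5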