Fixed papers100M, partition bugs (#159)

CongjieHe authored Jun 15, 2023
1 parent 3454393 commit 7765b37
Showing 4 changed files with 40 additions and 35 deletions.
2 changes: 1 addition & 1 deletion benchmarks/ogbn-papers100M/README.md
@@ -1,6 +1,6 @@
 # OGBn-Papers100M
 
-This dataset is large, so we need to preprocess it. We assume that you have downloaded the raw dataset from [OGB](https://snap.stanford.edu/ogb/data/nodeproppred/) and decompressed it to `\data\papers` (together with the files in `split/time/`). Then `preprocess.py` can transform the data into the appropriate format.
+This dataset is large, so we need to preprocess it. We assume that you have downloaded the raw dataset from [OGB](https://snap.stanford.edu/ogb/data/nodeproppred/) and decompressed it to `/data` (together with the files in `split/time/`). Then `preprocess.py` can transform the data into the appropriate format.
 
 Also, Quiver uses a large amount of shared memory to hold the dataset. If your program is killed silently or fails with a bus error, make sure your physical memory can hold the dataset and that your shared-memory limit is set properly; we recommend at least 128G:
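
A minimal sketch of the setup steps the README describes. The archive name `papers100M-bin.zip` and the tmpfs mount at `/dev/shm` are assumptions, not part of this commit:

```sh
# Assumed download-and-extract steps for the raw OGB archive.
wget -P /data https://snap.stanford.edu/ogb/data/nodeproppred/papers100M-bin.zip
unzip /data/papers100M-bin.zip -d /data

# Check the current shared-memory limit and, if /dev/shm is a tmpfs mount,
# raise it to the recommended 128G for the current session.
df -h /dev/shm
sudo mount -o remount,size=128G /dev/shm
```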

@@ -180,7 +180,7 @@ def run(rank, world_size, quiver_sampler, quiver_feature, y, train_idx,
 
 
 if __name__ == '__main__':
-    root = "/data/papers/"
+    root = "/data"
     world_size = torch.cuda.device_count()
     dataset = Paper100MDataset(root, 0.15 * min(world_size, 4))
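
The `__main__` block above builds the dataset and per-GPU state for `run()`. As a generic sketch of how such a per-GPU worker is typically launched in PyTorch (this is the standard `torch.multiprocessing` pattern, not code from this commit; `run()`'s remaining arguments are not shown in this hunk):

```python
import torch
import torch.multiprocessing as mp

# Standard multi-GPU launch pattern: mp.spawn supplies each worker's rank;
# the benchmark's remaining run() arguments (quiver_sampler, quiver_feature,
# y, train_idx, ...) would be passed through args in addition to world_size.
world_size = torch.cuda.device_count()
mp.spawn(run, args=(world_size,), nprocs=world_size, join=True)
```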
70 changes: 37 additions & 33 deletions benchmarks/ogbn-papers100M/preprocess.py
@@ -14,17 +14,19 @@
 import quiver
 from quiver.partition import partition_without_replication, select_nodes
 
-# data_root = "/data/papers/ogbn_papers100M/raw/"
-# label = np.load(osp.join(data_root, "node-label.npz"))
-# data = np.load(osp.join(data_root, "data.npz"))
-# path = Path('/data/papers/ogbn_papers100M/feat')
-# path.mkdir(parents=True)
-# path = Path('/data/papers/ogbn_papers100M/csr')
-# path.mkdir(parents=True)
-# path = Path('/data/papers/ogbn_papers100M/label')
-# path.mkdir(parents=True)
-# path = Path('/data/papers/ogbn_papers100M/index')
-# path.mkdir(parents=True)
+root = '/data'
+
+data_root = f"{root}/ogbn_papers100M/raw/"
+label = np.load(osp.join(data_root, "node-label.npz"))
+data = np.load(osp.join(data_root, "data.npz"))
+path = Path(f'{root}/ogbn_papers100M/feat')
+path.mkdir(parents=True)
+path = Path(f'{root}/ogbn_papers100M/csr')
+path.mkdir(parents=True)
+path = Path(f'{root}/ogbn_papers100M/label')
+path.mkdir(parents=True)
+path = Path(f'{root}/ogbn_papers100M/index')
+path.mkdir(parents=True)
 
 SCALE = 1
 GPU_CACHE_GB = 4
@@ -58,8 +60,8 @@ def process_topo():
 
     print("LOG>>> Begin Save")
 
-    torch.save(indptr, "/data/papers/ogbn_papers100M/csr/indptr.pt")
-    torch.save(indices, "/data/papers/ogbn_papers100M/csr/indices.pt")
+    torch.save(indptr, f"{root}/ogbn_papers100M/csr/indptr.pt")
+    torch.save(indices, f"{root}/ogbn_papers100M/csr/indices.pt")
 
     csr_mat = get_csr_from_coo(edge_index, True)
     indptr_reverse = csr_mat.indptr
@@ -68,9 +70,9 @@ def process_topo():
     indices_reverse = torch.from_numpy(indices_reverse).type(torch.long)
 
     torch.save(indptr_reverse,
-               "/data/papers/ogbn_papers100M/csr/indptr_reverse.pt")
+               f"{root}/ogbn_papers100M/csr/indptr_reverse.pt")
     torch.save(indices_reverse,
-               "/data/papers/ogbn_papers100M/csr/indices_reverse.pt")
+               f"{root}/ogbn_papers100M/csr/indices_reverse.pt")
 
 
 def process_feature():
@@ -80,41 +82,40 @@ def process_feature():
     nid_feat = data["node_feat"]
     tensor = torch.from_numpy(nid_feat).type(torch.float)
     print("LOG>>> Begin Process")
-    torch.save(tensor, "/data/papers/ogbn_papers100M/feat/feature.pt")
+    torch.save(tensor, f"{root}/ogbn_papers100M/feat/feature.pt")
 
 
 def process_label():
     print("LOG>>> Load Finished")
     node_label = label["node_label"]
     tensor = torch.from_numpy(node_label).type(torch.long)
-    torch.save(tensor, "/data/papers/ogbn_papers100M/label/label.pt")
+    torch.save(tensor, f"{root}/ogbn_papers100M/label/label.pt")
 
 
 def sort_feature():
     NUM_ELEMENT = 111059956
-    indptr = torch.load("/data/papers/ogbn_papers100M/csr/indptr_reverse.pt")
-    feature = torch.load("/data/papers/ogbn_papers100M/feat/feature.pt")
+    indptr = torch.load(f"{root}/ogbn_papers100M/csr/indptr_reverse.pt")
+    feature = torch.load(f"{root}/ogbn_papers100M/feat/feature.pt")
     prev = torch.LongTensor(indptr[:-1])
     sub = torch.LongTensor(indptr[1:])
     deg = sub - prev
     sorted_deg, prev_order = torch.sort(deg, descending=True)
     total_num = NUM_ELEMENT
     total_range = torch.arange(total_num, dtype=torch.long)
     feature = feature[prev_order]
-    torch.save(feature, "/data/papers/ogbn_papers100M/feat/sort_feature.pt")
-    torch.save(prev_order, "/data/papers/ogbn_papers100M/feat/prev_order.pt")
+    torch.save(feature, f"{root}/ogbn_papers100M/feat/sort_feature.pt")
+    torch.save(prev_order, f"{root}/ogbn_papers100M/feat/prev_order.pt")
 
 
 def process_index():
-    data = genfromtxt('/data/papers/ogbn_papers100M/split/time/train.csv',
+    data = genfromtxt(f"{root}/ogbn_papers100M/split/time/train.csv",
                       delimiter='\n')
-    data = data.astype(np.long)
+    data = data.astype(np.int_)
    data = torch.from_numpy(data)
-    torch.save(data, "/data/papers/ogbn_papers100M/index/train_idx.pt")
+    torch.save(data, f"{root}/ogbn_papers100M/index/train_idx.pt")
 
 
 def preprocess(host, host_size, p2p_group, p2p_size):
-    root = '/data/papers'
     data_dir = osp.join(root, 'ogbn_papers100M')
     indptr_root = osp.join(data_dir, 'csr', 'indptr.pt')
     indices_root = osp.join(data_dir, 'csr', 'indices.pt')
@@ -132,6 +133,9 @@ def preprocess(host, host_size, p2p_group, p2p_size):
         end = min(idx_len, beg + (idx_len // global_gpus))
         train_idxs.append(train_idx[beg:end])
         beg = end
+
+    path = Path(f'{root}/ogbn_papers100M/{host_size}h')
+    path.mkdir(parents=True)
 
     csr_topo = quiver.CSRTopo(indptr=indptr, indices=indices)
     quiver_sampler = quiver.pyg.GraphSageSampler(csr_topo, [25, 10],
@@ -164,7 +168,7 @@ def preprocess(host, host_size, p2p_group, p2p_size):
     print(f'prob {t1 - t0}')
     for h in range(host_size):
         global2host[res[h]] = h
-    torch.save(global2host.cpu(), f'/data/papers/{host_size}h/global2host.pt')
+    torch.save(global2host.cpu(), f"{root}/ogbn_papers100M/{host_size}h/global2host.pt")
     t2 = time.time()
     print(f'g2h {t2 - t1}')

@@ -183,7 +187,7 @@ def preprocess(host, host_size, p2p_group, p2p_size):
         nz.size(0), cpu_size + gpu_size * p2p_size) - choice.size(0)
     replicate = local_order[:local_replicate_size]
     torch.save(replicate.cpu(),
-               f'/data/papers/{host_size}h/replicate{host}.pt')
+               f'{root}/ogbn_papers100M/{host_size}h/replicate{host}.pt')
     t3 = time.time()
     print(f'replicate {t3 - t2}')
     local_all = torch.cat([choice, replicate])
@@ -199,15 +203,15 @@ def preprocess(host, host_size, p2p_group, p2p_size):
     local_gpu_ids = [local_all[r] for r in local_res]
     local_orders = torch.cat((local_gpu_orders, local_cpu_order))
     torch.save(local_orders.cpu(),
-               f'/data/papers/{host_size}h/local_order{host}.pt')
+               f'{root}/ogbn_papers100M/{host_size}h/local_order{host}.pt')
     t4 = time.time()
     print(f'order {t4 - t3}')
 
 
-# process_topo()
-# process_feature()
-# process_label()
-# sort_feature()
-# process_index()
+process_topo()
+process_feature()
+process_label()
+sort_feature()
+process_index()
 
 preprocess(0, 3, 1, 2)
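
The script above writes every artifact under `{root}/ogbn_papers100M/`. A minimal sketch of how a downstream script might load them back, using only file names that appear in this diff (the variable names are illustrative):

```python
import torch

root = "/data"
base = f"{root}/ogbn_papers100M"

# CSR topology written by process_topo().
indptr = torch.load(f"{base}/csr/indptr.pt")
indices = torch.load(f"{base}/csr/indices.pt")

# Degree-sorted features and the sorting permutation from sort_feature().
feature = torch.load(f"{base}/feat/sort_feature.pt")
prev_order = torch.load(f"{base}/feat/prev_order.pt")

# Labels and training indices from process_label() / process_index().
label = torch.load(f"{base}/label/label.pt")
train_idx = torch.load(f"{base}/index/train_idx.pt")
```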
1 change: 1 addition & 0 deletions srcs/python/quiver/partition.py
@@ -10,6 +10,7 @@
 ]
 
 QUIVER_MAGIC_NUMBER = 256
+CHUNK_NUM = 32
 
 
 def partition_without_replication(device, probs, ids):
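
For context, `partition_without_replication(device, probs, ids)` is what `preprocess.py` above uses to split node ids across hosts (its result `res` fills `global2host`). A toy sketch of that interface under assumed semantics (greedy argmax over per-host access probabilities), which may differ from Quiver's actual algorithm:

```python
import torch

def toy_partition(device, probs, ids):
    # probs: one tensor per host, where probs[h][n] estimates how likely
    # host h is to access node n. ids: global node ids to partition.
    stacked = torch.stack([p[ids].to(device) for p in probs])  # [hosts, len(ids)]
    owner = stacked.argmax(dim=0).to(ids.device)               # best host per id
    # One id tensor per host, disjoint by construction (no replication).
    return [ids[owner == h] for h in range(len(probs))]
```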
