Performing an alltoallv message exchange on CPUs (gloo backend) results in the following error:
terminate called after throwing an instance of 'gloo::EnforceNotMet'
what(): [enforce fail at ../third_party/gloo/gloo/transport/tcp/pair.cc:490] op.preamble.length <= op.nbytes. 881392472 vs 881392448
The error is reproducible, and a standalone Python script is included in this issue report. It is a very simple script that uses 10 machines / 10 processes to reproduce the error. I reproduced it repeatedly on a 10-machine cluster, but I suspect the same happens on a single machine with 10 processes (see the single-machine launcher sketch after the script).
The script performs the following steps (a condensed sketch of the pattern follows this list):
1. Create the process group with 10 ranks.
2. Exchange the number of int64 elements each rank will send; the receiving side uses these counts to allocate its buffers.
3. Once the buffers are allocated, perform alltoallv (implemented in the standalone script) to exchange the int64 payloads.
The error occurs while performing the alltoallv step on CPUs.
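As a condensed sketch of that pattern (not the reproducer itself; it reuses the alltoall_cpu / alltoallv_cpu helpers defined in the full script below and assumes the process group is already initialized):

    def exchange(rank, world_size, sizes):
        # sizes[j] = number of int64 elements this rank will send to rank j
        # 1) tell every peer how many elements it will receive from this rank
        send_counts = list(torch.tensor(sizes, dtype=torch.int64).chunk(world_size))
        recv_counts = list(torch.zeros(world_size, dtype=torch.int64).chunk(world_size))
        alltoall_cpu(rank, world_size, recv_counts, send_counts)
        # 2) size the receive buffers from the exchanged counts
        recv_bufs = [torch.zeros(int(c), dtype=torch.int64) for c in recv_counts]
        # 3) point-to-point isend/recv of the payloads (the step that fails)
        send_bufs = [torch.full((int(n),), rank, dtype=torch.int64) for n in sizes]
        alltoallv_cpu(rank, world_size, recv_bufs, send_bufs)
        return recv_bufs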
Some observations about this error:
- The error only occurs when sending large messages; the same logic works when smaller messages are sent (see the size arithmetic below).
- The standalone script was created by mimicking some of the functionality of the application I am currently working on.
- The hardcoded int64 counts are one instance for which the error is deterministically reproducible.
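For what it's worth, the two numbers in the enforce failure line up with the count table in the script (a back-of-the-envelope check; the float32 remark at the end is speculation on my part, not something confirmed in gloo or in the script):

    # 110174059 appears in the count table below (e.g. the number of int64's rank 1 sends to rank 0)
    print(110174059 * 8)   # 881392472 -> op.preamble.length, the bytes the sender actually pushed
    print(881392448 // 8)  # 110174056 -> the element count the receiver sized its buffer for
    # 110174056 happens to be the nearest float32-representable integer to 110174059
    # (not every integer above 2^24 is exactly representable in float32), and it is what
    # torch.Tensor(...) yields for this value (torch.Tensor defaults to float32), so the
    # truncation may be happening in the count exchange rather than in gloo itself:
    import torch
    print(int(torch.Tensor([110174059]).item()))  # 110174056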
Please use the following standalone script:
import numpy as np
import argparse
import torch
import os
import time
from datetime import timedelta
import torch.distributed as dist
from timeit import default_timer as timer

def alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list):
    # All-to-all built from world_size scatters: rank i scatters its input list,
    # and every rank receives its slice into output_tensor_list[i].
    input_tensor_list = [tensor.to(torch.device('cpu')) for tensor in input_tensor_list]
    for i in range(world_size):
        dist.scatter(output_tensor_list[i], input_tensor_list if i == rank else [], src=i)

def alltoallv_cpu(rank, world_size, output_tensor_list, input_tensor_list):
    # Variable-sized all-to-all built from point-to-point isend/recv pairs.
    senders = []
    for i in range(world_size):
        if i == rank:
            # the local "exchange" is just a copy
            output_tensor_list[i] = input_tensor_list[i].to(torch.device('cpu'))
        else:
            sender = dist.isend(input_tensor_list[i].to(torch.device('cpu')), dst=i, tag=i)
            senders.append(sender)
    for i in range(world_size):
        if i != rank:
            dist.recv(output_tensor_list[i], src=i, tag=i)
    torch.distributed.barrier()
def splitdata_exec(rank, world_size):
    # int64_counts[i][j] = number of int64 elements rank i sends to rank j
    int64_counts = np.array([
        [0, 110105856, 110093280, 110116272, 110097840, 110111128, 110174059, 110087008, 110125040, 110087400],#0
        [110174059, 0, 110158903, 110160317, 110149564, 110170899, 110166538, 110139263, 110163283, 110154040],#1
        [110251793, 110254110, 0, 110243087, 110249640, 110270594, 110248594, 110249172, 110277587, 110242484],#2
        [110191018, 110171210, 110170046, 0, 110167632, 110165475, 110174676, 110158908, 110171609, 110158631],#3
        [110197278, 110198689, 110193780, 110198301, 0, 110208663, 110184046, 110194628, 110200308, 110168337],#4
        [110256343, 110244546, 110248884, 110255858, 110236621, 0, 110247954, 110246921, 110247543, 110243309],#5
        [110113348, 109915976, 109891208, 109908240, 109916552, 109917544, 0, 109893592, 109930888, 109895912],#6
        [110024052, 109995591, 110003242, 110013125, 110002038, 110013278, 110003047, 0, 110015547, 109981915],#7
        [109936439, 109948208, 109937391, 109936696, 109930888, 109941325, 109940259, 109917662, 0, 109917002],#8
        [110050394, 110029327, 110036926, 110043437, 110021664, 110051453, 110036305, 110039768, 110054324, 0],#9
    ])
    start = timer()
    sizes = int64_counts[rank]
    print('[Rank: ', rank, '] outgoing int64 counts: ', sizes)
    # exchange the send/recv buffer sizes
    send_counts = list(torch.Tensor(sizes).type(dtype=torch.int64).chunk(world_size))
    recv_counts = list(torch.zeros([world_size], dtype=torch.int64).chunk(world_size))
    alltoall_cpu(rank, world_size, recv_counts, send_counts)
    # allocate receive buffers from the exchanged counts
    recv_nodes = []
    for i in recv_counts:
        recv_nodes.append(torch.zeros(i.tolist(), dtype=torch.int64))
    # form the outgoing messages: rank r sends tensors filled with r
    send_nodes = []
    for i in range(world_size):
        d = np.ones(shape=(sizes[i]), dtype=np.int64)*rank
        send_nodes.append(torch.from_numpy(d))
    alltoallv_cpu(rank, world_size, recv_nodes, send_nodes)
    end = timer()
    # verify that the data received from rank i is filled with i
    for i in range(world_size):
        data = recv_nodes[i].numpy()
        assert np.all(data == np.ones(data.shape, dtype=np.int64)*i)
    print('[Rank: ', rank, '] Done with the test...')
def multi_dev_proc_init(params):
    # rank comes from the environment; rendezvous uses the default env:// init
    rank = int(os.environ["RANK"])
    dist.init_process_group("gloo", rank=rank, world_size=params.world_size, timeout=timedelta(seconds=5*60))
    splitdata_exec(rank, params.world_size)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Construct graph partitions')
    parser.add_argument('--world-size', help='no. of processes to spawn', default=1, type=int, required=True)
    params = parser.parse_args()
    multi_dev_proc_init(params)
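For the single-machine variant mentioned above, a launcher along the following lines could be used (a sketch only: it assumes the script above is saved as repro.py and uses placeholder MASTER_ADDR/MASTER_PORT values for the default env:// rendezvous; note that the hardcoded counts imply roughly 8 GB of send data plus a similar amount of receive buffers per rank, so a single machine needs a very large amount of memory):

    import os
    from argparse import Namespace
    import torch.multiprocessing as mp

    def run(rank, world_size):
        # environment expected by repro.py and by the default env:// rendezvous
        os.environ["RANK"] = str(rank)
        os.environ["MASTER_ADDR"] = "127.0.0.1"  # placeholder
        os.environ["MASTER_PORT"] = "29500"      # placeholder
        import repro                             # hypothetical filename for the script above
        repro.multi_dev_proc_init(Namespace(world_size=world_size))

    if __name__ == "__main__":
        world_size = 10
        mp.spawn(run, args=(world_size,), nprocs=world_size, join=True)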