From 7e0ceeb06db93b3ad978e2ea0e37c3a72281ee8b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Perceval=20Wajsb=C3=BCrt?=
Date: Thu, 14 Nov 2024 12:14:51 +0100
Subject: [PATCH] fix: unbound gpu<->cpu queues to avoid deadlock in mp
 deterministic mode

---
 edsnlp/processing/multiprocessing.py |  8 +++-----
 tests/processing/test_backends.py    | 20 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/edsnlp/processing/multiprocessing.py b/edsnlp/processing/multiprocessing.py
index f21b69580..af288c0a2 100644
--- a/edsnlp/processing/multiprocessing.py
+++ b/edsnlp/processing/multiprocessing.py
@@ -851,8 +851,6 @@ def __init__(self, stream):
         # one single shared queue but N bounded queues, one for each producer.
         # This is to avoid the situation where a single producer occupies all
         # slots, leading to congestions affecting the whole workers pool.
-        # In practice, using a single shared queue, in particular for the
-        # worker -> main
 
         # Input queues for each CPU worker
         if not self.stream.reader.read_in_worker:
@@ -860,7 +858,7 @@ def __init__(self, stream):
             for cpu in self.cpu_worker_names:
                 name = f"from-main_to-stage-0_of-{cpu}"
                 if not share_queues:
-                    queue = mp.Queue(2)
+                    queue = mp.SimpleQueue()
                 self.data_queues[name] = queue
                 self.input_queue_names.append(name)
 
@@ -872,11 +870,11 @@ def __init__(self, stream):
             for stage in range(0, len(self.stages) - 1):
                 # Queue to send data from CPU to GPU
                 name = f"from-{cpu}_to-stage-{stage}_of-{gpu}"
-                self.data_queues[name] = mp.Queue(2)
+                self.data_queues[name] = mp.Queue()
 
                 # Answer queue from GPU to CPU
                 name = f"from-{gpu}_to-stage-{stage + 1}_of-{cpu}"
-                self.data_queues[name] = mp.Queue(2)
+                self.data_queues[name] = mp.Queue()
 
         # Final output queue for each CPU worker
         name = f"from-{cpu}_to-main"
diff --git a/tests/processing/test_backends.py b/tests/processing/test_backends.py
index 22a254dfd..c5724ce8d 100644
--- a/tests/processing/test_backends.py
+++ b/tests/processing/test_backends.py
@@ -1,3 +1,4 @@
+import random
 import time
 from itertools import chain
 from pathlib import Path
@@ -168,6 +169,25 @@ def test_multiprocessing_gpu_stub_backend(frozen_ml_nlp, backend, deterministic)
     list(stream)
 
 
+def test_multiprocessing_gpu_stub_multi_cpu_deterministic_backend(frozen_ml_nlp):
+    text1 = "Exemple"
+    text2 = "Ceci est un autre exemple"
+    text3 = "Ceci est un très long exemple ! Regardez tous ces mots !"
+    texts = [text1, text2, text3] * 100
+    random.Random(42).shuffle(texts)
+    stream = frozen_ml_nlp.pipe(iter(texts))
+    stream = stream.set_processing(
+        batch_size="15 words",
+        num_gpu_workers=1,
+        num_cpu_workers=2,
+        deterministic=True,
+        # show_progress=True,
+        # just to test in gpu-less environments
+        gpu_worker_devices=["cpu"],
+    )
+    list(stream)
+
+
 @pytest.mark.parametrize("wait", [True, False])
 def test_multiprocessing_gpu_stub_wait(frozen_ml_nlp, wait):
     text1 = "Ceci est un exemple"
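
Illustrative note (not part of the patch): below is a minimal, self-contained
sketch of the queue topology this commit moves to, with hypothetical names and
a deliberately simplified pipeline, not edsnlp's actual worker code. In
deterministic mode the main process drains per-worker output queues in a fixed
round-robin order; with bounded inter-worker queues (the previous mp.Queue(2)),
a producer blocked on put() into a full queue and a consumer blocked on get()
from a momentarily idle peer can form a waiting cycle. Unbounded queues
(mp.Queue() / mp.SimpleQueue()) guarantee that no put() ever blocks, which
breaks any such cycle.

import multiprocessing as mp

N_ITEMS = 20

def cpu_worker(idx, to_gpu, from_gpu, to_main):
    # Each "cpu" worker handles every second item, forwards it to the
    # "gpu" worker, waits for the answer and pushes it to its own
    # output queue.
    for i in range(idx, N_ITEMS, 2):
        to_gpu.put((idx, i))         # unbounded queue: never blocks
        to_main.put(from_gpu.get())  # wait for the "gpu" answer
    to_main.put(None)                # sentinel: this worker is done

def gpu_worker(from_cpus, answer_queues):
    # Route each answer back to the worker that sent the item.
    for _ in range(N_ITEMS):
        idx, i = from_cpus.get()
        answer_queues[idx].put(i * i)

if __name__ == "__main__":
    to_gpu = mp.Queue()                           # unbounded cpu -> gpu
    from_gpu = [mp.Queue() for _ in range(2)]     # unbounded gpu -> cpu
    outputs = [mp.SimpleQueue() for _ in range(2)]
    procs = [mp.Process(target=gpu_worker, args=(to_gpu, from_gpu))] + [
        mp.Process(target=cpu_worker, args=(i, to_gpu, from_gpu[i], outputs[i]))
        for i in range(2)
    ]
    for p in procs:
        p.start()
    # Deterministic collection: drain the per-worker output queues in a
    # fixed round-robin order, one item at a time.
    results, live, idx = [], {0, 1}, 0
    while live:
        if idx in live:
            item = outputs[idx].get()
            if item is None:
                live.discard(idx)
            else:
                results.append(item)
        idx = (idx + 1) % 2
    for p in procs:
        p.join()
    print(results)  # squares of 0..19, always in the same order

Running the sketch prints [0, 1, 4, 9, ...] in a reproducible order: because
every put() targets an unbounded queue, the blocking round-robin get() in the
main process can never close a waiting cycle with a blocked producer.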