fix: unbound gpu<->cpu queues to avoid deadlock in mp deterministic mode
percevalw committed Nov 14, 2024
1 parent 989b70c commit 7e0ceeb
Showing 2 changed files with 23 additions and 5 deletions.
8 changes: 3 additions & 5 deletions edsnlp/processing/multiprocessing.py
@@ -851,16 +851,14 @@ def __init__(self, stream):
         # one single shared queue but N bounded queues, one for each producer.
         # This is to avoid the situation where a single producer occupies all
         # slots, leading to congestion affecting the whole worker pool.
-        # In practice, using a single shared queue, in particular for the
-        # worker -> main

         # Input queues for each CPU worker
         if not self.stream.reader.read_in_worker:
             queue = mp.Queue(2 * num_cpu_workers) if share_queues else None
             for cpu in self.cpu_worker_names:
                 name = f"from-main_to-stage-0_of-{cpu}"
                 if not share_queues:
-                    queue = mp.Queue(2)
+                    queue = mp.SimpleQueue()
                 self.data_queues[name] = queue
                 self.input_queue_names.append(name)

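The change above replaces each bounded per-worker input queue, mp.Queue(2), with mp.SimpleQueue, which has no capacity limit, so feeding a worker can never block the producer. A quick contrast of the two (illustrative snippet, not edsnlp code):

import multiprocessing as mp
import queue

bounded = mp.Queue(2)              # capacity of 2, as before this commit
bounded.put("a")
bounded.put("b")
try:
    bounded.put("c", timeout=0.1)  # a third put exceeds the capacity
except queue.Full:
    print("bounded queue: a producer would block here")

unbounded = mp.SimpleQueue()       # SimpleQueue accepts no maxsize at all
for i in range(100):
    unbounded.put(i)               # never blocks on a capacity limit
print("SimpleQueue accepted all items")

SimpleQueue also does away with Queue's feeder thread and size bookkeeping, which these simple point-to-point channels do not need.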
@@ -872,11 +870,11 @@ def __init__(self, stream):
             for stage in range(0, len(self.stages) - 1):
                 # Queue to send data from CPU to GPU
                 name = f"from-{cpu}_to-stage-{stage}_of-{gpu}"
-                self.data_queues[name] = mp.Queue(2)
+                self.data_queues[name] = mp.Queue()

                 # Answer queue from GPU to CPU
                 name = f"from-{gpu}_to-stage-{stage + 1}_of-{cpu}"
-                self.data_queues[name] = mp.Queue(2)
+                self.data_queues[name] = mp.Queue()

             # Final output queue for each CPU worker
             name = f"from-{cpu}_to-main"
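Why a capacity of 2 could deadlock here: in deterministic mode, each CPU worker feeds the GPU worker in a fixed order and the GPU worker must hand its answers back in that same order. If one CPU worker's answer queue fills up while that worker is still pushing batches, the GPU worker blocks on put(), the CPU worker blocks on its own put(), and the pool stalls. Below is a self-contained sketch of that cycle with two processes and the old bounded queues; the worker names and scheduling are simplified stand-ins, not the edsnlp implementation.

import multiprocessing as mp

def cpu_worker(to_gpu, from_gpu, n):
    # Push every batch before draining any answers, as a CPU worker in
    # deterministic mode can end up doing while its answers pile up.
    for i in range(n):
        to_gpu.put(i)             # blocks once to_gpu is full
    for _ in range(n):
        from_gpu.get()

def gpu_worker(to_gpu, from_gpu, n):
    for _ in range(n):
        item = to_gpu.get()
        from_gpu.put(item)        # blocks once from_gpu is full

if __name__ == "__main__":
    to_gpu, from_gpu = mp.Queue(2), mp.Queue(2)   # bounded, pre-fix capacity
    cpu = mp.Process(target=cpu_worker, args=(to_gpu, from_gpu, 10))
    gpu = mp.Process(target=gpu_worker, args=(to_gpu, from_gpu, 10))
    cpu.start(); gpu.start()
    cpu.join(timeout=2)
    print("deadlocked" if cpu.is_alive() else "finished")
    cpu.terminate(); gpu.terminate()

With the unbounded mp.Queue() used above, neither put() can block, so the cycle cannot form; the trade-off is that an unbounded queue may buffer more items in memory.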
20 changes: 20 additions & 0 deletions tests/processing/test_backends.py
@@ -1,3 +1,4 @@
+import random
 import time
 from itertools import chain
 from pathlib import Path
@@ -168,6 +169,25 @@ def test_multiprocessing_gpu_stub_backend(frozen_ml_nlp, backend, deterministic)
     list(stream)


+def test_multiprocessing_gpu_stub_multi_cpu_deterministic_backend(frozen_ml_nlp):
+    text1 = "Exemple"
+    text2 = "Ceci est un autre exemple"
+    text3 = "Ceci est un très long exemple ! Regardez tous ces mots !"
+    texts = [text1, text2, text3] * 100
+    random.Random(42).shuffle(texts)
+    stream = frozen_ml_nlp.pipe(iter(texts))
+    stream = stream.set_processing(
+        batch_size="15 words",
+        num_gpu_workers=1,
+        num_cpu_workers=2,
+        deterministic=True,
+        # show_progress=True,
+        # just to test in gpu-less environments
+        gpu_worker_devices=["cpu"],
+    )
+    list(stream)
+
+
 @pytest.mark.parametrize("wait", [True, False])
 def test_multiprocessing_gpu_stub_wait(frozen_ml_nlp, wait):
     text1 = "Ceci est un exemple"
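The new test only asserts that the stream completes, i.e. that list(stream) returns instead of hanging on the old bounded queues. Assuming deterministic=True preserves input order, as its name suggests, the check could be tightened along these lines (a sketch reusing texts and stream from the test above, not part of the commit):

    docs = list(stream)                         # in place of the bare list(stream)
    assert len(docs) == len(texts)
    assert [doc.text for doc in docs] == texts  # shuffled input order preserved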
