diff --git a/src/bluesky/tests/test_zmq.py b/src/bluesky/tests/test_zmq.py index d0f27f0c2..234f92875 100644 --- a/src/bluesky/tests/test_zmq.py +++ b/src/bluesky/tests/test_zmq.py @@ -23,12 +23,17 @@ def test_proxy_script(): def test_zmq_basic(RE, hw): # COMPONENT 1 # Run a 0MQ proxy on a separate process. - def start_proxy(): + + def start_proxy(start_event): + start_event.set() Proxy(5567, 5568).start() - proxy_proc = multiprocess.Process(target=start_proxy, daemon=True) + proxy_start_event = multiprocess.Event() + proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True) proxy_proc.start() - time.sleep(5) # Give this plenty of time to start up. + proxy_start_event.wait(timeout=5) + assert proxy_start_event.is_set() + time.sleep(0.2) # COMPONENT 2 # Run a Publisher and a RunEngine in this main process. @@ -41,21 +46,27 @@ def start_proxy(): # it receives over a Queue to this process, so we can count them for our # test. - def make_and_start_dispatcher(queue): + def make_and_start_dispatcher(queue, start_event): def put_in_queue(name, doc): print("putting ", name, "in queue") + start_event.set() queue.put((name, doc)) d = RemoteDispatcher("127.0.0.1:5568") d.subscribe(put_in_queue) print("REMOTE IS READY TO START") d.loop.call_later(9, d.stop) + start_event.set() d.start() + + dispatcher_start_event = multiprocess.Event() queue = multiprocess.Queue() - dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,)) + dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event)) dispatcher_proc.start() - time.sleep(5) # As above, give this plenty of time to start. + dispatcher_start_event.wait(timeout=5) + assert dispatcher_start_event.is_set() + time.sleep(0.2) # Generate two documents. The Publisher will send them to the proxy # device over 5567, and the proxy will send them to the @@ -142,12 +153,17 @@ def test_zmq_RD_ports_spec(host): def test_zmq_no_RE_basic(RE): # COMPONENT 1 # Run a 0MQ proxy on a separate process. - def start_proxy(): + + def start_proxy(start_event): + start_event.set() Proxy(5567, 5568).start() - proxy_proc = multiprocess.Process(target=start_proxy, daemon=True) + proxy_start_event = multiprocess.Event() + proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True) proxy_proc.start() - time.sleep(5) # Give this plenty of time to start up. + proxy_start_event.wait(timeout=5) + assert proxy_start_event.is_set() + time.sleep(0.2) # COMPONENT 2 # Run a Publisher and a RunEngine in this main process. @@ -159,21 +175,27 @@ def start_proxy(): # it receives over a Queue to this process, so we can count them for our # test. - def make_and_start_dispatcher(queue): + def make_and_start_dispatcher(queue, start_event): def put_in_queue(name, doc): print("putting ", name, "in queue") + start_event.set() queue.put((name, doc)) d = RemoteDispatcher("127.0.0.1:5568") d.subscribe(put_in_queue) print("REMOTE IS READY TO START") d.loop.call_later(9, d.stop) + start_event.set() d.start() + + dispatcher_start_event = multiprocess.Event() queue = multiprocess.Queue() - dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,)) + dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event)) dispatcher_proc.start() - time.sleep(5) # As above, give this plenty of time to start. + dispatcher_start_event.wait(timeout=5) + assert dispatcher_start_event.is_set() + time.sleep(0.2) # Generate two documents. The Publisher will send them to the proxy # device over 5567, and the proxy will send them to the @@ -212,12 +234,16 @@ def test_zmq_no_RE_newserializer(RE): # COMPONENT 1 # Run a 0MQ proxy on a separate process. - def start_proxy(): + def start_proxy(start_event): + start_event.set() Proxy(5567, 5568).start() - proxy_proc = multiprocess.Process(target=start_proxy, daemon=True) + proxy_start_event = multiprocess.Event() + proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True) proxy_proc.start() - time.sleep(5) # Give this plenty of time to start up. + proxy_start_event.wait(timeout=5) + assert proxy_start_event.is_set() + time.sleep(0.2) # COMPONENT 2 # Run a Publisher and a RunEngine in this main process. @@ -227,21 +253,27 @@ def start_proxy(): # Run a RemoteDispatcher on another separate process. Pass the documents # it receives over a Queue to this process, so we can count them for our # test. - def make_and_start_dispatcher(queue): + def make_and_start_dispatcher(queue, start_event): def put_in_queue(name, doc): print("putting ", name, "in queue") + start_event.set() queue.put((name, doc)) - d = RemoteDispatcher("127.0.0.1:5568", deserializer=cloudpickle.loads) + d = RemoteDispatcher("127.0.0.1:5568") d.subscribe(put_in_queue) print("REMOTE IS READY TO START") d.loop.call_later(9, d.stop) + start_event.set() d.start() + + dispatcher_start_event = multiprocess.Event() queue = multiprocess.Queue() - dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,)) + dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event)) dispatcher_proc.start() - time.sleep(5) # As above, give this plenty of time to start. + dispatcher_start_event.wait(timeout=5) + assert dispatcher_start_event.is_set() + time.sleep(0.2) # Generate two documents. The Publisher will send them to the proxy # device over 5567, and the proxy will send them to the @@ -278,12 +310,16 @@ def local_cb(name, doc): def test_zmq_prefix(RE, hw): # COMPONENT 1 # Run a 0MQ proxy on a separate process. - def start_proxy(): + def start_proxy(start_event): + start_event.set() Proxy(5567, 5568).start() - proxy_proc = multiprocess.Process(target=start_proxy, daemon=True) + proxy_start_event = multiprocess.Event() + proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True) proxy_proc.start() - time.sleep(5) # Give this plenty of time to start up. + proxy_start_event.wait(timeout=5) + assert proxy_start_event.is_set() + time.sleep(0.2) # COMPONENT 2 # Run a Publisher and a RunEngine in this main process. @@ -297,21 +333,32 @@ def start_proxy(): # it receives over a Queue to this process, so we can count them for our # test. - def make_and_start_dispatcher(queue): + def make_and_start_dispatcher(queue, start_event): def put_in_queue(name, doc): print("putting ", name, "in queue") + start_event.set() queue.put((name, doc)) d = RemoteDispatcher("127.0.0.1:5568", prefix=b"sb") d.subscribe(put_in_queue) print("REMOTE IS READY TO START") d.loop.call_later(9, d.stop) + start_event.set() d.start() + + dispatcher_start_event = multiprocess.Event() queue = multiprocess.Queue() - dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,)) + dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event)) dispatcher_proc.start() - time.sleep(5) # As above, give this plenty of time to start. + dispatcher_start_event.wait(timeout=5) + assert dispatcher_start_event.is_set() + time.sleep(0.2) + + # queue = multiprocess.Queue() + # dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,)) + # dispatcher_proc.start() + # time.sleep(5) # As above, give this plenty of time to start. # Generate two documents. The Publisher will send them to the proxy # device over 5567, and the proxy will send them to the