Skip to content

Commit

Permalink
TST: improve execution time for 0MQ tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dmgav committed Apr 20, 2024
1 parent d6ca5be commit 0b95e22
Showing 1 changed file with 72 additions and 25 deletions.
97 changes: 72 additions & 25 deletions src/bluesky/tests/test_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ def test_proxy_script():
def test_zmq_basic(RE, hw):
# COMPONENT 1
# Run a 0MQ proxy on a separate process.
def start_proxy():

def start_proxy(start_event):
start_event.set()
Proxy(5567, 5568).start()

proxy_proc = multiprocess.Process(target=start_proxy, daemon=True)
proxy_start_event = multiprocess.Event()
proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True)
proxy_proc.start()
time.sleep(5) # Give this plenty of time to start up.
proxy_start_event.wait(timeout=5)
assert proxy_start_event.is_set()
time.sleep(0.2)

# COMPONENT 2
# Run a Publisher and a RunEngine in this main process.
Expand All @@ -41,21 +46,27 @@ def start_proxy():
# it receives over a Queue to this process, so we can count them for our
# test.

def make_and_start_dispatcher(queue):
def make_and_start_dispatcher(queue, start_event):
def put_in_queue(name, doc):
print("putting ", name, "in queue")
start_event.set()
queue.put((name, doc))

d = RemoteDispatcher("127.0.0.1:5568")
d.subscribe(put_in_queue)
print("REMOTE IS READY TO START")
d.loop.call_later(9, d.stop)
start_event.set()
d.start()


dispatcher_start_event = multiprocess.Event()
queue = multiprocess.Queue()
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,))
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event))
dispatcher_proc.start()
time.sleep(5) # As above, give this plenty of time to start.
dispatcher_start_event.wait(timeout=5)
assert dispatcher_start_event.is_set()
time.sleep(0.2)

# Generate two documents. The Publisher will send them to the proxy
# device over 5567, and the proxy will send them to the
Expand Down Expand Up @@ -142,12 +153,17 @@ def test_zmq_RD_ports_spec(host):
def test_zmq_no_RE_basic(RE):
# COMPONENT 1
# Run a 0MQ proxy on a separate process.
def start_proxy():

def start_proxy(start_event):
start_event.set()
Proxy(5567, 5568).start()

proxy_proc = multiprocess.Process(target=start_proxy, daemon=True)
proxy_start_event = multiprocess.Event()
proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True)
proxy_proc.start()
time.sleep(5) # Give this plenty of time to start up.
proxy_start_event.wait(timeout=5)
assert proxy_start_event.is_set()
time.sleep(0.2)

# COMPONENT 2
# Run a Publisher and a RunEngine in this main process.
Expand All @@ -159,21 +175,27 @@ def start_proxy():
# it receives over a Queue to this process, so we can count them for our
# test.

def make_and_start_dispatcher(queue):
def make_and_start_dispatcher(queue, start_event):
def put_in_queue(name, doc):
print("putting ", name, "in queue")
start_event.set()
queue.put((name, doc))

d = RemoteDispatcher("127.0.0.1:5568")
d.subscribe(put_in_queue)
print("REMOTE IS READY TO START")
d.loop.call_later(9, d.stop)
start_event.set()
d.start()


dispatcher_start_event = multiprocess.Event()
queue = multiprocess.Queue()
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,))
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event))
dispatcher_proc.start()
time.sleep(5) # As above, give this plenty of time to start.
dispatcher_start_event.wait(timeout=5)
assert dispatcher_start_event.is_set()
time.sleep(0.2)

# Generate two documents. The Publisher will send them to the proxy
# device over 5567, and the proxy will send them to the
Expand Down Expand Up @@ -212,12 +234,16 @@ def test_zmq_no_RE_newserializer(RE):

# COMPONENT 1
# Run a 0MQ proxy on a separate process.
def start_proxy():
def start_proxy(start_event):
start_event.set()
Proxy(5567, 5568).start()

proxy_proc = multiprocess.Process(target=start_proxy, daemon=True)
proxy_start_event = multiprocess.Event()
proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True)
proxy_proc.start()
time.sleep(5) # Give this plenty of time to start up.
proxy_start_event.wait(timeout=5)
assert proxy_start_event.is_set()
time.sleep(0.2)

# COMPONENT 2
# Run a Publisher and a RunEngine in this main process.
Expand All @@ -227,21 +253,27 @@ def start_proxy():
# Run a RemoteDispatcher on another separate process. Pass the documents
# it receives over a Queue to this process, so we can count them for our
# test.
def make_and_start_dispatcher(queue):
def make_and_start_dispatcher(queue, start_event):
def put_in_queue(name, doc):
print("putting ", name, "in queue")
start_event.set()
queue.put((name, doc))

d = RemoteDispatcher("127.0.0.1:5568", deserializer=cloudpickle.loads)
d = RemoteDispatcher("127.0.0.1:5568")
d.subscribe(put_in_queue)
print("REMOTE IS READY TO START")
d.loop.call_later(9, d.stop)
start_event.set()
d.start()


dispatcher_start_event = multiprocess.Event()
queue = multiprocess.Queue()
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,))
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event))
dispatcher_proc.start()
time.sleep(5) # As above, give this plenty of time to start.
dispatcher_start_event.wait(timeout=5)
assert dispatcher_start_event.is_set()
time.sleep(0.2)

# Generate two documents. The Publisher will send them to the proxy
# device over 5567, and the proxy will send them to the
Expand Down Expand Up @@ -278,12 +310,16 @@ def local_cb(name, doc):
def test_zmq_prefix(RE, hw):
# COMPONENT 1
# Run a 0MQ proxy on a separate process.
def start_proxy():
def start_proxy(start_event):
start_event.set()
Proxy(5567, 5568).start()

proxy_proc = multiprocess.Process(target=start_proxy, daemon=True)
proxy_start_event = multiprocess.Event()
proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True)
proxy_proc.start()
time.sleep(5) # Give this plenty of time to start up.
proxy_start_event.wait(timeout=5)
assert proxy_start_event.is_set()
time.sleep(0.2)

# COMPONENT 2
# Run a Publisher and a RunEngine in this main process.
Expand All @@ -297,21 +333,32 @@ def start_proxy():
# it receives over a Queue to this process, so we can count them for our
# test.

def make_and_start_dispatcher(queue):
def make_and_start_dispatcher(queue, start_event):
def put_in_queue(name, doc):
print("putting ", name, "in queue")
start_event.set()
queue.put((name, doc))

d = RemoteDispatcher("127.0.0.1:5568", prefix=b"sb")
d.subscribe(put_in_queue)
print("REMOTE IS READY TO START")
d.loop.call_later(9, d.stop)
start_event.set()
d.start()


dispatcher_start_event = multiprocess.Event()
queue = multiprocess.Queue()
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,))
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event))
dispatcher_proc.start()
time.sleep(5) # As above, give this plenty of time to start.
dispatcher_start_event.wait(timeout=5)
assert dispatcher_start_event.is_set()
time.sleep(0.2)

# queue = multiprocess.Queue()
# dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,))
# dispatcher_proc.start()
# time.sleep(5) # As above, give this plenty of time to start.

# Generate two documents. The Publisher will send them to the proxy
# device over 5567, and the proxy will send them to the
Expand Down

0 comments on commit 0b95e22

Please sign in to comment.