Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new timeout for observe_value #650

Merged
merged 11 commits into from
Dec 2, 2024
32 changes: 16 additions & 16 deletions src/ophyd_async/core/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ async def observe_value(
signal: SignalR[SignalDatatypeT],
timeout: float | None = None,
done_status: Status | None = None,
except_after_time: float | None = None,
done_timeout: float | None = None,
) -> AsyncGenerator[SignalDatatypeT, None]:
"""Subscribe to the value of a signal so it can be iterated from.

Expand All @@ -441,7 +441,7 @@ async def observe_value(
done_status:
If this status is complete, stop observing and make the iterator return.
If it raises an exception then this exception will be raised by the iterator.
except_after_time:
done_timeout:
If given, the maximum time to watch a signal, in seconds. If the loop is still
being watched after this length, raise asyncio.TimeoutError. This should be used
instead of on an 'asyncio.wait_for' timeout
Expand All @@ -450,7 +450,7 @@ async def observe_value(
-----
Due to a rare condition with busy signals, it is not recommended to use this
function with asyncio.timeout, including in an 'asyncio.wait_for' loop. Instead,
this timeout should be given to the except_after_time parameter.
this timeout should be given to the done_timeout parameter.

Example usage::

Expand All @@ -462,7 +462,7 @@ async def observe_value(
signal,
timeout=timeout,
done_status=done_status,
except_after_time=except_after_time,
done_timeout=done_timeout,
):
yield value

Expand All @@ -478,7 +478,7 @@ async def observe_signals_value(
*signals: SignalR[SignalDatatypeT],
timeout: float | None = None,
done_status: Status | None = None,
except_after_time: float | None = None,
done_timeout: float | None = None,
) -> AsyncGenerator[tuple[SignalR[SignalDatatypeT], SignalDatatypeT], None]:
"""Subscribe to the value of a signal so it can be iterated from.

Expand All @@ -493,7 +493,7 @@ async def observe_signals_value(
done_status:
If this status is complete, stop observing and make the iterator return.
If it raises an exception then this exception will be raised by the iterator.
except_after_time:
done_timeout:
If given, the maximum time to watch a signal, in seconds. If the loop is still
being watched after this length, raise asyncio.TimeoutError. This should be used
instead of on an 'asyncio.wait_for' timeout
Expand All @@ -512,10 +512,12 @@ async def observe_signals_value(
asyncio.Queue()
)

async def get_value(timeout: float | None = None):
if timeout is None:
return await q.get()
return await asyncio.wait_for(q.get(), timeout)
if timeout is None:
get_value = q.get
else:

async def get_value():
return await asyncio.wait_for(q.get(), timeout)
olliesilvester marked this conversation as resolved.
Show resolved Hide resolved

cbs: dict[SignalR, Callback] = {}
for signal in signals:
Expand All @@ -528,19 +530,17 @@ def queue_value(value: SignalDatatypeT, signal=signal):

if done_status is not None:
done_status.add_callback(q.put_nowait)
overall_deadline = (
time.monotonic() + except_after_time if except_after_time else None
)
overall_deadline = time.monotonic() + done_timeout if done_timeout else None
try:
while True:
if overall_deadline and time.monotonic() >= overall_deadline:
raise asyncio.TimeoutError(
f"observe_value was still observing signals "
f"{[signal.source for signal in signals]} after "
f"timeout {except_after_time}s"
f"timeout {done_timeout}s"
)

item = await get_value(_get_iteration_timeout(timeout, overall_deadline))
iteration_timeout = _get_iteration_timeout(timeout, overall_deadline)
item = await asyncio.wait_for(q.get(), iteration_timeout)
if done_status and item is done_status:
if exc := done_status.exception():
raise exc
Expand Down
16 changes: 8 additions & 8 deletions tests/core/test_observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async def tick():
recv = []

async def watch():
async for val in observe_value(sig, except_after_time=0.2):
async for val in observe_value(sig, done_timeout=0.2):
recv.append(val)

t = asyncio.create_task(tick())
Expand All @@ -85,7 +85,7 @@ async def tick():
recv = []

async def watch():
async for val in observe_value(sig, except_after_time=0.2):
async for val in observe_value(sig, done_timeout=0.2):
time.sleep(0.15)
recv.append(val)

Expand All @@ -105,26 +105,26 @@ async def test_observe_value_times_out_with_no_external_task():

recv = []

async def watch(except_after_time):
async for val in observe_value(sig, except_after_time=except_after_time):
async def watch(done_timeout):
async for val in observe_value(sig, done_timeout=done_timeout):
recv.append(val)
setter(val + 1)

start = time.time()
with pytest.raises(asyncio.TimeoutError):
await watch(except_after_time=0.1)
await watch(done_timeout=0.1)
assert recv
assert time.time() - start == pytest.approx(0.1, abs=0.05)


async def test_observe_value_uses_correct_timeout():
sig, _ = soft_signal_r_and_setter(float)

async def watch(timeout, except_after_time):
async for _ in observe_value(sig, timeout, except_after_time=except_after_time):
async def watch(timeout, done_timeout):
async for _ in observe_value(sig, timeout, done_timeout=done_timeout):
...

start = time.time()
with pytest.raises(asyncio.TimeoutError):
await watch(timeout=0.3, except_after_time=0.15)
await watch(timeout=0.3, done_timeout=0.15)
assert time.time() - start == pytest.approx(0.15, abs=0.05)
2 changes: 1 addition & 1 deletion tests/epics/signal/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ async def test_observe_ticking_signal_with_busy_loop(ioc, protocol):
recv = []

async def watch():
async for val in observe_value(sig, except_after_time=0.4):
async for val in observe_value(sig, done_timeout=0.4):
time.sleep(0.3)
recv.append(val)

Expand Down
Loading