Skip to content

Commit

Permalink
Add with_timeout() to utils and add tests for utils.py
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Nov 29, 2024
1 parent 7547a8b commit 728c47f
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 5 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## Next version

### 🚀 New

* Add `with_timeout()` to utils.

### ✨ Improved

* Add test coverage for `utils.py`.


## 0.4.2 - November 27, 2024

### ✨ Improved
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ omit = [
"src/lvmopstools/ds9.py",
"src/lvmopstools/kubernetes.py",
"src/lvmopstools/influxdb.py",
"src/lvmopstools/utils.py",
"src/lvmopstools/devices/specs.py",
"src/lvmopstools/devices/nps.py"
]
Expand Down
53 changes: 49 additions & 4 deletions src/lvmopstools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@

import asyncio

from typing import Any
from typing import Any, Coroutine, TypeVar

from clu import AMQPClient


__all__ = ["get_amqp_client", "get_exception_data", "stop_event_loop"]
__all__ = [
"get_amqp_client",
"get_exception_data",
"stop_event_loop",
"with_timeout",
"is_notebook",
]


async def get_amqp_client(**kwargs) -> AMQPClient:
async def get_amqp_client(**kwargs) -> AMQPClient: # pragma: no cover
"""Returns a CLU AMQP client."""

amqp_client = AMQPClient(**kwargs)
Expand Down Expand Up @@ -62,7 +68,7 @@ def get_exception_data(exception: Exception | None, traceback_frame: int = 0):
return exception_data


async def stop_event_loop(timeout: float | None = 5):
async def stop_event_loop(timeout: float | None = 5): # pragma: no cover
"""Cancels all running tasks and stops the event loop."""

for task in asyncio.all_tasks():
Expand Down Expand Up @@ -93,3 +99,42 @@ def is_notebook() -> bool:
return False # Other type (?)
except NameError:
return False # Probably standard Python interpreter


T = TypeVar("T", bound=Any)


async def with_timeout(
coro: Coroutine[Any, Any, T],
timeout: float | None,
raise_on_timeout: bool = True,
) -> T | None:
"""Runs a coroutine with a timeout.
Parameters
----------
coro
The coroutine to run.
timeout
The timeout in seconds.
raise_on_timeout
If :obj:`True`, raises a :class:`asyncio.TimeoutError` if the coroutine times
out, otherwise returns :obj:`None`.
Returns
-------
result
The result of the coroutine.
Raises
------
asyncio.TimeoutError
If the coroutine times out.
"""

try:
return await asyncio.wait_for(coro, timeout)
except asyncio.TimeoutError:
if raise_on_timeout:
raise asyncio.TimeoutError(f"Timed out after {timeout} seconds.")
71 changes: 71 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2024-11-29
# @Filename: test_utils.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio

import pytest
import pytest_mock

import lvmopstools.utils
from lvmopstools.utils import is_notebook, with_timeout


async def _timeout(delay: float):
await asyncio.sleep(delay)
return True


async def test_with_timeout():
with pytest.raises(asyncio.TimeoutError):
await with_timeout(_timeout(0.5), timeout=0.1)


async def test_with_timeout_no_raise():
result = await with_timeout(_timeout(0.5), timeout=0.1, raise_on_timeout=False)
assert result is None


class GetPythonMocker:
def __init__(self, shell: str):
self.shell = shell
self.__class__.__name__ = shell

def __call__(self):
return self


@pytest.mark.parametrize(
"shell, result",
[
("ZMQInteractiveShell", True),
("TerminalInteractiveShell", False),
("other", False),
],
)
async def test_is_notebook(shell: str, result: bool, mocker: pytest_mock.MockerFixture):
mocker.patch.object(
lvmopstools.utils,
"get_ipython",
return_value=GetPythonMocker(shell),
create=True,
)

assert is_notebook() == result


async def test_is_notebook_name_Error(mocker: pytest_mock.MockerFixture):
mocker.patch.object(
lvmopstools.utils,
"get_ipython",
side_effect=NameError,
create=True,
)

assert not is_notebook()

0 comments on commit 728c47f

Please sign in to comment.