Skip to content

Commit

Permalink
added test
Browse files Browse the repository at this point in the history
  • Loading branch information
Relm-Arrowny committed Jan 3, 2025
1 parent 8da4bf1 commit e51299a
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 5 deletions.
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ filterwarnings = "error"
# Doctest python code in docs, python code in src docstrings, test functions in tests
testpaths = "docs src tests"


asyncio_mode = "auto"


[tool.coverage.run]
data_file = "/tmp/i10_bluesky.coverage"

Expand Down
4 changes: 2 additions & 2 deletions src/i10_bluesky/plans/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .alignments import PeakPosition, fast_scan_and_move_cen, scan_and_move_cen
from .alignments import PeakPosition, fast_scan_and_move_cen, step_scan_and_move_cen

__all__ = ["fast_scan_and_move_cen", "scan_and_move_cen", "PeakPosition"]
__all__ = ["fast_scan_and_move_cen", "step_scan_and_move_cen", "PeakPosition"]
13 changes: 10 additions & 3 deletions src/i10_bluesky/plans/utils/alignments.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@ class PeakPosition(tuple, Enum):
def scan_and_move_cen(funcs) -> Callable:
def inner(**kwargs):
ps = PeakStats(
f"{kwargs['motor'].name}",
f"{kwargs['det'].name}",
f"{kwargs['motor'].name}{kwargs['motor_name']}",
f"{kwargs['det'].name}{kwargs['det_name']}",
calc_derivative_and_stats=True,
)
yield from bpp.subs_wrapper(
funcs(**kwargs),
ps,
)
print(ps)

peak_position = get_stat_loc(ps, kwargs["loc"])
if (
kwargs["start"] >= peak_position >= kwargs["end"]
or kwargs["start"] <= peak_position <= kwargs["end"]
):
print(peak_position)
yield from abs_set(kwargs["motor"], peak_position, wait=True)
else:
raise ValueError(f"New position ({peak_position}) is outside scan range.")
Expand All @@ -53,6 +54,8 @@ def step_scan_and_move_cen(
start: float,
end: float,
num: int,
motor_name: str = "-user_readback",
det_name: str = "-value",
loc: PeakPosition = PeakPosition.CEN,
) -> MsgGenerator:
return scan([det], motor, start, end, num=num)
Expand All @@ -64,6 +67,8 @@ def fast_scan_and_move_cen(
motor: Motor,
start: float,
end: float,
motor_name: str = "-user_readback",
det_name: str = "-value",
motor_speed: float | None = None,
loc: PeakPosition = PeakPosition.CEN,
) -> MsgGenerator:
Expand All @@ -72,5 +77,7 @@ def fast_scan_and_move_cen(

def get_stat_loc(ps: PeakStats, loc: PeakPosition) -> float:
stat = getattr(ps, loc.value[0])
if not stat:
raise ValueError("Fitting failed, check devices name are correct.")
stat_pos = getattr(stat, loc.value[1])
return stat_pos if isinstance(stat_pos, float) else stat_pos[0]
84 changes: 84 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,19 @@

import pytest
from bluesky.run_engine import RunEngine
from ophyd_async.core import (
DeviceCollector,
FilenameProvider,
StaticFilenameProvider,
StaticPathProvider,
)
from ophyd_async.testing import set_mock_value
from p99_bluesky.devices.stages import ThreeAxisStage
from p99_bluesky.sim.sim_stages import SimThreeAxisStage
from super_state_machine.errors import TransitionError

from sim_devices import sim_detector

RECORD = str(Path(__file__).parent / "panda" / "db" / "panda.db")
INCOMPLETE_BLOCK_RECORD = str(
Path(__file__).parent / "panda" / "db" / "incomplete_block_panda.db"
Expand Down Expand Up @@ -33,3 +44,76 @@ def clean_event_loop():

request.addfinalizer(clean_event_loop)
return RE


A_BIT = 0.5


@pytest.fixture
def static_filename_provider():
return StaticFilenameProvider("ophyd_async_tests")


@pytest.fixture
def static_path_provider_factory(tmp_path: Path):
def create_static_dir_provider_given_fp(fp: FilenameProvider):
return StaticPathProvider(fp, tmp_path)

return create_static_dir_provider_given_fp


@pytest.fixture
def static_path_provider(
static_path_provider_factory,
static_filename_provider: FilenameProvider,
):
return static_path_provider_factory(static_filename_provider)


@pytest.fixture
async def sim_motor():
async with DeviceCollector(mock=True):
sim_motor = ThreeAxisStage("BLxxI-MO-TABLE-01:X", name="sim_motor")
set_mock_value(sim_motor.x.velocity, 2.78)
set_mock_value(sim_motor.x.high_limit_travel, 8.168)
set_mock_value(sim_motor.x.low_limit_travel, -8.888)
set_mock_value(sim_motor.x.user_readback, 1)
set_mock_value(sim_motor.x.motor_egu, "mm")
set_mock_value(sim_motor.x.motor_done_move, True)
set_mock_value(sim_motor.x.max_velocity, 10)

set_mock_value(sim_motor.y.motor_egu, "mm")
set_mock_value(sim_motor.y.high_limit_travel, 5.168)
set_mock_value(sim_motor.y.low_limit_travel, -5.888)
set_mock_value(sim_motor.y.user_readback, 0)
set_mock_value(sim_motor.y.motor_egu, "mm")
set_mock_value(sim_motor.y.velocity, 2.88)
set_mock_value(sim_motor.y.motor_done_move, True)
set_mock_value(sim_motor.y.max_velocity, 10)

set_mock_value(sim_motor.z.motor_egu, "mm")
set_mock_value(sim_motor.z.high_limit_travel, 5.168)
set_mock_value(sim_motor.z.low_limit_travel, -5.888)
set_mock_value(sim_motor.z.user_readback, 0)
set_mock_value(sim_motor.z.motor_egu, "mm")
set_mock_value(sim_motor.z.velocity, 2.88)
set_mock_value(sim_motor.z.motor_done_move, True)
set_mock_value(sim_motor.z.max_velocity, 10)

yield sim_motor


@pytest.fixture
async def sim_motor_step():
async with DeviceCollector(mock=True):
sim_motor_step = SimThreeAxisStage(name="sim_motor_step", instant=True)

yield sim_motor_step


@pytest.fixture
async def fake_detector():
async with DeviceCollector(mock=True):
fake_detector = sim_detector(prefix="fake_Pv", name="fake_detector")
set_mock_value(fake_detector.value, 0)
yield fake_detector
Empty file added tests/plans/__init__.py
Empty file.
Empty file added tests/plans/utils/__init__.py
Empty file.
93 changes: 93 additions & 0 deletions tests/plans/utils/test_alignment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from collections import defaultdict
from unittest.mock import Mock

import numpy as np
import pytest
from bluesky.run_engine import RunEngine
from ophyd_async.testing import callback_on_mock_put, set_mock_value
from p99_bluesky.devices.stages import ThreeAxisStage

from i10_bluesky.plans.utils import (
PeakPosition,
step_scan_and_move_cen,
)
from sim_devices import sim_detector

docs = defaultdict(list)


def capture_emitted(name, doc):
docs[name].append(doc)


def gaussian(x, mu, sig):
return (
1.0
/ (np.sqrt(2.0 * np.pi) * sig)
* np.exp(-np.power((x - mu) / sig, 2.0) / 2.0)
)


@pytest.mark.parametrize(
"test_input, expected_centre",
[
(
[5, -5, 21, 0.1],
-1,
),
(
[50, -55, 151, 1],
21.1,
),
(
[-1, -2.51241, 31, 0.05],
-1.2,
),
],
)
async def test_scan_and_move_cen_success_peak(
RE: RunEngine,
sim_motor_step: ThreeAxisStage,
fake_detector: sim_detector,
test_input,
expected_centre,
):
start = test_input[0]
end = test_input[1]
num = test_input[2]
peak_width = test_input[3]
cen = expected_centre
# Generate gaussian
x_data = np.linspace(start, end, num, endpoint=True)
y_data = gaussian(x_data, cen, peak_width)

rbv_mocks = Mock()
y_data = np.append(y_data, [0] * 2)
y_data = np.array(y_data, dtype=np.float64)
rbv_mocks.get.side_effect = y_data
callback_on_mock_put(
sim_motor_step.x.user_setpoint,
lambda *_, **__: set_mock_value(fake_detector.value, value=rbv_mocks.get()),
)

RE(
step_scan_and_move_cen(
det=fake_detector,
motor=sim_motor_step.x,
start=start,
end=end,
num=num,
motor_name="-user_readback",
det_name="-value",
loc=PeakPosition.CEN,
),
capture_emitted,
)
y_data1 = np.array([])
x_data1 = np.array([])
for i in docs["event"]:
y_data1 = np.append(y_data1, i["data"]["fake_detector-value"])
x_data1 = np.append(x_data1, i["data"]["sim_motor_step-x-user_readback"])
assert await sim_motor_step.x.user_setpoint.get_value() == pytest.approx(
expected_centre, 0.05
)
4 changes: 4 additions & 0 deletions tests/sim_devices/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .sim_detectors import sim_detector
from .soft_motor import SoftThreeAxisStage

__all__ = ["SoftThreeAxisStage", "sim_detector"]
12 changes: 12 additions & 0 deletions tests/sim_devices/sim_detectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from ophyd_async.core import (
StandardReadable,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import epics_signal_rw


class sim_detector(StandardReadable):
def __init__(self, prefix: str, name: str):
with self.add_children_as_readables(Format.HINTED_SIGNAL):
self.value = epics_signal_rw(float, read_pv=prefix)
super().__init__(name=name)

0 comments on commit e51299a

Please sign in to comment.