-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
169 additions
and
27 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Improve the warmup_hexapod.py to recover from the failure and change the step size in the runtime. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,10 +23,9 @@ | |
import asyncio | ||
from collections.abc import Iterable | ||
|
||
import numpy as np | ||
import yaml | ||
from lsst.ts import salobj | ||
from lsst.ts.idl.enums.MTHexapod import SalIndex | ||
from lsst.ts.idl.enums.MTHexapod import EnabledSubstate, SalIndex | ||
from lsst.ts.observatory.control.maintel.mtcs import MTCS, MTCSUsages | ||
|
||
|
||
|
@@ -213,31 +212,133 @@ async def warm_up(self): | |
for step, sleep_time in zip(self.config.step_size, self.config.sleep_time): | ||
await self.single_loop(step, sleep_time) | ||
|
||
async def move_stepwise(self, start, stop, step, sleep_time): | ||
"""Moves the hexapod from `start` to `stop` in `steps`. | ||
async def move_stepwise( | ||
self, start: float, stop: float, step_initial: float, sleep_time: float | ||
): | ||
"""Moves the hexapod from `start` to `stop` that begins from | ||
`step_initial`. The step size is updated based on the result of the | ||
movement. | ||
Parameters | ||
---------- | ||
start : float | ||
start : `float` | ||
Initial position. | ||
stop : float | ||
stop : `float` | ||
Final position. | ||
step : float | ||
Step size. | ||
sleep_time : float | ||
step_initial : `float` | ||
Initial step size. | ||
sleep_time : `float` | ||
Time between movements. | ||
""" | ||
axes = "xyzuvw" | ||
|
||
for new_pos in np.arange(start, stop, step): | ||
pos = await self.hexapod.tel_application.aget() | ||
position_current = start | ||
position_next = start | ||
step_current = step_initial | ||
while position_current != stop: | ||
# Make sure the next position is not beyond the stop | ||
position_next = position_current + step_current | ||
if step_initial >= 0: | ||
if position_next >= stop: | ||
position_next = stop | ||
else: | ||
if position_next <= stop: | ||
position_next = stop | ||
|
||
# Do the movement | ||
all_positions = await self._get_position() | ||
all_positions[self.config.axis] = position_next | ||
|
||
is_successful = await self._move_hexapod(**all_positions) | ||
|
||
# Update the step size based on the result | ||
step_current_update = ( | ||
(step_current + step_initial) if is_successful else (step_current / 2) | ||
) | ||
step_current = ( | ||
step_current_update | ||
if (abs(step_current_update) >= abs(step_initial)) | ||
else step_initial | ||
) | ||
|
||
all_positions = {axes[i]: pos.position[i] for i in range(len(axes))} | ||
all_positions[self.config.axis] = new_pos | ||
# Update the current position | ||
if is_successful: | ||
position_current = position_next | ||
else: | ||
position_fail = await self._get_position() | ||
position_current = position_fail[self.config.axis] | ||
|
||
await self.move_hexapod(**all_positions) | ||
await asyncio.sleep(sleep_time) | ||
|
||
async def _get_position(self) -> dict: | ||
"""Get the current position of the hexapod. | ||
Returns | ||
------- | ||
`dict` | ||
Current position of the hexapod. | ||
""" | ||
|
||
pos = await self.hexapod.tel_application.aget() | ||
|
||
axes = "xyzuvw" | ||
return {axes[i]: pos.position[i] for i in range(len(axes))} | ||
|
||
async def _move_hexapod( | ||
self, | ||
x: float, | ||
y: float, | ||
z: float, | ||
u: float, | ||
v: float, | ||
w: float = 0.0, | ||
) -> bool: | ||
"""Move the hexapod to the given position. | ||
Parameters | ||
---------- | ||
x : `float` | ||
Hexapod-x position (microns). | ||
y : `float` | ||
Hexapod-y position (microns). | ||
z : `float` | ||
Hexapod-z position (microns). | ||
u : `float` | ||
Hexapod-u angle (degrees). | ||
v : `float` | ||
Hexapod-v angle (degrees). | ||
w : `float`, optional | ||
Hexapod-w angle (degrees). Default 0. | ||
Returns | ||
------- | ||
`bool` | ||
`True` if the movement is successful, `False` otherwise. | ||
""" | ||
|
||
try: | ||
await self.move_hexapod(x, y, z, u, v, w=w) | ||
|
||
return True | ||
except Exception: | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
b1quint
Contributor
|
||
# If the hexapod is moving, stop it | ||
controller_enabled_state = ( | ||
self.hexapod.evt_controllerState.get().enabledSubstate | ||
) | ||
if controller_enabled_state == EnabledSubstate.MOVING_POINT_TO_POINT: | ||
self.log.info("Stop the hexapod CSC.") | ||
await self.hexapod.cmd_stop.set_start() | ||
|
||
# If the hexapod is in fault, recover it | ||
state = self.hexapod.evt_summaryState.get() | ||
if state == salobj.State.FAULT: | ||
self.log.info("Recover the hexapod CSC from the Fault.") | ||
await salobj.set_summary_state(self.hexapod, salobj.State.ENABLED) | ||
|
||
# Wait for a few seconds | ||
await asyncio.sleep(5.0) | ||
|
||
return False | ||
|
||
async def single_loop(self, step, sleep_time): | ||
""" | ||
Do a full loop moving from the current position to the positive limit | ||
|
@@ -252,26 +353,66 @@ async def single_loop(self, step, sleep_time): | |
Time between movements. | ||
""" | ||
self.log.info(f"Start loop with {step} step and {sleep_time} sleep time.") | ||
idx = "xyzuvw".index(self.config.axis) | ||
pos = await self.hexapod.tel_application.aget() | ||
current_pos = pos.position[idx] # current position | ||
|
||
self.log.debug( | ||
f"Move from current position to maximum position in steps of {step}" | ||
) | ||
# Move to the origin first | ||
await self._move_to_origin() | ||
|
||
# Positive direction | ||
self.log.debug("Move from 0 to maximum position") | ||
await self.move_stepwise( | ||
current_pos, | ||
0.0, | ||
self.config.max_position, | ||
step, | ||
sleep_time, | ||
) | ||
|
||
self.log.debug( | ||
f"Move from maximum position to minimum position in steps of {step}" | ||
self.log.debug("Move from maximum position back to 0") | ||
await self.move_stepwise( | ||
self.config.max_position, | ||
0.0, | ||
-step, | ||
sleep_time, | ||
) | ||
|
||
# Negative direction | ||
self.log.debug("Move from 0 to minimum position") | ||
await self.move_stepwise( | ||
self.config.max_position, -self.config.max_position, -step, sleep_time | ||
0.0, | ||
-self.config.max_position, | ||
-step, | ||
sleep_time, | ||
) | ||
|
||
self.log.debug(f"Move from minimum position back to 0 in steps of {step}") | ||
await self.move_stepwise(-self.config.max_position, 1, step, sleep_time) | ||
self.log.debug("Move from minimum position back to 0") | ||
await self.move_stepwise( | ||
-self.config.max_position, | ||
0.0, | ||
step, | ||
sleep_time, | ||
) | ||
|
||
async def _move_to_origin(self, max_count: int = 5) -> None: | ||
"""Move to the origin. | ||
Parameters | ||
---------- | ||
max_count : `int`, optional | ||
Maximum count to try. (the default is 5) | ||
Raises | ||
------ | ||
`RuntimeError` | ||
When the hexapod fails to move to the origin. | ||
""" | ||
|
||
count = 0 | ||
while count < max_count: | ||
is_done = await self._move_hexapod(0.0, 0.0, 0.0, 0.0, 0.0, w=0.0) | ||
if is_done: | ||
return | ||
else: | ||
count += 1 | ||
|
||
raise RuntimeError( | ||
f"Failed to move the hexapod to the origin. with {max_count} tries" | ||
) |
We need to forward the error message so users can have more information about what is going on.