Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the warmup_hexapod.py. #172

Merged
merged 8 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/news/DM-48447.perf.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve the warmup_hexapod.py to recover from the failure and change the step size in the runtime.
236 changes: 207 additions & 29 deletions python/lsst/ts/externalscripts/maintel/warmup_hexapod.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@
import asyncio
from collections.abc import Iterable

import numpy as np
import yaml
from lsst.ts import salobj
from lsst.ts.idl.enums.MTHexapod import SalIndex
from lsst.ts.idl.enums.MTHexapod import EnabledSubstate, SalIndex
from lsst.ts.observatory.control.maintel.mtcs import MTCS, MTCSUsages


Expand Down Expand Up @@ -98,7 +97,7 @@ def get_schema(cls):
anyOf:
- type: number
- type: array
default: 250
default: 100
sleep_time:
description: >-
The sleep time in seconds between movements. The number of
Expand Down Expand Up @@ -213,31 +212,167 @@ async def warm_up(self):
for step, sleep_time in zip(self.config.step_size, self.config.sleep_time):
await self.single_loop(step, sleep_time)

async def move_stepwise(self, start, stop, step, sleep_time):
"""Moves the hexapod from `start` to `stop` in `steps`.
async def move_stepwise(
self,
start: float,
stop: float,
step_initial: float,
sleep_time: float,
max_count: int = 5,
):
"""Moves the hexapod from `start` to `stop` that begins from
`step_initial`. The step size is updated based on the result of the
movement.

Parameters
----------
start : float
start : `float`
Initial position.
stop : float
stop : `float`
Final position.
step : float
Step size.
sleep_time : float
step_initial : `float`
Initial step size.
sleep_time : `float`
Time between movements.
max_count : `int`, optional
Maximum count to try. (the default is 5)

Raises
------
`RuntimeError`
When the hexapod fails to move in a single stage.
"""
axes = "xyzuvw"

for new_pos in np.arange(start, stop, step):
pos = await self.hexapod.tel_application.aget()
count = 0
position_current = start
position_next = start
step_current = step_initial
while position_current != stop:
# Make sure the next position is not beyond the stop
position_next = position_current + step_current
if step_initial >= 0:
if position_next >= stop:
position_next = stop
else:
if position_next <= stop:
position_next = stop

self.log.debug(
f"{self.hexapod_name} moves from {position_current} to "
f"{position_next} with step size: {step_current}"
)

# Do the movement
all_positions = await self._get_position()
all_positions[self.config.axis] = position_next

is_successful = await self._move_hexapod(**all_positions)

all_positions = {axes[i]: pos.position[i] for i in range(len(axes))}
all_positions[self.config.axis] = new_pos
# Update the step size based on the result
teweitsai marked this conversation as resolved.
Show resolved Hide resolved
step_current_update = (
(step_current + step_initial) if is_successful else (step_current / 2)
)
step_current = (
step_current_update
if (abs(step_current_update) >= abs(step_initial))
else step_initial
)

# Update the current position
if is_successful:
position_current = position_next
else:
position_fail = await self._get_position()
position_current = position_fail[self.config.axis]

count += 1

if count >= max_count:
raise RuntimeError(
f"{self.hexapod_name} failed to move with {max_count} "
"tries in a single stage"
)

self.log.debug(
f"Current position of {self.hexapod_name} is {position_current}"
)

await self.move_hexapod(**all_positions)
await asyncio.sleep(sleep_time)

async def _get_position(self) -> dict:
"""Get the current position of the hexapod.

Returns
-------
`dict`
Current position of the hexapod.
"""

pos = await self.hexapod.tel_application.aget()

axes = "xyzuvw"
return {axes[i]: pos.position[i] for i in range(len(axes))}

async def _move_hexapod(
self,
x: float,
y: float,
z: float,
u: float,
v: float,
w: float = 0.0,
) -> bool:
"""Move the hexapod to the given position.

Parameters
----------
x : `float`
Hexapod-x position (microns).
y : `float`
Hexapod-y position (microns).
z : `float`
Hexapod-z position (microns).
u : `float`
Hexapod-u angle (degrees).
v : `float`
Hexapod-v angle (degrees).
w : `float`, optional
Hexapod-w angle (degrees). Default 0.

Returns
-------
`bool`
`True` if the movement is successful, `False` otherwise.
"""

try:
await self.move_hexapod(x, y, z, u, v, w=w)

return True
except (asyncio.CancelledError, TimeoutError, salobj.base.AckError):
self.log.exception(
f"Error moving the {self.hexapod_name} to {x=}, {y=}, {z=}, {u=}, {v=}."
)

# If the hexapod is in fault, recover it
state = self.hexapod.evt_summaryState.get().summaryState
if state == salobj.State.FAULT:
self.log.info(f"Recover the {self.hexapod_name} CSC from the Fault.")
await salobj.set_summary_state(self.hexapod, salobj.State.ENABLED)

# If the hexapod is moving, stop it
controller_enabled_state = (
self.hexapod.evt_controllerState.get().enabledSubstate
)
if controller_enabled_state == EnabledSubstate.MOVING_POINT_TO_POINT:
self.log.info(f"Stop the {self.hexapod_name} CSC.")
await self.hexapod.cmd_stop.set_start()

# Wait for a few seconds
await asyncio.sleep(5.0)

return False

async def single_loop(self, step, sleep_time):
"""
Do a full loop moving from the current position to the positive limit
Expand All @@ -251,27 +386,70 @@ async def single_loop(self, step, sleep_time):
sleep_time : float
Time between movements.
"""
self.log.info(f"Start loop with {step} step and {sleep_time} sleep time.")
idx = "xyzuvw".index(self.config.axis)
pos = await self.hexapod.tel_application.aget()
current_pos = pos.position[idx] # current position

self.log.debug(
f"Move from current position to maximum position in steps of {step}"
self.log.info(
f"{self.hexapod_name} starts loop with {step} step and {sleep_time} sleep time."
)

# Move to the origin first
self.log.info(f"Move the {self.hexapod_name} to the origin")
await self._move_to_origin()

# Positive direction
self.log.info(f"Move {self.hexapod_name} from 0 to maximum position")
await self.move_stepwise(
current_pos,
0.0,
self.config.max_position,
step,
sleep_time,
)

self.log.debug(
f"Move from maximum position to minimum position in steps of {step}"
self.log.info(f"Move {self.hexapod_name} from maximum position back to 0")
await self.move_stepwise(
self.config.max_position,
0.0,
-step,
sleep_time,
)

# Negative direction
self.log.info(f"Move {self.hexapod_name} from 0 to minimum position")
await self.move_stepwise(
0.0,
-self.config.max_position,
-step,
sleep_time,
)

self.log.info(f"Move {self.hexapod_name} from minimum position back to 0")
await self.move_stepwise(
self.config.max_position, -self.config.max_position, -step, sleep_time
-self.config.max_position,
0.0,
step,
sleep_time,
)

self.log.debug(f"Move from minimum position back to 0 in steps of {step}")
await self.move_stepwise(-self.config.max_position, 1, step, sleep_time)
async def _move_to_origin(self, max_count: int = 5) -> None:
"""Move to the origin.

Parameters
----------
max_count : `int`, optional
Maximum count to try. (the default is 5)

Raises
------
`RuntimeError`
When the hexapod fails to move to the origin.
"""

count = 0
while count < max_count:
is_done = await self._move_hexapod(0.0, 0.0, 0.0, 0.0, 0.0, w=0.0)
if is_done:
return
else:
count += 1

raise RuntimeError(
f"Failed to move {self.hexapod_name} to the origin. with {max_count} tries"
)
Loading