-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #7 from helix-robotics-ag/seb/add_tendon_control
Seb/add tendon control
- Loading branch information
Showing
12 changed files
with
305 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
helix_transmission/config/tendon_calib.yml | ||
helix_transmission/config/tendon_calib.yml.bak |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
tendon_transmission_node: | ||
ros__parameters: | ||
pulley_radius: 0.01 # [m] | ||
# For Motor 0,1,2,3,4,5 increasing rotor position/positive torque -> contraction | ||
# For Motor 6,7,8 decreasing rotor position/negative torque -> contraction | ||
motor_orients: [1, 1, 1, 1, 1, 1, -1, -1, -1] | ||
tendon_min_lim: -0.1 # [m] | ||
tendon_max_lim: 0.1 # [m] | ||
holding_current: 70.0 # [mA] | ||
# TODO - a better solution for the calibration file | ||
tendon_calib_file_path: "/colcon_ws/src/helix_transmission/config/tendon_calib.yml" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
motor_offsets: | ||
- -2.820990800857544 | ||
- -2.980524778366089 | ||
- -5.218602657318115 | ||
- -8.886350631713867 | ||
- -6.9934186935424805 | ||
- -2.9897286891937256 | ||
- 1.1443496942520142 | ||
- -8.996797561645508 | ||
- -9.438584327697754 | ||
|
Empty file.
216 changes: 216 additions & 0 deletions
216
helix_transmission/helix_transmission/tendon_transmission.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
import rclpy | ||
from rclpy.node import Node | ||
from rclpy.executors import MultiThreadedExecutor | ||
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup | ||
import numpy as np | ||
import yaml | ||
|
||
from sensor_msgs.msg import JointState | ||
from std_msgs.msg import Float64MultiArray | ||
from std_srvs.srv import Trigger | ||
from controller_manager_msgs.srv import SwitchController | ||
|
||
class TendonTransmissionNode(Node): | ||
|
||
def __init__(self): | ||
super().__init__('tendon_transmission_node') | ||
|
||
# Config from helix_transmission.config.yml | ||
self.declare_parameters( | ||
namespace='', | ||
parameters=[ | ||
('pulley_radius',rclpy.Parameter.Type.DOUBLE), | ||
('motor_orients',rclpy.Parameter.Type.INTEGER_ARRAY), | ||
('tendon_min_lim',rclpy.Parameter.Type.DOUBLE), | ||
('tendon_max_lim',rclpy.Parameter.Type.DOUBLE), | ||
('holding_current',rclpy.Parameter.Type.DOUBLE), | ||
('tendon_calib_file_path',rclpy.Parameter.Type.STRING) | ||
]) | ||
self.PULLEY_RADIUS = self.get_parameter('pulley_radius').value | ||
self.MOTOR_ORIENTS = np.array(self.get_parameter('motor_orients').value, dtype=np.float64) | ||
self.TENDON_LIMITS = np.array([self.get_parameter('tendon_min_lim').value, | ||
self.get_parameter('tendon_max_lim').value]) | ||
self.HOLDING_CURRENT = self.get_parameter('holding_current').value | ||
|
||
# Motor offsets from local tendon calibration file | ||
self.MOTOR_OFFSETS = np.array([0, 0, 0, 0, 0, 0, 0, 0, 0], dtype=np.float64) | ||
try: | ||
with open(self.get_parameter('tendon_calib_file_path').value, 'r') as file: | ||
data = yaml.safe_load(file) | ||
if 'motor_offsets' in data: | ||
self.MOTOR_OFFSETS = data['motor_offsets'] | ||
else: | ||
self.get_logger().info('No offset data found in calibration file, offsets set to 0') | ||
except (FileNotFoundError, yaml.YAMLError, TypeError): | ||
self.get_logger().info('Failed to load calibration file, offsets set to 0') | ||
|
||
self.last_motor_joint_positions = None | ||
|
||
# Subscription/publication for motor<->tendon transmission | ||
self.tendon_command_sub = self.create_subscription( | ||
Float64MultiArray, | ||
'~/command', | ||
self.tendon_to_motor_command_cb, | ||
10) | ||
self.tendon_command_sub | ||
|
||
self.motor_state_sub = self.create_subscription( | ||
JointState, | ||
'/motor_head_joint_state_broadcaster/joint_states', | ||
self.motor_state_cb, | ||
10) | ||
self.motor_state_sub | ||
|
||
self.tendon_to_motor_command_pub = self.create_publisher( | ||
Float64MultiArray, | ||
'/motor_head_joint_position_controller/commands', | ||
10) | ||
|
||
self.tendon_state_pub = self.create_publisher( | ||
JointState, | ||
'~/tendon_states', | ||
10) | ||
|
||
# Publisher for motor current holding current | ||
self.motor_effort_command_pub = self.create_publisher( | ||
Float64MultiArray, | ||
'/motor_head_joint_effort_controller/commands', | ||
10) | ||
|
||
# Client/service for setting holding current | ||
client_cb_group = MutuallyExclusiveCallbackGroup() | ||
|
||
self.controller_switch_cli = self.create_client( | ||
SwitchController, '/controller_manager/switch_controller', callback_group=client_cb_group) | ||
|
||
service_cb_group = MutuallyExclusiveCallbackGroup() | ||
|
||
# TODO - make custom services work over rosbridge so can have one parametrised set current service | ||
self.set_holding_current_srv = self.create_service( | ||
Trigger, '~/set_holding_current', self.set_holding_current_cb, callback_group=service_cb_group) | ||
|
||
self.unwind_srv = self.create_service( | ||
Trigger, '~/unwind', self.unwind_cb, callback_group=service_cb_group) | ||
|
||
self.set_motor_offsets_srv = self.create_service( | ||
Trigger, '~/set_motor_offsets', self.set_motor_offsets_cb, callback_group=service_cb_group) | ||
|
||
self.check_calibration_srv = self.create_service( | ||
Trigger, '~/check_calibration', self.check_calibration_cb, callback_group=service_cb_group) | ||
|
||
# Callbacks for motor<->tendon transmission | ||
def tendon_to_motor_command_cb(self, msg): | ||
command = np.array(msg.data, dtype=np.float64) | ||
command_limited = np.clip(command, self.TENDON_LIMITS[0], self.TENDON_LIMITS[1]) | ||
if not np.array_equal(command, command_limited): | ||
self.get_logger().info('Some tendon commands exceeded limits and were clipped') | ||
motor_command = Float64MultiArray() | ||
motor_command.data = ((command_limited * self.MOTOR_ORIENTS) \ | ||
/ self.PULLEY_RADIUS + self.MOTOR_OFFSETS).tolist() | ||
self.tendon_to_motor_command_pub.publish(motor_command) | ||
|
||
def motor_state_cb(self, msg): | ||
tendon_state = JointState() | ||
motor_names = sorted(msg.name) | ||
motor_angs = [msg.position[msg.name.index(motor_name)] for motor_name in motor_names] | ||
self.last_motor_joint_positions = np.array(motor_angs, dtype=np.float64) | ||
motor_angvels = [msg.velocity[msg.name.index(motor_name)] for motor_name in motor_names] | ||
motor_currents = [msg.effort[msg.name.index(motor_name)] for motor_name in motor_names] | ||
tendon_state.name = motor_names | ||
tendon_state.position = ((np.array(motor_angs, dtype=np.float64) - self.MOTOR_OFFSETS) \ | ||
* self.PULLEY_RADIUS * self.MOTOR_ORIENTS).tolist() | ||
tendon_state.velocity = (np.array(motor_angvels, dtype=np.float64) \ | ||
* self.PULLEY_RADIUS * self.MOTOR_ORIENTS).tolist() | ||
tendon_state.effort = motor_currents | ||
self.tendon_state_pub.publish(tendon_state) | ||
|
||
# Callbacks for calibration | ||
def set_holding_current_cb(self, request, response): | ||
while not self.controller_switch_cli.wait_for_service(timeout_sec=1.0): | ||
self.get_logger().info('Waiting for controller switch service') | ||
controller_switch_req = SwitchController.Request() | ||
controller_switch_req.activate_controllers = ['motor_head_joint_effort_controller'] | ||
controller_switch_req.deactivate_controllers = ['motor_head_joint_position_controller'] | ||
controller_switch_req.strictness = SwitchController.Request.BEST_EFFORT | ||
controller_switch_future = self.controller_switch_cli.call_async(controller_switch_req) | ||
while self.executor.spin_until_future_complete(controller_switch_future): | ||
self.get_logger().info("Waiting for controller switch to complete") | ||
if controller_switch_future.result().ok == False: | ||
self.get_logger().error('Failed to switch to effort controller') | ||
response.success = False | ||
response.message = 'Failed to switch to effort controller' | ||
return response | ||
self.motor_effort_command_pub.publish( | ||
Float64MultiArray(data = self.HOLDING_CURRENT * self.MOTOR_ORIENTS)) | ||
response.success = True | ||
return response | ||
|
||
def unwind_cb(self, request, response): | ||
while not self.controller_switch_cli.wait_for_service(timeout_sec=1.0): | ||
self.get_logger().info('Waiting for controller switch service') | ||
controller_switch_req = SwitchController.Request() | ||
controller_switch_req.activate_controllers = ['motor_head_joint_effort_controller'] | ||
controller_switch_req.deactivate_controllers = ['motor_head_joint_position_controller'] | ||
controller_switch_req.strictness = SwitchController.Request.STRICT | ||
controller_switch_future = self.controller_switch_cli.call_async(controller_switch_req) | ||
while self.executor.spin_until_future_complete(controller_switch_future): | ||
self.get_logger().info("Waiting for controller switch to complete") | ||
if controller_switch_future.result().ok == False: | ||
self.get_logger().error('Failed to switch to effort controller') | ||
response.success = False | ||
response.message = 'Failed to switch to effort controller' | ||
return response | ||
self.motor_effort_command_pub.publish( | ||
Float64MultiArray(data = -3.0 * self.MOTOR_ORIENTS)) | ||
response.success = True | ||
return response | ||
|
||
def set_motor_offsets_cb(self, request, response): | ||
write_succeeded = self.write_motor_offsets(self.last_motor_joint_positions.tolist()) | ||
if write_succeeded: | ||
response.success = True | ||
else: | ||
response.success = False | ||
response.message = 'Failed to set motor offsets file' | ||
return response | ||
|
||
def check_calibration_cb(self, request, response): | ||
recalibrated_offsets = self.MOTOR_OFFSETS \ | ||
+ 2*np.pi * np.round((self.last_motor_joint_positions-self.MOTOR_OFFSETS) / (2*np.pi)) | ||
write_succeeded = self.write_motor_offsets(recalibrated_offsets.tolist()) | ||
if write_succeeded: | ||
response.success = True | ||
else: | ||
response.success = False | ||
response.message = 'Failed to update motor offsets file' | ||
return response | ||
|
||
def write_motor_offsets(self, new_offsets): | ||
try: | ||
with open(self.get_parameter('tendon_calib_file_path').value + '.bak', 'w') as backup: | ||
yaml.dump({'motor_offsets': self.MOTOR_OFFSETS}, backup) | ||
with open(self.get_parameter('tendon_calib_file_path').value, 'w') as file: | ||
self.MOTOR_OFFSETS = new_offsets | ||
yaml.dump({'motor_offsets': self.MOTOR_OFFSETS}, file) | ||
return True | ||
except (FileNotFoundError, yaml.YAMLError, TypeError): | ||
self.get_logger().info('Failed to write motor offsets to file') | ||
return False | ||
|
||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
|
||
tendon_transmission_node = TendonTransmissionNode() | ||
|
||
# Unable to complete controller switch service call from within holding current service callback | ||
# MultiThreadedExectuor solution based on https://answers.ros.org/question/412149/ | ||
executor = MultiThreadedExecutor() | ||
rclpy.spin(tendon_transmission_node, executor=executor) | ||
|
||
tendon_transmission_node.destroy_node() | ||
rclpy.shutdown() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
<?xml version="1.0"?> | ||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> | ||
<package format="3"> | ||
<name>helix_transmission</name> | ||
<version>0.0.0</version> | ||
<description>Transmission providing abstraction between Helix motors and tendons</description> | ||
<maintainer email="[email protected]">sebtiburzio</maintainer> | ||
<license>TODO: License declaration</license> | ||
|
||
<exec_depend>rclpy</exec_depend> | ||
<exec_depend>numpy</exec_depend> | ||
<exec_depend>yaml</exec_depend> | ||
<exec_depend>sensor_msgs</exec_depend> | ||
<exec_depend>std_msgs</exec_depend> | ||
<exec_depend>controller_manager_msgs</exec_depend> | ||
|
||
<export> | ||
<build_type>ament_python</build_type> | ||
</export> | ||
</package> |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
[develop] | ||
script_dir=$base/lib/helix_transmission | ||
[install] | ||
install_scripts=$base/lib/helix_transmission |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import os | ||
from glob import glob | ||
from setuptools import find_packages, setup | ||
|
||
package_name = 'helix_transmission' | ||
|
||
setup( | ||
name=package_name, | ||
version='0.0.0', | ||
packages=find_packages(exclude=['test']), | ||
data_files=[ | ||
('share/ament_index/resource_index/packages', | ||
['resource/' + package_name]), | ||
('share/' + package_name, ['package.xml']), | ||
(os.path.join('share', package_name, 'launch'), glob(os.path.join('launch/*launch.[pxy][yma]*'))), | ||
(os.path.join('share', package_name, 'config'), glob(os.path.join('config', '*config.[pxy][yma]*'))), | ||
], | ||
install_requires=['setuptools'], | ||
zip_safe=True, | ||
maintainer='sebtiburzio', | ||
maintainer_email='[email protected]', | ||
description='Transmission providing abstraction between Helix motors and tendons', | ||
license='TODO: License declaration', | ||
entry_points={ | ||
'console_scripts': [ | ||
'tendon_transmission_node = helix_transmission.tendon_transmission:main' | ||
], | ||
}, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters