Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ForceTorqueMonitoring] Adds Force Torque Monitoring for the real robot #233

Merged
merged 3 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/pycram/failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


class SensorMonitoringCondition(PlanFailure):
"""Thrown when a sensor monitoring condition is met."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


class DeliveringFailed(HighLevelFailure):
"""Thrown when delivering plan completely gives up."""

Expand Down
2 changes: 1 addition & 1 deletion src/pycram/language.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def perform(self):
for i in range(self.repeat):
for child in self.children:
if self.interrupted:
return
return State.FAILED, results
try:
results.append(child.resolve().perform())
except PlanFailure as e:
Expand Down
46 changes: 39 additions & 7 deletions src/pycram/ros_utils/force_torque_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ..datastructures.enums import FilterConfig
from ..datastructures.world import World
from ..failures import SensorMonitoringCondition
from ..ros.filter import Butterworth
from ..ros.data_types import Time
from ..ros.publisher import create_publisher
Expand Down Expand Up @@ -133,6 +134,14 @@ def _get_robot_parameters(self):

elif self.robot_name == 'iai_donbot':
self.wrench_topic_name = '/kms40_driver/wrench'

elif self.robot_name == 'pr2':
raw_data = "/ft/l_gripper_motor_zeroed"
raw_data_10_sample_moving_average = "/ft/l_gripper_motor_zeroed_avg"
derivative_data = "/ft/l_gripper_motor_zeroed_derivative"
derivative_data_10_sample_moving_average = "/ft/l_gripper_motor_zeroed_derivative_avg"

self.wrench_topic_name = raw_data
else:
rospy.logerr(f'{self.robot_name} is not supported')

Expand Down Expand Up @@ -163,6 +172,7 @@ def _get_filter(self, order=4, cutoff=10, fs=60):

def _filter_data(self, current_wrench_data: WrenchStamped) -> WrenchStamped:
filtered_data = WrenchStamped()
filtered_data.header = current_wrench_data.header
for attr in ['x', 'y', 'z']:
force_values = [getattr(val.wrench.force, attr) for val in self.prev_values] + [
getattr(current_wrench_data.wrench.force, attr)]
Expand Down Expand Up @@ -210,19 +220,41 @@ def get_derivative(self, is_filtered=True) -> WrenchStamped:
"""
Calculate the derivative of current data.

:param is_filtered: Decides about using filtered or raw data
:param is_filtered: Decides about using filtered or raw data.
:return: The derivative as a WrenchStamped object. Returns a zeroed derivative if only one data point exists.
"""
status = self.filtered if is_filtered else self.unfiltered

if len(self.whole_data[status]) < 2:
return WrenchStamped()

before: WrenchStamped = self.whole_data[status][-2]
after: WrenchStamped = self.whole_data[status][-1]
derivative = WrenchStamped()

derivative.wrench.force.x = before.wrench.force.x - after.wrench.force.x
derivative.wrench.force.y = before.wrench.force.y - after.wrench.force.y
derivative.wrench.force.z = before.wrench.force.z - after.wrench.force.z
derivative.wrench.torque.x = before.wrench.torque.x - after.wrench.torque.x
derivative.wrench.torque.y = before.wrench.torque.y - after.wrench.torque.y
derivative.wrench.torque.z = before.wrench.torque.z - after.wrench.torque.z
dt = (after.header.stamp - before.header.stamp).to_sec()
if dt == 0:
return WrenchStamped()

derivative.wrench.force.x = (after.wrench.force.x - before.wrench.force.x) / dt
derivative.wrench.force.y = (after.wrench.force.y - before.wrench.force.y) / dt
derivative.wrench.force.z = (after.wrench.force.z - before.wrench.force.z) / dt
derivative.wrench.torque.x = (after.wrench.torque.x - before.wrench.torque.x) / dt
derivative.wrench.torque.y = (after.wrench.torque.y - before.wrench.torque.y) / dt
derivative.wrench.torque.z = (after.wrench.torque.z - before.wrench.torque.z) / dt

return derivative

def human_touch_monitoring(self):
print("monitoring")
LucaKro marked this conversation as resolved.
Show resolved Hide resolved
if self.robot_name == 'hsrb':
der = self.get_last_value()
if abs(der.wrench.force.x) > 10.30:
print("sensor")
LucaKro marked this conversation as resolved.
Show resolved Hide resolved
return SensorMonitoringCondition
elif self.robot_name == 'pr2':
der = self.get_derivative()
print(der.wrench.torque.x)
LucaKro marked this conversation as resolved.
Show resolved Hide resolved
if abs(der.wrench.torque.x) > 3:
print("sensor")
return SensorMonitoringCondition