-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpick_and_place_dual_arm_sm.py
100 lines (79 loc) · 3.96 KB
/
pick_and_place_dual_arm_sm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from enum import Enum
import math
from rospy import loginfo, logwarn
import PyKDL
import pprint
import time
from pick_and_place_arm_sm import PickAndPlaceStateMachine, PickAndPlaceState, PSM_HOME_POS, vector_eps_eq
PSM_HOME_POS = PyKDL.Vector(0., 0., -0.1)
class PickAndPlaceDualArmStateMachine:
# this class isn't actually a state machine, but rather a way to connect
# the states of PSM1 and PSM2 state machines to effectively create a
# 2-arm FSM
def _get_objects_for_psms(self):
'''
Returns a dict of PSM index -> list of objects that are closest to that PSM
'''
result = dict()
for object in self.world.objects:
closest_psm_idx = self.world_to_psm_tfs.index(
min(self.world_to_psm_tfs, key=lambda tf : (tf * object.pos).Norm()))
if closest_psm_idx not in result:
result[closest_psm_idx] = list()
result[closest_psm_idx].append(object)
if self.log_verbose:
loginfo("Unpicked objects: {}".format(pprint.pformat(result)))
return result
def __init__(self, psms, world_to_psm_tfs, world, approach_vec, log_verbose=False):
self.world = world
self.psms = psms
self.world_to_psm_tfs = world_to_psm_tfs
self.log_verbose = log_verbose
self.approach_vec = approach_vec
# get objects for psms
psm_to_objects_map = self._get_objects_for_psms()
print(psm_to_objects_map)
if 0 in psm_to_objects_map:
self.current_sm = PickAndPlaceStateMachine(self.psms[0], self.world,
self.world_to_psm_tfs[0], psm_to_objects_map[0][0],
approach_vec, closed_loop=False,
home_when_done=bool(len(psm_to_objects_map[0]) <= 1))
elif 1 in psm_to_objects_map:
self.current_sm = PickAndPlaceStateMachine(self.psms[1], self.world,
self.world_to_psm_tfs[1], psm_to_objects_map[1][0],
approach_vec, closed_loop=False)
def update_world(self, world):
self.world = world
self.current_sm.update_world(world)
def run_once(self):
if self.current_sm.is_done() and \
self.current_sm.psm.get_current_jaw_position() > math.pi / 3:
objects = self._get_objects_for_psms()
if 0 in objects:
print(len(objects[0]))
self.current_sm = \
PickAndPlaceStateMachine(self.psms[0], self.world,
self.world_to_psm_tfs[0], objects[0][0],
self.approach_vec, closed_loop=False, home_when_done=False)
elif 1 in objects:
if self.current_sm.psm == self.psms[0]:
# go home
self.current_sm.state = PickAndPlaceState.HOME
if not self.current_sm.is_done():
self.current_sm.run_once()
# this state check is here because i want to sleep
while not vector_eps_eq(self.psms[0].get_current_position().p, PSM_HOME_POS):
# spaghetti code
time.sleep(0.1)
self.current_sm = \
PickAndPlaceStateMachine(self.psms[1], self.world,
self.world_to_psm_tfs[1], objects[1][0],
self.approach_vec, closed_loop=False, home_when_done=False)
if not self.current_sm.is_done():
self.current_sm.run_once()
def is_done(self):
return bool(len(self.world.objects) == 0) and self.current_sm.jaw_fully_open()
def __str__(self):
return str(self.__dict__)
def __repr__(self):
return str(self.__dict__)