Skip to content

Commit

Permalink
Added gymnasium_interface module with PyCRAMGymEnv and task executor …
Browse files Browse the repository at this point in the history
…implementation
  • Loading branch information
mkhoshnam committed Nov 21, 2024
1 parent ba70684 commit 0e64189
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 0 deletions.
75 changes: 75 additions & 0 deletions src/gymnasium_interface/Pycram_gym_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import gymnasium as gym
from gymnasium.spaces import Discrete
from .task_executor import PyCRAMTaskExecutor
from pycram.process_module import simulated_robot
import sys

class PyCRAMGymEnv(gym.Env):
def __init__(self, actions, default_params=None, objects=None, reward_function=None):
"""
Initializes the Gymnasium environment.
Args:
actions (list): List of valid action names (e.g., ["navigate", "pick_up", "place"]).
default_params (dict): Default parameters for actions if not provided by the user.
objects (list): List of objects to add to the environment.
reward_function (function): User-defined function to calculate rewards.
"""
super().__init__()
self.actions = actions
self.default_params = default_params or {}
self.objects = objects or []
self.reward_function = reward_function

# Dynamically define the action space
self.action_space = Discrete(len(actions))

# Initialize the task executor
self.executor = PyCRAMTaskExecutor()

# Initialize the state
self.state = None
self.reset()

def reset(self):
"""Resets the environment."""
with simulated_robot:
self.executor.reset_task(self.objects)
self.state = self.executor.get_current_state()
return self.state, {}

def step(self, action, params=None):
"""Executes a step in the environment."""
with simulated_robot:
action_name = self.actions[action]
action_params = self.default_params.get(action_name, {}).copy()
if params:
action_params.update(params)

# Execute the action
self.executor.execute_action(action_name, action_params)

# Update the state
self.state = self._get_observation()

# Calculate reward
reward = self._calculate_reward()

# Placeholder: done logic can be updated later
done = self._is_done()

return self.state, reward, done, False, {}

def _get_observation(self):
"""Fetches the current state of the environment."""
return self.state

def _calculate_reward(self):
"""Calculates the reward using the user-defined reward function."""
if self.reward_function:
return self.reward_function(self.state)
return 1.0

def _is_done(self):
"""Checks if the task is complete."""
return False
4 changes: 4 additions & 0 deletions src/gymnasium_interface/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .Pycram_gym_env import PyCRAMGymEnv
from .task_executor import PyCRAMTaskExecutor

__all__ = ["PyCRAMGymEnv", "PyCRAMTaskExecutor"]
21 changes: 21 additions & 0 deletions src/gymnasium_interface/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from gymnasium_interface.Pycram_gym_env import PyCRAMGymEnv
from pycram.datastructures.enums import Arms, Grasp
from pycram.datastructures.pose import Pose

# Example usage
def custom_reward(state):
return 10 if state else -1

actions = ["navigate", "pick_up"]
default_params = {
"navigate": {"target_pose": Pose([1.0, 2.0, 0.0], [0.0, 0.0, 0.0, 1.0])},
"pick_up": {"object_desig": "milk", "arm": Arms.RIGHT, "grasps": [Grasp.FRONT]},
}
objects = [{"name": "milk", "type": "object", "urdf": "milk.stl", "pose": Pose([2.5, 2.10, 1.02])}]

env = PyCRAMGymEnv(actions, default_params, objects=objects, reward_function=custom_reward)
state = env.reset()
print("State after reset:", state)
state, reward, done, truncated, info = env.step(1, {"object_desig": "milk", "arm": Arms.RIGHT, "grasps": [Grasp.FRONT]})
print("State after step:", state, "Reward:", reward)

111 changes: 111 additions & 0 deletions src/gymnasium_interface/task_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from pycram.worlds.bullet_world import BulletWorld
from pycram.world_concepts.world_object import Object
from pycram.datastructures.enums import ObjectType, WorldMode, Grasp
from pycram.datastructures.pose import Pose
from pycram.designators.action_designator import NavigateAction, PickUpAction, PlaceAction, OpenAction, CloseAction
from pycram.designators.object_designator import BelieveObject
from pycram.process_module import simulated_robot

class PyCRAMTaskExecutor:
def __init__(self):
"""Initializes task executor for PyCRAM actions."""
self.world = BulletWorld(WorldMode.GUI)
self.robot = None
self.apartment = None

def clear_world(self):
"""Removes all objects from the BulletWorld."""
print("Clearing all objects from BulletWorld...")
for obj in list(self.world.objects):
obj.remove()
print("All objects removed from BulletWorld.")

def reset_task(self, objects):
"""Resets the simulation environment dynamically."""
self.clear_world()

# Reload the apartment URDF
self.apartment = Object("apartment", "environment", "apartment.urdf")

# Reinitialize the robot
self.robot = Object("pr2", ObjectType.ROBOT, "pr2.urdf", pose=Pose([1.2, 1, 0]))
self.world.robot = self.robot

# Add dynamic objects
for obj in objects:
name = obj["name"]
obj_type = obj["type"]
urdf = obj["urdf"]
pose = obj["pose"]

print(f"Adding object: {name}, URDF path: {urdf}, Pose: {pose}")

existing_object = self.world.get_object_by_name(name)
if existing_object:
print(f"Reusing existing object: {name}")
else:
Object(name, obj_type, urdf, pose=pose)

print("Environment reset: Apartment, robot, and dynamic objects added.")

def execute_action(self, action, params):
"""Executes a PyCRAM action."""
with simulated_robot:
if action == "navigate":
self._navigate(params)
elif action == "pick_up":
self._pick_up(params)
elif action == "place":
self._place(params)
elif action == "open":
self._open(params)
elif action == "close":
self._close(params)
else:
raise ValueError(f"Unknown action: {action}")

def _navigate(self, params):
target_pose = params.get("target_pose")
if not target_pose:
raise ValueError("Missing parameter: target_pose")
NavigateAction(target_locations=[target_pose]).resolve().perform()

def _pick_up(self, params):
object_name = params.get("object_desig")
arm = params.get("arm")
grasps = params.get("grasps", [Grasp.RIGHT])
if not object_name or not arm:
raise ValueError("Missing parameters: object_desig and arm are required")
object_desig = BelieveObject(names=[object_name])
action = PickUpAction(
object_designator_description=object_desig, arms=[arm], grasps=grasps
).resolve()
action.perform()

def _place(self, params):
object_desig = params.get("object_desig")
target_pose = params.get("target_pose")
arm = params.get("arm")
if not object_desig or not target_pose or not arm:
raise ValueError("Missing parameters: object_desig, target_pose, and arm are required")
PlaceAction(object_designator_description=object_desig, target_locations=[target_pose], arms=[arm]).resolve().perform()

def _open(self, params):
handle_desig = params.get("handle_desig")
arm = params.get("arm")
if not handle_desig or not arm:
raise ValueError("Missing parameters: handle_desig and arm are required")
OpenAction(handle_desig, [arm]).resolve().perform()

def _close(self, params):
handle_desig = params.get("handle_desig")
arm = params.get("arm")
if not handle_desig or not arm:
raise ValueError("Missing parameters: handle_desig and arm are required")
CloseAction(handle_desig, [arm]).resolve().perform()

def get_current_state(self):
"""Fetches the current state of the environment."""
robot_pose = self.robot.get_pose() if self.robot else None
objects = [{"name": obj.name, "pose": obj.pose} for obj in self.world.objects]
return {"robot_pose": robot_pose, "objects": objects}

0 comments on commit 0e64189

Please sign in to comment.