diff --git a/gymnasium/envs/mujoco/mujoco_env.py b/gymnasium/envs/mujoco/mujoco_env.py index 5f2fa6396..2f3e53050 100644 --- a/gymnasium/envs/mujoco/mujoco_env.py +++ b/gymnasium/envs/mujoco/mujoco_env.py @@ -1,5 +1,5 @@ from os import path -from typing import Any, Dict, Optional, Tuple, Union +from typing import Dict, Optional, Tuple, Union import numpy as np from numpy.typing import NDArray @@ -34,19 +34,22 @@ def expand_model_path(model_path: str) -> str: return fullpath -class BaseMujocoEnv(gym.Env[NDArray[np.float64], NDArray[np.float32]]): - """Superclass for all MuJoCo environments.""" +class MujocoEnv(gym.Env): + """Superclass for MuJoCo based environments.""" def __init__( self, - model_path, - frame_skip, + model_path: str, + frame_skip: int, observation_space: Optional[Space], render_mode: Optional[str] = None, width: int = DEFAULT_SIZE, height: int = DEFAULT_SIZE, camera_id: Optional[int] = None, camera_name: Optional[str] = None, + default_camera_config: Optional[Dict[str, Union[float, int]]] = None, + max_geom: int = 1000, + visual_options: Dict[int, bool] = {}, ): """Base abstract class for mujoco based environments. @@ -59,6 +62,9 @@ def __init__( height: The height of the render window. camera_id: The camera ID used. camera_name: The name of the camera used (can not be used in conjunction with `camera_id`). + default_camera_config: configuration for rendering camera. + max_geom: max number of rendered geometries. + visual_options: render flag options. Raises: OSError: when the `model_path` does not exist. @@ -93,48 +99,78 @@ def __init__( self.camera_name = camera_name self.camera_id = camera_id + from gymnasium.envs.mujoco.mujoco_rendering import MujocoRenderer + + self.mujoco_renderer = MujocoRenderer( + self.model, + self.data, + default_camera_config, + self.width, + self.height, + max_geom, + camera_id, + camera_name, + visual_options, + ) + def _set_action_space(self): bounds = self.model.actuator_ctrlrange.copy().astype(np.float32) low, high = bounds.T self.action_space = spaces.Box(low=low, high=high, dtype=np.float32) return self.action_space - # methods to override: - # ---------------------------- - def step( - self, action: NDArray[np.float32] - ) -> Tuple[NDArray[np.float64], np.float64, bool, bool, Dict[str, np.float64]]: - raise NotImplementedError - - def reset_model(self) -> NDArray[np.float64]: + def _initialize_simulation( + self, + ) -> Tuple["mujoco.MjModel", "mujoco.MjData"]: """ - Reset the robot degrees of freedom (qpos and qvel). - Implement this in each subclass. + Initialize MuJoCo simulation data structures `mjModel` and `mjData`. """ - raise NotImplementedError + model = mujoco.MjModel.from_xml_path(self.fullpath) + # MjrContext will copy model.vis.global_.off* to con.off* + model.vis.global_.offwidth = self.width + model.vis.global_.offheight = self.height + data = mujoco.MjData(model) + return model, data - def _initialize_simulation(self) -> Tuple[Any, Any]: - """ - Initialize MuJoCo simulation data structures mjModel and mjData. + def set_state(self, qpos, qvel): + """Set the joints position qpos and velocity qvel of the model. + + Note: `qpos` and `qvel` is not the full physics state for all mujoco models/environments https://mujoco.readthedocs.io/en/stable/APIreference/APItypes.html#mjtstate """ - raise NotImplementedError + assert qpos.shape == (self.model.nq,) and qvel.shape == (self.model.nv,) + self.data.qpos[:] = np.copy(qpos) + self.data.qvel[:] = np.copy(qvel) + if self.model.na == 0: + self.data.act[:] = None + mujoco.mj_forward(self.model, self.data) - def _step_mujoco_simulation(self, ctrl, n_frames) -> None: + def _step_mujoco_simulation(self, ctrl, n_frames): """ Step over the MuJoCo simulation. """ - raise NotImplementedError + self.data.ctrl[:] = ctrl - def render(self) -> Union[NDArray[np.float64], None]: + mujoco.mj_step(self.model, self.data, nstep=n_frames) + + # As of MuJoCo 2.0, force-related quantities like cacc are not computed + # unless there's a force sensor in the model. + # See https://github.com/openai/gym/issues/1541 + mujoco.mj_rnePostConstraint(self.model, self.data) + + def render(self): """ Render a frame from the MuJoCo simulation as specified by the render_mode. """ - raise NotImplementedError + return self.mujoco_renderer.render(self.render_mode) - # ----------------------------- - def _get_reset_info(self) -> Dict[str, float]: - """Function that generates the `info` that is returned during a `reset()`.""" - return {} + def close(self): + """Close rendering contexts processes.""" + if self.mujoco_renderer is not None: + self.mujoco_renderer.close() + + def get_body_com(self, body_name): + """Return the cartesian position of a body frame.""" + return self.data.body(body_name).xpos def reset( self, @@ -168,99 +204,29 @@ def do_simulation(self, ctrl, n_frames) -> None: ) self._step_mujoco_simulation(ctrl, n_frames) - def close(self): - """Close all processes like rendering contexts""" - raise NotImplementedError - - def get_body_com(self, body_name) -> NDArray[np.float64]: - """Return the cartesian position of a body frame""" - raise NotImplementedError - def state_vector(self) -> NDArray[np.float64]: - """Return the position and velocity joint states of the model""" - return np.concatenate([self.data.qpos.flat, self.data.qvel.flat]) - - -class MujocoEnv(BaseMujocoEnv): - """Superclass for MuJoCo environments.""" + """Return the position and velocity joint states of the model. - def __init__( - self, - model_path, - frame_skip, - observation_space: Optional[Space], - render_mode: Optional[str] = None, - width: int = DEFAULT_SIZE, - height: int = DEFAULT_SIZE, - camera_id: Optional[int] = None, - camera_name: Optional[str] = None, - default_camera_config: Optional[Dict[str, Union[float, int]]] = None, - max_geom: int = 1000, - visual_options: Dict[int, bool] = {}, - ): - super().__init__( - model_path, - frame_skip, - observation_space, - render_mode, - width, - height, - camera_id, - camera_name, - ) - - from gymnasium.envs.mujoco.mujoco_rendering import MujocoRenderer - - self.mujoco_renderer = MujocoRenderer( - self.model, - self.data, - default_camera_config, - self.width, - self.height, - max_geom, - camera_id, - camera_name, - visual_options, - ) - - def _initialize_simulation( - self, - ) -> Tuple["mujoco._structs.MjModel", "mujoco._structs.MjData"]: - model = mujoco.MjModel.from_xml_path(self.fullpath) - # MjrContext will copy model.vis.global_.off* to con.off* - model.vis.global_.offwidth = self.width - model.vis.global_.offheight = self.height - data = mujoco.MjData(model) - return model, data - - def set_state(self, qpos, qvel): - """Set the joints position qpos and velocity qvel of the model. - - Note: `qpos` and `qvel` is not the full physics state for all mujoco models/environments https://mujoco.readthedocs.io/en/stable/APIreference/APItypes.html#mjtstate + Note: `qpos` and `qvel` does not constitute the full physics state for all `mujoco` environments see https://mujoco.readthedocs.io/en/stable/computation/index.html#the-state. """ - assert qpos.shape == (self.model.nq,) and qvel.shape == (self.model.nv,) - self.data.qpos[:] = np.copy(qpos) - self.data.qvel[:] = np.copy(qvel) - if self.model.na == 0: - self.data.act[:] = None - mujoco.mj_forward(self.model, self.data) - - def _step_mujoco_simulation(self, ctrl, n_frames): - self.data.ctrl[:] = ctrl - - mujoco.mj_step(self.model, self.data, nstep=n_frames) + return np.concatenate([self.data.qpos.flat, self.data.qvel.flat]) - # As of MuJoCo 2.0, force-related quantities like cacc are not computed - # unless there's a force sensor in the model. - # See https://github.com/openai/gym/issues/1541 - mujoco.mj_rnePostConstraint(self.model, self.data) + # methods to override: + # ---------------------------- + def step( + self, action: NDArray[np.float32] + ) -> Tuple[NDArray[np.float64], np.float64, bool, bool, Dict[str, np.float64]]: + raise NotImplementedError - def render(self): - return self.mujoco_renderer.render(self.render_mode) + def reset_model(self) -> NDArray[np.float64]: + """ + Reset the robot degrees of freedom (qpos and qvel). + Implement this in each environment subclass. + """ + raise NotImplementedError - def close(self): - if self.mujoco_renderer is not None: - self.mujoco_renderer.close() + def _get_reset_info(self) -> Dict[str, float]: + """Function that generates the `info` that is returned during a `reset()`.""" + return {} - def get_body_com(self, body_name): - return self.data.body(body_name).xpos + # ----------------------------- diff --git a/tests/envs/mujoco/test_mujoco_v5.py b/tests/envs/mujoco/test_mujoco_v5.py index afba1da47..c7277d10b 100644 --- a/tests/envs/mujoco/test_mujoco_v5.py +++ b/tests/envs/mujoco/test_mujoco_v5.py @@ -6,7 +6,7 @@ import pytest import gymnasium as gym -from gymnasium.envs.mujoco.mujoco_env import BaseMujocoEnv, MujocoEnv +from gymnasium.envs.mujoco.mujoco_env import MujocoEnv from gymnasium.envs.mujoco.mujoco_py_env import BaseMujocoPyEnv from gymnasium.envs.mujoco.utils import check_mujoco_reset_state from gymnasium.error import Error @@ -90,7 +90,7 @@ def test_verify_info_y_position(env_id: str): def test_verify_info_x_velocity(env_name: str, version: str): """Asserts that the environment `info['x_velocity']` is properly assigned.""" env = gym.make(f"{env_name}-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) env.reset() old_x = env.data.qpos[0] @@ -107,7 +107,7 @@ def test_verify_info_x_velocity(env_name: str, version: str): def test_verify_info_y_velocity(env_id: str): """Asserts that the environment `info['y_velocity']` is properly assigned.""" env = gym.make(env_id).unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) env.reset() old_y = env.data.qpos[1] @@ -123,7 +123,7 @@ def test_verify_info_y_velocity(env_id: str): def test_verify_info_xy_velocity_xpos(env_id: str): """Asserts that the environment `info['x/y_velocity']` is properly assigned, for the ant environment which uses kinmatics for the velocity.""" env = gym.make(env_id).unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) env.reset() old_xy = env.get_body_com("torso")[:2].copy() @@ -146,7 +146,7 @@ def mass_center(model, data): return (np.sum(mass * xpos, axis=0) / np.sum(mass))[0:2].copy() env = gym.make(env_id).unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) env.reset() old_xy = mass_center(env.model, env.data) @@ -178,7 +178,7 @@ def mass_center(model, data): def test_verify_reward_survive(env_name: str, version: str): """Assert that `reward_survive` is 0 on `terminal` states and not 0 on non-`terminal` states.""" env = gym.make(f"{env_name}-{version}", reset_noise_scale=0).unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) env.reset(seed=0) env.action_space.seed(1) @@ -361,7 +361,7 @@ def test_ant_com(version: str): """Verify the kinmatic behaviour of the ant.""" # `env` contains `data : MjData` and `model : MjModel` env = gym.make(f"Ant-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) env.reset() # randomly initlizies the `data.qpos` and `data.qvel`, calls mujoco.mj_forward(env.model, env.data) x_position_before = env.data.qpos[0] @@ -382,7 +382,7 @@ def test_ant_com(version: str): def test_set_state(version: str): """Simple Test to verify that `mujocoEnv.set_state()` works correctly.""" env = gym.make(f"Hopper-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) env.reset() new_qpos = np.array( [0.00136962, 1.24769787, -0.00459026, -0.00483472, 0.0031327, 0.00412756] @@ -405,7 +405,7 @@ def test_set_state(version: str): def test_distance_from_origin_info(env_id: str): """Verify that `info"distance_from_origin"` is correct.""" env = gym.make(env_id).unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) env.reset() _, _, _, _, info = env.step(env.action_space.sample()) @@ -487,7 +487,7 @@ def test_reset_info(env_name: str, version: str): def test_inverted_double_pendulum_max_height(version: str): """Verify the max height of Inverted Double Pendulum.""" env = gym.make(f"InvertedDoublePendulum-{version}", reset_noise_scale=0).unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) env.reset() y = env.data.site_xpos[0][2] @@ -498,7 +498,7 @@ def test_inverted_double_pendulum_max_height(version: str): def test_inverted_double_pendulum_max_height_old(version: str): """Verify the max height of Inverted Double Pendulum (v4 does not have `reset_noise_scale` argument).""" env = gym.make(f"InvertedDoublePendulum-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) env.set_state(env.init_qpos, env.init_qvel) y = env.data.site_xpos[0][2] @@ -510,7 +510,7 @@ def test_inverted_double_pendulum_max_height_old(version: str): def test_model_object_count(version: str): """Verify that all the objects of the model are loaded, mostly useful for using non-mujoco simulator.""" env = gym.make(f"Ant-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 15 assert env.model.nv == 14 assert env.model.nu == 8 @@ -521,7 +521,7 @@ def test_model_object_count(version: str): assert env.model.ntendon == 0 env = gym.make(f"HalfCheetah-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 9 assert env.model.nv == 9 assert env.model.nu == 6 @@ -532,7 +532,7 @@ def test_model_object_count(version: str): assert env.model.ntendon == 0 env = gym.make(f"Hopper-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 6 assert env.model.nv == 6 assert env.model.nu == 3 @@ -543,7 +543,7 @@ def test_model_object_count(version: str): assert env.model.ntendon == 0 env = gym.make(f"Humanoid-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 24 assert env.model.nv == 23 assert env.model.nu == 17 @@ -554,7 +554,7 @@ def test_model_object_count(version: str): assert env.model.ntendon == 2 env = gym.make(f"HumanoidStandup-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 24 assert env.model.nv == 23 assert env.model.nu == 17 @@ -565,7 +565,7 @@ def test_model_object_count(version: str): assert env.model.ntendon == 2 env = gym.make(f"InvertedDoublePendulum-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 3 assert env.model.nv == 3 assert env.model.nu == 1 @@ -576,7 +576,7 @@ def test_model_object_count(version: str): assert env.model.ntendon == 0 env = gym.make(f"InvertedPendulum-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 2 assert env.model.nv == 2 assert env.model.nu == 1 @@ -588,7 +588,7 @@ def test_model_object_count(version: str): if not (version == "v4" and mujoco.__version__ >= "3.0.0"): env = gym.make(f"Pusher-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 11 assert env.model.nv == 11 assert env.model.nu == 7 @@ -607,7 +607,7 @@ def test_model_object_count(version: str): assert env.model.ntendon == 0 env = gym.make(f"Reacher-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 4 assert env.model.nv == 4 assert env.model.nu == 2 @@ -619,7 +619,7 @@ def test_model_object_count(version: str): assert env.model.ntendon == 0 env = gym.make(f"Swimmer-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 5 assert env.model.nv == 5 assert env.model.nu == 2 @@ -631,7 +631,7 @@ def test_model_object_count(version: str): assert env.model.ntendon == 0 env = gym.make(f"Walker2d-{version}").unwrapped - assert isinstance(env, (BaseMujocoEnv, BaseMujocoPyEnv)) + assert isinstance(env, (MujocoEnv, BaseMujocoPyEnv)) assert env.model.nq == 9 assert env.model.nv == 9 assert env.model.nu == 6 @@ -669,8 +669,8 @@ def test_dt(): env_b = gym.make( "Ant-v5", include_cfrc_ext_in_observation=False, frame_skip=1 ).unwrapped - assert isinstance(env_a, BaseMujocoEnv) - assert isinstance(env_b, BaseMujocoEnv) + assert isinstance(env_a, MujocoEnv) + assert isinstance(env_b, MujocoEnv) env_b.model.opt.timestep = 0.05 assert env_a.dt == env_b.dt @@ -705,7 +705,7 @@ def test_dt(): ) def test_reset_noise_scale(env_id): """Checks that when `reset_noise_scale=0` we have deterministic initialization.""" - env: BaseMujocoEnv = gym.make(env_id, reset_noise_scale=0).unwrapped + env = gym.make(env_id, reset_noise_scale=0).unwrapped env.reset() assert np.all(env.data.qpos == env.init_qpos)