Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opendrive map support from SUMO #744

Merged
merged 4 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metadrive/component/map/scenario_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ScenarioMap(BaseMap):
def __init__(self, map_index, map_data, random_seed=None):
self.map_index = map_index
self.map_data = map_data
self.need_lane_localization = self.engine.global_config["need_lane_localization"]
self.need_lane_localization = self.engine.global_config.get("need_lane_localization", False)
super(ScenarioMap, self).__init__(dict(id=self.map_index), random_seed=random_seed)

def show_coordinates(self):
Expand Down
42 changes: 42 additions & 0 deletions metadrive/manager/sumo_map_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from metadrive.component.map.scenario_map import ScenarioMap
from metadrive.manager.base_manager import BaseManager
from metadrive.utils.sumo.map_utils import extract_map_features, RoadLaneJunctionGraph


class SumoMapManager(BaseManager):
"""
It currently only support load one map into the simulation.
"""
PRIORITY = 0 # Map update has the most high priority

def __init__(self, sumo_map_path):
"""
Init the map manager. It can be extended to manage more maps
"""
super(SumoMapManager, self).__init__()
self.current_map = None
self.graph = RoadLaneJunctionGraph(sumo_map_path)
self.map_feature = extract_map_features(self.graph)

def destroy(self):
"""
Delete the map manager
"""
self.current_map.destroy()
super(SumoMapManager, self).destroy()
self.current_map = None

def before_reset(self):
"""
Detach existing maps
"""
if self.current_map:
self.current_map.detach_from_world()

def reset(self):
"""
Rebuild the map and load it into the scene
"""
if not self.current_map:
self.current_map = ScenarioMap(map_index=0, map_data=self.map_feature)
self.current_map.attach_to_world()
50 changes: 50 additions & 0 deletions metadrive/tests/vis_env/vis_sumo_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""use netconvert --opendrive-files CARLA_town01.net.xml first"""

from metadrive.envs import BaseEnv
from metadrive.obs.observation_base import DummyObservation
import logging
from metadrive.manager.sumo_map_manager import SumoMapManager
from metadrive.engine.asset_loader import AssetLoader


class MyEnv(BaseEnv):
def reward_function(self, agent):
"""Dummy reward function."""
return 0, {}

def cost_function(self, agent):
"""Dummy cost function."""
return 0, {}

def done_function(self, agent):
"""Dummy done function."""
return False, {}

def get_single_observation(self):
"""Dummy observation function."""
return DummyObservation()

def setup_engine(self):
"""Register the map manager"""
super().setup_engine()
map_path = AssetLoader.file_path("carla", "CARLA_town01.net.xml", unix_style=False)
self.engine.register_manager("map_manager", SumoMapManager(map_path))


if __name__ == "__main__":
# create env
env = MyEnv(
dict(
use_render=True,
# if you have a screen and OpenGL suppor, you can set use_render=True to use 3D rendering
vehicle_config={"spawn_position_heading": [(0, 0), 0]},
manual_control=True, # we usually manually control the car to test environment
use_mesh_terrain=True,
log_level=logging.CRITICAL
)
) # suppress logging message
env.reset()
for i in range(10000):
# step
obs, reward, termination, truncate, info = env.step(env.action_space.sample())
env.close()
Empty file.
Loading
Loading