diff --git a/model/gym-interface/py/ns3ai_gym_env/envs/ns3_environment.py b/model/gym-interface/py/ns3ai_gym_env/envs/ns3_environment.py index cbbbe39..e50473e 100644 --- a/model/gym-interface/py/ns3ai_gym_env/envs/ns3_environment.py +++ b/model/gym-interface/py/ns3ai_gym_env/envs/ns3_environment.py @@ -1,8 +1,11 @@ -import numpy as np +from pathlib import Path +from typing import Any + import gymnasium as gym -from gymnasium import spaces import messages_pb2 as pb import ns3ai_gym_msg_py as py_binding +import numpy as np +from gymnasium import spaces from ns3ai_utils import Experiment @@ -271,11 +274,18 @@ def get_state(self): extraInfo = {"info": self.get_extra_info()} return obs, reward, done, False, extraInfo - def __init__(self, targetName, ns3Path, ns3Settings=None, shmSize=4096): + def __init__( + self, + targetName: str | Path, + ns3Path: str, + ns3Settings: dict[str, Any] | None = None, + debug: bool = False, + shmSize=4096, + ): if self._created: raise Exception('Error: Ns3Env is singleton') self._created = True - self.exp = Experiment(targetName, ns3Path, py_binding, shmSize=shmSize) + self.exp = Experiment(targetName, ns3Path, py_binding, debug=debug, shmSize=shmSize) self.ns3Settings = ns3Settings self.newStateRx = False diff --git a/python_utils/ns3ai_utils.py b/python_utils/ns3ai_utils.py index efe605a..07fe5a8 100644 --- a/python_utils/ns3ai_utils.py +++ b/python_utils/ns3ai_utils.py @@ -18,43 +18,60 @@ # Muyuan Shen import os +import signal import subprocess -import psutil import time -import signal +from pathlib import Path +from typing import Any +import psutil SIMULATION_EARLY_ENDING = 0.5 # wait and see if the subprocess is running after creation -def get_setting(setting_map): - ret = '' +def get_setting(setting_map: dict[str, Any]) -> str: + ret = "" for key, value in setting_map.items(): - ret += ' --{}={}'.format(key, value) + ret += f" --{key}" + if value: + ret += f"={value}" return ret -def run_single_ns3(path, pname, setting=None, env=None, show_output=False): +def run_single_ns3( + path, + pname: str | Path, + setting: dict[str, Any] | None = None, + env=None, + show_output=False, + debug=False, +): if env is None: env = {} env.update(os.environ) - env['LD_LIBRARY_PATH'] = os.path.abspath(os.path.join(path, 'build', 'lib')) - # import pdb; pdb.set_trace() - exec_path = os.path.join(path, 'ns3') - if not setting: - cmd = '{} run {}'.format(exec_path, pname) + env["LD_LIBRARY_PATH"] = os.path.abspath(os.path.join(path, "build", "lib")) + if Path(pname).is_file(): + cmd = pname else: - cmd = '{} run {} --{}'.format(exec_path, pname, get_setting(setting)) + exec_path = os.path.join(path, "ns3") + cmd = f"{exec_path} run {pname} --" + if setting: + cmd += get_setting(setting) + if debug: + cmd = f"sleep infinity && {cmd}" if show_output: - proc = subprocess.Popen(cmd, shell=True, text=True, env=env, - stdin=subprocess.PIPE, - preexec_fn=os.setpgrp) + proc = subprocess.Popen(cmd, shell=True, text=True, env=env, stdin=subprocess.PIPE, preexec_fn=os.setpgrp) else: - proc = subprocess.Popen(cmd, shell=True, text=True, env=env, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - preexec_fn=os.setpgrp) + proc = subprocess.Popen( + cmd, + shell=True, + text=True, + env=env, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=os.setpgrp, + ) return cmd, proc @@ -99,18 +116,26 @@ class Experiment: # \param[in] memSize : share memory size # \param[in] targetName : program name of ns3 # \param[in] path : current working directory - def __init__(self, targetName, ns3Path, msgModule, - handleFinish=False, - useVector=False, vectorSize=None, - shmSize=4096, - segName="My Seg", - cpp2pyMsgName="My Cpp to Python Msg", - py2cppMsgName="My Python to Cpp Msg", - lockableName="My Lockable"): + def __init__( + self, + targetName: str | Path, + ns3Path, + msgModule, + debug=False, + handleFinish=False, + useVector=False, + vectorSize=None, + shmSize=4096, + segName="My Seg", + cpp2pyMsgName="My Cpp to Python Msg", + py2cppMsgName="My Python to Cpp Msg", + lockableName="My Lockable", + ): if self._created: raise Exception('ns3ai_utils: Error: Experiment is singleton') self._created = True - self.targetName = targetName # ns-3 target name, not file name + self.targetName = targetName # ns-3 target name or file name + self.debug = debug os.chdir(ns3Path) self.msgModule = msgModule self.handleFinish = handleFinish @@ -144,10 +169,15 @@ def __del__(self): # run ns3 script in cmd with the setting being input # \param[in] setting : ns3 script input parameters(default : None) # \param[in] show_output : whether to show output or not(default : False) - def run(self, setting=None, show_output=False): + def run(self, setting: dict[str, Any] | None = None, show_output=False): self.kill() self.simCmd, self.proc = run_single_ns3( - './', self.targetName, setting=setting, show_output=show_output) + "./", + self.targetName, + setting=setting, + show_output=show_output, + debug=self.debug, + ) print("ns3ai_utils: Running ns-3 with: ", self.simCmd) # exit if an early error occurred, such as wrong target name time.sleep(SIMULATION_EARLY_ENDING)