import json
import os
import signal
import subprocess
import time

from .scene_types import PreloadModel, Scene


class SceneManager:
    """Discover scenes on disk and start/stop them as bash subprocesses.

    Scenes live in the ``../scene`` directory relative to this file. Each
    sub-directory containing a ``launch.sh`` is registered as one scene and
    indexed both by directory name and by ``Scene.id``.
    """

    def __init__(self):
        self.scene_name_dict = {}
        self.scene_id_dict = {}
        self.running_scene = None
        self.sim_process = None
        self.sim_thread = None
        self.scene_plugin = None
        # Set by start_scene(); initialised here so shut_scene() can be
        # called safely before any scene has been started.
        self.pid_file = None
        self.load_scene()

    def load_scene(self):
        """Load scenes from the scene folder.

        A sub-directory without ``launch.sh`` is skipped. The ``scripts``
        directory and ``config.json`` are optional; when absent the scene is
        registered with no scripts / no additional preload models.

        Raises:
            RuntimeError: if the scene folder does not exist.
        """
        scene_dir = os.path.join(os.path.dirname(__file__), '..', 'scene')
        if not os.path.exists(scene_dir):
            raise RuntimeError('no scene exist')

        for name in os.listdir(scene_dir):
            scene_path = os.path.join(scene_dir, name)
            if not os.path.isdir(scene_path):
                continue

            # Scene is constructed before the launch-file check on purpose:
            # construction order is kept as-is in case it has side effects.
            scene = Scene(name)
            # Launch file
            scene.launch_file = os.path.join(scene_path, 'launch.sh')
            # Scripts, keyed by file name without the ".sh" extension
            scene.scripts = {}
            if not os.path.exists(scene.launch_file):
                continue

            scripts_dir = os.path.join(scene_path, 'scripts')
            if os.path.isdir(scripts_dir):
                for script_name in os.listdir(scripts_dir):
                    if script_name.endswith('.sh'):
                        script_key = os.path.splitext(script_name)[0]
                        scene.scripts[script_key] = os.path.join(
                            scripts_dir, script_name)

            # Config file: lists the models that need to be preloaded
            config_file = os.path.join(scene_path, 'config.json')
            if os.path.exists(config_file):
                with open(config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                for model_data in config.get('preload_models', []):
                    scene.preload_models.append(
                        PreloadModel.from_dict(model_data))

            self.scene_name_dict[name] = scene
            self.scene_id_dict[scene.id] = scene

    def get_scenes(self):
        """Return a list of all registered scenes."""
        return list(self.scene_name_dict.values())

    def get_scripts(self, name):
        """Return the script names available for the scene called ``name``.

        Raises:
            KeyError: if no scene with that name is registered.
        """
        return list(self.scene_name_dict[name].scripts.keys())

    def start_scene(self, id):
        """Launch the scene registered under ``id``.

        Runs ``bash <launch.sh> <pid_file>``; the pid file path is passed to
        the launch script and later read back by ``shut_scene``.

        Raises:
            KeyError: if ``id`` is not a registered scene id.
            RuntimeError: if a scene is already running.
            ValueError: if the scene's name is not registered.
        """
        name = self.scene_id_dict[id].name
        # Compare against None so that a falsy id (e.g. 0) still counts as
        # a running scene.
        if self.running_scene is not None:
            raise RuntimeError(
                f'scene {self.running_scene} is already running')
        if name not in self.scene_name_dict:
            raise ValueError(f'scene {name} not found')

        self.pid_file = os.path.join(
            os.path.dirname(__file__), '..', 'scene', name, 'pids.txt')
        # Drop a stale pid file left over from a previous run.
        if os.path.exists(self.pid_file):
            os.remove(self.pid_file)

        # Output is discarded rather than piped: nothing ever reads the
        # pipes, so a chatty launch script would block once the OS pipe
        # buffer filled up.
        self.sim_process = subprocess.Popen(
            ['bash', self.scene_name_dict[name].launch_file, self.pid_file],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        self.running_scene = id

    def shut_scene(self):
        """Stop the running scene and clean up its processes.

        Sends SIGTERM to every PID listed in the pid file, terminates the
        launch process, then calls ``kill_all_processes`` as a final sweep.

        Raises:
            RuntimeError: if no scene is running.
        """
        if self.running_scene is None:
            raise RuntimeError('no scene running')

        pids = []
        if self.pid_file is not None:
            try:
                with open(self.pid_file, 'r') as f:
                    pids = [line.strip() for line in f if line.strip()]
            except FileNotFoundError:
                # The launch script may not have written the file (yet).
                print(f"pid file not found: {self.pid_file}")
            else:
                os.remove(self.pid_file)

        for pid in pids:
            try:
                os.kill(int(pid), signal.SIGTERM)
                print(f"killed PID: {pid}")
            except ProcessLookupError:
                print(f"process PID: {pid} already gone")
            except ValueError:
                print(f"invalid PID: {pid}")
            except PermissionError:
                print(f"no permission to kill PID: {pid}")
        self.pid_file = None

        if self.sim_process is not None:
            self.sim_process.terminate()
            # Reap the launcher so it does not linger as a zombie; escalate
            # to SIGKILL if it ignores SIGTERM.
            try:
                self.sim_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.sim_process.kill()
                self.sim_process.wait()
            print(f'killed launch PID: {self.sim_process.pid}')

        self.kill_all_processes()
        self.running_scene = None

    def run_script(self, name):
        """Run the script ``name`` of the currently running scene.

        The script is started with ``bash`` and not waited for.

        Raises:
            RuntimeError: if no scene is running.
            ValueError: if the running scene has no script called ``name``.
        """
        if self.running_scene is None:
            raise RuntimeError('no scene running')
        script_path = self.scene_id_dict[self.running_scene].scripts.get(name)
        if not script_path:
            raise ValueError(f'script {name} not found')
        subprocess.Popen(['bash', script_path])

    def kill_all_processes(self):
        """SIGKILL every process whose ``ps aux`` line mentions ros/rviz/gazebo.

        Lines containing "grep" or "gazebo_world_manager" are skipped, as is
        the current process itself.

        NOTE(review): the match is a plain substring test on the whole
        ``ps`` line, so unrelated processes containing e.g. "ros" are also
        killed — confirm this breadth is intended.
        """
        try:
            # List all processes
            output = subprocess.check_output(["ps", "aux"]).decode(
                errors='replace')
        except subprocess.CalledProcessError as e:
            print(f"Error: {e}")
            return

        own_pid = str(os.getpid())
        keywords = ("ros", "rviz", "gazebo")
        # Filter out the target processes
        target_pids = []
        for line in output.splitlines():
            if not any(word in line for word in keywords):
                continue
            # Exclude grep itself and anything named "gazebo_world_manager"
            if "grep" in line or "gazebo_world_manager" in line:
                continue
            fields = line.split()
            # PID is the second column; never kill ourselves.
            if len(fields) < 2 or fields[1] == own_pid:
                continue
            target_pids.append(fields[1])

        if target_pids:
            # Kill them in one batch
            subprocess.run(["kill", "-9"] + target_pids)
            print(f"Killed PIDs: {', '.join(target_pids)}")
        else:
            print("No matching processes running.")


if __name__ == '__main__':
    # Scenes are already loaded by the constructor.
    mgr = SceneManager()
    print(mgr.get_scenes())
    print(mgr.get_scripts('grasp_box'))
    # start_scene() expects a scene id, so resolve it from the scene name.
    mgr.start_scene(mgr.scene_name_dict['grasp_box'].id)
    time.sleep(40)
    mgr.shut_scene()