136 lines
4.8 KiB
Python
136 lines
4.8 KiB
Python
import json
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import time
|
|
|
|
from .scene_types import PreloadModel, Scene
|
|
|
|
|
|
class SceneManager:
    """Discover simulation scenes on disk and start/stop them as subprocesses.

    Scenes are sub-directories of the ``scene`` folder that sits one level
    above this module. A sub-directory is registered only if it contains a
    ``launch.sh``. Registered scenes are indexed both by directory name
    (``scene_name_dict``) and by ``scene.id`` (``scene_id_dict``).
    """

    def __init__(self):
        """Create the manager and immediately load all scenes from disk."""
        self.scene_name_dict = {}   # scene directory name -> Scene
        self.scene_id_dict = {}     # scene.id -> Scene
        self.running_scene = None   # id of the running scene, or None
        self.sim_process = None     # Popen handle of the launch script
        self.sim_thread = None      # not used by this class
        self.scene_plugin = None    # not used by this class
        # Initialised here so shut_scene() can be called before start_scene()
        # without hitting an AttributeError.
        self.pid_file = None
        self.load_scene()

    def load_scene(self):
        """Load scenes from the scene folder.

        For every sub-directory that contains ``launch.sh`` a ``Scene`` is
        registered with:

        * ``launch_file`` - path of ``launch.sh``;
        * ``scripts`` - ``{stem: path}`` for each ``*.sh`` in ``scripts/``
          (empty when the directory is missing);
        * ``preload_models`` - extended with one ``PreloadModel`` per entry of
          ``preload_models`` in ``config.json`` (skipped when the file is
          missing).

        Raises:
            RuntimeError: if the scene folder does not exist.
        """
        scene_dir = os.path.join(os.path.dirname(__file__), '..', 'scene')
        if not os.path.exists(scene_dir):
            raise RuntimeError('no scene exist')

        for name in os.listdir(scene_dir):
            scene_path = os.path.join(scene_dir, name)
            if not os.path.isdir(scene_path):
                continue

            scene = Scene(name)
            # Launch file
            scene.launch_file = os.path.join(scene_path, 'launch.sh')
            # Scripts
            scene.scripts = {}
            if not os.path.exists(scene.launch_file):
                # A directory without a launch script is not a scene.
                continue

            # A scene without helper scripts is valid; do not let a missing
            # directory abort loading of every other scene.
            scripts_dir = os.path.join(scene_path, 'scripts')
            if os.path.isdir(scripts_dir):
                for script_name in os.listdir(scripts_dir):
                    if script_name.endswith('.sh'):
                        stem = os.path.splitext(script_name)[0]
                        scene.scripts[stem] = os.path.join(
                            scripts_dir, script_name)

            # Config file: lists the models that must be preloaded.
            config_file = os.path.join(scene_path, 'config.json')
            if os.path.exists(config_file):
                # JSON is UTF-8; do not depend on the locale's default.
                with open(config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                for model_data in config.get('preload_models', []):
                    scene.preload_models.append(
                        PreloadModel.from_dict(model_data))

            self.scene_name_dict[name] = scene
            self.scene_id_dict[scene.id] = scene

    def get_scenes(self):
        """Return a list of all registered ``Scene`` objects."""
        return list(self.scene_name_dict.values())

    def get_scripts(self, name):
        """Return the script names (without ``.sh``) of the scene ``name``.

        Raises:
            KeyError: if no scene with that name is registered.
        """
        return list(self.scene_name_dict[name].scripts)

    def start_scene(self, id):
        """Launch the scene with the given id via ``bash launch.sh <pid_file>``.

        Any stale ``pids.txt`` in the scene directory is removed first; its
        path is passed to the launch script as the only argument.

        Raises:
            ValueError: if no scene with this id (or its name) is registered.
            RuntimeError: if another scene is already running.
        """
        scene = self.scene_id_dict.get(id)
        if scene is None:
            # Previously surfaced as a bare KeyError from the dict lookup.
            raise ValueError(f'scene {id} not found')
        # Compare against None: a scene id of 0 (or '') is falsy but valid.
        if self.running_scene is not None:
            raise RuntimeError(
                f'scene {self.running_scene} is already running')
        name = scene.name
        if name not in self.scene_name_dict:
            raise ValueError(f'scene {name} not found')

        pid_file = os.path.join(
            os.path.dirname(__file__), '..', 'scene', name, 'pids.txt')
        if os.path.exists(pid_file):
            os.remove(pid_file)
        self.pid_file = pid_file

        # NOTE(review): stdout/stderr are piped but never read inside this
        # class; a chatty launch script can block once the OS pipe buffer
        # fills. Confirm a consumer exists elsewhere or switch to DEVNULL.
        self.sim_process = subprocess.Popen(
            ['bash', self.scene_name_dict[name].launch_file, pid_file],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        self.running_scene = id

    def shut_scene(self):
        """Stop the running scene.

        Sends SIGTERM to every PID listed in the scene's PID file, removes
        the file, terminates the launch process and finally calls
        ``kill_all_processes()`` as a sweep.

        Raises:
            RuntimeError: if no scene is running.
        """
        if self.running_scene is None:
            raise RuntimeError('no scene running')

        pids = []
        if self.pid_file:
            try:
                with open(self.pid_file, 'r') as f:
                    pids = [line.strip() for line in f if line.strip()]
            except FileNotFoundError:
                # The launch script may not have written the file (yet);
                # still tear down everything else below.
                print(f"pid file not found: {self.pid_file}")
            else:
                os.remove(self.pid_file)

        for pid in pids:
            try:
                pid_num = int(pid)
                if pid_num <= 0:
                    # kill(0)/kill(-1) would signal the whole process group
                    # or every process, never a single child.
                    raise ValueError(pid)
                os.kill(pid_num, signal.SIGTERM)
                print(f"killed PID: {pid}")
            except ProcessLookupError:
                print(f"process PID: {pid} already gone")
            except PermissionError:
                print(f"no permission to kill PID: {pid}")
            except ValueError:
                print(f"invalid PID: {pid}")
        self.pid_file = None

        if self.sim_process is not None:
            self.sim_process.terminate()
            print(f'killed launch PID: {self.sim_process.pid}')
        self.kill_all_processes()
        self.running_scene = None

    def run_script(self, name):
        """Run a helper script of the running scene with ``bash``.

        The script is started in the background and not waited for.

        Raises:
            RuntimeError: if no scene is running.
            ValueError: if the running scene has no script called ``name``.
        """
        # Compare against None: a scene id of 0 (or '') is falsy but valid.
        if self.running_scene is None:
            raise RuntimeError('no scene running')
        script_path = self.scene_id_dict[self.running_scene].scripts.get(
            name, None)
        if not script_path:
            raise ValueError(f'script {name} not found')
        subprocess.Popen(['bash', script_path])

    def kill_all_processes(self):
        """SIGKILL every process whose ``ps aux`` line mentions ros/rviz/gazebo.

        Lines containing ``grep`` or ``gazebo_world_manager`` are skipped, and
        so is the current process itself.

        NOTE(review): this is a plain substring match on the whole ``ps``
        line, so unrelated processes whose user or command line merely
        contains "ros" are killed too.
        """
        try:
            # Get all processes. Decode leniently: command lines may contain
            # bytes that are not valid UTF-8.
            output = subprocess.check_output(
                ["ps", "aux"]).decode(errors="replace")
        except subprocess.CalledProcessError as e:
            print(f"Error: {e}")
            return

        own_pid = str(os.getpid())
        # Filter out the target processes.
        target_pids = []
        for line in output.splitlines():
            # Exclude grep and anything containing "gazebo_world_manager".
            if "grep" in line or "gazebo_world_manager" in line:
                continue
            if not any(x in line for x in ("ros", "rviz", "gazebo")):
                continue
            fields = line.split()
            # The PID is the second column of `ps aux`.
            if len(fields) < 2 or not fields[1].isdigit():
                continue
            if fields[1] == own_pid:
                # Never kill ourselves just because our command line matches.
                continue
            target_pids.append(fields[1])

        if target_pids:
            # Kill them all in one call.
            subprocess.run(["kill", "-9"] + target_pids)
            print(f"Killed PIDs: {', '.join(target_pids)}")
        else:
            print("No matching processes running.")
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual smoke test: list the scenes, run 'grasp_box' for 40 seconds,
    # then shut it down again.
    mgr = SceneManager()  # the constructor already loads the scenes
    print(mgr.get_scenes())
    print(mgr.get_scripts('grasp_box'))
    # start_scene() looks scenes up by id, so resolve the id from the name.
    mgr.start_scene(mgr.scene_name_dict['grasp_box'].id)
    try:
        time.sleep(40)
    finally:
        # Always stop the simulation, even if the wait is interrupted.
        mgr.shut_scene()
|