# gazebo_world_manager/scene_mgr/manager.py
# 2025-08-11 20:03:19 +08:00
#
# 128 lines
# 4.4 KiB
# Python

import dataclasses
import importlib
import json
import os
import signal
import subprocess
import threading
import time

from .scene_types import PreloadModel, Scene
class SceneManager:
    """Discover simulation scenes on disk and start/stop them as subprocesses.

    Every sub-directory of the ``scene`` directory (a sibling of this
    package's directory) that contains a ``launch.sh`` is registered as a
    scene. At most one scene is tracked as running at a time.

    Attributes:
        scene_name_dict: Maps scene directory name -> ``Scene``.
        scene_id_dict: Maps ``scene.id`` -> ``Scene``.
        running_scene: Id of the running scene, or ``None`` when idle.
        sim_process: ``subprocess.Popen`` of the launch script, or ``None``.
        pid_file: Path handed to the launch script as its first argument;
            ``shut_scene`` reads PIDs to terminate from it. ``None`` when idle.
    """

    # Seconds to wait for the launch script to exit after SIGTERM before
    # escalating to SIGKILL.
    _TERMINATE_TIMEOUT = 5

    def __init__(self):
        """Initialise an idle manager and load every scene found on disk.

        Raises:
            RuntimeError: If the scene directory does not exist.
        """
        self.scene_name_dict = {}
        self.scene_id_dict = {}
        self.running_scene = None
        self.sim_process = None
        self.sim_thread = None
        self.scene_plugin = None
        # Defined up front so shut_scene() is safe before any start_scene().
        self.pid_file = None
        self.load_scene()

    @staticmethod
    def _scene_root():
        """Return the directory that holds one sub-directory per scene."""
        return os.path.join(os.path.dirname(__file__), '..', 'scene')

    def load_scene(self):
        """Load scenes from the scene directory.

        For each scene directory this records the ``launch.sh`` path, every
        ``*.sh`` file under ``scripts/`` (keyed by file name without the
        extension) and the ``preload_models`` entries of ``config.json``.
        Directories without ``launch.sh`` are skipped; a missing ``scripts/``
        directory or ``config.json`` simply contributes nothing.

        Raises:
            RuntimeError: If the scene directory does not exist.
        """
        scene_dir = self._scene_root()
        if not os.path.exists(scene_dir):
            raise RuntimeError('no scene exist')

        names = [entry for entry in os.listdir(scene_dir)
                 if os.path.isdir(os.path.join(scene_dir, entry))]
        for name in names:
            scene_path = os.path.join(scene_dir, name)
            scene = Scene(name)

            # Launch file: a directory without one is not a usable scene.
            scene.launch_file = os.path.join(scene_path, 'launch.sh')
            scene.scripts = {}
            if not os.path.exists(scene.launch_file):
                continue

            # Helper scripts are optional.
            scripts_dir = os.path.join(scene_path, 'scripts')
            if os.path.isdir(scripts_dir):
                for script_name in os.listdir(scripts_dir):
                    if script_name.endswith('.sh'):
                        key = os.path.splitext(script_name)[0]
                        scene.scripts[key] = os.path.join(
                            scripts_dir, script_name)

            # Config file: lists the models that need to be preloaded.
            config_file = os.path.join(scene_path, 'config.json')
            if os.path.isfile(config_file):
                with open(config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                for model_data in config.get('preload_models', []):
                    scene.preload_models.append(
                        PreloadModel.from_dict(model_data))

            self.scene_name_dict[name] = scene
            self.scene_id_dict[scene.id] = scene

    def get_scene(self):
        """Return a list of all loaded ``Scene`` objects."""
        return list(self.scene_name_dict.values())

    def get_scripts(self, name):
        """Return the script names registered for the scene called *name*.

        Raises:
            KeyError: If no scene with that name is loaded.
        """
        return list(self.scene_name_dict[name].scripts.keys())

    def start_scene(self, id):
        """Launch the scene with the given id via ``bash launch.sh <pid_file>``.

        Any stale PID file is removed first. The PID file lives in the scene's
        own directory (the same tree ``load_scene`` reads from).

        Raises:
            KeyError: If *id* is not a known scene id.
            RuntimeError: If a scene is already running.
            ValueError: If the scene's name is not registered.
        """
        name = self.scene_id_dict[id].name
        # Compare against None so a falsy id (0, '') still counts as running.
        if self.running_scene is not None:
            raise RuntimeError(
                f'scene {self.running_scene} is already running')
        if name not in self.scene_name_dict:
            raise ValueError(f'scene {name} not found')

        self.pid_file = os.path.join(self._scene_root(), name, 'pids.txt')
        if os.path.exists(self.pid_file):
            os.remove(self.pid_file)

        # NOTE(review): stdout/stderr are piped but nothing in this class
        # reads them; a chatty child can block once the OS pipe buffer fills.
        # Confirm whether another component drains sim_process.stdout/stderr.
        self.sim_process = subprocess.Popen(
            ['bash', self.scene_name_dict[name].launch_file, self.pid_file],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        self.running_scene = id

    def shut_scene(self):
        """Stop the running scene; a no-op when nothing is running.

        Sends SIGTERM to every PID listed in the PID file, removes the file,
        terminates and reaps the launch process, then sends SIGTERM (via
        ``pkill -f``) to any process whose command line matches ``gzserver``,
        ``gzclient`` or ``rviz``.
        """
        if self.running_scene is None and self.sim_process is None:
            return

        pid_file = self.pid_file
        if pid_file and os.path.exists(pid_file):
            with open(pid_file, 'r') as f:
                pids = [line.strip() for line in f if line.strip()]
            for pid in pids:
                try:
                    os.kill(int(pid), signal.SIGTERM)
                except ProcessLookupError:
                    print(f"process PID: {pid} already gone")
                except ValueError:
                    print(f"invalid PID: {pid}")
                except PermissionError:
                    print(f"no permission to kill PID: {pid}")
                else:
                    print(f"killed PID: {pid}")
            os.remove(pid_file)
        elif pid_file:
            # The launch script may not have written the file (yet).
            print(f"pid file not found: {pid_file}")
        self.pid_file = None

        proc = self.sim_process
        # Reset state before the remaining best-effort cleanup so a failure
        # below cannot leave the manager stuck in the "running" state.
        self.sim_process = None
        self.running_scene = None

        if proc is not None:
            proc.terminate()
            print(f'killed launch PID: {proc.pid}')
            # Reap the child so it does not linger as a zombie.
            try:
                proc.wait(timeout=self._TERMINATE_TIMEOUT)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
            # Release the pipe file descriptors opened by start_scene().
            for stream in (proc.stdout, proc.stderr):
                if stream is not None:
                    stream.close()

        # Kill gazebo and rviz. pkill exits non-zero when nothing matched,
        # which is fine here, so the return code is ignored.
        for pattern in ('gzserver', 'gzclient', 'rviz'):
            subprocess.run(['pkill', '-15', '-f', pattern], check=False)

    def shut_scene_truly(self):
        """Shut the scene down, relaunch it briefly, then shut it down again.

        Sequence: stop, wait 2 s, start the same scene, wait 15 s, stop.
        Presumably the second cycle clears processes that survive the first
        shutdown -- TODO confirm the intent with the author. Does nothing
        when no scene is running.
        """
        scene_id = self.running_scene
        if scene_id is None:
            return
        self.shut_scene()
        time.sleep(2)
        self.start_scene(scene_id)
        time.sleep(15)
        self.shut_scene()

    def run_script(self, name):
        """Run the helper script *name* of the running scene with ``bash``.

        The script is started in the background and not waited for.

        Raises:
            RuntimeError: If no scene is running.
            ValueError: If the running scene has no script called *name*.
        """
        if self.running_scene is None:
            raise RuntimeError('no scene running')
        script_path = self.scene_id_dict[self.running_scene].scripts.get(name)
        if not script_path:
            raise ValueError(f'script {name} not found')
        subprocess.Popen(['bash', script_path])
if __name__ == '__main__':
    # Manual smoke test: list the scenes, launch one, let it run for a
    # while, then shut it down again.
    import time

    demo_scene = 'grasp_box'
    run_seconds = 40

    manager = SceneManager()
    manager.load_scene()
    print(manager.get_scene())
    print(manager.get_scripts(demo_scene))

    manager.start_scene(demo_scene)
    time.sleep(run_seconds)
    manager.shut_scene()