gazebo_world_manager/scene_mgr.py
2025-08-10 11:58:37 +08:00

94 lines
3.0 KiB
Python

import dataclasses
import importlib
import os
import subprocess
import threading
import typing

import scene_plugin
@dataclasses.dataclass
class Scene:
    """Record describing one scene.

    Attributes:
        name: Scene identifier.
        launch_file: Path of the scene's launch script, or ``None`` when it
            has not been set.
        scripts: Maps a script name to the path of that script. Defaults to a
            fresh empty dict per instance, so a default-constructed scene can
            be queried for scripts without a ``None`` check.
    """

    name: str
    launch_file: typing.Optional[str] = None
    # default_factory gives every instance its own dict (a shared literal
    # default would be rejected by dataclasses, and None breaks ``.keys()``).
    scripts: dict = dataclasses.field(default_factory=dict)
class SceneManager:
    """Discover, launch and stop scenes stored in the ``scene`` folder.

    Attributes:
        scene_dict: Maps a scene name to its ``Scene`` record; filled by
            ``load_scene``.
        running_scene: Name of the scene started by ``start_scene``, or
            ``None`` when no scene is running.
        sim_process: ``subprocess.Popen`` handle of the running launch
            script, or ``None``.
        sim_thread: Initialised to ``None``; not assigned by any method of
            this class.
        scene_plugin: ``Plugin`` instance created for the running scene, or
            ``None``.
    """

    # Seconds to wait for the launch script to exit after terminate() before
    # escalating to kill().
    _SHUTDOWN_TIMEOUT = 5

    def __init__(self):
        self.scene_dict = {}
        self.running_scene = None
        self.sim_process = None
        self.sim_thread = None
        self.scene_plugin = None

    def load_scene(self):
        """Load scenes from the ``scene`` folder next to this file.

        Every sub-directory containing a ``launch.sh`` is registered under
        its directory name. ``*.sh`` files found in the sub-directory's
        optional ``scripts`` folder are registered under their file name
        without the extension.

        Raises:
            RuntimeError: If the ``scene`` folder does not exist.
        """
        scene_dir = os.path.join(os.path.dirname(__file__), 'scene')
        if not os.path.exists(scene_dir):
            raise RuntimeError('no scene exist')
        for name in os.listdir(scene_dir):
            scene_path = os.path.join(scene_dir, name)
            if not os.path.isdir(scene_path):
                continue
            launch_file = os.path.join(scene_path, 'launch.sh')
            # A directory without a launch script is not a scene.
            if not os.path.exists(launch_file):
                continue
            scripts = {}
            scripts_dir = os.path.join(scene_path, 'scripts')
            # A scene without a scripts folder is valid: it simply offers no
            # scripts, instead of aborting the whole scan.
            if os.path.isdir(scripts_dir):
                for script_name in os.listdir(scripts_dir):
                    if script_name.endswith('.sh'):
                        script_key = os.path.splitext(script_name)[0]
                        scripts[script_key] = os.path.join(
                            scripts_dir, script_name)
            self.scene_dict[name] = Scene(
                name=name, launch_file=launch_file, scripts=scripts)

    def get_scene(self):
        """Return the names of all loaded scenes as a list."""
        return list(self.scene_dict)

    def get_scripts(self, name):
        """Return the script names registered for scene ``name``.

        Raises:
            KeyError: If ``name`` is not a loaded scene.
        """
        return list(self.scene_dict[name].scripts)

    def start_scene(self, name):
        """Create the scene's plugin and run its launch script with bash.

        The module ``scene.<name>.plugin`` is imported and its ``Plugin``
        class is instantiated with ``name`` before the launch script starts.
        The manager's state is updated only once the process was spawned, so
        a failed launch does not leave the manager marked as running.

        Raises:
            RuntimeError: If a scene is already running.
            ValueError: If ``name`` is not a loaded scene.
        """
        if self.running_scene:
            raise RuntimeError(
                f'scene {self.running_scene} is already running')
        if name not in self.scene_dict:
            raise ValueError(f'scene {name} not found')
        scene_plugin_module = importlib.import_module(f'scene.{name}.plugin')
        plugin = scene_plugin_module.Plugin(name)
        # NOTE(review): stdout/stderr are piped but nothing in this class
        # reads them; a child that writes a lot can block once the OS pipe
        # buffer is full. Confirm a reader exists elsewhere, or redirect.
        process = subprocess.Popen(
            ['bash', self.scene_dict[name].launch_file],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        self.scene_plugin = plugin
        self.sim_process = process
        self.running_scene = name

    def shut_scene(self):
        """Stop the running scene and reset the manager's state.

        Does nothing when no launch process is recorded. Otherwise the launch
        process is sent ``terminate()``, waited for, and killed if it has not
        exited within ``_SHUTDOWN_TIMEOUT`` seconds.
        """
        process = self.sim_process
        # Reset first so the manager is usable again even if stopping fails.
        self.sim_process = None
        self.running_scene = None
        self.scene_plugin = None
        if process is None:
            return
        # TODO: the simulation's family of processes is not terminated
        # correctly; only the bash launch process itself is signalled.
        process.terminate()
        try:
            # Reap the child so it does not linger as a zombie.
            process.wait(timeout=self._SHUTDOWN_TIMEOUT)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    def run_script(self, name):
        """Run script ``name`` of the running scene with bash.

        The script is started in the background; this method does not wait
        for it to finish.

        Raises:
            RuntimeError: If no scene is running.
            ValueError: If the running scene has no script called ``name``.
        """
        if not self.running_scene:
            raise RuntimeError('no scene running')
        script_path = self.scene_dict[self.running_scene].scripts.get(name)
        if not script_path:
            raise ValueError(f'script {name} not found')
        subprocess.Popen(['bash', script_path])
if __name__ == '__main__':
    # Manual smoke test: list the scenes, start 'grasp-box', run one of its
    # scripts, then shut the scene down again.
    import time

    mgr = SceneManager()
    mgr.load_scene()
    print(mgr.get_scene())
    print(mgr.get_scripts('grasp-box'))
    mgr.start_scene('grasp-box')
    try:
        # Presumably gives the simulation time to come up before the script
        # is run -- the 20 s figure is not derived from anything in this file.
        time.sleep(20)
        mgr.run_script('grasp_box_example')
        time.sleep(60)
    finally:
        # Always stop the scene, even when the script fails to start or the
        # wait is interrupted, so the launch process is not left running.
        mgr.shut_scene()