# Source listing metadata: 112 lines, 3.6 KiB, Python.
import dataclasses
import importlib
import os
import signal
import subprocess
import threading
import typing

import scene_plugin


@dataclasses.dataclass
class Scene:
    """Record describing one scene found on disk.

    Attributes:
        name: Scene name.
        launch_file: Path of the scene's launch script, or ``None`` when it
            has not been set.
        scripts: Maps a script name to the path of that script. Defaults to
            a fresh empty dict per instance, so code that lists a scene's
            scripts never has to special-case ``None``.
    """

    name: str
    launch_file: typing.Optional[str] = None
    # default_factory gives every instance its own dict; a shared mutable
    # default would leak scripts from one scene into another.
    scripts: typing.Dict[str, str] = dataclasses.field(default_factory=dict)


class SceneManager:
    """Discover scenes on disk and run at most one of them at a time.

    Scenes are the sub-directories of the ``scene`` folder next to this file.
    A sub-directory is registered only when it contains ``launch.sh``; every
    ``*.sh`` file in its ``scripts`` sub-directory becomes a named script.

    Attributes:
        scene_dict: Maps a scene name to its ``Scene`` record.
        running_scene: Name of the scene currently running, or ``None``.
        sim_process: ``subprocess.Popen`` handle of the running launcher, or
            ``None``.
        sim_thread: Initialised to ``None``; never assigned by this class.
        scene_plugin: ``Plugin`` instance of the running scene, or ``None``.
        pid_file: Path passed to ``launch.sh`` as its only argument and read
            back on shutdown, or ``None`` when no scene is running.
    """

    def __init__(self):
        """Create an idle manager and scan the scene folder.

        Raises:
            RuntimeError: If the scene folder does not exist.
        """
        self.scene_dict = {}
        self.running_scene = None
        self.sim_process = None
        self.sim_thread = None
        self.scene_plugin = None
        # Defined up front so shut_scene() cannot hit a missing attribute
        # when it is called before start_scene().
        self.pid_file = None
        self.load_scene()

    @staticmethod
    def _scene_root():
        """Return the path of the ``scene`` folder next to this file."""
        return os.path.join(os.path.dirname(__file__), 'scene')

    def load_scene(self):
        """Load scenes from the scene folder into ``scene_dict``.

        Sub-directories without ``launch.sh`` are skipped. A scene without a
        ``scripts`` sub-directory is registered with no scripts.

        Raises:
            RuntimeError: If the scene folder does not exist.
        """
        scene_dir = self._scene_root()
        if not os.path.exists(scene_dir):
            raise RuntimeError('no scene exist')

        for name in os.listdir(scene_dir):
            scene_path = os.path.join(scene_dir, name)
            if not os.path.isdir(scene_path):
                continue
            launch_file = os.path.join(scene_path, 'launch.sh')
            if not os.path.exists(launch_file):
                continue

            scripts = {}
            scripts_dir = os.path.join(scene_path, 'scripts')
            # The scripts folder is optional; listing a missing directory
            # would abort the whole scan.
            if os.path.isdir(scripts_dir):
                for script_name in os.listdir(scripts_dir):
                    if script_name.endswith('.sh'):
                        script_key = os.path.splitext(script_name)[0]
                        scripts[script_key] = os.path.join(
                            scripts_dir, script_name)

            self.scene_dict[name] = Scene(
                name, launch_file=launch_file, scripts=scripts)

    def get_scene(self) -> list:
        """Return the names of all registered scenes."""
        return list(self.scene_dict.keys())

    def get_scripts(self, name: str) -> list:
        """Return the script names registered for scene ``name``.

        Raises:
            KeyError: If ``name`` is not a registered scene.
        """
        return list(self.scene_dict[name].scripts.keys())

    def start_scene(self, name: str) -> None:
        """Launch scene ``name`` and load its plugin.

        Runs ``bash launch.sh <pid_file>`` and then instantiates ``Plugin``
        from the module ``scene.<name>.plugin``. If loading the plugin fails,
        the launcher that was just started is torn down again before the
        error is re-raised, so the manager is left idle.

        Raises:
            RuntimeError: If another scene is already running.
            ValueError: If ``name`` is not a registered scene.
        """
        if self.running_scene:
            raise RuntimeError(
                f'scene {self.running_scene} is already running')
        if name not in self.scene_dict:
            raise ValueError(f'scene {name} not found')

        self.pid_file = os.path.join(self._scene_root(), name, 'pids.txt')
        # Drop a stale file so PIDs from an earlier run are never signalled.
        if os.path.exists(self.pid_file):
            os.remove(self.pid_file)

        # NOTE(review): nothing in this class reads these pipes. A launcher
        # that writes more than the OS pipe buffer holds will block; drain
        # sim_process.stdout/stderr externally or redirect the output.
        self.sim_process = subprocess.Popen(
            ['bash', self.scene_dict[name].launch_file, self.pid_file],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            plugin_module = importlib.import_module(f'scene.{name}.plugin')
            self.scene_plugin = plugin_module.Plugin(name)
        except Exception:
            # Without this the launcher would keep running with no record
            # of it, and a later start_scene() would spawn a second one.
            self._teardown()
            raise
        self.running_scene = name

    def shut_scene(self) -> None:
        """Stop the running scene and return the manager to idle.

        Sends ``SIGTERM`` to every PID listed in the PID file, then kills and
        reaps the launcher process.

        Raises:
            RuntimeError: If no scene has been started.
        """
        if self.sim_process is None:
            raise RuntimeError('no scene running')
        self._teardown()

    def _read_pids(self) -> list:
        """Return the non-empty lines of the PID file, or ``[]`` if absent."""
        if not self.pid_file:
            return []
        try:
            with open(self.pid_file, 'r') as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            # The file may never have been created; the launcher must still
            # be stopped.
            return []

    def _teardown(self) -> None:
        """Signal recorded PIDs, stop the launcher and reset running state."""
        try:
            for pid in self._read_pids():
                try:
                    pid_number = int(pid)
                    # os.kill treats 0 and negative values as process
                    # groups; never forward those from a file.
                    if pid_number <= 0:
                        raise ValueError(pid)
                    os.kill(pid_number, signal.SIGTERM)
                    print(f"killed PID: {pid}")
                except ProcessLookupError:
                    print(f"process PID: {pid} already gone")
                except PermissionError:
                    print(f"no permission to kill PID: {pid}")
                except ValueError:
                    print(f"invalid PID: {pid}")

            launcher = self.sim_process
            if launcher is not None:
                # poll() refreshes returncode; the bare attribute stays None
                # until something polls or waits.
                print(launcher.poll())
                launcher.kill()
                # Reap the child so it does not linger as a zombie.
                launcher.wait()
                for pipe in (launcher.stdout, launcher.stderr):
                    if pipe is not None:
                        pipe.close()
        finally:
            # Always return to idle, even if teardown raised part-way.
            self.pid_file = None
            self.sim_process = None
            self.running_scene = None
            self.scene_plugin = None

    def run_script(self, name: str) -> None:
        """Run script ``name`` of the running scene with ``bash``.

        The script is started in the background and is not waited for.

        Raises:
            RuntimeError: If no scene is running.
            ValueError: If the running scene has no script called ``name``.
        """
        if not self.running_scene:
            raise RuntimeError('no scene running')
        script_path = self.scene_dict[self.running_scene].scripts.get(name)
        if not script_path:
            raise ValueError(f'script {name} not found')
        subprocess.Popen(['bash', script_path])


if __name__ == '__main__':
    import time

    # Manual smoke test: list what was discovered, run the 'grasp_box' scene
    # for 40 seconds, then shut it down.
    mgr = SceneManager()  # the constructor already scans the scene folder
    print(mgr.get_scene())
    print(mgr.get_scripts('grasp_box'))
    mgr.start_scene('grasp_box')
    try:
        time.sleep(40)
    finally:
        # Runs on Ctrl-C as well, so the simulation processes are not left
        # behind when the wait is interrupted.
        mgr.shut_scene()