diff --git a/gazebo_ctrl.py b/gazebo_ctrl.py index a7ea00b..cb8ecce 100644 --- a/gazebo_ctrl.py +++ b/gazebo_ctrl.py @@ -157,9 +157,6 @@ class GazeboROSController: m_state = states[name] info = self.format_model_state(m_state) info['name'] = name - # 获取箱子质量 - if name == 'block': - info['mass'] = self.get_block_mass() model_info.append(info) return model_info diff --git a/http_server.py b/http_server.py index 6d5ca39..28e5e57 100644 --- a/http_server.py +++ b/http_server.py @@ -10,6 +10,7 @@ import werkzeug.serving from flask import jsonify, request import gazebo_ctrl +import scene_mgr @dataclasses.dataclass @@ -80,6 +81,8 @@ class GazeboSimHttpServer(HttpRPCServer): def __init__(self): super().__init__() + self.scene_mgr = scene_mgr.SceneManager() + self.gazebo_ctrl = gazebo_ctrl.GazeboROSController() # Gazebo仿真/程序控制 self.add_route('/simulation/scene', self.get_simulation_scene, ['GET']) @@ -109,57 +112,41 @@ class GazeboSimHttpServer(HttpRPCServer): # ===== 处理函数实现 ===== def get_simulation_scene(self): - scene_dir = os.path.join(os.path.dirname(__file__), 'scene') - if not os.path.exists(scene_dir): - return jsonify([]) - scenes = [name for name in os.listdir( - scene_dir) if os.path.isdir(os.path.join(scene_dir, name))] + try: + scenes = self.scene_mgr.get_scene() + except Exception as e: + return str(e), 400 return jsonify(scenes) def start_simulation(self): scene = request.args.get('scene', 'grasp-box') - self.scene_name = scene - # 找到场景文件夹下的launch文件 - launch_file = os.path.join('scene', scene, 'launch.sh') - if not os.path.exists(launch_file): - return jsonify({'error': 'Launch file not found'}), 404 - # 加载场景插件 - plugin_py = os.path.join('scene', scene, 'plugin.py') - if not os.path.exists(plugin_py): - return jsonify({'error': 'Plugin file not found'}), 404 - self.scene_plugin_class = importlib.import_module(f'scene.{scene}.plugin') - self.scene_plugin = self.scene_plugin_class.Plugin(scene) - # 执行launch文件 - def run_sim(): - self.sim_process = subprocess.Popen(['bash', launch_file]) - - if hasattr(self, 'sim_thread') and self.sim_thread.is_alive(): - return jsonify({'error': 'Simulation already running'}), 400 - - self.sim_thread = threading.Thread(target=run_sim, daemon=True) - self.sim_thread.start() - self.gazebo_controller = gazebo_ctrl.GazeboROSController() + try: + self.scene_mgr.start_scene(scene) + except Exception as e: + return str(e), 400 return 'OK', 200 def shutdown_simulation(self): - if hasattr(self, 'sim_process') and self.sim_process: - self.sim_process.terminate() - self.sim_process = None - self.sim_thread.join(timeout=1) - self.sim_thread = None - self.scene_name = None - self.scene_plugin = None - self.scene_plugin_class = None - return jsonify({'status': 'stopped'}) + try: + self.scene_mgr.shut_scene() + except Exception as e: + return str(e), 400 + return 'OK', 200 def run_script(self): name = request.args.get('name') - if not name: - return jsonify({'error': 'Missing name'}), 400 - return jsonify({'status': 'script executed', 'name': name}) + try: + self.scene_mgr.run_script(name) + except Exception as e: + return str(e), 400 + return 'OK', 200 def continue_physics(self): - return jsonify({'status': 'physics continued'}) + try: + self.gazebo_ctr??? + except Exception as e: + return str(e), 400 + return 'OK', 200 def pause_physics(self): return jsonify({'status': 'physics paused'}) @@ -193,17 +180,17 @@ class GazeboSimHttpServer(HttpRPCServer): def get_scene_model_state(self, scene): if scene != self.scene_name: return jsonify({'error': f'running {self.scene_name} but try to access {scene}'}), 400 - return self.scene_plugin.get_scene_model_state() + return self.scene_mgr.scene_plugin.get_scene_model_state() def set_scene_model_state(self, scene): if scene != self.scene_name: return jsonify({'error': f'running {self.scene_name} but try to access {scene}'}), 400 - return self.scene_plugin.post_scene_model_state() + return self.scene_mgr.scene_plugin.post_scene_model_state() def spawn_scene_model(self, scene): if scene != self.scene_name: return jsonify({'error': f'running {self.scene_name} but try to access {scene}'}), 400 - return self.scene_plugin.post_scene_model_spawn() + return self.scene_mgr.scene_plugin.post_scene_model_spawn() if __name__ == '__main__': server = GazeboSimHttpServer() diff --git a/main.py b/main.py index d4e2e28..9f8c785 100755 --- a/main.py +++ b/main.py @@ -1,63 +1,5 @@ -import math -import os -import subprocess -import tempfile - -import rospy -from gazebo_msgs.srv import SpawnModel -from geometry_msgs.msg import Point, Pose, Quaternion -from tf.transformations import quaternion_from_euler - - -class GazeboWorldManager: - def __init__(self): - rospy.init_node('gazebo_world_manager') - self.spawn_service = rospy.ServiceProxy( - '/gazebo/spawn_urdf_model', SpawnModel) - - def spawn_apriltag_marker(self, model_name, tag_id, x=0, y=0, z=1): - """生成 AprilTag 标记模型""" - - xacro_file = "/home/leju/gazebo_world_manager/scene/grasp-box/models/fixed_tag.xacro" - - try: - # 添加包路径到 ROS_PACKAGE_PATH - ros_package_path = os.environ.get('ROS_PACKAGE_PATH', '') - custom_path = '/home/leju/gazebo_world_manager' - if custom_path not in ros_package_path.split(':'): - os.environ['ROS_PACKAGE_PATH'] = f"{custom_path}:{ros_package_path}" if ros_package_path else custom_path - - # 转换为 URDF - urdf_content = subprocess.check_output([ - 'rosrun', 'xacro', 'xacro', xacro_file, f'tag_id:={tag_id}' - ]).decode('utf-8') - - # 设置位姿 - pose = Pose( - position=Point(x=x, y=y, z=z), - orientation=Quaternion(*quaternion_from_euler(0, 0, 0)) - ) - - # 生成模型 - rospy.wait_for_service('/gazebo/spawn_urdf_model') - response = self.spawn_service( - model_name=model_name, - model_xml=urdf_content, - robot_namespace='', - initial_pose=pose, - reference_frame='world' - ) - - if response.success: - rospy.loginfo(f"Successfully spawned {model_name}") - else: - rospy.logerr( - f"Failed to spawn {model_name}: {response.status_message}") - except subprocess.CalledProcessError as e: - rospy.logerr(f"Failed to convert xacro to urdf: {e}") - +from http_server import GazeboSimHttpServer if __name__ == '__main__': - manager = GazeboWorldManager() - manager.spawn_apriltag_marker('mk', 0, 1, 1, 1.4) - # rospy.spin() + server = GazeboSimHttpServer() + server.start(hostname='0.0.0.0', port=12300) \ No newline at end of file diff --git a/scene/grasp-box/scripts/gen_models.py b/scene/grasp-box/gen_models.py similarity index 87% rename from scene/grasp-box/scripts/gen_models.py rename to scene/grasp-box/gen_models.py index fd5d923..389cb90 100644 --- a/scene/grasp-box/scripts/gen_models.py +++ b/scene/grasp-box/gen_models.py @@ -28,16 +28,17 @@ class GraspModelGenerator: initial_pose=pose, reference_frame='world' ) - if response.success: rospy.loginfo(f"Successfully spawned {model_name}") else: rospy.logerr( f"Failed to spawn {model_name}: {response.status_message}") + return False, f"Failed to spawn {model_name}: {response.status_message}" except subprocess.CalledProcessError as e: rospy.logerr(f"Failed to convert xacro to urdf: {e}") + return False, f"Failed to convert xacro to urdf: {e}" - def generate_table(self, name, x, y, height): + def generate_table(self, name, x=0, y=0, height=1): xacro_file = "/home/leju/gazebo_world_manager/scene/grasp-box/models/table.xacro" urdf_content = subprocess.check_output([ 'rosrun', 'xacro', 'xacro', xacro_file, @@ -47,7 +48,7 @@ class GraspModelGenerator: position=Point(x=x, y=y, z=0), orientation=Quaternion(*quaternion_from_euler(0, 0, 0)) ) - self._generate(name, urdf_content, pose) + return self._generate(name, urdf_content, pose) def generate_apriltag(self, name, tag_id=0, x=0, y=0, z=0, roll=0, pitch=0, yaw=0): """生成AprilTag标记模型""" @@ -60,7 +61,7 @@ class GraspModelGenerator: orientation=Quaternion( *quaternion_from_euler(roll, pitch, yaw)) ) - self._generate(name, urdf_content, pose) + return self._generate(name, urdf_content, pose) def generate_box_with_apriltag(self, name, l=0.25, w=0.2, h=0.2, mass=1, tag_id=0, x=0, y=0, z=0, roll=0, pitch=0, yaw=0): """生成带有AprilTag的盒子模型""" @@ -73,9 +74,9 @@ class GraspModelGenerator: orientation=Quaternion( *quaternion_from_euler(roll, pitch, yaw)) ) - self._generate(name, urdf_content, pose) + return self._generate(name, urdf_content, pose) if __name__ == '__main__': generator = GraspModelGenerator() - generator.generate_apriltag("tg2", 0, 2, 0, 1.1, 0, 1.57, 1.57) + generator.generate_apriltag("tg0", 0, 2, 0, 1.1, 0, 1.57, 0) diff --git a/scene/grasp-box/launch.sh b/scene/grasp-box/launch.sh index 84ccfa4..acae653 100755 --- a/scene/grasp-box/launch.sh +++ b/scene/grasp-box/launch.sh @@ -1,3 +1,63 @@ #!/bin/bash -# 启动仿真程序 \ No newline at end of file +# 检查 Python3 是否安装 +if ! command -v python3 &> /dev/null; then + echo -e "\033[31m错误:未找到 Python3,请确保已安装 Python3\033[0m" + exit 1 +fi + +# 检查 ROS 是否安装 +if ! command -v roslaunch &> /dev/null; then + echo -e "\033[31m错误:未找到 roslaunch,请确保已安装 ROS\033[0m" + exit 1 +fi + +# 执行 kuavo_tf2_web_republisher +echo -e "\033[32m启动 kuavo_tf2_web_republisher...\033[0m" +cd ~/kuavo_ros_application && source devel/setup.bash && roslaunch kuavo_tf2_web_republisher start_websocket_server.launch & +WEB_PID=$! +if [ $? -eq 0 ]; then + echo -e "\033[32mkuavo_tf2_web_republisher 启动成功 (PID: $WEB_PID)\033[0m" + echo "$WEB_PID" >> "$PID_FILE" +else + echo -e "\033[31mkuavo_tf2_web_republisher 启动失败\033[0m" + exit 1 +fi + +DELAY=5 +sleep $DELAY +# 执行 load_kuavo_gazebo_manipulate +echo -e "\033[32m启动 load_kuavo_gazebo_manipulate...\033[0m" +cd ~/kuavo-ros-opensource && source devel/setup.bash && roslaunch humanoid_controllers load_kuavo_gazebo_manipulate.launch joystick_type:=bt2pro & +GAZEBO_PID=$! +if [ $? -eq 0 ]; then + echo -e "\033[32mload_kuavo_gazebo_manipulate 启动成功 (PID: $GAZEBO_PID)\033[0m" + echo "$GAZEBO_PID" >> "$PID_FILE" +else + echo -e "\033[31mload_kuavo_gazebo_manipulate 启动失败\033[0m" + kill $WEB_PID 2>/dev/null + rm "$PID_FILE" 2>/dev/null + exit 1 +fi + +DELAY=10 +sleep $DELAY +# 执行 robot_strategies +echo -e "\033[32m启动 robot_strategies...\033[0m" +cd ~/kuavo-ros-opensource && source devel/setup.bash && roslaunch ar_control robot_strategies.launch & +STRATEGIES_PID=$! +if [ $? -eq 0 ]; then + echo -e "\033[32mrobot_strategies 启动成功 (PID: $STRATEGIES_PID)\033[0m" + echo "$STRATEGIES_PID" >> "$PID_FILE" +else + echo -e "\033[31mrobot_strategies 启动失败\033[0m" + kill $WEB_PID $GAZEBO_PID 2>/dev/null + rm "$PID_FILE" 2>/dev/null + exit 1 +fi + +# 清理后台进程 +echo -e "\033[32m清理后台 ROS 进程...\033[0m" +kill $WEB_PID $GAZEBO_PID $STRATEGIES_PID 2>/dev/null +rm "$PID_FILE" 2>/dev/null +echo -e "\033[32m已删除 PID 文件: $PID_FILE\033[0m" \ No newline at end of file diff --git a/scene/grasp-box/scripts/grasp_box_example.py b/scene/grasp-box/scripts/grasp_box_example.py new file mode 100644 index 0000000..2cd5092 --- /dev/null +++ b/scene/grasp-box/scripts/grasp_box_example.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +import time + +from kuavo_humanoid_sdk import (KuavoRobot, KuavoRobotState, KuavoRobotTools, + KuavoRobotVision, KuavoSDK) +from kuavo_humanoid_sdk.common.logger import SDKLogger +from kuavo_humanoid_sdk.interfaces.data_types import (AprilTagData, + PoseQuaternion) +from kuavo_humanoid_sdk.kuavo_strategy.grasp_box.grasp_box_strategy import ( + BoxInfo, KuavoGraspBox) + +""" + 安全须知: + 请注意 + 实物机器人当中因为搬箱子需要用到末端力控制, 请把 arm_transport_target_up 当中的sim_mode设置为False + 仿真机器人当中因为物理性质差异,所以不需要用到末端力控制 请把 arm_transport_target_up 当中的sim_mode设置为True + + 长春一汽绿色箱子质量为1.5kg, 请根据实际情况修改箱子质量 +""" +def main(real: bool = True): + start_time = time.time() # 记录开始时间 + # Initialize SDK + if not KuavoSDK().Init(): + print("Init KuavoSDK failed, exit!") + exit(1) + + SDKLogger.info("初始化机器人...") + + # 初始化机器人及相关组件 + robot = KuavoRobot() + robot_state = KuavoRobotState() + robot_tools = KuavoRobotTools() + robot_vision = KuavoRobotVision() + + # 初始化箱子抓取策略 + grasp_strategy = KuavoGraspBox(robot, robot_state, robot_tools, robot_vision) + + # 粘贴在箱子上的 AprilTag 信息: 需要 ID,尺寸和基于odom坐标系下的大致位姿 + box_tag = AprilTagData( + id=[1], # AprilTag ID + size=[0.1], # AprilTag 标签尺寸 + pose=[PoseQuaternion( + # TODO: 需要根据实际情况调整 + position=(0.0, -1.5, 0.8), # 基于odom坐标系下的大致位置, 查找时会对齐到这个方向 + orientation=(0.0, 0.0, 0.0, 1.0) # 四元数方向 + )] + ) + + # 粘贴在放置位置的 AprilTag 信息: 需要 ID,尺寸和基于odom坐标系下的大致位姿 + placement_tag = AprilTagData( + id=[0], # AprilTag ID + size=[0.1], # AprilTag 标签尺寸 + pose=[PoseQuaternion( + # TODO: 需要根据实际情况调整 + position=(0.0, 1.5, 1.5), # 基于odom坐标系下的大致位置, 查找时会对齐到这个方向 + orientation=(0.0, 0.0, 1.0, 0.0) # 四元数方向 - 旋转180度 + )] + ) + + # 创建箱子信息对象 + box_size = (0.3, 0.4, 0.22) # xyz( 宽, 长, 高) + box_mass = 1.5 + + time.sleep(1) + + # 执行完整抓取策略 + try: + SDKLogger.info("========== 开始执行箱子抓取策略 ==========") + + # !!! Important: 关闭 basePitch 限制, 否则无法进行大幅度的前后倾动作 会导致摔倒 + SDKLogger.info("⚠️ 重要提示: basePitch 限制需要关闭!!!") + SDKLogger.info("⚠️ 重要提示: basePitch 限制需要关闭!!!") + SDKLogger.info("⚠️ 重要提示: basePitch 限制需要关闭!!!") + grasp_strategy.robot.enable_base_pitch_limit(False) # 关闭 basePitch 限制 + # Retry up to 3 times to check pitch limit status + for i in range(3): + if not robot_state.pitch_limit_enabled(): + SDKLogger.info("✅ 已关闭 basePitch 限制") + break + else: + if i < 2: # Don't log on last attempt + SDKLogger.info(f"⚠️ 第{i+1}次检查 basePitch 限制状态...") + time.sleep(0.5) # Brief pause between retries + else: # Loop completed without break + SDKLogger.info("❌ 关闭 basePitch 限制失败") + return + # ----------------------------------------------------------- + + # 步骤1:使用头部寻找目标 + SDKLogger.info("1. 寻找目标箱子...") + grasp_strategy.robot.disable_head_tracking() + SDKLogger.info("✅ 已关闭头部追踪") + + find_success = grasp_strategy.head_find_target( + box_tag, + max_search_time=15.0, + search_pattern="rotate_body" # rotate_head + ) + + if not find_success: + SDKLogger.info("❌ 寻找目标失败,无法继续执行") + return + + SDKLogger.info("✅ 已找到目标箱子") + time.sleep(1) # 短暂暂停 + box_tag_data = robot_vision.get_data_by_id_from_odom(box_tag.id[0]) # 拿到箱子tag数据 odom系 + if not box_tag_data: + SDKLogger.info(f"❌ 未识别到 AprilTag ID 为{box_tag.id[0]} 的目标箱子") + return + + SDKLogger.info(f"box_tag_data: {box_tag_data}") + + box_info = BoxInfo.from_apriltag( + box_tag_data, + xyz_offset=(box_size[0]/2, 0.0, -0.00), # tag 粘贴在箱子正面,为了得到箱子中心点,因此偏移量为箱子宽度的一半 + size=box_size, # 箱子尺寸,单位:米 + mass=box_mass # 箱子重量,单位:千克 + ) + + SDKLogger.info(f"box_info: {box_info}") + + # 步骤2:走路接近目标 + # for i in range(1): + SDKLogger.info("2. 走路接近目标...") + approach_success = grasp_strategy.walk_approach_target( + box_tag.id[0], + target_distance=0.4, # 与目标箱子保持0.7米的距离 + approach_speed=0.2 # 接近速度0.2米/秒 + ) + + if not approach_success: + SDKLogger.info("❌ 接近目标失败,无法继续执行") + return + + SDKLogger.info("✅ 已成功接近目标") + time.sleep(1) # 短暂暂停 + + # 步骤3:手臂移动到抓取位置 + SDKLogger.info("3. 手臂移动到抓取位置...") + move_success = grasp_strategy.arm_move_to_target( + box_info, + arm_mode="manipulation_mpc" + ) + + if not move_success: + SDKLogger.info("❌ 手臂移动失败,无法继续执行") + return + + SDKLogger.info("✅ 手臂已到达抓取位置") + # grasp_strategy.robot.arm_reset() + time.sleep(1.0) # 短暂暂停 + + + # 步骤4:提起箱子 + SDKLogger.info("4. 提起箱子...") + transport_up_success = grasp_strategy.arm_transport_target_up( + box_info, + arm_mode="manipulation_mpc", + sim_mode=not real + ) + + if not transport_up_success: + SDKLogger.info("❌ 提起箱子失败") + return + time.sleep(1.0) # 短暂暂停 + + SDKLogger.info("✅ 成功提起箱子") + # grasp_strategy.robot.arm_reset() + time.sleep(1.0) # 展示一下成功提起的状态 + + # return # FIXME:测试提起箱子成功 + + # 步骤5:关闭头部追踪 + SDKLogger.info("5. 关闭头部追踪...") + grasp_strategy.robot.disable_head_tracking() + SDKLogger.info("✅ 已关闭头部追踪") + time.sleep(1.0) # 短暂暂停 + if not grasp_strategy.head_find_target( + placement_tag, + max_search_time=15.0, + search_pattern="rotate_body" # rotate_head + ): + SDKLogger.info("❌ 寻找目标失败,无法继续执行") + return + placement_tag_data = robot_vision.get_data_by_id_from_odom(placement_tag.id[0]) + if placement_tag_data is None: + SDKLogger.info(f"❌ 未识别到 AprilTag ID 为{placement_tag.id[0]} 的目标箱子") + return + + # 步骤6:移动到放置位置 + SDKLogger.info("6. 移动到放置位置...") + move_success = grasp_strategy.walk_approach_target( + placement_tag.id[0], + target_distance=0.4, + approach_speed=0.2, + ) + if not move_success: + SDKLogger.info("❌ 移动到放置位置失败") + return + time.sleep(1.0) # 短暂暂停 + + # 步骤7:放下箱子 + placement_box_info = BoxInfo.from_apriltag( + placement_tag_data, + xyz_offset=(box_size[0]/2, 0.0, -0.5), # tag 粘贴在货架上,需要把箱子放下距离货架的高度 -0.5m + size=box_size, # 箱子尺寸(长、宽、高),单位:米 + mass=box_mass # 箱子重量,单位:千克 + ) + SDKLogger.info("7. 放下箱子...") + transport_down_success = grasp_strategy.arm_transport_target_down( + placement_box_info, + arm_mode="manipulation_mpc" + ) + + if not transport_down_success: + SDKLogger.info("❌ 放下箱子失败") + return + + SDKLogger.info("✅ 成功放下箱子") + time.sleep(1.0) # 短暂暂停 + + # 步骤8: 回到初始位置 + SDKLogger.info("8. 回到初始位置...") + grasp_strategy.robot.control_command_pose_world(0, 0, 0, 0) + time.sleep(10) # 等待10s转身完成 + total_time = time.time() - start_time # 计算总时间 + SDKLogger.info(f"========== 搬箱子任务完成,总耗时: {total_time:.2f}秒 ==========") + except Exception as e: + SDKLogger.info(f"执行过程中发生错误: {e}") + finally: + # 确保机器人回到安全状态 + SDKLogger.info("将机器人恢复到安全姿态...") + # 这里可以添加使机器人回到默认姿态的代码 + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('--sim', action='store_true', default=False) + args, unknown = parser.parse_known_args() + main(not args.sim) \ No newline at end of file diff --git a/scene/grasp-box/scripts/grasp_box_example.sh b/scene/grasp-box/scripts/grasp_box_example.sh new file mode 100755 index 0000000..a2b3162 --- /dev/null +++ b/scene/grasp-box/scripts/grasp_box_example.sh @@ -0,0 +1,3 @@ +source ~/kuavo-ros-opensource/devel/setup.bash +cd ~/gazebo_world_manager/scene/grasp-box/scripts +python3 grasp_box_example.py --sim \ No newline at end of file diff --git a/scene_mgr.py b/scene_mgr.py new file mode 100644 index 0000000..f990795 --- /dev/null +++ b/scene_mgr.py @@ -0,0 +1,93 @@ +import dataclasses +import importlib +import os +import subprocess +import threading + +import scene_plugin + + +@dataclasses.dataclass +class Scene: + name: str + launch_file: str = None + scripts: dict = None + + +class SceneManager: + def __init__(self): + self.scene_dict = {} + self.running_scene = None + self.sim_process = None + self.sim_thread = None + self.scene_plugin = None + + def load_scene(self): + """ + 从场景文件夹加载场景。 + """ + scene_dir = os.path.join(os.path.dirname(__file__), 'scene') + if not os.path.exists(scene_dir): + raise RuntimeError('no scene exist') + names = [name for name in os.listdir( + scene_dir) if os.path.isdir(os.path.join(scene_dir, name))] + for name in names: + scene = Scene(name) + scene.launch_file = os.path.join(scene_dir, name, 'launch.sh') + scene.scripts = {} + if not os.path.exists(scene.launch_file): + continue + for script_name in os.listdir(os.path.join(scene_dir, name, 'scripts')): + if script_name.endswith('.sh'): + script_name_only = os.path.splitext(script_name)[0] + scene.scripts[script_name_only] = os.path.join( + scene_dir, name, 'scripts', script_name) + self.scene_dict[name] = scene + + def get_scene(self): + return list(self.scene_dict.keys()) + + def get_scripts(self, name): + return list(self.scene_dict[name].scripts.keys()) + + def start_scene(self, name): + if self.running_scene: + raise RuntimeError( + f'scene {self.running_scene} is already running') + if name not in self.scene_dict: + raise ValueError(f'scene {name} not found') + scene_plugin_module = importlib.import_module(f'scene.{name}.plugin') + self.scene_plugin = scene_plugin_module.Plugin(name) + self.running_scene = name + self.sim_process = subprocess.Popen( + ['bash', self.scene_dict[name].launch_file], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + def shut_scene(self): + # TODO: 并没有正确终止仿真系列进程 + self.sim_process.terminate() + self.sim_process = None + self.running_scene = None + self.scene_plugin = None + + def run_script(self, name): + if not self.running_scene: + raise RuntimeError('no scene running') + script_path = self.scene_dict[self.running_scene].scripts.get(name, None) + if not script_path: + raise ValueError(f'script {name} not found') + subprocess.Popen(['bash', script_path]) + +if __name__ == '__main__': + mgr = SceneManager() + mgr.load_scene() + print(mgr.get_scene()) + print(mgr.get_scripts('grasp-box')) + mgr.start_scene('grasp-box') + import time + time.sleep(20) + mgr.run_script('grasp_box_example') + time.sleep(60) + mgr.shut_scene()