feat: 添加服务框架

This commit is contained in:
zpff 2025-08-09 22:38:39 +08:00
parent 1843c21972
commit 3fbf51603a
3 changed files with 160 additions and 8 deletions

View File

@ -1,8 +1,15 @@
import dataclasses
import importlib
import os
import socket
import subprocess
import threading
import flask
import werkzeug.serving
from flask import jsonify, request
import gazebo_ctrl
@dataclasses.dataclass
@ -15,9 +22,10 @@ class HttpHost():
def __post_init__(self):
if not self.hostname:
raise ValueError("Hostname cannot be empty")
raise ValueError('Hostname cannot be empty')
if not (0 <= self.port <= 65535):
raise ValueError(f"Invalid port: {self.port}")
raise ValueError(f'Invalid port: {self.port}')
class HttpRPCServer():
"""
@ -35,15 +43,15 @@ class HttpRPCServer():
添加HTTP路由handler应符合Flask视图函数的规范
"""
if not callable(handler):
raise ValueError("Handler must be a callable function")
raise ValueError('Handler must be a callable function')
self._app.add_url_rule(uri, view_func=handler, methods=methods)
def start(self, hostname: str = "0.0.0.0", port: int = None):
def start(self, hostname: str = '0.0.0.0', port: int = None):
"""
启动并以阻塞方式提供服务
"""
if self._server:
raise RuntimeError("Server is already running")
raise RuntimeError('Server is already running')
self.host = HttpHost(
hostname=hostname, port=port if port else self.find_free_port())
self._server = werkzeug.serving.make_server(
@ -61,13 +69,142 @@ class HttpRPCServer():
def find_free_port(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
s.bind(('127.0.0.1', 0))
return s.getsockname()[1]
class GazeboSimHttpServer(HttpRPCServer):
"""
Gazebo仿真环境的HTTP服务器
"""
def __init__(self):
super().__init__()
super().__init__()
# Gazebo仿真/程序控制
self.add_route('/simulation/scene', self.get_simulation_scene, ['GET'])
self.add_route('/simulation/start', self.start_simulation, ['GET'])
self.add_route('/simulation/shutdown',
self.shutdown_simulation, ['GET'])
self.add_route('/simulation/run-script', self.run_script, ['POST'])
# Gazebo仿真/仿真控制
self.add_route('/physics/continue', self.continue_physics, ['GET'])
self.add_route('/physics/pause', self.pause_physics, ['GET'])
self.add_route('/physics/reset', self.reset_physics, ['GET'])
# Gazebo仿真/模型控制
self.add_route('/model/state', self.get_model_state, ['GET'])
self.add_route('/model/state', self.set_model_state, ['POST'])
self.add_route('/model/spawn', self.spawn_model, ['POST'])
self.add_route('/model/delete', self.delete_model, ['POST'])
# Gazebo仿真/场景专有控制
self.add_route('/<scene>/model/state',
self.get_scene_model_state, ['GET'])
self.add_route('/<scene>/model/state',
self.set_scene_model_state, ['POST'])
self.add_route('/<scene>/model/spawn',
self.spawn_scene_model, ['POST'])
# ===== 处理函数实现 =====
def get_simulation_scene(self):
scene_dir = os.path.join(os.path.dirname(__file__), 'scene')
if not os.path.exists(scene_dir):
return jsonify([])
scenes = [name for name in os.listdir(
scene_dir) if os.path.isdir(os.path.join(scene_dir, name))]
return jsonify(scenes)
def start_simulation(self):
scene = request.args.get('scene', 'grasp-box')
self.scene_name = scene
# 找到场景文件夹下的launch文件
launch_file = os.path.join('scene', scene, 'launch.sh')
if not os.path.exists(launch_file):
return jsonify({'error': 'Launch file not found'}), 404
# 加载场景插件
plugin_py = os.path.join('scene', scene, 'plugin.py')
if not os.path.exists(plugin_py):
return jsonify({'error': 'Plugin file not found'}), 404
self.scene_plugin_class = importlib.import_module(f'scene.{scene}.plugin')
self.scene_plugin = self.scene_plugin_class.Plugin(scene)
# 执行launch文件
def run_sim():
self.sim_process = subprocess.Popen(['bash', launch_file])
if hasattr(self, 'sim_thread') and self.sim_thread.is_alive():
return jsonify({'error': 'Simulation already running'}), 400
self.sim_thread = threading.Thread(target=run_sim, daemon=True)
self.sim_thread.start()
self.gazebo_controller = gazebo_ctrl.GazeboROSController()
return 'OK', 200
def shutdown_simulation(self):
if hasattr(self, 'sim_process') and self.sim_process:
self.sim_process.terminate()
self.sim_process = None
self.sim_thread.join(timeout=1)
self.sim_thread = None
self.scene_name = None
self.scene_plugin = None
self.scene_plugin_class = None
return jsonify({'status': 'stopped'})
def run_script(self):
name = request.args.get('name')
if not name:
return jsonify({'error': 'Missing name'}), 400
return jsonify({'status': 'script executed', 'name': name})
def continue_physics(self):
return jsonify({'status': 'physics continued'})
def pause_physics(self):
return jsonify({'status': 'physics paused'})
def reset_physics(self):
return jsonify({'status': 'physics reset'})
def get_model_state(self):
names = request.args.getlist('names') or []
return jsonify([{'name': n, 'pose': {}, 'twist': {}} for n in names])
def set_model_state(self):
data = request.get_json(silent=True)
if not data or 'name' not in data:
return jsonify({'error': 'Missing required field name'}), 400
return jsonify({'status': 'model state updated', 'data': data})
def spawn_model(self):
data = request.get_json(silent=True)
required = ['pose', 'name', 'xml']
if not data or not all(k in data for k in required):
return jsonify({'error': f'Missing required fields {required}'}), 400
return jsonify({'status': 'model spawned', 'data': data})
def delete_model(self):
data = request.get_json(silent=True)
if not data or 'name' not in data:
return jsonify({'error': 'Missing required field name'}), 400
return jsonify({'status': 'model deleted', 'name': data['name']})
def get_scene_model_state(self, scene):
if scene != self.scene_name:
return jsonify({'error': f'running {self.scene_name} but try to access {scene}'}), 400
return self.scene_plugin.get_scene_model_state()
def set_scene_model_state(self, scene):
if scene != self.scene_name:
return jsonify({'error': f'running {self.scene_name} but try to access {scene}'}), 400
return self.scene_plugin.post_scene_model_state()
def spawn_scene_model(self, scene):
if scene != self.scene_name:
return jsonify({'error': f'running {self.scene_name} but try to access {scene}'}), 400
return self.scene_plugin.post_scene_model_spawn()
if __name__ == '__main__':
server = GazeboSimHttpServer()
server.start(hostname='0.0.0.0', port=12300)

View File

@ -0,0 +1,15 @@
import scene_plugin
class Plugin(scene_plugin.ScenePluginBase):
def __init__(self, scene_name):
super().__init__(scene_name)
def post_scene_model_spawn(self):
raise NotImplementedError()
def post_scene_model_state(self):
raise NotImplementedError()
def get_scene_model_state(self):
raise NotImplementedError()

View File

@ -1,4 +1,4 @@
class ScenePlugin:
class ScenePluginBase:
"""
场景自定义模型的处理
"""