gazebo_world_manager/http_server.py
2025-08-11 20:19:29 +08:00

143 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import dataclasses
import socket
import subprocess
import threading
import time
import flask
import rospy
import werkzeug.serving
from flask import jsonify, request
import gazebo_ctrl
from heartbeat import HeartbeatServer
from model_mgr.manager import ModelManager
from scene_mgr.manager import SceneManager
@dataclasses.dataclass
class HttpHost():
"""
HTTP主机信息
"""
hostname: str
port: int
def __post_init__(self):
if not self.hostname:
raise ValueError('Hostname cannot be empty')
if not (0 <= self.port <= 65535):
raise ValueError(f'Invalid port: {self.port}')
class HttpRPCServer():
"""
来自ability-sdk-python的http服务器。
HTTP RPC服务端实现。
使用add_route方法添加路由重写类以实现自定义功能。
"""
def __init__(self):
self._server = None
self._app = flask.Flask(__name__)
def add_route(self, uri: str, handler, methods: list):
"""
添加HTTP路由handler应符合Flask视图函数的规范。
"""
if not callable(handler):
raise ValueError('Handler must be a callable function')
self._app.add_url_rule(uri, view_func=handler, methods=methods)
def start(self, hostname: str = '0.0.0.0', port: int = None):
"""
启动并以阻塞方式提供服务。
"""
if self._server:
raise RuntimeError('Server is already running')
self.host = HttpHost(
hostname=hostname, port=port if port else self.find_free_port())
self._server = werkzeug.serving.make_server(
self.host.hostname, self.host.port, self._app, threaded=True)
self._server.serve_forever()
def stop(self):
"""
停止服务。
"""
if self._server:
self._server.shutdown()
self._server.server_close()
self._server = None
def find_free_port(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 0))
return s.getsockname()[1]
class GazeboSimHttpServer(HttpRPCServer):
"""
Gazebo仿真环境的HTTP服务器。
"""
def __init__(self):
super().__init__()
# 模块与数据
self.rlock = threading.RLock()
self.scene_mgr = SceneManager()
self.model_mgr = ModelManager()
self.heartbeat_server = HeartbeatServer()
# 启动roscore
self.roscore_proc = subprocess.Popen(
['roscore'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(2)
rospy.init_node('gazebo_world_manager', anonymous=True)
# 添加接口
self._app.add_url_rule('/api/v1/scenes', view_func=self.query_scenes, methods=['GET'])
self._app.add_url_rule('/api/v1/scene/load', view_func=self.load_scene, methods=['GET'])
self._app.add_url_rule('/api/v1/scene/objects', view_func=self.get_scene_objects, methods=['GET'])
self._app.add_url_rule('/api/v1/scene/object', view_func=self.modify_scene_object, methods=['POST'])
self._app.add_url_rule('/api/v1/scene/object', view_func=self.add_scene_object, methods=['PUT'])
self._app.add_url_rule('/api/v1/scene/object', view_func=self.remove_scene_object, methods=['DELETE'])
def query_scenes(self):
# 查询场景
with self.rlock:
pass
def start_scene(self):
# 启动场景
with self.rlock:
pass
def shut_scene(self):
# 关闭场景仅供start_scene、心跳线程使用
with self.rlock:
pass
def get_all_models(self):
# 获取所有模型
with self.rlock:
pass
def modify_model(self):
# 修改模型
with self.rlock:
pass
def add_model(self):
# 添加模型
with self.rlock:
pass
def remove_model(self):
# 删除模型
with self.rlock:
pass
if __name__ == '__main__':
server = GazeboSimHttpServer()
server.start(hostname='0.0.0.0', port=12300)