gazebo_world_manager/http_server.py

282 lines
9.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import dataclasses
import socket
import subprocess
import threading
import time
import flask
import rospy
import werkzeug.serving
from flask import jsonify, request
from heartbeat import HeartbeatServer
from model_mgr.manager import ModelManager
from model_mgr.model_types import Model
from scene_mgr.manager import SceneManager
@dataclasses.dataclass
class HttpHost():
"""
HTTP主机信息
"""
hostname: str
port: int
def __post_init__(self):
if not self.hostname:
raise ValueError('Hostname cannot be empty')
if not (0 <= self.port <= 65535):
raise ValueError(f'Invalid port: {self.port}')
class HttpRPCServer():
"""
来自ability-sdk-python的http服务器。
HTTP RPC服务端实现。
使用add_route方法添加路由重写类以实现自定义功能。
"""
def __init__(self):
self._server = None
self._app = flask.Flask(__name__)
def add_route(self, uri: str, handler, methods: list):
"""
添加HTTP路由handler应符合Flask视图函数的规范。
"""
if not callable(handler):
raise ValueError('Handler must be a callable function')
self._app.add_url_rule(uri, view_func=handler, methods=methods)
def start(self, hostname: str = '0.0.0.0', port: int = None):
"""
启动并以阻塞方式提供服务。
"""
if self._server:
raise RuntimeError('Server is already running')
self.host = HttpHost(
hostname=hostname, port=port if port else self.find_free_port())
self._server = werkzeug.serving.make_server(
self.host.hostname, self.host.port, self._app, threaded=True)
self._server.serve_forever()
def stop(self):
"""
停止服务。
"""
if self._server:
self._server.shutdown()
self._server.server_close()
self._server = None
def find_free_port(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 0))
return s.getsockname()[1]
class GazeboSimHttpServer(HttpRPCServer):
"""
Gazebo仿真环境的HTTP服务器。
"""
def __init__(self):
super().__init__()
# 模块与数据
self.rlock = threading.RLock()
self.scene_mgr = SceneManager()
self.model_mgr = None
def llost():
self.shut_scene()
self.heartbeat_server = HeartbeatServer(close_func=llost)
self.heartbeat_server.start()
# 启动roscore
self.roscore_proc = subprocess.Popen(
['roscore'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(5)
rospy.init_node('gazebo_world_manager', anonymous=True)
# 添加接口
self._app.add_url_rule(
'/api/v1/scenes', view_func=self.query_scenes, methods=['GET'])
self._app.add_url_rule('/api/v1/scene/load',
view_func=self.start_scene, methods=['GET'])
self._app.add_url_rule('/api/v1/scene/objects',
view_func=self.get_all_models, methods=['GET'])
self._app.add_url_rule(
'/api/v1/scene/object', view_func=self.modify_model, methods=['POST'])
self._app.add_url_rule('/api/v1/scene/object',
view_func=self.add_model, methods=['PUT'])
self._app.add_url_rule(
'/api/v1/scene/object', view_func=self.remove_model, methods=['DELETE'])
self._app.add_url_rule('/api/v1/scene/shut',
view_func=self.shut_scene, methods=['GET'])
print('Sever init done.')
def query_scenes(self):
# 查询场景
with self.rlock:
ret = {'code': 200, 'message': ''}
try:
scs = self.scene_mgr.get_scenes()
data = [sc.to_dict() for sc in scs]
ret['data'] = data
except Exception as e:
ret['code'] = 400
ret['message'] = str(e)
print(f'Server query scenes done. {ret}')
return jsonify(ret)
def start_scene(self):
# 启动场景
with self.rlock:
ret = {'code': 200, 'message': ''}
try:
# 检查参数
id = request.args.get('id', type=int, default=None)
if not id:
raise ValueError('Scene ID is required')
# 关闭现有场景
if self.scene_mgr.running_scene:
self.shut_scene()
# 启动场景
self.scene_mgr.start_scene(id)
self.model_mgr = ModelManager()
time.sleep(30)
self.model_mgr.load_models(
self.scene_mgr.scene_id_dict[id].preload_models)
except Exception as e:
ret['code'] = 400
ret['message'] = str(e)
print('Server start scene done.')
return jsonify(ret)
def shut_scene(self):
# 关闭场景
with self.rlock:
ret = {'code': 200, 'message': ''}
try:
if not self.scene_mgr.running_scene:
raise RuntimeError('No scene is running')
self.scene_mgr.shut_scene()
time.sleep(5)
self.model_mgr = None
except Exception as e:
ret['code'] = 400
ret['message'] = str(e)
print('Server shut scene done.')
return ret
def get_all_models(self):
# 获取所有模型
with self.rlock:
ret = {'code': 200, 'message': ''}
try:
mods = self.model_mgr.get_all_models()
cnt = len(mods)
data = {}
data['total_count'] = cnt
data['list'] = [m.to_dict() for m in mods]
ret['data'] = data
except Exception as e:
ret['code'] = 400
ret['message'] = str(e)
print(f'Server get all models done. {ret}')
return jsonify(ret)
def modify_model(self):
# 修改模型
with self.rlock:
ret = {'code': 200, 'message': ''}
try:
# 解析参数
info = request.get_json()
print(f'get mod req {info}')
scene_id = info.get('id')
if not scene_id or scene_id != self.scene_mgr.running_scene:
raise ValueError(
f'Scene {scene_id} does not match the running scene {self.scene_mgr.running_scene}')
object_id = info.get('object_id')
if not object_id:
raise ValueError('Object ID is required')
# 制作Model
info['id'] = object_id
model = self.model_mgr.get_model(object_id)
model.overwrite_from_dict(info)
# 添加模型
self.model_mgr.modify_model(model)
except Exception as e:
ret['code'] = 400
ret['message'] = str(e)
print('Server modify model done.')
return jsonify(ret)
def add_model(self):
# 添加模型
with self.rlock:
ret = {'code': 200, 'message': ''}
try:
# 解析参数
info = request.get_json()
print(f'get add req {info}')
scene_id = info.get('id')
if not scene_id or scene_id != self.scene_mgr.running_scene:
raise ValueError(
f'Scene {scene_id} does not match the running scene {self.scene_mgr.running_scene}')
# 制作Model和modify的区别在于没有id
info['id'] = -1
model = Model.from_dict(info)
# 添加模型
self.model_mgr.add_model(model)
except Exception as e:
ret['code'] = 400
ret['message'] = str(e)
print('Server add model done.')
return jsonify(ret)
def remove_model(self):
# 删除模型
with self.rlock:
ret = {'code': 200, 'message': ''}
try:
# 解析参数
info = request.get_json()
print(f'get del req {info}')
scene_id = info.get('id')
if not scene_id or scene_id != self.scene_mgr.running_scene:
raise ValueError(
f'Scene {scene_id} does not match the running scene {self.scene_mgr.running_scene}')
object_id = info.get('object_id')
# 删除模型
self.model_mgr.remove_model(object_id)
except Exception as e:
ret['code'] = 400
ret['message'] = str(e)
print('Server remove model done.')
return jsonify(ret)
def shutdown(self):
"""
关闭所有资源。
"""
# 1. 关闭场景(如果正在运行)
if self.scene_mgr.running_scene:
self.shut_scene()
# 2. 关闭 HeartbeatServer
self.heartbeat_server.stop()
# 3. 关闭 roscore 进程
if self.roscore_proc:
self.roscore_proc.terminate()
self.roscore_proc.wait()
# 4. 关闭 Flask HTTP 服务器
super().stop()
print("GazeboSimHttpServer stopped.")
if __name__ == '__main__':
server = GazeboSimHttpServer()
server.start(hostname='0.0.0.0', port=12300)