gazebo_world_manager/http_server.py

73 lines
2.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import dataclasses
import socket
import typing

import flask
import werkzeug.serving
@dataclasses.dataclass
class HttpHost:
    """
    Address of an HTTP host: a host name plus a TCP port.

    Both fields are validated right after construction; see __post_init__.
    """

    # Host name or IP address; must be non-empty.
    hostname: str
    # TCP port number; must lie in the inclusive range 0..65535.
    port: int

    def __post_init__(self):
        """
        Validate the freshly populated fields.

        Raises:
            ValueError: if ``hostname`` is empty, or if ``port`` is outside
                the inclusive range 0..65535.
        """
        name, number = self.hostname, self.port
        if not name:
            raise ValueError("Hostname cannot be empty")
        port_in_range = 0 <= number <= 65535
        if not port_in_range:
            raise ValueError(f"Invalid port: {number}")
class HttpRPCServer:
    """
    HTTP RPC server implementation (taken from ability-sdk-python).

    Wraps a Flask application that is served by a threaded werkzeug WSGI
    server. Register routes with add_route(), or subclass this class to
    implement custom behaviour.
    """

    def __init__(self):
        # The running werkzeug server, or None while nothing is being served.
        self._server = None
        # HttpHost of the most recent start() call; None until start() runs.
        self.host = None
        self._app = flask.Flask(__name__)

    def add_route(self, uri: str, handler, methods: list) -> None:
        """
        Register an HTTP route.

        Args:
            uri: URL rule passed to Flask's ``add_url_rule``.
            handler: callable used as the Flask view function; it should
                follow Flask's view-function conventions.
            methods: HTTP methods accepted by the route.

        Raises:
            ValueError: if ``handler`` is not callable.
        """
        if not callable(handler):
            raise ValueError("Handler must be a callable function")
        self._app.add_url_rule(uri, view_func=handler, methods=methods)

    def start(self, hostname: str = "0.0.0.0",
              port: typing.Optional[int] = None) -> None:
        """
        Start the server and serve requests, blocking the calling thread.

        Args:
            hostname: interface to bind to.
            port: TCP port to bind to. A falsy value (``None`` or ``0``)
                selects a port via find_free_port().

        Raises:
            RuntimeError: if a server is already registered as running.
            ValueError: if HttpHost rejects the hostname/port pair.
        """
        if self._server is not None:
            raise RuntimeError("Server is already running")
        self.host = HttpHost(
            hostname=hostname, port=port if port else self.find_free_port())
        server = werkzeug.serving.make_server(
            self.host.hostname, self.host.port, self._app, threaded=True)
        self._server = server
        try:
            server.serve_forever()
        finally:
            # serve_forever() may also end through an exception (for example
            # KeyboardInterrupt) without stop() ever being called. Release
            # the listening socket and forget the server so that a later
            # start() is not rejected as "already running".
            server.server_close()
            if self._server is server:
                self._server = None

    def stop(self) -> None:
        """
        Stop the server; does nothing when no server is running.

        Because start() blocks, this has to be called from a different
        thread than the one executing start(): socketserver's shutdown()
        waits for the serve_forever() loop to finish.
        """
        # Work on a local snapshot: the thread blocked in start() clears
        # self._server on its own once the serving loop has ended.
        server = self._server
        if server is None:
            return
        server.shutdown()
        server.server_close()
        if self._server is server:
            self._server = None

    def find_free_port(self) -> int:
        """
        Return a TCP port number that is currently free on 127.0.0.1.

        The probing socket is closed before returning, so the port is not
        reserved and another process may claim it before it is used.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]
class GazeboSimHttpServer(HttpRPCServer):
    """
    HTTP server for the Gazebo simulation environment.
    """

    def __init__(self):
        """Initialise the base HttpRPCServer; no extra state is added here."""
        super().__init__()