diff --git a/heartbeat.py b/heartbeat.py new file mode 100644 index 0000000..5f59e5b --- /dev/null +++ b/heartbeat.py @@ -0,0 +1,83 @@ +import asyncio +import threading + +import websockets +from websockets.exceptions import ConnectionClosed + + +class HeartbeatServer: + def __init__(self, host="0.0.0.0", port=12301, path="/api/v1/heartbeat", close_func=None): + self.host = host + self.port = port + self.path = path + self._server = None + self._connection = None + self._loop = None + self._thread = None + self._connection_close_func = close_func + + async def _handler(self, websocket, path): + # 检查路径 + if path != self.path: + await websocket.close(code=1008, reason="Invalid path") + return + + # 已有连接时拒绝新连接 + if self._connection and not self._connection.closed: + await websocket.close(code=1013, reason="Server already has a connection") + print(f"拒绝来自 {websocket.remote_address} 的新连接(已有活跃连接)") + return + + # 接受连接 + self._connection = websocket + client = websocket.remote_address + print(f"客户端 {client} 已连接") + + try: + async for message in websocket: + print(f"收到来自 {client} 的消息: {message}") + except ConnectionClosed: + pass + finally: + if callable(self._connection_close_func): + self._connection_close_func() + print(f"客户端 {client} 已断开连接") + self._connection = None + + async def _start_server(self): + self._server = await websockets.serve(self._handler, self.host, self.port) + print(f"HeartbeatServer 启动,监听 ws://{self.host}:{self.port}{self.path}") + await self._server.wait_closed() + + def start(self): + """同步启动服务器""" + def run_loop(): + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + self._loop.run_until_complete(self._start_server()) + + self._thread = threading.Thread(target=run_loop, daemon=True) + self._thread.start() + + def stop(self): + """同步关闭服务器""" + if self._server and self._loop: + def shutdown(): + self._server.close() + asyncio.run_coroutine_threadsafe(shutdown(), self._loop) + print("HeartbeatServer 已关闭") + + def set_connection_close_func(self, func): + self._connection_close_func = func + + +if __name__ == "__main__": + def ff(): + print(111) + server = HeartbeatServer(close_func=ff) + server.start() + try: + while True: + pass # 你的主线程逻辑 + except KeyboardInterrupt: + server.stop() diff --git a/http_server.py b/http_server.py index fb5df56..6e0971e 100644 --- a/http_server.py +++ b/http_server.py @@ -10,6 +10,7 @@ import werkzeug.serving from flask import jsonify, request import gazebo_ctrl +from heartbeat import HeartbeatServer from model_mgr.manager import ModelManager from scene_mgr.manager import SceneManager @@ -82,9 +83,11 @@ class GazeboSimHttpServer(HttpRPCServer): def __init__(self): super().__init__() + # 模块与数据 self.rlock = threading.RLock() self.scene_mgr = SceneManager() self.model_mgr = ModelManager() + self.heartbeat_server = HeartbeatServer() # 启动roscore self.roscore_proc = subprocess.Popen( ['roscore'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)