import asyncio
import threading
import time

import websockets
from websockets.exceptions import ConnectionClosed


class HeartbeatServer:
    """WebSocket heartbeat server that accepts one client at a time.

    The server runs on its own event loop inside a daemon thread, so it can
    be started and stopped from synchronous code. Incoming messages are only
    printed. When the active client disconnects, the optional close callback
    is invoked with no arguments.
    """

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 12301,
        path: str = "/api/v1/heartbeat",
        close_func=None,
    ):
        """Store the configuration; nothing is started until ``start()``.

        Args:
            host: Interface to bind to.
            port: TCP port to listen on.
            path: The only request path that is accepted.
            close_func: Optional zero-argument callable invoked when the
                active client disconnects.
        """
        self.host = host
        self.port = port
        self.path = path
        self._server = None
        self._connection = None
        self._loop = None
        self._thread = None
        self._connection_close_func = close_func

    @staticmethod
    def _is_connection_active(connection) -> bool:
        """Return True if *connection* exists and is not known to be closed."""
        if not connection:
            return False
        closed = getattr(connection, "closed", None)
        if closed is not None:
            return not closed
        # NOTE(review): connection objects without a ``closed`` property are
        # expected to expose a ``state`` enum instead -- confirm against the
        # installed websockets version.
        state = getattr(connection, "state", None)
        return getattr(state, "name", None) != "CLOSED"

    async def _handler(self, websocket, path=None):
        """Serve one client connection.

        Args:
            websocket: The connection object supplied by ``websockets``.
            path: Request path. Optional so that the handler works both when
                the library passes ``(websocket, path)`` and when it passes
                only the connection.
        """
        if path is None:
            # Look the path up on the connection when it was not passed in.
            request = getattr(websocket, "request", None)
            path = getattr(request, "path", None)
            if path is None:
                path = getattr(websocket, "path", None)

        # Validate the request path.
        if path != self.path:
            await websocket.close(code=1008, reason="Invalid path")
            return

        # Refuse a new client while another one is still connected.
        if self._is_connection_active(self._connection):
            await websocket.close(code=1013, reason="Server already has a connection")
            print(f"拒绝来自 {websocket.remote_address} 的新连接(已有活跃连接)")
            return

        # Accept the connection.
        self._connection = websocket
        client = websocket.remote_address
        print(f"客户端 {client} 已连接")

        try:
            async for message in websocket:
                print(f"收到来自 {client} 的消息: {message}")
        except ConnectionClosed:
            # A dropped connection is an ordinary way for a client to leave.
            pass
        finally:
            try:
                if callable(self._connection_close_func):
                    self._connection_close_func()
            finally:
                # Always release the slot, even if the callback raised.
                print(f"客户端 {client} 已断开连接")
                self._connection = None

    async def _start_server(self):
        """Start listening and block until the server has been closed."""
        self._server = await websockets.serve(self._handler, self.host, self.port)
        print(f"HeartbeatServer 启动,监听 ws://{self.host}:{self.port}{self.path}")
        await self._server.wait_closed()

    def start(self):
        """Start the server in a background daemon thread.

        Returns immediately; the event loop and the listening socket are
        created inside the new thread. Calling this while the server thread
        is still alive does nothing.
        """
        if self._thread is not None and self._thread.is_alive():
            return

        def run_loop():
            loop = asyncio.new_event_loop()
            self._loop = loop
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(self._start_server())
            finally:
                # Release the loop's resources once the server has stopped.
                loop.close()

        self._thread = threading.Thread(target=run_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop the server from synchronous code.

        Schedules ``server.close()`` on the server's own event loop and then
        waits up to five seconds for the background thread to finish. Does
        nothing if the server was never started, has not finished starting,
        or has already stopped.
        """
        server, loop = self._server, self._loop
        if server is None or loop is None or loop.is_closed():
            return

        try:
            # The server belongs to the background loop, so close() has to
            # run on that loop rather than on the calling thread.
            loop.call_soon_threadsafe(server.close)
        except RuntimeError:
            # The loop was closed between the check above and this call.
            return

        thread = self._thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=5)
        print("HeartbeatServer 已关闭")

    def set_connection_close_func(self, func):
        """Replace the callback invoked when the active client disconnects."""
        self._connection_close_func = func


if __name__ == "__main__":
    def ff():
        print(111)

    server = HeartbeatServer(close_func=ff)
    server.start()

    try:
        while True:
            # Your main-thread logic goes here; sleeping avoids a busy loop.
            time.sleep(1)
    except KeyboardInterrupt:
        server.stop()