import json import rospy from gazebo_msgs.msg import ModelState from gazebo_msgs.srv import (DeleteModel, GetLinkProperties, GetModelProperties, GetModelPropertiesResponse, GetModelState, GetModelStateResponse, GetWorldProperties, SetModelState, SpawnModel) from geometry_msgs.msg import Point, Pose, Quaternion, Twist, Vector3 from std_srvs.srv import Empty from tf.transformations import euler_from_quaternion, quaternion_from_euler class GazeboROSController: def __init__(self): if not rospy.core.is_initialized(): rospy.init_node('gazebo_world_manager', anonymous=True, disable_signals=True) self.model_info = None def get_model_names(self) -> list: rospy.wait_for_service('/gazebo/get_world_properties') try: get_world_properties = rospy.ServiceProxy( '/gazebo/get_world_properties', GetWorldProperties) world_properties = get_world_properties() return world_properties.model_names except rospy.ServiceException as e: rospy.logerr(f"Service call failed: {e}") return [] def get_model_state(self, model_names: list = None) -> dict: """ 获取模型的状态。 如果model_names为空,返回所有模型的状态,否则返回指定模型的状态。 """ if not model_names or len(model_names) == 0: model_names = self.get_model_names() print(f"Getting state for models: {model_names}") model_states = {} for name in model_names: rospy.wait_for_service('/gazebo/get_model_state') try: get_model_state = rospy.ServiceProxy( '/gazebo/get_model_state', GetModelState) state = get_model_state(name, '') model_states[name] = state except rospy.ServiceException as e: rospy.logerr(f"Service call failed for model {name}: {e}") model_states[name] = None return model_states def get_model_properties(self, model_names: list = None) -> dict: """ 获取模型的属性。 如果model_names为空,返回所有模型的属性,否则返回指定模型的属性。 """ if not model_names or len(model_names) == 0: model_names = self.get_model_names() print(f"Getting properties for models: {model_names}") model_properties = {} for name in model_names: rospy.wait_for_service('/gazebo/get_model_properties') try: get_model_properties = rospy.ServiceProxy( '/gazebo/get_model_properties', GetModelProperties) properties = get_model_properties(name) model_properties[name] = properties except rospy.ServiceException as e: rospy.logerr(f"Service call failed for model {name}: {e}") model_properties[name] = None return model_properties def get_link_properties(self, link_full_name: str) -> dict: """ 获取指定链接的属性。 """ rospy.wait_for_service('/gazebo/get_link_properties') try: get_link_properties = rospy.ServiceProxy( '/gazebo/get_link_properties', GetLinkProperties) properties = get_link_properties(link_full_name) return properties except rospy.ServiceException as e: rospy.logerr(f"Service call failed for link {link_full_name}: {e}") return None def format_model_state(self, state: GetModelStateResponse) -> dict: # 四元数转换为欧拉角 roll, pitch, yaw = euler_from_quaternion(( state.pose.orientation.x, state.pose.orientation.y, state.pose.orientation.z, state.pose.orientation.w )) return { 'pose': { 'position': { 'x': state.pose.position.x, 'y': state.pose.position.y, 'z': state.pose.position.z }, 'orientation': { 'roll': roll, 'pitch': pitch, 'yaw': yaw } }, 'twist': { 'linear': { 'x': state.twist.linear.x, 'y': state.twist.linear.y, 'z': state.twist.linear.z }, 'angular': { 'x': state.twist.angular.x, 'y': state.twist.angular.y, 'z': state.twist.angular.z } }} def format_model_properties(self, properties: GetModelPropertiesResponse) -> dict: return { 'parent_model_name': properties.parent_model_name, 'canonical_body_name': properties.canonical_body_name, 'body_names': properties.body_names, 'geom_names': properties.geom_names, 'joint_names': properties.joint_names, 'child_model_names': properties.child_model_names, 'is_static': properties.is_static } def format_model_info(self, model_name: str, state: GetModelStateResponse, properties: GetModelPropertiesResponse) -> dict: return { 'name': model_name, 'state': self.format_model_state(state), 'properties': self.format_model_properties(properties) } def get_model_state_f(self, model_names: list = None) -> list: """ 获取模型的状态。 如果model_names为空,返回所有模型的信息,否则返回指定模型的信息。 返回值已经格式化为list-dict形式。 """ if not model_names or len(model_names) == 0: model_names = self.get_model_names() # print(f"Getting info for models: {model_names}") model_info = [] states = self.get_model_state(model_names) for name in model_names: m_state = states[name] info = self.format_model_state(m_state) info['name'] = name model_info.append(info) return model_info def pause_physics(self): rospy.wait_for_service('/gazebo/pause_physics') pause_physics = rospy.ServiceProxy('/gazebo/pause_physics', Empty) pause_physics() def continue_physics(self): rospy.wait_for_service('/gazebo/unpause_physics') unpause_physics = rospy.ServiceProxy('/gazebo/unpause_physics', Empty) unpause_physics() def reset_simulation(self): rospy.wait_for_service('/gazebo/reset_simulation') reset_simulation = rospy.ServiceProxy( '/gazebo/reset_simulation', Empty) reset_simulation() def delete_model(self, name): rospy.wait_for_service('/gazebo/delete_model') delete_model = rospy.ServiceProxy('/gazebo/delete_model', DeleteModel) resp = delete_model(model_name=name) if not resp.success: raise RuntimeError( f"Failed to delete model {name}: {resp.status_message}") def set_model_state(self, name, state_json): # 解析位置 pos = state_json.get('pose', {}).get('position', {}) x = pos.get('x', 0.0) y = pos.get('y', 0.0) z = pos.get('z', 0.0) # 解析欧拉角并转为四元数 ori = state_json.get('pose', {}).get('orientation', {}) roll = ori.get('roll', 0.0) pitch = ori.get('pitch', 0.0) yaw = ori.get('yaw', 0.0) qx, qy, qz, qw = quaternion_from_euler(roll, pitch, yaw) # 解析twist linear = state_json.get('twist', {}).get('linear', {}) angular = state_json.get('twist', {}).get('angular', {}) linear_vec = Vector3( x=linear.get('x', 0.0), y=linear.get('y', 0.0), z=linear.get('z', 0.0) ) angular_vec = Vector3( x=angular.get('x', 0.0), y=angular.get('y', 0.0), z=angular.get('z', 0.0) ) # 参考系 reference_frame = state_json.get('reference', 'world') # 构造ModelState state = ModelState() state.model_name = name state.pose = Pose( position=Point(x, y, z), orientation=Quaternion(qx, qy, qz, qw) ) state.twist = Twist( linear=linear_vec, angular=angular_vec ) state.reference_frame = reference_frame rospy.wait_for_service('/gazebo/set_model_state') set_model_state = rospy.ServiceProxy( '/gazebo/set_model_state', SetModelState) resp = set_model_state(state) if not resp.success: raise RuntimeError( f"Failed to set model state for {name}: {resp.status_message}") def spawn_urdf_model(self, model_name, urdf_content, pose): # 生成模型 rospy.wait_for_service('/gazebo/spawn_urdf_model') spawn_service = rospy.ServiceProxy( '/gazebo/spawn_urdf_model', SpawnModel) response = spawn_service( model_name=model_name, model_xml=urdf_content, robot_namespace='', initial_pose=pose, reference_frame='world' ) if not response.success: raise RuntimeError( f"Failed to spawn {model_name}: {response.status_message}") if __name__ == '__main__': controller = GazeboROSController() print(json.dumps(controller.get_model_state_f(['block']))) jsond = {"pose": {"position": {"x": 1, "y": 2.0000041832461184, "z": 1}, "orientation": {"roll": 1.0008074575547628e-10, "pitch": -8.228587019137734e-12, "yaw": -3.1415190381173597}}, "twist": {"linear": { "x": 1.6640674574453256e-11, "y": 2.6277239545161537e-10, "z": 1.607170552758444e-05}, "angular": {"x": -2.2534679599858193e-09, "y": 1.551234375963502e-10, "z": -2.5445035969479123e-13}}, "name": "block"} controller.set_model_state('block', jsond)