gazebo_world_manager/gazebo_ctrl.py
2025-08-12 14:05:18 +08:00

255 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import rospy
from gazebo_msgs.msg import ModelState
from gazebo_msgs.srv import (DeleteModel, GetLinkProperties,
GetModelProperties, GetModelPropertiesResponse,
GetModelState, GetModelStateResponse,
GetWorldProperties, SetModelState, SpawnModel)
from geometry_msgs.msg import Point, Pose, Quaternion, Twist, Vector3
from std_srvs.srv import Empty
from tf.transformations import euler_from_quaternion, quaternion_from_euler
class GazeboROSController:
def __init__(self):
if not rospy.core.is_initialized():
rospy.init_node('gazebo_world_manager',
anonymous=True, disable_signals=True)
self.model_info = None
def get_model_names(self) -> list:
rospy.wait_for_service('/gazebo/get_world_properties')
try:
get_world_properties = rospy.ServiceProxy(
'/gazebo/get_world_properties', GetWorldProperties)
world_properties = get_world_properties()
return world_properties.model_names
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return []
def get_model_state(self, model_names: list = None) -> dict:
"""
获取模型的状态。
如果model_names为空返回所有模型的状态否则返回指定模型的状态。
"""
if not model_names or len(model_names) == 0:
model_names = self.get_model_names()
print(f"Getting state for models: {model_names}")
model_states = {}
for name in model_names:
rospy.wait_for_service('/gazebo/get_model_state')
try:
get_model_state = rospy.ServiceProxy(
'/gazebo/get_model_state', GetModelState)
state = get_model_state(name, '')
model_states[name] = state
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed for model {name}: {e}")
model_states[name] = None
return model_states
def get_model_properties(self, model_names: list = None) -> dict:
"""
获取模型的属性。
如果model_names为空返回所有模型的属性否则返回指定模型的属性。
"""
if not model_names or len(model_names) == 0:
model_names = self.get_model_names()
print(f"Getting properties for models: {model_names}")
model_properties = {}
for name in model_names:
rospy.wait_for_service('/gazebo/get_model_properties')
try:
get_model_properties = rospy.ServiceProxy(
'/gazebo/get_model_properties', GetModelProperties)
properties = get_model_properties(name)
model_properties[name] = properties
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed for model {name}: {e}")
model_properties[name] = None
return model_properties
def get_link_properties(self, link_full_name: str) -> dict:
"""
获取指定链接的属性。
"""
rospy.wait_for_service('/gazebo/get_link_properties')
try:
get_link_properties = rospy.ServiceProxy(
'/gazebo/get_link_properties', GetLinkProperties)
properties = get_link_properties(link_full_name)
return properties
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed for link {link_full_name}: {e}")
return None
def format_model_state(self, state: GetModelStateResponse) -> dict:
# 四元数转换为欧拉角
roll, pitch, yaw = euler_from_quaternion((
state.pose.orientation.x,
state.pose.orientation.y,
state.pose.orientation.z,
state.pose.orientation.w
))
return {
'pose': {
'position': {
'x': state.pose.position.x,
'y': state.pose.position.y,
'z': state.pose.position.z
},
'orientation': {
'roll': roll,
'pitch': pitch,
'yaw': yaw
}
},
'twist': {
'linear': {
'x': state.twist.linear.x,
'y': state.twist.linear.y,
'z': state.twist.linear.z
},
'angular': {
'x': state.twist.angular.x,
'y': state.twist.angular.y,
'z': state.twist.angular.z
}
}}
def format_model_properties(self, properties: GetModelPropertiesResponse) -> dict:
return {
'parent_model_name': properties.parent_model_name,
'canonical_body_name': properties.canonical_body_name,
'body_names': properties.body_names,
'geom_names': properties.geom_names,
'joint_names': properties.joint_names,
'child_model_names': properties.child_model_names,
'is_static': properties.is_static
}
def format_model_info(self, model_name: str, state: GetModelStateResponse, properties: GetModelPropertiesResponse) -> dict:
return {
'name': model_name,
'state': self.format_model_state(state),
'properties': self.format_model_properties(properties)
}
def get_model_state_f(self, model_names: list = None) -> list:
"""
获取模型的状态。
如果model_names为空返回所有模型的信息否则返回指定模型的信息。
返回值已经格式化为list-dict形式。
"""
if not model_names or len(model_names) == 0:
model_names = self.get_model_names()
# print(f"Getting info for models: {model_names}")
model_info = []
states = self.get_model_state(model_names)
for name in model_names:
m_state = states[name]
info = self.format_model_state(m_state)
info['name'] = name
model_info.append(info)
return model_info
def pause_physics(self):
rospy.wait_for_service('/gazebo/pause_physics')
pause_physics = rospy.ServiceProxy('/gazebo/pause_physics', Empty)
pause_physics()
def continue_physics(self):
rospy.wait_for_service('/gazebo/unpause_physics')
unpause_physics = rospy.ServiceProxy('/gazebo/unpause_physics', Empty)
unpause_physics()
def reset_simulation(self):
rospy.wait_for_service('/gazebo/reset_simulation')
reset_simulation = rospy.ServiceProxy(
'/gazebo/reset_simulation', Empty)
reset_simulation()
def delete_model(self, name):
rospy.wait_for_service('/gazebo/delete_model')
delete_model = rospy.ServiceProxy('/gazebo/delete_model', DeleteModel)
resp = delete_model(model_name=name)
if not resp.success:
raise RuntimeError(
f"Failed to delete model {name}: {resp.status_message}")
def set_model_state(self, name, state_json):
# 解析位置
pos = state_json.get('pose', {}).get('position', {})
x = pos.get('x', 0.0)
y = pos.get('y', 0.0)
z = pos.get('z', 0.0)
# 解析欧拉角并转为四元数
ori = state_json.get('pose', {}).get('orientation', {})
roll = ori.get('roll', 0.0)
pitch = ori.get('pitch', 0.0)
yaw = ori.get('yaw', 0.0)
qx, qy, qz, qw = quaternion_from_euler(roll, pitch, yaw)
# 解析twist
linear = state_json.get('twist', {}).get('linear', {})
angular = state_json.get('twist', {}).get('angular', {})
linear_vec = Vector3(
x=linear.get('x', 0.0),
y=linear.get('y', 0.0),
z=linear.get('z', 0.0)
)
angular_vec = Vector3(
x=angular.get('x', 0.0),
y=angular.get('y', 0.0),
z=angular.get('z', 0.0)
)
# 参考系
reference_frame = state_json.get('reference', 'world')
# 构造ModelState
state = ModelState()
state.model_name = name
state.pose = Pose(
position=Point(x, y, z),
orientation=Quaternion(qx, qy, qz, qw)
)
state.twist = Twist(
linear=linear_vec,
angular=angular_vec
)
state.reference_frame = reference_frame
rospy.wait_for_service('/gazebo/set_model_state')
set_model_state = rospy.ServiceProxy(
'/gazebo/set_model_state', SetModelState)
resp = set_model_state(state)
if not resp.success:
raise RuntimeError(
f"Failed to set model state for {name}: {resp.status_message}")
def spawn_urdf_model(self, model_name, urdf_content, pose):
# 生成模型
rospy.wait_for_service('/gazebo/spawn_urdf_model')
spawn_service = rospy.ServiceProxy(
'/gazebo/spawn_urdf_model', SpawnModel)
response = spawn_service(
model_name=model_name,
model_xml=urdf_content,
robot_namespace='',
initial_pose=pose,
reference_frame='world'
)
if not response.success:
raise RuntimeError(
f"Failed to spawn {model_name}: {response.status_message}")
if __name__ == '__main__':
controller = GazeboROSController()
print(json.dumps(controller.get_model_state_f(['block'])))
jsond = {"pose": {"position": {"x": 1, "y": 2.0000041832461184, "z": 1}, "orientation": {"roll": 1.0008074575547628e-10, "pitch": -8.228587019137734e-12, "yaw": -3.1415190381173597}}, "twist": {"linear": {
"x": 1.6640674574453256e-11, "y": 2.6277239545161537e-10, "z": 1.607170552758444e-05}, "angular": {"x": -2.2534679599858193e-09, "y": 1.551234375963502e-10, "z": -2.5445035969479123e-13}}, "name": "block"}
controller.set_model_state('block', jsond)