237 lines
9.4 KiB
Python
237 lines
9.4 KiB
Python
import json
|
||
|
||
import rospy
|
||
from gazebo_msgs.msg import ModelState
|
||
from gazebo_msgs.srv import (DeleteModel, GetLinkProperties,
|
||
GetModelProperties, GetModelPropertiesResponse,
|
||
GetModelState, GetModelStateResponse,
|
||
GetWorldProperties, SetModelState)
|
||
from geometry_msgs.msg import Point, Pose, Quaternion, Twist, Vector3
|
||
from std_srvs.srv import Empty
|
||
from tf.transformations import euler_from_quaternion, quaternion_from_euler
|
||
|
||
|
||
class GazeboROSController:
|
||
def __init__(self):
|
||
if not rospy.core.is_initialized():
|
||
rospy.init_node('gazebo_world_manager',
|
||
anonymous=True, disable_signals=True)
|
||
self.model_info = None
|
||
|
||
def get_model_names(self) -> list:
|
||
rospy.wait_for_service('/gazebo/get_world_properties')
|
||
try:
|
||
get_world_properties = rospy.ServiceProxy(
|
||
'/gazebo/get_world_properties', GetWorldProperties)
|
||
world_properties = get_world_properties()
|
||
return world_properties.model_names
|
||
except rospy.ServiceException as e:
|
||
rospy.logerr(f"Service call failed: {e}")
|
||
return []
|
||
|
||
def get_model_state(self, model_names: list = None) -> dict:
|
||
"""
|
||
获取模型的状态。
|
||
如果model_names为空,返回所有模型的状态,否则返回指定模型的状态。
|
||
"""
|
||
if not model_names or len(model_names) == 0:
|
||
model_names = self.get_model_names()
|
||
print(f"Getting state for models: {model_names}")
|
||
model_states = {}
|
||
for name in model_names:
|
||
rospy.wait_for_service('/gazebo/get_model_state')
|
||
try:
|
||
get_model_state = rospy.ServiceProxy(
|
||
'/gazebo/get_model_state', GetModelState)
|
||
state = get_model_state(name, '')
|
||
model_states[name] = state
|
||
except rospy.ServiceException as e:
|
||
rospy.logerr(f"Service call failed for model {name}: {e}")
|
||
model_states[name] = None
|
||
return model_states
|
||
|
||
def get_model_properties(self, model_names: list = None) -> dict:
|
||
"""
|
||
获取模型的属性。
|
||
如果model_names为空,返回所有模型的属性,否则返回指定模型的属性。
|
||
"""
|
||
if not model_names or len(model_names) == 0:
|
||
model_names = self.get_model_names()
|
||
print(f"Getting properties for models: {model_names}")
|
||
model_properties = {}
|
||
for name in model_names:
|
||
rospy.wait_for_service('/gazebo/get_model_properties')
|
||
try:
|
||
get_model_properties = rospy.ServiceProxy(
|
||
'/gazebo/get_model_properties', GetModelProperties)
|
||
properties = get_model_properties(name)
|
||
model_properties[name] = properties
|
||
except rospy.ServiceException as e:
|
||
rospy.logerr(f"Service call failed for model {name}: {e}")
|
||
model_properties[name] = None
|
||
return model_properties
|
||
|
||
def get_link_properties(self, link_full_name: str) -> dict:
|
||
"""
|
||
获取指定链接的属性。
|
||
"""
|
||
rospy.wait_for_service('/gazebo/get_link_properties')
|
||
try:
|
||
get_link_properties = rospy.ServiceProxy(
|
||
'/gazebo/get_link_properties', GetLinkProperties)
|
||
properties = get_link_properties(link_full_name)
|
||
return properties
|
||
except rospy.ServiceException as e:
|
||
rospy.logerr(f"Service call failed for link {link_full_name}: {e}")
|
||
return None
|
||
|
||
def format_model_state(self, state: GetModelStateResponse) -> dict:
|
||
# 四元数转换为欧拉角
|
||
roll, pitch, yaw = euler_from_quaternion((
|
||
state.pose.orientation.x,
|
||
state.pose.orientation.y,
|
||
state.pose.orientation.z,
|
||
state.pose.orientation.w
|
||
))
|
||
return {
|
||
'pose': {
|
||
'position': {
|
||
'x': state.pose.position.x,
|
||
'y': state.pose.position.y,
|
||
'z': state.pose.position.z
|
||
},
|
||
'orientation': {
|
||
'roll': roll,
|
||
'pitch': pitch,
|
||
'yaw': yaw
|
||
}
|
||
},
|
||
'twist': {
|
||
'linear': {
|
||
'x': state.twist.linear.x,
|
||
'y': state.twist.linear.y,
|
||
'z': state.twist.linear.z
|
||
},
|
||
'angular': {
|
||
'x': state.twist.angular.x,
|
||
'y': state.twist.angular.y,
|
||
'z': state.twist.angular.z
|
||
}
|
||
}}
|
||
|
||
def format_model_properties(self, properties: GetModelPropertiesResponse) -> dict:
|
||
return {
|
||
'parent_model_name': properties.parent_model_name,
|
||
'canonical_body_name': properties.canonical_body_name,
|
||
'body_names': properties.body_names,
|
||
'geom_names': properties.geom_names,
|
||
'joint_names': properties.joint_names,
|
||
'child_model_names': properties.child_model_names,
|
||
'is_static': properties.is_static
|
||
}
|
||
|
||
def format_model_info(self, model_name: str, state: GetModelStateResponse, properties: GetModelPropertiesResponse) -> dict:
|
||
return {
|
||
'name': model_name,
|
||
'state': self.format_model_state(state),
|
||
'properties': self.format_model_properties(properties)
|
||
}
|
||
|
||
def get_model_state_f(self, model_names: list = None) -> list:
|
||
"""
|
||
获取模型的状态。
|
||
如果model_names为空,返回所有模型的信息,否则返回指定模型的信息。
|
||
返回值已经格式化为list-dict形式。
|
||
"""
|
||
if not model_names or len(model_names) == 0:
|
||
model_names = self.get_model_names()
|
||
print(f"Getting info for models: {model_names}")
|
||
model_info = []
|
||
states = self.get_model_state(model_names)
|
||
for name in model_names:
|
||
m_state = states[name]
|
||
info = self.format_model_state(m_state)
|
||
info['name'] = name
|
||
model_info.append(info)
|
||
return model_info
|
||
|
||
def pause_physics(self):
|
||
rospy.wait_for_service('/gazebo/pause_physics')
|
||
pause_physics = rospy.ServiceProxy('/gazebo/pause_physics', Empty)
|
||
pause_physics()
|
||
|
||
def continue_physics(self):
|
||
rospy.wait_for_service('/gazebo/unpause_physics')
|
||
unpause_physics = rospy.ServiceProxy('/gazebo/unpause_physics', Empty)
|
||
unpause_physics()
|
||
|
||
def reset_simulation(self):
|
||
rospy.wait_for_service('/gazebo/reset_simulation')
|
||
reset_simulation = rospy.ServiceProxy(
|
||
'/gazebo/reset_simulation', Empty)
|
||
reset_simulation()
|
||
|
||
def delete_model(self, name):
|
||
rospy.wait_for_service('/gazebo/delete_model')
|
||
delete_model = rospy.ServiceProxy('/gazebo/delete_model', DeleteModel)
|
||
resp = delete_model(model_name=name)
|
||
if not resp.success:
|
||
raise RuntimeError(
|
||
f"Failed to delete model {name}: {resp.status_message}")
|
||
|
||
def set_model_state(self, name, state_json):
|
||
# 解析位置
|
||
pos = state_json.get('pose', {}).get('position', {})
|
||
x = pos.get('x', 0.0)
|
||
y = pos.get('y', 0.0)
|
||
z = pos.get('z', 0.0)
|
||
|
||
# 解析欧拉角并转为四元数
|
||
ori = state_json.get('pose', {}).get('orientation', {})
|
||
roll = ori.get('roll', 0.0)
|
||
pitch = ori.get('pitch', 0.0)
|
||
yaw = ori.get('yaw', 0.0)
|
||
qx, qy, qz, qw = quaternion_from_euler(roll, pitch, yaw)
|
||
|
||
# 解析twist
|
||
linear = state_json.get('twist', {}).get('linear', {})
|
||
angular = state_json.get('twist', {}).get('angular', {})
|
||
linear_vec = Vector3(
|
||
x=linear.get('x', 0.0),
|
||
y=linear.get('y', 0.0),
|
||
z=linear.get('z', 0.0)
|
||
)
|
||
angular_vec = Vector3(
|
||
x=angular.get('x', 0.0),
|
||
y=angular.get('y', 0.0),
|
||
z=angular.get('z', 0.0)
|
||
)
|
||
# 参考系
|
||
reference_frame = state_json.get('reference', 'world')
|
||
|
||
# 构造ModelState
|
||
state = ModelState()
|
||
state.model_name = name
|
||
state.pose = Pose(
|
||
position=Point(x, y, z),
|
||
orientation=Quaternion(qx, qy, qz, qw)
|
||
)
|
||
state.twist = Twist(
|
||
linear=linear_vec,
|
||
angular=angular_vec
|
||
)
|
||
state.reference_frame = reference_frame
|
||
rospy.wait_for_service('/gazebo/set_model_state')
|
||
set_model_state = rospy.ServiceProxy(
|
||
'/gazebo/set_model_state', SetModelState)
|
||
resp = set_model_state(state)
|
||
if not resp.success:
|
||
raise RuntimeError(
|
||
f"Failed to set model state for {name}: {resp.status_message}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
controller = GazeboROSController()
|
||
print(json.dumps(controller.get_model_state_f(['block'])))
|
||
jsond = {"pose": {"position": {"x": 1, "y": 2.0000041832461184, "z": 1}, "orientation": {"roll": 1.0008074575547628e-10, "pitch": -8.228587019137734e-12, "yaw": -3.1415190381173597}}, "twist": {"linear": {"x": 1.6640674574453256e-11, "y": 2.6277239545161537e-10, "z": 1.607170552758444e-05}, "angular": {"x": -2.2534679599858193e-09, "y": 1.551234375963502e-10, "z": -2.5445035969479123e-13}}, "name": "block"}
|
||
controller.set_model_state('block', jsond) |