gazebo_world_manager/model_mgr/manager.py
2025-08-12 14:05:18 +08:00

123 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
from typing import List
from gazebo_ctrl import GazeboROSController
from model_mgr.generator import ModelGenerator
from model_mgr.model_types import (Model, ModelType, Orientation, Pose,
Position, Size)
from scene_mgr.scene_types import PreloadModel
class ModelManager:
    """Registry of scene models, mirrored into the Gazebo simulation.

    Models are indexed twice: by name (``model_name_dict``) and by id
    (``model_id_dict``). Every public method takes ``self.rlock`` for its
    whole body; the lock is a ``threading.RLock`` so methods may call each
    other (e.g. ``modify_model`` -> ``remove_model``) from the same thread.
    """

    def __init__(self):
        self.controller = GazeboROSController()
        self.generator = ModelGenerator()
        self.model_name_dict = {}  # model.name -> Model
        self.model_id_dict = {}  # model.id -> Model
        self.rlock = threading.RLock()

    def load_models(self, pmodels: List[PreloadModel]):
        """Register every preloaded model.

        Each entry's ``gen`` attribute is forwarded as ``add_sim`` to
        ``add_model``. The first failing entry aborts the load by
        propagating its exception.
        """
        with self.rlock:
            for pmod in pmodels:
                self.add_model(pmod.model, pmod.gen)

    def add_model(self, model: Model, add_sim: bool = True):
        """Register ``model`` and optionally generate it in the simulation.

        Args:
            model: Model to register; its name and id must both be unused.
            add_sim: When false, the model is only recorded in the
                registry and the generator is not called.

        Raises:
            ValueError: On a duplicate name or id, an invalid/unknown
                type, or when asked to generate a ROBOT model.
        """
        with self.rlock:
            if model.name in self.model_name_dict:
                raise ValueError(
                    f"Model with name {model.name} already exists.")
            if model.id in self.model_id_dict:
                raise ValueError(f"Model with id {model.id} already exists.")
            self._ensure_valid_type(model)
            # TODO: let the scene decide the id/type definitions.
            # Reject before touching the registry so a refused robot does
            # not linger as a registered-but-never-generated entry.
            if add_sim and model.obj_type == ModelType.ROBOT:
                raise ValueError(f'Cannot add robot model {model.id} {model.name}')
            # Registry bookkeeping.
            self.model_name_dict[model.name] = model
            self.model_id_dict[model.id] = model
            # Simulation side.
            if not add_sim:
                return
            self._generate_in_sim(model)

    def remove_model(self, model_id: int):
        """Drop the model with ``model_id`` from the registry and simulation.

        Raises:
            ValueError: If no such model is registered, or if it is a
                ROBOT model.
        """
        with self.rlock:
            # Registry removal.
            model = self.model_id_dict.get(model_id)
            if model is None:
                raise ValueError(f"Model with id {model_id} does not exist.")
            if model.obj_type == ModelType.ROBOT:
                raise ValueError(f'Cannot remove robot model {model_id} {model.name}')
            name = model.name
            del self.model_id_dict[model_id]
            del self.model_name_dict[name]
            # Simulation removal.
            self.controller.delete_model(name)

    def get_model(self, model_id: int) -> Model:
        """Return the registered model after refreshing its pose from Gazebo.

        Raises:
            ValueError: If ``model_id`` is not registered.
        """
        with self.rlock:
            mod = self.model_id_dict.get(model_id)
            if mod is None:
                raise ValueError(f"Model with id {model_id} does not exist.")
            self._update_model_pose_from_gazebo(mod)
            return mod

    def get_all_models(self) -> List[Model]:
        """Return a list of all registered models with refreshed poses."""
        with self.rlock:
            models = list(self.model_id_dict.values())
            for mod in models:
                self._update_model_pose_from_gazebo(mod)
            return models

    def modify_model(self, model: Model):
        """Replace the registered model that shares ``model.id``.

        Visual elements (tag.id, box.size, table.height) cannot be modified
        dynamically, so the model is removed and then added again. The
        checks ``add_model`` would fail on are performed before the removal,
        so a rejected modification leaves the existing model in place.

        Raises:
            ValueError: If ``model`` is a ROBOT, has an invalid type, uses
                the name of a different registered model, or its id is
                not registered.
        """
        with self.rlock:
            if model.obj_type == ModelType.ROBOT:
                raise ValueError(f'Cannot modify robot model {model.id} {model.name}')
            self._ensure_valid_type(model)
            clash = self.model_name_dict.get(model.name)
            if clash is not None and clash.id != model.id:
                raise ValueError(
                    f"Model with name {model.name} already exists.")
            self.remove_model(model.id)
            self.add_model(model)

    @staticmethod
    def _ensure_valid_type(model: Model):
        """Raise ValueError when ``model.obj_type`` is UNKNOWN or not a ModelType."""
        if model.obj_type == ModelType.UNKNOWN or model.obj_type not in ModelType:
            raise ValueError(f"Invalid model type: {model.obj_type}")

    def _generate_in_sim(self, model: Model):
        """Call the generator routine matching the model type.

        Only TABLE, TAG and BOX have a generator call; any other type is a
        no-op here and the pose is not read for it.
        """
        kind = model.obj_type
        if kind not in (ModelType.TABLE, ModelType.TAG, ModelType.BOX):
            return
        pos = model.pose.position
        rot = model.pose.orientation
        if kind == ModelType.TABLE:
            # The table generator takes the pose z value as its height.
            self.generator.generate_table(
                name=model.name, x=pos.x, y=pos.y, height=pos.z,
                roll=rot.roll, pitch=rot.pitch, yaw=rot.yaw)
        elif kind == ModelType.TAG:
            self.generator.generate_apriltag(
                name=model.name, tag_id=model.tag_id,
                x=pos.x, y=pos.y, z=pos.z,
                roll=rot.roll, pitch=rot.pitch, yaw=rot.yaw)
        else:
            self.generator.generate_box_with_apriltag(
                name=model.name, l=model.size.length, w=model.size.width,
                h=model.size.height, tag_id=model.tag_id,
                x=pos.x, y=pos.y, z=pos.z,
                roll=rot.roll, pitch=rot.pitch, yaw=rot.yaw)

    def _update_model_pose_from_gazebo(self, model: Model):
        """Overwrite ``model.pose`` with the state reported by the controller.

        The pose is left untouched when the controller returns a falsy
        result. NOTE(review): assumes the first returned entry corresponds
        to the requested name - confirm against get_model_state_f.
        """
        with self.rlock:
            name = model.name
            info = self.controller.get_model_state_f([name])
            if info:
                model.pose = Pose.from_dict(info[0]['pose'])

    def _update_model_dict(self, model):
        """Store ``model`` under its id and name, overwriting existing entries."""
        with self.rlock:
            self.model_id_dict[model.id] = model
            self.model_name_dict[model.name] = model
if __name__ == "__main__":
    # Manual walkthrough of the manager API: add, list, modify, fetch, remove.
    # NOTE(review): presumably needs a reachable Gazebo/ROS backend - confirm.
    manager = ModelManager()
    start_pose = Pose(
        position=Position(x=2, y=1, z=1),
        orientation=Orientation(roll=0, pitch=0, yaw=0),
    )
    demo_box = Model(
        id=1,
        name="box",
        obj_type=ModelType.BOX,
        pose=start_pose,
        size=Size(length=0.5, width=0.5, height=0.5),
    )
    manager.add_model(demo_box)

    # Dump everything currently registered.
    for listed in manager.get_all_models():
        print(listed.to_dict())

    # Shift the box and push the change through modify_model.
    demo_box.pose.position.x = 3
    demo_box.pose.position.y = 2
    manager.modify_model(demo_box)
    print(manager.get_model(1).to_dict())

    # Clean up and show the (now empty) registry.
    manager.remove_model(1)
    print(manager.get_all_models())