import threading import time from typing import List from gazebo_ctrl import GazeboROSController from model_mgr.generator import ModelGenerator from model_mgr.model_types import (Model, ModelType, Orientation, Pose, Position, Size) from scene_mgr.scene_types import PreloadModel class ModelManager: def __init__(self): self.controller = GazeboROSController() self.generator = ModelGenerator() self.model_name_dict = {} self.model_id_dict = {} self.rlock = threading.RLock() def load_models(self, pmodels: List[PreloadModel]): static_types = [ModelType.TABLE, ModelType.TAG] with self.rlock: # 性能平凡的机器上,需要先加载静物,再加载动物 for pmod in pmodels: if pmod.model.obj_type in static_types: self.add_model(pmod.model, pmod.gen) time.sleep(5) for pmod in pmodels: if pmod.model.obj_type not in static_types: self.add_model(pmod.model, pmod.gen) def add_model(self, model: Model, add_sim: bool = True): with self.rlock: if self.model_name_dict.get(model.name): raise ValueError( f"Model with name {model.name} already exists.") if self.model_id_dict.get(model.id): raise ValueError(f"Model with id {model.id} already exists.") if model.obj_type == ModelType.UNKNOWN or model.obj_type not in ModelType: raise ValueError(f"Invalid model type: {model.obj_type}") # 信息添加 self.model_name_dict[model.name] = model self.model_id_dict[model.id] = model # 仿真添加 if not add_sim: return # TODO: (谁爱做谁做)场景决定id-类型定义 if model.obj_type == ModelType.ROBOT: raise ValueError(f'Cannot add robot model {model.id} {model.name}') if model.obj_type == ModelType.TABLE: self.generator.generate_table(name=model.name, x=model.pose.position.x, y=model.pose.position.y, height=model.pose.position.z, roll=model.pose.orientation.roll, pitch=model.pose.orientation.pitch, yaw=model.pose.orientation.yaw) elif model.obj_type == ModelType.TAG: self.generator.generate_apriltag(name=model.name, tag_id=model.tag_id, x=model.pose.position.x, y=model.pose.position.y, z=model.pose.position.z, roll=model.pose.orientation.roll, pitch=model.pose.orientation.pitch, yaw=model.pose.orientation.yaw) elif model.obj_type == ModelType.BOX: self.generator.generate_box_with_apriltag(name=model.name, l=model.size.length, w=model.size.width, h=model.size.height, tag_id=model.tag_id, x=model.pose.position.x, y=model.pose.position.y, z=model.pose.position.z, roll=model.pose.orientation.roll, pitch=model.pose.orientation.pitch, yaw=model.pose.orientation.yaw) def remove_model(self, model_id: int): with self.rlock: # 信息删除 model = self.model_id_dict.get(model_id) if model.obj_type == ModelType.ROBOT: raise ValueError(f'Cannot remove robot model {model_id} {model.name}') name = model.name del self.model_id_dict[model_id] del self.model_name_dict[name] # 仿真删除 self.controller.delete_model(name) def get_model(self, model_id: int) -> Model: with self.rlock: mod = self.model_id_dict.get(model_id, None) if not mod: raise ValueError(f"Model with id {model_id} does not exist.") self._update_model_pose_from_gazebo(mod) return mod def get_all_models(self) -> List[Model]: with self.rlock: li = list(self.model_id_dict.values()) for mod in li: self._update_model_pose_from_gazebo(mod) return li def modify_model(self, model: Model): with self.rlock: # 视觉元素tag.id、box.size、table.height无法动态修改,于是我们选择先删除再添加 if model.obj_type == ModelType.ROBOT: raise ValueError(f'Cannot modify robot model {model.id} {model.name}') self.remove_model(model.id) self.add_model(model) def _update_model_pose_from_gazebo(self, model: Model): with self.rlock: name = model.name info = self.controller.get_model_state_f([name]) if info: model.pose = Pose.from_dict(info[0]['pose']) def _update_model_dict(self, model): with self.rlock: self.model_id_dict[model.id] = model self.model_name_dict[model.name] = model if __name__ == "__main__": mgr = ModelManager() mod = Model( id=1, name="box", obj_type=ModelType.BOX, pose=Pose( position=Position(x=2, y=1, z=1), orientation=Orientation(roll=0, pitch=0, yaw=0) ), size=Size(length=0.5, width=0.5, height=0.5) ) mgr.add_model(mod) liss = mgr.get_all_models() for m in liss: print(m.to_dict()) mod.pose.position.x = 3 mod.pose.position.y = 2 mgr.modify_model(mod) print(mgr.get_model(1).to_dict()) mgr.remove_model(1) print(mgr.get_all_models())