diff --git a/examples/reach_plan_sweep.py b/examples/reach_plan_sweep.py deleted file mode 100644 index 1f90885..0000000 --- a/examples/reach_plan_sweep.py +++ /dev/null @@ -1,77 +0,0 @@ -"""Script to run reach_plan_sweep.""" - -import argparse - -from isaaclab.app import AppLauncher - -parser = argparse.ArgumentParser( - description=( - "Uniformly sample different object yaw rotations, select the runtime reach candidate, and sweep locally with" - " cuRobo." - ) -) -parser.add_argument("--pipeline_id", required=True, type=str) -parser.add_argument( - "--reach_skill_index", - default=0, - type=int, - help="Which reach skill to sweep at (0-based, globally across all subtasks)", -) -parser.add_argument("--num_samples", default=64, type=int) -parser.add_argument("--dx", default=0.01, type=float, help="dx range is [-dx, dx] meters") -parser.add_argument("--dy", default=0.01, type=float, help="dy range is [-dy, dy] meters") -parser.add_argument("--dz", default=0.01, type=float, help="dz range is [-dz, dz] meters") -parser.add_argument("--yaw_deg", default=10.0, type=float, help="yaw range is [-yaw_deg, yaw_deg] degrees") -parser.add_argument("--seed", default=42, type=int) -parser.add_argument("--top_k", default=10, type=int) -parser.add_argument( - "--num_object_rotations", - default=4, - type=int, - help="Number of uniformly sampled object yaw rotations in [0, 360) degrees.", -) -parser.add_argument( - "--ik_only", - action="store_true", - help="Use IK-only solving instead of full motion planning (faster reachability check)", -) - -AppLauncher.add_app_launcher_args(parser) -args_cli = parser.parse_args() - -# Launch Isaac Sim / IsaacLab app -app_launcher_args = vars(args_cli) -app_launcher = AppLauncher(app_launcher_args) -simulation_app = app_launcher.app - -import math - -import autosim_examples # noqa: F401 -from autosim import make_pipeline -from autosim.calibration.plan_sweep import ReachPlanSweepCfg, reach_plan_sweep -from autosim.calibration.pose_sampler import OffsetSampler - - -def main() -> None: - pipeline = make_pipeline(args_cli.pipeline_id) - reach_plan_sweep( - pipeline, - ReachPlanSweepCfg( - reach_skill_index=args_cli.reach_skill_index, - sampling=OffsetSampler( - num_samples=args_cli.num_samples, - dx_range=(-args_cli.dx, args_cli.dx), - dy_range=(-args_cli.dy, args_cli.dy), - dz_range=(-args_cli.dz, args_cli.dz), - yaw_range_rad=(-math.radians(args_cli.yaw_deg), math.radians(args_cli.yaw_deg)), - seed=args_cli.seed, - ), - top_k=args_cli.top_k, - ik_only=args_cli.ik_only, - num_object_rotations=args_cli.num_object_rotations, - ), - ) - - -if __name__ == "__main__": - main() diff --git a/source/autosim/autosim/calibration/__init__.py b/source/autosim/autosim/calibration/__init__.py deleted file mode 100644 index 0218ae0..0000000 --- a/source/autosim/autosim/calibration/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from .plan_sweep import ReachPlanSweepCfg, reach_plan_sweep -from .pose_sampler import OffsetSampler, PoseSampler - -__all__ = [ - "PoseSampler", - "OffsetSampler", - "ReachPlanSweepCfg", - "reach_plan_sweep", -] diff --git a/source/autosim/autosim/calibration/plan_sweep.py b/source/autosim/autosim/calibration/plan_sweep.py deleted file mode 100644 index 923c264..0000000 --- a/source/autosim/autosim/calibration/plan_sweep.py +++ /dev/null @@ -1,383 +0,0 @@ -import math -import time -from dataclasses import dataclass, field -from typing import Any - -import isaaclab.utils.math as PoseUtils -import torch - -from autosim.core.pipeline import AutoSimPipeline -from autosim.core.registration import SkillRegistry -from autosim.skills.reach import ReachSkill -from autosim.utils.data_util import as_torch - -from .pose_sampler import OffsetSampler, PoseSampler - - -@dataclass -class ReachPlanSweepCfg: - reach_skill_index: int = 0 - """Which reach skill to sweep at (0-based, globally across all subtasks).""" - sampling: PoseSampler = field(default_factory=OffsetSampler) - """Sampler used to generate candidate poses around the base pose.""" - top_k: int = 10 - """Number of top poses to print.""" - ik_only: bool = False - """If True, use IK-only solving instead of full motion planning. Much faster for reachability checking; does not produce trajectories.""" - num_object_rotations: int = 4 - """Number of uniformly sampled object yaw rotations in [0, 2π).""" - - -def _tensor_to_list(x: torch.Tensor) -> list[float]: - """Convert a tensor to a flat Python float list for reporting/serialization.""" - return [float(v) for v in x.detach().cpu().flatten().tolist()] - - -def _fmt_pose(vals: list[float]) -> str: - """Format a 7-value pose as a Python list literal, ready to copy-paste into code.""" - return "[" + ", ".join(f"{v:.4f}" for v in vals) + "]" - - -def _quat_mul(q1: torch.Tensor, q2: torch.Tensor) -> torch.Tensor: - """Multiply quaternions in xyzw format elementwise over the leading dimensions.""" - x1, y1, z1, w1 = q1.unbind(-1) - x2, y2, z2, w2 = q2.unbind(-1) - return torch.stack( - [ - w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2, - w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2, - w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2, - w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2, - ], - dim=-1, - ) - - -def _uniform_yaw_rotations(device: torch.device, dtype: torch.dtype, num_rotations: int) -> list[torch.Tensor]: - """Generate uniformly spaced yaw-only quaternions over a full 360° rotation.""" - if num_rotations <= 0: - return [] - rotations = [] - for idx in range(num_rotations): - yaw = (2.0 * math.pi * idx) / num_rotations - half = yaw * 0.5 - rotations.append(torch.tensor([0.0, 0.0, math.sin(half), math.cos(half)], device=device, dtype=dtype)) - return rotations - - -def _row_sort_key(row: dict[str, Any]) -> tuple[float, float, float]: - """Rank sampled poses by success first, then shorter trajectories, then lower position error.""" - return ( - 0.0 if row["plan_success"] else 1.0, - float(row["traj_len"]) if row["traj_len"] is not None else 10**8, - float(row["position_error"]) if row["position_error"] is not None else 10**8, - ) - - -def _build_extra_target_link_goals( - reach_skill: ReachSkill, - activate_q: torch.Tensor, - target_poses_r: torch.Tensor, - env_extra_info, -) -> dict[str, torch.Tensor] | None: - """Build batched extra link goals by reusing ReachSkill logic.""" - if not reach_skill.cfg.extra_cfg.extra_target_link_names: - return None - - goal_list = [] - for target_pose in target_poses_r: - goal_list.append(reach_skill._build_extra_target_poses(activate_q, target_pose, env_extra_info)) # noqa: SLF001 - - first_goal = goal_list[0] - if first_goal is None: - return None - - return {link_name: torch.stack([goal[link_name] for goal in goal_list], dim=0) for link_name in first_goal.keys()} - - -def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[dict[str, Any]]: - """ - Execute the pipeline step by step. When the reach_skill_index-th reach skill - is encountered, capture the live robot and object state and sweep around the - reach target pose using cuRobo batch planning. - - All skills before the target reach skill are executed normally, so the - environment reflects the actual state at the point of interest. - - The offline tuning flow uniformly samples object yaw rotations, applies each - rotation to the target object, reuses the runtime reach candidate selection - logic to choose a base pose, and then runs the local sweep around that pose. - - Returns: - One result block per sampled object rotation. - """ - - pipeline.initialize() - - decompose_result = pipeline.decompose() - pipeline._check_skill_extra_cfg() - pipeline.reset_env() - - reach_skill_counter = 0 - - for subtask in decompose_result.subtasks: - for skill_info in subtask.skills: - skill = SkillRegistry.create( - skill_info.skill_type, - pipeline.cfg.skills.get(skill_info.skill_type).extra_cfg, - ) - - if pipeline._action_adapter.should_skip_apply(skill): - continue - - is_reach = skill_info.skill_type == "reach" - - if is_reach and reach_skill_counter == cfg.reach_skill_index: - obj_name = skill_info.target_object - return _sweep_all_rotations(pipeline, cfg, obj_name, skill) - - goal = skill.extract_goal_from_info(skill_info, pipeline._env, pipeline._env_extra_info) - if is_reach: - reach_skill_counter += 1 - - success, _, episode_done = pipeline._execute_single_skill(skill, goal) - if episode_done: - raise ValueError( - f"Episode completed during skill '{skill_info.skill_type}' (step {skill_info.step}) before reaching" - f" target reach skill (index {cfg.reach_skill_index})." - ) - if not success: - raise ValueError( - f"Skill '{skill_info.skill_type}' (step {skill_info.step}) failed before reaching target reach" - f" skill (index {cfg.reach_skill_index})." - ) - - raise ValueError( - f"reach_skill_index={cfg.reach_skill_index} is out of range: only {reach_skill_counter} reach skill(s) found in" - " the decompose result." - ) - - -def _sweep_all_rotations( - pipeline: AutoSimPipeline, - cfg: ReachPlanSweepCfg, - obj_name: str, - reach_skill: ReachSkill, -) -> list[dict[str, Any]]: - """Evaluate the target reach step across uniformly sampled object yaw rotations. - - For each sampled object rotation, this function temporarily updates the object's - world pose, reuses the runtime reach candidate selector to choose a base pose, - runs the local sweep around that base pose, and finally restores the original - object pose. - """ - env = pipeline._env - env_extra_info = pipeline._env_extra_info - obj = env.scene[obj_name] - - original_pose_w = as_torch(obj.data.root_pose_w)[pipeline._env_id].clone() - base_quat_w = original_pose_w[3:].clone() - env_ids = torch.tensor([pipeline._env_id], device=original_pose_w.device, dtype=torch.int32) - results: list[dict[str, Any]] = [] - - try: - for rotation_idx, yaw_quat_w in enumerate( - _uniform_yaw_rotations(original_pose_w.device, original_pose_w.dtype, cfg.num_object_rotations) - ): - rotated_pose_w = original_pose_w.clone().unsqueeze(0) - rotated_pose_w[0, 3:] = _quat_mul(base_quat_w.unsqueeze(0), yaw_quat_w.unsqueeze(0)).squeeze(0) - obj.write_root_pose_to_sim(rotated_pose_w, env_ids=env_ids) - - candidates = env_extra_info.get_reach_target_poses(obj_name) - selected_pose_oe = reach_skill._select_best_candidate( # noqa: SLF001 - env, obj_name, candidates, env_extra_info - ).to(env.device) - selected_idx = next( - idx for idx, pose in enumerate(candidates) if torch.allclose(pose.to(env.device), selected_pose_oe) - ) - - result_block = _sweep( - pipeline=pipeline, - cfg=cfg, - obj_name=obj_name, - base_pose_oe=selected_pose_oe, - reach_skill=reach_skill, - rotation_idx=rotation_idx, - object_pose_w=rotated_pose_w.squeeze(0), - selected_candidate_idx=selected_idx, - ) - results.append(result_block) - finally: - obj.write_root_pose_to_sim(original_pose_w.unsqueeze(0), env_ids=env_ids) - - _print_summary(obj_name, cfg, results) - return results - - -def _print_summary(obj_name: str, cfg: ReachPlanSweepCfg, results: list[dict[str, Any]]) -> None: - """Print the final per-rotation report and per-selected-candidate aggregate summary.""" - if not results: - return - - _SEP = "═" * 100 - print() - print(_SEP) - print(f" reach_plan_sweep summary │ object='{obj_name}' reach_skill_index={cfg.reach_skill_index}") - print(_SEP) - - candidate_summary: dict[int, dict[str, Any]] = {} - - for block in results: - rotation_idx = block["rotation_index"] - success_count = block["success_count"] - num_samples = block["num_samples"] - elapsed_ms = block["elapsed_ms"] - selected_candidate_idx = block["selected_candidate_idx"] - selected_base_pose_oe = block["selected_base_pose_oe"] - print( - f" rotation={rotation_idx:02d} selected_candidate={selected_candidate_idx} " - f"success={success_count}/{num_samples} ({success_count / num_samples:.1%}) time={elapsed_ms:.0f} ms" - ) - print(f" base_pose={_fmt_pose(selected_base_pose_oe)}") - for rank, row in enumerate(block["top_k"]): - mark = "✓" if row["plan_success"] else "✗" - metric = ( - f"traj_len={row['traj_len']}" if row["traj_len"] is not None else f"pos_err={row['position_error']:.4f}" - ) - print(f" [{rank}] {mark} {_fmt_pose(row['pose_oe'])} # {metric}") - print("─" * 100) - - summary = candidate_summary.setdefault( - selected_candidate_idx, - { - "count": 0, - "total_success": 0, - "total_samples": 0, - "total_time_ms": 0.0, - "base_pose": selected_base_pose_oe, - "recommended_pose": None, - "recommended_row": None, - }, - ) - summary["count"] += 1 - summary["total_success"] += success_count - summary["total_samples"] += num_samples - summary["total_time_ms"] += elapsed_ms - - candidate_best_row = min(block["top_k"], key=_row_sort_key) if block["top_k"] else None - if candidate_best_row is not None: - if summary["recommended_row"] is None or _row_sort_key(candidate_best_row) < _row_sort_key( - summary["recommended_row"] - ): - summary["recommended_row"] = candidate_best_row - summary["recommended_pose"] = candidate_best_row["pose_oe"] - - print() - print(_SEP) - print(" selected_candidate aggregate") - print(_SEP) - for candidate_idx in sorted(candidate_summary): - summary = candidate_summary[candidate_idx] - total_samples = summary["total_samples"] - success_rate = summary["total_success"] / total_samples if total_samples > 0 else 0.0 - avg_time_ms = summary["total_time_ms"] / summary["count"] if summary["count"] > 0 else 0.0 - print( - f" candidate={candidate_idx} selected_in={summary['count']} rotation(s) " - f"success={summary['total_success']}/{total_samples} ({success_rate:.1%}) avg_time={avg_time_ms:.0f} ms" - ) - print(f" base_pose={_fmt_pose(summary['base_pose'])}") - if summary["recommended_pose"] is not None: - print(f" recommended_pose={_fmt_pose(summary['recommended_pose'])}") - print(_SEP) - - -def _sweep( - pipeline: AutoSimPipeline, - cfg: ReachPlanSweepCfg, - obj_name: str, - base_pose_oe: torch.Tensor, - reach_skill: ReachSkill, - rotation_idx: int, - object_pose_w: torch.Tensor, - selected_candidate_idx: int, -) -> dict[str, Any]: - """ - Core sweep logic. Called once the environment is in the correct pre-reach state. - - Samples K candidate poses around the selected base reach target (object frame), - transforms them to robot root frame, then batch-plans with cuRobo. When - configured, extra link goals are generated from the live joint state using the - same extra-target strategy as `ReachSkill.extract_goal_from_info()`. - """ - - env = pipeline._env - env_id = pipeline._env_id - env_extra_info = pipeline._env_extra_info - planner = pipeline._motion_planner - robot = pipeline._robot - - base_pose_oe = torch.as_tensor(base_pose_oe, device=env.device, dtype=torch.float32).view(7) - poses_oe = cfg.sampling.sample(base_pose_oe) - k = int(poses_oe.shape[0]) - - obj_pose_w = as_torch(env.scene[obj_name].data.root_pose_w)[env_id] - obj_pos_w = obj_pose_w[:3].view(1, 3).repeat(k, 1) - obj_quat_w = obj_pose_w[3:].view(1, 4).repeat(k, 1) - - target_pos_w, target_quat_w = PoseUtils.combine_frame_transforms( - obj_pos_w, obj_quat_w, poses_oe[:, :3], poses_oe[:, 3:] - ) - robot_root_pose_w = as_torch(robot.data.root_pose_w)[env_id] - rr_pos_w = robot_root_pose_w[:3].view(1, 3).repeat(k, 1) - rr_quat_w = robot_root_pose_w[3:].view(1, 4).repeat(k, 1) - target_pos_r, target_quat_r = PoseUtils.subtract_frame_transforms(rr_pos_w, rr_quat_w, target_pos_w, target_quat_w) - - state = pipeline._build_world_state() - activate_q, activate_qd = reach_skill._build_activate_joint_state( # noqa: SLF001 - state.sim_joint_names, state.robot_joint_pos, state.robot_joint_vel - ) - if activate_qd is None: - raise ValueError("activate_qd should not be None when sweep planning reach trajectories.") - - target_poses_r = torch.cat((target_pos_r, target_quat_r), dim=-1) - link_goals = _build_extra_target_link_goals(reach_skill, activate_q, target_poses_r, env_extra_info) - - t0 = time.time() - if cfg.ik_only: - result = planner.solve_ik_batch(target_pos_r, target_quat_r, link_goals=link_goals) - else: - result = planner.plan_motion_batch(target_pos_r, target_quat_r, activate_q, activate_qd, link_goals=link_goals) - dt_ms = (time.time() - t0) * 1000.0 - - success = ( - result.success.detach().cpu().bool().reshape(-1) - if result.success is not None - else torch.zeros((k,), dtype=torch.bool) - ) - pos_err = result.position_error.detach().cpu().reshape(-1) if result.position_error is not None else None - traj_last = ( - torch.as_tensor(result.path_buffer_last_tstep).reshape(-1) - if (not cfg.ik_only and result.path_buffer_last_tstep is not None) - else None - ) - - rows = [] - for i in range(k): - rows.append({ - "pose_oe": _tensor_to_list(poses_oe[i]), - "plan_success": bool(success[i].item()), - "traj_len": int(traj_last[i]) if traj_last is not None else None, - "position_error": float(pos_err[i].item()) if pos_err is not None else None, - }) - - top_k = sorted(rows, key=_row_sort_key)[: cfg.top_k] - return { - "rotation_index": rotation_idx, - "object_pose_w": _tensor_to_list(object_pose_w), - "selected_candidate_idx": selected_candidate_idx, - "selected_base_pose_oe": _tensor_to_list(base_pose_oe), - "success_count": int(success.sum().item()), - "num_samples": k, - "elapsed_ms": dt_ms, - "top_k": top_k, - } diff --git a/source/autosim/autosim/calibration/pose_sampler.py b/source/autosim/autosim/calibration/pose_sampler.py deleted file mode 100644 index 9043a79..0000000 --- a/source/autosim/autosim/calibration/pose_sampler.py +++ /dev/null @@ -1,76 +0,0 @@ -from abc import ABC, abstractmethod - -import torch - - -class PoseSampler(ABC): - @abstractmethod - def sample(self, base_pose_oe: torch.Tensor) -> torch.Tensor: - """ - Sample candidate poses around a base pose in object frame. - - Args: - base_pose_oe: Tensor shape [7] in object frame [x,y,z,qx,qy,qz,qw] - - Returns: - poses_oe: Tensor [K, 7] of sampled poses in object frame - """ - - -class OffsetSampler(PoseSampler): - """Samples poses by adding uniform random dx/dy/dz/yaw offsets around a base pose.""" - - def __init__( - self, - num_samples: int = 64, - dx_range: tuple[float, float] = (-0.03, 0.03), - dy_range: tuple[float, float] = (-0.03, 0.03), - dz_range: tuple[float, float] = (-0.02, 0.02), - yaw_range_rad: tuple[float, float] = (-0.35, 0.35), - seed: int = 0, - ): - self.num_samples = num_samples - self.dx_range = dx_range - self.dy_range = dy_range - self.dz_range = dz_range - self.yaw_range_rad = yaw_range_rad - self.seed = seed - - def sample(self, base_pose_oe: torch.Tensor) -> torch.Tensor: - if base_pose_oe.shape != (7,): - raise ValueError(f"base_pose_oe must have shape [7], got {tuple(base_pose_oe.shape)}") - - g = torch.Generator(device=base_pose_oe.device) - g.manual_seed(int(self.seed)) - - k = self.num_samples - dx = torch.empty((k,), device=base_pose_oe.device).uniform_(*self.dx_range, generator=g) - dy = torch.empty((k,), device=base_pose_oe.device).uniform_(*self.dy_range, generator=g) - dz = torch.empty((k,), device=base_pose_oe.device).uniform_(*self.dz_range, generator=g) - dyaw = torch.empty((k,), device=base_pose_oe.device).uniform_(*self.yaw_range_rad, generator=g) - - pos = base_pose_oe[:3].view(1, 3).repeat(k, 1) + torch.stack([dx, dy, dz], dim=-1) - - # Apply yaw delta around object local +Z axis: q_new = q_base ⊗ q_delta - base_quat = base_pose_oe[3:].view(1, 4).repeat(k, 1) - half = dyaw * 0.5 - q_delta = torch.stack( - [torch.zeros_like(half), torch.zeros_like(half), torch.sin(half), torch.cos(half)], dim=-1 - ) - quat = self._quat_mul(base_quat, q_delta) - - return torch.cat([pos, quat], dim=-1) - - @staticmethod - def _quat_mul(q1: torch.Tensor, q2: torch.Tensor) -> torch.Tensor: - x1, y1, z1, w1 = q1.unbind(-1) - x2, y2, z2, w2 = q2.unbind(-1) - return torch.stack( - [ - w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2, - w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2, - w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2, - w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2, - ], - dim=-1, - ) diff --git a/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py b/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py index 702302e..0306446 100644 --- a/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py +++ b/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py @@ -105,9 +105,11 @@ def __init__( self.usd_helper = UsdHelper() self.usd_helper.load_stage(env.scene.stage) - # Warm up planner + # Warm up planner in batch mode (plan_motion always uses plan_batch) self._logger.info("Warming up motion planner...") - self.motion_gen.warmup(enable_graph=self.cfg.use_cuda_graph, warmup_js_trajopt=False) + self.motion_gen.warmup( + enable_graph=self.cfg.use_cuda_graph, warmup_js_trajopt=False, batch=self.cfg.max_batch_size + ) # Define supported cuRobo primitive types for object discovery and pose synchronization self.primitive_types: list[str] = ["mesh", "cuboid", "sphere", "capsule", "cylinder", "voxel", "blox"] @@ -682,135 +684,34 @@ def plan_motion( link_goals: dict[str, torch.Tensor] | None = None, ) -> JointState | None: """ - Plan a trajectory to reach a target pose from a current joint state. - - Args: - target_pos: Target position [x, y, z] - target_quat: Target quaternion [qx, qy, qz, qw] - current_q: Current joint positions - current_qd: Current joint velocities - link_goals: Optional dictionary mapping link names to target poses for other links - - Returns: - JointState of the trajectory or None if planning failed - """ - - # Refine collision world before planning - self._refine_curobo_world_collision() - - if current_qd is None: - current_qd = torch.zeros_like(current_q) - dof_needed = len(self.target_joint_names) - - # adjust the joint number - if len(current_q) < dof_needed: - pad = torch.zeros(dof_needed - len(current_q), dtype=current_q.dtype) - current_q = torch.concatenate([current_q, pad], axis=0) - current_qd = torch.concatenate([current_qd, torch.zeros_like(pad)], axis=0) - elif len(current_q) > dof_needed: - current_q = current_q[:dof_needed] - current_qd = current_qd[:dof_needed] - - joint_limits = self.motion_gen.kinematics.get_joint_limits() - current_q = torch.clamp( - self._to_curobo_device(current_q), joint_limits.position[0], joint_limits.position[1] - ).to(current_q.device) - - # build the target pose - goal = Pose( - position=self._to_curobo_device(target_pos), - quaternion=self._to_curobo_device(convert_quat(target_quat, to="wxyz")), # xyzw → wxyz - ) - - # build the current state - state = JointState( - position=self._to_curobo_device(current_q), - velocity=self._to_curobo_device(current_qd) * 0.0, - acceleration=self._to_curobo_device(current_qd) * 0.0, - jerk=self._to_curobo_device(current_qd) * 0.0, - joint_names=self.target_joint_names, - ) + Plan a trajectory to one of K candidate target poses from a shared start joint state. - current_joint_state: JointState = state.get_ordered_joint_state(self.target_joint_names) - - # Prepare link_poses for multi-arm robots - link_poses = None - if link_goals is not None: - # Use provided link goals - link_poses = { - link_name: Pose( - position=self._to_curobo_device(pose[:3]), - quaternion=self._to_curobo_device(convert_quat(pose[3:], to="wxyz")), # xyzw → wxyz - ) - for link_name, pose in link_goals.items() - } - - # Build per-call plan config: clone only when we need to attach a pose_cost_metric - # so the shared self.plan_config is never mutated. - if self.cfg.reach_partial_pose_weight is not None: - weights = torch.tensor( - self.cfg.reach_partial_pose_weight, - device=self.tensor_args.device, - dtype=self.tensor_args.dtype, - ) - pose_metric = PoseCostMetric(reach_partial_pose=True, reach_vec_weight=weights) - active_plan_config = self.plan_config.clone() - active_plan_config.pose_cost_metric = pose_metric - self._logger.debug(f"reach_partial_pose_weight applied: {self.cfg.reach_partial_pose_weight}") - else: - active_plan_config = self.plan_config - - # execute planning - result = self.motion_gen.plan_single( - current_joint_state.unsqueeze(0), - goal, - active_plan_config, - link_poses=link_poses, - ) - - if result.success.item(): - current_plan = result.get_interpolated_plan() - motion_plan = current_plan.get_ordered_joint_state(self.target_joint_names) - - self._logger.debug(f"planning succeeded with {len(motion_plan.position)} waypoints") - return motion_plan - else: - self._logger.warning(f"planning failed: {result.status}") - return None - - def plan_motion_batch( - self, - target_pos: torch.Tensor, - target_quat: torch.Tensor, - current_q: torch.Tensor, - current_qd: torch.Tensor | None = None, - link_goals: dict[str, torch.Tensor] | None = None, - ): - """ - Plan trajectories for a batch of target poses from the same start joint state. - - This uses cuRobo's batch API (`MotionGen.plan_batch`) under the hood. + Always uses cuRobo's batch planning under the hood (``MotionGen.plan_batch``). + K=1 is treated identically to K>1, so callers do not need to branch on candidate count. + Among successful candidates, the trajectory with minimum + ``position_error + rotation_error`` is returned. Args: - target_pos: Tensor of shape [K, 3], in robot root frame. - target_quat: Tensor of shape [K, 4] in [qx, qy, qz, qw], in robot root frame. - current_q: Tensor of shape [dof], current joint positions. - current_qd: Tensor of shape [dof], current joint velocities. Defaults to zeros. - link_goals: Optional dict mapping extra link names to tensors of shape [K, 7] - ([x, y, z, qx, qy, qz, qw], robot root frame) for multi-arm robots. Each entry - specifies the simultaneous target pose of that link for every sample in the batch. + target_pos: Target positions, shape ``[K, 3]``, in robot root frame. + target_quat: Target quaternions, shape ``[K, 4]`` in ``[qx, qy, qz, qw]``, in robot root frame. + current_q: Current joint positions, shape ``[dof]``. Shared across all K candidates. + current_qd: Current joint velocities, shape ``[dof]``. Defaults to zeros. + link_goals: Optional dict mapping extra link names to tensors of shape ``[K, 7]`` + (``[x, y, z, qx, qy, qz, qw]``, robot root frame) for multi-arm robots. Returns: - MotionGenResult (cuRobo). Check `result.success[k]` for each batch index. + JointState of the best successful trajectory (ordered by ``target_joint_names``), + or None if every candidate failed to plan. Note: - `time_dilation_factor` is always suppressed for batch planning because cuRobo's - `retime_trajectory` does not support batch results. + ``time_dilation_factor`` is suppressed because cuRobo's ``retime_trajectory`` does not + support batched results. """ # Refine collision world before planning self._refine_curobo_world_collision() + # Shape validation if target_pos.ndim != 2 or target_pos.shape[-1] != 3: raise ValueError(f"target_pos must have shape [K, 3], got {tuple(target_pos.shape)}") if target_quat.ndim != 2 or target_quat.shape[-1] != 4: @@ -819,101 +720,122 @@ def plan_motion_batch( raise ValueError( f"Batch size mismatch: target_pos has {target_pos.shape[0]}, target_quat has {target_quat.shape[0]}" ) - k = target_pos.shape[0] + k = int(target_pos.shape[0]) if link_goals is not None: - for ee_name, poses in link_goals.items(): + for link_name, poses in link_goals.items(): if poses.ndim != 2 or poses.shape != (k, 7): - raise ValueError(f"link_goals['{ee_name}'] must have shape [{k}, 7], got {tuple(poses.shape)}") + raise ValueError(f"link_goals['{link_name}'] must have shape [{k}, 7], got {tuple(poses.shape)}") + + # Pad batch to max_batch_size so the CUDA graph buffer shape stays fixed across calls. + # Extra padded entries reuse the first goal and are masked out when selecting results. + padded_k = self.cfg.max_batch_size + if k > padded_k: + from curobo.util.torch_utils import is_cuda_graph_reset_available + + if is_cuda_graph_reset_available(): + self._logger.warning( + f"Number of reach candidates ({k}) exceeds max_batch_size ({padded_k}). " + "CUDA graph will be reset (one-time performance cost). " + f"Consider increasing CuroboPlannerCfg.max_batch_size to at least {k}." + ) + padded_k = k + else: + raise ValueError( + f"Number of reach candidates ({k}) exceeds CuroboPlannerCfg.max_batch_size ({padded_k}) " + f"and CUDA graph reset is not available. Either increase max_batch_size to at least {k}, " + "or set environment variable CUROBO_TORCH_CUDA_GRAPH_RESET=1 with CUDA >= 12.0." + ) + if k < padded_k: + pad_count = padded_k - k + target_pos = torch.cat([target_pos, target_pos[:1].expand(pad_count, -1)], dim=0) + target_quat = torch.cat([target_quat, target_quat[:1].expand(pad_count, -1)], dim=0) + if link_goals is not None: + link_goals = { + ln: torch.cat([poses, poses[:1].expand(pad_count, -1)], dim=0) for ln, poses in link_goals.items() + } if current_qd is None: current_qd = torch.zeros_like(current_q) - dof_needed = len(self.target_joint_names) + + # adjust the joint number if len(current_q) < dof_needed: - pad = torch.zeros(dof_needed - len(current_q), dtype=current_q.dtype, device=current_q.device) + pad = torch.zeros(dof_needed - len(current_q), dtype=current_q.dtype) current_q = torch.concatenate([current_q, pad], axis=0) current_qd = torch.concatenate([current_qd, torch.zeros_like(pad)], axis=0) elif len(current_q) > dof_needed: current_q = current_q[:dof_needed] current_qd = current_qd[:dof_needed] + joint_limits = self.motion_gen.kinematics.get_joint_limits() + current_q = torch.clamp( + self._to_curobo_device(current_q), joint_limits.position[0], joint_limits.position[1] + ).to(current_q.device) + + # build the batched target pose goal = Pose( position=self._to_curobo_device(target_pos), quaternion=self._to_curobo_device(convert_quat(target_quat, to="wxyz")), # xyzw → wxyz ) + # build the batched current state (same start state replicated across padded_k seeds) start_state = JointState( position=self._to_curobo_device(current_q).view(1, -1), velocity=self._to_curobo_device(current_qd).view(1, -1) * 0.0, acceleration=self._to_curobo_device(current_qd).view(1, -1) * 0.0, jerk=self._to_curobo_device(current_qd).view(1, -1) * 0.0, joint_names=self.target_joint_names, - ).repeat_seeds(int(target_pos.shape[0])) + ).repeat_seeds(padded_k) + start_state = start_state.get_ordered_joint_state(self.target_joint_names) + # Prepare batched link_poses for multi-arm robots link_poses = None if link_goals is not None: link_poses = { - ee_name: Pose( + link_name: Pose( position=self._to_curobo_device(poses[:, :3]), quaternion=self._to_curobo_device(convert_quat(poses[:, 3:], to="wxyz")), # xyzw → wxyz ) - for ee_name, poses in link_goals.items() + for link_name, poses in link_goals.items() } - # plan_batch does not support retime_trajectory (batch result); disable time_dilation_factor - batch_plan_config = self.plan_config.clone() - batch_plan_config.time_dilation_factor = None - return self.motion_gen.plan_batch(start_state, goal, batch_plan_config, link_poses=link_poses) - def solve_ik_batch( - self, - target_pos: torch.Tensor, - target_quat: torch.Tensor, - link_goals: dict[str, torch.Tensor] | None = None, - ): - """ - Solve IK for a batch of target poses without trajectory optimization. + # Build per-call plan config. Always clone so we can safely disable time_dilation_factor + # (plan_batch does not support retime_trajectory) without mutating self.plan_config. + active_plan_config = self.plan_config.clone() + active_plan_config.time_dilation_factor = None + if self.cfg.reach_partial_pose_weight is not None: + weights = torch.tensor( + self.cfg.reach_partial_pose_weight, + device=self.tensor_args.device, + dtype=self.tensor_args.dtype, + ) + active_plan_config.pose_cost_metric = PoseCostMetric(reach_partial_pose=True, reach_vec_weight=weights) + self._logger.debug(f"reach_partial_pose_weight applied: {self.cfg.reach_partial_pose_weight}") - Faster than plan_motion_batch for reachability checking since it skips - trajectory optimization entirely. + # execute batched planning + result = self.motion_gen.plan_batch(start_state, goal, active_plan_config, link_poses=link_poses) - Args: - target_pos: Tensor of shape [K, 3], in robot root frame. - target_quat: Tensor of shape [K, 4] in [qx, qy, qz, qw], in robot root frame. - link_goals: Optional dict mapping extra link names to tensors of shape [K, 7] - ([x, y, z, qx, qy, qz, qw], robot root frame) for multi-arm robots. + # Only consider the first k results (the rest are padding duplicates) + success_mask = result.success.view(-1)[:k].bool() + num_success = int(success_mask.sum().item()) + if num_success == 0: + self._logger.warning(f"batch plan: all {k} candidates failed: {result.status}") + return None - Returns: - IKResult from cuRobo. Check result.success[k], result.position_error[k], - result.rotation_error[k] for each batch index. - """ + pos_err = result.position_error.view(-1)[:k] + rot_err = result.rotation_error.view(-1)[:k] + score = (pos_err + rot_err).masked_fill(~success_mask.to(pos_err.device), float("inf")) + best_idx = int(torch.argmin(score).item()) - # Refine collision world before planning - self._refine_curobo_world_collision() - - if target_pos.ndim != 2 or target_pos.shape[-1] != 3: - raise ValueError(f"target_pos must have shape [K, 3], got {tuple(target_pos.shape)}") - if target_quat.ndim != 2 or target_quat.shape[-1] != 4: - raise ValueError(f"target_quat must have shape [K, 4], got {tuple(target_quat.shape)}") - k = target_pos.shape[0] - if link_goals is not None: - for ee_name, poses in link_goals.items(): - if poses.ndim != 2 or poses.shape != (k, 7): - raise ValueError(f"link_goals['{ee_name}'] must have shape [{k}, 7], got {tuple(poses.shape)}") + paths = result.get_paths() # each path is already trimmed by path_buffer_last_tstep + motion_plan = paths[best_idx].get_ordered_joint_state(self.target_joint_names) - goal = Pose( - position=self._to_curobo_device(target_pos), - quaternion=self._to_curobo_device(convert_quat(target_quat, to="wxyz")), # xyzw → wxyz + self._logger.debug( + f"batch plan: {num_success}/{k} succeeded, picked idx={best_idx}, " + f"pos_err={float(pos_err[best_idx]):.4f}, rot_err={float(rot_err[best_idx]):.4f}, " + f"waypoints={len(motion_plan.position)}" ) - link_poses = None - if link_goals is not None: - link_poses = { - ee_name: Pose( - position=self._to_curobo_device(poses[:, :3]), - quaternion=self._to_curobo_device(convert_quat(poses[:, 3:], to="wxyz")), # xyzw → wxyz - ) - for ee_name, poses in link_goals.items() - } - return self.motion_gen.ik_solver.solve_batch(goal, link_poses=link_poses) + return motion_plan def plan_to_joint_config( self, diff --git a/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner_cfg.py b/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner_cfg.py index d5bfd7b..34c79ee 100644 --- a/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner_cfg.py +++ b/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner_cfg.py @@ -78,5 +78,11 @@ class CuroboPlannerCfg: cuda_device: int | None = 0 """Preferred CUDA device index; None uses torch.cuda.current_device() (respects CUDA_VISIBLE_DEVICES).""" + # Batch planning + max_batch_size: int = 10 + """Maximum batch size for plan_motion. Used to warmup CUDA graph with the correct buffer size. + Must be >= the largest number of reach candidates (K) used at runtime. If runtime K exceeds + this value, cuRobo will need to reset CUDA graph (requires CUDA >= 12.0 or CUROBO_TORCH_CUDA_GRAPH_RESET=1).""" + env_scene_prefix: str | None = "Scene" """Prefix of the environment scene.""" diff --git a/source/autosim/autosim/core/types.py b/source/autosim/autosim/core/types.py index 2b5ee98..bed5f1c 100644 --- a/source/autosim/autosim/core/types.py +++ b/source/autosim/autosim/core/types.py @@ -43,9 +43,14 @@ class SkillGoal: target_object: str | None = None """The target object of the skill.""" target_pose: torch.Tensor | None = None - """The target pose of the skill.""" + """The target pose(s) of the skill in the robot root frame. + + Shape is ``[K, 7]`` (xyz + xyzw quaternion). Single-target skills use ``K=1``; + batch-planning skills (e.g. :class:`ReachSkill`) use ``K>1`` to let the planner pick + the best successful candidate. + """ extra_target_poses: dict[str, torch.Tensor] | None = None - """The target poses of the extra end-effectors. dict[link_name, target_pose].""" + """The target poses of the extra end-effectors. ``dict[link_name, [K, 7]]`` same ``K`` as ``target_pose``.""" info: dict[str, Any] = field(default_factory=dict) """The information of the skill goal.""" diff --git a/source/autosim/autosim/skills/reach.py b/source/autosim/autosim/skills/reach.py index 872a52f..b11a0ec 100644 --- a/source/autosim/autosim/skills/reach.py +++ b/source/autosim/autosim/skills/reach.py @@ -56,7 +56,7 @@ def __init__(self, extra_cfg: ReachSkillExtraCfg) -> None: self._corrective_reach_done = False self._saved_env = None self._saved_target_object = None - self._saved_reach_offset = None + self._saved_reach_offsets = None self._saved_env_extra_info = None def _get_current_primary_and_extra_link_poses( @@ -223,7 +223,7 @@ def _compute_goal_from_offset( self, env: ManagerBasedEnv, target_object: str, - reach_offset: torch.Tensor, + reach_offsets: torch.Tensor, env_extra_info: EnvExtraInfo, ) -> SkillGoal: """Compute reach goal by transforming object-frame offsets into robot root frame. @@ -231,140 +231,126 @@ def _compute_goal_from_offset( Args: env: The Isaac Lab environment. target_object: Name of the target object in the scene. - reach_offset: [7] tensor (pos + quat) in object frame for the primary EE. + reach_offsets: [K, 7] tensor of K candidate offsets (pos + quat) in object frame. env_extra_info: Env info for cfg-driven extra target. Returns: - SkillGoal with target poses in robot root frame. + SkillGoal whose target_pose has shape [K, 7] in robot root frame, ready for batch planning. """ - object_pose_in_env = as_torch(env.scene[target_object].data.root_pose_w) + reach_offsets = reach_offsets.to(env.device) + k = int(reach_offsets.shape[0]) - object_pos_in_env = object_pose_in_env[:, :3] - object_quat_in_env = object_pose_in_env[:, 3:] + object_pose_in_env = as_torch(env.scene[target_object].data.root_pose_w) + object_pos_in_env = object_pose_in_env[:, :3].expand(k, -1) # [K, 3] + object_quat_in_env = object_pose_in_env[:, 3:].expand(k, -1) # [K, 4] - offset = reach_offset.to(env.device).unsqueeze(0) reach_target_pos_in_env, reach_target_quat_in_env = PoseUtils.combine_frame_transforms( - object_pos_in_env, object_quat_in_env, offset[:, :3], offset[:, 3:] + object_pos_in_env, object_quat_in_env, reach_offsets[:, :3], reach_offsets[:, 3:] + ) + self._logger.debug(f"Reach target positions in environment ({k} candidates): {reach_target_pos_in_env}") + self._logger.debug(f"Reach target quaternions in environment ({k} candidates): {reach_target_quat_in_env}") + # Debug marker only renders one pose; use the first candidate for visualization. + self._target_poses["target_pose"] = torch.cat( + (reach_target_pos_in_env[:1], reach_target_quat_in_env[:1]), dim=-1 ) - self._logger.debug(f"Reach target position in environment: {reach_target_pos_in_env}") - self._logger.debug(f"Reach target quaternion in environment: {reach_target_quat_in_env}") - self._target_poses["target_pose"] = torch.cat((reach_target_pos_in_env, reach_target_quat_in_env), dim=-1) self.visualize_debug_target_pose() robot = env.scene[env_extra_info.robot_name] - robot_root_pos_in_env = as_torch(robot.data.root_pose_w)[:, :3] - robot_root_quat_in_env = as_torch(robot.data.root_pose_w)[:, 3:] + robot_root_pose_in_env = as_torch(robot.data.root_pose_w) + robot_root_pos_in_env = robot_root_pose_in_env[:, :3].expand(k, -1) # [K, 3] + robot_root_quat_in_env = robot_root_pose_in_env[:, 3:].expand(k, -1) # [K, 4] reach_target_pos_in_root, reach_target_quat_in_root = PoseUtils.subtract_frame_transforms( robot_root_pos_in_env, robot_root_quat_in_env, reach_target_pos_in_env, reach_target_quat_in_env ) - target_pose = torch.cat((reach_target_pos_in_root, reach_target_quat_in_root), dim=-1).squeeze(0) + target_pose = torch.cat((reach_target_pos_in_root, reach_target_quat_in_root), dim=-1) # [K, 7] self._logger.debug( - f"Reach target pose in robot root frame: {reach_target_pos_in_root}, {reach_target_quat_in_root}" + f"Reach target poses in robot root frame ({k} candidates): " + f"pos={reach_target_pos_in_root}, quat={reach_target_quat_in_root}" ) activate_q, _ = self._build_activate_joint_state( robot.data.joint_names, as_torch(robot.data.joint_pos)[0], as_torch(robot.data.joint_vel)[0] ) - extra_target_poses = self._build_extra_target_poses(activate_q, target_pose, env_extra_info) + extra_target_poses = self._build_extra_target_poses_batch(activate_q, target_pose, env_extra_info) return SkillGoal(target_object=target_object, target_pose=target_pose, extra_target_poses=extra_target_poses) - def _select_best_candidate( + def _build_extra_target_poses_batch( self, - env: ManagerBasedEnv, - target_object: str, - candidates: list[torch.Tensor], + activate_q: torch.Tensor, + target_poses: torch.Tensor, env_extra_info: EnvExtraInfo, - ) -> torch.Tensor: - """Select the closest reach candidate in the target object's frame. - - The current end-effector pose is transformed from world frame into the target - object's frame and compared against all candidate poses stored in object frame. - Selection is based on a weighted score consisting of position error plus - orientation error. + ) -> dict[str, torch.Tensor] | None: + """Build batched extra link goals by invoking the per-target builder for each candidate. Args: - env: The Isaac Lab environment. - target_object: Name of the target object in the scene. - candidates: List of K ``[7]`` tensors ``(pos + quat)`` in object frame. - env_extra_info: Environment extra information. + activate_q: Active joint positions for FK / offset computation. + target_poses: [K, 7] candidate primary target poses in robot root frame. + env_extra_info: Env info forwarded to per-target builder (used by ``keep_initial_relative_offset`` + mode to share its cache across candidates). Returns: - The selected ``[7]`` offset tensor in object frame. - - Raises: - ValueError: If candidates is empty. + Dict mapping link name to [K, 7] stacked target pose tensor, or None when there are no + extra target links configured. """ - if not candidates: - raise ValueError(f"No reach candidates provided for object '{target_object}'.") - - poses_oe = torch.stack([c.to(env.device) for c in candidates], dim=0) # [K, 7] + if not self.cfg.extra_cfg.extra_target_link_names: + return None - robot = env.scene[env_extra_info.robot_name] - ee_link_idx = robot.data.body_names.index(env_extra_info.ee_link_name) - ee_pose_w = as_torch(robot.data.body_link_pose_w)[0, ee_link_idx] # [7] - obj_pose_w = as_torch(env.scene[target_object].data.root_pose_w)[0] # [7] - - ee_pos_oe, ee_quat_oe = PoseUtils.subtract_frame_transforms( - obj_pose_w[:3].unsqueeze(0), - obj_pose_w[3:].unsqueeze(0), - ee_pose_w[:3].unsqueeze(0), - ee_pose_w[3:].unsqueeze(0), - ) - ee_pos_oe = ee_pos_oe.squeeze(0) # [3] - ee_quat_oe = ee_quat_oe.squeeze(0) # [4] + per_k = [ + self._build_extra_target_poses(activate_q, target_poses[i], env_extra_info) + for i in range(target_poses.shape[0]) + ] + if per_k[0] is None: + return None - pos_err = torch.linalg.norm(poses_oe[:, :3] - ee_pos_oe.unsqueeze(0), dim=-1) - quat_dot = torch.sum(poses_oe[:, 3:] * ee_quat_oe.unsqueeze(0), dim=-1).abs().clamp(max=1.0) - rot_err = 1.0 * torch.acos(quat_dot) - score = pos_err + 1.0 * rot_err - - best_idx = int(torch.argmin(score).item()) - self._logger.debug( - f"Selected candidate {best_idx}/{len(candidates)} for '{target_object}' " - f"(position_error={float(pos_err[best_idx]):.4f}, rotation_error={float(rot_err[best_idx]):.4f})" - ) - return candidates[best_idx] + return {link_name: torch.stack([d[link_name] for d in per_k], dim=0) for link_name in per_k[0].keys()} def _compute_corrective_goal(self) -> SkillGoal | None: """Re-compute reach goal using the object's current actual pose. - This is called after the first trajectory finishes. The same relative offset (in object - frame) is re-applied to the object's current pose, so if the object was nudged during - approach the robot corrects for it. + This is called after the first trajectory finishes. The same K candidate offsets + (in object frame) are re-applied to the object's current pose, so if the object was + nudged during approach the robot corrects for it by batch-replanning over all candidates. """ goal = self._compute_goal_from_offset( self._saved_env, self._saved_target_object, - self._saved_reach_offset, + self._saved_reach_offsets, self._saved_env_extra_info, ) if goal is not None: - self._logger.info("corrective_reach: recomputed target from current object pose") + self._logger.info("corrective_reach: recomputed targets from current object pose") return goal def extract_goal_from_info( self, skill_info: SkillInfo, env: ManagerBasedEnv, env_extra_info: EnvExtraInfo ) -> SkillGoal: - """Return the target pose[x, y, z, qx, qy, qz, qw] in the robot root frame. + """Return K candidate target poses [K, 7] (xyz + xyzw quat) in the robot root frame. + + All candidate reach offsets are forwarded to cuRobo batch planning; the planner + picks the trajectory of the successful candidate with minimum + ``position_error + rotation_error``. + IMPORTANT: the robot root frame is not the same as the robot base frame. """ target_object = skill_info.target_object candidates = env_extra_info.get_reach_target_poses(target_object) - reach_offset = self._select_best_candidate(env, target_object, candidates, env_extra_info).to(env.device) + if not candidates: + raise ValueError(f"No reach candidates provided for object '{target_object}'.") + reach_offsets = torch.stack([c.to(env.device) for c in candidates], dim=0) # [K, 7] # Save state needed for corrective reach re-planning self._saved_env = env self._saved_target_object = target_object - self._saved_reach_offset = reach_offset + self._saved_reach_offsets = reach_offsets self._saved_env_extra_info = env_extra_info - return self._compute_goal_from_offset(env, target_object, reach_offset, env_extra_info) + return self._compute_goal_from_offset(env, target_object, reach_offsets, env_extra_info) def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: """Execute the plan of the reach skill.""" @@ -374,8 +360,8 @@ def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: # Set current target object for selective collision checking self._planner.set_target_object(goal.target_object) - target_pose = goal.target_pose # target pose in the robot root frame - target_pos, target_quat = target_pose[:3], target_pose[3:] + target_pose = goal.target_pose # [K, 7] in the robot root frame + target_pos, target_quat = target_pose[:, :3], target_pose[:, 3:] activate_q, activate_qd = self._build_activate_joint_state( state.sim_joint_names, state.robot_joint_pos, state.robot_joint_vel @@ -467,6 +453,6 @@ def reset(self) -> None: self._corrective_reach_done = False self._saved_env = None self._saved_target_object = None - self._saved_reach_offset = None + self._saved_reach_offsets = None self._saved_env_extra_info = None self._planner.set_target_object(None) diff --git a/source/autosim/autosim/skills/relative_reach.py b/source/autosim/autosim/skills/relative_reach.py index 9bc3b77..1e2f91e 100644 --- a/source/autosim/autosim/skills/relative_reach.py +++ b/source/autosim/autosim/skills/relative_reach.py @@ -150,8 +150,8 @@ def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: self._target_poses["target_pose"] = torch.cat((reach_target_pos_in_env, reach_target_quat_in_env), dim=-1) self._trajectory = self._planner.plan_motion( - target_pos, - target_quat, + target_pos.unsqueeze(0), + target_quat.unsqueeze(0), activate_q, activate_qd, ) diff --git a/source/autosim/autosim/skills/rotate.py b/source/autosim/autosim/skills/rotate.py index 3bcb7bb..1452a44 100644 --- a/source/autosim/autosim/skills/rotate.py +++ b/source/autosim/autosim/skills/rotate.py @@ -175,7 +175,7 @@ def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: f"axis_in_root={axis_in_root}, step_angle={step_angle:.4f}" ) - traj = self._planner.plan_motion(target_pos, target_quat, current_q, current_qd) + traj = self._planner.plan_motion(target_pos.unsqueeze(0), target_quat.unsqueeze(0), current_q, current_qd) if traj is None: self._logger.warning(f"Rotate planning failed at step {i + 1}/{steps}") if not trajectories: