diff --git a/data/.lfs/go2_dds_china_office.mcap.tar.gz b/data/.lfs/go2_dds_china_office.mcap.tar.gz new file mode 100644 index 0000000000..6b56d5c7c6 --- /dev/null +++ b/data/.lfs/go2_dds_china_office.mcap.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:0e86656ce07ae466a84b85e0d21982faea58708324f47b2dd2728368e8c8680c +size 426702644 diff --git a/data/.lfs/go2_dds_stairs.mcap.tar.gz b/data/.lfs/go2_dds_stairs.mcap.tar.gz new file mode 100644 index 0000000000..86dcafc650 --- /dev/null +++ b/data/.lfs/go2_dds_stairs.mcap.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8ad28ba489eb66eb42b6a636e6060d448f15b3807259472dd3e6ca1d0fc2c1c1 +size 344230938 diff --git a/dimos/mapping/utils/cli/dataset_validation.md b/dimos/mapping/utils/cli/dataset_validation.md index 5f32853ddf..9e4873ed91 100644 --- a/dimos/mapping/utils/cli/dataset_validation.md +++ b/dimos/mapping/utils/cli/dataset_validation.md @@ -1,7 +1,7 @@ Dataset Validation ```sh -dimos map summary recording_go2_mid360_2026-05-29_4-45pm-PST.db +dimos mem summary recording_go2_mid360_2026-05-29_4-45pm-PST.db Stream("color_image"): 11141 items, 2026-05-29 23:32:57 — 2026-05-29 23:45:57 (780.1s) Stream("fastlio_lidar"): 7240 items, 2026-05-29 23:32:56 — 2026-05-29 23:45:57 (781.7s) diff --git a/dimos/mapping/utils/cli/map.py b/dimos/mapping/utils/cli/map.py index a9af81ea08..190fdacee3 100644 --- a/dimos/mapping/utils/cli/map.py +++ b/dimos/mapping/utils/cli/map.py @@ -346,7 +346,7 @@ def main( ) -> None: """Rebuild a voxel map from a recorded SQLite dataset, write a .rrd, and open it in rerun.""" from dimos.mapping.loop_closure.pgo import PGO - from dimos.memory2.store.sqlite import SqliteStore + from dimos.memory2.cli.dataset import open_store, resolve_dataset from dimos.memory2.transform import QualityWindow, SpeedLimit from dimos.memory2.utils.progress import progress from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo @@ -354,16 +354,15 @@ def main( from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 from dimos.perception.fiducial.marker_transformer import DetectMarkers from dimos.robot.unitree.go2.connection import BASE_TO_OPTICAL, _camera_info_static - from dimos.utils.data import resolve_named_path from dimos.visualization.rerun.init import rerun_init - db_path = resolve_named_path(dataset, ".db") + db_path = resolve_dataset(dataset) + store = open_store(db_path) if out is None: out = Path.cwd() / f"{db_path.stem}.rrd" if export or full_pgo: pgo = True - store = SqliteStore(path=db_path) lidar = store.stream(lidar_stream, PointCloud2).from_time(seek or None).to_time(duration) print(lidar.summary()) diff --git a/dimos/mapping/utils/cli/replay.py b/dimos/mapping/utils/cli/replay.py index c78704f725..9897f6de79 100644 --- a/dimos/mapping/utils/cli/replay.py +++ b/dimos/mapping/utils/cli/replay.py @@ -206,50 +206,48 @@ def main( ), ) -> None: """Dump a recording to .rrd (lidar clouds + camera frames) and open it in rerun.""" - from dimos.mapping.utils.cli.summary import _stream_payload_types from dimos.mapping.voxels import VoxelMapTransformer - from dimos.memory2.store.sqlite import SqliteStore + from dimos.memory2.cli.dataset import open_store, resolve_dataset, stream_payload_types from dimos.memory2.transform import throttle from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.nav_msgs.Odometry import Odometry from dimos.msgs.sensor_msgs.Image import Image from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2, register_colormap_annotation from dimos.robot.unitree.go2.connection import _camera_info_static - from dimos.utils.data import resolve_named_path - db_path = resolve_named_path(dataset, ".db") + src_path = resolve_dataset(dataset) + store = open_store(src_path) if out is None: - out = Path.cwd() / f"{db_path.stem}.rrd" + out = Path.cwd() / f"{src_path.stem}.rrd" cam_info = _camera_info_static() - # Resolve which streams to voxelize: all PointCloud2 streams, or the - # explicit --map-source subset. Validate up front so typos fail fast. - pc_streams = [n for n, t in _stream_payload_types(db_path).items() if t is PointCloud2] - map_sources = list(map_source) or pc_streams - if (map or map_final) and (bad := [s for s in map_sources if s not in pc_streams]): - raise typer.BadParameter(f"--map-source: not PointCloud2 stream(s): {', '.join(bad)}") - - rr.init("dimos map_rrd", recording_id=db_path.stem) - rr.save(str(out)) - register_colormap_annotation("turbo") - - # Static pinhole on the camera entity; per-frame Transform3D goes on the - # same entity. Image is the child so it projects through the pinhole. - pinhole = cam_info.to_rerun() - assert not isinstance(pinhole, list) - rr.log("world/camera", pinhole, static=True) - - # Static axis triads as children of each moving Transform3D, so the - # transforms are actually visible in the 3D view. - axes = rr.Arrows3D( - vectors=[[0.3, 0, 0], [0, 0.3, 0], [0, 0, 0.3]], - colors=[[255, 0, 0], [0, 255, 0], [0, 0, 255]], - ) - rr.log("world/fastlio/axes", axes, static=True) - rr.log("world/odom/axes", axes, static=True) - - store = SqliteStore(path=str(db_path)) with store: + # Resolve which streams to voxelize: all PointCloud2 streams, or the + # explicit --map-source subset. Validate up front so typos fail fast. + pc_streams = [n for n, t in stream_payload_types(store).items() if t is PointCloud2] + map_sources = list(map_source) or pc_streams + if (map or map_final) and (bad := [s for s in map_sources if s not in pc_streams]): + raise typer.BadParameter(f"--map-source: not PointCloud2 stream(s): {', '.join(bad)}") + + rr.init("dimos map_rrd", recording_id=src_path.stem) + rr.save(str(out)) + register_colormap_annotation("turbo") + + # Static pinhole on the camera entity; per-frame Transform3D goes on the + # same entity. Image is the child so it projects through the pinhole. + pinhole = cam_info.to_rerun() + assert not isinstance(pinhole, list) + rr.log("world/camera", pinhole, static=True) + + # Static axis triads as children of each moving Transform3D, so the + # transforms are actually visible in the 3D view. + axes = rr.Arrows3D( + vectors=[[0.3, 0, 0], [0, 0.3, 0], [0, 0, 0.3]], + colors=[[255, 0, 0], [0, 255, 0], [0, 0, 255]], + ) + rr.log("world/fastlio/axes", axes, static=True) + rr.log("world/odom/axes", axes, static=True) + print(store.summary()) def clipped(name: str, ptype: type[Any]) -> Stream[Any]: diff --git a/dimos/mapping/utils/cli/replay_marker.py b/dimos/mapping/utils/cli/replay_marker.py index 4d04815051..4afdf3a1e6 100644 --- a/dimos/mapping/utils/cli/replay_marker.py +++ b/dimos/mapping/utils/cli/replay_marker.py @@ -66,16 +66,16 @@ def main( ), ) -> None: """Dump an AprilTag detection replay to .rrd and open it in rerun.""" - from dimos.memory2.store.sqlite import SqliteStore + from dimos.memory2.cli.dataset import open_store, resolve_dataset from dimos.memory2.transform import QualityWindow, SpeedLimit from dimos.memory2.vis.color import Color from dimos.msgs.sensor_msgs.Image import Image from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 from dimos.perception.fiducial.marker_transformer import DetectMarkers from dimos.robot.unitree.go2.connection import _camera_info_static - from dimos.utils.data import resolve_named_path - db_path = resolve_named_path(dataset, ".db") + db_path = resolve_dataset(dataset) + store = open_store(db_path) if out is None: out = Path.cwd() / f"{db_path.stem}.rrd" cam_info = _camera_info_static() @@ -89,7 +89,6 @@ def main( assert not isinstance(pinhole, list) rr.log("world/camera", pinhole, static=True) - store = SqliteStore(path=str(db_path)) with store: color_image = store.stream("color_image", Image).from_time(seek or None).to_time(duration) lidar = store.stream("lidar", PointCloud2).from_time(seek or None).to_time(duration) diff --git a/dimos/mapping/utils/cli/summary.py b/dimos/mapping/utils/cli/summary.py deleted file mode 100644 index 694565b6e8..0000000000 --- a/dimos/mapping/utils/cli/summary.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright 2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Print ``Store.summary()`` for a memory2 sqlite recording. - -Usage: - uv run dimos map summary mid360 -""" - -from __future__ import annotations - -import json -from pathlib import Path -import sqlite3 - -import typer - -from dimos.memory2.codecs.base import _resolve_payload_type -from dimos.memory2.store.sqlite import SqliteStore -from dimos.utils.data import resolve_named_path - - -def _stream_payload_types(db_path: Path) -> dict[str, type]: - """Read each stream's registered payload type from the _streams table.""" - conn = sqlite3.connect(str(db_path)) - try: - rows = conn.execute("SELECT name, config FROM _streams").fetchall() - finally: - conn.close() - return {name: _resolve_payload_type(json.loads(cfg)["payload_module"]) for name, cfg in rows} - - -def main( - dataset: str = typer.Argument(..., help="Dataset .db: bare name (cwd or data/) or path"), -) -> None: - """Print per-stream counts and time ranges for a recorded SQLite dataset.""" - db_path = resolve_named_path(dataset, ".db") - payload_types = _stream_payload_types(db_path) - - store = SqliteStore(path=str(db_path)) - with store: - for name, ptype in payload_types.items(): - store.stream(name, ptype) - print(store.summary()) - - -if __name__ == "__main__": - typer.run(main) diff --git a/dimos/mapping/utils/cli/test_cli.py b/dimos/mapping/utils/cli/test_cli.py index cb9ece1f3c..c5656e462f 100644 --- a/dimos/mapping/utils/cli/test_cli.py +++ b/dimos/mapping/utils/cli/test_cli.py @@ -103,10 +103,13 @@ def dataset() -> str: def test_summary(dataset: str) -> None: - res = _run("summary", dataset) - assert res.returncode == 0, res.stderr - assert "lidar" in res.stdout - assert "odom" in res.stdout + # `summary` is generic and lives under `dimos mem` (not `dimos map`). + from dimos.robot.cli.dimos import main as cli_app + + res = _runner.invoke(cli_app, ["mem", "summary", dataset]) + assert res.exit_code == 0, res.output + assert "lidar" in res.output + assert "odom" in res.output @requires_turbojpeg diff --git a/dimos/mapping/voxels.py b/dimos/mapping/voxels.py index 11acb38100..0e7cc978d6 100644 --- a/dimos/mapping/voxels.py +++ b/dimos/mapping/voxels.py @@ -14,6 +14,7 @@ from __future__ import annotations +from collections import deque import time from typing import TYPE_CHECKING, Any @@ -43,6 +44,11 @@ class VoxelGrid: No Module/framework dependency. Can be used standalone or wrapped by VoxelGridMapper (Module) or VoxelMapTransformer (memory2 Transformer). + + ``time_window`` controls how long voxels live (by frame timestamp): + ``-1`` (default) keeps every frame forever (a growing global map); ``0`` + keeps only the current frame; ``N`` keeps the last ``N`` seconds (a rolling + window that tracks the local surface). Windowing overrides ``carve_columns``. """ def __init__( @@ -53,10 +59,13 @@ def __init__( carve_columns: bool = True, frame_id: str = "world", show_startup_log: bool = True, + time_window: float = -1.0, ) -> None: self._voxel_size = voxel_size self._carve_columns = carve_columns self._frame_id = frame_id + self._time_window = time_window + self._frames: deque[tuple[float, o3c.Tensor]] = deque() # (ts, voxel keys) for windowing dev = ( o3c.Device(device) @@ -101,7 +110,9 @@ def add_frame(self, frame: PointCloud2) -> None: vox = (pts / self._voxel_size).floor().to(self._key_dtype) keys_Nx3 = vox.contiguous() - if self._carve_columns: + if self._time_window >= 0: + self._add_windowed(keys_Nx3) + elif self._carve_columns: self._carve_and_insert(keys_Nx3) else: self._voxel_hashmap.activate(keys_Nx3) @@ -117,6 +128,24 @@ def add_frame(self, frame: PointCloud2) -> None: if str(self._dev).startswith("CUDA"): o3c.cuda.release_cache() + def _add_windowed(self, new_keys: o3c.Tensor) -> None: + """Keep only voxels from frames within ``time_window`` s of the latest frame. + + Buffers each frame's keys, drops frames that have aged out, then rebuilds + the active set as the union of the survivors (``activate`` dedups). Always + keeps at least the current frame, so ``time_window == 0`` is single-frame. + """ + ts = self._latest_frame_ts + self._frames.append((ts, new_keys)) + while len(self._frames) > 1 and ts - self._frames[0][0] > self._time_window: + self._frames.popleft() + + active = self._voxel_hashmap.active_buf_indices() + if active.shape[0] > 0: + self._voxel_hashmap.erase(self._voxel_hashmap.key_tensor()[active].contiguous()) + for _, keys in self._frames: + self._voxel_hashmap.activate(keys) + def _carve_and_insert(self, new_keys: o3c.Tensor) -> None: """Column carving: remove all existing voxels sharing (X,Y) with new_keys, then insert.""" if new_keys.shape[0] == 0: @@ -251,6 +280,7 @@ class VoxelGridMapperConfig(ModuleConfig): carve_columns: bool = True frame_id: str = "world" emit_every: int = 1 + time_window: float = -1.0 class VoxelGridMapper(StreamModule[PointCloud2, PointCloud2]): diff --git a/dimos/memory2/cli/app.py b/dimos/memory2/cli/app.py index 2424c600df..b39c4b9426 100644 --- a/dimos/memory2/cli/app.py +++ b/dimos/memory2/cli/app.py @@ -27,8 +27,32 @@ def rerun( out: str = typer.Option(None, "--out", help="Output .rrd (default: alongside the source)"), seconds: float = typer.Option(None, "--seconds", help="Only the first N seconds"), no_gui: bool = typer.Option(False, "--no-gui", help="Write the .rrd but don't open the viewer"), + root: str = typer.Option( + None, "--root", help="Nest every stream under this entity path (/)" + ), ) -> None: """Render a memory2 store into rerun (writes a .rrd, then opens the viewer).""" - from dimos.memory2.cli.render import open_store, render_store + from dimos.memory2.cli.dataset import open_dataset + from dimos.memory2.cli.render import render_store - render_store(open_store(path), out=out, seconds=seconds, no_gui=no_gui) + render_store(open_dataset(path), out=out, seconds=seconds, no_gui=no_gui, root=root) + + +@mem_app.command() +def summary( + dataset: str = typer.Argument(..., help="Dataset .db/.mcap: bare name (cwd or data/) or path"), +) -> None: + """Print per-stream counts and time ranges for a recorded dataset.""" + from dimos.memory2.cli.dataset import open_dataset + + store = open_dataset(dataset) + with store: + for name in store.list_streams(): + store.streams[name] # register so summary() includes it + print(store.summary()) + + # mcap files may carry channels we have no codec for (e.g. h264 video); + # list them so the inventory is complete rather than silently filtered. + uncodec = getattr(store, "uncodec_channels", None) + for topic, (count, schema) in sorted(uncodec().items()) if uncodec else []: + print(f" (no codec) {topic}: {count} msgs [{schema or '?'}]") diff --git a/dimos/memory2/cli/dataset.py b/dimos/memory2/cli/dataset.py new file mode 100644 index 0000000000..0b53ebf16f --- /dev/null +++ b/dimos/memory2/cli/dataset.py @@ -0,0 +1,64 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Central dispatcher for opening a recorded memory2 dataset as a read store. + +One entry point for every CLI that opens a recording. :func:`open_dataset` +resolves a dataset name/path (bare names look up the cwd / repo ``data/`` dir) +and picks the store by file extension: ``.db`` -> SqliteStore, ``.mcap`` -> +Go2McapStore. Use :func:`open_store` when the path is already resolved. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Any, cast + +from dimos.utils.data import resolve_named_path + +if TYPE_CHECKING: + from dimos.memory2.backend import Backend + from dimos.memory2.store.base import Store + + +def open_store(path: str | Path) -> Store: + """Open an already-resolved dataset *path*, dispatching on its extension.""" + s = str(path) + if s.endswith(".mcap"): + from dimos.robot.unitree.go2.dds.store import Go2McapStore # lazy: robot-layer codecs + + return Go2McapStore(path=s) + if s.endswith(".db"): + from dimos.memory2.store.sqlite import SqliteStore + + return SqliteStore(path=s, must_exist=True) + raise ValueError(f"unsupported dataset {s!r}: expected a .db or .mcap path") + + +def resolve_dataset(dataset: str | Path) -> Path: + """Resolve a dataset name/path to a file (bare names -> ``.db``, cwd / data/).""" + return resolve_named_path(dataset, Path(dataset).suffix or ".db") + + +def open_dataset(dataset: str | Path) -> Store: + """Resolve a dataset name/path (bare names -> ``.db``) and open it read-only.""" + return open_store(resolve_dataset(dataset)) + + +def stream_payload_types(store: Store) -> dict[str, type]: + """Map each stream name in *store* to its payload type (any backend).""" + return { + name: cast("Backend[Any]", store.stream(name)._source).data_type + for name in store.list_streams() + } diff --git a/dimos/memory2/cli/render.py b/dimos/memory2/cli/render.py index 0869c7c45f..134e35d4af 100644 --- a/dimos/memory2/cli/render.py +++ b/dimos/memory2/cli/render.py @@ -32,17 +32,6 @@ from dimos.memory2.store.base import Store -def open_store(path: str) -> Store: - """Open a store by file type (``.db`` -> SqliteStore, else Go2 mcap).""" - if str(path).endswith(".db"): - from dimos.memory2.store.sqlite import SqliteStore - - return SqliteStore(path=path, must_exist=True) - from dimos.robot.unitree.go2.dds.store import Go2McapStore # lazy: robot-layer codec set - - return Go2McapStore(path=path) - - def _open_viewer(rrd: str) -> None: exe = shutil.which("rerun") if exe: @@ -58,11 +47,14 @@ def render_store( out: str | None = None, seconds: float | None = None, no_gui: bool = False, + root: str | None = None, ) -> str: """Render ``store`` to a ``.rrd`` and (unless ``no_gui``) open the rerun viewer. Logs every observation (full res); ``seconds`` bounds the time window from - the start. Returns the ``.rrd`` path. + the start. ``root`` nests every stream under that entity path + (``/``) — except a stream whose name matches ``root``'s last + segment, which stays at ```` itself. Returns the ``.rrd`` path. """ import rerun as rr @@ -73,6 +65,14 @@ def render_store( src = getattr(store.config, "path", None) or "store" out = str(Path(src).with_suffix(".rrd")) + base = root.strip("/") if root else "" + + def entity(name: str) -> str: + # /, but a stream named like root's last segment stays at . + if not base: + return name + return base if name == base.rsplit("/", 1)[-1] else f"{base}/{name}" + # Discover renderable streams (payload has a working to_rerun) + shared anchor. renderable = [] t0: float | None = None @@ -112,11 +112,12 @@ def render_store( continue rr.set_time("time", duration=obs.ts - t0) data = obs.data.to_rerun() + path = entity(name) if isinstance(data, list): # RerunMulti: [(subpath, archetype), ...] for sub, arch in data: - rr.log(f"{name}/{sub}", arch) + rr.log(f"{path}/{sub}", arch) else: - rr.log(name, data) + rr.log(path, data) report(obs) rr.rerun_shutdown() # flush + close the .rrd before opening it diff --git a/dimos/memory2/store/mcap.py b/dimos/memory2/store/mcap.py index 250d19efad..2b62fd8c42 100644 --- a/dimos/memory2/store/mcap.py +++ b/dimos/memory2/store/mcap.py @@ -149,17 +149,27 @@ def __init__( summary = make_reader(f).get_summary() self._stream_topic: dict[str, str] = {} # stream name -> topic self._available: dict[str, int] = {} # stream name -> message count + # Channels in the file we can't decode (no codec), kept for reporting so + # undecodable data isn't silently hidden — see :meth:`uncodec_channels`. + self._uncodec: dict[str, tuple[int, str | None]] = {} # topic -> (count, schema) if summary is not None and summary.statistics is not None: for cid, ch in summary.channels.items(): + count = summary.statistics.channel_message_counts.get(cid, 0) if ch.topic not in self._codecs: + sch = summary.schemas.get(ch.schema_id) + self._uncodec[ch.topic] = (count, sch.name if sch else None) continue name = name_of.get(ch.topic) or _slug(ch.topic) self._stream_topic[name] = ch.topic - self._available[name] = summary.statistics.channel_message_counts.get(cid, 0) + self._available[name] = count def list_streams(self) -> list[str]: return sorted(set(self._available) | set(self._streams)) + def uncodec_channels(self) -> dict[str, tuple[int, str | None]]: + """Topics present in the file with no registered codec: topic -> (count, schema).""" + return dict(self._uncodec) + def _create_backend( self, name: str, payload_type: type | None = None, **config: Any ) -> Backend[Any]: diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 3f94a6be4e..bf780d2b7f 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -43,7 +43,6 @@ from dimos.mapping.utils.cli.rename import main as _map_rename_main from dimos.mapping.utils.cli.replay import main as _map_replay_main from dimos.mapping.utils.cli.replay_marker import main as _map_replay_marker_main -from dimos.mapping.utils.cli.summary import main as _map_summary_main from dimos.robot.unitree.go2.cli.go2tool import app as go2tool_app from dimos.utils.logging_config import setup_logger from dimos.visualization.rerun.constants import RerunOpenOption @@ -681,7 +680,6 @@ def send( map_app = typer.Typer(help="Voxel-map tools over recorded sqlite datasets") main.add_typer(map_app, name="map") map_app.command("global")(_map_main) -map_app.command("summary")(_map_summary_main) map_app.command("rename")(_map_rename_main) map_app.command("pose-fill")(_map_pose_fill_main) map_app.command("replay")(_map_replay_main) diff --git a/dimos/robot/unitree/go2/dds/codec.py b/dimos/robot/unitree/go2/dds/codec.py index 9dc3571b75..0f0262b508 100644 --- a/dimos/robot/unitree/go2/dds/codec.py +++ b/dimos/robot/unitree/go2/dds/codec.py @@ -36,7 +36,10 @@ from dimos.msgs.sensor_msgs.Imu import Imu from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 from dimos.robot.unitree.go2.dds import cdr, ros +from dimos.robot.unitree.go2.dds.msgs.CompressedVideo import CompressedVideo from dimos.robot.unitree.go2.dds.msgs.ControlEvent import ControlEvent +from dimos.robot.unitree.go2.dds.msgs.HeightMap import HeightMap +from dimos.robot.unitree.go2.dds.msgs.LowCmd import LowCmd from dimos.robot.unitree.go2.dds.msgs.LowState import LowState from dimos.robot.unitree.go2.dds.msgs.SportModeState import SportModeState from dimos.robot.unitree.go2.dds.msgs.Telemetry import Telemetry @@ -107,10 +110,14 @@ def encode(self, msg: Any) -> bytes: # Go2 channel topic -> codec. The default registry (only platform we have today). GO2_CODECS: dict[str, DdsCodec] = { "rt/utlidar/cloud": FnCodec(PointCloud2, ros.decode_pointcloud2), + "rt/utlidar/cloud_deskewed": FnCodec(PointCloud2, ros.decode_pointcloud2), + "rt/utlidar/height_map_array": FnCodec(HeightMap, ros.decode_height_map), "rt/utlidar/imu": FnCodec(Imu, ros.decode_imu), "rt/utlidar/robot_odom": FnCodec(Odometry, ros.decode_odometry), "rt/frontvideo": FnCodec(Image, ros.decode_compressed_image), + "rt/frontvideo/h264": FnCodec(CompressedVideo, ros.decode_compressed_video), "rt/lowstate": CdrStructCodec(LowState), + "rt/lowcmd": CdrStructCodec(LowCmd), "rt/sportmodestate": CdrStructCodec(SportModeState), "telemetry": JsonCodec(Telemetry), "control_log": JsonCodec(ControlEvent), diff --git a/dimos/robot/unitree/go2/dds/msgs/BmsCmd.py b/dimos/robot/unitree/go2/dds/msgs/BmsCmd.py new file mode 100644 index 0000000000..d37e9fecb7 --- /dev/null +++ b/dimos/robot/unitree/go2/dds/msgs/BmsCmd.py @@ -0,0 +1,32 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree_go::msg::dds_::BmsCmd_ — battery management command.""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np + + +@dataclass +class BmsCmd: + off: int + reserve: np.ndarray # u8[3] + + __cdr_fields__ = [ + ("off", "u8"), + ("reserve", ("array", "u8", 3)), + ] diff --git a/dimos/robot/unitree/go2/dds/msgs/CompressedVideo.py b/dimos/robot/unitree/go2/dds/msgs/CompressedVideo.py new file mode 100644 index 0000000000..97a17e10a3 --- /dev/null +++ b/dimos/robot/unitree/go2/dds/msgs/CompressedVideo.py @@ -0,0 +1,53 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""foxglove_msgs::msg::CompressedVideo_ — one encoded video packet (rt/frontvideo/h264). + +Just the encoded bytes + codec name. Inter-frame codecs (h264) can't be decoded +one packet at a time — feed the ordered stream through +:class:`~dimos.robot.unitree.go2.dds.video.H264Decoder` to get ``Image`` frames. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import numpy as np + + +@dataclass +class CompressedVideo: + data: np.ndarray # u8[], the encoded packet (Annex-B for h264) + format: str # codec name, e.g. "h264" + frame_id: str + + def to_rerun(self) -> Any: + """Log the encoded packet as a rerun ``VideoStream`` sample (viewer decodes). + + rerun decodes the stream in-viewer, so this stays per-packet and cheap — + no server-side decode, and the .rrd holds the compressed bytes. Iterate + from the start (or a keyframe) so the first sample the viewer sees is one. + """ + import rerun as rr + + codecs = { + "h264": rr.VideoCodec.H264, + "h265": rr.VideoCodec.H265, + "av1": rr.VideoCodec.AV1, + } + codec = codecs.get(self.format.lower()) + if codec is None: + raise ValueError(f"no rerun VideoCodec for format {self.format!r}") + return rr.VideoStream(codec, sample=self.data.tobytes()) diff --git a/dimos/robot/unitree/go2/dds/msgs/LowCmd.py b/dimos/robot/unitree/go2/dds/msgs/LowCmd.py new file mode 100644 index 0000000000..afe559e73b --- /dev/null +++ b/dimos/robot/unitree/go2/dds/msgs/LowCmd.py @@ -0,0 +1,65 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree_go::msg::dds_::LowCmd_ — low-level motor commands (rt/lowcmd). + +The actuation counterpart to :class:`~dimos.robot.unitree.go2.dds.msgs.LowState.LowState`: +20 per-joint targets (position/velocity/torque + kp/kd gains) sent to the robot. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np + +from dimos.robot.unitree.go2.dds.msgs.base import PrettyMsg +from dimos.robot.unitree.go2.dds.msgs.BmsCmd import BmsCmd +from dimos.robot.unitree.go2.dds.msgs.MotorCmd import MotorCmd + + +@dataclass(repr=False) +class LowCmd(PrettyMsg): + head: np.ndarray # u8[2] + level_flag: int + frame_reserve: int + sn: np.ndarray # u32[2] + version: np.ndarray # u32[2] + bandwidth: int + motor_cmd: list[MotorCmd] # [20] + bms_cmd: BmsCmd + wireless_remote: np.ndarray # u8[40] + led: np.ndarray # u8[12] + fan: np.ndarray # u8[2] + gpio: int + reserve: int + # NOTE: the SDK's trailing `crc` (uint32) is absent on this Go2's firmware + # wire format — verified against the recording (body ends after `reserve`), + # matching the same omission in LowState. + + __cdr_fields__ = [ + ("head", ("array", "u8", 2)), + ("level_flag", "u8"), + ("frame_reserve", "u8"), + ("sn", ("array", "u32", 2)), + ("version", ("array", "u32", 2)), + ("bandwidth", "u16"), + ("motor_cmd", ("array", MotorCmd, 20)), + ("bms_cmd", BmsCmd), + ("wireless_remote", ("array", "u8", 40)), + ("led", ("array", "u8", 12)), + ("fan", ("array", "u8", 2)), + ("gpio", "u8"), + ("reserve", "u32"), + ] diff --git a/dimos/robot/unitree/go2/dds/msgs/MotorCmd.py b/dimos/robot/unitree/go2/dds/msgs/MotorCmd.py new file mode 100644 index 0000000000..0a759397b5 --- /dev/null +++ b/dimos/robot/unitree/go2/dds/msgs/MotorCmd.py @@ -0,0 +1,42 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree_go::msg::dds_::MotorCmd_ — one joint's command (target + gains).""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np + + +@dataclass +class MotorCmd: + mode: int + q: float # target position (rad) + dq: float # target velocity (rad/s) + tau: float # feedforward torque (N·m) + kp: float # position gain + kd: float # velocity gain + reserve: np.ndarray # u32[3] + + __cdr_fields__ = [ + ("mode", "u8"), + ("q", "f32"), + ("dq", "f32"), + ("tau", "f32"), + ("kp", "f32"), + ("kd", "f32"), + ("reserve", ("array", "u32", 3)), + ] diff --git a/dimos/robot/unitree/go2/dds/ros.py b/dimos/robot/unitree/go2/dds/ros.py index 17e7abeed2..c9174a78c6 100644 --- a/dimos/robot/unitree/go2/dds/ros.py +++ b/dimos/robot/unitree/go2/dds/ros.py @@ -44,6 +44,8 @@ from dimos.msgs.sensor_msgs.Imu import Imu from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 from dimos.robot.unitree.go2.dds import cdr +from dimos.robot.unitree.go2.dds.msgs.CompressedVideo import CompressedVideo +from dimos.robot.unitree.go2.dds.msgs.HeightMap import HeightMap # Shared wire specs (Header/Time) reused by the per-message layouts below. @@ -257,3 +259,59 @@ def decode_compressed_image(buf: bytes) -> Image | None: if bgr is None: return None return Image.from_numpy(bgr, ImageFormat.BGR, w.header.frame_id, _ts(w.header)) + + +# foxglove_msgs/CompressedVideo (field order differs from CompressedImage: +# timestamp, frame_id, data, format). +@dataclass +class _CVideoWire: + stamp: _Time + frame_id: str + data: np.ndarray # u8[] + format: str + + __cdr_fields__ = [ + ("stamp", _Time), + ("frame_id", "string"), + ("data", ("seq", "u8")), + ("format", "string"), + ] + + +def decode_compressed_video(buf: bytes) -> CompressedVideo: + """Decode the CDR envelope only — the encoded packet is left for H264Decoder.""" + w: _CVideoWire = cdr.decode(buf, _CVideoWire)[0] + return CompressedVideo(data=w.data, format=w.format, frame_id=w.frame_id) + + +# unitree_go/HeightMap (rt/utlidar/height_map_array) +@dataclass +class _HeightMapWire: + stamp: float + frame_id: str + resolution: float + width: int + height: int + origin: np.ndarray # f32[2] + data: np.ndarray # f32[] + + __cdr_fields__ = [ + ("stamp", "f64"), + ("frame_id", "string"), + ("resolution", "f32"), + ("width", "u32"), + ("height", "u32"), + ("origin", ("array", "f32", 2)), + ("data", ("seq", "f32")), + ] + + +def decode_height_map(buf: bytes) -> HeightMap: + w: _HeightMapWire = cdr.decode(buf, _HeightMapWire)[0] + return HeightMap( + resolution=w.resolution, + origin=w.origin, + data=w.data.reshape(w.height, w.width), + frame_id=w.frame_id, + ts=w.stamp, + ) diff --git a/dimos/robot/unitree/go2/dds/store.py b/dimos/robot/unitree/go2/dds/store.py index 7c2a9d9cbe..aeb34615a4 100644 --- a/dimos/robot/unitree/go2/dds/store.py +++ b/dimos/robot/unitree/go2/dds/store.py @@ -37,10 +37,14 @@ # memory2 stream name -> Go2 DDS topic. STREAMS: dict[str, str] = { "lidar": "rt/utlidar/cloud", + "lidar_deskewed": "rt/utlidar/cloud_deskewed", + "height_map": "rt/utlidar/height_map_array", "imu": "rt/utlidar/imu", "odom": "rt/utlidar/robot_odom", "color_image": "rt/frontvideo", + "color_image_h264": "rt/frontvideo/h264", "lowstate": "rt/lowstate", + "lowcmd": "rt/lowcmd", "sportmodestate": "rt/sportmodestate", } diff --git a/dimos/robot/unitree/go2/dds/video.py b/dimos/robot/unitree/go2/dds/video.py new file mode 100644 index 0000000000..60d5b6fa3e --- /dev/null +++ b/dimos/robot/unitree/go2/dds/video.py @@ -0,0 +1,68 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Stateful h264 decode for the Go2 front camera (``rt/frontvideo/h264``). + +The camera streams h264 where each P-frame references earlier frames, so a +packet can't be decoded on its own (unlike the single-frame jpeg +``rt/frontvideo`` path). This transform holds one PyAV decoder across the +*ordered* packet stream and emits decoded ``Image`` observations:: + + from dimos.robot.unitree.go2.dds.store import Go2McapStore + from dimos.robot.unitree.go2.dds.video import H264Decoder + + store = Go2McapStore(path="go2_dds_stairs.mcap") + for obs in store.streams.color_image_h264.transform(H264Decoder()): + obs.data # a BGR dimos Image (1280x720) + +Because state carries forward, iterate from the start (or a keyframe). Packets +the decoder can't yet resolve — e.g. P-frames after a mid-GOP seek — are +dropped until the next keyframe re-syncs it. +""" + +from __future__ import annotations + +from collections.abc import Iterator +from typing import TYPE_CHECKING + +from dimos.memory2.transform import Transformer +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat +from dimos.robot.unitree.go2.dds.msgs.CompressedVideo import CompressedVideo + +if TYPE_CHECKING: + from dimos.memory2.type.observation import Observation + + +class H264Decoder(Transformer[CompressedVideo, Image]): + """Decode an ordered ``CompressedVideo`` (h264) stream into BGR ``Image`` frames.""" + + def __call__( + self, upstream: Iterator[Observation[CompressedVideo]] + ) -> Iterator[Observation[Image]]: + import av # optional dep (go2/unitree extra) + + decoder = av.codec.CodecContext.create("h264", "r") + for obs in upstream: + packet = obs.data + if packet is None: + continue + try: + frames = decoder.decode(av.packet.Packet(packet.data.tobytes())) + except av.error.FFmpegError: + continue # P-frame with no reference yet (e.g. seeked past a keyframe) + for frame in frames: + bgr = frame.to_ndarray(format="bgr24") + yield obs.derive( + data=Image.from_numpy(bgr, ImageFormat.BGR, packet.frame_id, obs.ts) + ) diff --git a/dimos/utils/cli/map.py b/dimos/utils/cli/map.py new file mode 100644 index 0000000000..20caceb8a7 --- /dev/null +++ b/dimos/utils/cli/map.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.