go2 dds decoding improvements#2521
Conversation
Greptile SummaryThis PR improves Go2 DDS recording decode coverage: it adds CDR message types for
Confidence Score: 2/5Not safe to merge: the missing HeightMap.py causes an ImportError at load time for codec.py and ros.py, breaking all Go2 DDS stream decoding. Both codec.py and ros.py import HeightMap from a module that was never committed. These are top-level imports, so loading either file — which happens the moment any Go2 DDS topic is decoded — raises ImportError. This makes the entire DDS codec pipeline non-functional, including all the other new decoders added in this PR. dimos/robot/unitree/go2/dds/codec.py and dimos/robot/unitree/go2/dds/ros.py — both reference the missing HeightMap class. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["mcap file"] --> B["Go2McapStore"]
B --> C{"stream name"}
C -->|"rt/utlidar/cloud"| D["PointCloud2 → FnCodec(decode_pointcloud2)"]
C -->|"rt/utlidar/cloud_deskewed"| D
C -->|"rt/utlidar/height_map_array"| E["HeightMap → FnCodec(decode_height_map)\n⚠️ ImportError: HeightMap.py missing"]
C -->|"rt/frontvideo"| F["Image → FnCodec(decode_compressed_image)"]
C -->|"rt/frontvideo/h264"| G["CompressedVideo → FnCodec(decode_compressed_video)"]
G --> H["H264Decoder transformer\n(stateful PyAV decode)"]
H --> I["Image (BGR)"]
C -->|"rt/lowstate"| J["LowState → CdrStructCodec"]
C -->|"rt/lowcmd"| K["LowCmd → CdrStructCodec\n(MotorCmd × 20 + BmsCmd)"]
C -->|"rt/sportmodestate"| L["SportModeState → CdrStructCodec"]
B --> M["uncodec_channels()\n(topics with no codec)"]
M --> N["mem summary output"]
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
A["mcap file"] --> B["Go2McapStore"]
B --> C{"stream name"}
C -->|"rt/utlidar/cloud"| D["PointCloud2 → FnCodec(decode_pointcloud2)"]
C -->|"rt/utlidar/cloud_deskewed"| D
C -->|"rt/utlidar/height_map_array"| E["HeightMap → FnCodec(decode_height_map)\n⚠️ ImportError: HeightMap.py missing"]
C -->|"rt/frontvideo"| F["Image → FnCodec(decode_compressed_image)"]
C -->|"rt/frontvideo/h264"| G["CompressedVideo → FnCodec(decode_compressed_video)"]
G --> H["H264Decoder transformer\n(stateful PyAV decode)"]
H --> I["Image (BGR)"]
C -->|"rt/lowstate"| J["LowState → CdrStructCodec"]
C -->|"rt/lowcmd"| K["LowCmd → CdrStructCodec\n(MotorCmd × 20 + BmsCmd)"]
C -->|"rt/sportmodestate"| L["SportModeState → CdrStructCodec"]
B --> M["uncodec_channels()\n(topics with no codec)"]
M --> N["mem summary output"]
Reviews (2): Last reviewed commit: "perception of height" | Re-trigger Greptile |
| for obs in upstream: | ||
| packet = obs.data | ||
| if packet is None: | ||
| continue | ||
| try: | ||
| frames = decoder.decode(av.packet.Packet(packet.data.tobytes())) | ||
| except av.error.FFmpegError: | ||
| continue # P-frame with no reference yet (e.g. seeked past a keyframe) | ||
| for frame in frames: | ||
| bgr = frame.to_ndarray(format="bgr24") | ||
| yield obs.derive( | ||
| data=Image.from_numpy(bgr, ImageFormat.BGR, packet.frame_id, obs.ts) | ||
| ) |
There was a problem hiding this comment.
H264Decoder silently drops buffered frames at end of stream
After the last packet is consumed, the codec context may still hold decoded frames waiting on temporal dependencies (B-frames / delayed output). Without a flush step, those frames are never yielded. For a typical h264 GOP of 30–120 frames, the last 1–4 seconds of a recording can disappear without any error. The fix is to drive a final decoder.flush() call and yield any remaining frames after the upstream loop exits.
| def stream_payload_types(store: Store) -> dict[str, type]: | ||
| """Map each stream name in *store* to its payload type (any backend).""" | ||
| return { | ||
| name: cast("Backend[Any]", store.stream(name)._source).data_type | ||
| for name in store.list_streams() | ||
| } |
There was a problem hiding this comment.
stream_payload_types reaches into private _source attribute
store.stream(name)._source is not part of the public Store/Stream API. Any refactor that renames or removes _source (or changes how Backend exposes data_type) will produce a silent AttributeError at runtime rather than a type error. replay.py calls this path every time a recording is replayed. Consider exposing data_type through the public stream() return value or through a dedicated Store.payload_type(name) accessor.
❌ 1 Tests Failed:
View the top 1 failed test(s) by shortest run time
To view more test analytics, go to the Test Analytics Dashboard |
| from dimos.robot.unitree.go2.dds import cdr, ros | ||
| from dimos.robot.unitree.go2.dds.msgs.CompressedVideo import CompressedVideo | ||
| from dimos.robot.unitree.go2.dds.msgs.ControlEvent import ControlEvent | ||
| from dimos.robot.unitree.go2.dds.msgs.HeightMap import HeightMap |
There was a problem hiding this comment.
Missing
HeightMap.py — module-level ImportError breaks all Go2 DDS decoding
HeightMap is imported at module level from dimos.robot.unitree.go2.dds.msgs.HeightMap, but no such file exists in the msgs/ directory (neither a HeightMap.py nor an __init__.py that exports the class). The same import appears in ros.py line 48. Because these are top-level imports, any attempt to load codec.py or ros.py — which happens as soon as the Go2 codec registry is used — raises ImportError, silently disabling all Go2 DDS stream decoding. The BmsCmd.py, CompressedVideo.py, LowCmd.py, and MotorCmd.py additions all have corresponding files, but HeightMap.py was not included in the PR.
| @@ -0,0 +1,14 @@ | |||
| #!/usr/bin/env python3 | |||
There was a problem hiding this comment.
Did you mean to commit this?
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """foxglove_msgs::msg::CompressedVideo_ — one encoded video packet (rt/frontvideo/h264). |
There was a problem hiding this comment.
Are we still using foxglove?
| store = open_dataset(dataset) | ||
| with store: | ||
| for name in store.list_streams(): | ||
| store.streams[name] # register so summary() includes it |
There was a problem hiding this comment.
This kinda indicates a deficiency in the API if you have to include empty statements to get .summary() to work.
Shouldn't .summary() loop over the streams instead?
|
|
||
| # mcap files may carry channels we have no codec for (e.g. h264 video); | ||
| # list them so the inventory is complete rather than silently filtered. | ||
| uncodec = getattr(store, "uncodec_channels", None) |
There was a problem hiding this comment.
I don't like this approach of getting random methods by name.
If you specifically need McapStore.uncodec_channels, then check if store is a McapStore. If multiple stores are expected to have uncodec_channels, then declare uncodec_channels on Store and set it to return [] by default and let McapStore.uncodec_channels implement a different functionality.
| # list them so the inventory is complete rather than silently filtered. | ||
| uncodec = getattr(store, "uncodec_channels", None) | ||
| for topic, (count, schema) in sorted(uncodec().items()) if uncodec else []: | ||
| print(f" (no codec) {topic}: {count} msgs [{schema or '?'}]") |
There was a problem hiding this comment.
Better to use typer.echo(...).
dimos mem rerun --root /sportmodestate go2_dds_stairs.mcap