implement namespace isolation for multi-embodiment blueprints with du…#2519
implement namespace isolation for multi-embodiment blueprints with du…#2519Karan24Soni wants to merge 3 commits into
Conversation
…plicate module support
Greptile SummaryThis PR introduces namespace isolation for multi-embodiment blueprints by replacing the
Confidence Score: 3/5The namespace isolation plumbing is largely correct, but two public lifecycle APIs — unload_module and restart_module — were not updated and will always raise an error when called against any namespaced module, making half of the module lifecycle API non-functional for the exact deployment pattern this PR is meant to enable. The core routing fix (InstanceKey, proxy.remote_name patching, namespaced topics) is sound and the happy-path deployment works. However, unload_module and restart_module hard-code InstanceKey(None, module_class), so any caller that tries to stop or hot-reload a namespaced module will always get a ValueError even when the module is running. This leaves the lifecycle management of multi-embodiment deployments broken on the same PR that introduces them. dimos/core/coordination/module_coordinator.py — specifically the unload_module (line 411) and restart_module (line 482) public methods that need a namespace= parameter to match the updated restart_module_by_class_name. Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant User
participant Blueprint
participant ModuleCoordinator
participant WorkerManagerPython
participant Worker as Worker Process
User->>Blueprint: autoconnect(robot1_bp, robot2_bp)
Blueprint->>Blueprint: "namespace("robot1") stamps all atoms with ns="robot1""
Blueprint->>Blueprint: _eliminate_duplicates keyed on (namespace, module)
User->>ModuleCoordinator: load_blueprint(system_bp)
ModuleCoordinator->>ModuleCoordinator: "_deploy_all_modules injects __dimos_rpc_name__="robot1/ModuleA""
ModuleCoordinator->>WorkerManagerPython: deploy_parallel(specs with kwargs)
WorkerManagerPython->>Worker: spawn, ModuleBase.__init__ pops __dimos_rpc_name__
Worker-->>WorkerManagerPython: "proxy (remote_name still = "ModuleA")"
WorkerManagerPython-->>ModuleCoordinator: proxy
ModuleCoordinator->>ModuleCoordinator: "patch proxy.remote_name = "robot1/ModuleA""
ModuleCoordinator->>ModuleCoordinator: "store InstanceKey("robot1", ModuleA) -> proxy"
ModuleCoordinator->>ModuleCoordinator: _connect_streams keyed on (namespace, remapped_name, type)
Note over ModuleCoordinator: topic = /robot1/camera (isolated per namespace)
ModuleCoordinator->>ModuleCoordinator: _connect_module_refs resolves refs within same namespace
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant User
participant Blueprint
participant ModuleCoordinator
participant WorkerManagerPython
participant Worker as Worker Process
User->>Blueprint: autoconnect(robot1_bp, robot2_bp)
Blueprint->>Blueprint: "namespace("robot1") stamps all atoms with ns="robot1""
Blueprint->>Blueprint: _eliminate_duplicates keyed on (namespace, module)
User->>ModuleCoordinator: load_blueprint(system_bp)
ModuleCoordinator->>ModuleCoordinator: "_deploy_all_modules injects __dimos_rpc_name__="robot1/ModuleA""
ModuleCoordinator->>WorkerManagerPython: deploy_parallel(specs with kwargs)
WorkerManagerPython->>Worker: spawn, ModuleBase.__init__ pops __dimos_rpc_name__
Worker-->>WorkerManagerPython: "proxy (remote_name still = "ModuleA")"
WorkerManagerPython-->>ModuleCoordinator: proxy
ModuleCoordinator->>ModuleCoordinator: "patch proxy.remote_name = "robot1/ModuleA""
ModuleCoordinator->>ModuleCoordinator: "store InstanceKey("robot1", ModuleA) -> proxy"
ModuleCoordinator->>ModuleCoordinator: _connect_streams keyed on (namespace, remapped_name, type)
Note over ModuleCoordinator: topic = /robot1/camera (isolated per namespace)
ModuleCoordinator->>ModuleCoordinator: _connect_module_refs resolves refs within same namespace
Reviews (3): Last reviewed commit: "fixed namespace" | Re-trigger Greptile |
| for namespace, remapped_name, stream_type in streams.keys(): | ||
| map_key = (remapped_name, stream_type) | ||
| if map_key in self._transport_registry: | ||
| transport = self._transport_registry[map_key] | ||
| else: | ||
| transport = _get_transport_for(blueprint, remapped_name, stream_type) | ||
| self._transport_registry[key] = transport | ||
| for module, original_name in streams[key]: | ||
| instance = self.get_instance(module) # type: ignore[assignment] | ||
| transport = _get_transport_for(blueprint, remapped_name, stream_type, namespace) | ||
| self._transport_registry[map_key] = transport |
There was a problem hiding this comment.
Stream transport isolation broken for same-named streams across namespaces
_transport_registry is keyed on (remapped_name, stream_type) without the namespace, so when two namespaced blueprints share a stream name and type (e.g., both robot1 and robot2 declare a camera: Out[Image] stream), the second namespace processed finds the first namespace's transport in the registry and reuses it. Both robots end up wired to the same underlying data channel, defeating the namespace isolation this PR is meant to provide.
Concrete failure: deploy autoconnect(...).namespace("robot1") and autoconnect(...).namespace("robot2") where both modules declare a camera output — robot2's camera transport will be robot1's topic (/robot1/camera instead of /robot2/camera), and messages will be shared.
| for namespace, remapped_name, stream_type in streams.keys(): | |
| map_key = (remapped_name, stream_type) | |
| if map_key in self._transport_registry: | |
| transport = self._transport_registry[map_key] | |
| else: | |
| transport = _get_transport_for(blueprint, remapped_name, stream_type) | |
| self._transport_registry[key] = transport | |
| for module, original_name in streams[key]: | |
| instance = self.get_instance(module) # type: ignore[assignment] | |
| transport = _get_transport_for(blueprint, remapped_name, stream_type, namespace) | |
| self._transport_registry[map_key] = transport | |
| for namespace, remapped_name, stream_type in streams.keys(): | |
| map_key = (namespace, remapped_name, stream_type) | |
| if map_key in self._transport_registry: | |
| transport = self._transport_registry[map_key] | |
| else: | |
| transport = _get_transport_for(blueprint, remapped_name, stream_type, namespace) | |
| self._transport_registry[map_key] = transport |
| def _deploy_group(dep: str) -> None: | ||
| deployed = self._managers[dep].deploy_parallel(specs_by_deployment[dep], blueprint_args) | ||
| for index, module in zip(indices_by_deployment[dep], deployed, strict=True): | ||
| results[index] = module | ||
| for i, module in enumerate(deployed): | ||
| if module is not None: | ||
| spec_kwargs = specs_by_deployment[dep][i][2] | ||
| rpc_name = spec_kwargs.get("__dimos_rpc_name__") | ||
| if rpc_name and hasattr(module, "remote_name"): | ||
| module.remote_name = rpc_name | ||
|
|
||
| original_index = indices_by_deployment[dep][i] | ||
| results[original_index] = module |
There was a problem hiding this comment.
strict=True removed from zip, silently masking length mismatches
The original zip(indices_by_deployment[dep], deployed, strict=True) was a runtime assertion that the manager returned exactly as many proxies as specs were submitted. The replacement enumerate(deployed) drops this check: if deployed is shorter than indices_by_deployment[dep], those result slots are never filled and the deployment silently loses modules; if deployed is longer, indices_by_deployment[dep][i] raises an IndexError at the end of the loop instead of at the mismatched position. The safety net that made mismatches loud and early is gone.
| kwargs["__dimos_namespace__"] = key.namespace | ||
| kwargs["__dimos_rpc_name__"] = f"{key.namespace}/{new_class.__name__}" if key.namespace else new_class.__name__ | ||
|
|
||
| python_wm = cast("WorkerManagerPython", self._managers["python"]) | ||
| new_proxy = python_wm.deploy_fresh(new_class, self._global_config, kwargs) | ||
| self._deployed_modules[new_class] = new_proxy | ||
| if hasattr(new_proxy, "remote_name"): | ||
| new_proxy.remote_name = kwargs["__dimos_rpc_name__"] | ||
|
|
||
| self._deployed_modules[new_key] = new_proxy | ||
|
|
||
| new_bp = new_class.blueprint(**kwargs) | ||
| new_atom = new_bp.active_blueprints[0] |
There was a problem hiding this comment.
Internal framework keys leaked into
BlueprintAtom.kwargs via blueprint() call
kwargs is augmented with __dimos_namespace__ and __dimos_rpc_name__ before being passed to new_class.blueprint(**kwargs). Those keys end up stored inside the new BlueprintAtom.kwargs. On the next restart, old_atom.kwargs already carries them, so they accumulate. While ModuleBase.__init__ pops them before passing to super().__init__, any code path that inspects BlueprintAtom.kwargs to reconstruct user-visible configuration will see framework-internal noise. Consider stripping these keys before the blueprint() call and re-injecting them when needed.
| reload_source: bool = True, | ||
| ) -> None: | ||
| with self._modules_lock: | ||
| for cls in self._deployed_modules: | ||
| if cls.__name__ == class_name: | ||
| self._restart_module(cls, reload_source=reload_source) | ||
| for key in self._deployed_modules: | ||
| if key.module.__name__ == class_name: |
There was a problem hiding this comment.
restart_module_by_class_name is non-deterministic when a class appears in multiple namespaces
The method iterates _deployed_modules and restarts the first key whose module.__name__ matches. Dict iteration order is insertion order in Python 3.7+, so if ModuleA is deployed as both robot1/ModuleA and robot2/ModuleA, the method will always restart whichever was inserted first with no way for the caller to choose the other. There is also no public API to restart a namespaced module by explicit (namespace, class_name) pair.
| #new | ||
| namespace: str | None = None | ||
| #end |
There was a problem hiding this comment.
Left-over development markers should be removed before merging
#new / #end block comments appear throughout the diff (blueprints.py, module_coordinator.py, and module.py). These are staging artifacts and should be stripped from production code.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Problem
so basically if you tried to run multiple robots in the same coordinator using namespaces, it just bricked. the worker booted up fine and listened on the namespaced topic (like
robot1/ModuleA), butWorkerManagerPythoncompletely ignored the namespace when building theRPCClientproxy. the proxy hardcoded its routing to just the base class nameModuleA.because of this, the proxy would sit there for 120 seconds screaming into the void waiting for a response while the worker was listening somewhere else. totally blocked multi-embodiment setups where we need duplicate modules running side-by-side.
Solution
forced the proxy to actually respect the namespace so it routes correctly without touching the underlying rpc spec.
__dimos_rpc_name__into the kwargs during initial deployment and inside_restart_module.deployanddeploy_parallelin the coordinator so the exact second the proxy is handed back from the worker manager, we manually overrideproxy.remote_name = kwargs["__dimos_rpc_name__"].you can now load
autoconnect(...).namespace("robot1")androbot2with the exact same module classes and it just works.How to Test
added a brand new regression test to guarantee nobody breaks namespace isolation again:
PYTHONPATH=. python -m pytest dimos/core/tests/test_blueprint_namespaces.py -q --tb=short -xand verify the reload/restart hooks are still totally green:
PYTHONPATH=. python -m pytest dimos/core/coordination/test_module_coordinator.py -q --tb=short -k "restart or reload or unload" -n auto -x