diff --git a/pyproject.toml b/pyproject.toml index 398326fc3..99e01ae81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,9 @@ requires-python = ">=3.10" "import-linter~=2.10", "pytest-deadfixtures~=3.1", "taplo~=0.9.3", + "gymnasium~=1.2", ] + rl = ["gymnasium~=1.2"] docs = [ "sphinx~=8.1", "nvidia-sphinx-theme~=0.0.8", diff --git a/src/cloudai/_core/test_scenario.py b/src/cloudai/_core/test_scenario.py index 15efe8b2e..7c31406d0 100644 --- a/src/cloudai/_core/test_scenario.py +++ b/src/cloudai/_core/test_scenario.py @@ -97,6 +97,7 @@ class TestRun: reports: Set[Type[ReportGenerationStrategy]] = field(default_factory=set) extra_srun_args: str | None = None num_nodes_explicit: bool = False + current_env_params: dict[str, Any] = field(default_factory=dict) def __hash__(self) -> int: return hash(self.name + self.test.name + str(self.iterations) + str(self.current_iteration)) @@ -156,7 +157,9 @@ def param_space(self) -> dict[str, Any]: **{ key: value for key, value in cmd_args_dict.items() - if isinstance(value, list) and not self.test.is_dse_excluded_arg(key) + if isinstance(value, list) + and not self.test.is_dse_excluded_arg(key) + and not self.test.is_env_sampled(key) }, **{f"extra_env_vars.{key}": value for key, value in extra_env_vars_dict.items() if isinstance(value, list)}, } @@ -184,27 +187,40 @@ def all_combinations(self) -> list[dict[str, Any]]: return all_combinations - def apply_params_set(self, action: dict[str, Any]) -> "TestRun": + def apply_params_set(self, action: dict[str, Any], env_params: dict[str, Any] | None = None) -> "TestRun": tdef = self.test.model_copy(deep=True) - for key, value in action.items(): + + def _apply(key: str, value: Any) -> None: if key.startswith("extra_env_vars."): tdef.extra_env_vars[key[len("extra_env_vars.") :]] = value + return + attrs = key.split(".") + obj = tdef.cmd_args + for attr in attrs[:-1]: + obj = obj[attr] if isinstance(obj, dict) else getattr(obj, attr) + if isinstance(obj, dict): + obj[attrs[-1]] = value else: - attrs = key.split(".") - obj = tdef.cmd_args - for attr in attrs[:-1]: - obj = obj[attr] if isinstance(obj, dict) else getattr(obj, attr) - if isinstance(obj, dict): - obj[attrs[-1]] = value - else: - setattr(obj, attrs[-1], value) + setattr(obj, attrs[-1], value) + + # RNG runs in the env before this call; applying only concrete values keeps this deterministic. + for key, value in action.items(): + _apply(key, value) + for key, value in (env_params or {}).items(): + _apply(key, value) - type(tdef)(**tdef.model_dump()) # trigger validation + # env_params is validated at parse time; after the overlay its target cmd_args fields hold + # concrete scalar draws, so re-validating it here would reject weighted specs. Drop it for + # this validation-only pass, which exists to validate the applied action values. + validation_args = tdef.model_dump() + validation_args.pop("env_params", None) + type(tdef)(**validation_args) # trigger validation new_tr = copy.deepcopy(self) new_tr.test = tdef if "NUM_NODES" in action: new_tr.num_nodes = action["NUM_NODES"] + new_tr.current_env_params = dict(env_params or {}) return new_tr diff --git a/src/cloudai/cli/handlers.py b/src/cloudai/cli/handlers.py index 3fd099dc0..5fb6796cf 100644 --- a/src/cloudai/cli/handlers.py +++ b/src/cloudai/cli/handlers.py @@ -27,6 +27,7 @@ import toml import yaml +from cloudai.configurator.env_params import validate_dse_env_params from cloudai.core import ( BaseInstaller, CloudAIGymEnv, @@ -39,6 +40,7 @@ System, TestParser, TestScenario, + TestScenarioParsingError, ) from cloudai.models.scenario import ReportConfig from cloudai.models.workload import TestDefinition @@ -133,8 +135,7 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int: return 1 err = 0 - # Recoverable failures return a non-zero rc and are accumulated here; an unexpected exception - # (a bug) is a hard-fail. We capture it so reports still generate, then re-raise below. + # Capture an unexpected error so reports still generate, then re-raise below. run_error: Exception | None = None try: for tr in runner.runner.test_scenario.test_runs: @@ -177,7 +178,7 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int: ) if run_error is not None: - raise run_error + raise run_error.with_traceback(run_error.__traceback__) logging.info("All jobs are complete.") return err @@ -303,6 +304,12 @@ def handle_dry_run_and_run(args: argparse.Namespace) -> int: return 1 system, test_scenario, tests = setup_result + try: + validate_dse_env_params(test_scenario) + except TestScenarioParsingError as e: + logging.error(str(e)) + return 1 + if not _handle_single_sbatch(args, system): return 1 @@ -491,7 +498,8 @@ def verify_test_scenarios( tests = Parser.parse_tests(test_tomls, system) hook_tests = Parser.parse_tests(hook_test_tomls, system) hooks = Parser.parse_hooks(hook_tomls, system, {t.name: t for t in hook_tests}) - Parser.parse_test_scenario(scenario_file, system, {t.name: t for t in tests}, hooks) + scenario = Parser.parse_test_scenario(scenario_file, system, {t.name: t for t in tests}, hooks) + validate_dse_env_params(scenario) except Exception: nfailed += 1 diff --git a/src/cloudai/configurator/__init__.py b/src/cloudai/configurator/__init__.py index f05b65c5b..a88432c41 100644 --- a/src/cloudai/configurator/__init__.py +++ b/src/cloudai/configurator/__init__.py @@ -18,11 +18,13 @@ from .base_gym import BaseGym from .cloudai_gym import CloudAIGymEnv, TrajectoryEntry from .grid_search import GridSearchAgent +from .gymnasium_adapter import GymnasiumAdapter __all__ = [ "BaseAgent", "BaseGym", "CloudAIGymEnv", "GridSearchAgent", + "GymnasiumAdapter", "TrajectoryEntry", ] diff --git a/src/cloudai/configurator/base_agent.py b/src/cloudai/configurator/base_agent.py index 321e6e659..5fe1059ef 100644 --- a/src/cloudai/configurator/base_agent.py +++ b/src/cloudai/configurator/base_agent.py @@ -58,6 +58,10 @@ class BaseAgent(ABC): Provides a unified interface and parameter management for action spaces. """ + # Opt-in: agents that consume per-trial env_params sampling set this True. Default False so + # env_params declared for a non-sampling agent are rejected rather than silently ignored. + samples_env_params: bool = False + def __init__(self, env: BaseGym, config: BaseAgentConfig): """ Initialize the agent with the environment. @@ -94,9 +98,8 @@ def select_action(self, observation: list[float] | None = None) -> tuple[int, di Args: observation: Latest observation produced by the environment (``env.reset()`` on the - first call, then the result of the prior ``env.step()``). Stateless agents such - as grid search or Bayesian optimization may ignore this; observation-conditioned - agents (RL, contextual bandits) should use it. + first call, then the result of the prior ``env.step()``). Stateless agents may + ignore this; observation-conditioned agents should use it. Returns: Tuple[int, Dict[str, Any]] | None: The current step index and a dictionary mapping action keys @@ -120,8 +123,7 @@ def run(self) -> int: Default: a step loop driven by the dispatcher (``select_action`` → ``env.step`` → ``update_policy`` per trial). Agents that drive their - own training loop (e.g. RLlib-based agents calling ``algo.train()``) - override this method. + own training loop override this method. Failure contract (``handle_dse_job`` consumes the result via ``err |= agent.run()``): @@ -131,7 +133,8 @@ def run(self) -> int: accumulated and the next ``TestRun`` still executes. Workload-level failures are already surfaced this way: ``CloudAIGymEnv.step`` maps a failed metric to ``rewards.metric_failure`` rather than raising, and - ``rllib_run`` catches training errors and returns ``rc=1``. + agents with their own training loop should likewise catch training + errors and return a non-zero code. - Raise for *unexpected* failures (framework/agent bugs). Exceptions propagate out of ``handle_dse_job`` and hard-fail the job so the bug is surfaced instead of masked as a penalizing reward. diff --git a/src/cloudai/configurator/cloudai_gym.py b/src/cloudai/configurator/cloudai_gym.py index 258a47f3c..b32e137cc 100644 --- a/src/cloudai/configurator/cloudai_gym.py +++ b/src/cloudai/configurator/cloudai_gym.py @@ -26,6 +26,7 @@ from .base_agent import RewardOverrides from .base_gym import BaseGym +from .env_params import EnvParams, EnvParamsSink @dataclasses.dataclass(frozen=True) @@ -36,6 +37,7 @@ class TrajectoryEntry: action: dict[str, Any] reward: float observation: list + env_params: dict[str, Any] = dataclasses.field(default_factory=dict) class CloudAIGymEnv(BaseGym): @@ -61,8 +63,15 @@ def __init__(self, test_run: TestRun, runner: BaseRunner, rewards: RewardOverrid self.max_steps = test_run.test.agent_steps self.reward_function = Registry().get_reward_function(test_run.test.agent_reward_function) self.trajectory: dict[int, list[TrajectoryEntry]] = {} + self.params: EnvParams | None = EnvParams.from_test(test_run.test) + self.env_params_sink = EnvParamsSink() super().__init__() + @property + def env_params_record_path(self) -> Path: + """``env.csv`` lives alongside ``trajectory.csv`` so a plain ``merge`` joins them.""" + return self.iteration_dir / "env.csv" + def define_action_space(self) -> Dict[str, list[Any]]: return self.test_run.param_space @@ -76,9 +85,10 @@ def define_observation_space(self) -> list: Define the observation space for the environment. Returns: - list: The observation space. + list: One float slot per agent metric (at least one), giving the correct shape + for adapters that derive ``gymnasium.spaces.Box`` from this output. """ - return [0.0] + return [0.0] * max(len(self.test_run.test.agent_metrics), 1) def reset( self, @@ -100,7 +110,7 @@ def reset( if seed is not None: lazy.np.random.seed(seed) self.test_run.current_iteration = 0 - observation = [0.0] + observation = self.define_observation_space() info = {} return observation, info @@ -119,7 +129,9 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: - info (dict): Additional info for debugging. """ self.test_run.increment_step() - self.test_run = self.test_run.apply_params_set(action) + # RNG lives in the env: sample here, then apply action + sample so the run and cache key see them. + sampled_env_params = self.params.sample(self.test_run.step) if self.params else {} + self.test_run = self.test_run.apply_params_set(action, env_params=sampled_env_params) cached_result = self.get_cached_trajectory_result(action) if cached_result is not None: @@ -134,6 +146,7 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: action=action, reward=cached_result.reward, observation=cached_result.observation, + env_params=dict(self.test_run.current_env_params), ) ) return cached_result.observation, cached_result.reward, False, {} @@ -162,6 +175,9 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: self.test_run.step = new_tr.step self.test_run.output_path = new_tr.output_path + # The test_run rebuild above drops the sample; restore it so the entry, cache key, and env.csv match. + self.test_run.current_env_params = new_tr.current_env_params + observation = self.get_observation(action) reward = self.compute_reward(observation) @@ -171,6 +187,7 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: action=action, reward=reward, observation=observation, + env_params=dict(self.test_run.current_env_params), ) ) @@ -230,7 +247,14 @@ def get_observation(self, action: Any) -> list: return observation def write_trajectory(self, entry: TrajectoryEntry): - """Append the trajectory to the CSV file and to the local attribute.""" + """ + Append the entry to the in-memory cache and trajectory.csv (plus env.csv when declared). + + ``trajectory.csv`` and the ``env.csv`` projection are sunk from the same + ``TrajectoryEntry`` here, so a trial that never produces an entry (e.g. a + constraint failure returns before this call) lands in neither file and the + two stay 1:1 step-aligned. + """ self.current_trajectory.append(entry) file_exists = self.trajectory_file_path.exists() @@ -243,17 +267,36 @@ def write_trajectory(self, entry: TrajectoryEntry): writer.writerow(["step", "action", "reward", "observation"]) writer.writerow([entry.step, entry.action, entry.reward, entry.observation]) + self.env_params_sink.write(self.env_params_record_path, entry.step, entry.env_params) + + @property + def iteration_dir(self) -> Path: + """Per-iteration output dir; trajectory.csv and env.csv both live here, step-aligned.""" + return self.runner.scenario_root / self.test_run.name / f"{self.test_run.current_iteration}" + @property def trajectory_file_path(self) -> Path: - return self.runner.scenario_root / self.test_run.name / f"{self.test_run.current_iteration}" / "trajectory.csv" + return self.iteration_dir / "trajectory.csv" @property def current_trajectory(self) -> list[TrajectoryEntry]: return self.trajectory.setdefault(self.test_run.current_iteration, []) def get_cached_trajectory_result(self, action: Any) -> TrajectoryEntry | None: + """ + Return a cached entry only when the full trial identity matches. + + Trial identity is ``(action, env_params)``: env-randomized parameters + change the workload's behaviour, so a trial repeating the same action + under a different ``env_params`` sample must miss and re-run. Empty + env_params on both sides is the back-compat path for workloads that + do not declare any ``[env_params.*]`` block. + """ + current_env_params = self.test_run.current_env_params for entry in self.current_trajectory: - if self._values_match_exact(entry.action, action): + if not self._values_match_exact(entry.action, action): + continue + if self._values_match_exact(entry.env_params, current_env_params): return entry return None diff --git a/src/cloudai/configurator/env_params.py b/src/cloudai/configurator/env_params.py new file mode 100644 index 000000000..c1b7ee0d0 --- /dev/null +++ b/src/cloudai/configurator/env_params.py @@ -0,0 +1,242 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Per-trial environment-parameter primitives for CloudAI DSE. + +An env-randomized parameter is a workload knob whose candidate values live in +``cmd_args`` (a plain list - the single source of truth, exactly like an +action-space dimension) but which the environment *samples* per trial rather +than letting the agent search it. ``env_params`` is the annotation that marks +such a field; it carries only *how* to sample (optional ``weights``), never the +values, and the knob never enters the agent's action space. +""" + +from __future__ import annotations + +import csv +import dataclasses +import math +import random +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Protocol, runtime_checkable + +from pydantic import BaseModel, ConfigDict, Field, model_validator +from typing_extensions import Self + +from cloudai._core.exceptions import TestScenarioParsingError +from cloudai._core.registry import Registry + +if TYPE_CHECKING: + from cloudai._core.test_scenario import TestScenario + + +class EnvParamSpec(BaseModel): + """ + Annotation marking one cmd_args field as env-sampled. + + Carries only *how* to sample - the candidate values themselves live in + ``cmd_args.`` as a plain list. ``weights`` (optional) are positional, + aligned 1:1 with that candidate list; omit for uniform sampling. The + length match against the candidate list is a cross-field check enforced by + ``TestDefinition`` (which can see ``cmd_args``); here we validate only the + weights' intrinsic shape. + """ + + model_config = ConfigDict(extra="forbid") + + weights: Optional[List[float]] = Field( + default=None, + description="Optional probability weights aligned with the cmd_args candidate list; uniform if omitted.", + ) + + @model_validator(mode="after") + def _validate_weights(self) -> Self: + if self.weights is None: + return self + for w in self.weights: + if not math.isfinite(w) or w < 0: + raise ValueError(f"env_params weights must be finite and non-negative; got {w}") + total = sum(self.weights) + if abs(total - 1.0) > 1e-6: + raise ValueError(f"env_params weights must sum to 1.0; got {total}") + return self + + +class ObsLeafDescriptor(BaseModel): + """ + Description of one leaf of a structured (named) observation. + + A structured observation maps each observed name to a self-describing leaf + so adapters can build the matching subspace without guessing: a ``"box"`` + leaf becomes a continuous vector of width ``dim`` (e.g. a log-encoded + env_param as ``dim=2``); a ``"discrete"`` leaf becomes a categorical of + size ``n``. Stateless agents that consume the flat observation ignore this. + """ + + model_config = ConfigDict(extra="forbid") + + kind: Literal["box", "discrete"] + dim: int = 1 + n: Optional[int] = None + + @model_validator(mode="after") + def _validate(self) -> Self: + if self.dim < 1: + raise ValueError(f"ObsLeafDescriptor dim must be >= 1; got {self.dim}") + if self.kind == "discrete" and (self.n is None or self.n < 1): + raise ValueError(f"ObsLeafDescriptor(kind='discrete') requires n >= 1; got n={self.n}") + return self + + +@runtime_checkable +class StructuredObservation(Protocol): + """ + Optional env hooks that expose a structured (per-leaf) observation. + + An env opts in by returning per-leaf :class:`ObsLeafDescriptor` from + ``structured_observation_descriptors`` (``None`` keeps the flat-vector + path) and encoding a raw observation into the matching named leaves via + ``encode_observation``. ``GymnasiumAdapter`` consumes these to expose a + ``gymnasium.spaces.Dict`` observation; the hooks are duck-typed, so envs + need not subclass this Protocol. + """ + + def structured_observation_descriptors(self) -> Optional[Dict[str, ObsLeafDescriptor]]: ... + + def encode_observation(self, observation: list) -> Dict[str, Any]: ... + + +@dataclasses.dataclass(frozen=True) +class EnvParam: + """ + One env-sampled knob, resolved from cmd_args: its candidate values and optional weights. + + Weights (when present) are positional, aligned 1:1 with ``candidates``; ``None`` means + uniform sampling. Keeping the two together makes each knob a self-contained draw. + """ + + candidates: List[Any] + weights: Optional[List[float]] = None + + def draw(self, rng: random.Random) -> Any: + if self.weights is not None: + return rng.choices(self.candidates, weights=self.weights, k=1)[0] + return rng.choice(self.candidates) + + +@dataclasses.dataclass(frozen=True) +class EnvParams: + """ + Resolved env-parameter sampling state for one run. + + Built via ``from_test`` only when a workload actually declares list-valued env_params; + otherwise ``from_test`` returns ``None`` and the env carries no env-params state at all. + Owns the per-parameter :class:`EnvParam` draws (resolved from ``cmd_args``, the single source + of truth) and the seed, and draws one value per parameter per trial. + """ + + params: Dict[str, EnvParam] + seed: int + + @classmethod + def from_test(cls, test: Any) -> Optional["EnvParams"]: + """ + Resolve a TestDefinition's env_params annotations, or ``None`` if nothing is sampled. + + Annotated fields are guaranteed list-valued by ``TestDefinition`` parse-time validation + (a scalar annotation is rejected there), so the non-list guard below is defensive. With + no annotations declared there is nothing to sample and we return ``None`` so callers + stay on the zero-overhead path. + """ + params: Dict[str, EnvParam] = {} + for name, spec in test.env_params.items(): + value = getattr(test.cmd_args, name, None) + if not isinstance(value, list): + continue + params[name] = EnvParam(candidates=value, weights=spec.weights) + if not params: + return None + seed = int((test.agent_config or {}).get("random_seed", 0)) + return cls(params=params, seed=seed) + + def sample(self, trial: int) -> Dict[str, Any]: + """ + Draw this trial's value for each parameter. + + Determinism: the same ``(seed, name, trial)`` yields the same draw across processes. + Independence: each parameter's RNG is seeded ``f"{seed}:{name}:{trial}"`` so adding or + removing one parameter never perturbs the others' draw sequences. + """ + return {name: param.draw(random.Random(f"{self.seed}:{name}:{trial}")) for name, param in self.params.items()} + + +class EnvParamsSink: + """ + Append per-trial env_params samples to a step-aligned CSV. + + The CSV mirrors how ``trajectory.csv`` serialises its ``action`` column + (one row per env.step(), sample dict stringified in a single cell) so the + two files align 1:1 on ``step`` and a plain ``merge`` joins them. + + Empty samples are skipped, so a run without env_params writes nothing and + callers can sink every trial unconditionally. + """ + + def write(self, path: Path, step: int, sample: Dict[str, Any]) -> None: + if step < 1: + raise ValueError(f"step must be a positive trial index (cloudai DSE is 1-based); got {step}") + if not sample: + return + new_file = not path.exists() + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", newline="") as f: + writer = csv.writer(f) + if new_file: + writer.writerow(("step", "env")) + writer.writerow([step, sample]) + + +def validate_dse_env_params(test_scenario: "TestScenario") -> None: + """ + Reject prepped configs that declare env_params no agent will sample. + + env_params are sampled per-trial by CloudAIGymEnv, but the sampling only matters for an agent + that opts into it via ``BaseAgent.samples_env_params``. A non-DSE run has no per-trial loop, and + an agent that does not opt in ignores env_params, so declaring them there is a silent no-op. + is_dse_job and the agent both resolve only on the fully prepped config, so this is validated + here rather than at parse time. + """ + agents = Registry().agents_map + + offenders = [] + for tr in test_scenario.test_runs: + if not tr.test.env_params: + continue + + agent = agents.get(tr.test.agent) + # Unknown agent: defer to the dedicated agent-resolution error rather than masking it here. + sampled = tr.is_dse_job and (agent is None or agent.samples_env_params) + if not sampled: + offenders.append(tr.name) + + if offenders: + raise TestScenarioParsingError( + f"Tests {offenders} declare env_params but no agent will sample them. env_params are sampled " + "per-trial only by a DSE run on an agent that opts into env_params sampling. Add a sweep " + "(a list-valued cmd_args/extra_env_vars entry or num_nodes) and use such an agent, or remove " + "env_params." + ) diff --git a/src/cloudai/configurator/gymnasium_adapter.py b/src/cloudai/configurator/gymnasium_adapter.py new file mode 100644 index 000000000..a90a35f15 --- /dev/null +++ b/src/cloudai/configurator/gymnasium_adapter.py @@ -0,0 +1,246 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Gymnasium adapter for CloudAI ``BaseGym`` environments. + +Translates a CloudAI :class:`BaseGym` into the ``gymnasium.Env`` 5-tuple shape +that RLlib-based agents (e.g. PPO / DQN) and external training loops expect. +``gymnasium`` is an optional dependency (the ``[rl]`` extra), so it is imported +lazily and only required when an adapter is actually instantiated. + +Design invariant — adapter is a pure pass-through over ``test_run.step``. +The trial counter is owned by ``TestRun`` and advanced exclusively by +``CloudAIGymEnv.step()``. Adapters that wrote ``test_run.step`` themselves — +mirroring a Gym-protocol episode-local counter — collapsed every +contextual-bandit rollout onto ``step=1`` because RLlib calls ``reset()`` per +trial. This adapter never mutates ``test_run.step``; contract tests pin that +property. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Optional, cast + +from cloudai.configurator.base_gym import BaseGym +from cloudai.configurator.env_params import StructuredObservation +from cloudai.util.lazy_imports import lazy + +if TYPE_CHECKING: + from gymnasium import Env as _GymnasiumEnvBase +else: + try: # ``gymnasium`` is an optional [rl] dependency; fall back to ``object`` when absent. + from gymnasium import Env as _GymnasiumEnvBase + except ImportError: + _GymnasiumEnvBase = object + + +class GymnasiumAdapter(_GymnasiumEnvBase): + """ + Expose a CloudAI :class:`BaseGym` as a ``gymnasium.Env``-shaped object. + + The adapter: + + * Builds a ``gymnasium.spaces.Dict`` of ``Discrete`` action spaces over + the *tunable* parameters (those with more than one candidate value), + and injects the *fixed* parameters (single candidate) automatically on + every step so agents never see them. + * Converts observations to ``float32`` ``numpy`` arrays sized by + ``env.define_observation_space()``. + * Returns the gymnasium 5-tuple ``(obs, reward, terminated, truncated, info)`` + from :meth:`step` and :meth:`step_raw`. + + ``gymnasium`` and ``numpy`` are optional dependencies (the ``[rl]`` extra); + instantiating the adapter without them raises ``ImportError``. + """ + + # Overrides gymnasium.Env.metadata (a non-ClassVar instance attribute); matching that + # shape satisfies pyright's override check, so RUF012's ClassVar suggestion is silenced. + metadata: dict[str, Any] = {"render_modes": ["human"]} # noqa: RUF012 + + def __init__(self, env: BaseGym) -> None: + np = self._np = lazy.np + spaces = self._spaces = lazy.gymnasium.spaces + self._env = env + + raw_action_space = env.define_action_space() + + # Two classes of params from cloudai's param_space: + # list with len > 1 -> discrete tunable, mapped to gym.Discrete. + # list with len == 1 -> fixed (collapsed); injected on every step so + # agents never see them. + self._discrete_params: dict[str, list] = { + k: v for k, v in raw_action_space.items() if isinstance(v, list) and len(v) > 1 + } + self._fixed_params: dict[str, Any] = { + k: v[0] for k, v in raw_action_space.items() if isinstance(v, list) and len(v) == 1 + } + + action_space_components: dict[str, Any] = { + name: spaces.Discrete(len(values)) for name, values in self._discrete_params.items() + } + self.action_space = spaces.Dict(action_space_components) + + # Observation space: prefer the env's structured (per-leaf) spec so the + # policy sees named, individually-encoded leaves (e.g. a log-encoded + # env_param as Box(2)); RLlib connectors own normalize + flatten. Falls + # back to a flat Box for envs that only expose define_observation_space. + self._obs_descriptors: Optional[dict[str, Any]] = self._structured_obs_descriptors(env) + if self._obs_descriptors: + self.observation_space = spaces.Dict( + {name: self._descriptor_to_space(desc) for name, desc in self._obs_descriptors.items()} + ) + else: + obs_shape = (len(env.define_observation_space()),) + self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=obs_shape, dtype=np.float32) + + @staticmethod + def _structured_obs_descriptors(env: BaseGym) -> Optional[dict[str, Any]]: + """ + Return the env's per-leaf obs descriptors, or ``None`` for the flat-Box path. + + The env owns the opt-in decision via ``structured_observation_descriptors`` + (returns ``None`` unless an observed name is a declared env_param). Envs + without that hook keep the legacy flat-Box path. + """ + getter = getattr(env, "structured_observation_descriptors", None) + if getter is None or not hasattr(env, "encode_observation"): + return None + descriptors = getter() + return descriptors or None + + def _descriptor_to_space(self, descriptor: Any) -> Any: + """Map a framework-agnostic ``ObsLeafDescriptor`` to a gymnasium subspace.""" + if descriptor.kind == "discrete": + return self._spaces.Discrete(descriptor.n) + return self._spaces.Box(low=-self._np.inf, high=self._np.inf, shape=(descriptor.dim,), dtype=self._np.float32) + + @property + def cloudai_env(self) -> BaseGym: + """Return the wrapped CloudAI :class:`BaseGym` (gymnasium's ``unwrapped`` returns ``self``).""" + return self._env + + def decode_action(self, action: dict[str, Any]) -> dict[str, Any]: + """ + Map raw gym actions back to native parameter values. + + Discrete actions are list indices and resolve to the corresponding list + entry. + + Raises: + ValueError: if ``action`` is missing tunable params, contains + unknown keys, or carries an out-of-range discrete index. + """ + self._assert_keys(action.keys(), set(self._discrete_params), "action") + return {name: self._decode_discrete(name, raw) for name, raw in action.items()} + + def _decode_discrete(self, name: str, raw: Any) -> Any: + values = self._discrete_params[name] + idx = int(raw) + if not 0 <= idx < len(values): + raise ValueError(f"Action index out of range for '{name}': {idx} (expected 0..{len(values) - 1})") + return values[idx] + + def encode_action(self, values: dict[str, Any]) -> dict[str, Any]: + """ + Map native parameter values back to raw gym actions; inverse of :meth:`decode_action`. + + Discrete values resolve to their index in the candidate list. Together + with :meth:`decode_action` this is an invertible pair on native values: + ``decode_action(encode_action(v)) == v`` for any ``v`` drawn from the + tunable params. + + Consumers that need to express known native configs in the policy's + action space — e.g. warm-start / behavioral cloning from a recorded + trajectory — call this instead of reaching into the adapter internals. + + Raises: + ValueError: if ``values`` does not cover exactly the tunable params, + or carries a discrete value absent from its candidate list. + """ + self._assert_keys(values.keys(), set(self._discrete_params), "values") + return {name: self._encode_discrete(name, value) for name, value in values.items()} + + def _encode_discrete(self, name: str, value: Any) -> int: + values = self._discrete_params[name] + try: + return values.index(value) + except ValueError: + raise ValueError(f"Value {value!r} for '{name}' is not a candidate; expected one of {values}") from None + + def reset( + self, + *, + seed: Optional[int] = None, + options: Optional[dict[str, Any]] = None, + ) -> tuple[Any, dict[str, Any]]: + obs, info = self._env.reset(seed=seed, options=options) + return self._as_obs_array(obs), info + + def step(self, action: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]: + params = {**self._fixed_params, **self.decode_action(action)} + return self._step_with_params(params) + + def step_raw(self, params: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]: + """ + Step the env with an already-decoded parameter dict; bypasses index decoding. + + Raises: + ValueError: if ``params`` does not cover exactly the tunable + + fixed param keys. + """ + expected = set(self._discrete_params) | set(self._fixed_params) + self._assert_keys(params.keys(), expected, "raw params") + return self._step_with_params(params) + + def render(self) -> None: + self._env.render() + + @staticmethod + def _assert_keys(received: Any, expected: set[str], ctx: str) -> None: + received_set = set(received) + if received_set == expected: + return + missing = sorted(expected - received_set) + extra = sorted(received_set - expected) + raise ValueError(f"{ctx} keys mismatch; missing={missing}, extra={extra}") + + def _step_with_params(self, params: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]: + obs, reward, done, info = self._env.step(params) + return self._as_obs_array(obs), float(reward), bool(done), False, info + + def _as_obs_array(self, obs: Any) -> Any: + """ + Convert a raw env observation into the policy-facing observation. + + Structured path: the flat raw obs (which feeds the env's reward and + ``trajectory.csv``) is encoded per-leaf by the env and returned as a + ``dict`` keyed to match ``observation_space`` (a ``spaces.Dict``). + Flat path: a single ``float32`` ``Box`` array (legacy behaviour). + """ + descriptors = self._obs_descriptors + if descriptors is None: + return self._np.asarray(obs, dtype=self._np.float32) + env = cast(StructuredObservation, self._env) + encoded = env.encode_observation(list(obs)) + self._assert_keys(encoded.keys(), set(descriptors), "encoded observation") + return {name: self._leaf_to_value(descriptors[name], encoded[name]) for name in descriptors} + + def _leaf_to_value(self, descriptor: Any, leaf: Any) -> Any: + """Coerce one encoded leaf to its gymnasium subspace dtype.""" + if descriptor.kind == "discrete": + return int(leaf) + return self._np.asarray(leaf, dtype=self._np.float32) diff --git a/src/cloudai/core.py b/src/cloudai/core.py index 752d24972..f45f0c3f9 100644 --- a/src/cloudai/core.py +++ b/src/cloudai/core.py @@ -51,7 +51,9 @@ from ._core.test_scenario import METRIC_ERROR, MetricErrorSentinel, MetricValue, TestDependency, TestRun, TestScenario from .configurator.base_agent import BaseAgent, BaseAgentConfig, RewardOverrides from .configurator.cloudai_gym import CloudAIGymEnv +from .configurator.env_params import ObsLeafDescriptor, StructuredObservation from .configurator.grid_search import GridSearchAgent +from .configurator.gymnasium_adapter import GymnasiumAdapter from .models.workload import CmdArgs, NsysConfiguration, PredictorConfig, TestDefinition from .parser import Parser from .reporter import PerTestReporter, StatusReporter, TarballReporter @@ -75,6 +77,7 @@ "Grader", "GradingStrategy", "GridSearchAgent", + "GymnasiumAdapter", "HFModel", "InstallStatusResult", "Installable", @@ -85,6 +88,7 @@ "MetricValue", "MissingTestError", "NsysConfiguration", + "ObsLeafDescriptor", "Parser", "PerTestReporter", "PredictorConfig", @@ -96,6 +100,7 @@ "RewardOverrides", "Runner", "StatusReporter", + "StructuredObservation", "System", "SystemConfigParsingError", "TarballReporter", diff --git a/src/cloudai/models/workload.py b/src/cloudai/models/workload.py index 8b981d8ea..c5156ccc6 100644 --- a/src/cloudai/models/workload.py +++ b/src/cloudai/models/workload.py @@ -23,6 +23,8 @@ from cloudai.core import GitRepo, Installable, JobStatusResult, PythonExecutable, Registry, System, TestRun +from ..configurator.env_params import EnvParamSpec + class CmdArgs(BaseModel): """Test command arguments.""" @@ -110,6 +112,14 @@ class TestDefinition(BaseModel, ABC): agent_metrics: list[str] = Field(default=["default"]) agent_reward_function: str = "inverse" agent_config: dict[str, Any] | None = Field(default=None, description="Agent configuration.") + env_params: dict[str, EnvParamSpec] = Field( + default_factory=dict, + description=( + "Environment parameters sampled by the env per trial. Sibling to " + "cmd_args; not part of the agent's action space. CloudAIGymEnv samples, " + "persists to env.csv, and includes them in the trajectory cache key." + ), + ) @property def cmd_args_dict(self) -> Dict[str, Union[str, List[str]]]: @@ -134,19 +144,27 @@ def installables(self) -> list[Installable]: def constraint_check(self, tr: TestRun, system: Optional[System]) -> bool: return True + def is_env_sampled(self, cmd_args_path: str) -> bool: + """Whether a cmd_args field is env-sampled (env draws it per trial, not the agent).""" + return cmd_args_path in self.env_params + @property def is_dse_job(self) -> bool: - def check_dict(d: dict, parent_key: str = "") -> bool: + def check_dict(d: dict, parent_key: str = "", skip_env_params: bool = False) -> bool: if isinstance(d, dict): for key, value in d.items(): path = f"{parent_key}.{key}" if parent_key else key if self.is_dse_excluded_arg(path): continue - if isinstance(value, list) or (isinstance(value, dict) and check_dict(value, path)): + if skip_env_params and self.is_env_sampled(path): + continue + if isinstance(value, list) or ( + isinstance(value, dict) and check_dict(value, path, skip_env_params) + ): return True return False - return check_dict(self.cmd_args_dict) or check_dict(self.extra_env_vars) + return check_dict(self.cmd_args_dict, skip_env_params=True) or check_dict(self.extra_env_vars) @field_validator("dse_excluded_args", mode="before") @classmethod @@ -191,3 +209,64 @@ def validate_agent_config(self) -> Self: agent_config_class = agent_class.get_config_class() agent_config_class.model_validate(self.agent_config) return self + + @model_validator(mode="after") + def validate_env_params(self) -> Self: + """ + Validate env_params annotations against cmd_args. + + ``env_params`` is an annotation: each key names a ``cmd_args`` field whose value is + the candidate set (the single source of truth), and the entry carries only *how* to + sample. So each key must name a real ``cmd_args`` field whose value is a candidate + list; a scalar is already fixed, so annotating it is a meaningless label and is + rejected here. When ``weights`` are declared, the list needs >= 2 values and the + weights must align 1:1 with it. Sampling, persistence, the per-trial cmd_args overlay, + and the cache key all + live in ``CloudAIGymEnv``; keeping this shape check in core lets the overlay stay + agent- and workload-agnostic rather than re-implemented per workload. + """ + if not self.env_params: + return self + + cmd_args_fields = getattr(type(self.cmd_args), "model_fields", None) + if not cmd_args_fields: + return self + + unknown = sorted(k for k in self.env_params if k not in cmd_args_fields) + if unknown: + raise ValueError(f"env_params keys {unknown} are not cmd_args fields on {type(self.cmd_args).__name__}") + + for name, spec in self.env_params.items(): + self._validate_env_param_field(name, spec, getattr(self.cmd_args, name, None)) + return self + + @staticmethod + def _validate_env_param_field(name: str, spec: Any, value: Any) -> None: + """Reject one env_params entry whose target cmd_args field is not a valid candidate list.""" + if isinstance(value, (dict, BaseModel)): + raise ValueError( + f"env_params['{name}'] must target a leaf cmd_args field (a candidate list), " + "not a structured object; param_space/is_dse_job exclude the whole key, which would " + "silently drop nested action dimensions" + ) + if not isinstance(value, list): + raise ValueError( + f"env_params['{name}'] annotates cmd_args.{name}, which is not a candidate list " + f"(got {type(value).__name__}); the annotation only reclassifies a list-valued sweep as " + f"env-sampled, while a scalar is already fixed. Make cmd_args.{name} a list or remove " + "the annotation" + ) + if not value: + raise ValueError( + f"env_params['{name}'] references an empty candidate list in cmd_args.{name}; " + "provide at least one candidate (the sampler would otherwise fail on an empty draw)" + ) + if spec.weights is None: + return + if len(value) < 2: + raise ValueError(f"env_params['{name}'] declares weights but cmd_args.{name} needs >= 2 candidate values") + if len(spec.weights) != len(value): + raise ValueError( + f"env_params['{name}'] weights length {len(spec.weights)} does not match " + f"cmd_args.{name} candidate count {len(value)}" + ) diff --git a/src/cloudai/util/lazy_imports.py b/src/cloudai/util/lazy_imports.py index 65ba62df3..def790dfc 100644 --- a/src/cloudai/util/lazy_imports.py +++ b/src/cloudai/util/lazy_imports.py @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -27,6 +27,7 @@ import bokeh.palettes as bokeh_pallettes import bokeh.plotting as bokeh_plotting import bokeh.transform as bokeh_transform + import gymnasium import kubernetes as k8s import numpy as np import pandas as pd @@ -39,6 +40,7 @@ def __init__(self): self._np: ModuleType | None = None self._pd: ModuleType | None = None self._k8s: ModuleType | None = None + self._gymnasium: ModuleType | None = None self._bokeh: ModuleType | None = None self._bokeh_plotting: ModuleType | None = None self._bokeh_models: ModuleType | None = None @@ -75,6 +77,19 @@ def k8s(self) -> k8s: # type: ignore[no-any-return] return cast("k8s", self._k8s) + @property + def gymnasium(self) -> gymnasium: # type: ignore[no-any-return] + """Lazy import of gymnasium (optional ``cloudai[rl]`` extra).""" + if self._gymnasium is None: + try: + import gymnasium + except ImportError as exc: + raise ImportError( + "gymnasium is required for GymnasiumAdapter. Install it with: pip install 'cloudai[rl]'" + ) from exc + self._gymnasium = gymnasium + return cast("gymnasium", self._gymnasium) + @property def bokeh(self) -> bokeh: # type: ignore[no-any-return] """Lazy import of bokeh.""" diff --git a/tests/test_cloudaigym.py b/tests/test_cloudaigym.py index 4cd7c6fb8..177a39dd7 100644 --- a/tests/test_cloudaigym.py +++ b/tests/test_cloudaigym.py @@ -21,6 +21,7 @@ import pytest from cloudai.configurator import CloudAIGymEnv, GridSearchAgent, TrajectoryEntry +from cloudai.configurator.env_params import EnvParamSpec from cloudai.core import BaseRunner, RewardOverrides, Runner, TestRun, TestScenario from cloudai.systems.slurm import SlurmSystem from cloudai.util import flatten_dict @@ -33,6 +34,7 @@ ) from cloudai.workloads.nemo_run.report_generation_strategy import NeMoRunReportGenerationStrategy from cloudai.workloads.nixl_bench import NIXLBenchCmdArgs, NIXLBenchTestDefinition +from tests.test_env_params import EnvVarCmdArgs, EnvVarTestDefinition @pytest.fixture @@ -441,3 +443,404 @@ def test_cached_step_appends_trajectory_row(nemorun: NeMoRunTestDefinition, tmp_ contents = csv_path.read_text().strip().splitlines() assert contents[0] == "step,action,reward,observation" assert contents[-1].startswith("5,") + + +def _seed_cached_entry_with_env_params( + env: CloudAIGymEnv, action: dict[str, object], env_params: dict[str, object] +) -> None: + """Seed env.trajectory with one entry carrying the given env_params.""" + entry = TrajectoryEntry(step=1, action=action, reward=0.5, observation=[100.0], env_params=env_params) + env.test_run.current_iteration = 0 + env.trajectory = {0: [entry]} + + +def test_cache_miss_when_env_params_differ(base_tr: TestRun, tmp_path: Path) -> None: + """Cache MUST miss when env_params differ, even if action is identical. + + Without this property the agent receives stale rewards on every cache hit + when the env varies per trial: any agent silently trains on labels that + do not correspond to the env they were nominally generated under. + """ + runner = MagicMock() + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = MagicMock(test_runs=[]) + runner.jobs = {} + runner.testrun_to_job_map = {} + + env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) + _seed_cached_entry_with_env_params(env, {"x": 10}, env_params={"ball_speed": 1}) + + env.test_run.current_env_params = {"ball_speed": 2} + + assert env.get_cached_trajectory_result({"x": 10}) is None, ( + "Cache must include env_params in its key. Keying on action alone means " + "trials repeating the same action under a different env_params sample " + "receive a stale cached reward." + ) + + +def test_cache_hit_when_action_and_env_params_match(base_tr: TestRun, tmp_path: Path) -> None: + """Same action AND same env_params must still HIT the cache.""" + runner = MagicMock() + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = MagicMock(test_runs=[]) + runner.jobs = {} + runner.testrun_to_job_map = {} + + env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) + _seed_cached_entry_with_env_params(env, {"x": 10}, env_params={"ball_speed": 2}) + + env.test_run.current_env_params = {"ball_speed": 2} + + result = env.get_cached_trajectory_result({"x": 10}) + assert result is not None + assert result.step == 1 + + +def test_cache_hit_when_neither_has_env_params(base_tr: TestRun, tmp_path: Path) -> None: + """Workloads without env_params behave exactly as today (back-compat).""" + runner = MagicMock() + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = MagicMock(test_runs=[]) + runner.jobs = {} + runner.testrun_to_job_map = {} + + env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) + env.test_run.current_iteration = 0 + env.trajectory = {0: [TrajectoryEntry(step=1, action={"x": 10}, reward=0.5, observation=[100.0])]} + # Note: neither the cached entry nor test_run carries env_params -> existing behavior. + + result = env.get_cached_trajectory_result({"x": 10}) + assert result is not None + assert result.step == 1 + + +def test_step_reruns_workload_when_env_params_change(tmp_path: Path) -> None: + """Integration: two env.step() calls with the same action but different sampled env_params re-run. + + Counterpart to test_cache_miss_when_env_params_differ but exercising the + full step() flow: increment_step -> sample env_params -> apply_params_set -> + cache lookup -> runner.run() -> write_trajectory. With seed 42 the sampler + draws ball_speed=3 then ball_speed=1 on the two consecutive trials, so the + cache key differs and the workload must re-run both times. + """ + tdef = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=[1, 2, 3]), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": 42}, + ) + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + ) + test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = test_scenario + runner.jobs = {} + runner.testrun_to_job_map = {} + runner.shutting_down = False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + action = {"paddle_width": 4} + fake_obs = iter([[100.0], [50.0]]) + + with patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)): + env.test_run.step = 0 + obs1, _r1, *_ = env.step(action) # samples ball_speed=3 + obs2, _r2, *_ = env.step(action) # samples ball_speed=1 + + assert runner.run.call_count == 2, ( + "Different sampled env_params between two env.step() calls with the same action " + "must trigger a workload re-run; the cache lookup must miss." + ) + assert obs1 != obs2, "fresh workload run should produce a fresh observation" + + +def test_env_csv_is_step_aligned_with_trajectory(tmp_path: Path) -> None: + """env.csv must have exactly one row per env.step() call, with steps aligned 1:1 to trajectory.csv. + + This pins the corpus-friendly contract: a downstream consumer can + ``pd.merge(traj, env, on="step")`` without losing rows on either side, + independent of whether the trial hit the trajectory cache. + """ + tdef = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=[1, 2, 3]), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": 42}, + ) + + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + ) + test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = test_scenario + runner.jobs, runner.testrun_to_job_map, runner.shutting_down = {}, {}, False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + action_a, action_b = {"paddle_width": 4}, {"paddle_width": 8} + fake_obs = iter([[100.0], [50.0], [25.0]]) + + with patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)): + env.test_run.step = 0 + for action in (action_a, action_b, action_a): + env.step(action) + + env_csv = env.env_params_record_path + traj_csv = env.trajectory_file_path + assert env_csv.exists(), "env.csv must be written when env_params is declared" + + env_steps = [int(line.split(",", 1)[0]) for line in env_csv.read_text().strip().splitlines()[1:]] + traj_steps = [int(line.split(",", 1)[0]) for line in traj_csv.read_text().strip().splitlines()[1:]] + assert env_steps == traj_steps == [1, 2, 3], ( + f"step columns must align 1:1 across env.csv ({env_steps}) and trajectory.csv ({traj_steps})" + ) + + +def test_env_csv_step_alignment_holds_on_constraint_failure(tmp_path: Path) -> None: + """A constraint failure must not desync env.csv from trajectory.csv. + + Runs three steps where the middle one fails ``constraint_check`` and the + other two succeed. ``env.csv`` is sunk inside ``write_trajectory`` from the + same ``TrajectoryEntry``, which is never reached on the early-return + constraint-failure path - so the failed step lands in neither file. The + corpus-friendly contract (``pd.merge(traj, env, on="step")`` loses no rows) + therefore holds via shared absence: both files record exactly the surviving + steps, aligned 1:1. + """ + tdef = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=[1, 2, 3]), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": 42}, + ) + + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + ) + test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = test_scenario + runner.jobs, runner.testrun_to_job_map, runner.shutting_down = {}, {}, False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + + # Step 2 fails the constraint; steps 1 and 3 survive. get_observation is only + # reached on the surviving steps, so it yields exactly two values. + fake_obs = iter([[100.0], [25.0]]) + with ( + patch.object(EnvVarTestDefinition, "constraint_check", side_effect=[True, False, True]), + patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)), + ): + env.test_run.step = 0 + for action in ({"paddle_width": 4}, {"paddle_width": 6}, {"paddle_width": 8}): + env.step(action) + + env_csv = env.env_params_record_path + traj_csv = env.trajectory_file_path + + assert env_csv.exists(), "surviving steps declare env_params -> env.csv must exist" + env_steps = [int(line.split(",", 1)[0]) for line in env_csv.read_text().strip().splitlines()[1:]] + traj_steps = ( + [int(line.split(",", 1)[0]) for line in traj_csv.read_text().strip().splitlines()[1:]] + if traj_csv.exists() + else [] + ) + assert env_steps == traj_steps == [1, 3], ( + f"the constraint-failed step (2) must appear in neither file; env.csv ({env_steps}) " + f"and trajectory.csv ({traj_steps}) must stay 1:1 aligned on the surviving steps" + ) + + +def test_step_cache_hit_with_declared_env_params_still_writes_env_csv(tmp_path: Path) -> None: + """End-to-end: cache HIT under observer-driven env_params still records env.csv. + + A cache hit still calls ``write_trajectory``, which sinks the trajectory row + and the matching env.csv row from the same entry - keeping the two files + step-aligned even when the workload itself is short-circuited. + Asserts: (a) the workload is NOT re-run (cache short-circuit), (b) + env.csv gains a row, (c) trajectory.csv gains a row carrying the + sampled env_params. + """ + import random as _random + + tdef = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=[1, 2, 3]), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": 42}, + ) + + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + ) + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + runner.jobs, runner.testrun_to_job_map, runner.shutting_down = {}, {}, False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + assert env.params is not None, "TestDefinition.env_params declared -> EnvParams must be built" + + expected_sample = {"ball_speed": _random.Random("42:ball_speed:1").choice([1, 2, 3])} + action = {"paddle_width": 4} + env.test_run.current_iteration = 0 + env.trajectory = { + 0: [TrajectoryEntry(step=0, action=action, reward=0.42, observation=[0.84], env_params=expected_sample)] + } + env.test_run.step = 0 + + with patch.object(env, "get_observation", side_effect=AssertionError("cache miss path must not run")): + obs, reward, _done, _info = env.step(action) + + runner.run.assert_not_called() + assert reward == 0.42 and obs == [0.84] + + env_csv = env.env_params_record_path + assert env_csv.exists(), "cache HIT must NOT skip the observer; env.csv must record the trial" + env_rows = env_csv.read_text().strip().splitlines() + assert env_rows[0] == "step,env" + assert env_rows[1].startswith("1,"), f"expected step 1 row in env.csv, got {env_rows[1]!r}" + + traj_rows = env.trajectory[0] + assert len(traj_rows) == 2 and traj_rows[-1].env_params == expected_sample, ( + "cache-hit trajectory entry must record the per-trial env_params sample" + ) + + +def test_step_overlays_env_params_onto_cmd_args(tmp_path: Path) -> None: + """The per-trial env_params sample must be overlaid onto cmd_args before the workload runs. + + env_params are the env-side twin of the action: core overlays the sampled values onto + cmd_args inside step() so the workload actually runs with them, workload-agnostically + (no per-workload injection code). Here ``ball_speed`` is env-randomized (its candidate + list lives in cmd_args), so the value the workload runs with must equal the observer's + sample, not the whole candidate list. + """ + import random as _random + + tdef = EnvVarTestDefinition( + name="overlay", + description="overlay", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=[1, 2]), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": 42}, + ) + + test_run = TestRun( + name="overlay_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "overlay_tr" / "0", + ) + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = TestScenario(name="overlay_scenario", test_runs=[test_run]) + runner.jobs, runner.testrun_to_job_map, runner.shutting_down = {}, {}, False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + expected = _random.Random("42:ball_speed:1").choice([1, 2]) + + with patch.object(env, "get_observation", side_effect=lambda _action: [1.0]): + env.test_run.step = 0 + env.step({"paddle_width": 4}) + + ran_cmd_args = runner.test_scenario.test_runs[0].test.cmd_args + assert ran_cmd_args.ball_speed == expected, ( + "step() must overlay the sampled env_params onto cmd_args before runner.run(), so the " + f"workload runs with ball_speed={expected} (the observer's draw), not the candidate list." + ) + + +def test_param_space_excludes_env_params_keys(setup_env: tuple[TestRun, BaseRunner]): + """env_params keys must never surface in the grid/action space (sampled, not searched).""" + tr, _ = setup_env + tr.test = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + # paddle_width is an ordinary action-space list; ball_speed is an env-sampled list. + cmd_args=EnvVarCmdArgs(paddle_width=[4, 8], ball_speed=[1, 2]), + env_params={"ball_speed": EnvParamSpec()}, + ) + + action_space = tr.param_space + + assert "paddle_width" in action_space, "an un-annotated cmd_args list must remain an action-space dimension" + assert "ball_speed" not in action_space, ( + "a knob declared in env_params is sampled by the env, not explored by the agent, so it " + "must be excluded from param_space even though its cmd_args value is a list." + ) + + +def test_no_env_csv_when_env_params_not_declared(nemorun: NeMoRunTestDefinition, tmp_path: Path) -> None: + """Workloads without [env_params.*] pay zero overhead: no observer, no env.csv.""" + tdef = nemorun.model_copy(deep=True) + tdef.cmd_args.data.global_batch_size = 8 + test_run = TestRun( + name="plain_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "plain_tr" / "0", + reports={NeMoRunReportGenerationStrategy}, + ) + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + + assert env.params is None, "no env_params declared -> no EnvParams object" + assert not env.env_params_record_path.exists() diff --git a/tests/test_env_params.py b/tests/test_env_params.py new file mode 100644 index 000000000..eef3feba0 --- /dev/null +++ b/tests/test_env_params.py @@ -0,0 +1,402 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the environment-parameter primitives in cloudai.configurator.env_params. + +Annotation model: candidate values live in ``cmd_args`` (the single source of +truth); an ``env_params`` entry is a thin annotation that reclassifies that +field from action-space to env-sampled, optionally carrying sampling +``weights``. These tests use a dedicated, list-capable fixture workload so the +candidate list is a first-class typed field (no serialization warnings). + +Worked example - Atari Breakout, the canonical RL arcade game: +``ball_speed`` is the env-sampled knob - the game serves the ball at a speed the +agent does not control, so we sample it per trial and the policy must stay +robust across ball speeds. ``paddle_width`` is an ordinary action-space +dimension - the knob the agent actually tunes. +""" + +from __future__ import annotations + +import dataclasses +import random +from pathlib import Path +from typing import List, Union + +import pytest +from pydantic import BaseModel, ValidationError + +from cloudai.configurator.env_params import ( + EnvParam, + EnvParams, + EnvParamSpec, + EnvParamsSink, + ObsLeafDescriptor, +) +from cloudai.core import TestRun +from cloudai.models.workload import CmdArgs, TestDefinition + + +class BrickGrid(BaseModel): + """A structured (non-leaf) cmd_args field, used to prove env_params rejects such targets.""" + + rows: int = 3 + + +class EnvVarCmdArgs(CmdArgs): + """cmd_args with top-level, list-capable fields for env_params annotation tests. + + ``ball_speed`` is the env-randomized knob; ``paddle_width`` stands in for an + ordinary action-space dimension. Both accept either a scalar or a candidate + list, so a candidate list round-trips through model validation/serialization + cleanly. ``brick_grid`` is a structured field (not a leaf) for negative tests. + """ + + ball_speed: Union[int, List[int]] = 1 + paddle_width: Union[int, List[int]] = 8 + brick_grid: BrickGrid = BrickGrid() + + +class EnvVarTestDefinition(TestDefinition): + """Minimal concrete TestDefinition wrapping ``EnvVarCmdArgs``.""" + + cmd_args: EnvVarCmdArgs = EnvVarCmdArgs() + + +def _tdef(env_params: dict, **cmd_args_overrides) -> EnvVarTestDefinition: + """Build the fixture TestDefinition through full model validation (so validators fire).""" + return EnvVarTestDefinition( + name="breakout", + description="breakout", + test_template_name="breakout_template", + cmd_args=EnvVarCmdArgs(**cmd_args_overrides), + env_params=env_params, + ) + + +# --- EnvParamSpec: weights are validated intrinsically (length is cross-field, see below) --- + + +def test_env_param_spec_accepts_normalized_weights() -> None: + spec = EnvParamSpec(weights=[0.7, 0.3]) + assert spec.weights == [0.7, 0.3] + + +def test_env_param_spec_bare_marker_is_valid() -> None: + """An annotation with no weights is a valid uniform marker.""" + assert EnvParamSpec().weights is None + + +def test_env_param_spec_rejects_unnormalized_weights() -> None: + """Strict sum: weights must sum to ~1.0 (relative weights like [7, 3] are rejected).""" + with pytest.raises(ValidationError, match="must sum to"): + EnvParamSpec(weights=[7.0, 3.0]) + + +def test_env_param_spec_rejects_negative_weights() -> None: + with pytest.raises(ValidationError, match="finite and non-negative"): + EnvParamSpec(weights=[-0.1, 1.1]) + + +@pytest.mark.parametrize("bad", [float("inf"), float("nan"), float("-inf")]) +def test_env_param_spec_rejects_non_finite_weights(bad: float) -> None: + with pytest.raises(ValidationError, match="finite and non-negative"): + EnvParamSpec(weights=[bad, 1.0]) + + +def test_env_param_spec_rejects_unknown_fields() -> None: + """Candidate values live in cmd_args, never in the annotation (no ``values`` key).""" + with pytest.raises(ValidationError): + EnvParamSpec.model_validate({"values": [1, 2]}) + + +# --- EnvParam: one resolved knob - candidate values, optional weights, a single draw --- + + +def test_env_param_defaults_to_unweighted() -> None: + """A knob built without weights samples uniformly (weights is None).""" + assert EnvParam(candidates=[1, 2, 3]).weights is None + + +def test_env_param_draw_returns_a_candidate() -> None: + """draw() always yields one of the knob's own candidate values.""" + knob = EnvParam(candidates=[1, 2, 3]) + assert all(knob.draw(random.Random(s)) in {1, 2, 3} for s in range(50)) + + +def test_env_param_draw_is_reproducible_for_a_given_rng() -> None: + """draw() consumes the caller's RNG (no internal seeding), so equal RNG state yields equal draws.""" + knob = EnvParam(candidates=[10, 20, 30, 40]) + assert knob.draw(random.Random(123)) == knob.draw(random.Random(123)) + + +def test_env_param_draw_honors_degenerate_weights() -> None: + """A degenerate weight ([1, 0]) collapses the draw onto the first candidate, whatever the RNG.""" + knob = EnvParam(candidates=[1, 2], weights=[1.0, 0.0]) + assert all(knob.draw(random.Random(s)) == 1 for s in range(50)) + + +def test_env_param_is_immutable() -> None: + """Frozen value object: a resolved knob cannot be mutated after construction.""" + knob = EnvParam(candidates=[1, 2, 3]) + with pytest.raises(dataclasses.FrozenInstanceError): + knob.candidates = [9] # pyright: ignore[reportAttributeAccessIssue] + + +# --- EnvParams.sample: draws from candidate lists resolved out of cmd_args --- + + +def test_sampler_is_deterministic_across_calls() -> None: + env_params = EnvParams(params={"ball_speed": EnvParam(candidates=[1, 2, 3])}, seed=42) + seq_a = [env_params.sample(t) for t in range(1, 6)] + seq_b = [env_params.sample(t) for t in range(1, 6)] + assert seq_a == seq_b, "same (seed, trial) must produce the same draw across calls" + + +def test_sampler_each_param_is_independent() -> None: + """Adding an unrelated parameter must not perturb existing parameters' draws.""" + base = EnvParams(params={"ball_speed": EnvParam(candidates=[1, 2, 3])}, seed=7) + extended = EnvParams( + params={"ball_speed": EnvParam(candidates=[1, 2, 3]), "brick_rows": EnvParam(candidates=[3, 4, 5])}, + seed=7, + ) + a = [base.sample(t)["ball_speed"] for t in range(1, 11)] + b = [extended.sample(t)["ball_speed"] for t in range(1, 11)] + assert a == b, "per-parameter RNG seeding must isolate parameters from each other" + + +def test_sampler_honors_weights() -> None: + """A degenerate weight ([1, 0]) must always pick the first candidate.""" + env_params = EnvParams(params={"ball_speed": EnvParam(candidates=[1, 2], weights=[1.0, 0.0])}, seed=1) + assert all(env_params.sample(t)["ball_speed"] == 1 for t in range(1, 20)) + + +def test_sample_covers_all_declared_params() -> None: + """sample() returns exactly one value per declared knob, each drawn from that knob's candidates.""" + env_params = EnvParams( + params={"ball_speed": EnvParam(candidates=[1, 2]), "paddle_width": EnvParam(candidates=[4, 8])}, + seed=3, + ) + drawn = env_params.sample(5) + assert set(drawn) == {"ball_speed", "paddle_width"} + assert drawn["ball_speed"] in {1, 2} + assert drawn["paddle_width"] in {4, 8} + + +def test_env_params_is_immutable() -> None: + """Frozen value object: resolved sampling state cannot be mutated after construction.""" + env_params = EnvParams(params={"ball_speed": EnvParam(candidates=[1, 2])}, seed=0) + with pytest.raises(dataclasses.FrozenInstanceError): + env_params.seed = 1 # pyright: ignore[reportAttributeAccessIssue] + + +# --- EnvParamsSink: unchanged persistence contract --- + + +def test_csv_sink_skips_empty_samples_and_rejects_zero_step(tmp_path: Path) -> None: + sink = EnvParamsSink() + path = tmp_path / "env.csv" + sink.write(path, 1, {}) # empty -> no-op, no file + assert not path.exists() + with pytest.raises(ValueError, match="must be a positive trial index"): + sink.write(path, 0, {"ball_speed": 1}) + + +def test_csv_sink_writes_header_then_rows(tmp_path: Path) -> None: + sink = EnvParamsSink() + path = tmp_path / "env.csv" + sink.write(path, 1, {"ball_speed": 2}) + sink.write(path, 2, {"ball_speed": 3}) + contents = path.read_text().strip().splitlines() + assert contents[0] == "step,env" + assert contents[1].startswith("1,") + assert contents[2].startswith("2,") + + +# --- EnvParams.from_test: resolves candidate lists from cmd_args, once at env formulation --- + + +def test_env_params_from_test_resolves_list_candidate() -> None: + """A list-valued cmd_args field resolves to its candidate list; sampling then draws from it.""" + env_params = EnvParams.from_test(_tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3])) + + assert env_params is not None + assert env_params.params == {"ball_speed": EnvParam(candidates=[1, 2, 3], weights=None)} + assert env_params.sample(3)["ball_speed"] in {1, 2, 3} + + +def test_env_params_from_test_carries_weights() -> None: + """Weighted annotations propagate their weights alongside the resolved candidates.""" + env_params = EnvParams.from_test(_tdef({"ball_speed": EnvParamSpec(weights=[0.7, 0.3])}, ball_speed=[1, 2])) + + assert env_params is not None + assert env_params.params["ball_speed"].weights == [0.7, 0.3] + + +def test_env_params_from_test_reads_seed_from_agent_config() -> None: + """The sampler seed comes from agent_config.random_seed so a run is reproducible end to end.""" + tdef = _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3]) + tdef.agent_config = {"random_seed": 42} + + env_params = EnvParams.from_test(tdef) + + assert env_params is not None and env_params.seed == 42 + + +def test_env_params_from_test_defaults_seed_to_zero_without_agent_config() -> None: + """No agent_config (the default) -> seed 0, so a declared-but-unseeded run still samples.""" + env_params = EnvParams.from_test(_tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3])) + + assert env_params is not None and env_params.seed == 0 + + +def test_env_params_from_test_defaults_seed_to_zero_when_key_absent() -> None: + """agent_config present but without random_seed still falls back to seed 0.""" + tdef = _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3]) + tdef.agent_config = {"other": 1} + + env_params = EnvParams.from_test(tdef) + + assert env_params is not None and env_params.seed == 0 + + +def test_env_params_from_test_resolves_multiple_params() -> None: + """Every list-valued annotation becomes its own knob, candidates resolved from cmd_args.""" + env_params = EnvParams.from_test( + _tdef( + {"ball_speed": EnvParamSpec(), "paddle_width": EnvParamSpec()}, + ball_speed=[1, 2], + paddle_width=[4, 8], + ) + ) + + assert env_params is not None + assert set(env_params.params) == {"ball_speed", "paddle_width"} + assert env_params.params["ball_speed"].candidates == [1, 2] + assert env_params.params["paddle_width"].candidates == [4, 8] + + +def test_env_params_from_test_none_when_no_env_params() -> None: + """No annotations declared -> no EnvParams object (the zero-overhead path).""" + assert EnvParams.from_test(_tdef({})) is None + + +# --- TestDefinition.validate_env_params: annotation validity (cross-field with cmd_args) --- + + +def test_env_params_uniform_list_is_accepted() -> None: + """A cmd_args candidate list annotated with a bare marker validates (uniform sampling).""" + tdef = _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2]) + assert "ball_speed" in tdef.env_params + + +def test_env_params_weighted_list_is_accepted() -> None: + tdef = _tdef({"ball_speed": EnvParamSpec(weights=[0.7, 0.3])}, ball_speed=[1, 2]) + assert tdef.env_params["ball_speed"].weights == [0.7, 0.3] + + +def test_env_params_scalar_annotation_rejected() -> None: + """A scalar (fixed) knob carries nothing to sample; the annotation is a meaningless label and is + rejected at parse time (it only reclassifies a list-valued sweep as env-sampled).""" + with pytest.raises(ValidationError, match="not a candidate list"): + _tdef({"ball_speed": EnvParamSpec()}, ball_speed=2) + + +def test_env_params_unknown_key_rejected() -> None: + """env_params keys must name real cmd_args fields (the overlay targets cmd_args).""" + with pytest.raises(ValidationError, match="not cmd_args fields"): + _tdef({"ghost": EnvParamSpec()}, ball_speed=[1, 2]) + + +def test_env_params_weights_on_scalar_rejected() -> None: + """Weights require a candidate list; a scalar knob cannot carry them.""" + with pytest.raises(ValidationError, match="not a candidate list"): + _tdef({"ball_speed": EnvParamSpec(weights=[0.7, 0.3])}, ball_speed=2) + + +def test_env_params_weights_length_mismatch_rejected() -> None: + """Weights must align 1:1 with the cmd_args candidate list.""" + with pytest.raises(ValidationError, match="weights length"): + _tdef({"ball_speed": EnvParamSpec(weights=[0.5, 0.3, 0.2])}, ball_speed=[1, 2]) + + +def test_env_params_structured_target_rejected() -> None: + """A structured (non-leaf) cmd_args target is rejected: the observer can't sample it, yet + param_space/is_dse_job exclude the whole key, silently dropping nested action dimensions.""" + with pytest.raises(ValidationError, match="must target a leaf cmd_args field"): + _tdef({"brick_grid": EnvParamSpec()}) + + +def test_env_params_empty_candidate_list_rejected() -> None: + """An empty candidate list is rejected at build time: an unweighted spec would otherwise skip + validation and let the sampler fail later on an empty draw (rng.choice([])).""" + with pytest.raises(ValidationError, match="empty candidate list"): + _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[]) + + +# --- is_dse_job: env-sampled lists are not search dimensions --- + + +def test_is_dse_job_false_for_env_param_only_workload() -> None: + """A cmd_args list that is purely env-sampled is not a search dimension -> not a DSE job.""" + tdef = _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3]) + assert tdef.is_dse_job is False + + +def test_is_dse_job_true_when_a_real_action_dimension_exists() -> None: + """An un-annotated cmd_args list is a real action dimension -> DSE, even alongside env_params.""" + tdef = _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3], paddle_width=[4, 8]) + assert tdef.is_dse_job is True + + +def test_apply_params_set_accepts_weighted_env_param_draw() -> None: + """Regression: apply_params_set re-validates after the overlay; a weighted env_param's scalar + draw must not trip validate_env_params (which would reject 'weights but not a candidate list').""" + tdef = _tdef({"ball_speed": EnvParamSpec(weights=[0.7, 0.3])}, ball_speed=[1, 2]) + tr = TestRun(name="tr", test=tdef, num_nodes=1, nodes=[]) + + new_tr = tr.apply_params_set({}, env_params={"ball_speed": 1}) + + assert new_tr.test.cmd_args.ball_speed == 1 + assert new_tr.current_env_params == {"ball_speed": 1} + + +# --- ObsLeafDescriptor: structured-observation leaf schema --- + + +def test_obs_leaf_descriptor_box_defaults() -> None: + leaf = ObsLeafDescriptor(kind="box", dim=2) + assert leaf.kind == "box" + assert leaf.dim == 2 + assert leaf.n is None + + +def test_obs_leaf_descriptor_discrete_requires_n() -> None: + leaf = ObsLeafDescriptor(kind="discrete", dim=1, n=3) + assert leaf.n == 3 + with pytest.raises(ValidationError, match="requires n"): + ObsLeafDescriptor(kind="discrete", dim=1) + with pytest.raises(ValidationError, match="requires n"): + ObsLeafDescriptor(kind="discrete", dim=1, n=0) + + +def test_obs_leaf_descriptor_rejects_bad_dim_and_extra_fields() -> None: + with pytest.raises(ValidationError, match="dim must be"): + ObsLeafDescriptor(kind="box", dim=0) + with pytest.raises(ValidationError): + ObsLeafDescriptor(kind="box", dim=1, unexpected=1) # type: ignore + with pytest.raises(ValidationError): + ObsLeafDescriptor(kind="categorical", dim=1) # type: ignore diff --git a/tests/test_gymnasium_adapter_contract.py b/tests/test_gymnasium_adapter_contract.py new file mode 100644 index 000000000..a5683a7cc --- /dev/null +++ b/tests/test_gymnasium_adapter_contract.py @@ -0,0 +1,372 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Caller-contract tests for ``GymnasiumAdapter``. + +The single invariant every consumer assumes: + + ``test_run.step`` is a **monotonic trial index** across the entire run. + +Gym's ``reset()`` is an *episode boundary*, not a trial boundary. For the +contextual-bandit configs (``agent_steps=1``), RLlib calls ``reset()`` before +*every* trial. An earlier adapter rewound ``test_run.step`` on reset and +collapsed every trial onto step 1 — silently overwriting output dirs and +producing duplicate-step rows in trajectory.csv / env.csv. + +These tests pin the negative invariant: the adapter must not mutate +``test_run.step``. That counter is owned by ``TestRun`` and advanced +exclusively by ``CloudAIGymEnv.step()``. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from typing import Any, Optional + +import pytest + +from cloudai.configurator.base_gym import BaseGym +from cloudai.configurator.env_params import ObsLeafDescriptor + +try: + import gymnasium # noqa: F401 + + _HAS_GYMNASIUM = True +except ImportError: + _HAS_GYMNASIUM = False + +pytestmark = pytest.mark.skipif(not _HAS_GYMNASIUM, reason="gymnasium not installed") + +from cloudai.configurator.gymnasium_adapter import GymnasiumAdapter # noqa: E402 + + +class _StubBaseGym(BaseGym): + """Minimal BaseGym with a ``test_run`` attribute mirroring CloudAIGymEnv.""" + + def __init__(self) -> None: + self._action_space: dict[str, Any] = {"param_a": [1, 2, 3], "param_b": [10, 20]} + self._observation_space: list[float] = [0.0, 0.0, 0.0] + self.test_run = SimpleNamespace(step=0) + super().__init__() + + def define_action_space(self) -> dict[str, Any]: + return self._action_space + + def define_observation_space(self) -> list: + return self._observation_space + + def reset( + self, + seed: Optional[int] = None, + options: Optional[dict[str, Any]] = None, + ) -> tuple[list, dict[str, Any]]: + return [0.0, 0.0, 0.0], {} + + def step(self, action: Any) -> tuple[list, float, bool, dict]: + self.test_run.step += 1 + return [1.0, 2.0, 3.0], 0.5, False, {} + + def render(self, mode: str = "human") -> None: + return None + + def seed(self, seed: Optional[int] = None) -> None: + pass + + +class TestStepIsMonotonicTrialIndex: + """``test_run.step`` is a trial index, not an episode-local counter.""" + + def test_step_advances_within_single_episode(self) -> None: + gym = _StubBaseGym() + adapter = GymnasiumAdapter(gym) + adapter.reset() + + seen: list[int] = [] + for _ in range(3): + adapter.step({"param_a": 0, "param_b": 0}) + seen.append(gym.test_run.step) + + assert seen == [1, 2, 3] + + def test_step_is_monotonic_across_episode_boundaries(self) -> None: + """The bug: ``reset()`` rewinds ``_step_count`` to 0, so the next + ``step()`` writes ``test_run.step = 1`` again. With contextual-bandit + RLlib (one step per episode) this means every trial reports step 1. + """ + gym = _StubBaseGym() + adapter = GymnasiumAdapter(gym) + + seen: list[int] = [] + for _ in range(5): + adapter.reset() + adapter.step({"param_a": 0, "param_b": 0}) + seen.append(gym.test_run.step) + + assert seen == [1, 2, 3, 4, 5], ( + f"test_run.step must be a monotonic trial index across episodes; got {seen}. " + "reset() is a Gym episode boundary, not a trial boundary; rewinding the " + "trial counter collapses every contextual-bandit rollout onto step 1." + ) + + def test_mixed_within_and_across_episode_steps_are_monotonic(self) -> None: + gym = _StubBaseGym() + adapter = GymnasiumAdapter(gym) + + seen: list[int] = [] + for episode_len in (2, 1, 3): + adapter.reset() + for _ in range(episode_len): + adapter.step({"param_a": 0, "param_b": 0}) + seen.append(gym.test_run.step) + + assert seen == [1, 2, 3, 4, 5, 6], ( + f"test_run.step must be a monotonic trial index regardless of episode shape; got {seen}" + ) + + +class _ContextualStubBaseGym(BaseGym): + """BaseGym stub that simulates CloudAIGymEnv's contextual-obs contract. + + ``reset()`` returns an observation with a per-trial context value in + slot 1 (mimicking how the upstream env writes a sampled env_param into + the obs vector built at the trial boundary). Each call to ``reset()`` + advances the simulated trial counter so we can assert the adapter + surfaces the *current* context, not a stale one. + """ + + def __init__(self, contexts: list[float]) -> None: + self._contexts = list(contexts) + self._action_space: dict[str, Any] = {"param_a": [1, 2, 3], "param_b": [10, 20]} + self.test_run = SimpleNamespace(step=0) + super().__init__() + + def define_action_space(self) -> dict[str, Any]: + return self._action_space + + def define_observation_space(self) -> list: + return [0.0, 0.0] + + def reset( + self, + seed: Optional[int] = None, + options: Optional[dict[str, Any]] = None, + ) -> tuple[list, dict[str, Any]]: + ctx = self._contexts[self.test_run.step] + self.test_run.step += 1 + return [0.0, ctx], {} + + def step(self, action: Any) -> tuple[list, float, bool, dict]: + ctx = self._contexts[self.test_run.step - 1] + return [42.0, ctx], 0.5, False, {} + + def render(self, mode: str = "human") -> None: + return None + + def seed(self, seed: Optional[int] = None) -> None: + pass + + +class TestAdapterPropagatesContextualObservation: + """The adapter must pass through env-built observations unchanged. + + With the contextual-bandit fix in cloudai, ``CloudAIGymEnv.reset()`` + samples env_params at the trial boundary and bakes them into the obs + vector before returning. RLlib's policy reads obs from + ``adapter.reset()``, so the adapter must propagate that vector verbatim + (modulo numpy-float32 casting). The same propagation invariant applies + on ``adapter.step()``. + """ + + def test_reset_propagates_context_into_observation(self) -> None: + contexts = [0.001, 0.0, 0.01, 0.001] + gym = _ContextualStubBaseGym(contexts) + adapter = GymnasiumAdapter(gym) + + seen: list[float] = [] + for _ in range(len(contexts)): + obs, _info = adapter.reset() + seen.append(float(obs[1])) + + assert seen == pytest.approx(contexts, rel=1e-5), ( + f"adapter.reset() must surface the trial's context value (slot 1) verbatim " + f"(modulo float32 cast); got {seen}, expected {contexts}" + ) + + def test_step_propagates_context_into_observation(self) -> None: + contexts = [0.0, 0.01, 0.001] + gym = _ContextualStubBaseGym(contexts) + adapter = GymnasiumAdapter(gym) + + for ctx in contexts: + adapter.reset() + obs, _r, _term, _trunc, _info = adapter.step({"param_a": 0, "param_b": 0}) + assert float(obs[0]) == pytest.approx(42.0, rel=1e-5), ( + f"adapter.step() must propagate the env's measured-metric slot; got {obs[0]}" + ) + assert float(obs[1]) == pytest.approx(ctx, rel=1e-5), ( + f"adapter.step() must propagate the trial's context value; got {obs[1]}, expected {ctx}" + ) + + +class TestEncodeDecodeAreInverse: + """``encode_action`` is the inverse of ``decode_action`` on native values. + + Consumers (e.g. RLlib warm-start / behavioral cloning) must be able to + express a recorded native config in the policy's action space without + reaching into adapter internals. The public pair guarantees + ``decode_action(encode_action(v)) == v`` for any native ``v``. + """ + + def test_discrete_round_trip_decode_of_encode_is_identity(self) -> None: + adapter = GymnasiumAdapter(_StubBaseGym()) + native = {"param_a": 3, "param_b": 10} + assert adapter.decode_action(adapter.encode_action(native)) == native + + def test_discrete_encode_of_decode_is_identity(self) -> None: + adapter = GymnasiumAdapter(_StubBaseGym()) + action = {"param_a": 2, "param_b": 1} + assert adapter.encode_action(adapter.decode_action(action)) == action + + def test_encode_maps_value_to_candidate_index(self) -> None: + adapter = GymnasiumAdapter(_StubBaseGym()) + assert adapter.encode_action({"param_a": 2, "param_b": 20}) == {"param_a": 1, "param_b": 1} + + def test_encode_rejects_non_candidate_value(self) -> None: + adapter = GymnasiumAdapter(_StubBaseGym()) + with pytest.raises(ValueError, match="not a candidate"): + adapter.encode_action({"param_a": 7, "param_b": 10}) + + def test_encode_rejects_key_mismatch(self) -> None: + adapter = GymnasiumAdapter(_StubBaseGym()) + with pytest.raises(ValueError, match="keys mismatch"): + adapter.encode_action({"param_a": 1}) # missing param_b + + +class _StructuredStubBaseGym(BaseGym): + """BaseGym stub that opts in/out of the structured (Dict) obs path. + + Mirrors ``CloudAIGymEnv``'s gate: ``structured_observation_descriptors`` + returns ``None`` for a metrics-only env (no observed name is a declared + env_param) and a per-leaf descriptor dict otherwise. ``encode_observation`` + produces the matching encoded leaves. + """ + + def __init__(self, descriptors: Optional[dict[str, ObsLeafDescriptor]], obs_dim: int = 2) -> None: + self._descriptors = descriptors + self._obs_dim = obs_dim + self._action_space: dict[str, Any] = {"param_a": [1, 2, 3]} + self.test_run = SimpleNamespace(step=0) + super().__init__() + + def define_action_space(self) -> dict[str, Any]: + return self._action_space + + def define_observation_space(self) -> list: + return [0.0] * self._obs_dim + + def structured_observation_descriptors(self) -> Optional[dict[str, ObsLeafDescriptor]]: + return self._descriptors + + def encode_observation(self, raw_values: list) -> dict[str, Any]: + out: dict[str, Any] = {} + assert self._descriptors is not None + for i, (name, desc) in enumerate(self._descriptors.items()): + raw = raw_values[i] if i < len(raw_values) else 0.0 + if desc.kind == "discrete": + out[name] = int(raw) + elif desc.dim == 2: + out[name] = [1.0 if raw <= 0.0 else 0.0, float(raw)] + else: + out[name] = [float(raw)] + return out + + def reset( + self, + seed: Optional[int] = None, + options: Optional[dict[str, Any]] = None, + ) -> tuple[list, dict[str, Any]]: + return [0.0] * self._obs_dim, {} + + def step(self, action: Any) -> tuple[list, float, bool, dict]: + return [0.0] * self._obs_dim, 0.5, False, {} + + def render(self, mode: str = "human") -> None: + return None + + def seed(self, seed: Optional[int] = None) -> None: + pass + + +class TestStructuredObsGate: + """D1: the structured (Dict) obs space is opt-in; metrics-only stays flat Box.""" + + def test_metrics_only_env_falls_back_to_box(self) -> None: + """An env that opts out (``structured_observation_descriptors`` -> None) keeps a flat Box. + + This is the blast-radius guard: non-DR workloads (BO/GA/MAB on plain + metrics) must NOT silently switch to a Dict obs layout. + """ + import gymnasium + + gym_env = _StructuredStubBaseGym(descriptors=None, obs_dim=3) + adapter = GymnasiumAdapter(gym_env) + + assert isinstance(adapter.observation_space, gymnasium.spaces.Box) + assert adapter.observation_space.shape == (3,) + + def test_env_param_env_uses_dict(self) -> None: + """An env with a declared env_param leaf exposes a ``spaces.Dict``.""" + import gymnasium + + descriptors = { + "bus_bw": ObsLeafDescriptor(kind="box", dim=1), + "drop_rate": ObsLeafDescriptor(kind="box", dim=2), + } + adapter = GymnasiumAdapter(_StructuredStubBaseGym(descriptors=descriptors)) + + assert isinstance(adapter.observation_space, gymnasium.spaces.Dict) + assert set(adapter.observation_space.spaces) == {"bus_bw", "drop_rate"} + assert adapter.observation_space.spaces["drop_rate"].shape == (2,) + + +class TestCategoricalLeafSubspace: + """D3: a categorical (discrete) descriptor maps to ``Discrete(k)`` and decodes to an int.""" + + def test_discrete_descriptor_becomes_discrete_space(self) -> None: + import gymnasium + + descriptors = { + "bus_bw": ObsLeafDescriptor(kind="box", dim=1), + "variant": ObsLeafDescriptor(kind="discrete", dim=1, n=3), + } + adapter = GymnasiumAdapter(_StructuredStubBaseGym(descriptors=descriptors)) + + observation_space = adapter.observation_space + assert isinstance(observation_space, gymnasium.spaces.Dict) + variant_space = observation_space.spaces["variant"] + assert isinstance(variant_space, gymnasium.spaces.Discrete) + assert int(variant_space.n) == 3 + + def test_discrete_leaf_emitted_as_int_index(self) -> None: + """The emitted obs for a discrete leaf is an ``int`` the Discrete space accepts.""" + descriptors = {"variant": ObsLeafDescriptor(kind="discrete", dim=1, n=3)} + gym_env = _StructuredStubBaseGym(descriptors=descriptors, obs_dim=1) + adapter = GymnasiumAdapter(gym_env) + + obs, _ = adapter.reset() + assert isinstance(obs["variant"], int) + assert adapter.observation_space.contains(obs) diff --git a/tests/test_handlers.py b/tests/test_handlers.py index bcf01978f..247548287 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -26,19 +26,23 @@ from cloudai.cli.handlers import ( handle_dse_job, + validate_dse_env_params, verify_system_configs, verify_test_configs, verify_test_scenarios, ) +from cloudai.configurator.env_params import EnvParamSpec from cloudai.core import ( BaseAgent, BaseAgentConfig, + Parser, Registry, RewardOverrides, Runner, TestDependency, TestRun, TestScenario, + TestScenarioParsingError, ) from cloudai.models.scenario import ReportConfig from cloudai.reporter import StatusReporter @@ -52,6 +56,7 @@ class StubAgentConfig(BaseAgentConfig): class StubAgent(BaseAgent): received_configs: ClassVar[list[StubAgentConfig]] = [] + samples_env_params: bool = True # stands in for an env-aware learning agent def __init__(self, env, config: StubAgentConfig): self.env = env @@ -427,3 +432,84 @@ def test_handle_dse_job_documents_failure_in_reports_before_raising( contents = failure_report.read_text() assert "RuntimeError" in contents assert "agent blew up" in contents + + +def test_validate_dse_env_params_rejects_non_dse(base_tr: TestRun) -> None: + base_tr.test.env_params = {"ball_speed": EnvParamSpec()} + scenario = TestScenario(name="s", test_runs=[base_tr]) + with pytest.raises(TestScenarioParsingError, match="no agent will sample them"): + validate_dse_env_params(scenario) + + +def test_validate_dse_env_params_rejects_grid_search(dse_tr: TestRun) -> None: + """A DSE job on grid_search exhaustively searches the space, so env_params are noise -> reject.""" + dse_tr.test.env_params = {"ball_speed": EnvParamSpec()} + assert dse_tr.is_dse_job is True # it IS a DSE job... + assert dse_tr.test.agent == "grid_search" # ...but grid_search does not sample env_params + with pytest.raises(TestScenarioParsingError, match="no agent will sample them"): + validate_dse_env_params(TestScenario(name="s", test_runs=[dse_tr])) + + +def test_validate_dse_env_params_rejects_non_sampling_agent( + dse_tr: TestRun, stub_agent_name: str, monkeypatch: pytest.MonkeyPatch +) -> None: + """The check keys on the agent capability, not the name: a non-grid agent that opts out is rejected too.""" + monkeypatch.setattr(StubAgent, "samples_env_params", False) + dse_tr.test.env_params = {"ball_speed": EnvParamSpec()} + dse_tr.test.agent = stub_agent_name + assert dse_tr.is_dse_job is True and dse_tr.test.agent != "grid_search" + with pytest.raises(TestScenarioParsingError, match="no agent will sample them"): + validate_dse_env_params(TestScenario(name="s", test_runs=[dse_tr])) + + +def test_validate_dse_env_params_defers_unknown_agent(dse_tr: TestRun) -> None: + """An unknown agent is not flagged here; it is deferred to the dedicated agent-resolution error.""" + dse_tr.test.env_params = {"ball_speed": EnvParamSpec()} + dse_tr.test.agent = "does_not_exist_agent" + assert dse_tr.is_dse_job is True + assert dse_tr.test.agent not in Registry().agents_map # precondition: agent is truly unknown + validate_dse_env_params(TestScenario(name="s", test_runs=[dse_tr])) # no exception == deferred + + +def test_validate_dse_env_params_allows_dse_run(dse_tr: TestRun, stub_agent_name: str) -> None: + dse_tr.test.env_params = {"ball_speed": EnvParamSpec()} + dse_tr.test.agent = stub_agent_name # an env-aware agent (samples_env_params=True) consumes env_params + assert dse_tr.is_dse_job is True # precondition: DSE + env-aware agent + env_params is allowed + validate_dse_env_params(TestScenario(name="s", test_runs=[dse_tr])) # no exception == pass + + +def test_validate_dse_env_params_allows_num_nodes_sweep(base_tr: TestRun, stub_agent_name: str) -> None: + base_tr.test.env_params = {"ball_speed": EnvParamSpec()} + base_tr.test.agent = stub_agent_name + base_tr.num_nodes = [1, 2] + assert base_tr.is_dse_job is True # a num_nodes list sweep makes it DSE, so env_params is allowed + validate_dse_env_params(TestScenario(name="s", test_runs=[base_tr])) # no exception == pass + + +def test_validate_dse_env_params_allows_non_dse_without_env_params(base_tr: TestRun) -> None: + assert base_tr.is_dse_job is False # precondition: not DSE, but also no env_params declared + assert not base_tr.test.env_params + validate_dse_env_params(TestScenario(name="s", test_runs=[base_tr])) # no exception == pass + + +def test_verify_test_scenarios_rejects_env_params_without_dse( + base_tr: TestRun, monkeypatch: pytest.MonkeyPatch +) -> None: + base_tr.test.env_params = {"ball_speed": EnvParamSpec()} + bad = TestScenario(name="s", test_runs=[base_tr]) + monkeypatch.setattr(Parser, "parse_tests", lambda *a, **k: []) + monkeypatch.setattr(Parser, "parse_hooks", lambda *a, **k: {}) + monkeypatch.setattr(Parser, "parse_test_scenario", lambda *a, **k: bad) + assert verify_test_scenarios([Path("dummy.toml")], [], [], []) == 1 + + +def test_verify_test_scenarios_allows_env_params_with_dse( + dse_tr: TestRun, stub_agent_name: str, monkeypatch: pytest.MonkeyPatch +) -> None: + dse_tr.test.env_params = {"ball_speed": EnvParamSpec()} + dse_tr.test.agent = stub_agent_name # learning agent (not grid_search) + good = TestScenario(name="s", test_runs=[dse_tr]) + monkeypatch.setattr(Parser, "parse_tests", lambda *a, **k: []) + monkeypatch.setattr(Parser, "parse_hooks", lambda *a, **k: {}) + monkeypatch.setattr(Parser, "parse_test_scenario", lambda *a, **k: good) + assert verify_test_scenarios([Path("dummy.toml")], [], [], []) == 0 diff --git a/uv.lock b/uv.lock index 7f8a19b64..5a8911dbd 100644 --- a/uv.lock +++ b/uv.lock @@ -282,6 +282,7 @@ dependencies = [ [package.optional-dependencies] dev = [ { name = "build" }, + { name = "gymnasium" }, { name = "import-linter" }, { name = "pandas-stubs" }, { name = "pre-commit" }, @@ -312,6 +313,9 @@ docs-cms = [ { name = "sphinx-rtd-theme" }, { name = "sphinxcontrib-mermaid" }, ] +rl = [ + { name = "gymnasium" }, +] [package.metadata] requires-dist = [ @@ -320,6 +324,8 @@ requires-dist = [ { name = "bokeh", specifier = "~=3.8" }, { name = "build", marker = "extra == 'dev'", specifier = "~=1.4" }, { name = "click", specifier = "~=8.3" }, + { name = "gymnasium", marker = "extra == 'dev'", specifier = "~=1.2" }, + { name = "gymnasium", marker = "extra == 'rl'", specifier = "~=1.2" }, { name = "huggingface-hub", specifier = "~=1.4" }, { name = "import-linter", marker = "extra == 'dev'", specifier = "~=2.10" }, { name = "jinja2", specifier = "~=3.1.6" }, @@ -350,7 +356,16 @@ requires-dist = [ { name = "vulture", marker = "extra == 'dev'", specifier = "==2.14" }, { name = "websockets", specifier = "~=16.0" }, ] -provides-extras = ["dev", "docs", "docs-cms"] +provides-extras = ["dev", "rl", "docs", "docs-cms"] + +[[package]] +name = "cloudpickle" +version = "3.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/27/fb/576f067976d320f5f0114a8d9fa1215425441bb35627b1993e5afd8111e5/cloudpickle-3.1.2.tar.gz", hash = "sha256:7fda9eb655c9c230dab534f1983763de5835249750e85fbcef43aaa30a9a2414", size = 22330, upload-time = "2025-11-03T09:25:26.604Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/88/39/799be3f2f0f38cc727ee3b4f1445fe6d5e4133064ec2e4115069418a5bb6/cloudpickle-3.1.2-py3-none-any.whl", hash = "sha256:9acb47f6afd73f60dc1df93bb801b472f05ff42fa6c84167d25cb206be1fbf4a", size = 22228, upload-time = "2025-11-03T09:25:25.534Z" }, +] [[package]] name = "colorama" @@ -674,6 +689,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8a/0e/97c33bf5009bdbac74fd2beace167cab3f978feb69cc36f1ef79360d6c4e/exceptiongroup-1.3.1-py3-none-any.whl", hash = "sha256:a7a39a3bd276781e98394987d3a5701d0c4edffb633bb7a5144577f82c773598", size = 16740, upload-time = "2025-11-21T23:01:53.443Z" }, ] +[[package]] +name = "farama-notifications" +version = "0.0.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ec/91/14397890dde30adc4bee6462158933806207bc5dd10d7b4d09d5c33845cf/farama_notifications-0.0.6.tar.gz", hash = "sha256:b19acac4bb41d76e59e03394b5dd165f4761c86fa327f56307a35cbee3b60158", size = 2517, upload-time = "2026-04-24T08:43:57.603Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7c/f0/21f81892e4ed10f4ec3ef2e7cf8635fb76e7c0907c55d0da66be50094760/farama_notifications-0.0.6-py3-none-any.whl", hash = "sha256:f84839188efa1ce5bb361c2a84881b2dc2c0d0d7fb661ff00421820170930935", size = 2897, upload-time = "2026-04-24T08:43:56.785Z" }, +] + [[package]] name = "fastapi" version = "0.136.3" @@ -884,6 +908,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/48/b2/b096ccce418882fbfda4f7496f9357aaa9a5af1896a9a7f60d9f2b275a06/grpcio-1.78.0-cp314-cp314-win_amd64.whl", hash = "sha256:dce09d6116df20a96acfdbf85e4866258c3758180e8c49845d6ba8248b6d0bbb", size = 4929852, upload-time = "2026-02-06T09:56:45.885Z" }, ] +[[package]] +name = "gymnasium" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cloudpickle" }, + { name = "farama-notifications" }, + { name = "numpy" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4d/ff/14b6880d703dfaca204490979d3254ccd280c99550798993319902873658/gymnasium-1.3.0.tar.gz", hash = "sha256:6939e86e835d6b71b6ba6bfd360487420876deafc79bfb7bacba83a7c446bcf3", size = 830646, upload-time = "2026-04-22T13:47:14.155Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e9/73/fda6a25f3beeb5e49d74330b44092b9e5a547395ccd478d1103ddcbff1fc/gymnasium-1.3.0-py3-none-any.whl", hash = "sha256:6b8c159a8540dcbcb221722d7efda24d78ebbcbc3bd2ea1c2611aa2a34471fc2", size = 953904, upload-time = "2026-04-22T13:47:12.13Z" }, +] + [[package]] name = "h11" version = "0.16.0"