diff --git a/src/cloudai/_core/test_scenario.py b/src/cloudai/_core/test_scenario.py index 15efe8b2e..7c31406d0 100644 --- a/src/cloudai/_core/test_scenario.py +++ b/src/cloudai/_core/test_scenario.py @@ -97,6 +97,7 @@ class TestRun: reports: Set[Type[ReportGenerationStrategy]] = field(default_factory=set) extra_srun_args: str | None = None num_nodes_explicit: bool = False + current_env_params: dict[str, Any] = field(default_factory=dict) def __hash__(self) -> int: return hash(self.name + self.test.name + str(self.iterations) + str(self.current_iteration)) @@ -156,7 +157,9 @@ def param_space(self) -> dict[str, Any]: **{ key: value for key, value in cmd_args_dict.items() - if isinstance(value, list) and not self.test.is_dse_excluded_arg(key) + if isinstance(value, list) + and not self.test.is_dse_excluded_arg(key) + and not self.test.is_env_sampled(key) }, **{f"extra_env_vars.{key}": value for key, value in extra_env_vars_dict.items() if isinstance(value, list)}, } @@ -184,27 +187,40 @@ def all_combinations(self) -> list[dict[str, Any]]: return all_combinations - def apply_params_set(self, action: dict[str, Any]) -> "TestRun": + def apply_params_set(self, action: dict[str, Any], env_params: dict[str, Any] | None = None) -> "TestRun": tdef = self.test.model_copy(deep=True) - for key, value in action.items(): + + def _apply(key: str, value: Any) -> None: if key.startswith("extra_env_vars."): tdef.extra_env_vars[key[len("extra_env_vars.") :]] = value + return + attrs = key.split(".") + obj = tdef.cmd_args + for attr in attrs[:-1]: + obj = obj[attr] if isinstance(obj, dict) else getattr(obj, attr) + if isinstance(obj, dict): + obj[attrs[-1]] = value else: - attrs = key.split(".") - obj = tdef.cmd_args - for attr in attrs[:-1]: - obj = obj[attr] if isinstance(obj, dict) else getattr(obj, attr) - if isinstance(obj, dict): - obj[attrs[-1]] = value - else: - setattr(obj, attrs[-1], value) + setattr(obj, attrs[-1], value) + + # RNG runs in the env before this call; applying only concrete values keeps this deterministic. + for key, value in action.items(): + _apply(key, value) + for key, value in (env_params or {}).items(): + _apply(key, value) - type(tdef)(**tdef.model_dump()) # trigger validation + # env_params is validated at parse time; after the overlay its target cmd_args fields hold + # concrete scalar draws, so re-validating it here would reject weighted specs. Drop it for + # this validation-only pass, which exists to validate the applied action values. + validation_args = tdef.model_dump() + validation_args.pop("env_params", None) + type(tdef)(**validation_args) # trigger validation new_tr = copy.deepcopy(self) new_tr.test = tdef if "NUM_NODES" in action: new_tr.num_nodes = action["NUM_NODES"] + new_tr.current_env_params = dict(env_params or {}) return new_tr diff --git a/src/cloudai/cli/handlers.py b/src/cloudai/cli/handlers.py index 3fd099dc0..fb6f9b7d8 100644 --- a/src/cloudai/cli/handlers.py +++ b/src/cloudai/cli/handlers.py @@ -27,6 +27,7 @@ import toml import yaml +from cloudai.configurator.env_params import validate_dse_env_params from cloudai.core import ( BaseInstaller, CloudAIGymEnv, @@ -39,6 +40,7 @@ System, TestParser, TestScenario, + TestScenarioParsingError, ) from cloudai.models.scenario import ReportConfig from cloudai.models.workload import TestDefinition @@ -133,8 +135,7 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int: return 1 err = 0 - # Recoverable failures return a non-zero rc and are accumulated here; an unexpected exception - # (a bug) is a hard-fail. We capture it so reports still generate, then re-raise below. + # Capture an unexpected error so reports still generate, then re-raise below. run_error: Exception | None = None try: for tr in runner.runner.test_scenario.test_runs: @@ -303,6 +304,12 @@ def handle_dry_run_and_run(args: argparse.Namespace) -> int: return 1 system, test_scenario, tests = setup_result + try: + validate_dse_env_params(test_scenario) + except TestScenarioParsingError as e: + logging.error(str(e)) + return 1 + if not _handle_single_sbatch(args, system): return 1 @@ -491,7 +498,8 @@ def verify_test_scenarios( tests = Parser.parse_tests(test_tomls, system) hook_tests = Parser.parse_tests(hook_test_tomls, system) hooks = Parser.parse_hooks(hook_tomls, system, {t.name: t for t in hook_tests}) - Parser.parse_test_scenario(scenario_file, system, {t.name: t for t in tests}, hooks) + scenario = Parser.parse_test_scenario(scenario_file, system, {t.name: t for t in tests}, hooks) + validate_dse_env_params(scenario) except Exception: nfailed += 1 diff --git a/src/cloudai/configurator/base_agent.py b/src/cloudai/configurator/base_agent.py index 321e6e659..5fe1059ef 100644 --- a/src/cloudai/configurator/base_agent.py +++ b/src/cloudai/configurator/base_agent.py @@ -58,6 +58,10 @@ class BaseAgent(ABC): Provides a unified interface and parameter management for action spaces. """ + # Opt-in: agents that consume per-trial env_params sampling set this True. Default False so + # env_params declared for a non-sampling agent are rejected rather than silently ignored. + samples_env_params: bool = False + def __init__(self, env: BaseGym, config: BaseAgentConfig): """ Initialize the agent with the environment. @@ -94,9 +98,8 @@ def select_action(self, observation: list[float] | None = None) -> tuple[int, di Args: observation: Latest observation produced by the environment (``env.reset()`` on the - first call, then the result of the prior ``env.step()``). Stateless agents such - as grid search or Bayesian optimization may ignore this; observation-conditioned - agents (RL, contextual bandits) should use it. + first call, then the result of the prior ``env.step()``). Stateless agents may + ignore this; observation-conditioned agents should use it. Returns: Tuple[int, Dict[str, Any]] | None: The current step index and a dictionary mapping action keys @@ -120,8 +123,7 @@ def run(self) -> int: Default: a step loop driven by the dispatcher (``select_action`` → ``env.step`` → ``update_policy`` per trial). Agents that drive their - own training loop (e.g. RLlib-based agents calling ``algo.train()``) - override this method. + own training loop override this method. Failure contract (``handle_dse_job`` consumes the result via ``err |= agent.run()``): @@ -131,7 +133,8 @@ def run(self) -> int: accumulated and the next ``TestRun`` still executes. Workload-level failures are already surfaced this way: ``CloudAIGymEnv.step`` maps a failed metric to ``rewards.metric_failure`` rather than raising, and - ``rllib_run`` catches training errors and returns ``rc=1``. + agents with their own training loop should likewise catch training + errors and return a non-zero code. - Raise for *unexpected* failures (framework/agent bugs). Exceptions propagate out of ``handle_dse_job`` and hard-fail the job so the bug is surfaced instead of masked as a penalizing reward. diff --git a/src/cloudai/configurator/cloudai_gym.py b/src/cloudai/configurator/cloudai_gym.py index 258a47f3c..0b4a3b0c0 100644 --- a/src/cloudai/configurator/cloudai_gym.py +++ b/src/cloudai/configurator/cloudai_gym.py @@ -26,6 +26,7 @@ from .base_agent import RewardOverrides from .base_gym import BaseGym +from .env_params import EnvParams, EnvParamsSink @dataclasses.dataclass(frozen=True) @@ -36,6 +37,7 @@ class TrajectoryEntry: action: dict[str, Any] reward: float observation: list + env_params: dict[str, Any] = dataclasses.field(default_factory=dict) class CloudAIGymEnv(BaseGym): @@ -61,8 +63,15 @@ def __init__(self, test_run: TestRun, runner: BaseRunner, rewards: RewardOverrid self.max_steps = test_run.test.agent_steps self.reward_function = Registry().get_reward_function(test_run.test.agent_reward_function) self.trajectory: dict[int, list[TrajectoryEntry]] = {} + self.params: EnvParams | None = EnvParams.from_test(test_run.test) + self.env_params_sink = EnvParamsSink() super().__init__() + @property + def env_params_record_path(self) -> Path: + """``env.csv`` lives alongside ``trajectory.csv`` so a plain ``merge`` joins them.""" + return self.iteration_dir / "env.csv" + def define_action_space(self) -> Dict[str, list[Any]]: return self.test_run.param_space @@ -119,7 +128,9 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: - info (dict): Additional info for debugging. """ self.test_run.increment_step() - self.test_run = self.test_run.apply_params_set(action) + # RNG lives in the env: sample here, then apply action + sample so the run and cache key see them. + sampled_env_params = self.params.sample(self.test_run.step) if self.params else {} + self.test_run = self.test_run.apply_params_set(action, env_params=sampled_env_params) cached_result = self.get_cached_trajectory_result(action) if cached_result is not None: @@ -134,6 +145,7 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: action=action, reward=cached_result.reward, observation=cached_result.observation, + env_params=dict(self.test_run.current_env_params), ) ) return cached_result.observation, cached_result.reward, False, {} @@ -162,6 +174,9 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: self.test_run.step = new_tr.step self.test_run.output_path = new_tr.output_path + # The test_run rebuild above drops the sample; restore it so the entry, cache key, and env.csv match. + self.test_run.current_env_params = new_tr.current_env_params + observation = self.get_observation(action) reward = self.compute_reward(observation) @@ -171,6 +186,7 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: action=action, reward=reward, observation=observation, + env_params=dict(self.test_run.current_env_params), ) ) @@ -230,7 +246,14 @@ def get_observation(self, action: Any) -> list: return observation def write_trajectory(self, entry: TrajectoryEntry): - """Append the trajectory to the CSV file and to the local attribute.""" + """ + Append the entry to the in-memory cache and trajectory.csv (plus env.csv when declared). + + ``trajectory.csv`` and the ``env.csv`` projection are sunk from the same + ``TrajectoryEntry`` here, so a trial that never produces an entry (e.g. a + constraint failure returns before this call) lands in neither file and the + two stay 1:1 step-aligned. + """ self.current_trajectory.append(entry) file_exists = self.trajectory_file_path.exists() @@ -243,17 +266,36 @@ def write_trajectory(self, entry: TrajectoryEntry): writer.writerow(["step", "action", "reward", "observation"]) writer.writerow([entry.step, entry.action, entry.reward, entry.observation]) + self.env_params_sink.write(self.env_params_record_path, entry.step, entry.env_params) + + @property + def iteration_dir(self) -> Path: + """Per-iteration output dir; trajectory.csv and env.csv both live here, step-aligned.""" + return self.runner.scenario_root / self.test_run.name / f"{self.test_run.current_iteration}" + @property def trajectory_file_path(self) -> Path: - return self.runner.scenario_root / self.test_run.name / f"{self.test_run.current_iteration}" / "trajectory.csv" + return self.iteration_dir / "trajectory.csv" @property def current_trajectory(self) -> list[TrajectoryEntry]: return self.trajectory.setdefault(self.test_run.current_iteration, []) def get_cached_trajectory_result(self, action: Any) -> TrajectoryEntry | None: + """ + Return a cached entry only when the full trial identity matches. + + Trial identity is ``(action, env_params)``: env-randomized parameters + change the workload's behaviour, so a trial repeating the same action + under a different ``env_params`` sample must miss and re-run. Empty + env_params on both sides is the back-compat path for workloads that + do not declare any ``[env_params.*]`` block. + """ + current_env_params = self.test_run.current_env_params for entry in self.current_trajectory: - if self._values_match_exact(entry.action, action): + if not self._values_match_exact(entry.action, action): + continue + if self._values_match_exact(entry.env_params, current_env_params): return entry return None diff --git a/src/cloudai/configurator/env_params.py b/src/cloudai/configurator/env_params.py new file mode 100644 index 000000000..6d98c6903 --- /dev/null +++ b/src/cloudai/configurator/env_params.py @@ -0,0 +1,198 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Per-trial environment-parameter primitives for CloudAI DSE. + +An env-randomized parameter is a workload knob whose candidate values live in +``cmd_args`` (a plain list - the single source of truth, exactly like an +action-space dimension) but which the environment *samples* per trial rather +than letting the agent search it. ``env_params`` is the annotation that marks +such a field; it carries only *how* to sample (optional ``weights``), never the +values, and the knob never enters the agent's action space. +""" + +from __future__ import annotations + +import csv +import dataclasses +import math +import random +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +from pydantic import BaseModel, ConfigDict, Field, model_validator +from typing_extensions import Self + +from cloudai._core.exceptions import TestScenarioParsingError +from cloudai._core.registry import Registry + +if TYPE_CHECKING: + from cloudai._core.test_scenario import TestScenario + + +class EnvParamSpec(BaseModel): + """ + Annotation marking one cmd_args field as env-sampled. + + Carries only *how* to sample - the candidate values themselves live in + ``cmd_args.`` as a plain list. ``weights`` (optional) are positional, + aligned 1:1 with that candidate list; omit for uniform sampling. The + length match against the candidate list is a cross-field check enforced by + ``TestDefinition`` (which can see ``cmd_args``); here we validate only the + weights' intrinsic shape. + """ + + model_config = ConfigDict(extra="forbid") + + weights: Optional[List[float]] = Field( + default=None, + description="Optional probability weights aligned with the cmd_args candidate list; uniform if omitted.", + ) + + @model_validator(mode="after") + def _validate_weights(self) -> Self: + if self.weights is None: + return self + for w in self.weights: + if not math.isfinite(w) or w < 0: + raise ValueError(f"env_params weights must be finite and non-negative; got {w}") + total = sum(self.weights) + if abs(total - 1.0) > 1e-6: + raise ValueError(f"env_params weights must sum to 1.0; got {total}") + return self + + +@dataclasses.dataclass(frozen=True) +class EnvParam: + """ + One env-sampled knob, resolved from cmd_args: its candidate values and optional weights. + + Weights (when present) are positional, aligned 1:1 with ``candidates``; ``None`` means + uniform sampling. Keeping the two together makes each knob a self-contained draw. + """ + + candidates: List[Any] + weights: Optional[List[float]] = None + + def draw(self, rng: random.Random) -> Any: + if self.weights is not None: + return rng.choices(self.candidates, weights=self.weights, k=1)[0] + return rng.choice(self.candidates) + + +@dataclasses.dataclass(frozen=True) +class EnvParams: + """ + Resolved env-parameter sampling state for one run. + + Built via ``from_test`` only when a workload actually declares list-valued env_params; + otherwise ``from_test`` returns ``None`` and the env carries no env-params state at all. + Owns the per-parameter :class:`EnvParam` draws (resolved from ``cmd_args``, the single source + of truth) and the seed, and draws one value per parameter per trial. + """ + + params: Dict[str, EnvParam] + seed: int + + @classmethod + def from_test(cls, test: Any) -> Optional["EnvParams"]: + """ + Resolve a TestDefinition's env_params annotations, or ``None`` if nothing is sampled. + + Annotated fields are guaranteed list-valued by ``TestDefinition`` parse-time validation + (a scalar annotation is rejected there), so the non-list guard below is defensive. With + no annotations declared there is nothing to sample and we return ``None`` so callers + stay on the zero-overhead path. + """ + params: Dict[str, EnvParam] = {} + for name, spec in test.env_params.items(): + value = getattr(test.cmd_args, name, None) + if not isinstance(value, list): + continue + params[name] = EnvParam(candidates=value, weights=spec.weights) + if not params: + return None + seed = int((test.agent_config or {}).get("random_seed", 0)) + return cls(params=params, seed=seed) + + def sample(self, trial: int) -> Dict[str, Any]: + """ + Draw this trial's value for each parameter. + + Determinism: the same ``(seed, name, trial)`` yields the same draw across processes. + Independence: each parameter's RNG is seeded ``f"{seed}:{name}:{trial}"`` so adding or + removing one parameter never perturbs the others' draw sequences. + """ + return {name: param.draw(random.Random(f"{self.seed}:{name}:{trial}")) for name, param in self.params.items()} + + +class EnvParamsSink: + """ + Append per-trial env_params samples to a step-aligned CSV. + + The CSV mirrors how ``trajectory.csv`` serialises its ``action`` column + (one row per env.step(), sample dict stringified in a single cell) so the + two files align 1:1 on ``step`` and a plain ``merge`` joins them. + + Empty samples are skipped, so a run without env_params writes nothing and + callers can sink every trial unconditionally. + """ + + def write(self, path: Path, step: int, sample: Dict[str, Any]) -> None: + if step < 1: + raise ValueError(f"step must be a positive trial index (cloudai DSE is 1-based); got {step}") + if not sample: + return + new_file = not path.exists() + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", newline="") as f: + writer = csv.writer(f) + if new_file: + writer.writerow(("step", "env")) + writer.writerow([step, sample]) + + +def validate_dse_env_params(test_scenario: "TestScenario") -> None: + """ + Reject prepped configs that declare env_params no agent will sample. + + env_params are sampled per-trial by CloudAIGymEnv, but the sampling only matters for an agent + that opts into it via ``BaseAgent.samples_env_params``. A non-DSE run has no per-trial loop, and + an agent that does not opt in ignores env_params, so declaring them there is a silent no-op. + is_dse_job and the agent both resolve only on the fully prepped config, so this is validated + here rather than at parse time. + """ + agents = Registry().agents_map + + offenders = [] + for tr in test_scenario.test_runs: + if not tr.test.env_params: + continue + + agent = agents.get(tr.test.agent) + # Unknown agent: defer to the dedicated agent-resolution error rather than masking it here. + sampled = tr.is_dse_job and (agent is None or agent.samples_env_params) + if not sampled: + offenders.append(tr.name) + + if offenders: + raise TestScenarioParsingError( + f"Tests {offenders} declare env_params but no agent will sample them. env_params are sampled " + "per-trial only by a DSE run on an agent that opts into env_params sampling. Add a sweep " + "(a list-valued cmd_args/extra_env_vars entry or num_nodes) and use such an agent, or remove " + "env_params." + ) diff --git a/src/cloudai/models/workload.py b/src/cloudai/models/workload.py index 8b981d8ea..c5156ccc6 100644 --- a/src/cloudai/models/workload.py +++ b/src/cloudai/models/workload.py @@ -23,6 +23,8 @@ from cloudai.core import GitRepo, Installable, JobStatusResult, PythonExecutable, Registry, System, TestRun +from ..configurator.env_params import EnvParamSpec + class CmdArgs(BaseModel): """Test command arguments.""" @@ -110,6 +112,14 @@ class TestDefinition(BaseModel, ABC): agent_metrics: list[str] = Field(default=["default"]) agent_reward_function: str = "inverse" agent_config: dict[str, Any] | None = Field(default=None, description="Agent configuration.") + env_params: dict[str, EnvParamSpec] = Field( + default_factory=dict, + description=( + "Environment parameters sampled by the env per trial. Sibling to " + "cmd_args; not part of the agent's action space. CloudAIGymEnv samples, " + "persists to env.csv, and includes them in the trajectory cache key." + ), + ) @property def cmd_args_dict(self) -> Dict[str, Union[str, List[str]]]: @@ -134,19 +144,27 @@ def installables(self) -> list[Installable]: def constraint_check(self, tr: TestRun, system: Optional[System]) -> bool: return True + def is_env_sampled(self, cmd_args_path: str) -> bool: + """Whether a cmd_args field is env-sampled (env draws it per trial, not the agent).""" + return cmd_args_path in self.env_params + @property def is_dse_job(self) -> bool: - def check_dict(d: dict, parent_key: str = "") -> bool: + def check_dict(d: dict, parent_key: str = "", skip_env_params: bool = False) -> bool: if isinstance(d, dict): for key, value in d.items(): path = f"{parent_key}.{key}" if parent_key else key if self.is_dse_excluded_arg(path): continue - if isinstance(value, list) or (isinstance(value, dict) and check_dict(value, path)): + if skip_env_params and self.is_env_sampled(path): + continue + if isinstance(value, list) or ( + isinstance(value, dict) and check_dict(value, path, skip_env_params) + ): return True return False - return check_dict(self.cmd_args_dict) or check_dict(self.extra_env_vars) + return check_dict(self.cmd_args_dict, skip_env_params=True) or check_dict(self.extra_env_vars) @field_validator("dse_excluded_args", mode="before") @classmethod @@ -191,3 +209,64 @@ def validate_agent_config(self) -> Self: agent_config_class = agent_class.get_config_class() agent_config_class.model_validate(self.agent_config) return self + + @model_validator(mode="after") + def validate_env_params(self) -> Self: + """ + Validate env_params annotations against cmd_args. + + ``env_params`` is an annotation: each key names a ``cmd_args`` field whose value is + the candidate set (the single source of truth), and the entry carries only *how* to + sample. So each key must name a real ``cmd_args`` field whose value is a candidate + list; a scalar is already fixed, so annotating it is a meaningless label and is + rejected here. When ``weights`` are declared, the list needs >= 2 values and the + weights must align 1:1 with it. Sampling, persistence, the per-trial cmd_args overlay, + and the cache key all + live in ``CloudAIGymEnv``; keeping this shape check in core lets the overlay stay + agent- and workload-agnostic rather than re-implemented per workload. + """ + if not self.env_params: + return self + + cmd_args_fields = getattr(type(self.cmd_args), "model_fields", None) + if not cmd_args_fields: + return self + + unknown = sorted(k for k in self.env_params if k not in cmd_args_fields) + if unknown: + raise ValueError(f"env_params keys {unknown} are not cmd_args fields on {type(self.cmd_args).__name__}") + + for name, spec in self.env_params.items(): + self._validate_env_param_field(name, spec, getattr(self.cmd_args, name, None)) + return self + + @staticmethod + def _validate_env_param_field(name: str, spec: Any, value: Any) -> None: + """Reject one env_params entry whose target cmd_args field is not a valid candidate list.""" + if isinstance(value, (dict, BaseModel)): + raise ValueError( + f"env_params['{name}'] must target a leaf cmd_args field (a candidate list), " + "not a structured object; param_space/is_dse_job exclude the whole key, which would " + "silently drop nested action dimensions" + ) + if not isinstance(value, list): + raise ValueError( + f"env_params['{name}'] annotates cmd_args.{name}, which is not a candidate list " + f"(got {type(value).__name__}); the annotation only reclassifies a list-valued sweep as " + f"env-sampled, while a scalar is already fixed. Make cmd_args.{name} a list or remove " + "the annotation" + ) + if not value: + raise ValueError( + f"env_params['{name}'] references an empty candidate list in cmd_args.{name}; " + "provide at least one candidate (the sampler would otherwise fail on an empty draw)" + ) + if spec.weights is None: + return + if len(value) < 2: + raise ValueError(f"env_params['{name}'] declares weights but cmd_args.{name} needs >= 2 candidate values") + if len(spec.weights) != len(value): + raise ValueError( + f"env_params['{name}'] weights length {len(spec.weights)} does not match " + f"cmd_args.{name} candidate count {len(value)}" + ) diff --git a/tests/test_cloudaigym.py b/tests/test_cloudaigym.py index 4cd7c6fb8..177a39dd7 100644 --- a/tests/test_cloudaigym.py +++ b/tests/test_cloudaigym.py @@ -21,6 +21,7 @@ import pytest from cloudai.configurator import CloudAIGymEnv, GridSearchAgent, TrajectoryEntry +from cloudai.configurator.env_params import EnvParamSpec from cloudai.core import BaseRunner, RewardOverrides, Runner, TestRun, TestScenario from cloudai.systems.slurm import SlurmSystem from cloudai.util import flatten_dict @@ -33,6 +34,7 @@ ) from cloudai.workloads.nemo_run.report_generation_strategy import NeMoRunReportGenerationStrategy from cloudai.workloads.nixl_bench import NIXLBenchCmdArgs, NIXLBenchTestDefinition +from tests.test_env_params import EnvVarCmdArgs, EnvVarTestDefinition @pytest.fixture @@ -441,3 +443,404 @@ def test_cached_step_appends_trajectory_row(nemorun: NeMoRunTestDefinition, tmp_ contents = csv_path.read_text().strip().splitlines() assert contents[0] == "step,action,reward,observation" assert contents[-1].startswith("5,") + + +def _seed_cached_entry_with_env_params( + env: CloudAIGymEnv, action: dict[str, object], env_params: dict[str, object] +) -> None: + """Seed env.trajectory with one entry carrying the given env_params.""" + entry = TrajectoryEntry(step=1, action=action, reward=0.5, observation=[100.0], env_params=env_params) + env.test_run.current_iteration = 0 + env.trajectory = {0: [entry]} + + +def test_cache_miss_when_env_params_differ(base_tr: TestRun, tmp_path: Path) -> None: + """Cache MUST miss when env_params differ, even if action is identical. + + Without this property the agent receives stale rewards on every cache hit + when the env varies per trial: any agent silently trains on labels that + do not correspond to the env they were nominally generated under. + """ + runner = MagicMock() + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = MagicMock(test_runs=[]) + runner.jobs = {} + runner.testrun_to_job_map = {} + + env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) + _seed_cached_entry_with_env_params(env, {"x": 10}, env_params={"ball_speed": 1}) + + env.test_run.current_env_params = {"ball_speed": 2} + + assert env.get_cached_trajectory_result({"x": 10}) is None, ( + "Cache must include env_params in its key. Keying on action alone means " + "trials repeating the same action under a different env_params sample " + "receive a stale cached reward." + ) + + +def test_cache_hit_when_action_and_env_params_match(base_tr: TestRun, tmp_path: Path) -> None: + """Same action AND same env_params must still HIT the cache.""" + runner = MagicMock() + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = MagicMock(test_runs=[]) + runner.jobs = {} + runner.testrun_to_job_map = {} + + env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) + _seed_cached_entry_with_env_params(env, {"x": 10}, env_params={"ball_speed": 2}) + + env.test_run.current_env_params = {"ball_speed": 2} + + result = env.get_cached_trajectory_result({"x": 10}) + assert result is not None + assert result.step == 1 + + +def test_cache_hit_when_neither_has_env_params(base_tr: TestRun, tmp_path: Path) -> None: + """Workloads without env_params behave exactly as today (back-compat).""" + runner = MagicMock() + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = MagicMock(test_runs=[]) + runner.jobs = {} + runner.testrun_to_job_map = {} + + env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) + env.test_run.current_iteration = 0 + env.trajectory = {0: [TrajectoryEntry(step=1, action={"x": 10}, reward=0.5, observation=[100.0])]} + # Note: neither the cached entry nor test_run carries env_params -> existing behavior. + + result = env.get_cached_trajectory_result({"x": 10}) + assert result is not None + assert result.step == 1 + + +def test_step_reruns_workload_when_env_params_change(tmp_path: Path) -> None: + """Integration: two env.step() calls with the same action but different sampled env_params re-run. + + Counterpart to test_cache_miss_when_env_params_differ but exercising the + full step() flow: increment_step -> sample env_params -> apply_params_set -> + cache lookup -> runner.run() -> write_trajectory. With seed 42 the sampler + draws ball_speed=3 then ball_speed=1 on the two consecutive trials, so the + cache key differs and the workload must re-run both times. + """ + tdef = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=[1, 2, 3]), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": 42}, + ) + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + ) + test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = test_scenario + runner.jobs = {} + runner.testrun_to_job_map = {} + runner.shutting_down = False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + action = {"paddle_width": 4} + fake_obs = iter([[100.0], [50.0]]) + + with patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)): + env.test_run.step = 0 + obs1, _r1, *_ = env.step(action) # samples ball_speed=3 + obs2, _r2, *_ = env.step(action) # samples ball_speed=1 + + assert runner.run.call_count == 2, ( + "Different sampled env_params between two env.step() calls with the same action " + "must trigger a workload re-run; the cache lookup must miss." + ) + assert obs1 != obs2, "fresh workload run should produce a fresh observation" + + +def test_env_csv_is_step_aligned_with_trajectory(tmp_path: Path) -> None: + """env.csv must have exactly one row per env.step() call, with steps aligned 1:1 to trajectory.csv. + + This pins the corpus-friendly contract: a downstream consumer can + ``pd.merge(traj, env, on="step")`` without losing rows on either side, + independent of whether the trial hit the trajectory cache. + """ + tdef = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=[1, 2, 3]), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": 42}, + ) + + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + ) + test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = test_scenario + runner.jobs, runner.testrun_to_job_map, runner.shutting_down = {}, {}, False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + action_a, action_b = {"paddle_width": 4}, {"paddle_width": 8} + fake_obs = iter([[100.0], [50.0], [25.0]]) + + with patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)): + env.test_run.step = 0 + for action in (action_a, action_b, action_a): + env.step(action) + + env_csv = env.env_params_record_path + traj_csv = env.trajectory_file_path + assert env_csv.exists(), "env.csv must be written when env_params is declared" + + env_steps = [int(line.split(",", 1)[0]) for line in env_csv.read_text().strip().splitlines()[1:]] + traj_steps = [int(line.split(",", 1)[0]) for line in traj_csv.read_text().strip().splitlines()[1:]] + assert env_steps == traj_steps == [1, 2, 3], ( + f"step columns must align 1:1 across env.csv ({env_steps}) and trajectory.csv ({traj_steps})" + ) + + +def test_env_csv_step_alignment_holds_on_constraint_failure(tmp_path: Path) -> None: + """A constraint failure must not desync env.csv from trajectory.csv. + + Runs three steps where the middle one fails ``constraint_check`` and the + other two succeed. ``env.csv`` is sunk inside ``write_trajectory`` from the + same ``TrajectoryEntry``, which is never reached on the early-return + constraint-failure path - so the failed step lands in neither file. The + corpus-friendly contract (``pd.merge(traj, env, on="step")`` loses no rows) + therefore holds via shared absence: both files record exactly the surviving + steps, aligned 1:1. + """ + tdef = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=[1, 2, 3]), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": 42}, + ) + + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + ) + test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = test_scenario + runner.jobs, runner.testrun_to_job_map, runner.shutting_down = {}, {}, False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + + # Step 2 fails the constraint; steps 1 and 3 survive. get_observation is only + # reached on the surviving steps, so it yields exactly two values. + fake_obs = iter([[100.0], [25.0]]) + with ( + patch.object(EnvVarTestDefinition, "constraint_check", side_effect=[True, False, True]), + patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)), + ): + env.test_run.step = 0 + for action in ({"paddle_width": 4}, {"paddle_width": 6}, {"paddle_width": 8}): + env.step(action) + + env_csv = env.env_params_record_path + traj_csv = env.trajectory_file_path + + assert env_csv.exists(), "surviving steps declare env_params -> env.csv must exist" + env_steps = [int(line.split(",", 1)[0]) for line in env_csv.read_text().strip().splitlines()[1:]] + traj_steps = ( + [int(line.split(",", 1)[0]) for line in traj_csv.read_text().strip().splitlines()[1:]] + if traj_csv.exists() + else [] + ) + assert env_steps == traj_steps == [1, 3], ( + f"the constraint-failed step (2) must appear in neither file; env.csv ({env_steps}) " + f"and trajectory.csv ({traj_steps}) must stay 1:1 aligned on the surviving steps" + ) + + +def test_step_cache_hit_with_declared_env_params_still_writes_env_csv(tmp_path: Path) -> None: + """End-to-end: cache HIT under observer-driven env_params still records env.csv. + + A cache hit still calls ``write_trajectory``, which sinks the trajectory row + and the matching env.csv row from the same entry - keeping the two files + step-aligned even when the workload itself is short-circuited. + Asserts: (a) the workload is NOT re-run (cache short-circuit), (b) + env.csv gains a row, (c) trajectory.csv gains a row carrying the + sampled env_params. + """ + import random as _random + + tdef = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=[1, 2, 3]), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": 42}, + ) + + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + ) + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + runner.jobs, runner.testrun_to_job_map, runner.shutting_down = {}, {}, False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + assert env.params is not None, "TestDefinition.env_params declared -> EnvParams must be built" + + expected_sample = {"ball_speed": _random.Random("42:ball_speed:1").choice([1, 2, 3])} + action = {"paddle_width": 4} + env.test_run.current_iteration = 0 + env.trajectory = { + 0: [TrajectoryEntry(step=0, action=action, reward=0.42, observation=[0.84], env_params=expected_sample)] + } + env.test_run.step = 0 + + with patch.object(env, "get_observation", side_effect=AssertionError("cache miss path must not run")): + obs, reward, _done, _info = env.step(action) + + runner.run.assert_not_called() + assert reward == 0.42 and obs == [0.84] + + env_csv = env.env_params_record_path + assert env_csv.exists(), "cache HIT must NOT skip the observer; env.csv must record the trial" + env_rows = env_csv.read_text().strip().splitlines() + assert env_rows[0] == "step,env" + assert env_rows[1].startswith("1,"), f"expected step 1 row in env.csv, got {env_rows[1]!r}" + + traj_rows = env.trajectory[0] + assert len(traj_rows) == 2 and traj_rows[-1].env_params == expected_sample, ( + "cache-hit trajectory entry must record the per-trial env_params sample" + ) + + +def test_step_overlays_env_params_onto_cmd_args(tmp_path: Path) -> None: + """The per-trial env_params sample must be overlaid onto cmd_args before the workload runs. + + env_params are the env-side twin of the action: core overlays the sampled values onto + cmd_args inside step() so the workload actually runs with them, workload-agnostically + (no per-workload injection code). Here ``ball_speed`` is env-randomized (its candidate + list lives in cmd_args), so the value the workload runs with must equal the observer's + sample, not the whole candidate list. + """ + import random as _random + + tdef = EnvVarTestDefinition( + name="overlay", + description="overlay", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=[1, 2]), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": 42}, + ) + + test_run = TestRun( + name="overlay_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "overlay_tr" / "0", + ) + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = TestScenario(name="overlay_scenario", test_runs=[test_run]) + runner.jobs, runner.testrun_to_job_map, runner.shutting_down = {}, {}, False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + expected = _random.Random("42:ball_speed:1").choice([1, 2]) + + with patch.object(env, "get_observation", side_effect=lambda _action: [1.0]): + env.test_run.step = 0 + env.step({"paddle_width": 4}) + + ran_cmd_args = runner.test_scenario.test_runs[0].test.cmd_args + assert ran_cmd_args.ball_speed == expected, ( + "step() must overlay the sampled env_params onto cmd_args before runner.run(), so the " + f"workload runs with ball_speed={expected} (the observer's draw), not the candidate list." + ) + + +def test_param_space_excludes_env_params_keys(setup_env: tuple[TestRun, BaseRunner]): + """env_params keys must never surface in the grid/action space (sampled, not searched).""" + tr, _ = setup_env + tr.test = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + # paddle_width is an ordinary action-space list; ball_speed is an env-sampled list. + cmd_args=EnvVarCmdArgs(paddle_width=[4, 8], ball_speed=[1, 2]), + env_params={"ball_speed": EnvParamSpec()}, + ) + + action_space = tr.param_space + + assert "paddle_width" in action_space, "an un-annotated cmd_args list must remain an action-space dimension" + assert "ball_speed" not in action_space, ( + "a knob declared in env_params is sampled by the env, not explored by the agent, so it " + "must be excluded from param_space even though its cmd_args value is a list." + ) + + +def test_no_env_csv_when_env_params_not_declared(nemorun: NeMoRunTestDefinition, tmp_path: Path) -> None: + """Workloads without [env_params.*] pay zero overhead: no observer, no env.csv.""" + tdef = nemorun.model_copy(deep=True) + tdef.cmd_args.data.global_batch_size = 8 + test_run = TestRun( + name="plain_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "plain_tr" / "0", + reports={NeMoRunReportGenerationStrategy}, + ) + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + + assert env.params is None, "no env_params declared -> no EnvParams object" + assert not env.env_params_record_path.exists() diff --git a/tests/test_env_params.py b/tests/test_env_params.py new file mode 100644 index 000000000..66cfd4352 --- /dev/null +++ b/tests/test_env_params.py @@ -0,0 +1,373 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the environment-parameter primitives in cloudai.configurator.env_params. + +Annotation model: candidate values live in ``cmd_args`` (the single source of +truth); an ``env_params`` entry is a thin annotation that reclassifies that +field from action-space to env-sampled, optionally carrying sampling +``weights``. These tests use a dedicated, list-capable fixture workload so the +candidate list is a first-class typed field (no serialization warnings). + +Worked example - Atari Breakout, the canonical RL arcade game: +``ball_speed`` is the env-sampled knob - the game serves the ball at a speed the +agent does not control, so we sample it per trial and the policy must stay +robust across ball speeds. ``paddle_width`` is an ordinary action-space +dimension - the knob the agent actually tunes. +""" + +from __future__ import annotations + +import dataclasses +import random +from pathlib import Path +from typing import List, Union + +import pytest +from pydantic import BaseModel, ValidationError + +from cloudai.configurator.env_params import ( + EnvParam, + EnvParams, + EnvParamSpec, + EnvParamsSink, +) +from cloudai.core import TestRun +from cloudai.models.workload import CmdArgs, TestDefinition + + +class BrickGrid(BaseModel): + """A structured (non-leaf) cmd_args field, used to prove env_params rejects such targets.""" + + rows: int = 3 + + +class EnvVarCmdArgs(CmdArgs): + """cmd_args with top-level, list-capable fields for env_params annotation tests. + + ``ball_speed`` is the env-randomized knob; ``paddle_width`` stands in for an + ordinary action-space dimension. Both accept either a scalar or a candidate + list, so a candidate list round-trips through model validation/serialization + cleanly. ``brick_grid`` is a structured field (not a leaf) for negative tests. + """ + + ball_speed: Union[int, List[int]] = 1 + paddle_width: Union[int, List[int]] = 8 + brick_grid: BrickGrid = BrickGrid() + + +class EnvVarTestDefinition(TestDefinition): + """Minimal concrete TestDefinition wrapping ``EnvVarCmdArgs``.""" + + cmd_args: EnvVarCmdArgs = EnvVarCmdArgs() + + +def _tdef(env_params: dict, **cmd_args_overrides) -> EnvVarTestDefinition: + """Build the fixture TestDefinition through full model validation (so validators fire).""" + return EnvVarTestDefinition( + name="breakout", + description="breakout", + test_template_name="breakout_template", + cmd_args=EnvVarCmdArgs(**cmd_args_overrides), + env_params=env_params, + ) + + +# --- EnvParamSpec: weights are validated intrinsically (length is cross-field, see below) --- + + +def test_env_param_spec_accepts_normalized_weights() -> None: + spec = EnvParamSpec(weights=[0.7, 0.3]) + assert spec.weights == [0.7, 0.3] + + +def test_env_param_spec_bare_marker_is_valid() -> None: + """An annotation with no weights is a valid uniform marker.""" + assert EnvParamSpec().weights is None + + +def test_env_param_spec_rejects_unnormalized_weights() -> None: + """Strict sum: weights must sum to ~1.0 (relative weights like [7, 3] are rejected).""" + with pytest.raises(ValidationError, match="must sum to"): + EnvParamSpec(weights=[7.0, 3.0]) + + +def test_env_param_spec_rejects_negative_weights() -> None: + with pytest.raises(ValidationError, match="finite and non-negative"): + EnvParamSpec(weights=[-0.1, 1.1]) + + +@pytest.mark.parametrize("bad", [float("inf"), float("nan"), float("-inf")]) +def test_env_param_spec_rejects_non_finite_weights(bad: float) -> None: + with pytest.raises(ValidationError, match="finite and non-negative"): + EnvParamSpec(weights=[bad, 1.0]) + + +def test_env_param_spec_rejects_unknown_fields() -> None: + """Candidate values live in cmd_args, never in the annotation (no ``values`` key).""" + with pytest.raises(ValidationError): + EnvParamSpec.model_validate({"values": [1, 2]}) + + +# --- EnvParam: one resolved knob - candidate values, optional weights, a single draw --- + + +def test_env_param_defaults_to_unweighted() -> None: + """A knob built without weights samples uniformly (weights is None).""" + assert EnvParam(candidates=[1, 2, 3]).weights is None + + +def test_env_param_draw_returns_a_candidate() -> None: + """draw() always yields one of the knob's own candidate values.""" + knob = EnvParam(candidates=[1, 2, 3]) + assert all(knob.draw(random.Random(s)) in {1, 2, 3} for s in range(50)) + + +def test_env_param_draw_is_reproducible_for_a_given_rng() -> None: + """draw() consumes the caller's RNG (no internal seeding), so equal RNG state yields equal draws.""" + knob = EnvParam(candidates=[10, 20, 30, 40]) + assert knob.draw(random.Random(123)) == knob.draw(random.Random(123)) + + +def test_env_param_draw_honors_degenerate_weights() -> None: + """A degenerate weight ([1, 0]) collapses the draw onto the first candidate, whatever the RNG.""" + knob = EnvParam(candidates=[1, 2], weights=[1.0, 0.0]) + assert all(knob.draw(random.Random(s)) == 1 for s in range(50)) + + +def test_env_param_is_immutable() -> None: + """Frozen value object: a resolved knob cannot be mutated after construction.""" + knob = EnvParam(candidates=[1, 2, 3]) + with pytest.raises(dataclasses.FrozenInstanceError): + knob.candidates = [9] # pyright: ignore[reportAttributeAccessIssue] + + +# --- EnvParams.sample: draws from candidate lists resolved out of cmd_args --- + + +def test_sampler_is_deterministic_across_calls() -> None: + env_params = EnvParams(params={"ball_speed": EnvParam(candidates=[1, 2, 3])}, seed=42) + seq_a = [env_params.sample(t) for t in range(1, 6)] + seq_b = [env_params.sample(t) for t in range(1, 6)] + assert seq_a == seq_b, "same (seed, trial) must produce the same draw across calls" + + +def test_sampler_each_param_is_independent() -> None: + """Adding an unrelated parameter must not perturb existing parameters' draws.""" + base = EnvParams(params={"ball_speed": EnvParam(candidates=[1, 2, 3])}, seed=7) + extended = EnvParams( + params={"ball_speed": EnvParam(candidates=[1, 2, 3]), "brick_rows": EnvParam(candidates=[3, 4, 5])}, + seed=7, + ) + a = [base.sample(t)["ball_speed"] for t in range(1, 11)] + b = [extended.sample(t)["ball_speed"] for t in range(1, 11)] + assert a == b, "per-parameter RNG seeding must isolate parameters from each other" + + +def test_sampler_honors_weights() -> None: + """A degenerate weight ([1, 0]) must always pick the first candidate.""" + env_params = EnvParams(params={"ball_speed": EnvParam(candidates=[1, 2], weights=[1.0, 0.0])}, seed=1) + assert all(env_params.sample(t)["ball_speed"] == 1 for t in range(1, 20)) + + +def test_sample_covers_all_declared_params() -> None: + """sample() returns exactly one value per declared knob, each drawn from that knob's candidates.""" + env_params = EnvParams( + params={"ball_speed": EnvParam(candidates=[1, 2]), "paddle_width": EnvParam(candidates=[4, 8])}, + seed=3, + ) + drawn = env_params.sample(5) + assert set(drawn) == {"ball_speed", "paddle_width"} + assert drawn["ball_speed"] in {1, 2} + assert drawn["paddle_width"] in {4, 8} + + +def test_env_params_is_immutable() -> None: + """Frozen value object: resolved sampling state cannot be mutated after construction.""" + env_params = EnvParams(params={"ball_speed": EnvParam(candidates=[1, 2])}, seed=0) + with pytest.raises(dataclasses.FrozenInstanceError): + env_params.seed = 1 # pyright: ignore[reportAttributeAccessIssue] + + +# --- EnvParamsSink: unchanged persistence contract --- + + +def test_csv_sink_skips_empty_samples_and_rejects_zero_step(tmp_path: Path) -> None: + sink = EnvParamsSink() + path = tmp_path / "env.csv" + sink.write(path, 1, {}) # empty -> no-op, no file + assert not path.exists() + with pytest.raises(ValueError, match="must be a positive trial index"): + sink.write(path, 0, {"ball_speed": 1}) + + +def test_csv_sink_writes_header_then_rows(tmp_path: Path) -> None: + sink = EnvParamsSink() + path = tmp_path / "env.csv" + sink.write(path, 1, {"ball_speed": 2}) + sink.write(path, 2, {"ball_speed": 3}) + contents = path.read_text().strip().splitlines() + assert contents[0] == "step,env" + assert contents[1].startswith("1,") + assert contents[2].startswith("2,") + + +# --- EnvParams.from_test: resolves candidate lists from cmd_args, once at env formulation --- + + +def test_env_params_from_test_resolves_list_candidate() -> None: + """A list-valued cmd_args field resolves to its candidate list; sampling then draws from it.""" + env_params = EnvParams.from_test(_tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3])) + + assert env_params is not None + assert env_params.params == {"ball_speed": EnvParam(candidates=[1, 2, 3], weights=None)} + assert env_params.sample(3)["ball_speed"] in {1, 2, 3} + + +def test_env_params_from_test_carries_weights() -> None: + """Weighted annotations propagate their weights alongside the resolved candidates.""" + env_params = EnvParams.from_test(_tdef({"ball_speed": EnvParamSpec(weights=[0.7, 0.3])}, ball_speed=[1, 2])) + + assert env_params is not None + assert env_params.params["ball_speed"].weights == [0.7, 0.3] + + +def test_env_params_from_test_reads_seed_from_agent_config() -> None: + """The sampler seed comes from agent_config.random_seed so a run is reproducible end to end.""" + tdef = _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3]) + tdef.agent_config = {"random_seed": 42} + + env_params = EnvParams.from_test(tdef) + + assert env_params is not None and env_params.seed == 42 + + +def test_env_params_from_test_defaults_seed_to_zero_without_agent_config() -> None: + """No agent_config (the default) -> seed 0, so a declared-but-unseeded run still samples.""" + env_params = EnvParams.from_test(_tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3])) + + assert env_params is not None and env_params.seed == 0 + + +def test_env_params_from_test_defaults_seed_to_zero_when_key_absent() -> None: + """agent_config present but without random_seed still falls back to seed 0.""" + tdef = _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3]) + tdef.agent_config = {"other": 1} + + env_params = EnvParams.from_test(tdef) + + assert env_params is not None and env_params.seed == 0 + + +def test_env_params_from_test_resolves_multiple_params() -> None: + """Every list-valued annotation becomes its own knob, candidates resolved from cmd_args.""" + env_params = EnvParams.from_test( + _tdef( + {"ball_speed": EnvParamSpec(), "paddle_width": EnvParamSpec()}, + ball_speed=[1, 2], + paddle_width=[4, 8], + ) + ) + + assert env_params is not None + assert set(env_params.params) == {"ball_speed", "paddle_width"} + assert env_params.params["ball_speed"].candidates == [1, 2] + assert env_params.params["paddle_width"].candidates == [4, 8] + + +def test_env_params_from_test_none_when_no_env_params() -> None: + """No annotations declared -> no EnvParams object (the zero-overhead path).""" + assert EnvParams.from_test(_tdef({})) is None + + +# --- TestDefinition.validate_env_params: annotation validity (cross-field with cmd_args) --- + + +def test_env_params_uniform_list_is_accepted() -> None: + """A cmd_args candidate list annotated with a bare marker validates (uniform sampling).""" + tdef = _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2]) + assert "ball_speed" in tdef.env_params + + +def test_env_params_weighted_list_is_accepted() -> None: + tdef = _tdef({"ball_speed": EnvParamSpec(weights=[0.7, 0.3])}, ball_speed=[1, 2]) + assert tdef.env_params["ball_speed"].weights == [0.7, 0.3] + + +def test_env_params_scalar_annotation_rejected() -> None: + """A scalar (fixed) knob carries nothing to sample; the annotation is a meaningless label and is + rejected at parse time (it only reclassifies a list-valued sweep as env-sampled).""" + with pytest.raises(ValidationError, match="not a candidate list"): + _tdef({"ball_speed": EnvParamSpec()}, ball_speed=2) + + +def test_env_params_unknown_key_rejected() -> None: + """env_params keys must name real cmd_args fields (the overlay targets cmd_args).""" + with pytest.raises(ValidationError, match="not cmd_args fields"): + _tdef({"ghost": EnvParamSpec()}, ball_speed=[1, 2]) + + +def test_env_params_weights_on_scalar_rejected() -> None: + """Weights require a candidate list; a scalar knob cannot carry them.""" + with pytest.raises(ValidationError, match="not a candidate list"): + _tdef({"ball_speed": EnvParamSpec(weights=[0.7, 0.3])}, ball_speed=2) + + +def test_env_params_weights_length_mismatch_rejected() -> None: + """Weights must align 1:1 with the cmd_args candidate list.""" + with pytest.raises(ValidationError, match="weights length"): + _tdef({"ball_speed": EnvParamSpec(weights=[0.5, 0.3, 0.2])}, ball_speed=[1, 2]) + + +def test_env_params_structured_target_rejected() -> None: + """A structured (non-leaf) cmd_args target is rejected: the observer can't sample it, yet + param_space/is_dse_job exclude the whole key, silently dropping nested action dimensions.""" + with pytest.raises(ValidationError, match="must target a leaf cmd_args field"): + _tdef({"brick_grid": EnvParamSpec()}) + + +def test_env_params_empty_candidate_list_rejected() -> None: + """An empty candidate list is rejected at build time: an unweighted spec would otherwise skip + validation and let the sampler fail later on an empty draw (rng.choice([])).""" + with pytest.raises(ValidationError, match="empty candidate list"): + _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[]) + + +# --- is_dse_job: env-sampled lists are not search dimensions --- + + +def test_is_dse_job_false_for_env_param_only_workload() -> None: + """A cmd_args list that is purely env-sampled is not a search dimension -> not a DSE job.""" + tdef = _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3]) + assert tdef.is_dse_job is False + + +def test_is_dse_job_true_when_a_real_action_dimension_exists() -> None: + """An un-annotated cmd_args list is a real action dimension -> DSE, even alongside env_params.""" + tdef = _tdef({"ball_speed": EnvParamSpec()}, ball_speed=[1, 2, 3], paddle_width=[4, 8]) + assert tdef.is_dse_job is True + + +def test_apply_params_set_accepts_weighted_env_param_draw() -> None: + """Regression: apply_params_set re-validates after the overlay; a weighted env_param's scalar + draw must not trip validate_env_params (which would reject 'weights but not a candidate list').""" + tdef = _tdef({"ball_speed": EnvParamSpec(weights=[0.7, 0.3])}, ball_speed=[1, 2]) + tr = TestRun(name="tr", test=tdef, num_nodes=1, nodes=[]) + + new_tr = tr.apply_params_set({}, env_params={"ball_speed": 1}) + + assert new_tr.test.cmd_args.ball_speed == 1 + assert new_tr.current_env_params == {"ball_speed": 1} diff --git a/tests/test_handlers.py b/tests/test_handlers.py index bcf01978f..247548287 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -26,19 +26,23 @@ from cloudai.cli.handlers import ( handle_dse_job, + validate_dse_env_params, verify_system_configs, verify_test_configs, verify_test_scenarios, ) +from cloudai.configurator.env_params import EnvParamSpec from cloudai.core import ( BaseAgent, BaseAgentConfig, + Parser, Registry, RewardOverrides, Runner, TestDependency, TestRun, TestScenario, + TestScenarioParsingError, ) from cloudai.models.scenario import ReportConfig from cloudai.reporter import StatusReporter @@ -52,6 +56,7 @@ class StubAgentConfig(BaseAgentConfig): class StubAgent(BaseAgent): received_configs: ClassVar[list[StubAgentConfig]] = [] + samples_env_params: bool = True # stands in for an env-aware learning agent def __init__(self, env, config: StubAgentConfig): self.env = env @@ -427,3 +432,84 @@ def test_handle_dse_job_documents_failure_in_reports_before_raising( contents = failure_report.read_text() assert "RuntimeError" in contents assert "agent blew up" in contents + + +def test_validate_dse_env_params_rejects_non_dse(base_tr: TestRun) -> None: + base_tr.test.env_params = {"ball_speed": EnvParamSpec()} + scenario = TestScenario(name="s", test_runs=[base_tr]) + with pytest.raises(TestScenarioParsingError, match="no agent will sample them"): + validate_dse_env_params(scenario) + + +def test_validate_dse_env_params_rejects_grid_search(dse_tr: TestRun) -> None: + """A DSE job on grid_search exhaustively searches the space, so env_params are noise -> reject.""" + dse_tr.test.env_params = {"ball_speed": EnvParamSpec()} + assert dse_tr.is_dse_job is True # it IS a DSE job... + assert dse_tr.test.agent == "grid_search" # ...but grid_search does not sample env_params + with pytest.raises(TestScenarioParsingError, match="no agent will sample them"): + validate_dse_env_params(TestScenario(name="s", test_runs=[dse_tr])) + + +def test_validate_dse_env_params_rejects_non_sampling_agent( + dse_tr: TestRun, stub_agent_name: str, monkeypatch: pytest.MonkeyPatch +) -> None: + """The check keys on the agent capability, not the name: a non-grid agent that opts out is rejected too.""" + monkeypatch.setattr(StubAgent, "samples_env_params", False) + dse_tr.test.env_params = {"ball_speed": EnvParamSpec()} + dse_tr.test.agent = stub_agent_name + assert dse_tr.is_dse_job is True and dse_tr.test.agent != "grid_search" + with pytest.raises(TestScenarioParsingError, match="no agent will sample them"): + validate_dse_env_params(TestScenario(name="s", test_runs=[dse_tr])) + + +def test_validate_dse_env_params_defers_unknown_agent(dse_tr: TestRun) -> None: + """An unknown agent is not flagged here; it is deferred to the dedicated agent-resolution error.""" + dse_tr.test.env_params = {"ball_speed": EnvParamSpec()} + dse_tr.test.agent = "does_not_exist_agent" + assert dse_tr.is_dse_job is True + assert dse_tr.test.agent not in Registry().agents_map # precondition: agent is truly unknown + validate_dse_env_params(TestScenario(name="s", test_runs=[dse_tr])) # no exception == deferred + + +def test_validate_dse_env_params_allows_dse_run(dse_tr: TestRun, stub_agent_name: str) -> None: + dse_tr.test.env_params = {"ball_speed": EnvParamSpec()} + dse_tr.test.agent = stub_agent_name # an env-aware agent (samples_env_params=True) consumes env_params + assert dse_tr.is_dse_job is True # precondition: DSE + env-aware agent + env_params is allowed + validate_dse_env_params(TestScenario(name="s", test_runs=[dse_tr])) # no exception == pass + + +def test_validate_dse_env_params_allows_num_nodes_sweep(base_tr: TestRun, stub_agent_name: str) -> None: + base_tr.test.env_params = {"ball_speed": EnvParamSpec()} + base_tr.test.agent = stub_agent_name + base_tr.num_nodes = [1, 2] + assert base_tr.is_dse_job is True # a num_nodes list sweep makes it DSE, so env_params is allowed + validate_dse_env_params(TestScenario(name="s", test_runs=[base_tr])) # no exception == pass + + +def test_validate_dse_env_params_allows_non_dse_without_env_params(base_tr: TestRun) -> None: + assert base_tr.is_dse_job is False # precondition: not DSE, but also no env_params declared + assert not base_tr.test.env_params + validate_dse_env_params(TestScenario(name="s", test_runs=[base_tr])) # no exception == pass + + +def test_verify_test_scenarios_rejects_env_params_without_dse( + base_tr: TestRun, monkeypatch: pytest.MonkeyPatch +) -> None: + base_tr.test.env_params = {"ball_speed": EnvParamSpec()} + bad = TestScenario(name="s", test_runs=[base_tr]) + monkeypatch.setattr(Parser, "parse_tests", lambda *a, **k: []) + monkeypatch.setattr(Parser, "parse_hooks", lambda *a, **k: {}) + monkeypatch.setattr(Parser, "parse_test_scenario", lambda *a, **k: bad) + assert verify_test_scenarios([Path("dummy.toml")], [], [], []) == 1 + + +def test_verify_test_scenarios_allows_env_params_with_dse( + dse_tr: TestRun, stub_agent_name: str, monkeypatch: pytest.MonkeyPatch +) -> None: + dse_tr.test.env_params = {"ball_speed": EnvParamSpec()} + dse_tr.test.agent = stub_agent_name # learning agent (not grid_search) + good = TestScenario(name="s", test_runs=[dse_tr]) + monkeypatch.setattr(Parser, "parse_tests", lambda *a, **k: []) + monkeypatch.setattr(Parser, "parse_hooks", lambda *a, **k: {}) + monkeypatch.setattr(Parser, "parse_test_scenario", lambda *a, **k: good) + assert verify_test_scenarios([Path("dummy.toml")], [], [], []) == 0