Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
95c192b
feat(configurator): env_params as first-class trial-identity field
rutayan-nv Jun 20, 2026
0dabd4e
refactor(configurator): env_params as cmd_args annotation, excluded f…
rutayan-nv Jun 23, 2026
d59572e
fix(configurator): tighten env_params validation, align env.csv, fix …
rutayan-nv Jun 24, 2026
ed42f9d
fix(workload): reject empty env_params candidate lists at build time
rutayan-nv Jun 24, 2026
9412c7b
refactor(configurator): model env_params as cohesive EnvParam/EnvPara…
rutayan-nv Jun 24, 2026
af3b6a8
refactor(configurator): collapse env_params sink into one concrete class
rutayan-nv Jun 25, 2026
dead658
refactor(configurator): gate env_params sampling on an agent capabili…
rutayan-nv Jun 25, 2026
69dd7d4
style(configurator): trim verbose comments toward self-documenting code
rutayan-nv Jun 25, 2026
7fec7f5
fix(test_scenario): skip env_params on post-overlay revalidation
rutayan-nv Jun 25, 2026
8f91664
fix(workload): reject scalar-only env_params annotations at parse time
rutayan-nv Jun 26, 2026
7db764b
feat(configurator): add ObsLeafDescriptor + structured-observation pr…
rutayan-nv Jun 16, 2026
584f145
test(env-params): suppress pyright on intentional ObsLeafDescriptor r…
rutayan-nv Jun 16, 2026
6b6d2bd
feat(configurator): add GymnasiumAdapter for CloudAI envs
rutayan-nv Jun 16, 2026
6c2452f
fix(gymnasium-adapter): accept continuous actions in step() type sign…
rutayan-nv Jun 16, 2026
89a5b8f
fix(gymnasium-adapter): validate structured-observation key parity; p…
rutayan-nv Jun 16, 2026
b868479
refactor(configurator): route gymnasium import through LazyImports si…
rutayan-nv Jun 16, 2026
4b59c49
fix(configurator): make lazy-gymnasium refactor land CI-green
rutayan-nv Jun 16, 2026
c30b723
feat(configurator): make GymnasiumAdapter inherit gymnasium.Env
rutayan-nv Jun 21, 2026
3fe318c
test(gym): cast action_space to Dict in adapter contract test
rutayan-nv Jun 21, 2026
092573e
feat(configurator): add GymnasiumAdapter.encode_action as public inve…
rutayan-nv Jun 21, 2026
2e2d431
style(configurator): ruff format gymnasium_adapter.py
rutayan-nv Jun 24, 2026
4f479dd
refactor(gymnasium-adapter): defer continuous-action support with Con…
rutayan-nv Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ requires-python = ">=3.10"
"import-linter~=2.10",
"pytest-deadfixtures~=3.1",
"taplo~=0.9.3",
"gymnasium~=1.2",
]
rl = ["gymnasium~=1.2"]
docs = [
"sphinx~=8.1",
"nvidia-sphinx-theme~=0.0.8",
Expand Down
40 changes: 28 additions & 12 deletions src/cloudai/_core/test_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class TestRun:
reports: Set[Type[ReportGenerationStrategy]] = field(default_factory=set)
extra_srun_args: str | None = None
num_nodes_explicit: bool = False
current_env_params: dict[str, Any] = field(default_factory=dict)

def __hash__(self) -> int:
    """Hash the run by its name, test name, iteration count and current iteration."""
    identity_parts = (
        self.name,
        self.test.name,
        str(self.iterations),
        str(self.current_iteration),
    )
    # Parts are joined without a separator, matching plain string concatenation.
    return hash("".join(identity_parts))
Expand Down Expand Up @@ -156,7 +157,9 @@ def param_space(self) -> dict[str, Any]:
**{
key: value
for key, value in cmd_args_dict.items()
if isinstance(value, list) and not self.test.is_dse_excluded_arg(key)
if isinstance(value, list)
and not self.test.is_dse_excluded_arg(key)
and not self.test.is_env_sampled(key)
},
**{f"extra_env_vars.{key}": value for key, value in extra_env_vars_dict.items() if isinstance(value, list)},
}
Expand Down Expand Up @@ -184,27 +187,40 @@ def all_combinations(self) -> list[dict[str, Any]]:

return all_combinations

def apply_params_set(self, action: dict[str, Any]) -> "TestRun":
def apply_params_set(self, action: dict[str, Any], env_params: dict[str, Any] | None = None) -> "TestRun":
tdef = self.test.model_copy(deep=True)
for key, value in action.items():

def _apply(key: str, value: Any) -> None:
if key.startswith("extra_env_vars."):
tdef.extra_env_vars[key[len("extra_env_vars.") :]] = value
return
attrs = key.split(".")
obj = tdef.cmd_args
for attr in attrs[:-1]:
obj = obj[attr] if isinstance(obj, dict) else getattr(obj, attr)
if isinstance(obj, dict):
obj[attrs[-1]] = value
else:
attrs = key.split(".")
obj = tdef.cmd_args
for attr in attrs[:-1]:
obj = obj[attr] if isinstance(obj, dict) else getattr(obj, attr)
if isinstance(obj, dict):
obj[attrs[-1]] = value
else:
setattr(obj, attrs[-1], value)
setattr(obj, attrs[-1], value)

# RNG runs in the env before this call; applying only concrete values keeps this deterministic.
for key, value in action.items():
_apply(key, value)
for key, value in (env_params or {}).items():
_apply(key, value)

type(tdef)(**tdef.model_dump()) # trigger validation
# env_params is validated at parse time; after the overlay its target cmd_args fields hold
# concrete scalar draws, so re-validating it here would reject weighted specs. Drop it for
# this validation-only pass, which exists to validate the applied action values.
validation_args = tdef.model_dump()
validation_args.pop("env_params", None)
type(tdef)(**validation_args) # trigger validation

new_tr = copy.deepcopy(self)
new_tr.test = tdef
if "NUM_NODES" in action:
new_tr.num_nodes = action["NUM_NODES"]
new_tr.current_env_params = dict(env_params or {})
return new_tr


Expand Down
16 changes: 12 additions & 4 deletions src/cloudai/cli/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import toml
import yaml

from cloudai.configurator.env_params import validate_dse_env_params
from cloudai.core import (
BaseInstaller,
CloudAIGymEnv,
Expand All @@ -39,6 +40,7 @@
System,
TestParser,
TestScenario,
TestScenarioParsingError,
)
from cloudai.models.scenario import ReportConfig
from cloudai.models.workload import TestDefinition
Expand Down Expand Up @@ -133,8 +135,7 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:
return 1

err = 0
# Recoverable failures return a non-zero rc and are accumulated here; an unexpected exception
# (a bug) is a hard-fail. We capture it so reports still generate, then re-raise below.
# Capture an unexpected error so reports still generate, then re-raise below.
run_error: Exception | None = None
try:
for tr in runner.runner.test_scenario.test_runs:
Expand Down Expand Up @@ -177,7 +178,7 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:
)

if run_error is not None:
raise run_error
raise run_error.with_traceback(run_error.__traceback__)

logging.info("All jobs are complete.")
return err
Expand Down Expand Up @@ -303,6 +304,12 @@ def handle_dry_run_and_run(args: argparse.Namespace) -> int:
return 1
system, test_scenario, tests = setup_result

try:
validate_dse_env_params(test_scenario)
except TestScenarioParsingError as e:
logging.error(str(e))
return 1

if not _handle_single_sbatch(args, system):
return 1

Expand Down Expand Up @@ -491,7 +498,8 @@ def verify_test_scenarios(
tests = Parser.parse_tests(test_tomls, system)
hook_tests = Parser.parse_tests(hook_test_tomls, system)
hooks = Parser.parse_hooks(hook_tomls, system, {t.name: t for t in hook_tests})
Parser.parse_test_scenario(scenario_file, system, {t.name: t for t in tests}, hooks)
scenario = Parser.parse_test_scenario(scenario_file, system, {t.name: t for t in tests}, hooks)
validate_dse_env_params(scenario)
except Exception:
nfailed += 1

Expand Down
2 changes: 2 additions & 0 deletions src/cloudai/configurator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
from .base_gym import BaseGym
from .cloudai_gym import CloudAIGymEnv, TrajectoryEntry
from .grid_search import GridSearchAgent
from .gymnasium_adapter import GymnasiumAdapter

# Names re-exported as the public API of this package.
__all__ = [
"BaseAgent",
"BaseGym",
"CloudAIGymEnv",
"GridSearchAgent",
"GymnasiumAdapter",
"TrajectoryEntry",
]
15 changes: 9 additions & 6 deletions src/cloudai/configurator/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class BaseAgent(ABC):
Provides a unified interface and parameter management for action spaces.
"""

# Opt-in: agents that consume per-trial env_params sampling set this True. Default False so
# env_params declared for a non-sampling agent are rejected rather than silently ignored.
samples_env_params: bool = False

def __init__(self, env: BaseGym, config: BaseAgentConfig):
"""
Initialize the agent with the environment.
Expand Down Expand Up @@ -94,9 +98,8 @@ def select_action(self, observation: list[float] | None = None) -> tuple[int, di

Args:
observation: Latest observation produced by the environment (``env.reset()`` on the
first call, then the result of the prior ``env.step()``). Stateless agents such
as grid search or Bayesian optimization may ignore this; observation-conditioned
agents (RL, contextual bandits) should use it.
first call, then the result of the prior ``env.step()``). Stateless agents may
ignore this; observation-conditioned agents should use it.

Returns:
Tuple[int, Dict[str, Any]] | None: The current step index and a dictionary mapping action keys
Expand All @@ -120,8 +123,7 @@ def run(self) -> int:

Default: a step loop driven by the dispatcher (``select_action`` →
``env.step`` → ``update_policy`` per trial). Agents that drive their
own training loop (e.g. RLlib-based agents calling ``algo.train()``)
override this method.
own training loop override this method.

Failure contract (``handle_dse_job`` consumes the result via
``err |= agent.run()``):
Expand All @@ -131,7 +133,8 @@ def run(self) -> int:
accumulated and the next ``TestRun`` still executes. Workload-level
failures are already surfaced this way: ``CloudAIGymEnv.step`` maps a
failed metric to ``rewards.metric_failure`` rather than raising, and
``rllib_run`` catches training errors and returns ``rc=1``.
agents with their own training loop should likewise catch training
errors and return a non-zero code.
- Raise for *unexpected* failures (framework/agent bugs). Exceptions
propagate out of ``handle_dse_job`` and hard-fail the job so the bug
is surfaced instead of masked as a penalizing reward.
Expand Down
57 changes: 50 additions & 7 deletions src/cloudai/configurator/cloudai_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from .base_agent import RewardOverrides
from .base_gym import BaseGym
from .env_params import EnvParams, EnvParamsSink


@dataclasses.dataclass(frozen=True)
Expand All @@ -36,6 +37,7 @@ class TrajectoryEntry:
action: dict[str, Any]
reward: float
observation: list
env_params: dict[str, Any] = dataclasses.field(default_factory=dict)


class CloudAIGymEnv(BaseGym):
Expand All @@ -61,8 +63,15 @@ def __init__(self, test_run: TestRun, runner: BaseRunner, rewards: RewardOverrid
self.max_steps = test_run.test.agent_steps
self.reward_function = Registry().get_reward_function(test_run.test.agent_reward_function)
self.trajectory: dict[int, list[TrajectoryEntry]] = {}
self.params: EnvParams | None = EnvParams.from_test(test_run.test)
self.env_params_sink = EnvParamsSink()
super().__init__()

@property
def env_params_record_path(self) -> Path:
    """Path of ``env.csv`` inside the per-iteration directory, next to ``trajectory.csv``."""
    record_dir = self.iteration_dir
    return record_dir / "env.csv"

def define_action_space(self) -> Dict[str, list[Any]]:
    """Expose the test run's parameter space as the environment's action space."""
    action_space = self.test_run.param_space
    return action_space

Expand All @@ -76,9 +85,10 @@ def define_observation_space(self) -> list:
Define the observation space for the environment.

Returns:
list: The observation space.
list: One float slot per agent metric (at least one), giving the correct shape
for adapters that derive ``gymnasium.spaces.Box`` from this output.
"""
return [0.0]
return [0.0] * max(len(self.test_run.test.agent_metrics), 1)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is changing the observation from a fixed `[0.0]` to one slot per agent metric safe for existing agents?


def reset(
self,
Expand All @@ -100,7 +110,7 @@ def reset(
if seed is not None:
lazy.np.random.seed(seed)
self.test_run.current_iteration = 0
observation = [0.0]
observation = self.define_observation_space()
info = {}
return observation, info

Expand All @@ -119,7 +129,9 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
- info (dict): Additional info for debugging.
"""
self.test_run.increment_step()
self.test_run = self.test_run.apply_params_set(action)
# RNG lives in the env: sample here, then apply action + sample so the run and cache key see them.
sampled_env_params = self.params.sample(self.test_run.step) if self.params else {}
self.test_run = self.test_run.apply_params_set(action, env_params=sampled_env_params)

cached_result = self.get_cached_trajectory_result(action)
if cached_result is not None:
Expand All @@ -134,6 +146,7 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
action=action,
reward=cached_result.reward,
observation=cached_result.observation,
env_params=dict(self.test_run.current_env_params),
)
)
return cached_result.observation, cached_result.reward, False, {}
Expand Down Expand Up @@ -162,6 +175,9 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
self.test_run.step = new_tr.step
self.test_run.output_path = new_tr.output_path

# The test_run rebuild above drops the sample; restore it so the entry, cache key, and env.csv match.
self.test_run.current_env_params = new_tr.current_env_params

observation = self.get_observation(action)
reward = self.compute_reward(observation)

Expand All @@ -171,6 +187,7 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
action=action,
reward=reward,
observation=observation,
env_params=dict(self.test_run.current_env_params),
)
)

Expand Down Expand Up @@ -230,7 +247,14 @@ def get_observation(self, action: Any) -> list:
return observation

def write_trajectory(self, entry: TrajectoryEntry):
"""Append the trajectory to the CSV file and to the local attribute."""
"""
Append the entry to the in-memory cache and trajectory.csv (plus env.csv when declared).

``trajectory.csv`` and the ``env.csv`` projection are sunk from the same
``TrajectoryEntry`` here, so a trial that never produces an entry (e.g. a
constraint failure returns before this call) lands in neither file and the
two stay 1:1 step-aligned.
"""
self.current_trajectory.append(entry)

file_exists = self.trajectory_file_path.exists()
Expand All @@ -243,17 +267,36 @@ def write_trajectory(self, entry: TrajectoryEntry):
writer.writerow(["step", "action", "reward", "observation"])
writer.writerow([entry.step, entry.action, entry.reward, entry.observation])

self.env_params_sink.write(self.env_params_record_path, entry.step, entry.env_params)

@property
def iteration_dir(self) -> Path:
    """Output directory of the current iteration; holds both trajectory.csv and env.csv."""
    run = self.test_run
    iteration_name = f"{run.current_iteration}"
    return self.runner.scenario_root / run.name / iteration_name

@property
def trajectory_file_path(self) -> Path:
    """Path of ``trajectory.csv`` inside the per-iteration output directory."""
    # Built from iteration_dir so it shares one directory definition with env.csv.
    return self.iteration_dir / "trajectory.csv"

@property
def current_trajectory(self) -> list[TrajectoryEntry]:
    """Entries recorded for the current iteration; an empty list is created on first access."""
    iteration = self.test_run.current_iteration
    return self.trajectory.setdefault(iteration, [])

def get_cached_trajectory_result(self, action: Any) -> TrajectoryEntry | None:
    """
    Return a cached entry only when the full trial identity matches.

    Trial identity is ``(action, env_params)``: env-randomized parameters
    change the workload's behaviour, so a trial repeating the same action
    under a different ``env_params`` sample must miss and re-run. Empty
    env_params on both sides is the back-compat path for workloads that
    do not declare any ``[env_params.*]`` block.

    Args:
        action: Action of the trial being looked up.

    Returns:
        The first entry of the current trajectory whose action and env_params
        both match, or ``None`` when there is no such entry.
    """
    current_env_params = self.test_run.current_env_params
    for entry in self.current_trajectory:
        # Compare the action first; env_params are only checked for entries with the same action.
        if not self._values_match_exact(entry.action, action):
            continue
        if self._values_match_exact(entry.env_params, current_env_params):
            return entry

    return None
Expand Down
Loading
Loading