Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions src/cloudai/_core/test_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class TestRun:
reports: Set[Type[ReportGenerationStrategy]] = field(default_factory=set)
extra_srun_args: str | None = None
num_nodes_explicit: bool = False
current_env_params: dict[str, Any] = field(default_factory=dict)

def __hash__(self) -> int:
return hash(self.name + self.test.name + str(self.iterations) + str(self.current_iteration))
Expand Down Expand Up @@ -156,7 +157,9 @@ def param_space(self) -> dict[str, Any]:
**{
key: value
for key, value in cmd_args_dict.items()
if isinstance(value, list) and not self.test.is_dse_excluded_arg(key)
if isinstance(value, list)
and not self.test.is_dse_excluded_arg(key)
and not self.test.is_env_sampled(key)
},
**{f"extra_env_vars.{key}": value for key, value in extra_env_vars_dict.items() if isinstance(value, list)},
}
Expand Down Expand Up @@ -184,27 +187,40 @@ def all_combinations(self) -> list[dict[str, Any]]:

return all_combinations

def apply_params_set(self, action: dict[str, Any]) -> "TestRun":
def apply_params_set(self, action: dict[str, Any], env_params: dict[str, Any] | None = None) -> "TestRun":
tdef = self.test.model_copy(deep=True)
for key, value in action.items():

def _apply(key: str, value: Any) -> None:
Comment on lines -189 to +193

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd suggest reverting the changes with the _apply function (local functions are a troublesome thing during debugging) and rather replace the previous root

for key, value in action.items():

with

full_action = action | (env_params or {})
for key, value in full_action.items():

it makes the delta of the PR quite shorter + no inner function

if key.startswith("extra_env_vars."):
tdef.extra_env_vars[key[len("extra_env_vars.") :]] = value
return
attrs = key.split(".")
obj = tdef.cmd_args
for attr in attrs[:-1]:
obj = obj[attr] if isinstance(obj, dict) else getattr(obj, attr)
if isinstance(obj, dict):
obj[attrs[-1]] = value
else:
attrs = key.split(".")
obj = tdef.cmd_args
for attr in attrs[:-1]:
obj = obj[attr] if isinstance(obj, dict) else getattr(obj, attr)
if isinstance(obj, dict):
obj[attrs[-1]] = value
else:
setattr(obj, attrs[-1], value)
setattr(obj, attrs[-1], value)

# RNG runs in the env before this call; applying only concrete values keeps this deterministic.
for key, value in action.items():
_apply(key, value)
for key, value in (env_params or {}).items():
_apply(key, value)

type(tdef)(**tdef.model_dump()) # trigger validation
# env_params is validated at parse time; after the overlay its target cmd_args fields hold
# concrete scalar draws, so re-validating it here would reject weighted specs. Drop it for
# this validation-only pass, which exists to validate the applied action values.
validation_args = tdef.model_dump()
validation_args.pop("env_params", None)
type(tdef)(**validation_args) # trigger validation

new_tr = copy.deepcopy(self)
new_tr.test = tdef
if "NUM_NODES" in action:
new_tr.num_nodes = action["NUM_NODES"]
Comment thread
rutayan-nv marked this conversation as resolved.
new_tr.current_env_params = dict(env_params or {})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
new_tr.current_env_params = dict(env_params or {})
new_tr.current_env_params = env_params or {}

return new_tr


Expand Down
14 changes: 11 additions & 3 deletions src/cloudai/cli/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import toml
import yaml

from cloudai.configurator.env_params import validate_dse_env_params
from cloudai.core import (
BaseInstaller,
CloudAIGymEnv,
Expand All @@ -39,6 +40,7 @@
System,
TestParser,
TestScenario,
TestScenarioParsingError,
)
from cloudai.models.scenario import ReportConfig
from cloudai.models.workload import TestDefinition
Expand Down Expand Up @@ -133,8 +135,7 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:
return 1

err = 0
# Recoverable failures return a non-zero rc and are accumulated here; an unexpected exception
# (a bug) is a hard-fail. We capture it so reports still generate, then re-raise below.
# Capture an unexpected error so reports still generate, then re-raise below.
run_error: Exception | None = None
try:
for tr in runner.runner.test_scenario.test_runs:
Expand Down Expand Up @@ -303,6 +304,12 @@ def handle_dry_run_and_run(args: argparse.Namespace) -> int:
return 1
system, test_scenario, tests = setup_result

try:
validate_dse_env_params(test_scenario)
except TestScenarioParsingError as e:
logging.error(str(e))
return 1

Comment thread
rutayan-nv marked this conversation as resolved.
if not _handle_single_sbatch(args, system):
return 1

Expand Down Expand Up @@ -491,7 +498,8 @@ def verify_test_scenarios(
tests = Parser.parse_tests(test_tomls, system)
hook_tests = Parser.parse_tests(hook_test_tomls, system)
hooks = Parser.parse_hooks(hook_tomls, system, {t.name: t for t in hook_tests})
Parser.parse_test_scenario(scenario_file, system, {t.name: t for t in tests}, hooks)
scenario = Parser.parse_test_scenario(scenario_file, system, {t.name: t for t in tests}, hooks)
validate_dse_env_params(scenario)
except Exception:
nfailed += 1

Expand Down
15 changes: 9 additions & 6 deletions src/cloudai/configurator/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class BaseAgent(ABC):
Provides a unified interface and parameter management for action spaces.
"""

# Opt-in: agents that consume per-trial env_params sampling set this True. Default False so
# env_params declared for a non-sampling agent are rejected rather than silently ignored.
samples_env_params: bool = False

def __init__(self, env: BaseGym, config: BaseAgentConfig):
"""
Initialize the agent with the environment.
Expand Down Expand Up @@ -94,9 +98,8 @@ def select_action(self, observation: list[float] | None = None) -> tuple[int, di

Args:
observation: Latest observation produced by the environment (``env.reset()`` on the
first call, then the result of the prior ``env.step()``). Stateless agents such
as grid search or Bayesian optimization may ignore this; observation-conditioned
agents (RL, contextual bandits) should use it.
first call, then the result of the prior ``env.step()``). Stateless agents may
ignore this; observation-conditioned agents should use it.

Returns:
Tuple[int, Dict[str, Any]] | None: The current step index and a dictionary mapping action keys
Expand All @@ -120,8 +123,7 @@ def run(self) -> int:

Default: a step loop driven by the dispatcher (``select_action`` →
``env.step`` → ``update_policy`` per trial). Agents that drive their
own training loop (e.g. RLlib-based agents calling ``algo.train()``)
override this method.
own training loop override this method.

Failure contract (``handle_dse_job`` consumes the result via
``err |= agent.run()``):
Expand All @@ -131,7 +133,8 @@ def run(self) -> int:
accumulated and the next ``TestRun`` still executes. Workload-level
failures are already surfaced this way: ``CloudAIGymEnv.step`` maps a
failed metric to ``rewards.metric_failure`` rather than raising, and
``rllib_run`` catches training errors and returns ``rc=1``.
agents with their own training loop should likewise catch training
errors and return a non-zero code.
- Raise for *unexpected* failures (framework/agent bugs). Exceptions
propagate out of ``handle_dse_job`` and hard-fail the job so the bug
is surfaced instead of masked as a penalizing reward.
Expand Down
50 changes: 46 additions & 4 deletions src/cloudai/configurator/cloudai_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from .base_agent import RewardOverrides
from .base_gym import BaseGym
from .env_params import EnvParams, EnvParamsSink


@dataclasses.dataclass(frozen=True)
Expand All @@ -36,6 +37,7 @@ class TrajectoryEntry:
action: dict[str, Any]
reward: float
observation: list
env_params: dict[str, Any] = dataclasses.field(default_factory=dict)


class CloudAIGymEnv(BaseGym):
Expand All @@ -61,8 +63,15 @@ def __init__(self, test_run: TestRun, runner: BaseRunner, rewards: RewardOverrid
self.max_steps = test_run.test.agent_steps
self.reward_function = Registry().get_reward_function(test_run.test.agent_reward_function)
self.trajectory: dict[int, list[TrajectoryEntry]] = {}
self.params: EnvParams | None = EnvParams.from_test(test_run.test)
self.env_params_sink = EnvParamsSink()
super().__init__()

@property
def env_params_record_path(self) -> Path:
"""``env.csv`` lives alongside ``trajectory.csv`` so a plain ``merge`` joins them."""
return self.iteration_dir / "env.csv"

def define_action_space(self) -> Dict[str, list[Any]]:
return self.test_run.param_space

Expand Down Expand Up @@ -119,7 +128,9 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
- info (dict): Additional info for debugging.
"""
self.test_run.increment_step()
self.test_run = self.test_run.apply_params_set(action)
# RNG lives in the env: sample here, then apply action + sample so the run and cache key see them.
sampled_env_params = self.params.sample(self.test_run.step) if self.params else {}
self.test_run = self.test_run.apply_params_set(action, env_params=sampled_env_params)

cached_result = self.get_cached_trajectory_result(action)
if cached_result is not None:
Expand All @@ -134,6 +145,7 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
action=action,
reward=cached_result.reward,
observation=cached_result.observation,
env_params=dict(self.test_run.current_env_params),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

and I see this (to my understanding) redundant dict(...) call across entire PR. is there really a need for it?

Suggested change
env_params=dict(self.test_run.current_env_params),
env_params=self.test_run.current_env_params,

)
)
return cached_result.observation, cached_result.reward, False, {}
Expand Down Expand Up @@ -162,6 +174,9 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
self.test_run.step = new_tr.step
self.test_run.output_path = new_tr.output_path

# The test_run rebuild above drops the sample; restore it so the entry, cache key, and env.csv match.
self.test_run.current_env_params = new_tr.current_env_params
Comment on lines +177 to +178

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the same line in src/cloudai/_core/test_scenario.py:223. If it's indeed a duplicate - I would vote for removing this one and keeping one in scenario


observation = self.get_observation(action)
reward = self.compute_reward(observation)

Expand All @@ -171,6 +186,7 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
action=action,
reward=reward,
observation=observation,
env_params=dict(self.test_run.current_env_params),
)
)

Expand Down Expand Up @@ -230,7 +246,14 @@ def get_observation(self, action: Any) -> list:
return observation

def write_trajectory(self, entry: TrajectoryEntry):
"""Append the trajectory to the CSV file and to the local attribute."""
"""
Append the entry to the in-memory cache and trajectory.csv (plus env.csv when declared).

``trajectory.csv`` and the ``env.csv`` projection are sunk from the same
``TrajectoryEntry`` here, so a trial that never produces an entry (e.g. a
constraint failure returns before this call) lands in neither file and the
two stay 1:1 step-aligned.
"""
self.current_trajectory.append(entry)

file_exists = self.trajectory_file_path.exists()
Expand All @@ -243,17 +266,36 @@ def write_trajectory(self, entry: TrajectoryEntry):
writer.writerow(["step", "action", "reward", "observation"])
writer.writerow([entry.step, entry.action, entry.reward, entry.observation])

self.env_params_sink.write(self.env_params_record_path, entry.step, entry.env_params)

@property
def iteration_dir(self) -> Path:
"""Per-iteration output dir; trajectory.csv and env.csv both live here, step-aligned."""
return self.runner.scenario_root / self.test_run.name / f"{self.test_run.current_iteration}"

@property
def trajectory_file_path(self) -> Path:
return self.runner.scenario_root / self.test_run.name / f"{self.test_run.current_iteration}" / "trajectory.csv"
return self.iteration_dir / "trajectory.csv"

@property
def current_trajectory(self) -> list[TrajectoryEntry]:
return self.trajectory.setdefault(self.test_run.current_iteration, [])

def get_cached_trajectory_result(self, action: Any) -> TrajectoryEntry | None:
"""
Return a cached entry only when the full trial identity matches.

Trial identity is ``(action, env_params)``: env-randomized parameters
change the workload's behaviour, so a trial repeating the same action
under a different ``env_params`` sample must miss and re-run. Empty
env_params on both sides is the back-compat path for workloads that
do not declare any ``[env_params.*]`` block.
"""
current_env_params = self.test_run.current_env_params
for entry in self.current_trajectory:
if self._values_match_exact(entry.action, action):
if not self._values_match_exact(entry.action, action):
continue
if self._values_match_exact(entry.env_params, current_env_params):
return entry
Comment on lines +296 to 299

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit again, but I'd like to push a bit for readibility. we sacrifice a few ms for validation but I believe it's just mush easier to read

Suggested change
if not self._values_match_exact(entry.action, action):
continue
if self._values_match_exact(entry.env_params, current_env_params):
return entry
action_match = self._values_match_exact(entry.action, action)
env_params_match = self._values_match_exact(entry.env_params, current_env_params)
if action_match and env_params_match:
return entry


return None
Expand Down
Loading
Loading