Skip to content
Closed
5 changes: 5 additions & 0 deletions src/cloudai/_core/test_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ def has_more_iterations(self) -> bool:
"""
return self.current_iteration + 1 < self.iterations

def increment_step(self) -> int:
"""Advance the trial counter and return the new value."""
self.step += 1
return self.step

@property
def metric_reporter(self) -> Optional[Type[ReportGenerationStrategy]]:
if not self.reports:
Expand Down
108 changes: 62 additions & 46 deletions src/cloudai/cli/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import copy
import logging
import signal
import traceback
from contextlib import contextmanager
from pathlib import Path
from typing import Callable, List, Optional
Expand Down Expand Up @@ -132,64 +133,79 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:
return 1

err = 0
for tr in runner.runner.test_scenario.test_runs:
test_run = copy.deepcopy(tr)

agent_type = test_run.test.agent
agent_class = registry.agents_map.get(agent_type)
if agent_class is None:
logging.error(
f"No agent available for type: {agent_type}. Please make sure {agent_type} "
f"is a valid agent type. Available agents: {registry.agents_map.keys()}"
# Recoverable failures return a non-zero rc and are accumulated here; an unexpected exception
# (a bug) is a hard-fail. We capture it so reports still generate, then re-raise below.
run_error: Exception | None = None
try:
for tr in runner.runner.test_scenario.test_runs:
test_run = copy.deepcopy(tr)

agent_type = test_run.test.agent
agent_class = registry.agents_map.get(agent_type)
if agent_class is None:
logging.error(
f"No agent available for type: {agent_type}. Please make sure {agent_type} "
f"is a valid agent type. Available agents: {registry.agents_map.keys()}"
)
err = 1
continue

agent_config_data = test_run.test.agent_config or {}
agent_config = agent_class.get_config_class()(**agent_config_data)
env = CloudAIGymEnv(
test_run=test_run,
runner=runner.runner,
rewards=agent_config.rewards,
)
err = 1
continue
if agent_config.start_action == "first":
logging.info(f"Using deterministic first sweep for the chosen agent: {env.first_sweep}.")

agent_config_data = test_run.test.agent_config or {}
agent_config = agent_class.get_config_class()(**agent_config_data)
env = CloudAIGymEnv(
test_run=test_run,
runner=runner.runner,
rewards=agent_config.rewards,
)
if agent_config.start_action == "first":
logging.info(f"Using deterministic first sweep for the chosen agent: {env.first_sweep}.")

agent = agent_class(env, agent_config)

observation, _ = env.reset()
for _ in range(agent.max_steps):
result = agent.select_action(observation=observation)
if result is None:
break
step, action = result
env.test_run.step = step
logging.info(f"Running step {step} (of {agent.max_steps}) with action {action}")
prev_observation = observation
observation, reward, done, *_ = env.step(action)
agent.update_policy(
{
"trial_index": step,
"value": reward,
"observation": observation,
"prev_observation": prev_observation,
"action": action,
"done": done,
}
)
logging.info(f"Step {step}: Observation: {[round(obs, 4) for obs in observation]}, Reward: {reward:.4f}")
agent = agent_class(env, agent_config)

err |= agent.run()
except Exception as exc:
run_error = exc
logging.exception("DSE job aborted by an unexpected error; generating reports before failing.")

if args.mode == "run":
runner.runner.test_scenario.test_runs = original_test_runs
generate_reports(runner.runner.system, runner.runner.test_scenario, runner.runner.scenario_root)
generate_reports(
runner.runner.system,
runner.runner.test_scenario,
runner.runner.scenario_root,
error=run_error,
)

if run_error is not None:
raise run_error

logging.info("All jobs are complete.")
return err


def generate_reports(system: System, test_scenario: TestScenario, result_dir: Path) -> None:
def _record_run_failure(result_dir: Path, error: BaseException) -> None:
"""Persist an aborting error into the results dir so the failure is documented with the reports."""
failure_path = result_dir / "dse_failure.txt"
tb = "".join(traceback.format_exception(type(error), error, error.__traceback__))
try:
result_dir.mkdir(parents=True, exist_ok=True)
failure_path.write_text(f"DSE job aborted by an unexpected {type(error).__name__}: {error}\n\n{tb}")
logging.info(f"Documented the aborting error in {failure_path}")
except OSError:
logging.exception(f"Failed to write failure report to {failure_path}")


def generate_reports(
system: System,
test_scenario: TestScenario,
result_dir: Path,
error: BaseException | None = None,
) -> None:
registry = Registry()

if error is not None:
_record_run_failure(result_dir, error)

for name, reporter_class in registry.ordered_scenario_reports():
logging.debug(f"Generating report '{name}' ({reporter_class.__name__})")

Expand Down
53 changes: 51 additions & 2 deletions src/cloudai/configurator/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Literal

Expand Down Expand Up @@ -87,7 +88,7 @@ def configure(self, config: dict[str, Any]) -> None:
pass

@abstractmethod
def select_action(self, observation: list[float] | None = None) -> tuple[int, dict[str, Any]]:
def select_action(self, observation: list[float] | None = None) -> tuple[int, dict[str, Any]] | None:
"""
Select an action from the action space.

Expand All @@ -98,7 +99,8 @@ def select_action(self, observation: list[float] | None = None) -> tuple[int, di
agents (RL, contextual bandits) should use it.

Returns:
Tuple[int, Dict[str, Any]]: The current step index and a dictionary mapping action keys to selected values.
Tuple[int, Dict[str, Any]] | None: The current step index and a dictionary mapping action keys
to selected values, or ``None`` to signal termination of the agent loop (``run()`` stops).
"""
pass

Expand All @@ -111,3 +113,50 @@ def update_policy(self, _feedback: Dict[str, Any]) -> None:
feedback (Dict[str, Any]): Feedback information from the environment.
"""
pass

def run(self) -> int:
"""
Orchestrate this agent's exploration over ``self.env``.

Default: a step loop driven by the dispatcher (``select_action`` →
``env.step`` → ``update_policy`` per trial). Agents that drive their
own training loop (e.g. RLlib-based agents calling ``algo.train()``)
override this method.

Failure contract (``handle_dse_job`` consumes the result via
``err |= agent.run()``):

- Return a non-zero code for *recoverable* failures (e.g. a workload run
that failed but should not abort the rest of the sweep). The code is
accumulated and the next ``TestRun`` still executes. Workload-level
failures are already surfaced this way: ``CloudAIGymEnv.step`` maps a
failed metric to ``rewards.metric_failure`` rather than raising, and
``rllib_run`` catches training errors and returns ``rc=1``.
- Raise for *unexpected* failures (framework/agent bugs). Exceptions
propagate out of ``handle_dse_job`` and hard-fail the job so the bug
is surfaced instead of masked as a penalizing reward.

Returns:
int: Process-style return code (``0`` success, non-zero recoverable failure).
"""
observation, _ = self.env.reset()
for _ in range(self.max_steps):
result = self.select_action(observation=observation)
if result is None:
break
step, action = result
logging.info(f"Running step {step} (of {self.max_steps}) with action {action}")
prev_observation = observation
observation, reward, done, *_ = self.env.step(action)
self.update_policy(
{
"trial_index": step,
"value": reward,
"observation": observation,
"prev_observation": prev_observation,
"action": action,
"done": done,
}
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
logging.info(f"Step {step}: Observation: {[round(obs, 4) for obs in observation]}, Reward: {reward:.4f}")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return 0
1 change: 1 addition & 0 deletions src/cloudai/configurator/cloudai_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
- done (bool): Whether the episode is done.
- info (dict): Additional info for debugging.
"""
self.test_run.increment_step()
self.test_run = self.test_run.apply_params_set(action)

cached_result = self.get_cached_trajectory_result(action)
Expand Down
141 changes: 137 additions & 4 deletions tests/test_cloudaigym.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,13 @@ def test_tr_output_path(setup_env: tuple[TestRun, BaseRunner]):
agent = GridSearchAgent(env, GridSearchAgent.get_config_class()())

_, action = agent.select_action()
env.test_run.step = 42
env.test_run.step = 41
env.step(action)

assert env.test_run.output_path.name == "42"
assert env.test_run.output_path.name == "42", (
"CloudAIGymEnv.step() must advance test_run.step before computing output_path; "
"starting at 41, step #42's artifacts must land in dir '42'."
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -417,7 +420,7 @@ def test_cached_step_appends_trajectory_row(nemorun: NeMoRunTestDefinition, tmp_
env.test_run.current_iteration = 0
env.trajectory = {0: [TrajectoryEntry(step=1, action=cached_action, reward=0.42, observation=[0.84])]}

env.test_run.step = 5
env.test_run.step = 4
obs, reward, done, _info = env.step(cached_action)

runner.run.assert_not_called()
Expand All @@ -426,7 +429,10 @@ def test_cached_step_appends_trajectory_row(nemorun: NeMoRunTestDefinition, tmp_
assert done is False
rows = env.trajectory[0]
assert len(rows) == 2
assert rows[-1].step == 5
assert rows[-1].step == 5, (
"CloudAIGymEnv.step() advances test_run.step before recording the trajectory row; "
"the cached row must be tagged with the advanced trial index, not the pre-step value."
)
assert rows[-1].reward == 0.42
assert rows[-1].action == cached_action

Expand All @@ -435,3 +441,130 @@ def test_cached_step_appends_trajectory_row(nemorun: NeMoRunTestDefinition, tmp_
contents = csv_path.read_text().strip().splitlines()
assert contents[0] == "step,action,reward,observation"
assert contents[-1].startswith("5,")


def _seed_cached_entry_with_env_params(
env: CloudAIGymEnv, action: dict[str, object], env_params: dict[str, object]
) -> None:
"""Seed env.trajectory with one entry, attaching env_params via object.__setattr__.

TrajectoryEntry is a frozen dataclass and does not yet declare env_params.
Once the field is added, drop this helper and pass env_params as a kwarg.
"""
entry = TrajectoryEntry(step=1, action=action, reward=0.5, observation=[100.0])
object.__setattr__(entry, "env_params", env_params)
env.test_run.current_iteration = 0
env.trajectory = {0: [entry]}


def test_cache_miss_when_env_params_differ(base_tr: TestRun, tmp_path: Path) -> None:
"""Cache MUST miss when env_params differ, even if action is identical.

Without this property the agent receives stale rewards on every cache hit
under domain randomization. PPO/DQN/BO all silently train on labels that
do not correspond to the env they were nominally generated under.
"""
runner = MagicMock()
runner.scenario_root = tmp_path / "scenario"
runner.system = MagicMock()
runner.test_scenario = MagicMock(test_runs=[])
runner.jobs = {}
runner.testrun_to_job_map = {}

env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides())
_seed_cached_entry_with_env_params(env, {"x": 10}, env_params={"drop_rate": 0.001})

env.test_run.current_env_params = {"drop_rate": 0.01} # type: ignore[attr-defined]

assert env.get_cached_trajectory_result({"x": 10}) is None, (
"Cache must include env_params in its key. The current implementation "
"keys on action alone, so trials repeating the same action under a "
"different env_params sample receive a stale cached reward. See "
"env-params-cloudai-corpus-plan.md."
)


def test_cache_hit_when_action_and_env_params_match(base_tr: TestRun, tmp_path: Path) -> None:
"""Same action AND same env_params must still HIT the cache."""
runner = MagicMock()
runner.scenario_root = tmp_path / "scenario"
runner.system = MagicMock()
runner.test_scenario = MagicMock(test_runs=[])
runner.jobs = {}
runner.testrun_to_job_map = {}

env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides())
_seed_cached_entry_with_env_params(env, {"x": 10}, env_params={"drop_rate": 0.001})

env.test_run.current_env_params = {"drop_rate": 0.001} # type: ignore[attr-defined]

result = env.get_cached_trajectory_result({"x": 10})
assert result is not None
assert result.step == 1


def test_cache_hit_when_neither_has_env_params(base_tr: TestRun, tmp_path: Path) -> None:
"""Workloads without env_params behave exactly as today (back-compat)."""
runner = MagicMock()
runner.scenario_root = tmp_path / "scenario"
runner.system = MagicMock()
runner.test_scenario = MagicMock(test_runs=[])
runner.jobs = {}
runner.testrun_to_job_map = {}

env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides())
env.test_run.current_iteration = 0
env.trajectory = {0: [TrajectoryEntry(step=1, action={"x": 10}, reward=0.5, observation=[100.0])]}
# Note: neither the cached entry nor test_run carries env_params -> existing behavior.

result = env.get_cached_trajectory_result({"x": 10})
assert result is not None
assert result.step == 1


def test_step_reruns_workload_when_env_params_change(nemorun: NeMoRunTestDefinition, tmp_path: Path) -> None:
"""Integration: env.step() with same action but different env_params re-runs the workload.

Counterpart to test_cache_miss_when_env_params_differ but exercising the
full step() flow: increment_step -> apply_params_set -> cache lookup ->
runner.run() -> write_trajectory.
"""
tdef = nemorun.model_copy(deep=True)
tdef.cmd_args.data.global_batch_size = 8
tdef.agent_metrics = ["default"]
test_run = TestRun(
name="dr_tr",
test=tdef,
num_nodes=1,
nodes=[],
output_path=tmp_path / "out" / "dr_tr" / "0",
reports={NeMoRunReportGenerationStrategy},
)
test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run])

runner = MagicMock(spec=BaseRunner)
runner.scenario_root = tmp_path / "scenario"
runner.system = MagicMock()
runner.test_scenario = test_scenario
runner.jobs = {}
runner.testrun_to_job_map = {}
runner.shutting_down = False
runner.get_job_output_path.return_value = test_run.output_path

env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides())
action = {"trainer.max_steps": 1000}
fake_obs = iter([[100.0], [50.0]])

with patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)):
env.test_run.step = 0
env.test_run.current_env_params = {"drop_rate": 0.001} # type: ignore[attr-defined]
obs1, _r1, *_ = env.step(action)

env.test_run.current_env_params = {"drop_rate": 0.01} # type: ignore[attr-defined]
obs2, _r2, *_ = env.step(action)

assert runner.run.call_count == 2, (
"Different env_params between two env.step() calls with the same action "
"must trigger a workload re-run; the cache lookup must miss."
)
assert obs1 != obs2, "fresh workload run should produce a fresh observation"
Loading
Loading