diff --git a/README.md b/README.md index 419e4f15..e6036774 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ The Splunk Attack Range builds instrumented cloud environments (AWS, Azure, GCP) docker compose --profile cli -f docker/docker-compose.yml run --rm attack_range build -t aws/splunk_minimal_aws ``` - Other actions: `destroy`, `simulate`, `share`. See [Detailed documentation](https://attack-range.readthedocs.io/en/latest/) for CLI usage and flags. + Other actions: `destroy`, `simulate`, `apply-role`, `share`. See [Detailed documentation](https://attack-range.readthedocs.io/en/latest/) for CLI usage and flags. --- @@ -60,7 +60,7 @@ The Splunk Attack Range builds instrumented cloud environments (AWS, Azure, GCP) | **Docker Compose** (recommended) | Run API + web app + optional CLI with one `docker compose`; no local Python/Ansible/Terraform. | | **Web app** | Build, destroy, simulate, and share via the UI at port 4321. | | **REST API** | Automate from scripts or CI; full OpenAPI docs at `/openapi/swagger`. | -| **CLI** | `attack_range.py build | destroy | simulate | share` for terminal-based workflows. | +| **CLI** | `attack_range.py build | destroy | simulate | apply-role | share` for terminal-based workflows. | --- diff --git a/api/README.md b/api/README.md index d4db42dd..8ae3c0a6 100644 --- a/api/README.md +++ b/api/README.md @@ -178,6 +178,46 @@ Get the status of a build or destroy operation by attack_range_id. - `running`: Attack range is fully deployed and running - `error`: Operation failed (includes `error`, `error_phase`, and `traceback` fields) +#### `POST /attack-range/apply-role` + +Stage and execute local Ansible roles against a target server in a running attack range. This is a **synchronous** operation. The attack range must be in `running` or `completed` status. + +Each role is provided as a **base64-encoded gzip tarball** of the role root directory (`tasks/`, `meta/`, etc.). Package a role with: + +```bash +tar czf my_role.tar.gz -C /path/to/parent my_role +ROLE_B64=$(base64 -i my_role.tar.gz | tr -d '\n') +``` + +**Request Body:** +```json +{ + "attack_range_id": "550e8400-e29b-41d4-a716-446655440000", + "target": "splunk", + "roles": [ + { + "content_base64": "", + "name": "optional.namespace.role_name", + "vars": { + "example_var": "value" + } + } + ] +} +``` + +**Response (200 OK):** +```json +{ + "status": "success", + "message": "Successfully applied 1 role(s) on splunk", + "attack_range_id": "550e8400-e29b-41d4-a716-446655440000", + "target": "splunk", + "roles_applied": ["custom.my_role"], + "execution_output": null +} +``` + ### Template Management #### `GET /templates` diff --git a/api/app.py b/api/app.py index b2a1f7b3..f7678145 100644 --- a/api/app.py +++ b/api/app.py @@ -48,6 +48,8 @@ ProviderCheckResponse, SimulateRequest, SimulateResponse, + ApplyRoleRequest, + ApplyRoleResponse, SplunkExportRequest, SplunkExportResponse, ShareRequest, @@ -1566,6 +1568,98 @@ def simulate_attack_range(body: SimulateRequest): ).model_dump()), 500 +@app.post( + "/attack-range/apply-role", + tags=[attack_range_tag], + responses={200: ApplyRoleResponse, 400: ErrorResponse, 404: ErrorResponse, 500: ErrorResponse}, + summary="Apply local Ansible roles", + description=( + "Stage and execute local Ansible roles against a target server in a running attack range. " + "Each role is provided as a base64-encoded gzip tarball of the role root. " + "This is a synchronous operation." + ), +) +def apply_role_attack_range(body: ApplyRoleRequest): + """Stage and execute local Ansible roles on a target server.""" + try: + config_path = os.path.join(CONFIG_DIR, f"{body.attack_range_id}.yml") + if not os.path.exists(config_path): + return jsonify(ErrorResponse( + message=f"Attack range with ID '{body.attack_range_id}' not found", + details=f"Config file not found: {config_path}", + ).model_dump()), 404 + + config = load_yaml_file(config_path) + if not config: + return jsonify(ErrorResponse( + message=f"Failed to load config for attack range '{body.attack_range_id}'", + ).model_dump()), 500 + + status = config.get("general", {}).get("status", "") + if status not in ["running", "completed"]: + return jsonify(ErrorResponse( + message=( + f"Cannot apply roles. Attack range status is '{status}'. " + "Must be 'running' or 'completed'." + ), + ).model_dump()), 400 + + attack_range_config = config.get("attack_range", []) + target_found = any(server.get("name") == body.target for server in attack_range_config) + if not target_found: + available_servers = [s.get("name") for s in attack_range_config if s.get("name")] + return jsonify(ErrorResponse( + message=f"Target server '{body.target}' not found in attack range configuration", + details=f"Available servers: {', '.join(available_servers) if available_servers else 'None'}", + ).model_dump()), 400 + + controller = AttackRangeController(config, config_path=config_path) + try: + roles_payload = [ + { + "content_base64": role.content_base64, + "name": role.name, + "vars": role.vars, + } + for role in body.roles + ] + result = controller.apply_role(body.target, roles_payload) + except ValueError as e: + return jsonify(ErrorResponse( + message="Apply role validation failed", + details=str(e), + ).model_dump()), 400 + except RuntimeError as e: + return jsonify(ErrorResponse( + message="Apply role execution failed", + details=str(e), + ).model_dump()), 500 + + roles_applied = result.get("roles_applied", []) + return jsonify(ApplyRoleResponse( + status="success", + message=( + f"Successfully applied {len(roles_applied)} role(s) on {body.target}" + ), + attack_range_id=body.attack_range_id, + target=body.target, + roles_applied=roles_applied, + execution_output=result.get("execution_output"), + ).model_dump()), 200 + + except ValidationError as e: + return jsonify(ErrorResponse( + message="Invalid apply-role request", + details=str(e), + ).model_dump()), 400 + except Exception as e: + error_traceback = traceback.format_exc() + return jsonify(ErrorResponse( + message="Failed to apply roles", + details=f"{str(e)}\n\n{error_traceback}", + ).model_dump()), 500 + + @app.post( "/attack-range/splunk/export", tags=[attack_range_tag], diff --git a/api/models.py b/api/models.py index d0493f5a..5201b473 100644 --- a/api/models.py +++ b/api/models.py @@ -311,6 +311,60 @@ class SimulateResponse(BaseModel): ) +class LocalRoleTarget(BaseModel): + """Local Ansible role packaged as a base64-encoded gzip tarball.""" + content_base64: str = Field( + ..., + description="Base64-encoded gzip tarball of the role root (tasks/, meta/, etc.)", + ) + name: Optional[str] = Field( + None, + description="Optional Ansible role name override (defaults to meta/main.yml or directory name)", + ) + vars: Dict[str, Any] = Field( + default_factory=dict, + description="Optional variables passed to the role", + ) + + +class ApplyRoleRequest(BaseModel): + """Request model for applying local Ansible roles after build.""" + attack_range_id: str = Field(..., description="Attack range ID") + target: str = Field(..., description="Target server name (must match a server name in attack_range config)") + roles: List[LocalRoleTarget] = Field( + ..., + min_length=1, + description="Local roles to stage and execute on the target host", + ) + + class Config: + json_schema_extra = { + "example": { + "attack_range_id": "550e8400-e29b-41d4-a716-446655440000", + "target": "splunk", + "roles": [ + { + "content_base64": "", + "vars": {"example_var": "value"}, + } + ], + } + } + + +class ApplyRoleResponse(BaseModel): + """Response model for apply-role operation.""" + status: str = Field(..., description="Operation status") + message: str = Field(..., description="Status message") + attack_range_id: str = Field(..., description="Attack range ID") + target: str = Field(..., description="Target server name") + roles_applied: List[str] = Field(..., description="Resolved Ansible role names that were applied") + execution_output: Optional[Dict[str, Any]] = Field( + None, + description="Optional Ansible execution details", + ) + + class SplunkExportRequest(BaseModel): """Request model for exporting raw events from the attack range Splunk server.""" attack_range_id: str = Field(..., description="Attack range ID") diff --git a/attack_range.py b/attack_range.py index 9ef76a74..9aefc256 100644 --- a/attack_range.py +++ b/attack_range.py @@ -7,9 +7,11 @@ import sys import argparse import glob +import json import yaml from attack_range.attack_range_controller import AttackRangeController +from attack_range.managers.ansible_manager import resolve_local_role_name from attack_range.utils import prepare_config_from_template, resolve_template_path os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES" @@ -232,6 +234,95 @@ def simulate_action(args): controller.simulate(args.target, techniques, atomics, atomic_files) +def _resolve_config_path(config_arg: str | None) -> str: + """Resolve config path from CLI arg or single config in config/.""" + config_dir = os.path.join(os.path.dirname(__file__), "config") + + if not config_arg: + yml_files = glob.glob(os.path.join(config_dir, "*.yml")) + if not yml_files: + print("Error: No config files found in config folder.") + print("Please specify a config file using --config") + sys.exit(1) + if len(yml_files) > 1: + print(f"Error: Multiple config files found ({len(yml_files)} files).") + print("Please specify which config file to use with --config") + print("\nAvailable config files:") + for yml_file in sorted(yml_files): + print(f" - {os.path.basename(yml_file)}") + sys.exit(1) + return os.path.abspath(yml_files[0]) + + config_path = config_arg + if os.path.dirname(config_path) in ("", "."): + config_path = os.path.join(config_dir, os.path.basename(config_path)) + if not os.path.exists(config_path): + print(f"Error: Config file not found: {config_path}") + sys.exit(1) + return os.path.abspath(config_path) + + +def _load_role_vars_file(vars_file_path: str, role_paths: list[str]) -> dict[str, dict]: + """Load per-role vars from YAML/JSON; shared vars apply to all roles when unkeyed.""" + with open(vars_file_path, "r", encoding="utf-8") as f: + if vars_file_path.endswith(".json"): + data = json.load(f) + else: + data = yaml.safe_load(f) + + if not data: + return {} + if not isinstance(data, dict): + print("Error: --vars-file must contain a YAML/JSON object.") + sys.exit(1) + + resolved_names = [resolve_local_role_name(os.path.abspath(path)) for path in role_paths] + if any(name in data for name in resolved_names): + return {name: data.get(name, {}) or {} for name in resolved_names} + return {name: data for name in resolved_names} + + +def apply_role_action(args): + """Execute apply-role action.""" + if not args.role: + print("Error: Specify at least one local role directory with -r/--role.") + sys.exit(1) + + for role_path in args.role: + if not os.path.isdir(role_path): + print(f"Error: Role directory not found: {role_path}") + sys.exit(1) + + config_path = _resolve_config_path(args.config) + vars_by_role = {} + if args.vars_file: + if not os.path.isfile(args.vars_file): + print(f"Error: Vars file not found: {args.vars_file}") + sys.exit(1) + vars_by_role = _load_role_vars_file(args.vars_file, args.role) + + roles = [] + for role_path in args.role: + abs_path = os.path.abspath(role_path) + role_name = resolve_local_role_name(abs_path) + role_entry = {"path": abs_path} + if vars_by_role: + role_entry["vars"] = vars_by_role.get(role_name, {}) + roles.append(role_entry) + + config = load_config(config_path) + controller = AttackRangeController(config, config_path=config_path) + try: + result = controller.apply_role(args.target, roles) + print(f"Successfully applied role(s) on {result['target']}: {', '.join(result['roles_applied'])}") + except ValueError as e: + print(f"Error: {e}") + sys.exit(1) + except RuntimeError as e: + print(f"Error: {e}") + sys.exit(1) + + def share_action(args): """Execute share action.""" config_dir = os.path.join(os.path.dirname(__file__), "config") @@ -354,6 +445,35 @@ def main(): ) simulate_parser.set_defaults(func=simulate_action) + # Apply-role action + apply_role_parser = subparsers.add_parser( + "apply-role", + help="Stage and execute local Ansible roles against a target server", + ) + apply_role_parser.add_argument( + "-c", + "--config", + help="Path to the config file (optional if only one config exists in config/ folder)", + ) + apply_role_parser.add_argument( + "-t", + "--target", + required=True, + help="Target server name (must match a server name in attack_range config)", + ) + apply_role_parser.add_argument( + "-r", + "--role", + action="append", + required=True, + help="Path to a local Ansible role directory (repeat for multiple roles)", + ) + apply_role_parser.add_argument( + "--vars-file", + help="YAML or JSON file with role variables (keyed by role name, or shared across all roles)", + ) + apply_role_parser.set_defaults(func=apply_role_action) + # Share action share_parser = subparsers.add_parser( "share", diff --git a/attack_range/attack_range_controller.py b/attack_range/attack_range_controller.py index a34d93db..489e117c 100644 --- a/attack_range/attack_range_controller.py +++ b/attack_range/attack_range_controller.py @@ -14,7 +14,7 @@ # Import managers from .managers.config_manager import ConfigManager from .managers.terraform_manager import TerraformManager -from .managers.ansible_manager import AnsibleManager +from .managers.ansible_manager import AnsibleManager, APPLY_LOCAL_ROLES_PLAYBOOK from .managers.ssh_manager import SSHManager from .managers.backend_manager import BackendManager @@ -534,6 +534,115 @@ def simulate( # Return execution output for API response return execution_output + def apply_role(self, target: str, roles: list | None = None) -> dict: + """ + Stage and execute local Ansible roles against a target server. + + :param target: Target server name (must match a server name in attack_range config) + :param roles: List of dicts with ``path`` (CLI) or ``content_base64`` (API), + optional ``name`` override, and optional ``vars`` + :raises ValueError: If validation fails + :raises RuntimeError: If role staging or playbook execution fails + """ + roles = roles or [] + if not roles: + error_msg = "No roles specified. Provide at least one local role." + self.logger.error(error_msg) + raise ValueError(error_msg) + + self.logger.info( + f"[action] > apply_role on target: {target} with {len(roles)} role(s)\n" + ) + + status = self.config.get("general", {}).get("status", "") + if status not in ["running", "completed"]: + error_msg = ( + f"Cannot apply roles. Attack range status is: {status}. " + "Must be 'running' or 'completed'." + ) + self.logger.error(error_msg) + raise ValueError(error_msg) + + attack_range_config = self.config.get("attack_range", []) + target_server = None + for server in attack_range_config: + if server.get("name") == target: + target_server = server + break + + if not target_server: + available_servers = [s.get("name") for s in attack_range_config if s.get("name")] + error_msg = f"Target server '{target}' not found in attack_range configuration." + self.logger.error(error_msg) + raise ValueError( + f"{error_msg} Available servers: " + f"{', '.join(available_servers) if available_servers else 'None'}" + ) + + self.ansible_manager.update_inventory_attack_range_servers() + self.ansible_manager.update_inventory_password() + + import yaml as yaml_lib + + with open(self.ansible_manager.inventory_path, "r", encoding="utf-8") as f: + inventory = yaml_lib.safe_load(f) + + if target not in inventory or "hosts" not in inventory.get(target, {}): + available_groups = [ + k for k, v in inventory.items() if isinstance(v, dict) and "hosts" in v + ] + error_msg = ( + f"Inventory group '{target}' not found. " + f"Available groups: {', '.join(available_groups) if available_groups else 'None'}" + ) + self.logger.error(error_msg) + raise ValueError(error_msg) + + role_specs = [] + roles_applied = [] + for role in roles: + role_path = role.get("path") + content_base64 = role.get("content_base64") + name_override = role.get("name") + role_vars = role.get("vars") or {} + + if role_path and content_base64: + raise ValueError("Each role must specify either path or content_base64, not both.") + if not role_path and not content_base64: + raise ValueError("Each role must specify either path or content_base64.") + + if role_path: + if not os.path.isdir(role_path): + raise ValueError(f"Role path is not a directory: {role_path}") + resolved_name = self.ansible_manager.stage_local_role( + os.path.abspath(role_path), name_override + ) + else: + resolved_name = self.ansible_manager.stage_local_role_from_tarball( + content_base64, name_override + ) + + roles_applied.append(resolved_name) + role_specs.append({"name": resolved_name, "vars": role_vars}) + + become = self.ansible_manager.get_server_become(target) + self.ansible_manager.update_apply_roles_playbook(target, role_specs, become) + + self.logger.info( + f"Running local roles on {target}: {', '.join(roles_applied)}" + ) + execution_output = self.ansible_manager.run_ansible_playbook_safe( + APPLY_LOCAL_ROLES_PLAYBOOK + ) + + self.logger.info(f"\nRole apply completed successfully on {target}") + + return { + "target": target, + "roles_applied": roles_applied, + "execution_output": execution_output, + } + def share(self, name: str) -> str: """ Share the attack range by generating a new WireGuard client config for the given name. diff --git a/attack_range/managers/ansible_manager.py b/attack_range/managers/ansible_manager.py index 1ad8c08d..bc5d81ff 100644 --- a/attack_range/managers/ansible_manager.py +++ b/attack_range/managers/ansible_manager.py @@ -5,10 +5,14 @@ playbook updates, and playbook execution. """ +import base64 +import io import json import os import sys import re +import shutil +import tarfile import tempfile import yaml import time @@ -17,12 +21,38 @@ import shlex import logging import ansible_runner -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, List # Galaxy role that must always be updated to latest before VPN playbooks (vpn.yaml, vpn_config.yaml) WIREGUARD_GALAXY_ROLE = "p4t12ick.ar_wireguard_vpn" WG_CI_CLIENT_CONFIG = "client1.conf" WG_CI_ROUTER_IP = "10.0.1.10" +LOCAL_ROLE_MAX_TAR_BYTES = 50 * 1024 * 1024 +APPLY_LOCAL_ROLES_PLAYBOOK = "apply_local_roles.yaml" + + +def resolve_local_role_name(role_path: str, override: Optional[str] = None) -> str: + """Resolve Galaxy-style role name from meta/main.yml or directory basename.""" + if override and str(override).strip(): + return str(override).strip() + + meta_candidates = ( + os.path.join(role_path, "meta", "main.yml"), + os.path.join(role_path, "meta", "main.yaml"), + ) + meta_file = next((path for path in meta_candidates if os.path.isfile(path)), None) + if meta_file: + with open(meta_file, "r", encoding="utf-8") as f: + meta = yaml.safe_load(f) or {} + galaxy_info = meta.get("galaxy_info") or {} + role_name = galaxy_info.get("role_name") + namespace = galaxy_info.get("namespace") or galaxy_info.get("author") + if role_name and namespace: + return f"{namespace}.{role_name}" + if role_name: + return str(role_name) + + return os.path.basename(os.path.abspath(role_path)) _ART_SUMMARY_TASK_MARKERS = ( "Atomic Red Team execution summary", @@ -1328,6 +1358,166 @@ def update_ansible_galaxy_roles(self) -> None: self.logger.info(f"All {len(roles_to_install)} ansible galaxy roles installed successfully") + def _validate_role_directory(self, path: str) -> None: + """Require a directory with tasks/main.yml or tasks/main.yaml.""" + if not os.path.isdir(path): + raise ValueError(f"Role path is not a directory: {path}") + tasks_candidates = ( + os.path.join(path, "tasks", "main.yml"), + os.path.join(path, "tasks", "main.yaml"), + ) + if not any(os.path.isfile(candidate) for candidate in tasks_candidates): + raise ValueError(f"Invalid Ansible role: missing tasks/main.yml at {path}") + + def _resolve_role_name(self, role_path: str, override: Optional[str] = None) -> str: + """Resolve Galaxy-style role name from meta/main.yml or directory basename.""" + return resolve_local_role_name(role_path, override) + + def _local_roles_dir(self) -> str: + roles_dir = os.path.join(self.ansible_dir, "roles") + os.makedirs(roles_dir, exist_ok=True) + return roles_dir + + def _stage_role_copy(self, role_path: str, role_name: str) -> str: + """Copy a validated role tree into terraform/ansible/roles/.""" + self._validate_role_directory(role_path) + roles_dir = self._local_roles_dir() + dest = os.path.join(roles_dir, role_name) + if os.path.exists(dest): + self.logger.warning(f"Overwriting existing staged role at {dest}") + shutil.rmtree(dest) + shutil.copytree(role_path, dest) + self.logger.info(f"Staged local role '{role_name}' at {dest}") + return role_name + + def _is_safe_tar_member(self, member: tarfile.TarInfo, dest_dir: str) -> bool: + if member.name.startswith("/") or member.name.startswith("\\"): + return False + target = os.path.realpath(os.path.join(dest_dir, member.name)) + dest_real = os.path.realpath(dest_dir) + return target == dest_real or target.startswith(dest_real + os.sep) + + def _find_extracted_role_root(self, extract_dir: str) -> str: + try: + self._validate_role_directory(extract_dir) + return extract_dir + except ValueError: + pass + + entries = [name for name in os.listdir(extract_dir) if not name.startswith(".")] + if len(entries) == 1: + candidate = os.path.join(extract_dir, entries[0]) + if os.path.isdir(candidate): + self._validate_role_directory(candidate) + return candidate + + for name in entries: + candidate = os.path.join(extract_dir, name) + if os.path.isdir(candidate): + try: + self._validate_role_directory(candidate) + return candidate + except ValueError: + continue + + raise ValueError("Could not find a valid Ansible role in tarball (expected tasks/main.yml)") + + def _extract_role_tarball(self, tarball_bytes: bytes, dest_dir: str) -> str: + os.makedirs(dest_dir, exist_ok=True) + with tarfile.open(fileobj=io.BytesIO(tarball_bytes), mode="r:*") as tar: + for member in tar.getmembers(): + if not self._is_safe_tar_member(member, dest_dir): + raise ValueError(f"Unsafe path in role tarball: {member.name}") + extract_kwargs = {} + if "filter" in tar.extractall.__code__.co_varnames: + extract_kwargs["filter"] = "data" + tar.extractall(dest_dir, **extract_kwargs) + return self._find_extracted_role_root(dest_dir) + + def stage_local_role(self, role_path: str, name: Optional[str] = None) -> str: + """Validate and stage a local role directory on the Ansible controller.""" + role_path = os.path.abspath(role_path) + role_name = self._resolve_role_name(role_path, name) + return self._stage_role_copy(role_path, role_name) + + def stage_local_role_from_tarball(self, content_base64: str, name: Optional[str] = None) -> str: + """Decode a base64 gzip tarball, extract safely, and stage the role.""" + try: + tarball_bytes = base64.b64decode(content_base64, validate=True) + except Exception as exc: + raise ValueError(f"Invalid base64 role tarball: {exc}") from exc + + if len(tarball_bytes) > LOCAL_ROLE_MAX_TAR_BYTES: + raise ValueError( + f"Role tarball exceeds maximum size of {LOCAL_ROLE_MAX_TAR_BYTES} bytes" + ) + + with tempfile.TemporaryDirectory() as extract_dir: + role_root = self._extract_role_tarball(tarball_bytes, extract_dir) + role_name = self._resolve_role_name(role_root, name) + return self._stage_role_copy(role_root, role_name) + + def get_server_become(self, target: str) -> Optional[bool]: + """ + Return playbook-level become for a target host group. + + None means omit become (default for Windows). True/false set become explicitly. + """ + attack_range_config = self.config.get("attack_range", []) + for entry in attack_range_config: + entry_name = entry.get("name") + roles = entry.get("roles", []) + is_match = entry_name == target + if not is_match: + for role in roles: + if isinstance(role, dict) and role.get("inventory_name") == target: + is_match = True + break + if not is_match: + continue + + is_windows = entry.get("windows", False) + entry_become = entry.get("become") + if is_windows: + return entry_become + if entry_become is not None: + return entry_become + return True + + return True + + def update_apply_roles_playbook( + self, + target_host: str, + role_specs: List[Dict[str, Any]], + become: Optional[bool], + ) -> None: + """Write apply_local_roles.yaml for staged local roles on a single target.""" + playbook_path = os.path.join(self.ansible_dir, APPLY_LOCAL_ROLES_PLAYBOOK) + play: Dict[str, Any] = { + "hosts": target_host, + "roles": [], + } + if become is True: + play["become"] = True + elif become is False: + play["become"] = False + + for spec in role_specs: + role_entry: Dict[str, Any] = {"role": spec["name"]} + role_vars = spec.get("vars") or {} + if role_vars: + role_entry["vars"] = role_vars + play["roles"].append(role_entry) + + with open(playbook_path, "w", encoding="utf-8") as f: + yaml.dump([play], f, default_flow_style=False, sort_keys=False, allow_unicode=True) + + self.logger.info( + f"{APPLY_LOCAL_ROLES_PLAYBOOK} generated for target '{target_host}' " + f"with {len(role_specs)} role(s)" + ) + def _ci_wireguard_config_path(self) -> str: return os.path.join(self.ansible_dir, "client_configs", WG_CI_CLIENT_CONFIG) diff --git a/docs/source/getting-started.md b/docs/source/getting-started.md index 36eb7e0a..5d60c8ad 100644 --- a/docs/source/getting-started.md +++ b/docs/source/getting-started.md @@ -43,7 +43,7 @@ Docker Compose runs the API, web app, and (optionally) the CLI without installin docker compose --profile cli -f docker/docker-compose.yml run --rm attack_range build -t aws/splunk_minimal_aws ``` - The CLI will prompt you to connect to the VPN, then continue the lab build. Other commands: `destroy`, `simulate`, `share` (see [Configuration](configuration.md) and CLI sections below). + The CLI will prompt you to connect to the VPN, then continue the lab build. Other commands: `destroy`, `simulate`, `apply-role`, `share` (see [Configuration](configuration.md) and CLI sections below). ## Using the Web App @@ -65,6 +65,7 @@ The REST API runs on port **4000** and provides: - **List:** `GET /attack-range/list` - **Destroy:** `POST /attack-range/destroy` with `attack_range_id` - **Simulate:** `POST /attack-range/simulate` with `attack_range_id`, `target`, and `techniques` and/or `atomics` (each atomic: `technique` + `guid` / `auto_generated_guid`) +- **Apply role:** `POST /attack-range/apply-role` with `attack_range_id`, `target`, and `roles[]` (each role: base64-encoded gzip tarball in `content_base64`, optional `name` and `vars`) - **Splunk export:** `POST /attack-range/splunk/export` with `attack_range_id`, `search`, `earliest_time`, `latest_time`, optional `max_results` - **Share:** `POST /attack-range/share` with `attack_range_id`, `name` - **Templates:** `GET /templates`, `GET /templates//` @@ -100,6 +101,10 @@ You can run the CLI: - `python attack_range.py simulate -t -te T1003.001,T1059.003 [-c config]` - `python attack_range.py simulate -t -a T1003.001:0be2230c-9ab3-4ac2-8826-3199b9a0ebf8 [-c config]` +- **apply-role** — Stage and execute local Ansible role directories against a target server after the range is running. + - `python attack_range.py apply-role -t -r /path/to/role [-r /path/to/role2] [--vars-file vars.yml] [-c config]` + - `--vars-file` may key variables by role name or provide shared vars for all roles in the request. + - **share** — Generate a new WireGuard client config for sharing. - `python attack_range.py share -n alice [-c config]` diff --git a/tests/test_apply_role_models.py b/tests/test_apply_role_models.py new file mode 100644 index 00000000..8e1f46c7 --- /dev/null +++ b/tests/test_apply_role_models.py @@ -0,0 +1,48 @@ +"""Tests for apply-role API request/response models.""" + +import pytest +from pydantic import ValidationError + +from api.models import ApplyRoleRequest, ApplyRoleResponse, LocalRoleTarget + + +def test_apply_role_request_accepts_single_role(): + req = ApplyRoleRequest( + attack_range_id="id", + target="splunk", + roles=[LocalRoleTarget(content_base64="dGVzdA==")], + ) + assert len(req.roles) == 1 + assert req.roles[0].content_base64 == "dGVzdA==" + + +def test_apply_role_request_accepts_role_vars(): + req = ApplyRoleRequest( + attack_range_id="id", + target="splunk", + roles=[ + LocalRoleTarget( + content_base64="dGVzdA==", + name="custom.my_role", + vars={"foo": "bar"}, + ) + ], + ) + assert req.roles[0].name == "custom.my_role" + assert req.roles[0].vars == {"foo": "bar"} + + +def test_apply_role_request_requires_at_least_one_role(): + with pytest.raises(ValidationError): + ApplyRoleRequest(attack_range_id="id", target="splunk", roles=[]) + + +def test_apply_role_response_shape(): + resp = ApplyRoleResponse( + status="success", + message="ok", + attack_range_id="id", + target="splunk", + roles_applied=["custom.my_role"], + ) + assert resp.roles_applied == ["custom.my_role"] diff --git a/tests/test_local_role_staging.py b/tests/test_local_role_staging.py new file mode 100644 index 00000000..676a9f26 --- /dev/null +++ b/tests/test_local_role_staging.py @@ -0,0 +1,126 @@ +"""Tests for local Ansible role staging helpers.""" + +import base64 +import io +import os +import tarfile +import tempfile + +import pytest + +from attack_range.managers.ansible_manager import ( + AnsibleManager, + LOCAL_ROLE_MAX_TAR_BYTES, + resolve_local_role_name, +) + + +def _make_minimal_role(tmp_path, role_dir_name="my_role", namespace="custom", role_name="my_role"): + role_path = tmp_path / role_dir_name + role_path.mkdir() + (role_path / "tasks").mkdir() + (role_path / "tasks" / "main.yml").write_text("- debug:\n msg: hello\n") + (role_path / "meta").mkdir() + (role_path / "meta" / "main.yml").write_text( + f"galaxy_info:\n role_name: {role_name}\n namespace: {namespace}\n" + ) + return role_path + + +def _tar_role(role_path: str) -> bytes: + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + tar.add(role_path, arcname=os.path.basename(role_path)) + return buf.getvalue() + + +@pytest.fixture +def ansible_manager(tmp_path): + ansible_dir = tmp_path / "ansible" + ansible_dir.mkdir() + inventory_path = ansible_dir / "inventory.yaml" + inventory_path.write_text("all:\n hosts: {}\n") + logger = __import__("logging").getLogger("test") + return AnsibleManager( + str(ansible_dir), + str(inventory_path), + {"attack_range": [{"name": "splunk", "linux": True}]}, + "aws", + logger, + ) + + +def test_resolve_local_role_name_from_meta(tmp_path): + role_path = _make_minimal_role(tmp_path) + assert resolve_local_role_name(str(role_path)) == "custom.my_role" + + +def test_resolve_local_role_name_override(tmp_path): + role_path = _make_minimal_role(tmp_path) + assert resolve_local_role_name(str(role_path), "override.role") == "override.role" + + +def test_stage_local_role_copies_to_roles_dir(ansible_manager, tmp_path): + role_path = _make_minimal_role(tmp_path) + resolved = ansible_manager.stage_local_role(str(role_path)) + assert resolved == "custom.my_role" + staged = os.path.join(ansible_manager.ansible_dir, "roles", "custom.my_role", "tasks", "main.yml") + assert os.path.isfile(staged) + + +def test_stage_local_role_rejects_invalid_directory(ansible_manager, tmp_path): + invalid = tmp_path / "not_a_role" + invalid.mkdir() + with pytest.raises(ValueError, match="tasks/main.yml"): + ansible_manager.stage_local_role(str(invalid)) + + +def test_stage_local_role_from_tarball(ansible_manager, tmp_path): + role_path = _make_minimal_role(tmp_path) + tarball = _tar_role(str(role_path)) + encoded = base64.b64encode(tarball).decode("ascii") + resolved = ansible_manager.stage_local_role_from_tarball(encoded) + assert resolved == "custom.my_role" + + +def test_stage_local_role_from_tarball_rejects_traversal(ansible_manager): + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + info = tarfile.TarInfo(name="../escape/tasks/main.yml") + info.size = 0 + tar.addfile(info) + encoded = base64.b64encode(buf.getvalue()).decode("ascii") + with pytest.raises(ValueError, match="Unsafe path"): + ansible_manager.stage_local_role_from_tarball(encoded) + + +def test_stage_local_role_from_tarball_rejects_oversized(ansible_manager): + oversized = b"x" * (LOCAL_ROLE_MAX_TAR_BYTES + 1) + encoded = base64.b64encode(oversized).decode("ascii") + with pytest.raises(ValueError, match="maximum size"): + ansible_manager.stage_local_role_from_tarball(encoded) + + +def test_update_apply_roles_playbook_linux_become(ansible_manager): + ansible_manager.update_apply_roles_playbook( + "splunk", + [{"name": "custom.my_role", "vars": {"foo": "bar"}}], + True, + ) + playbook_path = os.path.join(ansible_manager.ansible_dir, "apply_local_roles.yaml") + content = open(playbook_path, encoding="utf-8").read() + assert "hosts: splunk" in content + assert "become: true" in content + assert "custom.my_role" in content + assert "foo: bar" in content + + +def test_get_server_become_windows_default_none(ansible_manager): + ansible_manager.config = { + "attack_range": [{"name": "win", "windows": True}], + } + assert ansible_manager.get_server_become("win") is None + + +def test_get_server_become_linux_default_true(ansible_manager): + assert ansible_manager.get_server_become("splunk") is True