Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ The Splunk Attack Range builds instrumented cloud environments (AWS, Azure, GCP)
docker compose --profile cli -f docker/docker-compose.yml run --rm attack_range build -t aws/splunk_minimal_aws
```

Other actions: `destroy`, `simulate`, `share`. See [Detailed documentation](https://attack-range.readthedocs.io/en/latest/) for CLI usage and flags.
Other actions: `destroy`, `simulate`, `apply-role`, `share`. See [Detailed documentation](https://attack-range.readthedocs.io/en/latest/) for CLI usage and flags.

---

Expand All @@ -60,7 +60,7 @@ The Splunk Attack Range builds instrumented cloud environments (AWS, Azure, GCP)
| **Docker Compose** (recommended) | Run API + web app + optional CLI with one `docker compose`; no local Python/Ansible/Terraform. |
| **Web app** | Build, destroy, simulate, and share via the UI at port 4321. |
| **REST API** | Automate from scripts or CI; full OpenAPI docs at `/openapi/swagger`. |
| **CLI** | `attack_range.py build | destroy | simulate | share` for terminal-based workflows. |
| **CLI** | `attack_range.py build | destroy | simulate | apply-role | share` for terminal-based workflows. |

---

Expand Down
40 changes: 40 additions & 0 deletions api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,46 @@ Get the status of a build or destroy operation by attack_range_id.
- `running`: Attack range is fully deployed and running
- `error`: Operation failed (includes `error`, `error_phase`, and `traceback` fields)

#### `POST /attack-range/apply-role`

Stage and execute local Ansible roles against a target server in a running attack range. This is a **synchronous** operation. The attack range must be in `running` or `completed` status.

Each role is provided as a **base64-encoded gzip tarball** of the role root directory (`tasks/`, `meta/`, etc.). Package a role with:

```bash
tar czf my_role.tar.gz -C /path/to/parent my_role
ROLE_B64=$(base64 -i my_role.tar.gz | tr -d '\n')
```

**Request Body:**
```json
{
"attack_range_id": "550e8400-e29b-41d4-a716-446655440000",
"target": "splunk",
"roles": [
{
"content_base64": "<base64-encoded-role-tarball>",
"name": "optional.namespace.role_name",
"vars": {
"example_var": "value"
}
}
]
}
```

**Response (200 OK):**
```json
{
"status": "success",
"message": "Successfully applied 1 role(s) on splunk",
"attack_range_id": "550e8400-e29b-41d4-a716-446655440000",
"target": "splunk",
"roles_applied": ["custom.my_role"],
"execution_output": null
}
```

### Template Management

#### `GET /templates`
Expand Down
94 changes: 94 additions & 0 deletions api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
ProviderCheckResponse,
SimulateRequest,
SimulateResponse,
ApplyRoleRequest,
ApplyRoleResponse,
SplunkExportRequest,
SplunkExportResponse,
ShareRequest,
Expand Down Expand Up @@ -1566,6 +1568,98 @@ def simulate_attack_range(body: SimulateRequest):
).model_dump()), 500


@app.post(
"/attack-range/apply-role",
tags=[attack_range_tag],
responses={200: ApplyRoleResponse, 400: ErrorResponse, 404: ErrorResponse, 500: ErrorResponse},
summary="Apply local Ansible roles",
description=(
"Stage and execute local Ansible roles against a target server in a running attack range. "
"Each role is provided as a base64-encoded gzip tarball of the role root. "
"This is a synchronous operation."
),
)
def apply_role_attack_range(body: ApplyRoleRequest):
"""Stage and execute local Ansible roles on a target server."""
try:
config_path = os.path.join(CONFIG_DIR, f"{body.attack_range_id}.yml")
if not os.path.exists(config_path):
return jsonify(ErrorResponse(
message=f"Attack range with ID '{body.attack_range_id}' not found",
details=f"Config file not found: {config_path}",
).model_dump()), 404

config = load_yaml_file(config_path)
if not config:
return jsonify(ErrorResponse(
message=f"Failed to load config for attack range '{body.attack_range_id}'",
).model_dump()), 500

status = config.get("general", {}).get("status", "")
if status not in ["running", "completed"]:
return jsonify(ErrorResponse(
message=(
f"Cannot apply roles. Attack range status is '{status}'. "
"Must be 'running' or 'completed'."
),
).model_dump()), 400

attack_range_config = config.get("attack_range", [])
target_found = any(server.get("name") == body.target for server in attack_range_config)
if not target_found:
available_servers = [s.get("name") for s in attack_range_config if s.get("name")]
return jsonify(ErrorResponse(
message=f"Target server '{body.target}' not found in attack range configuration",
details=f"Available servers: {', '.join(available_servers) if available_servers else 'None'}",
).model_dump()), 400

controller = AttackRangeController(config, config_path=config_path)
try:
roles_payload = [
{
"content_base64": role.content_base64,
"name": role.name,
"vars": role.vars,
}
for role in body.roles
]
result = controller.apply_role(body.target, roles_payload)
except ValueError as e:
return jsonify(ErrorResponse(
message="Apply role validation failed",
details=str(e),
).model_dump()), 400
except RuntimeError as e:
return jsonify(ErrorResponse(
message="Apply role execution failed",
details=str(e),
).model_dump()), 500

roles_applied = result.get("roles_applied", [])
return jsonify(ApplyRoleResponse(
status="success",
message=(
f"Successfully applied {len(roles_applied)} role(s) on {body.target}"
),
attack_range_id=body.attack_range_id,
target=body.target,
roles_applied=roles_applied,
execution_output=result.get("execution_output"),
).model_dump()), 200

except ValidationError as e:
return jsonify(ErrorResponse(
message="Invalid apply-role request",
details=str(e),
).model_dump()), 400
except Exception as e:
error_traceback = traceback.format_exc()
return jsonify(ErrorResponse(
message="Failed to apply roles",
details=f"{str(e)}\n\n{error_traceback}",
).model_dump()), 500


@app.post(
"/attack-range/splunk/export",
tags=[attack_range_tag],
Expand Down
54 changes: 54 additions & 0 deletions api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,60 @@ class SimulateResponse(BaseModel):
)


class LocalRoleTarget(BaseModel):
"""Local Ansible role packaged as a base64-encoded gzip tarball."""
content_base64: str = Field(
...,
description="Base64-encoded gzip tarball of the role root (tasks/, meta/, etc.)",
)
name: Optional[str] = Field(
None,
description="Optional Ansible role name override (defaults to meta/main.yml or directory name)",
)
vars: Dict[str, Any] = Field(
default_factory=dict,
description="Optional variables passed to the role",
)


class ApplyRoleRequest(BaseModel):
"""Request model for applying local Ansible roles after build."""
attack_range_id: str = Field(..., description="Attack range ID")
target: str = Field(..., description="Target server name (must match a server name in attack_range config)")
roles: List[LocalRoleTarget] = Field(
...,
min_length=1,
description="Local roles to stage and execute on the target host",
)

class Config:
json_schema_extra = {
"example": {
"attack_range_id": "550e8400-e29b-41d4-a716-446655440000",
"target": "splunk",
"roles": [
{
"content_base64": "<base64-encoded-role-tarball>",
"vars": {"example_var": "value"},
}
],
}
}


class ApplyRoleResponse(BaseModel):
"""Response model for apply-role operation."""
status: str = Field(..., description="Operation status")
message: str = Field(..., description="Status message")
attack_range_id: str = Field(..., description="Attack range ID")
target: str = Field(..., description="Target server name")
roles_applied: List[str] = Field(..., description="Resolved Ansible role names that were applied")
execution_output: Optional[Dict[str, Any]] = Field(
None,
description="Optional Ansible execution details",
)


class SplunkExportRequest(BaseModel):
"""Request model for exporting raw events from the attack range Splunk server."""
attack_range_id: str = Field(..., description="Attack range ID")
Expand Down
120 changes: 120 additions & 0 deletions attack_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import sys
import argparse
import glob
import json
import yaml

from attack_range.attack_range_controller import AttackRangeController
from attack_range.managers.ansible_manager import resolve_local_role_name
from attack_range.utils import prepare_config_from_template, resolve_template_path

os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
Expand Down Expand Up @@ -232,6 +234,95 @@ def simulate_action(args):
controller.simulate(args.target, techniques, atomics, atomic_files)


def _resolve_config_path(config_arg: str | None) -> str:
"""Resolve config path from CLI arg or single config in config/."""
config_dir = os.path.join(os.path.dirname(__file__), "config")

if not config_arg:
yml_files = glob.glob(os.path.join(config_dir, "*.yml"))
if not yml_files:
print("Error: No config files found in config folder.")
print("Please specify a config file using --config")
sys.exit(1)
if len(yml_files) > 1:
print(f"Error: Multiple config files found ({len(yml_files)} files).")
print("Please specify which config file to use with --config")
print("\nAvailable config files:")
for yml_file in sorted(yml_files):
print(f" - {os.path.basename(yml_file)}")
sys.exit(1)
return os.path.abspath(yml_files[0])

config_path = config_arg
if os.path.dirname(config_path) in ("", "."):
config_path = os.path.join(config_dir, os.path.basename(config_path))
if not os.path.exists(config_path):
print(f"Error: Config file not found: {config_path}")
sys.exit(1)
return os.path.abspath(config_path)


def _load_role_vars_file(vars_file_path: str, role_paths: list[str]) -> dict[str, dict]:
"""Load per-role vars from YAML/JSON; shared vars apply to all roles when unkeyed."""
with open(vars_file_path, "r", encoding="utf-8") as f:
if vars_file_path.endswith(".json"):
data = json.load(f)
else:
data = yaml.safe_load(f)

if not data:
return {}
if not isinstance(data, dict):
print("Error: --vars-file must contain a YAML/JSON object.")
sys.exit(1)

resolved_names = [resolve_local_role_name(os.path.abspath(path)) for path in role_paths]
if any(name in data for name in resolved_names):
return {name: data.get(name, {}) or {} for name in resolved_names}
return {name: data for name in resolved_names}


def apply_role_action(args):
"""Execute apply-role action."""
if not args.role:
print("Error: Specify at least one local role directory with -r/--role.")
sys.exit(1)

for role_path in args.role:
if not os.path.isdir(role_path):
print(f"Error: Role directory not found: {role_path}")
sys.exit(1)

config_path = _resolve_config_path(args.config)
vars_by_role = {}
if args.vars_file:
if not os.path.isfile(args.vars_file):
print(f"Error: Vars file not found: {args.vars_file}")
sys.exit(1)
vars_by_role = _load_role_vars_file(args.vars_file, args.role)

roles = []
for role_path in args.role:
abs_path = os.path.abspath(role_path)
role_name = resolve_local_role_name(abs_path)
role_entry = {"path": abs_path}
if vars_by_role:
role_entry["vars"] = vars_by_role.get(role_name, {})
roles.append(role_entry)

config = load_config(config_path)
controller = AttackRangeController(config, config_path=config_path)
try:
result = controller.apply_role(args.target, roles)
print(f"Successfully applied role(s) on {result['target']}: {', '.join(result['roles_applied'])}")
except ValueError as e:
print(f"Error: {e}")
sys.exit(1)
except RuntimeError as e:
print(f"Error: {e}")
sys.exit(1)


def share_action(args):
"""Execute share action."""
config_dir = os.path.join(os.path.dirname(__file__), "config")
Expand Down Expand Up @@ -354,6 +445,35 @@ def main():
)
simulate_parser.set_defaults(func=simulate_action)

# Apply-role action
apply_role_parser = subparsers.add_parser(
"apply-role",
help="Stage and execute local Ansible roles against a target server",
)
apply_role_parser.add_argument(
"-c",
"--config",
help="Path to the config file (optional if only one config exists in config/ folder)",
)
apply_role_parser.add_argument(
"-t",
"--target",
required=True,
help="Target server name (must match a server name in attack_range config)",
)
apply_role_parser.add_argument(
"-r",
"--role",
action="append",
required=True,
help="Path to a local Ansible role directory (repeat for multiple roles)",
)
apply_role_parser.add_argument(
"--vars-file",
help="YAML or JSON file with role variables (keyed by role name, or shared across all roles)",
)
apply_role_parser.set_defaults(func=apply_role_action)

# Share action
share_parser = subparsers.add_parser(
"share",
Expand Down
Loading
Loading