diff --git a/.changes/unreleased/fixed-20260607-134043.yaml b/.changes/unreleased/fixed-20260607-134043.yaml new file mode 100644 index 000000000..e3d83b44f --- /dev/null +++ b/.changes/unreleased/fixed-20260607-134043.yaml @@ -0,0 +1,6 @@ +kind: fixed +body: Restrict file and directory permissions on auth, config, context, and log paths to prevent local credential exposure on multi-user systems +time: 2026-06-07T13:40:43+02:00 +custom: + Author: iemejia + AuthorLink: https://github.com/iemejia diff --git a/src/fabric_cli/core/fab_auth.py b/src/fabric_cli/core/fab_auth.py index a12b890c4..100858613 100644 --- a/src/fabric_cli/core/fab_auth.py +++ b/src/fabric_cli/core/fab_auth.py @@ -56,8 +56,9 @@ def __init__(self): self._load_env() def _save_auth(self): - with open(self.auth_file, "w") as file: - file.write(json.dumps(self._get_auth_info())) + from fabric_cli.utils.fab_secure_io import write_restricted_file + + write_restricted_file(self.auth_file, json.dumps(self._get_auth_info())) def _load_auth(self): if os.path.exists(self.auth_file) and os.stat(self.auth_file).st_size != 0: @@ -433,18 +434,18 @@ def _is_token_defined(self, scope): def acquire_token(self, scope: list[str], interactive_renew=True) -> dict: """ Core MSAL token acquisition method that returns the full MSAL result. - + This method contains the common authentication logic shared between get_access_token and msal_bridge.get_token. It performs no validation on scopes - callers are responsible for their own validation needs. - + Args: scope: The scopes for which to request the token interactive_renew: Whether to allow interactive authentication for user flows - + Returns: Full MSAL result dictionary containing access_token, expires_on, etc. - + Raises: FabricCLIError: When token acquisition fails """ @@ -495,10 +496,12 @@ def acquire_token(self, scope: list[str], interactive_renew=True) -> dict: if token and token.get("error"): fab_logger.log_debug( - f"Error in get token: {token.get('error_description')}") + f"Error in get token: {token.get('error_description')}" + ) raise FabricCLIError( ErrorMessages.Auth.access_token_error( - "Something went wrong while trying to acquire a token. Please try to run `fab auth logout` and then `fab auth login` to re-login and acquire new tokens."), + "Something went wrong while trying to acquire a token. Please try to run `fab auth logout` and then `fab auth login` to re-login and acquire new tokens." + ), status_code=con.ERROR_AUTHENTICATION_FAILED, ) if token is None or not token.get("access_token"): @@ -512,17 +515,17 @@ def acquire_token(self, scope: list[str], interactive_renew=True) -> dict: def get_access_token(self, scope: list[str], interactive_renew=True) -> str | None: """ Get an access token string for the specified scopes. - + This method maintains the existing CLI API - returns just the token string for backward compatibility. Uses the shared acquire_token method internally. - + Args: scope: The scopes for which to request the token interactive_renew: Whether to allow interactive authentication for user flows - + Returns: Access token string or None if acquisition fails - + Raises: FabricCLIError: When token acquisition fails """ @@ -732,8 +735,8 @@ def _load_pem_certificate( ) cert = x509.load_pem_x509_certificate(certificate_data, default_backend()) fingerprint = cert.fingerprint( - hashes.SHA1() # CodeQL [SM02167] SHA‑1 thumbprint is only a certificate identifier required by MSAL/Microsoft Entra, not a cryptographic operation - ) + hashes.SHA1() # CodeQL [SM02167] SHA‑1 thumbprint is only a certificate identifier required by MSAL/Microsoft Entra, not a cryptographic operation + ) return self._Cert(certificate_data, private_key, fingerprint) def _load_pkcs12_certificate( @@ -774,8 +777,8 @@ def _load_pkcs12_certificate( pem_bytes = b"".join(pem_sections) fingerprint = cert.fingerprint( - hashes.SHA1() # CodeQL [SM02167] SHA‑1 thumbprint is only a certificate identifier required by MSAL/Microsoft Entra, not a cryptographic operation - ) + hashes.SHA1() # CodeQL [SM02167] SHA‑1 thumbprint is only a certificate identifier required by MSAL/Microsoft Entra, not a cryptographic operation + ) return self._Cert(pem_bytes, private_key, fingerprint) diff --git a/src/fabric_cli/core/fab_context.py b/src/fabric_cli/core/fab_context.py index 10c9da2e1..ed18b7c93 100644 --- a/src/fabric_cli/core/fab_context.py +++ b/src/fabric_cli/core/fab_context.py @@ -159,8 +159,9 @@ def _save_context_to_file(self) -> None: } try: - with open(self._context_file, "w") as f: - json.dump(context_data, f) + from fabric_cli.utils.fab_secure_io import write_restricted_file + + write_restricted_file(self._context_file, json.dumps(context_data)) except Exception: utils_ui.print_warning( "Warning: Failed to save context file. Context persistence may not work as expected." diff --git a/src/fabric_cli/core/fab_logger.py b/src/fabric_cli/core/fab_logger.py index 9af40c276..86f750a66 100644 --- a/src/fabric_cli/core/fab_logger.py +++ b/src/fabric_cli/core/fab_logger.py @@ -13,6 +13,11 @@ import fabric_cli.core.fab_constant as fab_constant import fabric_cli.core.fab_state_config as fab_state_config import fabric_cli.utils.fab_ui as utils_ui +from fabric_cli.utils.fab_secure_io import ( + chmod_if_posix, + create_restricted_dir, + get_restricted_file_opener, +) _logger_instance = None # Singleton instance log_file_path = None # Path to the current log file @@ -156,7 +161,7 @@ def log_debug_http_request_exception(e): def _get_log_file_path(): """Create a log file path in the user's log directory.""" log_dir = user_log_dir("fabric-cli") - os.makedirs(log_dir, exist_ok=True) + create_restricted_dir(log_dir) return os.path.join(log_dir, "fabcli_debug.log") @@ -171,13 +176,47 @@ def get_logger(): return _logger_instance +class _RestrictedRotatingFileHandler(RotatingFileHandler): + """RotatingFileHandler that enforces 0o600 on log files (including rotated ones). + + Overrides doRollover to tighten permissions on newly created backup files, + ensuring that rotated log files are not world-readable. + """ + + def __init__(self, *args, **kwargs): + # Use a custom opener to create the initial log file with 0o600 + super().__init__(*args, **kwargs) + # Tighten permissions on the log file (handles pre-existing files) + chmod_if_posix(self.baseFilename, 0o600) + + def _open(self): # type: ignore[override] + """Override _open to use restricted permissions for new log files.""" + return open( + self.baseFilename, + self.mode, + encoding=self.encoding, + errors=self.errors, + opener=get_restricted_file_opener(), + ) + + def doRollover(self): + """Perform rollover and enforce permissions on the rotated file.""" + super().doRollover() + # After rotation, the old file is renamed (e.g., .log.1) and a new + # base file is created. Tighten permissions on all rotated backups. + for i in range(1, self.backupCount + 1): + rotated_file = self.rotation_filename(f"{self.baseFilename}.{i}") + if os.path.exists(rotated_file): + chmod_if_posix(rotated_file, 0o600) + + def _setup_logger(file_name: str): # Initialize the singleton logger _logger_instance = logging.getLogger("FabricCLI") _logger_instance.setLevel(logging.DEBUG) - # Configure the log file rotation. - file_handler = RotatingFileHandler( + # Configure the log file rotation with restricted permissions. + file_handler = _RestrictedRotatingFileHandler( file_name, maxBytes=5 * 1024 * 1024, # 5 MB backupCount=7, # Retain 7 rotated files (35 MB total) diff --git a/src/fabric_cli/core/fab_state_config.py b/src/fabric_cli/core/fab_state_config.py index 2e4413904..a7f946a24 100644 --- a/src/fabric_cli/core/fab_state_config.py +++ b/src/fabric_cli/core/fab_state_config.py @@ -3,15 +3,18 @@ import json import os -from os.path import exists, expanduser +from os.path import expanduser from fabric_cli.core import fab_constant +from fabric_cli.utils.fab_secure_io import ( + create_restricted_dir, + write_restricted_file, +) def config_location(): _location = expanduser("~/.config/fab/") - if not exists(_location): - os.makedirs(_location) + create_restricted_dir(_location) return _location @@ -29,8 +32,7 @@ def read_config(file_path) -> dict: def write_config(data): - with open(config_file, "w") as file: - json.dump(data, file, indent=4) + write_restricted_file(config_file, json.dumps(data, indent=4)) def set_config(key, value): diff --git a/src/fabric_cli/utils/fab_secure_io.py b/src/fabric_cli/utils/fab_secure_io.py new file mode 100644 index 000000000..84958e037 --- /dev/null +++ b/src/fabric_cli/utils/fab_secure_io.py @@ -0,0 +1,81 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Shared helpers for enforcing restrictive file and directory permissions. + +On POSIX systems (Linux/macOS), these helpers ensure that sensitive files are +created with owner-only access (0o600 for files, 0o700 for directories) and +tighten permissions on pre-existing paths from older CLI versions. + +On Windows, POSIX permission bits are a no-op — Windows uses ACLs instead, +and default user-profile ACLs already restrict access to the owner. +""" + +import logging +import os + +_logger = logging.getLogger(__name__) + +# True on Linux/macOS; False on Windows where POSIX permission bits are a no-op. +IS_POSIX = os.name != "nt" + + +def chmod_if_posix(path: str, mode: int) -> None: + """Best-effort chmod on POSIX; no-op on Windows. + + Logs at debug level when chmod fails (e.g. permission denied on + restrictive filesystems) since the user cannot act on it. + """ + if IS_POSIX: + try: + os.chmod(path, mode) + except OSError as e: + _logger.debug("Failed to set permissions %o on %s: %s", mode, path, e) + + +def write_restricted_file(file_path: str, content: str) -> None: + """Write content to a file with owner-only permissions (0o600). + + Handles both new file creation and tightening permissions on + pre-existing files from older CLI versions. Permissions are + tightened *before* truncation so that sensitive content is never + written to a world-readable file descriptor. + """ + # Tighten permissions on pre-existing files before writing, so + # the truncate+write never exposes new content through a permissive fd. + if os.path.exists(file_path): + chmod_if_posix(file_path, 0o600) + fd = os.open(file_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + try: + with os.fdopen(fd, "w", encoding="utf-8") as file: + file.write(content) + except Exception: + # os.fdopen may fail before wrapping fd; close to avoid leak. + # If os.fdopen succeeded, fd is already closed by the with block. + try: + os.close(fd) + except OSError: + pass + raise + + +def get_restricted_file_opener(): + """Return a file opener that creates files with 0o600 on POSIX, or None on Windows. + + Intended for use as the ``opener`` argument to :func:`open`. Returns + ``None`` on Windows so that the default opener is used. + """ + if IS_POSIX: + return lambda path, flags: os.open(path, flags, 0o600) + return None + + +def create_restricted_dir(dir_path: str) -> None: + """Create a directory with owner-only permissions (0o700). + + Uses exist_ok=True to avoid TOCTOU races and tightens permissions + on pre-existing directories from older CLI versions. + """ + os.makedirs(dir_path, mode=0o700, exist_ok=True) + # Enforce permissions on pre-existing directories from older versions + chmod_if_posix(dir_path, 0o700) diff --git a/tests/test_core/test_context_persistence.py b/tests/test_core/test_context_persistence.py index 0c6752397..01c71ed41 100644 --- a/tests/test_core/test_context_persistence.py +++ b/tests/test_core/test_context_persistence.py @@ -46,16 +46,19 @@ def mock_get_config(key): # Create a mock workspace with the tenant as parent workspace = Workspace("test_workspace", "5678", tenant, "Workspace") - # Mock json.dump to avoid actually writing to the file system - with patch("json.dump") as mock_json_dump, patch("builtins.open", MagicMock()): + # Mock write_restricted_file to avoid actually writing to the file system + with patch( + "fabric_cli.utils.fab_secure_io.write_restricted_file" + ) as mock_write: # Set the context context.context = workspace - # Check that json.dump was called with the right data - mock_json_dump.assert_called_once() - args, _ = mock_json_dump.call_args - assert args[0] == {"path": workspace.path} + # Check that write_restricted_file was called with the right data + mock_write.assert_called_once() + args, _ = mock_write.call_args + assert args[0] == temp_context_file + assert json.loads(args[1]) == {"path": workspace.path} finally: os.remove(temp_context_file) @@ -90,7 +93,9 @@ def test_context_persistence_not_used_in_interactive_mode(monkeypatch): # Set runtime mode to interactive and enable persistence context = Context() - monkeypatch.setattr(context, "get_runtime_mode", lambda: fab_constant.FAB_MODE_INTERACTIVE) + monkeypatch.setattr( + context, "get_runtime_mode", lambda: fab_constant.FAB_MODE_INTERACTIVE + ) def mock_get_config(key): if key == fab_constant.FAB_CONTEXT_PERSISTENCE_ENABLED: @@ -185,16 +190,18 @@ def mock_get_config(key): tenant = Tenant("test_tenant", "1234") workspace = Workspace("test_workspace", "5678", tenant, "Workspace") - # Mock json.dump to avoid actually writing to the file system - with patch("json.dump") as mock_json_dump, patch("builtins.open", MagicMock()): + # Mock write_restricted_file to avoid actually writing to the file system + with patch( + "fabric_cli.utils.fab_secure_io.write_restricted_file" + ) as mock_write: # Set the context - this SHOULD trigger file save when persistence is enabled context.context = workspace - # Check that json.dump was called with the right data - mock_json_dump.assert_called_once() - args, _ = mock_json_dump.call_args - assert args[0] == {"path": workspace.path} + # Check that write_restricted_file was called with the right data + mock_write.assert_called_once() + args, _ = mock_write.call_args + assert json.loads(args[1]) == {"path": workspace.path} finally: os.remove(temp_context_file) diff --git a/tests/test_core/test_fab_auth.py b/tests/test_core/test_fab_auth.py index 5cf2e5686..c42934024 100644 --- a/tests/test_core/test_fab_auth.py +++ b/tests/test_core/test_fab_auth.py @@ -824,9 +824,13 @@ def acquire_token_for_client(self, *, scopes): with pytest.raises(FabricCLIError) as exc_info: auth.get_access_token(["dummy_scope"]) assert exc_info.value.status_code == con.ERROR_AUTHENTICATION_FAILED - assert exc_info.value.message == "Failed to get access token: Something went wrong while trying to acquire a token. Please try to run `fab auth logout` and then `fab auth login` to re-login and acquire new tokens" + assert ( + exc_info.value.message + == "Failed to get access token: Something went wrong while trying to acquire a token. Please try to run `fab auth logout` and then `fab auth login` to re-login and acquire new tokens" + ) assert exc_info.value.status_code == con.ERROR_AUTHENTICATION_FAILED + def test_set_access_mode_success(monkeypatch): """Test setting a valid access mode""" auth = FabAuth() @@ -1064,3 +1068,55 @@ def test_auth_mode_migration(tmp_path): assert ( auth.get_identity_type() == "user" ), "get_identity_type returns wrong value after migration" + + +# region security: auth file permission tests + +_skip_on_windows = pytest.mark.skipif( + os.name == "nt", reason="POSIX permission tests not applicable on Windows" +) + + +@_skip_on_windows +def test_save_auth_creates_file_with_restricted_permissions_success(tmp_path): + """Verify auth.json is created with mode 0o600 (owner read/write only).""" + auth = FabAuth() + auth.auth_file = os.path.join(str(tmp_path), "auth.json") + auth._auth_info = {con.IDENTITY_TYPE: "user"} + + auth._save_auth() + + assert os.path.exists(auth.auth_file) + mode = oct(os.stat(auth.auth_file).st_mode & 0o777) + assert mode == "0o600", f"auth.json has mode {mode}, expected 0o600" + + # Verify content is still correct + with open(auth.auth_file, "r") as f: + data = json.load(f) + assert data[con.IDENTITY_TYPE] == "user" + + +@_skip_on_windows +def test_save_auth_tightens_permissions_on_existing_file_success(tmp_path): + """Verify _save_auth() enforces 0o600 on a pre-existing permissive file.""" + auth = FabAuth() + auth.auth_file = os.path.join(str(tmp_path), "auth.json") + + # Create file with world-readable permissions (0o644) to simulate a + # pre-existing file written by an older CLI version without hardening + with open(auth.auth_file, "w") as f: + json.dump({con.IDENTITY_TYPE: "user"}, f) + os.chmod(auth.auth_file, 0o644) + + auth._auth_info = {con.IDENTITY_TYPE: "spn"} + auth._save_auth() + + mode = oct(os.stat(auth.auth_file).st_mode & 0o777) + assert mode == "0o600", f"auth.json has mode {mode} after overwrite, expected 0o600" + + with open(auth.auth_file, "r") as f: + data = json.load(f) + assert data[con.IDENTITY_TYPE] == "spn" + + +# endregion diff --git a/tests/test_core/test_fab_logger.py b/tests/test_core/test_fab_logger.py index 7ad67a36d..35f466edd 100644 --- a/tests/test_core/test_fab_logger.py +++ b/tests/test_core/test_fab_logger.py @@ -249,3 +249,99 @@ def mock_log_warning(): def mock_get_log_file_path(): with patch("fabric_cli.core.fab_logger.get_log_file_path") as mock: yield mock + + +# ── Security: log directory and file permissions ───────────────────────────── + +_skip_on_windows = pytest.mark.skipif( + os.name == "nt", reason="POSIX permission tests not applicable on Windows" +) + + +@_skip_on_windows +def test_get_log_file_path_creates_directory_with_restricted_permissions_success( + monkeypatch, tmp_path +): + """Verify log directory is created with mode 0o700 (owner-only).""" + log_dir = tmp_path / "fabric-cli" / "log" + monkeypatch.setattr(logger, "user_log_dir", lambda app_name: str(log_dir)) + + result = logger._get_log_file_path() + assert result.endswith("fabcli_debug.log") + assert log_dir.exists() + + mode = oct(log_dir.stat().st_mode & 0o777) + assert mode == "0o700", f"Log directory has mode {mode}, expected 0o700" + + +@_skip_on_windows +def test_log_file_created_with_restricted_permissions_success(monkeypatch, tmp_path): + """Verify log file is created with mode 0o600 and is readable by the owner.""" + log_dir = tmp_path / "fabric-cli" / "log" + log_dir.mkdir(parents=True, mode=0o700) + monkeypatch.setattr(logger, "user_log_dir", lambda app_name: str(log_dir)) + + # Reset singleton so _setup_logger creates a fresh handler + monkeypatch.setattr(logger, "_logger_instance", None) + + log_file = log_dir / "fabcli_debug.log" + log_instance = logger._setup_logger(str(log_file)) + + # Write something to ensure the file exists + log_instance.debug("permission test entry") + + assert log_file.exists() + mode = oct(log_file.stat().st_mode & 0o777) + assert mode == "0o600", f"Log file has mode {mode}, expected 0o600" + + # Verify the owner can still read the file + content = log_file.read_text() + assert "permission test entry" in content + + # Cleanup: remove handlers to avoid accumulation on the global singleton + for handler in log_instance.handlers[:]: + log_instance.removeHandler(handler) + handler.close() + + +@_skip_on_windows +def test_log_file_rotation_preserves_restricted_permissions_success( + monkeypatch, tmp_path +): + """Verify rotated log files maintain 0o600 permissions.""" + log_dir = tmp_path / "fabric-cli" / "log" + log_dir.mkdir(parents=True, mode=0o700) + log_file = log_dir / "fabcli_debug.log" + + # Create a handler with a small maxBytes to force rotation quickly + handler = logger._RestrictedRotatingFileHandler( + str(log_file), + maxBytes=100, # Very small to trigger rotation + backupCount=2, + ) + formatter = logging.Formatter("%(message)s") + handler.setFormatter(formatter) + + test_logger = logging.getLogger("FabricCLI_rotation_test") + test_logger.setLevel(logging.DEBUG) + test_logger.addHandler(handler) + + # Write enough data to trigger at least one rotation + for i in range(20): + test_logger.debug(f"rotation test entry {i} with padding to exceed limit") + + # Check that at least one rotated file was created + rotated_file_1 = log_dir / "fabcli_debug.log.1" + assert rotated_file_1.exists(), "Expected at least one rotated log file" + + # Verify permissions on the base log file + mode = oct(log_file.stat().st_mode & 0o777) + assert mode == "0o600", f"Base log file has mode {mode}, expected 0o600" + + # Verify permissions on the rotated file + mode = oct(rotated_file_1.stat().st_mode & 0o777) + assert mode == "0o600", f"Rotated log file has mode {mode}, expected 0o600" + + # Cleanup + test_logger.removeHandler(handler) + handler.close() diff --git a/tests/test_core/test_fab_state_config.py b/tests/test_core/test_fab_state_config.py index df0b171c7..1356415ea 100644 --- a/tests/test_core/test_fab_state_config.py +++ b/tests/test_core/test_fab_state_config.py @@ -5,6 +5,8 @@ import os import tempfile +import pytest + import fabric_cli.core.fab_state_config as cfg from fabric_cli.core import fab_constant @@ -115,10 +117,14 @@ def _create_temp_config(monkeypatch, tmp_path, config_data): def test_init_defaults_removes_mode_key_success(monkeypatch, tmp_path): """If an existing config file contains 'mode', init_defaults must delete it.""" - config_file = _create_temp_config(monkeypatch, tmp_path, { - fab_constant.FAB_MODE: fab_constant.FAB_MODE_INTERACTIVE, - fab_constant.FAB_CACHE_ENABLED: "true", - }) + config_file = _create_temp_config( + monkeypatch, + tmp_path, + { + fab_constant.FAB_MODE: fab_constant.FAB_MODE_INTERACTIVE, + fab_constant.FAB_CACHE_ENABLED: "true", + }, + ) cfg.init_defaults() @@ -129,9 +135,13 @@ def test_init_defaults_removes_mode_key_success(monkeypatch, tmp_path): def test_init_defaults_no_mode_key_success(monkeypatch, tmp_path): """Config without 'mode' must initialize cleanly (distinct from removes_mode_key: verifies no error on absence).""" - config_file = _create_temp_config(monkeypatch, tmp_path, { - fab_constant.FAB_DEBUG_ENABLED: "true", - }) + config_file = _create_temp_config( + monkeypatch, + tmp_path, + { + fab_constant.FAB_DEBUG_ENABLED: "true", + }, + ) cfg.init_defaults() @@ -148,20 +158,114 @@ def test_init_defaults_applies_missing_defaults_success(monkeypatch, tmp_path): result = cfg.read_config(config_file) for key, default_val in fab_constant.CONFIG_DEFAULT_VALUES.items(): - assert result.get(key) == default_val, ( - f"Expected default for '{key}' = '{default_val}', got '{result.get(key)}'" - ) + assert ( + result.get(key) == default_val + ), f"Expected default for '{key}' = '{default_val}', got '{result.get(key)}'" def test_init_defaults_preserves_user_overrides_success(monkeypatch, tmp_path): """User-set values must not be overwritten by defaults.""" - config_file = _create_temp_config(monkeypatch, tmp_path, { - fab_constant.FAB_CACHE_ENABLED: "false", - }) + config_file = _create_temp_config( + monkeypatch, + tmp_path, + { + fab_constant.FAB_CACHE_ENABLED: "false", + }, + ) cfg.init_defaults() result = cfg.read_config(config_file) assert result[fab_constant.FAB_CACHE_ENABLED] == "false" + +# endregion + + +# region security: file permission tests + +_skip_on_windows = pytest.mark.skipif( + os.name == "nt", reason="POSIX permission tests not applicable on Windows" +) + + +@_skip_on_windows +def test_config_location_creates_directory_with_restricted_permissions_success( + monkeypatch, tmp_path +): + """Verify config directory is created with mode 0o700 (owner-only access).""" + monkeypatch.setattr( + "fabric_cli.core.fab_state_config.expanduser", + lambda path: path.replace("~", str(tmp_path)), + ) + + location = cfg.config_location() + assert os.path.isdir(location) + + mode = oct(os.stat(location).st_mode & 0o777) + assert mode == "0o700", f"Config directory has mode {mode}, expected 0o700" + + +@_skip_on_windows +def test_config_location_tightens_permissions_on_existing_directory_success( + monkeypatch, tmp_path +): + """Verify config_location() enforces 0o700 on a pre-existing permissive directory.""" + config_dir = tmp_path / ".config" / "fab" + config_dir.mkdir(parents=True, mode=0o755) + monkeypatch.setattr( + "fabric_cli.core.fab_state_config.expanduser", + lambda path: path.replace("~", str(tmp_path)), + ) + + cfg.config_location() + + mode = oct(config_dir.stat().st_mode & 0o777) + assert mode == "0o700", f"Config directory has mode {mode}, expected 0o700" + + +@_skip_on_windows +def test_write_config_creates_file_with_restricted_permissions_success( + monkeypatch, tmp_path +): + """Verify config files are created with mode 0o600 (owner read/write only).""" + config_file = os.path.join(str(tmp_path), "config.json") + monkeypatch.setattr(cfg, "config_file", config_file) + + cfg.write_config({"key": "value"}) + + assert os.path.exists(config_file) + mode = oct(os.stat(config_file).st_mode & 0o777) + assert mode == "0o600", f"Config file has mode {mode}, expected 0o600" + + # Verify content is still correct + data = cfg.read_config(config_file) + assert data == {"key": "value"} + + +@_skip_on_windows +def test_write_config_tightens_permissions_on_existing_file_success( + monkeypatch, tmp_path +): + """Verify write_config() enforces 0o600 on a pre-existing permissive file.""" + config_file = os.path.join(str(tmp_path), "config.json") + monkeypatch.setattr(cfg, "config_file", config_file) + + # Create file with world-readable permissions (0o644) to simulate a + # pre-existing file written by an older CLI version without hardening + with open(config_file, "w") as f: + json.dump({"old": "data"}, f) + os.chmod(config_file, 0o644) + + cfg.write_config({"new": "data"}) + + mode = oct(os.stat(config_file).st_mode & 0o777) + assert ( + mode == "0o600" + ), f"Config file has mode {mode} after overwrite, expected 0o600" + + data = cfg.read_config(config_file) + assert data == {"new": "data"} + + # endregion