Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/fixed-20260607-134043.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: fixed
body: Restrict file and directory permissions on auth, config, context, and log paths to prevent local credential exposure on multi-user systems
time: 2026-06-07T13:40:43+02:00
custom:
Author: iemejia
AuthorLink: https://github.com/iemejia
Comment thread
iemejia marked this conversation as resolved.
35 changes: 19 additions & 16 deletions src/fabric_cli/core/fab_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ def __init__(self):
self._load_env()

def _save_auth(self):
with open(self.auth_file, "w") as file:
file.write(json.dumps(self._get_auth_info()))
from fabric_cli.utils.fab_secure_io import write_restricted_file

write_restricted_file(self.auth_file, json.dumps(self._get_auth_info()))

def _load_auth(self):
if os.path.exists(self.auth_file) and os.stat(self.auth_file).st_size != 0:
Expand Down Expand Up @@ -433,18 +434,18 @@ def _is_token_defined(self, scope):
def acquire_token(self, scope: list[str], interactive_renew=True) -> dict:
"""
Core MSAL token acquisition method that returns the full MSAL result.

Comment thread
aviatco marked this conversation as resolved.
This method contains the common authentication logic shared between
get_access_token and msal_bridge.get_token. It performs no validation
on scopes - callers are responsible for their own validation needs.

Args:
scope: The scopes for which to request the token
interactive_renew: Whether to allow interactive authentication for user flows

Returns:
Full MSAL result dictionary containing access_token, expires_on, etc.

Raises:
FabricCLIError: When token acquisition fails
"""
Expand Down Expand Up @@ -495,10 +496,12 @@ def acquire_token(self, scope: list[str], interactive_renew=True) -> dict:

if token and token.get("error"):
fab_logger.log_debug(
f"Error in get token: {token.get('error_description')}")
f"Error in get token: {token.get('error_description')}"
)
raise FabricCLIError(
ErrorMessages.Auth.access_token_error(
"Something went wrong while trying to acquire a token. Please try to run `fab auth logout` and then `fab auth login` to re-login and acquire new tokens."),
"Something went wrong while trying to acquire a token. Please try to run `fab auth logout` and then `fab auth login` to re-login and acquire new tokens."
),
status_code=con.ERROR_AUTHENTICATION_FAILED,
)
if token is None or not token.get("access_token"):
Expand All @@ -512,17 +515,17 @@ def acquire_token(self, scope: list[str], interactive_renew=True) -> dict:
def get_access_token(self, scope: list[str], interactive_renew=True) -> str | None:
"""
Get an access token string for the specified scopes.

This method maintains the existing CLI API - returns just the token string
for backward compatibility. Uses the shared acquire_token method internally.

Args:
scope: The scopes for which to request the token
interactive_renew: Whether to allow interactive authentication for user flows

Returns:
Access token string or None if acquisition fails

Raises:
FabricCLIError: When token acquisition fails
"""
Expand Down Expand Up @@ -732,8 +735,8 @@ def _load_pem_certificate(
)
cert = x509.load_pem_x509_certificate(certificate_data, default_backend())
fingerprint = cert.fingerprint(
hashes.SHA1() # CodeQL [SM02167] SHA‑1 thumbprint is only a certificate identifier required by MSAL/Microsoft Entra, not a cryptographic operation
)
hashes.SHA1() # CodeQL [SM02167] SHA‑1 thumbprint is only a certificate identifier required by MSAL/Microsoft Entra, not a cryptographic operation
)
return self._Cert(certificate_data, private_key, fingerprint)

def _load_pkcs12_certificate(
Expand Down Expand Up @@ -774,8 +777,8 @@ def _load_pkcs12_certificate(
pem_bytes = b"".join(pem_sections)

fingerprint = cert.fingerprint(
hashes.SHA1() # CodeQL [SM02167] SHA‑1 thumbprint is only a certificate identifier required by MSAL/Microsoft Entra, not a cryptographic operation
)
hashes.SHA1() # CodeQL [SM02167] SHA‑1 thumbprint is only a certificate identifier required by MSAL/Microsoft Entra, not a cryptographic operation
)

return self._Cert(pem_bytes, private_key, fingerprint)

Expand Down
5 changes: 3 additions & 2 deletions src/fabric_cli/core/fab_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ def _save_context_to_file(self) -> None:
}

try:
with open(self._context_file, "w") as f:
json.dump(context_data, f)
from fabric_cli.utils.fab_secure_io import write_restricted_file

write_restricted_file(self._context_file, json.dumps(context_data))
except Exception:
utils_ui.print_warning(
"Warning: Failed to save context file. Context persistence may not work as expected."
Expand Down
45 changes: 42 additions & 3 deletions src/fabric_cli/core/fab_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
import fabric_cli.core.fab_constant as fab_constant
import fabric_cli.core.fab_state_config as fab_state_config
import fabric_cli.utils.fab_ui as utils_ui
from fabric_cli.utils.fab_secure_io import (
chmod_if_posix,
create_restricted_dir,
get_restricted_file_opener,
)
Comment thread
iemejia marked this conversation as resolved.

_logger_instance = None # Singleton instance
log_file_path = None # Path to the current log file
Expand Down Expand Up @@ -156,7 +161,7 @@ def log_debug_http_request_exception(e):
def _get_log_file_path():
"""Create a log file path in the user's log directory."""
log_dir = user_log_dir("fabric-cli")
os.makedirs(log_dir, exist_ok=True)
create_restricted_dir(log_dir)
return os.path.join(log_dir, "fabcli_debug.log")
Comment thread
iemejia marked this conversation as resolved.


Expand All @@ -171,13 +176,47 @@ def get_logger():
return _logger_instance


class _RestrictedRotatingFileHandler(RotatingFileHandler):
"""RotatingFileHandler that enforces 0o600 on log files (including rotated ones).

Overrides doRollover to tighten permissions on newly created backup files,
ensuring that rotated log files are not world-readable.
"""

def __init__(self, *args, **kwargs):
# Use a custom opener to create the initial log file with 0o600
super().__init__(*args, **kwargs)
# Tighten permissions on the log file (handles pre-existing files)
chmod_if_posix(self.baseFilename, 0o600)

def _open(self): # type: ignore[override]
"""Override _open to use restricted permissions for new log files."""
return open(
self.baseFilename,
self.mode,
encoding=self.encoding,
errors=self.errors,
opener=get_restricted_file_opener(),
)

def doRollover(self):
"""Perform rollover and enforce permissions on the rotated file."""
super().doRollover()
# After rotation, the old file is renamed (e.g., .log.1) and a new
# base file is created. Tighten permissions on all rotated backups.
for i in range(1, self.backupCount + 1):
rotated_file = self.rotation_filename(f"{self.baseFilename}.{i}")
if os.path.exists(rotated_file):
chmod_if_posix(rotated_file, 0o600)


def _setup_logger(file_name: str):
# Initialize the singleton logger
_logger_instance = logging.getLogger("FabricCLI")
_logger_instance.setLevel(logging.DEBUG)

# Configure the log file rotation.
file_handler = RotatingFileHandler(
# Configure the log file rotation with restricted permissions.
file_handler = _RestrictedRotatingFileHandler(
file_name,
maxBytes=5 * 1024 * 1024, # 5 MB
backupCount=7, # Retain 7 rotated files (35 MB total)
Expand Down
12 changes: 7 additions & 5 deletions src/fabric_cli/core/fab_state_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@

import json
import os
from os.path import exists, expanduser
from os.path import expanduser

from fabric_cli.core import fab_constant
from fabric_cli.utils.fab_secure_io import (
create_restricted_dir,
write_restricted_file,
)


def config_location():
_location = expanduser("~/.config/fab/")
if not exists(_location):
os.makedirs(_location)
create_restricted_dir(_location)
return _location
Comment thread
iemejia marked this conversation as resolved.


Expand All @@ -29,8 +32,7 @@ def read_config(file_path) -> dict:


def write_config(data):
with open(config_file, "w") as file:
json.dump(data, file, indent=4)
write_restricted_file(config_file, json.dumps(data, indent=4))


def set_config(key, value):
Expand Down
81 changes: 81 additions & 0 deletions src/fabric_cli/utils/fab_secure_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

Comment thread
aviatco marked this conversation as resolved.
"""Shared helpers for enforcing restrictive file and directory permissions.

On POSIX systems (Linux/macOS), these helpers ensure that sensitive files are
created with owner-only access (0o600 for files, 0o700 for directories) and
tighten permissions on pre-existing paths from older CLI versions.

On Windows, POSIX permission bits are a no-op — Windows uses ACLs instead,
and default user-profile ACLs already restrict access to the owner.
"""

import logging
import os

_logger = logging.getLogger(__name__)

# True on Linux/macOS; False on Windows where POSIX permission bits are a no-op.
IS_POSIX = os.name != "nt"


def chmod_if_posix(path: str, mode: int) -> None:
"""Best-effort chmod on POSIX; no-op on Windows.

Logs at debug level when chmod fails (e.g. permission denied on
restrictive filesystems) since the user cannot act on it.
"""
if IS_POSIX:
try:
os.chmod(path, mode)
except OSError as e:
_logger.debug("Failed to set permissions %o on %s: %s", mode, path, e)


def write_restricted_file(file_path: str, content: str) -> None:
"""Write content to a file with owner-only permissions (0o600).

Handles both new file creation and tightening permissions on
pre-existing files from older CLI versions. Permissions are
tightened *before* truncation so that sensitive content is never
written to a world-readable file descriptor.
"""
# Tighten permissions on pre-existing files before writing, so
# the truncate+write never exposes new content through a permissive fd.
if os.path.exists(file_path):
chmod_if_posix(file_path, 0o600)
fd = os.open(file_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
try:
with os.fdopen(fd, "w", encoding="utf-8") as file:
file.write(content)
except Exception:
# os.fdopen may fail before wrapping fd; close to avoid leak.
# If os.fdopen succeeded, fd is already closed by the with block.
try:
os.close(fd)
except OSError:
pass
raise


def get_restricted_file_opener():
"""Return a file opener that creates files with 0o600 on POSIX, or None on Windows.

Intended for use as the ``opener`` argument to :func:`open`. Returns
``None`` on Windows so that the default opener is used.
"""
if IS_POSIX:
return lambda path, flags: os.open(path, flags, 0o600)
return None


def create_restricted_dir(dir_path: str) -> None:
"""Create a directory with owner-only permissions (0o700).

Uses exist_ok=True to avoid TOCTOU races and tightens permissions
on pre-existing directories from older CLI versions.
"""
os.makedirs(dir_path, mode=0o700, exist_ok=True)
# Enforce permissions on pre-existing directories from older versions
chmod_if_posix(dir_path, 0o700)
33 changes: 20 additions & 13 deletions tests/test_core/test_context_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,19 @@ def mock_get_config(key):
# Create a mock workspace with the tenant as parent
workspace = Workspace("test_workspace", "5678", tenant, "Workspace")

# Mock json.dump to avoid actually writing to the file system
with patch("json.dump") as mock_json_dump, patch("builtins.open", MagicMock()):
# Mock write_restricted_file to avoid actually writing to the file system
with patch(
"fabric_cli.utils.fab_secure_io.write_restricted_file"
) as mock_write:

# Set the context
context.context = workspace

# Check that json.dump was called with the right data
mock_json_dump.assert_called_once()
args, _ = mock_json_dump.call_args
assert args[0] == {"path": workspace.path}
# Check that write_restricted_file was called with the right data
mock_write.assert_called_once()
args, _ = mock_write.call_args
assert args[0] == temp_context_file
assert json.loads(args[1]) == {"path": workspace.path}
finally:
os.remove(temp_context_file)

Expand Down Expand Up @@ -90,7 +93,9 @@ def test_context_persistence_not_used_in_interactive_mode(monkeypatch):

# Set runtime mode to interactive and enable persistence
context = Context()
monkeypatch.setattr(context, "get_runtime_mode", lambda: fab_constant.FAB_MODE_INTERACTIVE)
monkeypatch.setattr(
context, "get_runtime_mode", lambda: fab_constant.FAB_MODE_INTERACTIVE
)

def mock_get_config(key):
if key == fab_constant.FAB_CONTEXT_PERSISTENCE_ENABLED:
Expand Down Expand Up @@ -185,16 +190,18 @@ def mock_get_config(key):
tenant = Tenant("test_tenant", "1234")
workspace = Workspace("test_workspace", "5678", tenant, "Workspace")

# Mock json.dump to avoid actually writing to the file system
with patch("json.dump") as mock_json_dump, patch("builtins.open", MagicMock()):
# Mock write_restricted_file to avoid actually writing to the file system
with patch(
"fabric_cli.utils.fab_secure_io.write_restricted_file"
) as mock_write:

# Set the context - this SHOULD trigger file save when persistence is enabled
context.context = workspace

# Check that json.dump was called with the right data
mock_json_dump.assert_called_once()
args, _ = mock_json_dump.call_args
assert args[0] == {"path": workspace.path}
# Check that write_restricted_file was called with the right data
mock_write.assert_called_once()
args, _ = mock_write.call_args
assert json.loads(args[1]) == {"path": workspace.path}
finally:
os.remove(temp_context_file)

Expand Down
Loading
Loading