Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions src/mas/devops/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
for pre-release versions and wildcard version strings.
"""

import logging
import re
import semver
import requests
from requests.auth import HTTPBasicAuth

logger = logging.getLogger(__name__)


def isVersionBefore(_compare_to_version, _current_version):
Expand Down Expand Up @@ -74,3 +80,92 @@ def isVersionEqualOrAfter(_compare_to_version, _current_version):
current_version = semver.VersionInfo.parse(strippedVersion)
compareToVersion = semver.VersionInfo.parse(_compare_to_version)
return current_version.compare(compareToVersion) >= 0


def validateIBMEntitlementKey(entitlementKey: str, repository: str = "cp/mas/coreapi", timeout: int = 30) -> bool:
"""Validate IBM entitlement key against cp.icr.io registry.

This function validates an IBM entitlement key by attempting to obtain
an authentication token from the IBM Container Registry and verifying
access to the specified repository.

Args:
entitlementKey (str): IBM entitlement key to validate.
repository (str, optional): Repository to test access against. Defaults to "cp/mas/coreapi".
timeout (int, optional): Request timeout in seconds. Defaults to 30.

Returns:
bool: True if key is valid and grants access to the repository, False otherwise.

Raises:
requests.exceptions.RequestException: If network request fails.
"""
try:
registry_url = f"https://cp.icr.io/v2/{repository}/tags/list"
logger.debug(f"Validating entitlement key against {repository}")

# First request without auth to get the auth challenge
response = requests.get(registry_url, timeout=timeout)

if response.status_code == 401:
# Parse WWW-Authenticate header to get token endpoint
auth_header = response.headers.get("WWW-Authenticate", "")
logger.debug(f"Auth challenge received: {auth_header[:100]}...")

# Extract realm and service from auth header
realm_match = re.search(r'realm="([^"]+)"', auth_header)
service_match = re.search(r'service="([^"]+)"', auth_header)
scope_match = re.search(r'scope="([^"]+)"', auth_header)

if not realm_match:
logger.error("Could not parse authentication realm")
return False

token_url = realm_match.group(1)
params = {}

if service_match:
params["service"] = service_match.group(1)
if scope_match:
params["scope"] = scope_match.group(1)
else:
params["scope"] = f"repository:{repository}:pull"

logger.debug(f"Token endpoint: {token_url}")

# Get authentication token
token_response = requests.get(token_url, params=params, auth=HTTPBasicAuth("cp", entitlementKey), timeout=timeout)

if token_response.status_code != 200:
logger.error(f"Failed to get token (HTTP {token_response.status_code})")
return False

token_data = token_response.json()
token = token_data.get("token") or token_data.get("access_token")

if not token:
logger.error("No token received - invalid entitlement key")
return False

# Validate token by accessing registry
logger.debug("Validating token against registry")
headers = {"Authorization": f"Bearer {token}"}
validate_response = requests.get(registry_url, headers=headers, timeout=timeout)

if validate_response.status_code == 200:
logger.info(f"Valid entitlement key with access to {repository}")
return True
else:
logger.error(f"Token validation failed (HTTP {validate_response.status_code})")
return False

elif response.status_code == 200:
logger.info("Registry accessible without authentication (public repository)")
return True
else:
logger.error(f"Unexpected response (HTTP {response.status_code})")
return False

except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}")
raise
Loading