From 876acfe3141f2f66ddf04b6b9b0418d695336f2d Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Sat, 23 May 2026 01:48:38 +0300 Subject: [PATCH 1/3] Implement more proper boot logging retrieval and publishing with upload to s3 bucket Signed-off-by: Denys Fedoryshchenko --- src/kernel_ci_cloud_labs/core/artifacts.py | 266 +++++++++++++++++++++ src/kernel_ci_cloud_labs/core/pipeline.py | 71 ++++++ src/kernel_ci_cloud_labs/launch_vm.py | 41 +++- src/kernel_ci_cloud_labs/setup_validate.py | 182 +++++++++++++- tests/test_artifacts.py | 173 ++++++++++++++ 5 files changed, 726 insertions(+), 7 deletions(-) create mode 100644 src/kernel_ci_cloud_labs/core/artifacts.py create mode 100644 tests/test_artifacts.py diff --git a/src/kernel_ci_cloud_labs/core/artifacts.py b/src/kernel_ci_cloud_labs/core/artifacts.py new file mode 100644 index 0000000..ebe330d --- /dev/null +++ b/src/kernel_ci_cloud_labs/core/artifacts.py @@ -0,0 +1,266 @@ +"""Per-run artifact collection and manifest for KCIDB submission. + +This module fills the gap between two existing flows: + + * `launch_vm.py` (inside the ECS container) uploads per-instance output — + notably ``console-output.log`` (kernel boot log) — to S3 under + ``s3:////test_/output//``. + + * `pull_labs_poller.submit_tests()` posts KCIDB test rows whose + ``log_url`` / ``output_files`` fields are currently always empty. + +The orchestrator (``core.pipeline.run_pipeline``) calls +:func:`collect_run_artifacts` after VM CloudWatch logs have been pulled. +For each instance under the run prefix it: + + 1. Downloads the kernel boot console log from S3 to + ``logs/run_/vms/-console.log`` so a developer can grep + it locally without an `aws s3 cp` round-trip. + 2. Constructs the public HTTPS URL of that object on S3 — the value KCIDB + expects in the ``tests[*].log_url`` field. The URL only resolves when + the bucket carries the public-read policy installed by + ``setup_validate.check_s3_logs_public_policy``. + 3. Records a manifest entry per artifact (sha256, size, content-type, + S3 URI, https URL) into ``logs/run_/artifacts.json``. + +The KCIDB submitter (currently ``pull_labs_poller``) is expected to consume +this manifest and pass each entry's ``log_url`` field into +``kcidb_submit.build_test_row(log_url=..., output_files=[...])`` keyed by +``(test, instance_id)``. If the project later switches to files.kernelci.org, +only the URL-construction step in this module changes — the manifest schema +and the submitter integration stay the same. +""" + +__authors__ = ["Denys Fedoryshchenko "] +__copyright__ = "Copyright (c) 2026 KernelCI project. All Rights Reserved." +# SPDX-License-Identifier: Apache-2.0 + + +import hashlib +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from kernel_ci_cloud_labs.core.logging_config import get_logger + +logger = get_logger(__name__) + + +# Schema version for artifacts.json. Bump when the on-disk format changes +# in a way an external uploader needs to know about. +ARTIFACTS_MANIFEST_VERSION = 1 + +# Files we know about under the per-instance S3 output prefix. Each entry +# maps the basename in S3 to (kcidb_role, content_type). `kcidb_role` +# steers a future uploader's decision between populating ``log_url`` (a +# single primary log) or appending to ``output_files`` (everything else). +# +# "console-output.log" is the boot log — the natural ``log_url`` for boot +# tests. For functional tests the executor may later prefer the latest +# ``run-N-output.log``; that decision is left to the uploader/poller. +_KNOWN_ARTIFACT_KINDS: Dict[str, Tuple[str, str]] = { + "console-output.log": ("log", "text/plain; charset=utf-8"), +} + + +def _sha256_file(path: Path) -> str: + h = hashlib.sha256() + with path.open("rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + h.update(chunk) + return h.hexdigest() + + +def _download_to(s3_client, bucket: str, key: str, dest: Path) -> Optional[int]: + """Download ``s3://bucket/key`` to ``dest``. Returns size in bytes, or + None if the key does not exist. + + NoSuchKey is the expected outcome for instances where launch_vm.py + failed before the console buffer was uploaded — we want a quiet skip, + not a noisy warning. + """ + try: + dest.parent.mkdir(parents=True, exist_ok=True) + obj = s3_client.get_object(Bucket=bucket, Key=key) + body = obj["Body"].read() + dest.write_bytes(body) + return len(body) + except s3_client.exceptions.NoSuchKey: + return None + except Exception as e: # pylint: disable=broad-exception-caught + logger.warning("Failed to download s3://%s/%s: %s", bucket, key, e) + return None + + +def _discover_instances(s3_client, bucket: str, run_prefix: str) -> List[Tuple[str, str]]: + """List (test_name, instance_id) pairs under ``run_prefix`` in S3. + + The layout written by launch_vm.py is: + + /test_/output// + + We list ``run_prefix/`` with delimiter='/' twice to walk the two + intermediate directories without dragging back every file in the run. + """ + pairs: List[Tuple[str, str]] = [] + try: + test_dirs = s3_client.list_objects_v2( + Bucket=bucket, + Prefix=f"{run_prefix}/", + Delimiter="/", + ).get("CommonPrefixes", []) + except Exception as e: # pylint: disable=broad-exception-caught + logger.warning("Could not list S3 prefix %s/: %s", run_prefix, e) + return pairs + + for tdir in test_dirs: + test_prefix = tdir.get("Prefix", "") + # Expect ".../test_/" — anything else (e.g. the bare TEST_CONFIG + # JSON sitting in run_prefix/) is not an instance container. + leaf = test_prefix.rstrip("/").rsplit("/", 1)[-1] + if not leaf.startswith("test_"): + continue + test_name = leaf[len("test_") :] + + try: + inst_dirs = s3_client.list_objects_v2( + Bucket=bucket, + Prefix=f"{test_prefix}output/", + Delimiter="/", + ).get("CommonPrefixes", []) + except Exception as e: # pylint: disable=broad-exception-caught + logger.warning("Could not list S3 prefix %soutput/: %s", test_prefix, e) + continue + + for idir in inst_dirs: + iprefix = idir.get("Prefix", "") + instance_id = iprefix.rstrip("/").rsplit("/", 1)[-1] + if instance_id: + pairs.append((test_name, instance_id)) + + return pairs + + +def s3_public_url(bucket: str, region: str, key: str) -> str: + """Virtual-hosted-style public HTTPS URL for an S3 object. + + Form: ``https://.s3..amazonaws.com/``. Only + resolves when the bucket policy grants anonymous ``s3:GetObject`` on + the key — see ``setup_validate.check_s3_logs_public_policy``. + """ + return f"https://{bucket}.s3.{region}.amazonaws.com/{key}" + + +def collect_run_artifacts( + run_dir: Path, + *, + s3_client, + bucket: str, + region: str, + run_prefix: str, + origin: Optional[str] = None, +) -> Dict[str, Any]: + """Collect uploadable artifacts for a finished run. + + For every (test, instance) discovered under ``run_prefix`` in S3 this: + * downloads ``console-output.log`` into ``run_dir/vms/-console.log``; + * records a manifest entry with sha256/size/content-type, the S3 URI + it was fetched from, and the path it should land at on the remote + store. + + The manifest is written to ``run_dir/artifacts.json`` and also returned. + + Args: + run_dir: Local directory for this pipeline run (already created). + s3_client: boto3 S3 client. + bucket: S3 bucket name. + region: AWS region (used to build the public HTTPS URL). + run_prefix: Per-run S3 prefix (e.g. ``run__``). + origin: Optional KCIDB origin string; embedded into the manifest for + traceability. + + Returns: + The manifest dict (also persisted to ``artifacts.json``). + """ + run_dir = Path(run_dir) + vms_dir = run_dir / "vms" + vms_dir.mkdir(parents=True, exist_ok=True) + + pairs = _discover_instances(s3_client, bucket, run_prefix) + if not pairs: + logger.info("No per-instance S3 output found under %s/", run_prefix) + + entries: List[Dict[str, Any]] = [] + for test_name, instance_id in pairs: + s3_key = f"{run_prefix}/test_{test_name}/output/{instance_id}/console-output.log" + local_path = vms_dir / f"{instance_id}-console.log" + s3_uri = f"s3://{bucket}/{s3_key}" + log_url = s3_public_url(bucket, region, s3_key) + role, ctype = _KNOWN_ARTIFACT_KINDS["console-output.log"] + + size = _download_to(s3_client, bucket, s3_key, local_path) + if size is None: + # Console buffer never made it to S3 — typical when the EC2 + # GetConsoleOutput call returned an empty buffer (very early + # boot failure) or the launcher exited before cleanup. We still + # emit a 'missing' entry so the KCIDB submitter knows there is + # nothing to link for this instance. + entries.append( + { + "test": test_name, + "instance_id": instance_id, + "kind": "console-output.log", + "kcidb_role": role, + "status": "missing", + "s3_uri": s3_uri, + "log_url": None, + "local_path": None, + "sha256": None, + "size_bytes": 0, + "content_type": ctype, + } + ) + continue + + entries.append( + { + "test": test_name, + "instance_id": instance_id, + "kind": "console-output.log", + "kcidb_role": role, + "status": "ready", + "s3_uri": s3_uri, + "log_url": log_url, + "local_path": str(local_path.relative_to(run_dir)), + "sha256": _sha256_file(local_path), + "size_bytes": size, + "content_type": ctype, + } + ) + + manifest = { + "schema_version": ARTIFACTS_MANIFEST_VERSION, + "generated_at": datetime.now(timezone.utc).isoformat(timespec="seconds"), + "run_prefix": run_prefix, + "s3_bucket": bucket, + "origin": origin, + "artifacts": entries, + } + + manifest_path = run_dir / "artifacts.json" + with manifest_path.open("w", encoding="utf-8") as f: + json.dump(manifest, f, indent=2) + f.write("\n") + + ready = sum(1 for e in entries if e["status"] == "ready") + missing = sum(1 for e in entries if e["status"] == "missing") + logger.info( + "✓ Wrote artifacts manifest (%d ready, %d missing) to %s", + ready, + missing, + manifest_path, + ) + return manifest + + diff --git a/src/kernel_ci_cloud_labs/core/pipeline.py b/src/kernel_ci_cloud_labs/core/pipeline.py index 1849e5f..3adbdca 100644 --- a/src/kernel_ci_cloud_labs/core/pipeline.py +++ b/src/kernel_ci_cloud_labs/core/pipeline.py @@ -11,6 +11,7 @@ from pathlib import Path from kernel_ci_cloud_labs.auth.aws_cloudwatch_manager import AWSCloudWatchManager +from kernel_ci_cloud_labs.core.artifacts import collect_run_artifacts from kernel_ci_cloud_labs.core.logging_config import create_run_directory, get_logger logger = get_logger(__name__) @@ -235,6 +236,48 @@ def create_summary(run_dir, start_time, task_arn, expected_vm_count=None, s3_con return summary +def _warn_if_logs_not_public(provider, storage): + """Read-only probe of the bucket's public-read policy for boot logs. + + Imported lazily so that `core.pipeline` does not pull in setup_validate + (and its boto3 surface) at module import time for callers that only + need parse_vm_logs / create_summary. + """ + bucket = getattr(storage, "bucket", None) + if not bucket: + return + try: + from kernel_ci_cloud_labs.setup_validate import ( # local import: optional + _PUBLIC_LOGS_SID, + _expected_bucket_policy, + _statement_matches, + ) + + s3_client = provider.auth.get_client("s3") if hasattr(provider, "auth") else None + if s3_client is None: + return + try: + policy_str = s3_client.get_bucket_policy(Bucket=bucket)["Policy"] + policy = json.loads(policy_str) + except Exception: # pylint: disable=broad-exception-caught + policy = None + + statements = (policy or {}).get("Statement", []) + expected = _expected_bucket_policy(bucket)["Statement"][0] + match = next((s for s in statements if s.get("Sid") == _PUBLIC_LOGS_SID), None) + if match and _statement_matches(match, expected): + return + logger.warning( + "Bucket %s is missing the PublicReadKernelBootLogs policy; " + "boot-log URLs published to KCIDB will return AccessDenied. " + "Run `aws setup validate --bucket %s --fix` to repair.", + bucket, + bucket, + ) + except Exception as e: # pylint: disable=broad-exception-caught + logger.debug("Policy probe skipped: %s", e) + + def run_pipeline( provider, storage, run_dir=None ): # pylint: disable=too-many-locals,too-many-branches,too-many-statements @@ -251,6 +294,12 @@ def run_pipeline( logger.info("Run directory: %s", run_dir) logger.debug("Provider: %s, Storage: %s", type(provider).__name__, type(storage).__name__) + # Lightweight check: are kernel boot logs going to be reachable by KCIDB + # dashboard users via the public S3 URL we'll publish? We only warn — a + # broken policy doesn't justify aborting a run, and `aws setup validate + # --fix` is the supported way to repair it. + _warn_if_logs_not_public(provider, storage) + try: # Get test info from config test_id = None @@ -497,6 +546,28 @@ def run_pipeline( logger.info("-" * 60) + # Pull kernel boot logs from S3 to logs/run_/vms/-console.log + # and emit artifacts.json — the manifest the KCIDB submitter + # consumes to populate tests[*].log_url. Failures here are + # non-fatal: the test results in S3 remain the source of truth. + try: + logger.info("\n=== Collecting boot logs & artifacts manifest ===") + s3_client = provider.auth.get_client("s3") + origin = None + if hasattr(provider, "config") and provider.config: + origin = provider.config.get("kernelci", {}).get("kcidb_origin") + region = provider.config.get("region", "") if hasattr(provider, "config") else "" + collect_run_artifacts( + Path(run_dir), + s3_client=s3_client, + bucket=storage.bucket, + region=region, + run_prefix=run_prefix, + origin=origin, + ) + except Exception as e: # pylint: disable=broad-exception-caught + logger.warning("Could not collect artifacts manifest: %s", e) + # Benchmark regression analysis try: from kernel_ci_cloud_labs.core.benchmark_analyzer import ( diff --git a/src/kernel_ci_cloud_labs/launch_vm.py b/src/kernel_ci_cloud_labs/launch_vm.py index 137dfec..7cc9b5a 100644 --- a/src/kernel_ci_cloud_labs/launch_vm.py +++ b/src/kernel_ci_cloud_labs/launch_vm.py @@ -72,6 +72,12 @@ def __init__(self, vm_config=None): self.test_id = f"{self.test}-{str(uuid.uuid4())[:8]}" self.instance_id = None + # Console-output capture is idempotent: once we've uploaded a non-empty + # buffer we don't re-upload. The buffer in EC2 only grows, so any + # second call would either be identical or strictly larger — we + # accept "strictly larger" by allowing capture again until we get a + # non-empty response. + self._console_captured = False # Configure boto3 with exponential backoff retry strategy retry_config = Config( @@ -302,6 +308,10 @@ def execute_test_via_ssm(self): # pylint: disable=too-many-statements elif status == "Cancelled": log_error("✗ SSM command was cancelled") + # Grab console buffer before the VM is terminated by + # cleanup() — the tail (panic / OOM / kernel trace) is + # the most useful artifact when SSM did not return Success. + self.capture_console_output(reason="ssm-failure") return False except self.ssm.exceptions.InvocationDoesNotExist: @@ -319,6 +329,10 @@ def execute_test_via_ssm(self): # pylint: disable=too-many-statements log_not(" Cancelled SSM command") except Exception: pass + # Same rationale as the SSM-failure branch above: capture the console + # buffer now, while the VM is still alive, so a panic on the watchdog + # path is not lost when cleanup() races the EC2 GetConsoleOutput lag. + self.capture_console_output(reason="ssm-failure") return False def check_test_result(self): @@ -346,12 +360,29 @@ def check_test_result(self): log_error(f"✗ Failed to read result.txt: {e}") return False - def capture_console_output(self): - """Fetch EC2 serial console output (kernel boot log) and upload to S3.""" + def capture_console_output(self, reason="cleanup"): + """Fetch EC2 serial console output (kernel boot log) and upload to S3. + + Safe to call multiple times: subsequent calls will overwrite the S3 + object with a (typically larger) snapshot. Once we have successfully + captured a non-empty buffer, later calls are skipped — except when + forced via a different `reason` from the SSM-failure path, where the + most recent state of the buffer is more interesting than an earlier + capture taken before the failure was visible. + + Args: + reason: Free-text label for the call site (logged for diagnostics). + Currently used values: "cleanup", "ssm-failure". + """ if not self.instance_id: return + if self._console_captured and reason == "cleanup": + # cleanup() always runs in the finally block, even if we already + # grabbed the buffer on SSM failure. Skip the redundant fetch. + log_not(" Console output already captured this run, skipping") + return - log_not("\n=== Capturing console output ===") + log_not(f"\n=== Capturing console output ({reason}) ===") try: resp = self.ec2.get_console_output(InstanceId=self.instance_id, Latest=True) except Exception as e: @@ -376,14 +407,16 @@ def capture_console_output(self): Key=s3_key, Body=output.encode("utf-8"), ContentType="text/plain; charset=utf-8", + Metadata={"capture-reason": reason}, ) log_not(f"✓ Console output uploaded ({len(output)} bytes) to s3://{self.s3_bucket}/{s3_key}") + self._console_captured = True except Exception as e: log_not(f" Failed to upload console output: {e}") def cleanup(self): """Capture console output, then terminate instance.""" - self.capture_console_output() + self.capture_console_output(reason="cleanup") if self.instance_id: log_not(f"\n=== Terminating instance {self.instance_id} ===") diff --git a/src/kernel_ci_cloud_labs/setup_validate.py b/src/kernel_ci_cloud_labs/setup_validate.py index a158e42..bb51f5b 100644 --- a/src/kernel_ci_cloud_labs/setup_validate.py +++ b/src/kernel_ci_cloud_labs/setup_validate.py @@ -70,14 +70,18 @@ def _create_s3_bucket(s3, bucket_name: str, region: str) -> bool: Bucket=bucket_name, CreateBucketConfiguration={"LocationConstraint": region}, ) - # Default-on: block public access; modern best practice. + # Block public ACLs (modern best practice) but allow bucket + # *policies* to grant access — kernel boot logs published to KCIDB + # are made public-readable via a narrow bucket-policy statement + # added by check_s3_logs_public_policy(). Without these two flags + # set to False, that policy would be rejected by AWS. s3.put_public_access_block( Bucket=bucket_name, PublicAccessBlockConfiguration={ "BlockPublicAcls": True, "IgnorePublicAcls": True, - "BlockPublicPolicy": True, - "RestrictPublicBuckets": True, + "BlockPublicPolicy": False, + "RestrictPublicBuckets": False, }, ) print(f"✓ Created S3 bucket: {bucket_name}") @@ -87,6 +91,172 @@ def _create_s3_bucket(s3, bucket_name: str, region: str) -> bool: return False +# Resource pattern allowed by the public-read bucket policy. We expose +# *only* kernel boot console logs (one file per VM instance per test), +# so the rest of the bucket — test payloads, results.txt, stats.json, +# benchmark CSVs — stays private. The pattern matches the layout written +# by `launch_vm.capture_console_output`: +# /test_/output//console-output.log +_PUBLIC_LOGS_KEY_PATTERN = "*/test_*/output/*/console-output.log" +_PUBLIC_LOGS_SID = "PublicReadKernelBootLogs" + + +def _expected_bucket_policy(bucket_name: str) -> dict: + """Bucket policy that grants anonymous GET on the boot-log prefix only.""" + return { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": _PUBLIC_LOGS_SID, + "Effect": "Allow", + "Principal": "*", + "Action": "s3:GetObject", + "Resource": f"arn:aws:s3:::{bucket_name}/{_PUBLIC_LOGS_KEY_PATTERN}", + } + ], + } + + +def check_s3_logs_public_policy(bucket_name: str, region: str, fix: bool = False) -> bool: + """Verify the bucket allows anonymous GET on kernel boot logs. + + Two preconditions: + 1. The bucket's PublicAccessBlock must permit bucket policies + (BlockPublicPolicy=False, RestrictPublicBuckets=False). ACL-based + public access stays blocked — we publish via bucket policy only. + 2. A statement with Sid='PublicReadKernelBootLogs' must grant + s3:GetObject on the console-output.log pattern to Principal:*. + + With ``fix=True``, both preconditions are repaired: + - PublicAccessBlock is rewritten with ACLs still blocked but bucket + policies permitted; + - the bucket policy is merged with the expected statement (existing + statements are preserved; an existing statement with the same Sid + is replaced). + """ + print(f"\n=== Checking S3 logs public-read policy: {bucket_name} ===") + s3 = boto3.client("s3", region_name=region) + + # --- PublicAccessBlock ---------------------------------------------------- + pab_ok = _check_public_access_block(s3, bucket_name, fix=fix) + + # --- Bucket policy -------------------------------------------------------- + policy_ok = _check_bucket_policy_statement(s3, bucket_name, fix=fix) + + return pab_ok and policy_ok + + +def _check_public_access_block(s3, bucket_name: str, fix: bool) -> bool: + """Ensure BlockPublicPolicy / RestrictPublicBuckets are False.""" + desired = { + "BlockPublicAcls": True, + "IgnorePublicAcls": True, + "BlockPublicPolicy": False, + "RestrictPublicBuckets": False, + } + try: + current = s3.get_public_access_block(Bucket=bucket_name)[ + "PublicAccessBlockConfiguration" + ] + except ClientError as e: + code = e.response.get("Error", {}).get("Code", "") + if code != "NoSuchPublicAccessBlockConfiguration": + print(f"✗ Could not read PublicAccessBlock ({code}): {e}") + return False + current = {} + + blockers = [k for k in ("BlockPublicPolicy", "RestrictPublicBuckets") if current.get(k, True)] + if not blockers: + print("✓ PublicAccessBlock permits bucket policies") + return True + + print(f"✗ PublicAccessBlock blocks public policies: {', '.join(blockers)}=True") + if not fix: + print(" (pass --fix to relax; ACLs will stay blocked)") + return False + try: + s3.put_public_access_block( + Bucket=bucket_name, + PublicAccessBlockConfiguration=desired, + ) + print("✓ PublicAccessBlock updated (ACLs still blocked)") + return True + except ClientError as e: + print(f"✗ Failed to update PublicAccessBlock: {e}") + return False + + +def _check_bucket_policy_statement(s3, bucket_name: str, fix: bool) -> bool: + """Ensure the bucket policy has our PublicReadKernelBootLogs statement.""" + expected_stmt = _expected_bucket_policy(bucket_name)["Statement"][0] + expected_resource = expected_stmt["Resource"] + + try: + policy_str = s3.get_bucket_policy(Bucket=bucket_name)["Policy"] + existing = json.loads(policy_str) + except ClientError as e: + code = e.response.get("Error", {}).get("Code", "") + if code != "NoSuchBucketPolicy": + print(f"✗ Could not read bucket policy ({code}): {e}") + return False + existing = None + except (ValueError, json.JSONDecodeError) as e: + print(f"✗ Bucket policy is not valid JSON: {e}") + return False + + statements = (existing or {}).get("Statement", []) + match = next((s for s in statements if s.get("Sid") == _PUBLIC_LOGS_SID), None) + if match and _statement_matches(match, expected_stmt): + print(f"✓ Bucket policy grants public-read on {_PUBLIC_LOGS_KEY_PATTERN}") + return True + + if match: + print(f"✗ Statement '{_PUBLIC_LOGS_SID}' exists but does not match expected shape") + else: + print(f"✗ Statement '{_PUBLIC_LOGS_SID}' missing from bucket policy") + print(f" Expected Resource: {expected_resource}") + if not fix: + print(" (pass --fix to add/replace the statement)") + return False + + # Merge: drop any existing statement with our Sid, append the expected one. + new_statements = [s for s in statements if s.get("Sid") != _PUBLIC_LOGS_SID] + new_statements.append(expected_stmt) + new_policy = { + "Version": (existing or {}).get("Version", "2012-10-17"), + "Statement": new_statements, + } + try: + s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(new_policy)) + print("✓ Bucket policy updated with PublicReadKernelBootLogs statement") + return True + except ClientError as e: + print(f"✗ Failed to update bucket policy: {e}") + return False + + +def _statement_matches(actual: dict, expected: dict) -> bool: + """Compare the fields we care about; tolerate extra keys AWS may add.""" + if actual.get("Effect") != expected["Effect"]: + return False + if actual.get("Principal") != expected["Principal"]: + return False + # Action and Resource may come back as strings or single-item lists. + if _as_set(actual.get("Action")) != _as_set(expected["Action"]): + return False + if _as_set(actual.get("Resource")) != _as_set(expected["Resource"]): + return False + return True + + +def _as_set(value) -> set: + if value is None: + return set() + if isinstance(value, list): + return set(value) + return {value} + + def check_console_output_permission(region: Optional[str] = None) -> bool: """Probe ec2:GetConsoleOutput. Uses a non-existent instance id so the only permission we test is the IAM action itself.""" @@ -247,6 +417,12 @@ def validate(bucket: Optional[str] = None, if bucket: results["s3_bucket"] = check_s3_bucket(bucket, region, fix=fix) + # Only check the policy if the bucket exists/was created — checking + # against a missing bucket produces a confusing cascade of errors. + if results["s3_bucket"]: + results["s3_logs_public_policy"] = check_s3_logs_public_policy( + bucket, region, fix=fix + ) results["kernelci_api_token"] = check_kernelci_api_token(api_base_uri, api_token) results["kcidb_jwt"] = check_kcidb_jwt() diff --git a/tests/test_artifacts.py b/tests/test_artifacts.py new file mode 100644 index 0000000..0a25610 --- /dev/null +++ b/tests/test_artifacts.py @@ -0,0 +1,173 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2026 Collabora Limited +# Author: Denys Fedoryshchenko + +"""Unit tests for core.artifacts (boot-log collection + KCIDB manifest).""" + +import json +from unittest.mock import MagicMock + +from kernel_ci_cloud_labs.core.artifacts import ( + ARTIFACTS_MANIFEST_VERSION, + _discover_instances, + collect_run_artifacts, + s3_public_url, +) + + +# Minimal fake S3 client. boto3 attaches an exceptions.NoSuchKey class to +# the client at construction; the artifacts module references that exact +# class, so we have to expose it on the mock the same way. +class _NoSuchKey(Exception): + pass + + +def _make_s3_mock(*, listings=None, objects=None): + """Build a MagicMock S3 client. + + Args: + listings: dict keyed by (Prefix, Delimiter) → list_objects_v2 response. + objects: dict keyed by S3 key → bytes for get_object. Missing keys + raise NoSuchKey. + """ + listings = listings or {} + objects = objects or {} + + s3 = MagicMock() + s3.exceptions.NoSuchKey = _NoSuchKey + + def fake_list(Bucket, Prefix, Delimiter=None): # pylint: disable=invalid-name,unused-argument + return listings.get((Prefix, Delimiter), {"CommonPrefixes": [], "Contents": []}) + + def fake_get(Bucket, Key): # pylint: disable=invalid-name,unused-argument + if Key not in objects: + raise _NoSuchKey(Key) + body = MagicMock() + body.read.return_value = objects[Key] + return {"Body": body} + + s3.list_objects_v2.side_effect = fake_list + s3.get_object.side_effect = fake_get + return s3 + + +def test_s3_public_url_shape(): + """Public URL is virtual-hosted style with the region path segment.""" + url = s3_public_url("kernel-ci-bucket", "us-west-2", "run_x/foo.log") + assert url == "https://kernel-ci-bucket.s3.us-west-2.amazonaws.com/run_x/foo.log" + + +def test_discover_instances_filters_non_test_dirs(): + """Only ``test_/`` subdirectories are walked; stray files ignored.""" + s3 = _make_s3_mock( + listings={ + ("run_abc/", "/"): { + "CommonPrefixes": [ + {"Prefix": "run_abc/test_baseline/"}, + {"Prefix": "run_abc/test_unixbench/"}, + # Stray non-test directory (e.g. the TEST_CONFIG dir). + {"Prefix": "run_abc/_config/"}, + ], + }, + ("run_abc/test_baseline/output/", "/"): { + "CommonPrefixes": [ + {"Prefix": "run_abc/test_baseline/output/i-aaaa1111/"}, + ], + }, + ("run_abc/test_unixbench/output/", "/"): { + "CommonPrefixes": [ + {"Prefix": "run_abc/test_unixbench/output/i-bbbb2222/"}, + {"Prefix": "run_abc/test_unixbench/output/i-cccc3333/"}, + ], + }, + ("run_abc/_config/output/", "/"): {"CommonPrefixes": []}, + } + ) + + pairs = sorted(_discover_instances(s3, "bucket", "run_abc")) + assert pairs == [ + ("baseline", "i-aaaa1111"), + ("unixbench", "i-bbbb2222"), + ("unixbench", "i-cccc3333"), + ] + + +def test_collect_run_artifacts_ready_and_missing(tmp_path): + """End-to-end: one instance has a console log, another does not.""" + # i-aaaa1111 has a buffer in S3; i-bbbb2222 was terminated before upload. + console_bytes = b"[ 0.000000] Linux version 6.10 ...\n" + s3 = _make_s3_mock( + listings={ + ("run_xy/", "/"): { + "CommonPrefixes": [{"Prefix": "run_xy/test_boot/"}], + }, + ("run_xy/test_boot/output/", "/"): { + "CommonPrefixes": [ + {"Prefix": "run_xy/test_boot/output/i-aaaa1111/"}, + {"Prefix": "run_xy/test_boot/output/i-bbbb2222/"}, + ], + }, + }, + objects={ + "run_xy/test_boot/output/i-aaaa1111/console-output.log": console_bytes, + # i-bbbb2222/console-output.log intentionally absent → NoSuchKey + }, + ) + + manifest = collect_run_artifacts( + tmp_path, + s3_client=s3, + bucket="my-bucket", + region="eu-west-1", + run_prefix="run_xy", + origin="myorigin", + ) + + # On-disk manifest exists and matches the returned dict. + on_disk = json.loads((tmp_path / "artifacts.json").read_text()) + assert on_disk == manifest + + assert manifest["schema_version"] == ARTIFACTS_MANIFEST_VERSION + assert manifest["s3_bucket"] == "my-bucket" + assert manifest["origin"] == "myorigin" + assert manifest["run_prefix"] == "run_xy" + + by_id = {e["instance_id"]: e for e in manifest["artifacts"]} + assert set(by_id) == {"i-aaaa1111", "i-bbbb2222"} + + ready = by_id["i-aaaa1111"] + assert ready["status"] == "ready" + assert ready["kcidb_role"] == "log" + assert ready["size_bytes"] == len(console_bytes) + # sha256 of known bytes is deterministic — assert it's a 64-char hex. + assert ready["sha256"] is not None and len(ready["sha256"]) == 64 + assert ready["log_url"] == ( + "https://my-bucket.s3.eu-west-1.amazonaws.com/" + "run_xy/test_boot/output/i-aaaa1111/console-output.log" + ) + # Local file actually written. + local = tmp_path / ready["local_path"] + assert local.read_bytes() == console_bytes + + missing = by_id["i-bbbb2222"] + assert missing["status"] == "missing" + assert missing["log_url"] is None + assert missing["local_path"] is None + assert missing["sha256"] is None + assert missing["size_bytes"] == 0 + + +def test_collect_run_artifacts_empty_run(tmp_path): + """Run prefix with no test_* subdirs produces an empty (but valid) manifest.""" + s3 = _make_s3_mock(listings={("run_empty/", "/"): {"CommonPrefixes": []}}) + + manifest = collect_run_artifacts( + tmp_path, + s3_client=s3, + bucket="b", + region="us-east-1", + run_prefix="run_empty", + ) + assert manifest["artifacts"] == [] + assert (tmp_path / "artifacts.json").exists() From a85ebf3bb0fc59dc3a4acb94119e5ac59dc05f3d Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Sat, 23 May 2026 02:26:16 +0300 Subject: [PATCH 2/3] Implement log scrubber Signed-off-by: Denys Fedoryshchenko --- src/kernel_ci_cloud_labs/core/log_scrub.py | 146 +++++++++++++++++ src/kernel_ci_cloud_labs/launch_vm.py | 18 ++- tests/test_log_scrub.py | 175 +++++++++++++++++++++ 3 files changed, 338 insertions(+), 1 deletion(-) create mode 100644 src/kernel_ci_cloud_labs/core/log_scrub.py create mode 100644 tests/test_log_scrub.py diff --git a/src/kernel_ci_cloud_labs/core/log_scrub.py b/src/kernel_ci_cloud_labs/core/log_scrub.py new file mode 100644 index 0000000..1486322 --- /dev/null +++ b/src/kernel_ci_cloud_labs/core/log_scrub.py @@ -0,0 +1,146 @@ +"""Redact obvious secrets from text destined for a public S3 object. + +The kernel boot console buffer captured by ``launch_vm.capture_console_output`` +is uploaded to a public-read S3 bucket and the URL is published to KCIDB. +That buffer is almost always pure kernel boot output (uninteresting from a +secrets standpoint), but two things can leak credentials into it: + + * The EC2 user-data script in ``launch_vm.spawn_vm`` runs with ``set -x`` + under ``logger -t user-data -s 2>/dev/console``, so anything it echoes + lands in the console buffer. We don't put secrets there today, but a + future edit easily could. + + * Userland on the test VM is free to write to ``/dev/kmsg`` or + ``/dev/console``; a misbehaving test that echoes a token to either + surface would publish it. + +Rather than rely on every future contributor remembering this, scrub the +buffer once at the upload boundary. Patterns are deliberately conservative: +they match well-known credential shapes and the obvious ``KEY=VALUE`` +spellings, not anything that "looks" like a secret. False negatives are +expected; false positives that mangle legitimate boot output (PCI IDs, +MAC addresses, kernel version strings) are not. + +The function returns the scrubbed text and a per-rule counter so the caller +can log how much was redacted without echoing the originals. +""" + +__authors__ = ["Denys Fedoryshchenko "] +__copyright__ = "Copyright (c) 2026 KernelCI project. All Rights Reserved." +# SPDX-License-Identifier: Apache-2.0 + + +import re +from typing import Dict, Tuple + +# Replacement marker that survives grep — ``REDACTED`` alone is too common +# in real output, the bracketed form is not. +REDACTION = "[REDACTED:{kind}]" + + +def _redactor(kind: str): + """Build a re.sub replacement that fills in the rule name on substitution.""" + return lambda _m: REDACTION.format(kind=kind) + + +# Each rule: (name, compiled_regex). Order matters only when patterns can +# overlap — generally bearer/JWT > AWS keys > generic KEY=VALUE so the +# more-specific match wins. +_RULES = [ + # PEM-armored private keys. DOTALL so the body matches across newlines. + ( + "pem-private-key", + re.compile( + r"-----BEGIN [A-Z0-9 ]*PRIVATE KEY-----.*?-----END [A-Z0-9 ]*PRIVATE KEY-----", + re.DOTALL, + ), + ), + # OpenSSH authorized_keys form (single line). + ( + "ssh-public-key", + re.compile(r"\bssh-(?:rsa|ed25519|dss|ecdsa-[a-z0-9-]+)\s+[A-Za-z0-9+/=]{40,}"), + ), + # JWTs: three base64url segments separated by dots; second segment starts + # with "eyJ" (the JSON header begins with '{'). + ( + "jwt", + re.compile(r"\beyJ[A-Za-z0-9_\-]+\.eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+"), + ), + # GitHub token families. Length suffixes are the published lower bounds. + ("github-token", re.compile(r"\b(?:ghp|gho|ghu|ghs|ghr)_[A-Za-z0-9]{36,}\b")), + # AWS access key IDs: AKIA = long-lived user, ASIA = STS temporary. + ("aws-access-key-id", re.compile(r"\b(?:AKIA|ASIA)[0-9A-Z]{16}\b")), + # "Authorization: Bearer ..." / "Bearer eyJ..." style. Group 1 captures + # the "Bearer " keyword + whitespace so the substitution can put it back + # — a scrubbed log that still shows the keyword is easier to triage. + ( + "bearer-token", + re.compile(r"\b(Bearer\s+)([A-Za-z0-9._\-+/=]{8,})", re.IGNORECASE), + ), + # Generic KEY=VALUE / key: value with a secret-shaped name. The value + # must look credential-ish (>=8 non-space chars, contains some entropy); + # we don't try to be clever, just non-empty up to whitespace. + ( + "credential-kv", + re.compile( + r"""(?ix) + \b + (?: password | passwd | secret | api[_-]?key | access[_-]?key + | private[_-]?key | session[_-]?token | auth[_-]?token + | client[_-]?secret ) + \s* [:=] \s* + ['"]? + ([^\s'"]{6,}) + ['"]? + """ + ), + ), +] + + +def scrub_text(text: str) -> Tuple[str, Dict[str, int]]: + """Apply every redaction rule to ``text``. + + Returns the scrubbed text and a ``{rule_name: hits}`` dict (rules with + zero hits are omitted). The function is pure: same input → same output. + + Empty input returns ``("", {})`` without touching the regex engine. + """ + if not text: + return text, {} + + counts: Dict[str, int] = {} + out = text + for name, rx in _RULES: + # Some rules keep part of the match visible for grep-ability: + # * credential-kv: keep "KEY=" / "KEY:", redact the value (group 1). + # * bearer-token: keep "Bearer " (group 1), redact the token + # (group 2). + # Everything else replaces the whole match. + if name == "credential-kv": + def repl(m, _kind=name): + return m.group(0).replace(m.group(1), REDACTION.format(kind=_kind)) + + out, n = rx.subn(repl, out) + elif name == "bearer-token": + def repl_bearer(m, _kind=name): + return m.group(1) + REDACTION.format(kind=_kind) + + out, n = rx.subn(repl_bearer, out) + else: + out, n = rx.subn(_redactor(name), out) + if n: + counts[name] = n + return out, counts + + +def scrub_bytes(data: bytes, encoding: str = "utf-8") -> Tuple[bytes, Dict[str, int]]: + """Convenience wrapper for the upload path, which holds the buffer as bytes. + + Decodes with ``errors="replace"`` because boot consoles sometimes carry + stray non-UTF-8 bytes (early-boot framebuffer noise, partial UART frames) + and we'd rather scrub a slightly-mangled copy than skip scrubbing. + """ + text = data.decode(encoding, errors="replace") + scrubbed, counts = scrub_text(text) + return scrubbed.encode(encoding, errors="replace"), counts diff --git a/src/kernel_ci_cloud_labs/launch_vm.py b/src/kernel_ci_cloud_labs/launch_vm.py index 7cc9b5a..569ad68 100644 --- a/src/kernel_ci_cloud_labs/launch_vm.py +++ b/src/kernel_ci_cloud_labs/launch_vm.py @@ -15,6 +15,8 @@ import boto3 from botocore.config import Config +from kernel_ci_cloud_labs.core.log_scrub import scrub_text + def log_error(msg): """Print error to stderr (shows in container log).""" @@ -400,6 +402,15 @@ def capture_console_output(self, reason="cleanup"): except Exception: output = output_b64 + # Scrub before upload. The bucket is public-read (KCIDB dashboard users + # follow the URL we publish), so any secret that lands here would be + # world-visible. Counters are logged; original strings are not. + scrubbed, redaction_counts = scrub_text(output) + if redaction_counts: + summary = ", ".join(f"{k}={v}" for k, v in sorted(redaction_counts.items())) + log_not(f" Console scrub redacted: {summary}") + output = scrubbed + s3_key = f"{self.run_prefix}/test_{self.test}/output/{self.instance_id}/console-output.log" try: self.s3.put_object( @@ -407,7 +418,12 @@ def capture_console_output(self, reason="cleanup"): Key=s3_key, Body=output.encode("utf-8"), ContentType="text/plain; charset=utf-8", - Metadata={"capture-reason": reason}, + Metadata={ + "capture-reason": reason, + # Records that the buffer passed through the scrubber, so an + # operator inspecting the object knows it's not raw. + "scrubbed": "v1", + }, ) log_not(f"✓ Console output uploaded ({len(output)} bytes) to s3://{self.s3_bucket}/{s3_key}") self._console_captured = True diff --git a/tests/test_log_scrub.py b/tests/test_log_scrub.py new file mode 100644 index 0000000..0e517b7 --- /dev/null +++ b/tests/test_log_scrub.py @@ -0,0 +1,175 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2026 KernelCI project +# Author: Denys Fedoryshchenko + +"""Unit tests for core.log_scrub. + +The scrubber runs at the upload boundary in launch_vm.capture_console_output, +just before put_object on a public-read bucket. These tests pin down two +things: + + * Known credential shapes are redacted and counted. + * Innocuous boot-console content (kernel banners, PCI IDs, MACs) is left + alone — false positives would mangle the very output that motivates + publishing the log. +""" + +from kernel_ci_cloud_labs.core.log_scrub import REDACTION, scrub_bytes, scrub_text + + +# ---- helpers ---------------------------------------------------------------- + + +def _marker(kind: str) -> str: + return REDACTION.format(kind=kind) + + +# ---- positive cases (must be redacted) -------------------------------------- + + +def test_jwt_is_redacted(): + sample = ( + "Authorization line: " + "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.SflKxwRJSMeKKF2QT4fwpMeJf36" + ) + out, counts = scrub_text(sample) + assert counts == {"jwt": 1} + assert "eyJhbGciOiJIUzI1NiJ9" not in out + assert _marker("jwt") in out + + +def test_bearer_token_is_redacted_keyword_kept(): + sample = "GET / HTTP/1.1\\r\\nAuthorization: Bearer abc.DEF-123_xyz" + out, counts = scrub_text(sample) + assert counts.get("bearer-token") == 1 + assert "Bearer" in out # the literal keyword is preserved + assert "abc.DEF-123_xyz" not in out + + +def test_aws_access_key_id_is_redacted(): + sample = "creds dumped: AKIAIOSFODNN7EXAMPLE and ASIAYJRWVUHQQ4EXAMPL" + out, counts = scrub_text(sample) + assert counts.get("aws-access-key-id") == 2 + assert "AKIA" not in out + assert "ASIA" not in out + + +def test_github_pat_is_redacted(): + pat = "ghp_" + "A" * 36 + out, counts = scrub_text(f"token={pat} ok") + assert counts.get("github-token") == 1 + assert pat not in out + + +def test_pem_private_key_block_is_redacted(): + sample = ( + "before\n" + "-----BEGIN RSA PRIVATE KEY-----\n" + "MIIEowIBAAKCAQEAxxxxxxxx\nyyyyyyyyzzzz\n" + "-----END RSA PRIVATE KEY-----\n" + "after" + ) + out, counts = scrub_text(sample) + assert counts.get("pem-private-key") == 1 + assert "MIIEowIBAAKCAQEAxxxxxxxx" not in out + assert "before" in out and "after" in out + + +def test_ssh_public_key_is_redacted(): + key = "ssh-ed25519 AAAA" + "B" * 60 + " user@host" + out, counts = scrub_text(key) + assert counts.get("ssh-public-key") == 1 + assert "AAAAB" not in out + + +def test_credential_kv_password_export(): + # Mirrors what `set -x` would echo for the export loop in launch_vm.spawn_vm. + sample = "+ export PASSWORD=hunter2-secret" + out, counts = scrub_text(sample) + assert counts.get("credential-kv") == 1 + assert "hunter2-secret" not in out + # Key name is preserved for debuggability. + assert "PASSWORD=" in out + + +def test_credential_kv_session_token_quoted(): + sample = 'config: session_token="abc-def-1234-XYZ"' + out, counts = scrub_text(sample) + assert counts.get("credential-kv") == 1 + assert "abc-def-1234-XYZ" not in out + + +def test_multiple_secrets_counted_separately(): + sample = ( + "AKIAIOSFODNN7EXAMPLE\n" + "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIn0.signaturepart\n" + "Bearer abcdefghij\n" + ) + out, counts = scrub_text(sample) + assert counts["aws-access-key-id"] == 1 + assert counts["jwt"] == 1 + assert counts["bearer-token"] == 1 + assert "AKIA" not in out and "eyJhbGc" not in out and "abcdefghij" not in out + + +# ---- negative cases (must NOT be touched) ----------------------------------- + + +def test_kernel_boot_banner_is_untouched(): + """A canonical first second of dmesg should pass through verbatim.""" + sample = ( + "[ 0.000000] Linux version 6.10.0-1-amd64 (build@kbuilder) " + "(gcc-13 (Debian 13.2.0-23) 13.2.0, GNU ld (GNU Binutils for Debian) 2.42) " + "#1 SMP PREEMPT_DYNAMIC Debian 6.10.6-1 (2024-08-19)\n" + "[ 0.000123] BIOS-provided physical RAM map:\n" + "[ 0.000456] BIOS-e820: [mem 0x0000000000000000-0x000000000009fbff] usable\n" + ) + out, counts = scrub_text(sample) + assert out == sample + assert counts == {} + + +def test_pci_and_mac_addresses_are_untouched(): + sample = ( + "pci 0000:00:1f.2: reg 0x10: [io 0x0000-0x0007]\n" + "eth0: link up, 1000Mbps, full-duplex, lpa 0x45e1\n" + "eth0: hw_addr 00:11:22:33:44:55\n" + "wlan0: 02:00:00:aa:bb:cc associated with ap\n" + ) + out, counts = scrub_text(sample) + assert out == sample + assert counts == {} + + +def test_short_password_below_threshold_is_ignored(): + """The KV rule requires 6+ chars to limit false positives on noise.""" + sample = "password=x # placeholder" + out, counts = scrub_text(sample) + assert out == sample + assert counts == {} + + +def test_empty_input_short_circuits(): + out, counts = scrub_text("") + assert out == "" + assert counts == {} + + +# ---- bytes wrapper ---------------------------------------------------------- + + +def test_scrub_bytes_roundtrip(): + raw = b"AKIAIOSFODNN7EXAMPLE on a line\n" + out, counts = scrub_bytes(raw) + assert counts == {"aws-access-key-id": 1} + assert b"AKIA" not in out + assert isinstance(out, bytes) + + +def test_scrub_bytes_tolerates_invalid_utf8(): + """Boot consoles occasionally carry framebuffer/UART noise; we don't skip.""" + raw = b"prefix \xff\xfe AKIAIOSFODNN7EXAMPLE \xc3\x28 suffix" + out, counts = scrub_bytes(raw) + assert counts.get("aws-access-key-id") == 1 + assert b"AKIA" not in out From 0f2c779cd93617843a7ccc349f3fb6bea5d73fb6 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Sat, 23 May 2026 02:49:49 +0300 Subject: [PATCH 3/3] kcidb: emit one tests[*] row per VM with boot-log URL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The poller used to submit one aggregated tests[*] row per test name with log_url=None. Now it emits one row per VM instance, joins each row with the boot-log URL collected in artifacts.json, and gives each row a stable test_id (instance_id) instead of a positional one. Also fixes a latent bug in parse_vm_logs: it globbed vms/*.log and so picked up -console.log files written by collect_run_artifacts. Those don't carry the SUCCESS marker, so without this fix every boot log was counted as a failed VM and total_vms was doubled. Behaviour summary: * pipeline.parse_vm_logs() skips *-console.log; reports a new per-instance `instances` list ({test, instance_id, status}). * pipeline.create_summary() exposes that as summary[vms][instances]. * pull_labs_poller._load_artifact_log_urls() indexes artifacts.json by (test, instance_id) — missing/corrupt manifest is non-fatal. * pull_labs_poller._extract_test_results() emits one row per VM with per-row log_url; legacy per-test fallback retained. * pull_labs_poller._handle_job(): test_id is now {node_id}.{iid}; misc carries instance_id for traceability. Signed-off-by: Denys Fedoryshchenko --- src/kernel_ci_cloud_labs/core/pipeline.py | 39 +++- src/kernel_ci_cloud_labs/pull_labs_poller.py | 115 +++++++++--- tests/test_pull_labs_poller.py | 181 +++++++++++++++++++ 3 files changed, 310 insertions(+), 25 deletions(-) diff --git a/src/kernel_ci_cloud_labs/core/pipeline.py b/src/kernel_ci_cloud_labs/core/pipeline.py index 3adbdca..96203f3 100644 --- a/src/kernel_ci_cloud_labs/core/pipeline.py +++ b/src/kernel_ci_cloud_labs/core/pipeline.py @@ -18,21 +18,41 @@ def parse_vm_logs(vms_dir): - """Parse VM log files and extract test statistics.""" + """Parse VM log files and extract test statistics. + + Expects one ``vms/.log`` per VM (written from CloudWatch + streams). Boot-console logs written alongside as + ``vms/-console.log`` by ``core.artifacts.collect_run_artifacts`` + are **skipped** — they don't carry the "Test execution completed: + SUCCESS" marker and would otherwise double-count VMs and report every + boot log as a failure. + + Returns: + dict with the aggregate counters plus an ``instances`` list: + ``[{"test": str, "instance_id": str, "status": "PASS"|"FAIL"}, ...]``. + The list is the per-VM ground truth consumed by the KCIDB submitter + (one tests[*] row per instance). + """ stats = { "total_vms": 0, "successful": 0, "failed": 0, "test_names": set(), "failed_tests": {}, + "instances": [], } if not vms_dir.exists(): return stats - for log_file in vms_dir.glob("*.log"): + for log_file in sorted(vms_dir.glob("*.log")): + # Boot-console logs share the directory but are not a per-VM execution + # log; see docstring. + if log_file.name.endswith("-console.log"): + continue + stats["total_vms"] += 1 - log_name = log_file.stem + instance_id = log_file.stem # the EC2 instance ID try: with open(log_file, "r", encoding="utf-8") as f: @@ -51,11 +71,17 @@ def parse_vm_logs(vms_dir): # Check success/failure if "Test execution completed: SUCCESS" in content: stats["successful"] += 1 + status = "PASS" else: stats["failed"] += 1 + status = "FAIL" if test_name not in stats["failed_tests"]: stats["failed_tests"][test_name] = [] - stats["failed_tests"][test_name].append(log_name) + stats["failed_tests"][test_name].append(instance_id) + + stats["instances"].append( + {"test": test_name, "instance_id": instance_id, "status": status} + ) except Exception as e: logger.warning("Could not parse VM log %s: %s", log_file, e) @@ -198,6 +224,11 @@ def create_summary(run_dir, start_time, task_arn, expected_vm_count=None, s3_con "missing": (expected_vm_count - vm_stats["total_vms"] if expected_vm_count is not None else 0), "test_names": sorted(list(vm_stats["test_names"])), "failed_by_test": vm_stats["failed_tests"], + # Per-instance ground truth. The KCIDB submitter + # (pull_labs_poller._extract_test_results) joins this list with + # artifacts.json by (test, instance_id) to attach a log_url to + # each tests[*] row. + "instances": vm_stats["instances"], }, } diff --git a/src/kernel_ci_cloud_labs/pull_labs_poller.py b/src/kernel_ci_cloud_labs/pull_labs_poller.py index be3bf5a..29d61c4 100644 --- a/src/kernel_ci_cloud_labs/pull_labs_poller.py +++ b/src/kernel_ci_cloud_labs/pull_labs_poller.py @@ -391,19 +391,77 @@ def _test_name_to_path(name: str) -> str: return "boot" if name.strip().lower() in _BOOT_TEST_NAMES else name +def _load_artifact_log_urls(run_directory: Optional[str]) -> Dict[Tuple[str, str], str]: + """Read ``artifacts.json`` and index its ``log_url``s by (test, instance_id). + + Returns an empty dict on any error (missing file, malformed JSON, schema + mismatch). Missing log URLs in KCIDB are a quality issue; failing to + submit the run is a worse outcome. + """ + if not run_directory: + return {} + path = os.path.join(run_directory, "artifacts.json") + if not os.path.isfile(path): + return {} + try: + with open(path, "r", encoding="utf-8") as f: + manifest = json.load(f) + except (OSError, ValueError) as e: + logger.warning("Could not read %s: %s", path, e) + return {} + + index: Dict[Tuple[str, str], str] = {} + for entry in manifest.get("artifacts", []) or []: + test = entry.get("test") + instance_id = entry.get("instance_id") + log_url = entry.get("log_url") + if test and instance_id and log_url: + index[(test, instance_id)] = log_url + return index + + def _extract_test_results(summary: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], Optional[str]]: - """Pull per-test status out of the summary dict returned by run_pipeline. + """Pull per-instance test status out of the summary dict. - The summary shape is owned by core/pipeline.create_summary(); this - helper isolates the dependency on its exact field names. + Emits one row per VM instance — matching one ``tests[*]`` row in KCIDB — + and attaches its boot-log URL by joining with + ``/artifacts.json`` on ``(test, instance_id)``. A row + without a manifest entry (boot console never made it to S3) carries + ``log_url=None`` and is still submitted, so KCIDB sees the result. + + Falls back to the legacy per-test aggregation (one row per test name) + when ``summary["vms"]["instances"]`` is absent — keeps older + in-flight summary files and unit tests using the old shape working. + + The second tuple element (legacy ``log_url`` slot) is always ``None``; + log URLs are now per-row and live in ``row["log_url"]``. """ - rows: List[Dict[str, Any]] = [] vms = summary.get("vms", {}) or {} - test_names = vms.get("test_names") or [] - failed_by_test = vms.get("failed_by_test") or {} - for name in test_names: - status = "FAIL" if failed_by_test.get(name) else "PASS" - rows.append({"name": _test_name_to_path(name), "status": status}) + instances = vms.get("instances") + + # Legacy path: no per-instance breakdown -> one row per test name, no URLs. + if not instances: + rows: List[Dict[str, Any]] = [] + test_names = vms.get("test_names") or [] + failed_by_test = vms.get("failed_by_test") or {} + for name in test_names: + status = "FAIL" if failed_by_test.get(name) else "PASS" + rows.append({"name": _test_name_to_path(name), "status": status}) + return rows, None + + url_by_pair = _load_artifact_log_urls(summary.get("run_directory")) + rows = [] + for inst in instances: + test = inst.get("test", "unknown") + instance_id = inst.get("instance_id", "") + rows.append( + { + "name": _test_name_to_path(test), + "status": inst.get("status", "ERROR"), + "instance_id": instance_id, + "log_url": url_by_pair.get((test, instance_id)), + } + ) return rows, None @@ -833,19 +891,34 @@ def _execute_job( per_test = [{"name": "boot.infrastructure", "status": "ERROR"}] log_url = None - test_rows = [ - build_test_row( - origin=self.kcidb_origin, - build_id=build_id, - test_id=f"{node_id}.{idx}", - path=t.get("name", f"test_{idx}"), - status=to_kcidb_status(t.get("status", "error")), - duration_ms=t.get("duration_ms"), - log_url=log_url, - misc={"kernelci_node_id": node_id}, + # Per-row `log_url` (from artifacts.json) supersedes the legacy + # job-level `log_url` returned by the executor. The legacy value is + # only consulted when the row itself does not carry one — true for + # the fallback path in _extract_test_results and for executor crashes. + # When a row carries an instance_id, fold it into the test_id so the + # KCIDB row is stable across retries (instead of positional `.{idx}`). + test_rows = [] + for idx, t in enumerate(per_test or []): + instance_suffix = t.get("instance_id") or str(idx) + test_rows.append( + build_test_row( + origin=self.kcidb_origin, + build_id=build_id, + test_id=f"{node_id}.{instance_suffix}", + path=t.get("name", f"test_{idx}"), + status=to_kcidb_status(t.get("status", "error")), + duration_ms=t.get("duration_ms"), + log_url=t.get("log_url") or log_url, + misc={ + "kernelci_node_id": node_id, + **( + {"instance_id": t["instance_id"]} + if t.get("instance_id") + else {} + ), + }, + ) ) - for idx, t in enumerate(per_test or []) - ] if not test_rows: # No per-test results came back -> the outcome is unknown, itself # an infrastructure failure (-> node result incomplete). diff --git a/tests/test_pull_labs_poller.py b/tests/test_pull_labs_poller.py index 92f7c4d..787c341 100644 --- a/tests/test_pull_labs_poller.py +++ b/tests/test_pull_labs_poller.py @@ -320,6 +320,146 @@ def test_boot_remap_preserves_failure_status(self): assert rows == [{"name": "boot", "status": "FAIL"}] +class TestExtractTestResultsPerInstance: + """When summary["vms"]["instances"] is present the extractor must emit + one row per VM and join boot-log URLs from artifacts.json.""" + + @staticmethod + def _write_manifest(run_dir, artifacts): + path = os.path.join(run_dir, "artifacts.json") + with open(path, "w", encoding="utf-8") as f: + json.dump({"artifacts": artifacts}, f) + return path + + def test_one_row_per_instance_with_log_url(self, tmp_path): + # Two VMs for the same test, both with a manifest entry. + self._write_manifest( + tmp_path, + [ + { + "test": "baseline", + "instance_id": "i-aaa", + "log_url": "https://b.s3.eu-west-1.amazonaws.com/a.log", + "status": "ready", + }, + { + "test": "baseline", + "instance_id": "i-bbb", + "log_url": "https://b.s3.eu-west-1.amazonaws.com/b.log", + "status": "ready", + }, + ], + ) + summary = { + "run_directory": str(tmp_path), + "vms": { + "instances": [ + {"test": "baseline", "instance_id": "i-aaa", "status": "PASS"}, + {"test": "baseline", "instance_id": "i-bbb", "status": "FAIL"}, + ], + }, + } + rows, log = _extract_test_results(summary) + assert log is None + # boot remap applied; instance_id and log_url preserved per row. + assert rows == [ + { + "name": "boot", + "status": "PASS", + "instance_id": "i-aaa", + "log_url": "https://b.s3.eu-west-1.amazonaws.com/a.log", + }, + { + "name": "boot", + "status": "FAIL", + "instance_id": "i-bbb", + "log_url": "https://b.s3.eu-west-1.amazonaws.com/b.log", + }, + ] + + def test_missing_manifest_entry_leaves_log_url_none(self, tmp_path): + # i-bbb's console upload failed -> no manifest entry; the row is + # still emitted with status from the summary, log_url=None. + self._write_manifest( + tmp_path, + [ + { + "test": "baseline", + "instance_id": "i-aaa", + "log_url": "https://b.s3.eu-west-1.amazonaws.com/a.log", + }, + ], + ) + summary = { + "run_directory": str(tmp_path), + "vms": { + "instances": [ + {"test": "baseline", "instance_id": "i-aaa", "status": "PASS"}, + {"test": "baseline", "instance_id": "i-bbb", "status": "FAIL"}, + ], + }, + } + rows, _ = _extract_test_results(summary) + urls = {r["instance_id"]: r["log_url"] for r in rows} + assert urls == {"i-aaa": "https://b.s3.eu-west-1.amazonaws.com/a.log", + "i-bbb": None} + + def test_no_artifacts_json_still_emits_per_instance_rows(self, tmp_path): + # run_directory exists but artifacts.json never got written + # (e.g. collect_run_artifacts failed). Each row has log_url=None. + summary = { + "run_directory": str(tmp_path), + "vms": { + "instances": [ + {"test": "ltp", "instance_id": "i-aaa", "status": "PASS"}, + ], + }, + } + rows, _ = _extract_test_results(summary) + assert rows == [ + {"name": "ltp", "status": "PASS", + "instance_id": "i-aaa", "log_url": None}, + ] + + def test_corrupt_artifacts_json_is_non_fatal(self, tmp_path, caplog): + (tmp_path / "artifacts.json").write_text("{not valid json") + summary = { + "run_directory": str(tmp_path), + "vms": { + "instances": [ + {"test": "ltp", "instance_id": "i-x", "status": "PASS"}, + ], + }, + } + with caplog.at_level(logging.WARNING): + rows, _ = _extract_test_results(summary) + assert rows[0]["log_url"] is None + assert any("artifacts.json" in r.message for r in caplog.records) + + def test_artifacts_join_requires_both_test_and_instance(self, tmp_path): + # Manifest entries lacking test/instance_id/log_url are skipped + # rather than producing partial matches. + self._write_manifest( + tmp_path, + [ + {"test": "ltp", "instance_id": "i-a"}, # no log_url + {"test": "ltp", "log_url": "https://x"}, # no instance_id + {"instance_id": "i-b", "log_url": "https://y"}, # no test + ], + ) + summary = { + "run_directory": str(tmp_path), + "vms": { + "instances": [ + {"test": "ltp", "instance_id": "i-a", "status": "PASS"}, + {"test": "ltp", "instance_id": "i-b", "status": "PASS"}, + ], + }, + } + rows, _ = _extract_test_results(summary) + assert all(r["log_url"] is None for r in rows) + + class TestTestNameToPath: """_test_name_to_path() remaps boot test names to the 'boot' path.""" @@ -541,6 +681,47 @@ def test_translate_failure_finishes_invalid_job_params(self): assert outcome.error_code == "invalid_job_params" assert "missing artifacts.kernel" in outcome.error_msg + def test_per_instance_rows_carry_log_url_and_stable_test_id(self): + """When executor returns per-instance rows with log_url, the submitted + KCIDB rows must each carry that URL and a test_id derived from the + instance_id (not the positional index).""" + per_test = [ + {"name": "boot", "status": "PASS", "instance_id": "i-aaaa1111", + "log_url": "https://b.s3.eu-west-1.amazonaws.com/a.log"}, + {"name": "boot", "status": "FAIL", "instance_id": "i-bbbb2222", + "log_url": "https://b.s3.eu-west-1.amazonaws.com/b.log"}, + ] + p = PullLabsPoller( + _minimal_kc(), + job_executor=lambda cfg: (per_test, None), + ) + seen = {} + with patch.object(p, "_claim_node", return_value=True), \ + patch.object(p, "_finish_node"), \ + patch(_GET, return_value={"artifacts": {}}), \ + patch("kernel_ci_cloud_labs.pull_labs_poller.translate_job", + return_value={}), \ + patch( + "kernel_ci_cloud_labs.pull_labs_poller.submit_tests", + side_effect=lambda url, jwt, origin, build_id, rows: seen.update(rows=rows), + ): + p.process_event(_job_event(node_id="ndX")) + + rows = seen["rows"] + assert len(rows) == 2 + by_id = {r["id"]: r for r in rows} + # test_id derived from instance_id => stable across retries. + assert set(by_id) == {"pullab_cloud_aws:ndX.i-aaaa1111", "pullab_cloud_aws:ndX.i-bbbb2222"} + # Per-row log_url survives the build_test_row pass-through. + assert by_id["pullab_cloud_aws:ndX.i-aaaa1111"]["log_url"] == \ + "https://b.s3.eu-west-1.amazonaws.com/a.log" + assert by_id["pullab_cloud_aws:ndX.i-bbbb2222"]["log_url"] == \ + "https://b.s3.eu-west-1.amazonaws.com/b.log" + # instance_id surfaces in misc for traceability. + assert by_id["pullab_cloud_aws:ndX.i-aaaa1111"]["misc"]["instance_id"] == "i-aaaa1111" + # Aggregated node outcome from per-instance statuses. + # (one fail among two -> fail; verified indirectly via existing tests). + # --------------------------------------------------------------------------- # Default-executor dependency validation