diff --git a/src/kernel_ci_cloud_labs/pull_labs_poller.py b/src/kernel_ci_cloud_labs/pull_labs_poller.py index 3538fa8..be3bf5a 100644 --- a/src/kernel_ci_cloud_labs/pull_labs_poller.py +++ b/src/kernel_ci_cloud_labs/pull_labs_poller.py @@ -13,7 +13,8 @@ Long-lived service (or one-shot job) that: 1. Polls kernelci-api /events for new pull-lab jobs. - 2. Claims each job node (state=running) so other pollers skip it. + 2. Claims each job node by recording its data.job_id — kernelci-api has no + node state usable as a "claimed" marker (see _claim_node). 3. Fetches each job's PULL_LABS job_definition JSON. 4. Translates it into a pullab_cloud run config and runs the pipeline. 5. Submits per-test results directly to KCIDB. @@ -38,6 +39,7 @@ import urllib.error import urllib.parse import urllib.request +import uuid from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional, Tuple @@ -627,12 +629,37 @@ def _node_url(self, node_id: str) -> str: return f"{self.api_base_uri.rstrip('/')}/node/{node_id}" def _claim_node(self, node: Dict[str, Any]) -> bool: - """Claim a job node by transitioning it to state=running. - - Re-reads the node first: if it is no longer "available", another - poller has already taken it, so we skip it. This narrows -- but, - without an atomic compare-and-set in kernelci-api, cannot fully - close -- the window for two pollers claiming the same job. + """Claim a job node by recording this poller's job id on it. + + kernelci-api has no node *state* that can serve as a "claimed" + marker. Its state machine (kernelci-core, + Node.validate_node_state_transition) only permits:: + + running -> available, closing, done + available -> closing, done + closing -> done + + so a job node polled in "available" state cannot be moved to + "running" -- the API rejects it with HTTP 400 "Transition not allowed + with state: running". 
The only intermediate state reachable from + "available" is "closing", and that is unusable too: kernelci-pipeline + (src/timeout.py, Closing handler) auto-transitions any "closing" node + with no running descendants to "done" -- with no result -- within + ~60s, which would finish a multi-minute boot job out from under us. + + Instead we claim by writing data.job_id -- the "Runtime job ID" field + of the node's data model (kernelci-core TestData). The pull-lab + poller *is* the runtime, so this is the semantically correct field; + the node stays "available" (available -> available is a no-op + transition) and the value persists because job_id is a declared + field. A node that already carries a data.job_id has been claimed. + + The claim is best effort: kernelci-api has no compare-and-set, so the + PUT is a full-document overwrite and two pollers that both read the + node before either writes can each claim it. Parallel pollers must + therefore be partitioned by platform (KERNELCI_PLATFORMS) so they + never compete for the same node. This claim only skips a node already + taken or finished; it cannot guarantee exclusion. Returns True only if this poller now owns the node. 
""" @@ -648,16 +675,30 @@ def _claim_node(self, node: Dict[str, Any]) -> bool: return False state = current.get("state") if state != "available": - logger.info("Skipping node %s: already claimed (state=%s)", node_id, state) + logger.info( + "Skipping node %s: no longer available (state=%s)", node_id, state + ) + return False + data = current.get("data") or {} + existing = data.get("job_id") + if existing: + logger.info( + "Skipping node %s: already claimed (data.job_id=%s)", + node_id, existing, + ) return False - current["state"] = "running" + job_id = f"{self.runtime_name}:{uuid.uuid4().hex}" + data["job_id"] = job_id + current["data"] = data payload = {k: v for k, v in current.items() if k not in NODE_READ_ONLY_FIELDS} try: + # HTTPError is a URLError subclass, so a 400/422 from the PUT is + # caught here too: a failed claim just skips the node. _http_put_json(url, payload, token=self.api_token) except (urllib.error.URLError, json.JSONDecodeError) as e: - logger.error("Failed to claim node %s (PUT state=running): %s", node_id, e) + logger.error("Failed to claim node %s (PUT data.job_id): %s", node_id, e) return False - logger.info("Claimed node %s (state=running)", node_id) + logger.info("Claimed node %s (data.job_id=%s)", node_id, job_id) return True def _finish_node(self, node_id: str, outcome: NodeOutcome) -> bool: @@ -705,11 +746,11 @@ def _finish_node(self, node_id: str, outcome: NodeOutcome) -> bool: def process_event(self, event: Dict[str, Any]) -> bool: """Process one event end to end. Returns True on success. - The job node is claimed (state=running) before any work starts and - finished (state=done + result, plus error_code/error_msg on an - infrastructure failure) afterwards, whatever the outcome. A node we - fail to claim -- already taken, or an API error -- is skipped - without being run or submitted. 
+ The job node is claimed (data.job_id recorded) before any work + starts and finished (state=done + result, plus error_code/error_msg + on an infrastructure failure) afterwards, whatever the outcome. A + node we cannot claim -- already taken, finished, or an API error -- + is skipped without being run or submitted. """ node = event.get("node") or {} node_id = node.get("id") diff --git a/tests/test_pull_labs_poller.py b/tests/test_pull_labs_poller.py index 75bc2e9..92f7c4d 100644 --- a/tests/test_pull_labs_poller.py +++ b/tests/test_pull_labs_poller.py @@ -371,21 +371,38 @@ def test_never_returns_incomplete(self): class TestNodeStateUpdates: - """_claim_node() / _finish_node() PUT node state to kernelci-api.""" + """_claim_node() records data.job_id; _finish_node() PUTs state=done.""" - def test_claim_available_node_puts_running(self): + def test_claim_available_node_records_job_id(self): + # kernelci-api has no claimable *state*, so claiming writes the + # node's data.job_id ("Runtime job ID") and leaves state=available. p = PullLabsPoller(_minimal_kc()) puts = [] - with patch(_GET, return_value={"id": "n1", "state": "available"}), \ + with patch(_GET, return_value={"id": "n1", "state": "available", "data": {}}), \ patch(_PUT, side_effect=lambda url, payload, **kw: puts.append((url, payload))): assert p._claim_node({"id": "n1"}) is True assert len(puts) == 1 assert puts[0][0].endswith("/node/n1") - assert puts[0][1]["state"] == "running" + # state untouched (available -> available is a no-op transition); + # the claim lives in data.job_id. + assert puts[0][1]["state"] == "available" + assert puts[0][1]["data"]["job_id"] - def test_claim_skips_already_claimed_node(self): + def test_claim_skips_node_already_claimed(self): + # A node that already carries a data.job_id has been picked up. 
p = PullLabsPoller(_minimal_kc()) - with patch(_GET, return_value={"id": "n1", "state": "running"}), \ + with patch(_GET, return_value={ + "id": "n1", "state": "available", + "data": {"job_id": "other-poller:abc123"}}), \ + patch(_PUT) as put: + assert p._claim_node({"id": "n1"}) is False + put.assert_not_called() + + def test_claim_skips_node_no_longer_available(self): + # A node that has moved on from "available" (already finished by the + # pipeline or another poller) is skipped -- without any PUT. + p = PullLabsPoller(_minimal_kc()) + with patch(_GET, return_value={"id": "n1", "state": "done"}), \ patch(_PUT) as put: assert p._claim_node({"id": "n1"}) is False put.assert_not_called() @@ -397,7 +414,7 @@ def test_claim_skips_on_get_error(self): def test_claim_skips_on_put_error(self): p = PullLabsPoller(_minimal_kc()) - with patch(_GET, return_value={"id": "n1", "state": "available"}), \ + with patch(_GET, return_value={"id": "n1", "state": "available", "data": {}}), \ patch(_PUT, side_effect=urllib.error.URLError("boom")): assert p._claim_node({"id": "n1"}) is False