Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 57 additions & 16 deletions src/kernel_ci_cloud_labs/pull_labs_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

Long-lived service (or one-shot job) that:
1. Polls kernelci-api /events for new pull-lab jobs.
2. Claims each job node (state=running) so other pollers skip it.
2. Claims each job node by recording its data.job_id — kernelci-api has no
node state usable as a "claimed" marker (see _claim_node).
3. Fetches each job's PULL_LABS job_definition JSON.
4. Translates it into a pullab_cloud run config and runs the pipeline.
5. Submits per-test results directly to KCIDB.
Expand All @@ -38,6 +39,7 @@
import urllib.error
import urllib.parse
import urllib.request
import uuid
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple

Expand Down Expand Up @@ -627,12 +629,37 @@ def _node_url(self, node_id: str) -> str:
return f"{self.api_base_uri.rstrip('/')}/node/{node_id}"

def _claim_node(self, node: Dict[str, Any]) -> bool:
"""Claim a job node by transitioning it to state=running.

Re-reads the node first: if it is no longer "available", another
poller has already taken it, so we skip it. This narrows -- but,
without an atomic compare-and-set in kernelci-api, cannot fully
close -- the window for two pollers claiming the same job.
"""Claim a job node by recording this poller's job id on it.

kernelci-api has no node *state* that can serve as a "claimed"
marker. Its state machine (kernelci-core,
Node.validate_node_state_transition) only permits::

running -> available, closing, done
available -> closing, done
closing -> done

so a job node polled in "available" state cannot be moved to
"running" -- the API rejects it with HTTP 400 "Transition not allowed
with state: running". The only intermediate state reachable from
"available" is "closing", and that is unusable too: kernelci-pipeline
(src/timeout.py, Closing handler) auto-transitions any "closing" node
with no running descendants to "done" -- with no result -- within
~60s, which would finish a multi-minute boot job out from under us.

Instead we claim by writing data.job_id -- the "Runtime job ID" field
of the node's data model (kernelci-core TestData). The pull-lab
poller *is* the runtime, so this is the semantically correct field;
the node stays "available" (available -> available is a no-op
transition) and the value persists because job_id is a declared
field. A node that already carries a data.job_id has been claimed.

The claim is best effort: kernelci-api has no compare-and-set, so the
PUT is a full-document overwrite and two pollers that both read the
node before either writes can each claim it. Parallel pollers must
therefore be partitioned by platform (KERNELCI_PLATFORMS) so they
never compete for the same node; this claim only skips a node already
taken or finished, it cannot guarantee exclusion.

Returns True only if this poller now owns the node.
"""
Expand All @@ -648,16 +675,30 @@ def _claim_node(self, node: Dict[str, Any]) -> bool:
return False
state = current.get("state")
if state != "available":
logger.info("Skipping node %s: already claimed (state=%s)", node_id, state)
logger.info(
"Skipping node %s: no longer available (state=%s)", node_id, state
)
return False
data = current.get("data") or {}
existing = data.get("job_id")
if existing:
logger.info(
"Skipping node %s: already claimed (data.job_id=%s)",
node_id, existing,
)
return False
current["state"] = "running"
job_id = f"{self.runtime_name}:{uuid.uuid4().hex}"
data["job_id"] = job_id
current["data"] = data
payload = {k: v for k, v in current.items() if k not in NODE_READ_ONLY_FIELDS}
try:
# HTTPError is a URLError subclass, so a 400/422 from the PUT is
# caught here too: a failed claim just skips the node.
_http_put_json(url, payload, token=self.api_token)
except (urllib.error.URLError, json.JSONDecodeError) as e:
logger.error("Failed to claim node %s (PUT state=running): %s", node_id, e)
logger.error("Failed to claim node %s (PUT data.job_id): %s", node_id, e)
return False
logger.info("Claimed node %s (state=running)", node_id)
logger.info("Claimed node %s (data.job_id=%s)", node_id, job_id)
return True

def _finish_node(self, node_id: str, outcome: NodeOutcome) -> bool:
Expand Down Expand Up @@ -705,11 +746,11 @@ def _finish_node(self, node_id: str, outcome: NodeOutcome) -> bool:
def process_event(self, event: Dict[str, Any]) -> bool:
"""Process one event end to end. Returns True on success.

The job node is claimed (state=running) before any work starts and
finished (state=done + result, plus error_code/error_msg on an
infrastructure failure) afterwards, whatever the outcome. A node we
fail to claim -- already taken, or an API error -- is skipped
without being run or submitted.
The job node is claimed (data.job_id recorded) before any work
starts and finished (state=done + result, plus error_code/error_msg
on an infrastructure failure) afterwards, whatever the outcome. A
node we cannot claim -- already taken, finished, or an API error --
is skipped without being run or submitted.
"""
node = event.get("node") or {}
node_id = node.get("id")
Expand Down
31 changes: 24 additions & 7 deletions tests/test_pull_labs_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,21 +371,38 @@ def test_never_returns_incomplete(self):


class TestNodeStateUpdates:
"""_claim_node() / _finish_node() PUT node state to kernelci-api."""
"""_claim_node() records data.job_id; _finish_node() PUTs state=done."""

def test_claim_available_node_puts_running(self):
def test_claim_available_node_records_job_id(self):
# kernelci-api has no claimable *state*, so claiming writes the
# node's data.job_id ("Runtime job ID") and leaves state=available.
p = PullLabsPoller(_minimal_kc())
puts = []
with patch(_GET, return_value={"id": "n1", "state": "available"}), \
with patch(_GET, return_value={"id": "n1", "state": "available", "data": {}}), \
patch(_PUT, side_effect=lambda url, payload, **kw: puts.append((url, payload))):
assert p._claim_node({"id": "n1"}) is True
assert len(puts) == 1
assert puts[0][0].endswith("/node/n1")
assert puts[0][1]["state"] == "running"
# state untouched (available -> available is a no-op transition);
# the claim lives in data.job_id.
assert puts[0][1]["state"] == "available"
assert puts[0][1]["data"]["job_id"]

def test_claim_skips_already_claimed_node(self):
def test_claim_skips_node_already_claimed(self):
# A node that already carries a data.job_id has been picked up.
p = PullLabsPoller(_minimal_kc())
with patch(_GET, return_value={"id": "n1", "state": "running"}), \
with patch(_GET, return_value={
"id": "n1", "state": "available",
"data": {"job_id": "other-poller:abc123"}}), \
patch(_PUT) as put:
assert p._claim_node({"id": "n1"}) is False
put.assert_not_called()

def test_claim_skips_node_no_longer_available(self):
    """A node that has left "available" is skipped without any PUT.

    The patched GET reports the node as state="done" (already finished by
    the pipeline or by another poller), so _claim_node() must return False
    and must not attempt to write anything back.
    """
    finished_node = {"id": "n1", "state": "done"}
    poller = PullLabsPoller(_minimal_kc())
    with patch(_GET, return_value=finished_node), patch(_PUT) as put_mock:
        claimed = poller._claim_node({"id": "n1"})
    # The mock keeps its call record after the patch context exits.
    assert claimed is False
    put_mock.assert_not_called()
Expand All @@ -397,7 +414,7 @@ def test_claim_skips_on_get_error(self):

def test_claim_skips_on_put_error(self):
p = PullLabsPoller(_minimal_kc())
with patch(_GET, return_value={"id": "n1", "state": "available"}), \
with patch(_GET, return_value={"id": "n1", "state": "available", "data": {}}), \
patch(_PUT, side_effect=urllib.error.URLError("boom")):
assert p._claim_node({"id": "n1"}) is False

Expand Down
Loading