Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions src/kernel_ci_cloud_labs/pull_labs_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@
ENV_BASE_CONFIG = "PULLAB_BASE_CONFIG"


# System and read-only database-managed fields on node objects.
# These fields are omitted when updating a node (PUT /node/<id>) to prevent
# FastAPI/Pydantic validation errors (HTTP 400 Bad Request / 422 Unprocessable Entity)
# since they are read-only and not accepted in the update schema.
NODE_READ_ONLY_FIELDS = {
"id",
"_id",
"created",
"updated",
"user",
"user_groups",
"owner",
"submitter",
"treeid",
"processed_by_kcidb_bridge",
"retry_counter",
"timeout",
}


def _parse_kcidb_rest(env_value: str) -> Tuple[Optional[str], Optional[str]]:
"""Parse a KCIDB_REST URL of the form https://<token>@<host>[/path].

Expand Down Expand Up @@ -115,9 +135,17 @@ def _http_get_json(url: str, token: Optional[str] = None, timeout: float = 30.0)
if token:
headers["Authorization"] = f"Bearer {token}"
req = urllib.request.Request(url, method="GET", headers=headers)
with urllib.request.urlopen(req, timeout=timeout) as resp:
body = resp.read().decode("utf-8", errors="replace")
return json.loads(body) if body else None
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
body = resp.read().decode("utf-8", errors="replace")
return json.loads(body) if body else None
except urllib.error.HTTPError as e:
try:
err_body = e.read().decode("utf-8", errors="replace")
logger.error("HTTP GET to %s failed (HTTP %s): %s. Response body: %s", url, e.code, e.reason, err_body)
except Exception:
pass
raise


def _http_put_json(
Expand All @@ -136,9 +164,17 @@ def _http_put_json(
headers["Authorization"] = f"Bearer {token}"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=body, method="PUT", headers=headers)
with urllib.request.urlopen(req, timeout=timeout) as resp:
resp_body = resp.read().decode("utf-8", errors="replace")
return json.loads(resp_body) if resp_body else None
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
resp_body = resp.read().decode("utf-8", errors="replace")
return json.loads(resp_body) if resp_body else None
except urllib.error.HTTPError as e:
try:
err_body = e.read().decode("utf-8", errors="replace")
logger.error("HTTP PUT to %s failed (HTTP %s): %s. Response body: %s", url, e.code, e.reason, err_body)
except Exception:
pass
raise


def _validate_api_token(
Expand Down Expand Up @@ -615,8 +651,9 @@ def _claim_node(self, node: Dict[str, Any]) -> bool:
logger.info("Skipping node %s: already claimed (state=%s)", node_id, state)
return False
current["state"] = "running"
payload = {k: v for k, v in current.items() if k not in NODE_READ_ONLY_FIELDS}
try:
_http_put_json(url, current, token=self.api_token)
_http_put_json(url, payload, token=self.api_token)
except (urllib.error.URLError, json.JSONDecodeError) as e:
logger.error("Failed to claim node %s (PUT state=running): %s", node_id, e)
return False
Expand Down Expand Up @@ -647,8 +684,9 @@ def _finish_node(self, node_id: str, outcome: NodeOutcome) -> bool:
data["error_code"] = outcome.error_code
data["error_msg"] = outcome.error_msg
current["data"] = data
payload = {k: v for k, v in current.items() if k not in NODE_READ_ONLY_FIELDS}
try:
_http_put_json(url, current, token=self.api_token)
_http_put_json(url, payload, token=self.api_token)
except (urllib.error.URLError, json.JSONDecodeError) as e:
logger.error(
"Failed to finish node %s (PUT state=done result=%s): %s",
Expand Down
Loading