diff --git a/README.md b/README.md index 2011fbf..871b9ee 100644 --- a/README.md +++ b/README.md @@ -162,7 +162,10 @@ the local OpenAI-compatible server. - **Stays signed in.** The access token auto-refreshes. If you sign out in the Chat app, the next request tells you to run `og-veil login`. - **Survives a dead node.** If the chosen TEE goes offline, it reselects another - from the registry and retries once. + from the registry and retries once. A background loop also re-checks the + registry every few minutes and drops the cached gateway when it rotates out or + rotates its keys, so a registry change can't leave the proxy stuck failing + every request (`OG_VEIL_TEE_REFRESH_INTERVAL`). ## Configuration @@ -176,6 +179,7 @@ Session + prefs live in `~/.opengradient/local/` (override with `OG_VEIL_HOME`). | `OG_VEIL_EXPECTED_PCR_HASH` | `--expected-pcr` | — | Refuse any TEE whose `pcrHash` differs. | | `OG_VEIL_APP_URL` | `--app-url` | `https://chat.opengradient.ai` | Chat app origin for login. | | `OG_VEIL_PII_SCRUB` | `--pii-scrub` | off | Redact high-impact PII from prompts locally before they leave the machine. | +| `OG_VEIL_TEE_REFRESH_INTERVAL` | — | `300` | Seconds between background registry re-checks; drops a TEE that rotated out (or rotated keys) so the next request reselects. `0` disables. | ### Local PII redaction (opt-in) diff --git a/tests/test_gateway_refresh.py b/tests/test_gateway_refresh.py new file mode 100644 index 0000000..d88d8da --- /dev/null +++ b/tests/test_gateway_refresh.py @@ -0,0 +1,159 @@ +"""Tests for the Gateway's periodic TEE refresh. + +The gateway caches a TEE + relay client; without a periodic check, a TEE that +rotates out of the registry (or rotates its OHTTP/signing keys) would keep +failing every request until restart, because those failures don't surface as the +network error the reactive retry path looks for. These cover the loop that drops +the stale cache so the next request reselects. +""" + +from __future__ import annotations + +import time + +from opengradient.client.tee_registry import OhttpConfig, TEEEndpoint + +from veil.config import ServerConfig +from veil.gateway import Gateway + + +def _make_tee( + tee_id: str = "0xabc", + public_key: bytes = b"\x01" * 32, + key_id: int = 1, + signing: bytes = b"sign", +) -> TEEEndpoint: + return TEEEndpoint( + tee_id=tee_id, + endpoint="https://gw.example", + tls_cert_der=b"cert", + payment_address="0xpay", + signing_public_key_der=signing, + ohttp_config=OhttpConfig( + key_id=key_id, + kem_id=0x0020, + kdf_id=0x0001, + aead_id=0x0003, + public_key=public_key, + key_config=b"", + registered_at=0, + ), + pcr_hash="0x00", + ) + + +class _FakeConfig: + tee_registry_rpc_url = "http://localhost:8545" + tee_registry_address = "0x0000000000000000000000000000000000000000" + chat_api_base_url = "https://chat-api.example" + tee_registry_tee_type = None + + +class _FakeSession: + config = _FakeConfig() + + @staticmethod + def auth_headers() -> dict: + return {} + + +class _FakeRegistry: + """Stands in for TEERegistry; returns a canned active-TEE list.""" + + def __init__(self, active): + self._active = active + self.calls = 0 + + def get_active_tees_by_type(self, tee_type): + self.calls += 1 + return list(self._active) + + +def _gateway(active, **config_kwargs) -> Gateway: + gw = Gateway(_FakeSession(), ServerConfig(**config_kwargs)) + gw._registry = _FakeRegistry(active) + return gw + + +# --- _tee_still_current -------------------------------------------------------- + + +def test_still_current_when_id_and_keys_match(): + tee = _make_tee() + assert Gateway._tee_still_current(tee, [_make_tee()]) is True + + +def test_not_current_when_rotated_out(): + tee = _make_tee(tee_id="0xabc") + assert Gateway._tee_still_current(tee, [_make_tee(tee_id="0xdef")]) is False + assert Gateway._tee_still_current(tee, []) is False + + +def test_not_current_when_ohttp_key_rotated(): + tee = _make_tee(public_key=b"\x01" * 32) + rotated = _make_tee(public_key=b"\x02" * 32) + assert Gateway._tee_still_current(tee, [rotated]) is False + + +def test_not_current_when_signing_key_rotated(): + tee = _make_tee(signing=b"old") + rotated = _make_tee(signing=b"new") + assert Gateway._tee_still_current(tee, [rotated]) is False + + +# --- _refresh_once ------------------------------------------------------------- + + +def test_refresh_keeps_client_when_tee_unchanged(): + gw = _gateway([_make_tee()]) + gw._tee = _make_tee() + sentinel = object() + gw._client = sentinel + + gw._refresh_once() + + assert gw._client is sentinel + assert gw._tee is not None + + +def test_refresh_drops_client_when_tee_rotated_out(): + gw = _gateway([]) # registry no longer lists our TEE + gw._tee = _make_tee() + gw._client = object() + + gw._refresh_once() + + assert gw._client is None + assert gw._tee is None + + +def test_refresh_noop_before_any_tee_resolved(): + gw = _gateway([_make_tee()]) + # No TEE resolved yet — nothing to check, and the registry shouldn't be hit. + gw._refresh_once() + assert gw._registry.calls == 0 + + +# --- loop lifecycle ------------------------------------------------------------ + + +def test_loop_disabled_when_interval_not_positive(): + gw = _gateway([_make_tee()], tee_refresh_interval=0) + gw.start_refresh_loop() + assert gw._refresh_thread is None + + +def test_loop_drops_stale_tee_then_stops(): + gw = _gateway([], tee_refresh_interval=0.02) + gw._tee = _make_tee() + gw._client = object() + gw.start_refresh_loop() + try: + deadline = time.monotonic() + 2.0 + while gw._tee is not None and time.monotonic() < deadline: + time.sleep(0.01) + assert gw._tee is None + assert gw._client is None + finally: + gw.stop_refresh_loop() + assert gw._refresh_thread is None diff --git a/uv.lock b/uv.lock index 3fb649d..022469a 100644 --- a/uv.lock +++ b/uv.lock @@ -2329,7 +2329,7 @@ wheels = [ [[package]] name = "opengradient-veil" -version = "0.2.6" +version = "0.2.7" source = { editable = "." } dependencies = [ { name = "click" }, diff --git a/veil/config.py b/veil/config.py index 5a152d0..56c3f72 100644 --- a/veil/config.py +++ b/veil/config.py @@ -63,6 +63,12 @@ class ServerConfig: # :mod:`veil.pii`. pii_scrub: bool = False + # How often (seconds) the background loop re-checks the on-chain registry and + # drops the cached TEE when it has rotated out or rotated its keys, so the next + # request reselects a live gateway instead of hammering a stale one. Mirrors the + # SDK's RegistryTEEConnection refresh cadence. Set <= 0 to disable the loop. + tee_refresh_interval: float = 300.0 + @classmethod def from_env(cls) -> "ServerConfig": return cls( @@ -71,6 +77,7 @@ def from_env(cls) -> "ServerConfig": expected_pcr_hash=_norm_hex(os.getenv("OG_VEIL_EXPECTED_PCR_HASH")), pinned_tee_id=_norm_hex(os.getenv("OG_VEIL_TEE_ID")), pii_scrub=_env_bool(os.getenv("OG_VEIL_PII_SCRUB")), + tee_refresh_interval=float(os.getenv("OG_VEIL_TEE_REFRESH_INTERVAL", "300")), ) def advertised_base_url(self) -> str: diff --git a/veil/gateway.py b/veil/gateway.py index dcc3841..df0a57b 100644 --- a/veil/gateway.py +++ b/veil/gateway.py @@ -12,6 +12,7 @@ import logging import random import threading +from typing import Sequence import requests from opengradient import OhttpRelayClient, TEERegistry, VerifiedChatResponse @@ -37,6 +38,10 @@ def __init__(self, session: Session, config: ServerConfig): self._lock = threading.Lock() self._client: OhttpRelayClient | None = None self._tee: TEEEndpoint | None = None + # Background loop that periodically re-checks the registry and drops the + # cached TEE once it rotates out / rotates keys (see ``start_refresh_loop``). + self._stop_refresh = threading.Event() + self._refresh_thread: threading.Thread | None = None # Optional local PII redaction, applied to the request before it is # encrypted to the TEE. ``None`` when disabled (the default). self._redactor = build_redactor(enabled=config.pii_scrub) @@ -114,6 +119,96 @@ def reset(self) -> None: def active_tee(self) -> TEEEndpoint | None: return self._tee + # --- periodic registry refresh ----------------------------------------- + def start_refresh_loop(self) -> None: + """Start the background loop that re-checks the registry, if enabled. + + The cached TEE/client is otherwise only reselected reactively, when a + request raises a network error (see :meth:`chat`). That misses the case the + operator hits in practice: the registry rotates a gateway out (or rotates + its OHTTP/signing keys) while this process holds a stale endpoint. Those + failures surface as ``RelayError``/``VerificationError``, which the reactive + path doesn't retry — so without this loop the proxy keeps hammering a dead + TEE indefinitely. This mirrors the SDK's ``RegistryTEEConnection`` refresh. + + Idempotent and a no-op when ``tee_refresh_interval <= 0``. + """ + if self._config.tee_refresh_interval <= 0: + return + if self._refresh_thread is not None and self._refresh_thread.is_alive(): + return + self._stop_refresh.clear() + self._refresh_thread = threading.Thread( + target=self._refresh_loop, name="veil-tee-refresh", daemon=True + ) + self._refresh_thread.start() + + def stop_refresh_loop(self) -> None: + """Signal the background refresh loop to exit and wait briefly for it.""" + self._stop_refresh.set() + thread = self._refresh_thread + if thread is not None: + thread.join(timeout=5.0) + self._refresh_thread = None + + def _refresh_loop(self) -> None: + interval = self._config.tee_refresh_interval + # Event.wait doubles as the sleep so ``stop_refresh_loop`` wakes us promptly. + while not self._stop_refresh.wait(interval): + try: + self._refresh_once() + except Exception: # noqa: BLE001 — never let the loop die on a transient error + logger.warning( + "Background TEE refresh failed; will retry next cycle.", exc_info=True + ) + + def _refresh_once(self) -> None: + """Drop the cached client if its TEE is no longer active/unchanged in the registry.""" + current = self._tee + if current is None: + return # nothing resolved yet; the first request will select one + + active = self._registry.get_active_tees_by_type(self._tee_type) + if self._tee_still_current(current, active): + logger.debug( + "Current TEE %s still active and unchanged; no refresh needed.", current.tee_id + ) + return + + logger.info( + "TEE %s rotated out of the registry (or rotated keys) — dropping cached gateway so the next request reselects.", + current.tee_id, + ) + # Only clear if we'd be clearing the same TEE we just inspected; a concurrent + # reactive reset() may already have moved us onto a fresh one. + with self._lock: + if self._tee is current: + self._client = None + self._tee = None + + @staticmethod + def _tee_still_current(current: TEEEndpoint, active: Sequence[TEEEndpoint]) -> bool: + """True if ``current`` is still active with the same key material. + + Matching on ``tee_id`` alone isn't enough: a gateway can keep its id while + rotating the OHTTP/HPKE key or its signing key, which silently breaks the + cached client's encryption and signature checks. Compare the exact bits the + cached :class:`OhttpRelayClient` pinned at construction. + """ + want = current.tee_id.lower() + for tee in active: + if tee.tee_id.lower() != want: + continue + same_ohttp = ( + tee.ohttp_config is not None + and current.ohttp_config is not None + and tee.ohttp_config.public_key == current.ohttp_config.public_key + and tee.ohttp_config.key_id == current.ohttp_config.key_id + ) + same_signing = tee.signing_public_key_der == current.signing_public_key_der + return same_ohttp and same_signing + return False + # --- inference --------------------------------------------------------- def chat(self, body: dict) -> VerifiedChatResponse: # Redact PII locally before anything is sealed to the enclave. Done once, diff --git a/veil/server.py b/veil/server.py index 859725d..6ecee4f 100644 --- a/veil/server.py +++ b/veil/server.py @@ -152,6 +152,10 @@ def serve(config: ServerConfig) -> None: except GatewayError as exc: raise SystemExit(f"could not resolve a TEE gateway: {exc}") + # Periodically re-resolve from the registry so a TEE that rotates out (or + # rotates its keys) is dropped instead of failing every request until restart. + gateway.start_refresh_loop() + app = create_app(gateway) tee = gateway.active_tee print( @@ -163,4 +167,7 @@ def serve(config: ServerConfig) -> None: f" export OPENAI_API_KEY=og-veil # ignored; the Chat session authenticates\n" ) # threaded=True so streaming requests don't block health checks / other calls. - app.run(host=config.host, port=config.port, threaded=True) + try: + app.run(host=config.host, port=config.port, threaded=True) + finally: + gateway.stop_refresh_loop()