Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ the local OpenAI-compatible server.
- **Stays signed in.** The access token auto-refreshes. If you sign out in the
Chat app, the next request tells you to run `og-veil login`.
- **Survives a dead node.** If the chosen TEE goes offline, it reselects another
from the registry and retries once.
from the registry and retries once. A background loop also re-checks the
registry every few minutes and drops the cached gateway when it rotates out or
rotates its keys, so a registry change can't leave the proxy stuck failing
every request (`OG_VEIL_TEE_REFRESH_INTERVAL`).

## Configuration

Expand All @@ -176,6 +179,7 @@ Session + prefs live in `~/.opengradient/local/` (override with `OG_VEIL_HOME`).
| `OG_VEIL_EXPECTED_PCR_HASH` | `--expected-pcr` | — | Refuse any TEE whose `pcrHash` differs. |
| `OG_VEIL_APP_URL` | `--app-url` | `https://chat.opengradient.ai` | Chat app origin for login. |
| `OG_VEIL_PII_SCRUB` | `--pii-scrub` | off | Redact high-impact PII from prompts locally before they leave the machine. |
| `OG_VEIL_TEE_REFRESH_INTERVAL` | — | `300` | Seconds between background registry re-checks; drops a TEE that rotated out (or rotated keys) so the next request reselects. `0` disables. |

### Local PII redaction (opt-in)

Expand Down
159 changes: 159 additions & 0 deletions tests/test_gateway_refresh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""Tests for the Gateway's periodic TEE refresh.

The gateway caches a TEE + relay client; without a periodic check, a TEE that
rotates out of the registry (or rotates its OHTTP/signing keys) would keep
failing every request until restart, because those failures don't surface as the
network error the reactive retry path looks for. These cover the loop that drops
the stale cache so the next request reselects.
"""

from __future__ import annotations

import time

from opengradient.client.tee_registry import OhttpConfig, TEEEndpoint

from veil.config import ServerConfig
from veil.gateway import Gateway


def _make_tee(
tee_id: str = "0xabc",
public_key: bytes = b"\x01" * 32,
key_id: int = 1,
signing: bytes = b"sign",
) -> TEEEndpoint:
return TEEEndpoint(
tee_id=tee_id,
endpoint="https://gw.example",
tls_cert_der=b"cert",
payment_address="0xpay",
signing_public_key_der=signing,
ohttp_config=OhttpConfig(
key_id=key_id,
kem_id=0x0020,
kdf_id=0x0001,
aead_id=0x0003,
public_key=public_key,
key_config=b"",
registered_at=0,
),
pcr_hash="0x00",
)


class _FakeConfig:
tee_registry_rpc_url = "http://localhost:8545"
tee_registry_address = "0x0000000000000000000000000000000000000000"
chat_api_base_url = "https://chat-api.example"
tee_registry_tee_type = None


class _FakeSession:
config = _FakeConfig()

@staticmethod
def auth_headers() -> dict:
return {}


class _FakeRegistry:
"""Stands in for TEERegistry; returns a canned active-TEE list."""

def __init__(self, active):
self._active = active
self.calls = 0

def get_active_tees_by_type(self, tee_type):
self.calls += 1
return list(self._active)


def _gateway(active, **config_kwargs) -> Gateway:
gw = Gateway(_FakeSession(), ServerConfig(**config_kwargs))
gw._registry = _FakeRegistry(active)
return gw


# --- _tee_still_current --------------------------------------------------------


def test_still_current_when_id_and_keys_match():
tee = _make_tee()
assert Gateway._tee_still_current(tee, [_make_tee()]) is True


def test_not_current_when_rotated_out():
tee = _make_tee(tee_id="0xabc")
assert Gateway._tee_still_current(tee, [_make_tee(tee_id="0xdef")]) is False
assert Gateway._tee_still_current(tee, []) is False


def test_not_current_when_ohttp_key_rotated():
tee = _make_tee(public_key=b"\x01" * 32)
rotated = _make_tee(public_key=b"\x02" * 32)
assert Gateway._tee_still_current(tee, [rotated]) is False


def test_not_current_when_signing_key_rotated():
tee = _make_tee(signing=b"old")
rotated = _make_tee(signing=b"new")
assert Gateway._tee_still_current(tee, [rotated]) is False


# --- _refresh_once -------------------------------------------------------------


def test_refresh_keeps_client_when_tee_unchanged():
gw = _gateway([_make_tee()])
gw._tee = _make_tee()
sentinel = object()
gw._client = sentinel

gw._refresh_once()

assert gw._client is sentinel
assert gw._tee is not None


def test_refresh_drops_client_when_tee_rotated_out():
gw = _gateway([]) # registry no longer lists our TEE
gw._tee = _make_tee()
gw._client = object()

gw._refresh_once()

assert gw._client is None
assert gw._tee is None


def test_refresh_noop_before_any_tee_resolved():
gw = _gateway([_make_tee()])
# No TEE resolved yet — nothing to check, and the registry shouldn't be hit.
gw._refresh_once()
assert gw._registry.calls == 0


# --- loop lifecycle ------------------------------------------------------------


def test_loop_disabled_when_interval_not_positive():
gw = _gateway([_make_tee()], tee_refresh_interval=0)
gw.start_refresh_loop()
assert gw._refresh_thread is None


def test_loop_drops_stale_tee_then_stops():
gw = _gateway([], tee_refresh_interval=0.02)
gw._tee = _make_tee()
gw._client = object()
gw.start_refresh_loop()
try:
deadline = time.monotonic() + 2.0
while gw._tee is not None and time.monotonic() < deadline:
time.sleep(0.01)
assert gw._tee is None
assert gw._client is None
finally:
gw.stop_refresh_loop()
assert gw._refresh_thread is None
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions veil/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ class ServerConfig:
# :mod:`veil.pii`.
pii_scrub: bool = False

# How often (seconds) the background loop re-checks the on-chain registry and
# drops the cached TEE when it has rotated out or rotated its keys, so the next
# request reselects a live gateway instead of hammering a stale one. Mirrors the
# SDK's RegistryTEEConnection refresh cadence. Set <= 0 to disable the loop.
tee_refresh_interval: float = 300.0

@classmethod
def from_env(cls) -> "ServerConfig":
return cls(
Expand All @@ -71,6 +77,7 @@ def from_env(cls) -> "ServerConfig":
expected_pcr_hash=_norm_hex(os.getenv("OG_VEIL_EXPECTED_PCR_HASH")),
pinned_tee_id=_norm_hex(os.getenv("OG_VEIL_TEE_ID")),
pii_scrub=_env_bool(os.getenv("OG_VEIL_PII_SCRUB")),
tee_refresh_interval=float(os.getenv("OG_VEIL_TEE_REFRESH_INTERVAL", "300")),
)

def advertised_base_url(self) -> str:
Expand Down
95 changes: 95 additions & 0 deletions veil/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import random
import threading
from typing import Sequence

import requests
from opengradient import OhttpRelayClient, TEERegistry, VerifiedChatResponse
Expand All @@ -37,6 +38,10 @@ def __init__(self, session: Session, config: ServerConfig):
self._lock = threading.Lock()
self._client: OhttpRelayClient | None = None
self._tee: TEEEndpoint | None = None
# Background loop that periodically re-checks the registry and drops the
# cached TEE once it rotates out / rotates keys (see ``start_refresh_loop``).
self._stop_refresh = threading.Event()
self._refresh_thread: threading.Thread | None = None
# Optional local PII redaction, applied to the request before it is
# encrypted to the TEE. ``None`` when disabled (the default).
self._redactor = build_redactor(enabled=config.pii_scrub)
Expand Down Expand Up @@ -114,6 +119,96 @@ def reset(self) -> None:
def active_tee(self) -> TEEEndpoint | None:
return self._tee

# --- periodic registry refresh -----------------------------------------
def start_refresh_loop(self) -> None:
"""Start the background loop that re-checks the registry, if enabled.

The cached TEE/client is otherwise only reselected reactively, when a
request raises a network error (see :meth:`chat`). That misses the case the
operator hits in practice: the registry rotates a gateway out (or rotates
its OHTTP/signing keys) while this process holds a stale endpoint. Those
failures surface as ``RelayError``/``VerificationError``, which the reactive
path doesn't retry — so without this loop the proxy keeps hammering a dead
TEE indefinitely. This mirrors the SDK's ``RegistryTEEConnection`` refresh.

Idempotent and a no-op when ``tee_refresh_interval <= 0``.
"""
if self._config.tee_refresh_interval <= 0:
return
if self._refresh_thread is not None and self._refresh_thread.is_alive():
return
self._stop_refresh.clear()
self._refresh_thread = threading.Thread(
target=self._refresh_loop, name="veil-tee-refresh", daemon=True
)
self._refresh_thread.start()

def stop_refresh_loop(self) -> None:
"""Signal the background refresh loop to exit and wait briefly for it."""
self._stop_refresh.set()
thread = self._refresh_thread
if thread is not None:
thread.join(timeout=5.0)
self._refresh_thread = None

def _refresh_loop(self) -> None:
interval = self._config.tee_refresh_interval
# Event.wait doubles as the sleep so ``stop_refresh_loop`` wakes us promptly.
while not self._stop_refresh.wait(interval):
try:
self._refresh_once()
except Exception: # noqa: BLE001 — never let the loop die on a transient error
logger.warning(
"Background TEE refresh failed; will retry next cycle.", exc_info=True
)

def _refresh_once(self) -> None:
"""Drop the cached client if its TEE is no longer active/unchanged in the registry."""
current = self._tee
if current is None:
return # nothing resolved yet; the first request will select one

active = self._registry.get_active_tees_by_type(self._tee_type)
if self._tee_still_current(current, active):
logger.debug(
"Current TEE %s still active and unchanged; no refresh needed.", current.tee_id
)
return

logger.info(
"TEE %s rotated out of the registry (or rotated keys) — dropping cached gateway so the next request reselects.",
current.tee_id,
)
# Only clear if we'd be clearing the same TEE we just inspected; a concurrent
# reactive reset() may already have moved us onto a fresh one.
with self._lock:
if self._tee is current:
self._client = None
self._tee = None

@staticmethod
def _tee_still_current(current: TEEEndpoint, active: Sequence[TEEEndpoint]) -> bool:
"""True if ``current`` is still active with the same key material.

Matching on ``tee_id`` alone isn't enough: a gateway can keep its id while
rotating the OHTTP/HPKE key or its signing key, which silently breaks the
cached client's encryption and signature checks. Compare the exact bits the
cached :class:`OhttpRelayClient` pinned at construction.
"""
want = current.tee_id.lower()
for tee in active:
if tee.tee_id.lower() != want:
continue
same_ohttp = (
tee.ohttp_config is not None
and current.ohttp_config is not None
and tee.ohttp_config.public_key == current.ohttp_config.public_key
and tee.ohttp_config.key_id == current.ohttp_config.key_id
)
same_signing = tee.signing_public_key_der == current.signing_public_key_der
return same_ohttp and same_signing
return False

# --- inference ---------------------------------------------------------
def chat(self, body: dict) -> VerifiedChatResponse:
# Redact PII locally before anything is sealed to the enclave. Done once,
Expand Down
9 changes: 8 additions & 1 deletion veil/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ def serve(config: ServerConfig) -> None:
except GatewayError as exc:
raise SystemExit(f"could not resolve a TEE gateway: {exc}")

# Periodically re-resolve from the registry so a TEE that rotates out (or
# rotates its keys) is dropped instead of failing every request until restart.
gateway.start_refresh_loop()

app = create_app(gateway)
tee = gateway.active_tee
print(
Expand All @@ -163,4 +167,7 @@ def serve(config: ServerConfig) -> None:
f" export OPENAI_API_KEY=og-veil # ignored; the Chat session authenticates\n"
)
# threaded=True so streaming requests don't block health checks / other calls.
app.run(host=config.host, port=config.port, threaded=True)
try:
app.run(host=config.host, port=config.port, threaded=True)
finally:
gateway.stop_refresh_loop()
Loading