diff --git a/src/kernel/_client.py b/src/kernel/_client.py
index 537d159..dd430e8 100644
--- a/src/kernel/_client.py
+++ b/src/kernel/_client.py
@@ -25,7 +25,7 @@
     is_mapping_t,
     get_async_library,
 )
-from ._compat import cached_property
+from ._compat import model_copy, cached_property
 from ._models import FinalRequestOptions
 from ._version import __version__
 from ._streaming import Stream as Stream, AsyncStream as AsyncStream
@@ -39,8 +39,10 @@
     BrowserRouteCache,
     BrowserRoutingConfig,
     strip_direct_vm_auth,
+    response_is_browser_gone,
     rewrite_direct_vm_options,
     browser_routing_config_from_env,
+    fallback_session_id_for_options,
     maybe_evict_browser_route_from_response,
     maybe_populate_browser_route_cache_from_response,
 )
@@ -304,6 +306,51 @@ def default_headers(self) -> dict[str, str | Omit]:
             **self._custom_headers,
         }
 
+    @override
+    def request(
+        self,
+        cast_to: Type[ResponseT],
+        options: FinalRequestOptions,
+        *,
+        stream: bool = False,
+        stream_cls: type[Stream[Any]] | None = None,
+    ) -> Any:
+        """Send a request, falling back to the control plane once on browser_gone.
+
+        Fallback only applies when `fallback_session_id_for_options` returns a
+        session id for the ORIGINAL (pre-rewrite) options and the raised
+        `APIStatusError` response satisfies `response_is_browser_gone`. Any
+        other error propagates unchanged, and an error from the control-plane
+        re-issue is surfaced as-is (no further fallback).
+        """
+        # Eligibility must be decided before `super().request`, which rewrites
+        # the options to target the VM.
+        fallback_session_id = fallback_session_id_for_options(
+            options, cache=self.browser_route_cache, config=self._browser_routing
+        )
+        if fallback_session_id is None:
+            # Not a fallback candidate: no snapshot needed, plain pass-through.
+            return super().request(cast_to, options, stream=stream, stream_cls=stream_cls)
+
+        # Snapshot the options so the exact same request can be re-issued to
+        # the control plane if the VM reports the browser is gone.
+        original_options = model_copy(options)
+        try:
+            return super().request(cast_to, options, stream=stream, stream_cls=stream_cls)
+        except APIStatusError as err:
+            if not response_is_browser_gone(err.response):
+                raise
+
+        # The browser is authoritatively gone: evict its cached route so the
+        # re-issued request is NOT rewritten back to the (dead) VM, then hit
+        # the control plane exactly once with the original request. The route
+        # is gone, so `_prepare_options` is a no-op, Authorization is restored
+        # by the normal auth flow, and the jwt query param is dropped. This runs
+        # outside the `except` block so a control-plane error is not chained to
+        # the VM's 404 in tracebacks.
+        self.browser_route_cache.delete(fallback_session_id)
+        return super().request(cast_to, original_options, stream=stream, stream_cls=stream_cls)
+
     @override
     def _prepare_options(self, options: Any) -> Any:
         options = cast(Any, super()._prepare_options(options))
@@ -635,6 +682,51 @@ def default_headers(self) -> dict[str, str | Omit]:
             **self._custom_headers,
         }
 
+    @override
+    async def request(
+        self,
+        cast_to: Type[ResponseT],
+        options: FinalRequestOptions,
+        *,
+        stream: bool = False,
+        stream_cls: type[AsyncStream[Any]] | None = None,
+    ) -> Any:
+        """Send a request, falling back to the control plane once on browser_gone.
+
+        Fallback only applies when `fallback_session_id_for_options` returns a
+        session id for the ORIGINAL (pre-rewrite) options and the raised
+        `APIStatusError` response satisfies `response_is_browser_gone`. Any
+        other error propagates unchanged, and an error from the control-plane
+        re-issue is surfaced as-is (no further fallback).
+        """
+        # Eligibility must be decided before `super().request`, which rewrites
+        # the options to target the VM.
+        fallback_session_id = fallback_session_id_for_options(
+            options, cache=self.browser_route_cache, config=self._browser_routing
+        )
+        if fallback_session_id is None:
+            # Not a fallback candidate: no snapshot needed, plain pass-through.
+            return await super().request(cast_to, options, stream=stream, stream_cls=stream_cls)
+
+        # Snapshot the options so the exact same request can be re-issued to
+        # the control plane if the VM reports the browser is gone.
+        original_options = model_copy(options)
+        try:
+            return await super().request(cast_to, options, stream=stream, stream_cls=stream_cls)
+        except APIStatusError as err:
+            if not response_is_browser_gone(err.response):
+                raise
+
+        # The browser is authoritatively gone: evict its cached route so the
+        # re-issued request is NOT rewritten back to the (dead) VM, then hit
+        # the control plane exactly once with the original request. The route
+        # is gone, so `_prepare_options` is a no-op, Authorization is restored
+        # by the normal auth flow, and the jwt query param is dropped. This runs
+        # outside the `except` block so a control-plane error is not chained to
+        # the VM's 404 in tracebacks.
+        self.browser_route_cache.delete(fallback_session_id)
+        return await super().request(cast_to, original_options, stream=stream, stream_cls=stream_cls)
+
     @override
     async def _prepare_options(self, options: Any) -> Any:
         options = cast(Any, await super()._prepare_options(options))
diff --git a/src/kernel/lib/browser_routing/routing.py b/src/kernel/lib/browser_routing/routing.py
index f0f5e34..be7c052 100644
--- a/src/kernel/lib/browser_routing/routing.py
+++ b/src/kernel/lib/browser_routing/routing.py
@@ -37,6 +37,56 @@ class BrowserRoutingConfig:
 _BROWSER_POOL_ACQUIRE_PATH = re.compile(r"^/(?:v\d+/)?browser_pools/[^/]+/acquire/?$")
 _BROWSER_POOL_RELEASE_PATH = re.compile(r"^/(?:v\d+/)?browser_pools/[^/]+/release/?$")
 
+# Body code returned by the VM proxy (metro-api, kernel#2317) when a routed
+# request targets a DELETED/GONE browser. There is intentionally no special
+# response header: we key off this body code only. A live VM's own 404s do not
+# carry this code, and transient/real upstream failures return 5xx instead.
+BROWSER_GONE_CODE = "browser_gone"
+
+# Registry of routed paths that are ELIGIBLE for control-plane fallback when the
+# VM reports the browser is gone (404 + code == "browser_gone"). Eligibility is
+# expressed against the parsed routed path as (subresource, suffix). Everything
+# not listed here is default-OFF: a browser_gone 404 on a non-eligible path
+# propagates unchanged. Adding a future eligible endpoint is a one-line edit.
+_FALLBACK_ELIGIBLE_ROUTED_PATHS: frozenset[tuple[str, str]] = frozenset(
+    {
+        # PROSPECTIVE: GET /browsers/{id}/telemetry/events. The pull endpoint /
+        # `telemetry.events(...)` method does NOT exist yet; this pre-registers
+        # the opt-in so control-plane fallback works the moment that method
+        # ships, with no further routing-layer changes required.
+        ("telemetry", "/events"),
+    }
+)
+
+
+def is_fallback_eligible_routed_path(subresource: str, suffix: str) -> bool:
+    """Return True if a routed path opted into control-plane fallback.
+
+    `subresource` and `suffix` are the components produced by
+    `match_direct_vm_path` for a `/browsers/{id}/{subresource}{suffix}` URL.
+    """
+    return (subresource, suffix) in _FALLBACK_ELIGIBLE_ROUTED_PATHS
+
+
+def response_is_browser_gone(response: httpx.Response) -> bool:
+    """Return True iff `response` is a 404 whose JSON body has code == "browser_gone".
+
+    Any other status code returns False without reading the body. The body is
+    parsed defensively: a parse or shape problem is treated as "not
+    browser_gone" so the original response propagates unchanged.
+    """
+    if response.status_code != 404:
+        return False
+    try:
+        body = response.json()
+    except Exception:
+        # Deliberately broad: an unreadable or non-JSON body must never turn a
+        # plain 404 into a different failure.
+        return False
+    if not isinstance(body, Mapping):
+        return False
+    return cast(Mapping[object, object], body).get("code") == BROWSER_GONE_CODE
+
 
 def browser_routing_config_from_env() -> BrowserRoutingConfig:
     raw = os.environ.get("KERNEL_BROWSER_ROUTING_SUBRESOURCES")
@@ -216,6 +266,42 @@ def rewrite_direct_vm_options(
     return rewritten
 
 
+def fallback_session_id_for_options(
+    options: FinalRequestOptions,
+    *,
+    cache: BrowserRouteCache,
+    config: BrowserRoutingConfig,
+) -> str | None:
+    """Return the session id to fall back for, or None if not eligible.
+
+    Decides — from the ORIGINAL (pre-rewrite) request options — whether a
+    control-plane fallback is permitted. All must hold:
+      1. the request was actually routed to the VM (allowlisted subresource +
+         a cached route exists for the session);
+      2. the HTTP method is GET;
+      3. the routed path is in the fallback-eligible registry.
+
+    The caller is still responsible for confirming the VM returned a
+    browser_gone 404 before acting on the returned session id.
+    """
+    if options.method.upper() != "GET":
+        return None
+
+    match = match_direct_vm_path(options.url)
+    if match is None:
+        return None
+
+    session_id, subresource, suffix = match
+    if subresource not in config.subresources:
+        return None
+    if cache.get(session_id) is None:
+        return None
+    if not is_fallback_eligible_routed_path(subresource, suffix):
+        return None
+
+    return session_id
+
+
 def strip_direct_vm_auth(request: httpx.Request, *, cache: BrowserRouteCache) -> None:
     raw = str(request.url)
     for route in cache.values():
diff --git a/tests/test_browser_routing.py b/tests/test_browser_routing.py
index 6b0c12a..91a2371 100644
--- a/tests/test_browser_routing.py
+++ b/tests/test_browser_routing.py
@@ -7,7 +7,13 @@
 import respx
 import pytest
 
-from kernel import Kernel, AsyncKernel, InternalServerError
+from kernel import (
+    Kernel,
+    AsyncKernel,
+    NotFoundError,
+    APIConnectionError,
+    InternalServerError,
+)
 from kernel.lib.browser_routing.util import jwt_from_cdp_ws_url
 from kernel.lib.browser_routing.routing import (
     BrowserRoute,
@@ -343,3 +349,228 @@ def test_browser_routing_config_from_env_defaults_to_curl(monkeypatch: pytest.Mo
 def test_browser_routing_config_from_env_empty_string_disables_routing(monkeypatch: pytest.MonkeyPatch) -> None:
     monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "")
     assert browser_routing_config_from_env().subresources == ()
+
+
+# ---------------------------------------------------------------------------
+# Control-plane fallback (browser_gone 404) — see kernel#2317.
+#
+# The prospective eligible endpoint is GET /browsers/{id}/telemetry/events.
+# That SDK method does not exist yet, so these tests exercise the routed GET
+# via the low-level `client.get(...)` against that exact path. `telemetry`
+# routing is enabled locally/explicitly per test (the default-subresources
+# constant is intentionally NOT modified by this PR).
+# ---------------------------------------------------------------------------
+
+_EVENTS_PATH = "/browsers/sess-1/telemetry/events"
+_VM_EVENTS_URL = "http://browser-session.test/browser/kernel/telemetry/events"
+_GONE_BODY = {"code": "browser_gone", "message": "browser not found"}
+
+
+def test_telemetry_events_is_fallback_eligible() -> None:
+    from kernel.lib.browser_routing.routing import is_fallback_eligible_routed_path
+
+    assert is_fallback_eligible_routed_path("telemetry", "/events") is True
+    assert is_fallback_eligible_routed_path("telemetry", "/stream") is False
+    assert is_fallback_eligible_routed_path("process", "/exec") is False
+
+
+@respx.mock
+def test_eligible_get_browser_gone_falls_back_to_control_plane(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    vm_route = respx.get(_VM_EVENTS_URL).mock(return_value=httpx.Response(404, json=_GONE_BODY))
+    cp_route = respx.get(f"{base_url}{_EVENTS_PATH}").mock(return_value=httpx.Response(200, json={"events": []}))
+
+    with Kernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client:
+        _cache_browser(client)
+        response = client.get(_EVENTS_PATH, cast_to=httpx.Response)
+
+    # VM hit exactly once, control plane hit exactly once (no loop).
+    assert vm_route.call_count == 1
+    assert cp_route.call_count == 1
+    assert response.status_code == 200
+    assert response.json() == {"events": []}
+
+    # Control-plane re-issue restores Authorization and drops the jwt param.
+    cp_request = cast(httpx.Request, cast(Any, cp_route.calls[0]).request)
+    assert cp_request.headers.get("Authorization") == f"Bearer {api_key}"
+    assert cp_request.url.params.get("jwt") is None
+
+    # The dead route is evicted authoritatively.
+    assert client.browser_route_cache.get("sess-1") is None
+
+
+@respx.mock
+def test_eligible_get_browser_gone_then_cp_errors_returns_as_is_no_loop(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    vm_route = respx.get(_VM_EVENTS_URL).mock(return_value=httpx.Response(404, json=_GONE_BODY))
+    cp_route = respx.get(f"{base_url}{_EVENTS_PATH}").mock(return_value=httpx.Response(500, json={"error": "boom"}))
+
+    with Kernel(base_url=base_url, api_key=api_key, max_retries=0, _strict_response_validation=True) as client:
+        _cache_browser(client)
+        with pytest.raises(InternalServerError):
+            client.get(_EVENTS_PATH, cast_to=httpx.Response)
+
+    # VM once, control plane once — the CP error is surfaced, never retried/looped.
+    assert vm_route.call_count == 1
+    assert cp_route.call_count == 1
+
+
+@respx.mock
+def test_non_eligible_path_browser_gone_does_not_fall_back(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    vm_route = respx.get("http://browser-session.test/browser/kernel/telemetry/stream").mock(
+        return_value=httpx.Response(404, json=_GONE_BODY)
+    )
+    cp_route = respx.get(f"{base_url}/browsers/sess-1/telemetry/stream").mock(
+        return_value=httpx.Response(200, json={"ok": True})
+    )
+
+    with Kernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client:
+        _cache_browser(client)
+        with pytest.raises(NotFoundError):
+            client.get("/browsers/sess-1/telemetry/stream", cast_to=httpx.Response)
+
+    # The VM 404 propagates unchanged; no control-plane fallback; route kept.
+    assert vm_route.call_count == 1
+    assert cp_route.call_count == 0
+    assert client.browser_route_cache.get("sess-1") is not None
+
+
+@respx.mock
+def test_eligible_get_transient_5xx_does_not_fall_back(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    vm_route = respx.get(_VM_EVENTS_URL).mock(return_value=httpx.Response(502, json={"error": "bad gateway"}))
+    cp_route = respx.get(f"{base_url}{_EVENTS_PATH}").mock(return_value=httpx.Response(200, json={"events": []}))
+
+    with Kernel(base_url=base_url, api_key=api_key, max_retries=0, _strict_response_validation=True) as client:
+        _cache_browser(client)
+        with pytest.raises(InternalServerError):
+            client.get(_EVENTS_PATH, cast_to=httpx.Response)
+
+    # A transient 5xx is surfaced as-is: no retry against the VM and no fallback.
+    assert vm_route.call_count == 1
+    assert cp_route.call_count == 0
+    assert client.browser_route_cache.get("sess-1") is not None
+
+
+@respx.mock
+def test_eligible_get_connection_error_does_not_fall_back(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    vm_route = respx.get(_VM_EVENTS_URL).mock(side_effect=httpx.ConnectError("nope"))
+    cp_route = respx.get(f"{base_url}{_EVENTS_PATH}").mock(return_value=httpx.Response(200, json={"events": []}))
+
+    with Kernel(base_url=base_url, api_key=api_key, max_retries=0, _strict_response_validation=True) as client:
+        _cache_browser(client)
+        with pytest.raises(APIConnectionError):
+            client.get(_EVENTS_PATH, cast_to=httpx.Response)
+
+    assert vm_route.call_count == 1
+    assert cp_route.call_count == 0
+
+
+@respx.mock
+def test_eligible_get_success_does_not_fall_back(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    vm_route = respx.get(_VM_EVENTS_URL).mock(return_value=httpx.Response(200, json={"events": [1, 2]}))
+    cp_route = respx.get(f"{base_url}{_EVENTS_PATH}").mock(return_value=httpx.Response(200, json={"events": []}))
+
+    with Kernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client:
+        _cache_browser(client)
+        response = client.get(_EVENTS_PATH, cast_to=httpx.Response)
+
+    assert vm_route.call_count == 1
+    assert cp_route.call_count == 0
+    assert response.json() == {"events": [1, 2]}
+    assert client.browser_route_cache.get("sess-1") is not None
+
+
+@respx.mock
+def test_eligible_but_post_does_not_fall_back(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    vm_route = respx.post(_VM_EVENTS_URL).mock(return_value=httpx.Response(404, json=_GONE_BODY))
+    cp_route = respx.post(f"{base_url}{_EVENTS_PATH}").mock(return_value=httpx.Response(200, json={"events": []}))
+
+    with Kernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client:
+        _cache_browser(client)
+        with pytest.raises(NotFoundError):
+            client.post(_EVENTS_PATH, cast_to=httpx.Response, body={})
+
+    # Fallback is GET-only: a POST's browser_gone 404 propagates, no fallback.
+    assert vm_route.call_count == 1
+    assert cp_route.call_count == 0
+
+
+@respx.mock
+def test_eligible_get_plain_404_without_browser_gone_does_not_fall_back(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    vm_route = respx.get(_VM_EVENTS_URL).mock(
+        return_value=httpx.Response(404, json={"code": "not_found", "message": "nope"})
+    )
+    cp_route = respx.get(f"{base_url}{_EVENTS_PATH}").mock(return_value=httpx.Response(200, json={"events": []}))
+
+    with Kernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client:
+        _cache_browser(client)
+        with pytest.raises(NotFoundError):
+            client.get(_EVENTS_PATH, cast_to=httpx.Response)
+
+    # A live VM's own 404 (no browser_gone code) propagates unchanged.
+    assert vm_route.call_count == 1
+    assert cp_route.call_count == 0
+    assert client.browser_route_cache.get("sess-1") is not None
+
+
+@respx.mock
+def test_non_routed_request_untouched_on_browser_gone(monkeypatch: pytest.MonkeyPatch) -> None:
+    # No cached route -> request is never routed to a VM. Even a verbatim
+    # browser_gone 404 from the control plane must NOT trigger any fallback loop.
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    cp_route = respx.get(f"{base_url}{_EVENTS_PATH}").mock(return_value=httpx.Response(404, json=_GONE_BODY))
+
+    with Kernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client:
+        with pytest.raises(NotFoundError):
+            client.get(_EVENTS_PATH, cast_to=httpx.Response)
+
+    assert cp_route.call_count == 1
+
+
+@pytest.mark.asyncio
+@respx.mock
+async def test_async_eligible_get_browser_gone_falls_back_to_control_plane(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    vm_route = respx.get(_VM_EVENTS_URL).mock(return_value=httpx.Response(404, json=_GONE_BODY))
+    cp_route = respx.get(f"{base_url}{_EVENTS_PATH}").mock(return_value=httpx.Response(200, json={"events": []}))
+
+    async with AsyncKernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client:
+        route = browser_route_from_browser(_fake_browser())
+        assert route is not None
+        client.browser_route_cache.set(route)
+        response = await client.get(_EVENTS_PATH, cast_to=httpx.Response)
+
+    assert vm_route.call_count == 1
+    assert cp_route.call_count == 1
+    assert response.json() == {"events": []}
+    cp_request = cast(httpx.Request, cast(Any, cp_route.calls[0]).request)
+    assert cp_request.headers.get("Authorization") == f"Bearer {api_key}"
+    assert cp_request.url.params.get("jwt") is None
+    assert client.browser_route_cache.get("sess-1") is None
+
+
+@pytest.mark.asyncio
+@respx.mock
+async def test_async_eligible_get_transient_5xx_does_not_fall_back(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
+    vm_route = respx.get(_VM_EVENTS_URL).mock(return_value=httpx.Response(503, json={"error": "unavailable"}))
+    cp_route = respx.get(f"{base_url}{_EVENTS_PATH}").mock(return_value=httpx.Response(200, json={"events": []}))
+
+    async with AsyncKernel(
+        base_url=base_url, api_key=api_key, max_retries=0, _strict_response_validation=True
+    ) as client:
+        route = browser_route_from_browser(_fake_browser())
+        assert route is not None
+        client.browser_route_cache.set(route)
+        with pytest.raises(InternalServerError):
+            await client.get(_EVENTS_PATH, cast_to=httpx.Response)
+
+    assert vm_route.call_count == 1
+    assert cp_route.call_count == 0
+    assert client.browser_route_cache.get("sess-1") is not None