From cb5072516416627ffde7198900484c46dda5b9dc Mon Sep 17 00:00:00 2001 From: Rafael Garcia Date: Wed, 3 Jun 2026 10:12:01 -0400 Subject: [PATCH 1/3] feat: route browser telemetry directly to the VM by default Telemetry is now a default direct-to-VM routing subresource. The telemetry stream method path is changed from /browsers/{id}/telemetry to /browsers/{id}/telemetry/stream so it mirrors the browser VM endpoint: when the request is rewritten for direct routing it yields {base_url}/telemetry/stream, which is the SSE stream on the VM (the VM's /telemetry is a different, non-stream JSON endpoint). "telemetry" is added to the default KERNEL_BROWSER_ROUTING_SUBRESOURCES allowlist alongside "curl". Co-Authored-By: Claude Opus 4.8 (1M context) --- src/kernel/lib/browser_routing/routing.py | 2 +- tests/test_browser_routing.py | 24 ++++++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/kernel/lib/browser_routing/routing.py b/src/kernel/lib/browser_routing/routing.py index aa84cc1..f0f5e34 100644 --- a/src/kernel/lib/browser_routing/routing.py +++ b/src/kernel/lib/browser_routing/routing.py @@ -41,7 +41,7 @@ class BrowserRoutingConfig: def browser_routing_config_from_env() -> BrowserRoutingConfig: raw = os.environ.get("KERNEL_BROWSER_ROUTING_SUBRESOURCES") if raw is None: - return BrowserRoutingConfig(subresources=("curl",)) + return BrowserRoutingConfig(subresources=("curl", "telemetry")) if raw.strip() == "": return BrowserRoutingConfig() diff --git a/tests/test_browser_routing.py b/tests/test_browser_routing.py index ac84152..6b0c12a 100644 --- a/tests/test_browser_routing.py +++ b/tests/test_browser_routing.py @@ -97,6 +97,28 @@ def test_browser_request_uses_curl_raw() -> None: assert request.url.params.get("jwt") == "token-abc" +@respx.mock +def test_telemetry_stream_routes_directly_to_vm(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry") + route = 
respx.get("http://browser-session.test/browser/kernel/telemetry/stream").mock( + return_value=httpx.Response( + 200, + headers={"content-type": "text/event-stream"}, + content=b'id: 1\ndata: {"category":"api"}\n\n', + ) + ) + with Kernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client: + _cache_browser(client) + stream = client.browsers.telemetry.stream("sess-1") + stream.close() + + assert route.called + request = cast(httpx.Request, cast(Any, route.calls[0]).request) + assert request.url.path == "/browser/kernel/telemetry/stream" + assert request.url.params.get("jwt") == "token-abc" + assert request.headers.get("Authorization") is None + + @respx.mock def test_browser_request_params_cannot_override_target_url_or_jwt() -> None: route = respx.get("http://browser-session.test/browser/kernel/curl/raw").mock( @@ -315,7 +337,7 @@ def test_browser_route_from_browser_requires_base_url_and_jwt() -> None: def test_browser_routing_config_from_env_defaults_to_curl(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", raising=False) - assert browser_routing_config_from_env().subresources == ("curl",) + assert browser_routing_config_from_env().subresources == ("curl", "telemetry") def test_browser_routing_config_from_env_empty_string_disables_routing(monkeypatch: pytest.MonkeyPatch) -> None: From 4c29993cdda50fcde19ee7bf696a268cd8b0eb0b Mon Sep 17 00:00:00 2001 From: Rafael Garcia Date: Wed, 3 Jun 2026 10:34:56 -0400 Subject: [PATCH 2/3] feat(examples): add browser-telemetry example Co-Authored-By: Claude Opus 4.8 (1M context) --- examples/browser_telemetry.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 examples/browser_telemetry.py diff --git a/examples/browser_telemetry.py b/examples/browser_telemetry.py new file mode 100644 index 0000000..8d1795c --- /dev/null +++ b/examples/browser_telemetry.py @@ -0,0 +1,32 @@ +"""Example: stream live browser 
telemetry events from a session."""
+
+from kernel import Kernel
+
+
+def main() -> None:
+    client = Kernel()
+
+    # Enable telemetry capture when creating the browser.
+    browser = client.browsers.create(telemetry={"enabled": True})
+
+    try:
+        # Telemetry is a default direct-to-VM routing subresource, so the stream
+        # connects straight to the browser VM automatically. The ``with`` block
+        # closes the SSE connection even though we stop reading early.
+        with client.browsers.telemetry.stream(browser.session_id) as stream:
+            # Make a few browser activity calls to generate events. The "api" telemetry
+            # category emits an event per VM API call, so events arrive within ~1s.
+            for _ in range(3):
+                client.browsers.curl(browser.session_id, url="https://example.com", method="GET")
+
+            # Print a few events, then stop so we don't wait on the 15s keepalive.
+            for count, message in enumerate(stream, start=1):
+                print(message.seq, message.event.type)
+                if count >= 3:
+                    break
+    finally:
+        client.browsers.delete_by_id(browser.session_id)
+
+
+if __name__ == "__main__":
+    main()
From a0ee2b2653c581839be11bb29be5b68166e80658 Mon Sep 17 00:00:00 2001
From: Rafael Garcia
Date: Wed, 3 Jun 2026 11:00:24 -0400
Subject: [PATCH 3/3] fix(streaming): don't dispatch empty SSE keepalive comment frames

The SSE decoder's empty-block guard included last_event_id, which is
sticky across events per the SSE spec. Once any event carried an id,
every subsequent comment-only block (e.g. the server's ":\n\n"
keepalive, sent every 15s on an idle stream) fell through the guard and
dispatched an empty-data event. The typed Stream wrapper then calls
.json() on it unconditionally, raising JSONDecodeError and killing the
stream. This made idle browser telemetry streams crash after ~15s.

Drop last_event_id from the guard so dispatch depends only on the
current block's event/data/retry fields. Event-typed empty-data frames
still dispatch (unchanged). Adds a regression test for a keepalive
comment following an id-bearing event.

Co-Authored-By: Claude Opus 4.8 (1M context) --- src/kernel/_streaming.py | 7 ++++++- tests/test_streaming.py | 26 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/kernel/_streaming.py b/src/kernel/_streaming.py index 5520edb..ddca7eb 100644 --- a/src/kernel/_streaming.py +++ b/src/kernel/_streaming.py @@ -251,7 +251,12 @@ def decode(self, line: str) -> ServerSentEvent | None: # See: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation # noqa: E501 if not line: - if not self._event and not self._data and not self._last_event_id and self._retry is None: + # Whether to dispatch depends only on what was set in the *current* block. last_event_id + # is sticky across events (per the SSE spec, it is intentionally not reset below), so it + # must not be part of this check -- otherwise, once any event carries an id, every + # subsequent comment-only block (e.g. a ``:\n\n`` keepalive) would dispatch an empty + # event, which then fails to JSON-decode in the typed Stream wrapper. + if not self._event and not self._data and self._retry is None: return None sse = ServerSentEvent( diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 4b8e4e4..c2ebd43 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -26,6 +26,32 @@ def body() -> Iterator[bytes]: await assert_empty_iter(iterator) +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_keepalive_comment_after_event_with_id(sync: bool, client: Kernel, async_client: AsyncKernel) -> None: + # A ``:`` comment frame (the server's SSE keepalive) that arrives after an event which set an + # id must be ignored, not dispatched as an empty event. last_event_id is sticky, so this is a + # regression guard against it leaking an undecodable empty frame into the typed stream. 
+ def body() -> Iterator[bytes]: + yield b"id: 1\n" + yield b'data: {"foo":true}\n' + yield b"\n" + yield b":\n" + yield b"\n" + yield b'data: {"bar":false}\n' + yield b"\n" + + iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client) + + sse = await iter_next(iterator) + assert sse.json() == {"foo": True} + + sse = await iter_next(iterator) + assert sse.json() == {"bar": False} + + await assert_empty_iter(iterator) + + @pytest.mark.asyncio @pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) async def test_data_missing_event(sync: bool, client: Kernel, async_client: AsyncKernel) -> None: