diff --git a/examples/browser_telemetry.py b/examples/browser_telemetry.py new file mode 100644 index 0000000..8d1795c --- /dev/null +++ b/examples/browser_telemetry.py @@ -0,0 +1,32 @@ +"""Example: stream live browser telemetry events from a session.""" + +from kernel import Kernel + + +def main() -> None: + client = Kernel() + + # Enable telemetry capture when creating the browser. + browser = client.browsers.create(telemetry={"enabled": True}) + + try: + # Telemetry is a default direct-to-VM routing subresource, so the stream + # connects straight to the browser VM automatically. + stream = client.browsers.telemetry.stream(browser.session_id) + + # Make a few browser activity calls to generate events. The "api" telemetry + # category emits an event per VM API call, so events arrive within ~1s. + for _ in range(3): + client.browsers.curl(browser.session_id, url="https://example.com", method="GET") + + # Print a few events, then stop so we don't wait on the 15s keepalive. + for count, message in enumerate(stream, start=1): + print(message.seq, message.event.type) + if count >= 3: + break + finally: + client.browsers.delete_by_id(browser.session_id) + + +if __name__ == "__main__": + main() diff --git a/src/kernel/_streaming.py b/src/kernel/_streaming.py index 5520edb..ddca7eb 100644 --- a/src/kernel/_streaming.py +++ b/src/kernel/_streaming.py @@ -251,7 +251,12 @@ def decode(self, line: str) -> ServerSentEvent | None: # See: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation # noqa: E501 if not line: - if not self._event and not self._data and not self._last_event_id and self._retry is None: + # Whether to dispatch depends only on what was set in the *current* block. last_event_id + # is sticky across events (per the SSE spec, it is intentionally not reset below), so it + # must not be part of this check -- otherwise, once any event carries an id, every + # subsequent comment-only block (e.g. a ``:\n\n`` keepalive) would dispatch an empty + # event, which then fails to JSON-decode in the typed Stream wrapper. + if not self._event and not self._data and self._retry is None: return None sse = ServerSentEvent( diff --git a/src/kernel/lib/browser_routing/routing.py b/src/kernel/lib/browser_routing/routing.py index aa84cc1..f0f5e34 100644 --- a/src/kernel/lib/browser_routing/routing.py +++ b/src/kernel/lib/browser_routing/routing.py @@ -41,7 +41,7 @@ class BrowserRoutingConfig: def browser_routing_config_from_env() -> BrowserRoutingConfig: raw = os.environ.get("KERNEL_BROWSER_ROUTING_SUBRESOURCES") if raw is None: - return BrowserRoutingConfig(subresources=("curl",)) + return BrowserRoutingConfig(subresources=("curl", "telemetry")) if raw.strip() == "": return BrowserRoutingConfig() diff --git a/tests/test_browser_routing.py b/tests/test_browser_routing.py index ac84152..6b0c12a 100644 --- a/tests/test_browser_routing.py +++ b/tests/test_browser_routing.py @@ -97,6 +97,28 @@ def test_browser_request_uses_curl_raw() -> None: assert request.url.params.get("jwt") == "token-abc" +@respx.mock +def test_telemetry_stream_routes_directly_to_vm(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry") + route = respx.get("http://browser-session.test/browser/kernel/telemetry/stream").mock( + return_value=httpx.Response( + 200, + headers={"content-type": "text/event-stream"}, + content=b'id: 1\ndata: {"category":"api"}\n\n', + ) + ) + with Kernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client: + _cache_browser(client) + stream = client.browsers.telemetry.stream("sess-1") + stream.close() + + assert route.called + request = cast(httpx.Request, cast(Any, route.calls[0]).request) + assert request.url.path == "/browser/kernel/telemetry/stream" + assert request.url.params.get("jwt") == "token-abc" + assert request.headers.get("Authorization") is None + + @respx.mock def test_browser_request_params_cannot_override_target_url_or_jwt() -> None: route = respx.get("http://browser-session.test/browser/kernel/curl/raw").mock( @@ -315,7 +337,7 @@ def test_browser_route_from_browser_requires_base_url_and_jwt() -> None: def test_browser_routing_config_from_env_defaults_to_curl(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", raising=False) - assert browser_routing_config_from_env().subresources == ("curl",) + assert browser_routing_config_from_env().subresources == ("curl", "telemetry") def test_browser_routing_config_from_env_empty_string_disables_routing(monkeypatch: pytest.MonkeyPatch) -> None: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 4b8e4e4..c2ebd43 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -26,6 +26,32 @@ def body() -> Iterator[bytes]: await assert_empty_iter(iterator) +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_keepalive_comment_after_event_with_id(sync: bool, client: Kernel, async_client: AsyncKernel) -> None: + # A ``:`` comment frame (the server's SSE keepalive) that arrives after an event which set an + # id must be ignored, not dispatched as an empty event. last_event_id is sticky, so this is a + # regression guard against it leaking an undecodable empty frame into the typed stream. + def body() -> Iterator[bytes]: + yield b"id: 1\n" + yield b'data: {"foo":true}\n' + yield b"\n" + yield b":\n" + yield b"\n" + yield b'data: {"bar":false}\n' + yield b"\n" + + iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client) + + sse = await iter_next(iterator) + assert sse.json() == {"foo": True} + + sse = await iter_next(iterator) + assert sse.json() == {"bar": False} + + await assert_empty_iter(iterator) + + @pytest.mark.asyncio @pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) async def test_data_missing_event(sync: bool, client: Kernel, async_client: AsyncKernel) -> None: