Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions examples/browser_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Example: stream live browser telemetry events from a session."""

from kernel import Kernel


def main() -> None:
client = Kernel()

# Enable telemetry capture when creating the browser.
browser = client.browsers.create(telemetry={"enabled": True})

try:
# Telemetry is a default direct-to-VM routing subresource, so the stream
# connects straight to the browser VM automatically.
stream = client.browsers.telemetry.stream(browser.session_id)

# Make a few browser activity calls to generate events. The "api" telemetry
# category emits an event per VM API call, so events arrive within ~1s.
for _ in range(3):
client.browsers.curl(browser.session_id, url="https://example.com", method="GET")

# Print a few events, then stop so we don't wait on the 15s keepalive.
for count, message in enumerate(stream, start=1):
print(message.seq, message.event.type)
if count >= 3:
break
finally:
client.browsers.delete_by_id(browser.session_id)


if __name__ == "__main__":
main()
7 changes: 6 additions & 1 deletion src/kernel/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,12 @@ def decode(self, line: str) -> ServerSentEvent | None:
# See: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation # noqa: E501

if not line:
if not self._event and not self._data and not self._last_event_id and self._retry is None:
# Whether to dispatch depends only on what was set in the *current* block. last_event_id
# is sticky across events (per the SSE spec, it is intentionally not reset below), so it
# must not be part of this check -- otherwise, once any event carries an id, every
# subsequent comment-only block (e.g. a ``:\n\n`` keepalive) would dispatch an empty
# event, which then fails to JSON-decode in the typed Stream wrapper.
if not self._event and not self._data and self._retry is None:
return None

sse = ServerSentEvent(
Expand Down
2 changes: 1 addition & 1 deletion src/kernel/lib/browser_routing/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BrowserRoutingConfig:
def browser_routing_config_from_env() -> BrowserRoutingConfig:
raw = os.environ.get("KERNEL_BROWSER_ROUTING_SUBRESOURCES")
if raw is None:
return BrowserRoutingConfig(subresources=("curl",))
return BrowserRoutingConfig(subresources=("curl", "telemetry"))
if raw.strip() == "":
return BrowserRoutingConfig()

Expand Down
24 changes: 23 additions & 1 deletion tests/test_browser_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,28 @@ def test_browser_request_uses_curl_raw() -> None:
assert request.url.params.get("jwt") == "token-abc"


@respx.mock
def test_telemetry_stream_routes_directly_to_vm(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
route = respx.get("http://browser-session.test/browser/kernel/telemetry/stream").mock(
return_value=httpx.Response(
200,
headers={"content-type": "text/event-stream"},
content=b'id: 1\ndata: {"category":"api"}\n\n',
)
)
with Kernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client:
_cache_browser(client)
stream = client.browsers.telemetry.stream("sess-1")
stream.close()

assert route.called
request = cast(httpx.Request, cast(Any, route.calls[0]).request)
assert request.url.path == "/browser/kernel/telemetry/stream"
assert request.url.params.get("jwt") == "token-abc"
assert request.headers.get("Authorization") is None


@respx.mock
def test_browser_request_params_cannot_override_target_url_or_jwt() -> None:
route = respx.get("http://browser-session.test/browser/kernel/curl/raw").mock(
Expand Down Expand Up @@ -315,7 +337,7 @@ def test_browser_route_from_browser_requires_base_url_and_jwt() -> None:

def test_browser_routing_config_from_env_defaults_to_curl(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", raising=False)
assert browser_routing_config_from_env().subresources == ("curl",)
assert browser_routing_config_from_env().subresources == ("curl", "telemetry")


def test_browser_routing_config_from_env_empty_string_disables_routing(monkeypatch: pytest.MonkeyPatch) -> None:
Expand Down
26 changes: 26 additions & 0 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,32 @@ def body() -> Iterator[bytes]:
await assert_empty_iter(iterator)


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_keepalive_comment_after_event_with_id(sync: bool, client: Kernel, async_client: AsyncKernel) -> None:
# A ``:`` comment frame (the server's SSE keepalive) that arrives after an event which set an
# id must be ignored, not dispatched as an empty event. last_event_id is sticky, so this is a
# regression guard against it leaking an undecodable empty frame into the typed stream.
def body() -> Iterator[bytes]:
yield b"id: 1\n"
yield b'data: {"foo":true}\n'
yield b"\n"
yield b":\n"
yield b"\n"
yield b'data: {"bar":false}\n'
yield b"\n"

iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)

sse = await iter_next(iterator)
assert sse.json() == {"foo": True}

sse = await iter_next(iterator)
assert sse.json() == {"bar": False}

await assert_empty_iter(iterator)


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_data_missing_event(sync: bool, client: Kernel, async_client: AsyncKernel) -> None:
Expand Down