diff --git a/server/cmd/api/api/api.go b/server/cmd/api/api/api.go index 6e5b4440..4f6c671e 100644 --- a/server/cmd/api/api/api.go +++ b/server/cmd/api/api/api.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "os" "os/exec" "sync" @@ -103,7 +104,7 @@ func New( return nil, fmt.Errorf("captureSession cannot be nil") } - mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum) + mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum, slog.Default()) ctx, cancel := context.WithCancel(context.Background()) return &ApiService{ diff --git a/server/lib/cdpmonitor/cdp_test.go b/server/lib/cdpmonitor/cdp_test.go index aedce159..0917451c 100644 --- a/server/lib/cdpmonitor/cdp_test.go +++ b/server/lib/cdpmonitor/cdp_test.go @@ -333,3 +333,39 @@ func startMonitor(t *testing.T, srv *testServer, fn ResponderFunc) (*Monitor, *e } return m, ec, cleanup } + +// newComputedMonitor creates an unconnected Monitor for testing computed state +// (network_idle, layout_settled, navigation_settled) without a real websocket. +func newComputedMonitor(t *testing.T) (*Monitor, *eventCollector) { + t.Helper() + ec := newEventCollector() + upstream := newTestUpstream("ws://127.0.0.1:0") + m := New(upstream, ec.publishFn(), 0, discardLogger) + return m, ec +} + +// navigateMonitor sends a Page.frameNavigated to reset computed state. +func navigateMonitor(m *Monitor, url string) { + p, _ := json.Marshal(map[string]any{ + "frame": map[string]any{"id": "f1", "url": url}, + }) + m.handleFrameNavigated(p, "s1") +} + +// simulateRequest sends a Network.requestWillBeSent through the handler. +func simulateRequest(m *Monitor, id string) { + p, _ := json.Marshal(map[string]any{ + "requestId": id, "resourceType": "Document", + "request": map[string]any{"method": "GET", "url": "https://example.com/" + id}, + }) + m.handleNetworkRequest(p, "s1") +} + +// simulateFinished stores minimal state and sends Network.loadingFinished. +func simulateFinished(m *Monitor, id string) { + m.pendReqMu.Lock() + m.pendingRequests[id] = networkReqState{method: "GET", url: "https://example.com/" + id} + m.pendReqMu.Unlock() + p, _ := json.Marshal(map[string]any{"requestId": id}) + m.handleLoadingFinished(p, "s1") +} diff --git a/server/lib/cdpmonitor/handlers.go b/server/lib/cdpmonitor/handlers.go new file mode 100644 index 00000000..83fe6ea9 --- /dev/null +++ b/server/lib/cdpmonitor/handlers.go @@ -0,0 +1,486 @@ +package cdpmonitor + +import ( + "encoding/base64" + "encoding/json" + "time" + + "github.com/kernel/kernel-images/server/lib/events" +) + +// logUnmarshalErr logs a Debug message when a handler can't parse CDP params. +// These indicate Chrome sent an unexpected params shape, rare and non-actionable +// at Warn/Error level, but useful in verbose mode. +func (m *Monitor) logUnmarshalErr(method string, err error) { + m.log.Debug("cdpmonitor: failed to parse CDP params", "method", method, "err", err) +} + +// publishEvent stamps common fields and publishes an event. +func (m *Monitor) publishEvent(eventType string, category events.EventCategory, source events.Source, sourceEvent string, data json.RawMessage, sessionID string) { + src := source + src.Event = sourceEvent + if sessionID != "" { + if src.Metadata == nil { + src.Metadata = make(map[string]string) + } + src.Metadata[MetadataKeyCDPSessionID] = sessionID + } + m.publish(events.Event{ + Ts: time.Now().UnixMicro(), + Type: eventType, + Category: category, + Source: src, + Data: data, + }) +} + +// dispatchEvent routes a CDP event to its handler. +func (m *Monitor) dispatchEvent(msg cdpMessage) { + switch msg.Method { + case "Runtime.consoleAPICalled": + m.handleConsole(msg.Params, msg.SessionID) + case "Runtime.exceptionThrown": + m.handleExceptionThrown(msg.Params, msg.SessionID) + case "Runtime.bindingCalled": + m.handleBindingCalled(msg.Params, msg.SessionID) + case "Network.requestWillBeSent": + m.handleNetworkRequest(msg.Params, msg.SessionID) + case "Network.responseReceived": + m.handleResponseReceived(msg.Params, msg.SessionID) + case "Network.loadingFinished": + m.handleLoadingFinished(msg.Params, msg.SessionID) + case "Network.loadingFailed": + m.handleLoadingFailed(msg.Params, msg.SessionID) + case "Page.frameNavigated": + m.handleFrameNavigated(msg.Params, msg.SessionID) + case "Page.domContentEventFired": + m.handleDOMContentLoaded(msg.Params, msg.SessionID) + case "Page.loadEventFired": + m.handleLoadEventFired(msg.Params, msg.SessionID) + case "PerformanceTimeline.timelineEventAdded": + m.handleTimelineEvent(msg.Params, msg.SessionID) + case "Target.attachedToTarget": + m.handleAttachedToTarget(msg) + case "Target.targetCreated": + m.handleTargetCreated(msg.Params, msg.SessionID) + case "Target.targetDestroyed": + m.handleTargetDestroyed(msg.Params, msg.SessionID) + case "Target.detachedFromTarget": + m.handleDetachedFromTarget(msg.Params) + } +} + +func (m *Monitor) handleConsole(params json.RawMessage, sessionID string) { + var p cdpConsoleParams + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Runtime.consoleAPICalled", err) + return + } + + text := "" + if len(p.Args) > 0 { + text = consoleArgString(p.Args[0]) + } + argValues := make([]string, 0, len(p.Args)) + for _, a := range p.Args { + argValues = append(argValues, consoleArgString(a)) + } + data, _ := json.Marshal(map[string]any{ + "level": p.Type, + "text": text, + "args": argValues, + "stack_trace": p.StackTrace, + }) + m.publishEvent(EventConsoleLog, events.CategoryConsole, events.Source{Kind: events.KindCDP}, "Runtime.consoleAPICalled", data, sessionID) +} + +func (m *Monitor) handleExceptionThrown(params json.RawMessage, sessionID string) { + var p cdpExceptionDetails + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Runtime.exceptionThrown", err) + return + } + data, _ := json.Marshal(map[string]any{ + "text": p.ExceptionDetails.Text, + "line": p.ExceptionDetails.LineNumber, + "column": p.ExceptionDetails.ColumnNumber, + "url": p.ExceptionDetails.URL, + "stack_trace": p.ExceptionDetails.StackTrace, + }) + m.publishEvent(EventConsoleError, events.CategoryConsole, events.Source{Kind: events.KindCDP}, "Runtime.exceptionThrown", data, sessionID) + m.tryScreenshot(m.getLifecycleCtx()) +} + +// bindingMinInterval is the minimum time between accepted __kernelEvent binding +// calls per session. This caps throughput at 20 events/s per session, preventing +// a misbehaving page from flooding the event pipeline. +const bindingMinInterval = 50 * time.Millisecond + +// handleBindingCalled processes __kernelEvent binding calls from the page. +func (m *Monitor) handleBindingCalled(params json.RawMessage, sessionID string) { + var p struct { + Name string `json:"name"` + Payload string `json:"payload"` + } + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Runtime.bindingCalled", err) + return + } + if p.Name != bindingName { + return + } + + payload := json.RawMessage(p.Payload) + if !json.Valid(payload) { + return + } + var header struct { + Type string `json:"type"` + } + if err := json.Unmarshal(payload, &header); err != nil { + return + } + switch header.Type { + case EventInteractionClick, EventInteractionKey, EventScrollSettled: + default: + return + } + + // Rate-limit per (session, event type): cap at 20 events/s per pair so a + // misbehaving page cannot flood the event pipeline with a single event type. + now := time.Now() + rateKey := sessionID + ":" + header.Type + m.bindingRateMu.Lock() + last := m.bindingLastSeen[rateKey] + if now.Sub(last) < bindingMinInterval { + m.bindingRateMu.Unlock() + return + } + m.bindingLastSeen[rateKey] = now + m.bindingRateMu.Unlock() + + m.publishEvent(header.Type, events.CategoryInteraction, events.Source{Kind: events.KindCDP}, "Runtime.bindingCalled", payload, sessionID) +} + +// handleTimelineEvent processes PerformanceTimeline layout-shift events. +func (m *Monitor) handleTimelineEvent(params json.RawMessage, sessionID string) { + var p struct { + Event struct { + Type string `json:"type"` + LayoutShift json.RawMessage `json:"layoutShiftDetails,omitempty"` + } `json:"event"` + } + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("PerformanceTimeline.timelineEventAdded", err) + return + } + if p.Event.Type != timelineEventLayoutShift { + return + } + m.publishEvent(EventLayoutShift, events.CategoryPage, events.Source{Kind: events.KindCDP}, "PerformanceTimeline.timelineEventAdded", params, sessionID) + m.computed.onLayoutShift() +} + +// handleNetworkRequest publishes network_request events. +// NOTE: events include raw headers, post_data, and (on response) truncated +// bodies which may contain cookies, bearer tokens, or other credentials. +// This mirrors what CDP/DevTools itself exposes. Consumers should treat the +// event stream as privileged data; opt-in redaction can be added later. +func (m *Monitor) handleNetworkRequest(params json.RawMessage, sessionID string) { + var p cdpNetworkRequestParams + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Network.requestWillBeSent", err) + return + } + // Extract only the initiator type; the stack trace is too verbose and dominates event size. + var initiatorType string + var raw struct { + Type string `json:"type"` + } + if json.Unmarshal(p.Initiator, &raw) == nil { + initiatorType = raw.Type + } + + // Redirects reuse the same requestId and fire additional requestWillBeSent + // events, but only a single loadingFinished fires per chain. Only increment + // netPending for genuinely new requests to avoid permanently inflating the + // counter and blocking network_idle. + m.pendReqMu.Lock() + existing, isRedirect := m.pendingRequests[p.RequestID] + addedAt := existing.addedAt + if !isRedirect { + addedAt = time.Now() + } + m.pendingRequests[p.RequestID] = networkReqState{ + sessionID: sessionID, + method: p.Request.Method, + url: p.Request.URL, + headers: p.Request.Headers, + postData: p.Request.PostData, + resourceType: p.ResourceType, + addedAt: addedAt, + } + m.pendReqMu.Unlock() + ev := map[string]any{ + "method": p.Request.Method, + "url": p.Request.URL, + "headers": p.Request.Headers, + "initiator_type": initiatorType, + } + if p.Request.PostData != "" { + ev["post_data"] = p.Request.PostData + } + if p.ResourceType != "" { + ev["resource_type"] = p.ResourceType + } + data, _ := json.Marshal(ev) + m.publishEvent(EventNetworkRequest, events.CategoryNetwork, events.Source{Kind: events.KindCDP}, "Network.requestWillBeSent", data, sessionID) + if !isRedirect { + m.computed.onRequest() + } +} + +func (m *Monitor) handleResponseReceived(params json.RawMessage, _ string) { + var p cdpResponseReceivedParams + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Network.responseReceived", err) + return + } + m.pendReqMu.Lock() + if state, ok := m.pendingRequests[p.RequestID]; ok { + state.status = p.Response.Status + state.statusText = p.Response.StatusText + state.resHeaders = p.Response.Headers + state.mimeType = p.Response.MimeType + m.pendingRequests[p.RequestID] = state + } + m.pendReqMu.Unlock() +} + +func (m *Monitor) handleLoadingFinished(params json.RawMessage, sessionID string) { + var p struct { + RequestID string `json:"requestId"` + } + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Network.loadingFinished", err) + return + } + m.pendReqMu.Lock() + state, ok := m.pendingRequests[p.RequestID] + if ok { + delete(m.pendingRequests, p.RequestID) + } + m.pendReqMu.Unlock() + if !ok { + return + } + m.computed.onLoadingFinished() + // Fetch response body async to avoid blocking readLoop; binary types are skipped. + m.asyncWg.Go(func() { + body := m.fetchResponseBody(p.RequestID, sessionID, state) + ev := map[string]any{ + "method": state.method, + "url": state.url, + "status": state.status, + "headers": state.resHeaders, + } + if state.statusText != "" { + ev["status_text"] = state.statusText + } + if state.mimeType != "" { + ev["mime_type"] = state.mimeType + } + if state.resourceType != "" { + ev["resource_type"] = state.resourceType + } + if body != "" { + ev["body"] = body + } + data, _ := json.Marshal(ev) + m.publishEvent(EventNetworkResponse, events.CategoryNetwork, events.Source{Kind: events.KindCDP}, "Network.loadingFinished", data, sessionID) + }) +} + +// fetchResponseBody retrieves and truncates the response body for textual resources. +func (m *Monitor) fetchResponseBody(requestID, sessionID string, state networkReqState) string { + if !isTextualResource(state.resourceType, state.mimeType) { + return "" + } + result, err := m.send(m.getLifecycleCtx(), "Network.getResponseBody", map[string]any{ + "requestId": requestID, + }, sessionID) + if err != nil { + return "" + } + var resp struct { + Body string `json:"body"` + Base64Encoded bool `json:"base64Encoded"` + } + if json.Unmarshal(result, &resp) != nil { + return "" + } + body := resp.Body + if resp.Base64Encoded { + decoded, err := base64.StdEncoding.DecodeString(body) + if err != nil { + return "" + } + body = string(decoded) + } + return truncateBody(body, bodyCapFor(state.mimeType)) +} + +func (m *Monitor) handleLoadingFailed(params json.RawMessage, sessionID string) { + var p struct { + RequestID string `json:"requestId"` + ErrorText string `json:"errorText"` + Canceled bool `json:"canceled"` + } + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Network.loadingFailed", err) + return + } + m.pendReqMu.Lock() + state, ok := m.pendingRequests[p.RequestID] + if ok { + delete(m.pendingRequests, p.RequestID) + } + m.pendReqMu.Unlock() + + ev := map[string]any{ + "error_text": p.ErrorText, + "canceled": p.Canceled, + } + if ok { + ev["url"] = state.url + } + data, _ := json.Marshal(ev) + m.publishEvent(EventNetworkLoadingFailed, events.CategoryNetwork, events.Source{Kind: events.KindCDP}, "Network.loadingFailed", data, sessionID) + if ok { + m.computed.onLoadingFinished() + } +} + +func (m *Monitor) handleFrameNavigated(params json.RawMessage, sessionID string) { + var p struct { + Frame struct { + ID string `json:"id"` + ParentID string `json:"parentId"` + URL string `json:"url"` + } `json:"frame"` + } + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Page.frameNavigated", err) + return + } + data, _ := json.Marshal(map[string]any{ + "url": p.Frame.URL, + "frame_id": p.Frame.ID, + "parent_frame_id": p.Frame.ParentID, + }) + m.publishEvent(EventNavigation, events.CategoryPage, events.Source{Kind: events.KindCDP}, "Page.frameNavigated", data, sessionID) + + // Only reset state for top-level navigations; subframe (iframe) navigations + // should not disrupt main-page tracking. + if p.Frame.ParentID == "" { + m.mainSessionID.Store(sessionID) + m.pendReqMu.Lock() + for id, req := range m.pendingRequests { + if req.sessionID == sessionID { + delete(m.pendingRequests, id) + } + } + inflight := len(m.pendingRequests) + // Reset computed state while still holding pendReqMu so new requests + // arriving concurrently can't increment netPending before the reset. + m.computed.resetOnNavigation(inflight) + m.pendReqMu.Unlock() + } +} + +func (m *Monitor) handleDOMContentLoaded(params json.RawMessage, sessionID string) { + m.publishEvent(EventDOMContentLoaded, events.CategoryPage, events.Source{Kind: events.KindCDP}, "Page.domContentEventFired", params, sessionID) + // Only advance the state machine for the main frame; subframe events arrive + // on their own sessionId and would trigger navigation_settled prematurely. + if m.mainSessionID.Load() == sessionID { + m.computed.onDOMContentLoaded() + } +} + +func (m *Monitor) handleLoadEventFired(params json.RawMessage, sessionID string) { + m.publishEvent(EventPageLoad, events.CategoryPage, events.Source{Kind: events.KindCDP}, "Page.loadEventFired", params, sessionID) + if m.mainSessionID.Load() == sessionID { + m.computed.onPageLoad() + m.tryScreenshot(m.getLifecycleCtx()) + } +} + +// handleAttachedToTarget stores the new session then enables domains and injects script. +func (m *Monitor) handleAttachedToTarget(msg cdpMessage) { + var params cdpAttachedToTargetParams + if err := json.Unmarshal(msg.Params, ¶ms); err != nil { + m.logUnmarshalErr("Target.attachedToTarget", err) + return + } + m.sessionsMu.Lock() + m.sessions[params.SessionID] = targetInfo{ + targetID: params.TargetInfo.TargetID, + url: params.TargetInfo.URL, + targetType: params.TargetInfo.Type, + } + m.sessionsMu.Unlock() + + targetType := params.TargetInfo.Type + // Async to avoid blocking the readLoop. + m.asyncWg.Go(func() { + ctx := m.getLifecycleCtx() + m.enableDomains(ctx, params.SessionID, targetType) + if isPageLikeTarget(targetType) { + _ = m.injectScript(ctx, params.SessionID) + } + }) +} + +func (m *Monitor) handleTargetCreated(params json.RawMessage, sessionID string) { + var p cdpTargetCreatedParams + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Target.targetCreated", err) + return + } + data, _ := json.Marshal(map[string]any{ + "target_id": p.TargetInfo.TargetID, + "target_type": p.TargetInfo.Type, + "url": p.TargetInfo.URL, + }) + m.publishEvent(EventTargetCreated, events.CategoryPage, events.Source{Kind: events.KindCDP}, "Target.targetCreated", data, sessionID) +} + +func (m *Monitor) handleTargetDestroyed(params json.RawMessage, sessionID string) { + var p struct { + TargetID string `json:"targetId"` + } + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Target.targetDestroyed", err) + return + } + data, _ := json.Marshal(map[string]any{ + "target_id": p.TargetID, + }) + m.publishEvent(EventTargetDestroyed, events.CategoryPage, events.Source{Kind: events.KindCDP}, "Target.targetDestroyed", data, sessionID) +} + +func (m *Monitor) handleDetachedFromTarget(params json.RawMessage) { + var p struct { + SessionID string `json:"sessionId"` + } + if err := json.Unmarshal(params, &p); err != nil { + m.logUnmarshalErr("Target.detachedFromTarget", err) + return + } + if p.SessionID == "" { + return + } + m.sessionsMu.Lock() + delete(m.sessions, p.SessionID) + m.sessionsMu.Unlock() +} diff --git a/server/lib/cdpmonitor/handlers_stub.go b/server/lib/cdpmonitor/handlers_stub.go deleted file mode 100644 index 1d4a1a9b..00000000 --- a/server/lib/cdpmonitor/handlers_stub.go +++ /dev/null @@ -1,5 +0,0 @@ -package cdpmonitor - -// dispatchEvent is a placeholder so monitor.go compiles without handlers.go. -// Replaced by the real implementation in PR 3. -func (m *Monitor) dispatchEvent(msg cdpMessage) {} diff --git a/server/lib/cdpmonitor/handlers_test.go b/server/lib/cdpmonitor/handlers_test.go new file mode 100644 index 00000000..1518b412 --- /dev/null +++ b/server/lib/cdpmonitor/handlers_test.go @@ -0,0 +1,358 @@ +package cdpmonitor + +import ( + "encoding/json" + "sync/atomic" + "testing" + "time" + + "github.com/kernel/kernel-images/server/lib/events" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConsoleEvents(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + t.Run("console_log", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.consoleAPICalled", + "params": map[string]any{ + "type": "log", + "args": []any{map[string]any{"type": "string", "value": "hello world"}}, + }, + }) + ev := ec.waitFor(t, "console_log", 2*time.Second) + assert.Equal(t, events.CategoryConsole, ev.Category) + assert.Equal(t, events.KindCDP, ev.Source.Kind) + assert.Equal(t, "Runtime.consoleAPICalled", ev.Source.Event) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "log", data["level"]) + assert.Equal(t, "hello world", data["text"]) + }) + + t.Run("exception_thrown", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.exceptionThrown", + "params": map[string]any{ + "timestamp": 1234.5, + "exceptionDetails": map[string]any{ + "text": "Uncaught TypeError", + "lineNumber": 42, + "columnNumber": 7, + "url": "https://example.com/app.js", + }, + }, + }) + ev := ec.waitFor(t, "console_error", 2*time.Second) + assert.Equal(t, events.CategoryConsole, ev.Category) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "Uncaught TypeError", data["text"]) + assert.Equal(t, float64(42), data["line"]) + }) + + t.Run("non_string_args", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.consoleAPICalled", + "params": map[string]any{ + "type": "log", + "args": []any{ + map[string]any{"type": "number", "value": 42}, + map[string]any{"type": "object", "value": map[string]any{"key": "val"}}, + map[string]any{"type": "undefined"}, + }, + }, + }) + ev := ec.waitForNew(t, "console_log", 2*time.Second) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + args := data["args"].([]any) + assert.Equal(t, "42", args[0]) + assert.Contains(t, args[1], "key") + assert.Equal(t, "undefined", args[2]) + }) +} + +func TestNetworkEvents(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + var getBodyCalled atomic.Bool + responder := func(msg cdpMessage) any { + if msg.Method == "Network.getResponseBody" { + getBodyCalled.Store(true) + return map[string]any{ + "id": msg.ID, + "result": map[string]any{"body": `{"ok":true}`, "base64Encoded": false}, + } + } + return nil + } + _, ec, cleanup := startMonitor(t, srv, responder) + defer cleanup() + + t.Run("request_and_response", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", + "params": map[string]any{ + "requestId": "req-001", + "resourceType": "XHR", + "request": map[string]any{ + "method": "POST", + "url": "https://api.example.com/data", + "headers": map[string]any{"Content-Type": "application/json"}, + }, + "initiator": map[string]any{"type": "script"}, + }, + }) + ev := ec.waitFor(t, "network_request", 2*time.Second) + assert.Equal(t, events.CategoryNetwork, ev.Category) + assert.Equal(t, "Network.requestWillBeSent", ev.Source.Event) + + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "POST", data["method"]) + assert.Equal(t, "https://api.example.com/data", data["url"]) + + srv.sendToMonitor(t, map[string]any{ + "method": "Network.responseReceived", + "params": map[string]any{ + "requestId": "req-001", + "response": map[string]any{ + "status": 200, "statusText": "OK", + "headers": map[string]any{"Content-Type": "application/json"}, "mimeType": "application/json", + }, + }, + }) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFinished", + "params": map[string]any{"requestId": "req-001"}, + }) + + ev2 := ec.waitFor(t, "network_response", 3*time.Second) + assert.Equal(t, "Network.loadingFinished", ev2.Source.Event) + var data2 map[string]any + require.NoError(t, json.Unmarshal(ev2.Data, &data2)) + assert.Equal(t, float64(200), data2["status"]) + assert.NotEmpty(t, data2["body"]) + }) + + t.Run("loading_failed", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", + "params": map[string]any{ + "requestId": "req-002", + "request": map[string]any{"method": "GET", "url": "https://fail.example.com/"}, + }, + }) + ec.waitForNew(t, "network_request", 2*time.Second) + + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFailed", + "params": map[string]any{ + "requestId": "req-002", + "errorText": "net::ERR_CONNECTION_REFUSED", + "canceled": false, + }, + }) + ev := ec.waitFor(t, "network_loading_failed", 2*time.Second) + assert.Equal(t, events.CategoryNetwork, ev.Category) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "net::ERR_CONNECTION_REFUSED", data["error_text"]) + }) + + t.Run("binary_resource_skips_body", func(t *testing.T) { + getBodyCalled.Store(false) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", + "params": map[string]any{ + "requestId": "img-001", + "resourceType": "Image", + "request": map[string]any{"method": "GET", "url": "https://example.com/photo.png"}, + }, + }) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.responseReceived", + "params": map[string]any{ + "requestId": "img-001", + "response": map[string]any{"status": 200, "statusText": "OK", "headers": map[string]any{}, "mimeType": "image/png"}, + }, + }) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFinished", + "params": map[string]any{"requestId": "img-001"}, + }) + + ev := ec.waitForNew(t, "network_response", 3*time.Second) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Nil(t, data["body"], "binary resource should not have body field") + assert.False(t, getBodyCalled.Load(), "should not call getResponseBody for images") + }) +} + +func TestPageEvents(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + srv.sendToMonitor(t, map[string]any{ + "method": "Page.frameNavigated", + "params": map[string]any{ + "frame": map[string]any{"id": "frame-1", "url": "https://example.com/page"}, + }, + }) + ev := ec.waitFor(t, "navigation", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev.Category) + assert.Equal(t, "Page.frameNavigated", ev.Source.Event) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "https://example.com/page", data["url"]) + + srv.sendToMonitor(t, map[string]any{ + "method": "Page.domContentEventFired", + "params": map[string]any{"timestamp": 1000.0}, + }) + ev2 := ec.waitFor(t, "dom_content_loaded", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev2.Category) + srv.sendToMonitor(t, map[string]any{ + "method": "Page.loadEventFired", + "params": map[string]any{"timestamp": 1001.0}, + }) + ev3 := ec.waitFor(t, "page_load", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev3.Category) +} + +func TestTargetEvents(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + srv.sendToMonitor(t, map[string]any{ + "method": "Target.targetCreated", + "params": map[string]any{ + "targetInfo": map[string]any{"targetId": "t-1", "type": "page", "url": "https://new.example.com"}, + }, + }) + ev := ec.waitFor(t, "target_created", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev.Category) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "t-1", data["target_id"]) + + srv.sendToMonitor(t, map[string]any{ + "method": "Target.targetDestroyed", + "params": map[string]any{"targetId": "t-1"}, + }) + ev2 := ec.waitFor(t, "target_destroyed", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev2.Category) +} + +func TestBindingAndTimeline(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + t.Run("interaction_click", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "__kernelEvent", + "payload": `{"type":"interaction_click","x":10,"y":20,"selector":"button","tag":"BUTTON","text":"OK"}`, + }, + }) + ev := ec.waitFor(t, "interaction_click", 2*time.Second) + assert.Equal(t, events.CategoryInteraction, ev.Category) + assert.Equal(t, "Runtime.bindingCalled", ev.Source.Event) + }) + + t.Run("scroll_settled", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "__kernelEvent", + "payload": `{"type":"scroll_settled","from_x":0,"from_y":0,"to_x":0,"to_y":500,"target_selector":"body"}`, + }, + }) + ev := ec.waitFor(t, "scroll_settled", 2*time.Second) + assert.Equal(t, events.CategoryInteraction, ev.Category) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, float64(500), data["to_y"]) + }) + + t.Run("layout_shift", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "PerformanceTimeline.timelineEventAdded", + "params": map[string]any{ + "event": map[string]any{"type": "layout-shift"}, + }, + }) + ev := ec.waitFor(t, "layout_shift", 2*time.Second) + assert.Equal(t, events.KindCDP, ev.Source.Kind) + assert.Equal(t, "PerformanceTimeline.timelineEventAdded", ev.Source.Event) + }) + + t.Run("unknown_binding_ignored", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "someOtherBinding", + "payload": `{"type":"interaction_click"}`, + }, + }) + ec.assertNone(t, "interaction_click", 100*time.Millisecond) + }) + + t.Run("rate_limited_per_session", func(t *testing.T) { + // Send two binding events back-to-back within the 50ms window. + // Only the first should produce a published event. + before := func() int { + ec.mu.Lock() + defer ec.mu.Unlock() + count := 0 + for _, ev := range ec.events { + if ev.Type == EventInteractionClick { + count++ + } + } + return count + } + countBefore := before() + + for range 3 { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "__kernelEvent", + "payload": `{"type":"interaction_click","x":1,"y":1,"selector":"a","tag":"A","text":"x"}`, + }, + }) + } + + // Wait a bit for async delivery, then check only 1 new event was published. + time.Sleep(200 * time.Millisecond) + ec.mu.Lock() + countAfter := 0 + for _, ev := range ec.events { + if ev.Type == EventInteractionClick { + countAfter++ + } + } + ec.mu.Unlock() + assert.Equal(t, countBefore+1, countAfter, "rate limiter should have dropped the 2nd and 3rd events") + }) +} diff --git a/server/lib/cdpmonitor/monitor.go b/server/lib/cdpmonitor/monitor.go index 631fcd06..a0323f8f 100644 --- a/server/lib/cdpmonitor/monitor.go +++ b/server/lib/cdpmonitor/monitor.go @@ -408,7 +408,7 @@ func (m *Monitor) initSession(ctx context.Context) { // Without auto-attach the monitor will never see new targets: treat as fatal. m.log.Error("cdpmonitor: Target.setAutoAttach failed — monitor will not observe new targets", "err", err) m.publish(events.Event{ - Ts: time.Now().UnixMilli(), + Ts: time.Now().UnixMicro(), Type: EventMonitorInitFailed, Category: events.CategorySystem, Source: events.Source{Kind: events.KindLocalProcess}, @@ -516,7 +516,7 @@ func (m *Monitor) handleUpstreamRestart(ctx context.Context, newURL string) { return } m.publish(events.Event{ - Ts: time.Now().UnixMilli(), + Ts: time.Now().UnixMicro(), Type: EventMonitorDisconnected, Category: events.CategorySystem, Source: events.Source{Kind: events.KindLocalProcess}, @@ -537,7 +537,7 @@ func (m *Monitor) handleUpstreamRestart(ctx context.Context, newURL string) { if ctx.Err() == nil { m.running.Store(false) m.publish(events.Event{ - Ts: time.Now().UnixMilli(), + Ts: time.Now().UnixMicro(), Type: EventMonitorReconnectFailed, Category: events.CategorySystem, Source: events.Source{Kind: events.KindLocalProcess}, @@ -559,7 +559,7 @@ func (m *Monitor) handleUpstreamRestart(ctx context.Context, newURL string) { m.log.Info("cdpmonitor: reconnected", "url", newURL, "duration_ms", reconnectDurationMs) m.publish(events.Event{ - Ts: time.Now().UnixMilli(), + Ts: time.Now().UnixMicro(), Type: EventMonitorReconnected, Category: events.CategorySystem, Source: events.Source{Kind: events.KindLocalProcess}, diff --git a/server/lib/cdpmonitor/monitor_test.go b/server/lib/cdpmonitor/monitor_test.go index d0047186..f86e33c8 100644 --- a/server/lib/cdpmonitor/monitor_test.go +++ b/server/lib/cdpmonitor/monitor_test.go @@ -168,3 +168,190 @@ func TestInitSessionAutoAttachFailure(t *testing.T) { ec.waitFor(t, EventMonitorInitFailed, 3*time.Second) } + +func TestAutoAttach(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + ec := newEventCollector() + upstream := newTestUpstream(srv.wsURL()) + m := New(upstream, ec.publishFn(), 99, discardLogger) + require.NoError(t, m.Start(context.Background())) + defer m.Stop() + + msg := srv.readFromMonitor(t, 3*time.Second) + assert.Equal(t, "Target.setAutoAttach", msg.Method) + + var params struct { + AutoAttach bool `json:"autoAttach"` + WaitForDebuggerOnStart bool `json:"waitForDebuggerOnStart"` + Flatten bool `json:"flatten"` + } + require.NoError(t, json.Unmarshal(msg.Params, ¶ms)) + assert.True(t, params.AutoAttach) + assert.False(t, params.WaitForDebuggerOnStart) + assert.True(t, params.Flatten) + + stopResponder := make(chan struct{}) + go listenAndRespond(srv, stopResponder, nil) + defer close(stopResponder) + srv.sendToMonitor(t, map[string]any{"id": msg.ID, "result": map[string]any{}}) + + srv.sendToMonitor(t, map[string]any{ + "method": "Target.attachedToTarget", + "params": map[string]any{ + "sessionId": "session-abc", + "targetInfo": map[string]any{"targetId": "target-xyz", "type": "page", "url": "https://example.com"}, + }, + }) + require.Eventually(t, func() bool { + m.sessionsMu.RLock() + defer m.sessionsMu.RUnlock() + _, ok := m.sessions["session-abc"] + return ok + }, 2*time.Second, 50*time.Millisecond, "session not stored") + + m.sessionsMu.RLock() + info := m.sessions["session-abc"] + m.sessionsMu.RUnlock() + assert.Equal(t, "target-xyz", info.targetID) + assert.Equal(t, "page", info.targetType) +} + +func TestAttachExistingTargets(t *testing.T) { + srv := newTestServer(t) + defer srv.close() + + responder := func(msg cdpMessage) any { + switch msg.Method { + case "Target.getTargets": + return map[string]any{ + "id": msg.ID, + "result": map[string]any{ + "targetInfos": []any{ + map[string]any{"targetId": "existing-1", "type": "page", "url": "https://preexisting.example.com"}, + }, + }, + } + case "Target.attachToTarget": + srv.sendToMonitor(t, map[string]any{ + "method": "Target.attachedToTarget", + "params": map[string]any{ + "sessionId": "session-existing-1", + "targetInfo": map[string]any{"targetId": "existing-1", "type": "page", "url": "https://preexisting.example.com"}, + }, + }) + return map[string]any{"id": msg.ID, "result": map[string]any{"sessionId": "session-existing-1"}} + } + return nil + } + + m, _, cleanup := startMonitor(t, srv, responder) + defer cleanup() + + require.Eventually(t, func() bool { + m.sessionsMu.RLock() + defer m.sessionsMu.RUnlock() + _, ok := m.sessions["session-existing-1"] + return ok + }, 3*time.Second, 50*time.Millisecond, "existing target not auto-attached") + + m.sessionsMu.RLock() + info := m.sessions["session-existing-1"] + m.sessionsMu.RUnlock() + assert.Equal(t, "existing-1", info.targetID) +} + +// TestRedirectCounter verifies that redirect hops (same requestId, multiple +// requestWillBeSent) do not double-increment netPending, which would permanently +// block network_idle. +func TestRedirectCounter(t *testing.T) { + m, ec := newComputedMonitor(t) + navigateMonitor(m, "https://example.com") + + // First requestWillBeSent — genuine new request. + p1, _ := json.Marshal(map[string]any{ + "requestId": "r-redirect", + "resourceType": "Document", + "request": map[string]any{"method": "GET", "url": "https://example.com/old"}, + "initiator": map[string]any{"type": "other"}, + }) + m.handleNetworkRequest(p1, "s1") + + // Second requestWillBeSent with the same requestId — this is the redirect hop. + p2, _ := json.Marshal(map[string]any{ + "requestId": "r-redirect", + "resourceType": "Document", + "request": map[string]any{"method": "GET", "url": "https://example.com/new"}, + "initiator": map[string]any{"type": "other"}, + }) + m.handleNetworkRequest(p2, "s1") + + // Only one loadingFinished fires per redirect chain. + p3, _ := json.Marshal(map[string]any{"requestId": "r-redirect"}) + m.handleLoadingFinished(p3, "s1") + + // If netPending was double-incremented, network_idle would never fire. + ec.waitFor(t, "network_idle", 2*time.Second) +} + +// TestSubframeNavigationNoReset verifies that a frameNavigated event with a +// non-empty parentId does not reset computed state (netPending, timers, etc.). +func TestSubframeNavigationNoReset(t *testing.T) { + m, ec := newComputedMonitor(t) + navigateMonitor(m, "https://example.com") // top-level nav, sets mainSessionID + + // Start a request on the main frame. + simulateRequest(m, "main-req") + + // An iframe navigates — should not reset state or clear pendingRequests. + iframeNav, _ := json.Marshal(map[string]any{ + "frame": map[string]any{ + "id": "iframe-frame", + "parentId": "top-frame", + "url": "https://iframe.example.com", + }, + }) + m.handleFrameNavigated(iframeNav, "s1") + + // mainSessionID should still be "s1", not reset by the subframe nav. + assert.Equal(t, "s1", m.mainSessionID.Load(), "mainSessionID should not change on subframe nav") + + // Finishing the main request should still drive network_idle (state not reset). + simulateFinished(m, "main-req") + ec.waitFor(t, "network_idle", 2*time.Second) +} + +func TestSubframeLifecycleIgnored(t *testing.T) { + t.Run("subframe_dom_content_loaded_does_not_advance_state", func(t *testing.T) { + m, ec := newComputedMonitor(t) + navigateMonitor(m, "https://example.com") // sets mainSessionID = "s1" + + // Fire domContentLoaded from an iframe session, not the main frame. + m.handleDOMContentLoaded(json.RawMessage(`{}`), "iframe-session") + + // Now fire the real main-frame domContentLoaded + the rest of the conditions. + simulateRequest(m, "r1") + simulateFinished(m, "r1") + m.handleLoadEventFired(json.RawMessage(`{}`), "s1") + // navigation_settled requires navDOMLoaded; if the iframe event had set it, + // the event might fire without the main-frame DOMContentLoaded arriving. + // Assert it does NOT fire yet (iframe set navDOMLoaded but main frame hasn't). + ec.assertNone(t, "navigation_settled", 1500*time.Millisecond) + }) + + t.Run("subframe_load_event_does_not_start_layout_timer", func(t *testing.T) { + m, ec := newComputedMonitor(t) + navigateMonitor(m, "https://example.com") + + // Subframe fires loadEventFired — should not start the layout_settled timer. + m.handleLoadEventFired(json.RawMessage(`{}`), "iframe-session") + ec.assertNone(t, "layout_settled", 1500*time.Millisecond) + + // Main frame fires — timer should start now. + t0 := time.Now() + m.handleLoadEventFired(json.RawMessage(`{}`), "s1") + ec.waitFor(t, "layout_settled", 3*time.Second) + assert.GreaterOrEqual(t, time.Since(t0).Milliseconds(), int64(900), "fired too early") + }) +} diff --git a/server/lib/cdpmonitor/screenshot.go b/server/lib/cdpmonitor/screenshot.go index 4f1002a2..2a06d406 100644 --- a/server/lib/cdpmonitor/screenshot.go +++ b/server/lib/cdpmonitor/screenshot.go @@ -67,7 +67,7 @@ func (m *Monitor) captureScreenshot(parentCtx context.Context) { data, _ := json.Marshal(map[string]string{screenshotDataKey: encoded}) m.publish(events.Event{ - Ts: time.Now().UnixMilli(), + Ts: time.Now().UnixMicro(), Type: EventScreenshot, Category: events.CategorySystem, Source: events.Source{Kind: events.KindLocalProcess},