From 4e6fc1d967fb0563034c631577a1046a159e97be Mon Sep 17 00:00:00 2001 From: Yusuf Ozturk Date: Tue, 30 Jun 2026 23:53:48 +0200 Subject: [PATCH] Live data and RFC support --- internal/config/case.go | 10 +++++++++ internal/runner/runner.go | 45 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/internal/config/case.go b/internal/config/case.go index 9affab5..293708d 100644 --- a/internal/config/case.go +++ b/internal/config/case.go @@ -1185,6 +1185,16 @@ type FleetConfig struct { // before-pre-process / director). LiveWhere string `yaml:"live_where"` LiveSource string `yaml:"live_source"` + // LiveMinFrameBytes, when > 0, asserts the largest live_data/console_log + // reply frame is at least this many bytes — i.e. a REAL data frame came back, + // not just the tiny "Capture Started/Completed" lifecycle markers. Used by the + // where:raw mis-frame case so a marker-only capture can't pass on count alone. + LiveMinFrameBytes int `yaml:"live_min_frame_bytes"` + // LiveSourceID targets a specific source for the live_data scenario: the + // device id when live_source=device, the route id when live_source=advanced-route + // (0 = "no specific route", which the director normalizes to a director-wide + // routing-boundary capture). Ignored for live_source=director. Defaults to 0. + LiveSourceID int `yaml:"live_source_id"` // SettleSeconds is how long to wait for a scenario's effect (default 45). SettleSeconds int `yaml:"settle_seconds"` // DeliverConfig, when set, is a file under the case's configs/ dir holding an diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 4bc104c..7083078 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -3537,6 +3537,7 @@ type fleetStatus struct { Inbound map[string]struct { Count int `json:"count"` LastData string `json:"last_data"` + MaxLen int `json:"max_len"` } `json:"inbound"` // Stats are decoded from the director's forwarded VMF metric frames by // the simulator (see fleetsim/vmfstats.go), keyed by "." @@ -3600,6 +3601,16 @@ func (st *fleetStatus) lastData(id, key string) string { return d.Inbound[key].LastData } +// maxLen returns the largest frame (bytes) seen for the director's inbound key, +// or 0 if absent. Used to distinguish a real data frame from tiny lifecycle markers. +func (st *fleetStatus) maxLen(id, key string) int { + d, ok := st.Directors[id] + if !ok { + return 0 + } + return d.Inbound[key].MaxLen +} + // fleetSimStatus reads the simulator's observation snapshot via `docker exec wget`. // We use the busybox `wget` that ships in the simulator's alpine base rather than // curl, so the bench-fleetsim image needs no extra package (and carries no extra @@ -3664,6 +3675,25 @@ func fleetWaitCount(simContainer, dirID, key string, min int, deadline time.Time return last, false } +// fleetWaitMaxLen polls until the largest frame seen for inbound[key] is >= min +// bytes (or the deadline passes), returning the final max and whether it was +// reached. A capture's lifecycle markers ("Capture Started/Completed", tiny) +// arrive immediately on session start, while real captured data only flushes on +// the session's periodic (≈5s) ticker — so this must POLL, not sample once. +func fleetWaitMaxLen(simContainer, dirID, key string, min int, deadline time.Time) (int, bool) { + last := 0 + for time.Now().Before(deadline) { + if st, err := fleetSimStatus(simContainer); err == nil { + last = st.maxLen(dirID, key) + if last >= min { + return last, true + } + } + time.Sleep(3 * time.Second) + } + return last, false +} + // fleetWaitStats polls the simulator's DECODED stats until every expected // bucket/counter is reached, then requires an EXACT match — the counters are // deterministic for a fixed, replayed input, so an over-count is as much a @@ -4343,6 +4373,7 @@ func (r *Runner) runFleetAutomationCorrectness(tc *config.TestCase, subject conf "capture_line": 100, "where": fleetStr(fc.LiveWhere, "before-pre-process"), "source_type": fleetStr(fc.LiveSource, "director"), + "source_id": fc.LiveSourceID, } if fc.Scenario == "console_log" { cmd = "console_log" @@ -4362,6 +4393,20 @@ func (r *Runner) runFleetAutomationCorrectness(tc *config.TestCase, subject conf } else { finalCount = int64(c) fmt.Printf(" %s streamed %d frame(s) ✓\n", cmd, c) + + // Discriminating check: a capture that returns only lifecycle markers + // ("Capture Started/Completed", ~40 bytes) still produces frames, so a + // bare count >= 1 can pass even when NO real data was captured (e.g. + // mis-framed input that a normal where never sees). live_min_frame_bytes + // asserts at least one frame carried real captured data. Used by the + // where:raw mis-frame case. + if fc.LiveMinFrameBytes > 0 { + if maxLen, ok := fleetWaitMaxLen(simContainer, dirID, key, fc.LiveMinFrameBytes, scenarioDeadline()); !ok { + errs = append(errs, fmt.Sprintf("%s returned only small frames (max %d bytes < live_min_frame_bytes %d) — capture saw lifecycle markers, no real data", cmd, maxLen, fc.LiveMinFrameBytes)) + } else { + fmt.Printf(" %s largest frame %d bytes (>= %d) — real data captured ✓\n", cmd, maxLen, fc.LiveMinFrameBytes) + } + } } case "stats":