Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 46 additions & 10 deletions compliance/virtualmachines/roxagent/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/stackrox/rox/compliance/node/index"
"github.com/stackrox/rox/compliance/virtualmachines/roxagent/discovery"
"github.com/stackrox/rox/compliance/virtualmachines/roxagent/vsockserver"
"github.com/stackrox/rox/compliance/virtualmachines/roxagent/watch"
v4 "github.com/stackrox/rox/generated/internalapi/scanner/v4"
v1 "github.com/stackrox/rox/generated/internalapi/virtualmachine/v1"
"github.com/stackrox/rox/pkg/httputil/proxy"
Expand All @@ -30,6 +31,13 @@ var agentVersion = "development" //XDef:STABLE_MAIN_VERSION

// mappingClientTimeout is a 30-second timeout.
// NOTE(review): presumably applied to the HTTP client that fetches the
// repository-to-CPE mapping; the use site is not visible here — confirm.
const mappingClientTimeout = 30 * time.Second

// scan_trigger fact values shared with Sensor's vmscraper/facts.go. See
// docs/superpowers/specs/2026-07-03-reactive-dnf-scanning-design.md.
const (
// scanTriggerScheduled tags reports produced by the initial startup scan
// and by the periodic ticker.
scanTriggerScheduled = "scheduled"
// scanTriggerReactive tags reports produced by a rescan that the RPM
// database watcher triggered.
scanTriggerReactive = "reactive"
)

// ServeCmd returns the "serve" cobra subcommand for pull-mode operation.
func ServeCmd(ctx context.Context) *cobra.Command {
var (
Expand Down Expand Up @@ -64,7 +72,7 @@ func runServe(ctx context.Context, port uint32, hostPath, repoCPEURL string, res
if err != nil {
return fmt.Errorf("initial scan: %w", err)
}
cache.SetReport(report, discoverFacts(hostPath))
cache.SetReport(report, discoverFacts(hostPath, scanTriggerScheduled))
log.Infof("Initial scan complete, report cached. Num packages: %d", len(report.GetContents().GetPackages()))

handler := vsockserver.NewHandler(cache, agentVersion)
Expand Down Expand Up @@ -96,25 +104,52 @@ func runServe(ctx context.Context, port uint32, hostPath, repoCPEURL string, res

go srv.Serve(ctx, ln)

// Reactive scanning is best-effort: if the watcher can't start (e.g. no
// watchable RPM directory, inotify limit hit), roxagent logs a warning
// and falls back to periodic-only scanning rather than failing startup.
var triggeredCh <-chan struct{}
rpmWatcher, watchErr := watch.New(hostPath)
if watchErr != nil {
log.Warnf("Reactive DNF/RPM scanning disabled, falling back to periodic-only scanning: %v", watchErr)
} else {
defer func() { _ = rpmWatcher.Close() }()
triggeredCh = rpmWatcher.Triggered()
}

ticker := time.NewTicker(rescanEvery)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
log.Info("Starting periodic rescan")
r, err := scanWithDiagnostics(ctx, hostPath, repoCPEURL, httpClient)
if err != nil {
log.Errorf("Rescan failed: %v", err)
continue
}
cache.SetReport(r, discoverFacts(hostPath))
log.Infof("Rescan complete, report updated. Num packages: %d", len(r.GetContents().GetPackages()))
rescan(ctx, cache, hostPath, repoCPEURL, httpClient, scanTriggerScheduled)
ticker.Reset(rescanEvery)
case <-triggeredCh:
rescan(ctx, cache, hostPath, repoCPEURL, httpClient, scanTriggerReactive)
ticker.Reset(rescanEvery)
}
}
}

// rescan performs one scan and, if it succeeds, stores the resulting report
// in cache together with freshly discovered facts tagged with trigger (one of
// the scanTrigger* values). A failed scan is logged and returns before the
// cache is touched.
//
// The periodic ticker and the reactive watcher both funnel into this
// function, so scheduled and reactive rescans differ only in the trigger they
// pass. rescan does not report success or failure to its caller: the serve
// loop resets the periodic ticker after every call, whether or not the scan
// succeeded, so a routine rescan never immediately follows a reactive one.
func rescan(ctx context.Context, cache *vsockserver.ReportCache, hostPath, repoCPEURL string, httpClient *http.Client, trigger string) {
	log.Infof("Starting %s rescan", trigger)

	report, scanErr := scanWithDiagnostics(ctx, hostPath, repoCPEURL, httpClient)
	if scanErr != nil {
		log.Errorf("%s rescan failed: %v", trigger, scanErr)
		return
	}

	facts := discoverFacts(hostPath, trigger)
	cache.SetReport(report, facts)

	numPackages := len(report.GetContents().GetPackages())
	log.Infof("Rescan complete, report updated. Num packages: %d", numPackages)
}

// scanWithDiagnostics runs the node indexer and surrounds it with filesystem
// and report diagnostics logging. This mirrors the diagnostics roxagent logs
// in push mode, so scan issues (e.g. "0 packages" or "0 repositories") can be
Expand Down Expand Up @@ -143,14 +178,15 @@ func scan(ctx context.Context, hostPath, repoCPEURL string, httpClient *http.Cli
return index.NewNodeIndexer(cfg).IndexNode(ctx)
}

func discoverFacts(hostPath string) map[string]string {
// discoverFacts runs VM discovery against hostPath and flattens the result
// into the string-keyed facts stored alongside a report. The supplied trigger
// is recorded verbatim under the "scan_trigger" key.
func discoverFacts(hostPath, trigger string) map[string]string {
	vmData := discovery.DiscoverVMData(hostPath)

	facts := make(map[string]string, 6)
	facts["detected_os"] = vmData.GetDetectedOs().String()
	facts["os_version"] = vmData.GetOsVersion()
	facts["activation_status"] = vmData.GetActivationStatus().String()
	facts["dnf_metadata_status"] = vmData.GetDnfMetadataStatus().String()
	facts["dnf_status"] = formatDnfStatusFlags(vmData.GetDnfStatus())
	facts["scan_trigger"] = trigger
	return facts
}

Expand Down
10 changes: 10 additions & 0 deletions compliance/virtualmachines/roxagent/cmd/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@ func TestFormatDnfStatusFlags(t *testing.T) {
})
}
}

func TestDiscoverFacts_IncludesScanTrigger(t *testing.T) {
hostPath := t.TempDir()

reactiveFacts := discoverFacts(hostPath, scanTriggerReactive)
assert.Equal(t, "reactive", reactiveFacts["scan_trigger"])

scheduledFacts := discoverFacts(hostPath, scanTriggerScheduled)
assert.Equal(t, "scheduled", scheduledFacts["scan_trigger"])
}
5 changes: 5 additions & 0 deletions compliance/virtualmachines/roxagent/watch/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package watch

import "github.com/stackrox/rox/pkg/logging"

// log is the package-level logger for the watch package, obtained from
// logging.LoggerForModule.
var log = logging.LoggerForModule()
112 changes: 112 additions & 0 deletions compliance/virtualmachines/roxagent/watch/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Package watch detects changes to the local RPM package database so
// roxagent can rescan sooner than its periodic interval allows.
package watch

import (
"errors"
"fmt"

"github.com/fsnotify/fsnotify"
"github.com/stackrox/rox/compliance/virtualmachines/roxagent/internal/hostprobe"
)

// candidateRPMDirs are RPM database directories to watch for package-change
// signals, in preference order (modern layout first). We watch the
// directories, not a specific DB file: RPM transactions commit via
// create/rename/write of WAL or temp files rather than a single in-place
// write, so watching the containing directory reliably catches all of these
// (the fsnotify-recommended pattern for this kind of change detection).
//
// Each entry is mapped under the host path with hostprobe.HostPathFor, and
// addFirstWatchableDir stops at the first entry that can be watched, so at
// most one of these directories is ever watched.
var candidateRPMDirs = []string{
"/usr/lib/sysimage/rpm", // rpm >= 4.16, sqlite backend (RHEL 9+/Fedora)
"/var/lib/rpm", // older layout (RHEL 7/8), or a symlink to the above
}

// Watcher signals when the RPM package database changes on disk.
type Watcher struct {
// fsw is the underlying fsnotify watcher; run consumes its Events and
// Errors channels.
fsw *fsnotify.Watcher
// triggerCh carries the pending rescan trigger. New creates it with a
// buffer of one, so events arriving while a trigger is already pending
// are dropped by run's non-blocking send.
triggerCh chan struct{}
}

// New creates a Watcher on the first candidate RPM database directory under
// hostPath that can be watched, and starts the goroutine that turns
// filesystem events into triggers.
//
// It returns an error when the fsnotify watcher cannot be created or when no
// candidate directory is watchable. Callers should treat reactive watching as
// best-effort and fall back to periodic-only scanning instead of failing
// startup.
func New(hostPath string) (*Watcher, error) {
	notifier, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, fmt.Errorf("creating fsnotify watcher: %w", err)
	}

	watchedDir, err := addFirstWatchableDir(notifier, hostPath, candidateRPMDirs)
	if err != nil {
		// Nothing to watch: release the fsnotify watcher before bailing out.
		// Its close error is irrelevant next to the error being returned.
		_ = notifier.Close()
		return nil, err
	}
	log.Infof("Watching %q for reactive RPM/DNF change detection", watchedDir)

	watcher := &Watcher{fsw: notifier, triggerCh: make(chan struct{}, 1)}
	go watcher.run()
	return watcher, nil
}

// addFirstWatchableDir tries each entry of dirs in order, mapped under
// hostPath, and registers the first one that fsw accepts. It returns the
// host-side path that is now being watched. If every candidate is rejected,
// the returned error wraps the per-directory failures.
func addFirstWatchableDir(fsw *fsnotify.Watcher, hostPath string, dirs []string) (string, error) {
	var failures []error
	for _, candidate := range dirs {
		hostDir := hostprobe.HostPathFor(hostPath, candidate)
		addErr := fsw.Add(hostDir)
		if addErr == nil {
			return hostDir, nil
		}
		failures = append(failures, fmt.Errorf("%s: %w", hostDir, addErr))
	}
	return "", fmt.Errorf("no watchable RPM database directory found: %w", errors.Join(failures...))
}

// run is the watcher's event loop. It converts relevant fsnotify events into
// triggers and logs watcher errors, and returns as soon as either fsnotify
// channel is closed.
func (w *Watcher) run() {
	for {
		select {
		case ev, open := <-w.fsw.Events:
			if !open {
				return
			}
			if isRelevant(ev) {
				w.queueTrigger()
			}
		case watchErr, open := <-w.fsw.Errors:
			if !open {
				return
			}
			log.Warnf("RPM database watcher error: %v", watchErr)
		}
	}
}

// queueTrigger records that a rescan is wanted without ever blocking.
//
// The first event of a burst fills the one-slot buffer; every further event
// that arrives while that trigger is still pending is dropped, so a whole RPM
// transaction's writes collapse into one pending rescan. This is plain
// coalescing, not a debounce — see the design doc for why that is an accepted
// Phase 1 limitation (a rescan can start mid-transaction; the next trigger
// picks up the final state).
func (w *Watcher) queueTrigger() {
	select {
	case w.triggerCh <- struct{}{}:
	default:
	}
}

// isRelevant reports whether event carries at least one of the operations
// that signal a package-database change: write, create or rename. Events
// carrying only other operations (for example chmod or remove) are ignored.
func isRelevant(event fsnotify.Event) bool {
	const relevantOps = fsnotify.Write | fsnotify.Create | fsnotify.Rename
	return event.Op&relevantOps != 0
}

// Triggered returns a channel that receives a value shortly after a change
// is detected in the watched RPM database directory. Multiple changes before
// the channel is drained collapse into a single pending trigger.
//
// The channel is never closed, not even after Close, so receivers must not
// rely on closure to detect shutdown.
func (w *Watcher) Triggered() <-chan struct{} {
return w.triggerCh
}

// Close stops the underlying fsnotify watcher and returns its error.
// The run goroutine returns once one of the fsnotify channels is closed.
// NOTE(review): this relies on fsnotify's Close closing its Events/Errors
// channels — confirm against the fsnotify documentation.
func (w *Watcher) Close() error {
return w.fsw.Close()
}
121 changes: 121 additions & 0 deletions compliance/virtualmachines/roxagent/watch/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package watch

import (
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/fsnotify/fsnotify"
"github.com/stackrox/rox/compliance/virtualmachines/roxagent/internal/hostprobe"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// eventWaitTimeout bounds how long a test waits for an expected trigger
// before failing.
const eventWaitTimeout = 2 * time.Second

// TestNew_NoCandidateDirectory_ReturnsError checks that New fails, and
// returns no Watcher, when none of the candidate RPM directories exists.
func TestNew_NoCandidateDirectory_ReturnsError(t *testing.T) {
	// A fresh temp dir contains neither candidate directory.
	emptyRoot := t.TempDir()

	watcher, newErr := New(emptyRoot)

	require.Error(t, newErr)
	require.Nil(t, watcher)
}

// TestNew_WatchesLegacyRPMDir checks that, when only the legacy /var/lib/rpm
// directory exists, New watches it and a write inside it yields a trigger.
func TestNew_WatchesLegacyRPMDir(t *testing.T) {
	root := t.TempDir()
	legacyDir := hostprobe.HostPathFor(root, "/var/lib/rpm")
	require.NoError(t, os.MkdirAll(legacyDir, 0o755))

	watcher, newErr := New(root)
	require.NoError(t, newErr)
	require.NotNil(t, watcher)
	t.Cleanup(func() { _ = watcher.Close() })

	dbFile := filepath.Join(legacyDir, "rpmdb.sqlite")
	require.NoError(t, os.WriteFile(dbFile, []byte("x"), 0o644))

	timeout := time.After(eventWaitTimeout)
	select {
	case <-timeout:
		t.Fatal("expected a trigger after writing to the watched directory")
	case <-watcher.Triggered():
	}
}

// TestNew_PrefersModernRPMDirWhenBothExist checks the candidate preference
// order: with both directories present, only the modern
// /usr/lib/sysimage/rpm directory produces triggers.
func TestNew_PrefersModernRPMDirWhenBothExist(t *testing.T) {
	root := t.TempDir()
	preferred := hostprobe.HostPathFor(root, "/usr/lib/sysimage/rpm")
	fallback := hostprobe.HostPathFor(root, "/var/lib/rpm")
	require.NoError(t, os.MkdirAll(preferred, 0o755))
	require.NoError(t, os.MkdirAll(fallback, 0o755))

	watcher, newErr := New(root)
	require.NoError(t, newErr)
	require.NotNil(t, watcher)
	t.Cleanup(func() { _ = watcher.Close() })

	// Writing into the legacy directory must stay silent; that proves the
	// first candidate (the modern directory) is the one being watched.
	ignoredFile := filepath.Join(fallback, "ignored")
	require.NoError(t, os.WriteFile(ignoredFile, []byte("x"), 0o644))
	quietPeriod := time.After(200 * time.Millisecond)
	select {
	case <-quietPeriod:
	case <-watcher.Triggered():
		t.Fatal("should not see events from the non-preferred legacy directory")
	}

	dbFile := filepath.Join(preferred, "rpmdb.sqlite")
	require.NoError(t, os.WriteFile(dbFile, []byte("x"), 0o644))
	deadline := time.After(eventWaitTimeout)
	select {
	case <-deadline:
		t.Fatal("expected a trigger after writing to the preferred modern directory")
	case <-watcher.Triggered():
	}
}

func TestWatcher_CoalescesBurstOfEvents(t *testing.T) {
hostPath := t.TempDir()
rpmDir := hostprobe.HostPathFor(hostPath, "/var/lib/rpm")
require.NoError(t, os.MkdirAll(rpmDir, 0o755))

w, err := New(hostPath)
require.NoError(t, err)
defer func() { _ = w.Close() }()

// Simulate a whole RPM transaction's worth of rapid writes.
for i := range 10 {
require.NoError(t, os.WriteFile(filepath.Join(rpmDir, fmt.Sprintf("f%d", i)), []byte("x"), 0o644))
}

select {
case <-w.Triggered():
case <-time.After(eventWaitTimeout):
t.Fatal("expected a trigger after the write burst")
}

// The burst must collapse into exactly one pending trigger: no second
// value should already be queued behind it.
select {
case <-w.Triggered():
t.Fatal("burst of events should collapse into a single trigger, not a backlog")
default:
}
}

// TestIsRelevant pins which single fsnotify operations count as a
// package-database change: write, create and rename do; chmod and remove do
// not.
func TestIsRelevant(t *testing.T) {
	t.Parallel()

	type testCase struct {
		name string
		op   fsnotify.Op
		want bool
	}
	table := []testCase{
		{name: "write is relevant", op: fsnotify.Write, want: true},
		{name: "create is relevant", op: fsnotify.Create, want: true},
		{name: "rename is relevant", op: fsnotify.Rename, want: true},
		{name: "chmod is not relevant", op: fsnotify.Chmod, want: false},
		{name: "remove is not relevant", op: fsnotify.Remove, want: false},
	}

	for _, tc := range table {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			got := isRelevant(fsnotify.Event{Op: tc.op})
			assert.Equal(t, tc.want, got)
		})
	}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (
github.com/docker/distribution v2.8.3+incompatible
github.com/facebookincubator/nvdtools v0.1.5
github.com/fatih/color v1.19.0
github.com/fsnotify/fsnotify v1.10.1
github.com/georgysavva/scany/v2 v2.1.4
github.com/go-jose/go-jose/v4 v4.1.4
github.com/go-logr/logr v1.4.3
Expand Down Expand Up @@ -299,7 +300,6 @@ require (
github.com/fatih/camelcase v1.0.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.10.1 // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/getsentry/sentry-go v0.34.0 // indirect
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 // indirect
Expand Down
Loading
Loading