diff --git a/lib/forkvm/README.md b/lib/forkvm/README.md index 88474c72..2586b443 100644 --- a/lib/forkvm/README.md +++ b/lib/forkvm/README.md @@ -32,6 +32,12 @@ resume once the mailbox has been patched and the guest finishes the network handoff asynchronously. If the mailbox path is unavailable, restore falls back to the older host-initiated guest network reconfigure path. +The post-resume wait is usually not netlink time. On cold restores, sampled +Firecracker traces show the resumed vCPU faulting guest memory back in from the +snapshot backing file before the guest-agent can observe VMGenID, read the +mailbox, and send the network ack. In host traces this wait is labeled as +`fault_guest_memory_from_disk`. + ## Fork data copy behavior - Guest directory copy is **sparse-only** for regular files. diff --git a/lib/instances/fork_phase_breakdown_perf_test.go b/lib/instances/fork_phase_breakdown_perf_test.go new file mode 100644 index 00000000..7c03a15e --- /dev/null +++ b/lib/instances/fork_phase_breakdown_perf_test.go @@ -0,0 +1,125 @@ +//go:build linux + +package instances + +import ( + "context" + "fmt" + "os" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/kernel/hypeman/lib/hypervisor" + "github.com/kernel/hypeman/lib/images" + "github.com/kernel/hypeman/lib/paths" + snapshottest "github.com/kernel/hypeman/lib/snapshot/testsupport" + "github.com/kernel/hypeman/lib/system" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +const forkPhaseBreakdownPerfEnv = "HYPEMAN_RUN_FORK_PHASE_BREAKDOWN_PERF" +const forkPhaseBreakdownPerfItersEnv = "HYPEMAN_FORK_PHASE_BREAKDOWN_PERF_ITERS" + +func TestForkSnapshotPhaseBreakdownPerf(t *testing.T) { + if os.Getenv(forkPhaseBreakdownPerfEnv) != "1" { + t.Skipf("set %s=1 to run fork snapshot phase breakdown perf test", forkPhaseBreakdownPerfEnv) + } + 
requireFirecrackerIntegrationPrereqs(t) + + recorder := tracetest.NewSpanRecorder() + provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder)) + previousProvider := otel.GetTracerProvider() + otel.SetTracerProvider(provider) + t.Cleanup(func() { + otel.SetTracerProvider(previousProvider) + _ = provider.Shutdown(context.Background()) + }) + + ctx := context.Background() + mgr, tmpDir := setupTestManagerForFirecracker(t) + p := paths.New(tmpDir) + imageManager, err := images.NewManager(p, 1, nil) + require.NoError(t, err) + imageName := integrationTestImageRef(t, "docker.io/library/alpine:latest") + snapshottest.EnsureImageReady(t, ctx, p, imageManager, imageName) + + systemManager := system.NewManager(p) + require.NoError(t, systemManager.EnsureSystemFiles(ctx)) + require.NoError(t, mgr.networkManager.Initialize(ctx, nil)) + + env := map[string]string{ + guestResumeNetworkMailboxEnv: "1", + guestResumeNetworkMailboxTokenEnv: fmt.Sprintf("phase-%d", time.Now().UnixNano()), + guestResumeNetworkDebugStagesEnv: "1", + } + source, err := mgr.CreateInstance(ctx, CreateInstanceRequest{ + Name: "fc-phase-src", + Image: imageName, + Size: 1024 * 1024 * 1024, + OverlaySize: 1024 * 1024 * 1024, + Vcpus: 1, + NetworkEnabled: true, + Hypervisor: hypervisor.TypeFirecracker, + Cmd: []string{"sleep", "infinity"}, + Env: env, + }) + require.NoError(t, err) + t.Cleanup(func() { _ = mgr.DeleteInstance(context.Background(), source.Id) }) + + source, err = waitForInstanceState(ctx, mgr, source.Id, StateRunning, integrationTestTimeout(45*time.Second)) + require.NoError(t, err) + require.NoError(t, waitForExecAgent(ctx, mgr, source.Id, 45*time.Second)) + + snapshot, err := mgr.CreateSnapshot(ctx, source.Id, CreateSnapshotRequest{ + Kind: SnapshotKindStandby, + Name: "fc-phase-snap", + }) + require.NoError(t, err) + t.Cleanup(func() { _ = mgr.DeleteSnapshot(context.Background(), snapshot.Id) }) + + waitForNetwork := true + iterations := 
forkPhaseBreakdownPerfIterations(t, 3) + for i := 1; i <= iterations; i++ { + beforeSpanCount := len(recorder.Ended()) + start := time.Now() + fork, err := mgr.ForkSnapshot(ctx, snapshot.Id, ForkSnapshotRequest{ + Name: fmt.Sprintf("fc-phase-%02d", i), + TargetState: StateRunning, + WaitForNetwork: &waitForNetwork, + }) + forkElapsed := time.Since(start) + require.NoError(t, err) + require.Equal(t, StateRunning, fork.State) + + spans := append([]sdktrace.ReadOnlySpan(nil), recorder.Ended()[beforeSpanCount:]...) + sort.Slice(spans, func(i, j int) bool { + return spans[i].StartTime().Before(spans[j].StartTime()) + }) + parts := make([]string, 0, len(spans)) + for _, span := range spans { + parts = append(parts, fmt.Sprintf("%s=%d", span.Name(), span.EndTime().Sub(span.StartTime()).Milliseconds())) + } + t.Logf("FORK_PHASE_BREAKDOWN iter=%d fork_total_ms=%d spans=%s", i, forkElapsed.Milliseconds(), strings.Join(parts, ",")) + + require.NoError(t, waitForExecAgent(ctx, mgr, fork.Id, 45*time.Second)) + _ = mgr.DeleteInstance(context.Background(), fork.Id) + } +} + +func forkPhaseBreakdownPerfIterations(t *testing.T, fallback int) int { + t.Helper() + raw := strings.TrimSpace(os.Getenv(forkPhaseBreakdownPerfItersEnv)) + if raw == "" { + return fallback + } + n, err := strconv.Atoi(raw) + require.NoError(t, err) + require.Positive(t, n) + return n +} diff --git a/lib/instances/guest_resume_network.go b/lib/instances/guest_resume_network.go index 354657b0..46488560 100644 --- a/lib/instances/guest_resume_network.go +++ b/lib/instances/guest_resume_network.go @@ -44,6 +44,13 @@ type guestResumeNetworkUDPAck struct { text string } +type guestResumeNetworkUDPWaitResult struct { + appliedElapsed time.Duration + appliedAck string + stageElapsed map[string]time.Duration + stageAck map[string]string +} + type guestResumeNetworkUDPWaiter struct { conn *stdnet.UDPConn ch chan guestResumeNetworkUDPAck @@ -117,25 +124,53 @@ func (w *guestResumeNetworkUDPWaiter) readLoop() { } } -func 
// guestResumeNetworkAckStage extracts the stage name from an ack message made
// of whitespace-separated tokens. It returns the value of the first
// "stage=<value>" token whose value is non-empty, and false when no such token
// is present.
func guestResumeNetworkAckStage(text string) (string, bool) {
	const prefix = "stage="
	tokens := strings.Fields(text)
	for i := range tokens {
		tok := tokens[i]
		// A bare "stage=" carries no value; keep scanning past it.
		if len(tok) <= len(prefix) || !strings.HasPrefix(tok, prefix) {
			continue
		}
		return tok[len(prefix):], true
	}
	return "", false
}
*guestNetworkConfig) error { @@ -144,20 +179,22 @@ func (m *manager) waitForGuestResumeNetworkUDPAck(ctx context.Context, waiter *g } log := logger.FromContext(ctx) - waitCtx, waitSpanEnd := m.startLifecycleStep(ctx, "guest.resume_network.udp_ack_wait", + waitCtx, waitSpanEnd := m.startLifecycleStep(ctx, "guest.resume_network.fault_guest_memory_from_disk", attribute.String("instance_id", stored.Id), attribute.String("hypervisor", string(stored.HypervisorType)), - attribute.String("operation", "guest_resume_network_udp_ack_wait"), + attribute.String("operation", "guest_resume_network_fault_guest_memory_from_disk"), + attribute.String("wait_for", "guest_network_applied_ack"), + attribute.String("observed_dominant_wait", "fault_guest_memory_from_disk"), ) waitCtx, cancel := context.WithTimeout(waitCtx, 2*time.Second) defer cancel() - elapsed, ack, err := waiter.WaitApplied(waitCtx, cfg.mac, cfg.ip) + result, err := waiter.WaitApplied(waitCtx, cfg.mac, cfg.ip) waitSpanEnd(err) if err != nil { return err } - log.InfoContext(ctx, "guest resume network UDP ack received", "instance_id", stored.Id, "elapsed", elapsed, "ack", ack) + log.InfoContext(ctx, "guest resume network UDP ack received", "instance_id", stored.Id, "elapsed", result.appliedElapsed, "ack", result.appliedAck, "stages", result.stageElapsed) return nil } diff --git a/lib/instances/restore.go b/lib/instances/restore.go index 714f319e..17e4726d 100644 --- a/lib/instances/restore.go +++ b/lib/instances/restore.go @@ -293,6 +293,14 @@ func (m *manager) restoreInstance( // Store the PID for later cleanup stored.HypervisorPID = &pid + deepTrace, traceErr := newRestoreDeepTrace(ctx, stored, pid, snapshotDir) + if traceErr != nil { + log.WarnContext(ctx, "failed to start restore deep trace", "instance_id", id, "pid", pid, "error", traceErr) + } else if deepTrace != nil { + defer func() { deepTrace.Close("restore_done", retErr) }() + ctx = withRestoreDeepTrace(ctx, deepTrace) + } + // 6. 
Transition: Paused → Running (resume) resumeCtx, resumeSpanEnd := m.startLifecycleStep(ctx, "resume_vm", attribute.String("instance_id", id), @@ -300,7 +308,15 @@ func (m *manager) restoreInstance( attribute.String("operation", "resume_vm"), ) log.InfoContext(ctx, "resuming VM", "instance_id", id) + if deepTrace != nil { + deepTrace.Mark("resume_call_start", "") + deepTrace.Sample("resume_call_start") + } if err := hv.Resume(resumeCtx); err != nil { + if deepTrace != nil { + deepTrace.Mark("resume_error", err.Error()) + deepTrace.Sample("resume_error") + } resumeSpanEnd(err) log.ErrorContext(ctx, "failed to resume VM", "instance_id", id, "error", err) // Cleanup on failure @@ -308,6 +324,10 @@ func (m *manager) restoreInstance( releaseNetwork() return nil, fmt.Errorf("resume vm failed: %w", err) } + if deepTrace != nil { + deepTrace.Mark("resume_returned", "") + deepTrace.Sample("resume_returned") + } resumeSpanEnd(nil) // Mark the instance visible before releasing its pending reservation so we // never create an undercount window. The tiny overlap is intentionally @@ -323,19 +343,45 @@ func (m *manager) restoreInstance( // still has the source VM's old IP configuration. Reconfigure guest networking after // resume so host ingress to the new private IP works reliably. 
if allocatedNet != nil && !stored.SkipGuestAgent { - reconfigureCtx, reconfigureSpanEnd := m.startLifecycleStep(ctx, "reconfigure_guest_network", + guestNetworkStep := "reconfigure_guest_network" + guestNetworkOperation := "reconfigure_guest_network" + guestNetworkAttrs := []attribute.KeyValue{ attribute.String("instance_id", id), attribute.String("hypervisor", string(stored.HypervisorType)), - attribute.String("operation", "reconfigure_guest_network"), - ) + } + if resumeNetworkMailboxPatched && waitForGuestNetwork { + guestNetworkStep = "fault_guest_memory_from_disk" + guestNetworkOperation = "fault_guest_memory_from_disk" + guestNetworkAttrs = append(guestNetworkAttrs, + attribute.String("wait_for", "guest_network_applied_ack"), + attribute.String("observed_dominant_wait", "fault_guest_memory_from_disk"), + ) + } else if resumeNetworkMailboxPatched { + guestNetworkStep = "guest_network_mailbox_handoff" + guestNetworkOperation = "guest_network_mailbox_handoff" + } + guestNetworkAttrs = append(guestNetworkAttrs, attribute.String("operation", guestNetworkOperation)) + reconfigureCtx, reconfigureSpanEnd := m.startLifecycleStep(ctx, guestNetworkStep, guestNetworkAttrs...) 
var reconfigureErr error if resumeNetworkMailboxPatched && waitForGuestNetwork { + if deepTrace != nil { + deepTrace.Mark("wait_guest_network_start", "") + deepTrace.Sample("wait_guest_network_start") + } reconfigureErr = m.waitForGuestResumeNetworkUDPAck(reconfigureCtx, resumeNetworkAckWaiter, stored, resumeNetworkAckCfg) } else if resumeNetworkMailboxPatched { log.InfoContext(ctx, "guest resume network mailbox patched", "instance_id", id) } else { reconfigureErr = reconfigureGuestNetwork(reconfigureCtx, stored, allocatedNet) } + if deepTrace != nil { + stage := "reconfigure_guest_network_done" + if reconfigureErr != nil { + stage = "reconfigure_guest_network_error" + } + deepTrace.Mark(stage, "") + deepTrace.Sample(stage) + } reconfigureSpanEnd(reconfigureErr) if reconfigureErr != nil { log.ErrorContext(ctx, "failed to configure guest network after restore", "instance_id", id, "error", reconfigureErr) diff --git a/lib/instances/restore_deep_trace_linux.go b/lib/instances/restore_deep_trace_linux.go new file mode 100644 index 00000000..53dd29aa --- /dev/null +++ b/lib/instances/restore_deep_trace_linux.go @@ -0,0 +1,524 @@ +//go:build linux + +package instances + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "github.com/kernel/hypeman/lib/logger" +) + +const restoreDeepTraceEnv = "HYPEMAN_RESTORE_DEEP_TRACE" +const restoreDeepTraceDirEnv = "HYPEMAN_RESTORE_DEEP_TRACE_DIR" +const restoreDeepTraceEventsEnv = "HYPEMAN_RESTORE_DEEP_TRACE_EVENTS" + +type restoreDeepTraceContextKey struct{} + +type restoreDeepTrace struct { + instanceID string + pid int + dir string + traceRoot string + traceMarker *os.File + previousTracingOn string + enabledEvents []string + missingEvents []string + marks []restoreDeepTraceMark + samples []restoreDeepTraceSample + log *slog.Logger +} + +type restoreDeepTraceMark struct { + Stage string `json:"stage"` + Time time.Time `json:"time"` + Detail string 
`json:"detail,omitempty"` +} + +type restoreDeepTraceSample struct { + Stage string `json:"stage"` + Time time.Time `json:"time"` + Process restoreDeepTraceThread `json:"process"` + IO map[string]int64 `json:"io,omitempty"` + Threads []restoreDeepTraceThread `json:"threads"` + ReadError string `json:"read_error,omitempty"` +} + +type restoreDeepTraceThread struct { + TID int `json:"tid"` + Comm string `json:"comm,omitempty"` + State string `json:"state,omitempty"` + MinFaults int64 `json:"min_faults"` + MajFaults int64 `json:"maj_faults"` + UTimeTicks int64 `json:"utime_ticks"` + STimeTicks int64 `json:"stime_ticks"` + Threads int64 `json:"threads,omitempty"` + Processor int64 `json:"processor,omitempty"` +} + +type restoreDeepTraceSummary struct { + InstanceID string `json:"instance_id"` + PID int `json:"pid"` + TraceRoot string `json:"trace_root"` + TracePath string `json:"trace_path"` + EnabledEvents []string `json:"enabled_events"` + MissingEvents []string `json:"missing_events,omitempty"` + Marks []restoreDeepTraceMark `json:"marks"` + Samples []restoreDeepTraceSample `json:"samples"` + ProcessDeltas map[string]restoreDeepTraceThreadDelta `json:"process_deltas,omitempty"` + IODeltas map[string]map[string]int64 `json:"io_deltas,omitempty"` + ThreadDeltaSummary map[string]restoreDeepTraceThreadDelta `json:"thread_delta_summary,omitempty"` +} + +type restoreDeepTraceThreadDelta struct { + MinFaults int64 `json:"min_faults"` + MajFaults int64 `json:"maj_faults"` + UTimeTicks int64 `json:"utime_ticks"` + STimeTicks int64 `json:"stime_ticks"` +} + +func newRestoreDeepTrace(ctx context.Context, stored *StoredMetadata, pid int, snapshotDir string) (*restoreDeepTrace, error) { + if strings.TrimSpace(os.Getenv(restoreDeepTraceEnv)) != "1" { + return nil, nil + } + if stored == nil || pid <= 0 { + return nil, nil + } + + traceRoot, err := findRestoreDeepTraceRoot() + if err != nil { + return nil, err + } + baseDir := strings.TrimSpace(os.Getenv(restoreDeepTraceDirEnv)) + 
if baseDir == "" { + baseDir = filepath.Join(os.TempDir(), "hypeman-restore-debug") + } + dir := filepath.Join(baseDir, fmt.Sprintf("%s-%d", stored.Id, time.Now().UnixNano())) + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("create restore deep trace dir: %w", err) + } + + t := &restoreDeepTrace{ + instanceID: stored.Id, + pid: pid, + dir: dir, + traceRoot: traceRoot, + log: logger.FromContext(ctx), + } + meta := map[string]any{ + "instance_id": stored.Id, + "pid": pid, + "snapshot_dir": snapshotDir, + "started_at": time.Now().UTC().Format(time.RFC3339Nano), + } + t.writeJSON("metadata.json", meta) + + if err := t.configureTracing(); err != nil { + t.Close("configure_error", err) + return nil, err + } + t.Mark("trace_start", "") + t.Sample("trace_start") + t.log.InfoContext(ctx, "restore deep trace started", "instance_id", stored.Id, "pid", pid, "dir", dir, "events", t.enabledEvents, "missing_events", t.missingEvents) + return t, nil +} + +func withRestoreDeepTrace(ctx context.Context, t *restoreDeepTrace) context.Context { + if t == nil { + return ctx + } + return context.WithValue(ctx, restoreDeepTraceContextKey{}, t) +} + +func restoreDeepTraceFromContext(ctx context.Context) *restoreDeepTrace { + t, _ := ctx.Value(restoreDeepTraceContextKey{}).(*restoreDeepTrace) + return t +} + +func (t *restoreDeepTrace) Mark(stage, detail string) { + if t == nil { + return + } + mark := restoreDeepTraceMark{ + Stage: stage, + Time: time.Now().UTC(), + Detail: detail, + } + t.marks = append(t.marks, mark) + if t.traceMarker != nil { + _, _ = fmt.Fprintf(t.traceMarker, "hypeman_%s instance=%s pid=%d %s\n", stage, t.instanceID, t.pid, detail) + } +} + +func (t *restoreDeepTrace) Sample(stage string) { + if t == nil || t.pid <= 0 { + return + } + sample := readRestoreDeepTraceSample(stage, t.pid) + t.samples = append(t.samples, sample) + t.writeJSON(fmt.Sprintf("proc_%02d_%s.json", len(t.samples), sanitizeDeepTraceName(stage)), sample) +} + +func (t 
// Close finishes a deep trace: it records a final mark and sample for stage,
// stops tracing, copies the kernel trace buffer into the trace directory,
// undoes the tracing configuration it can, and writes summary.json.
//
// err is the outcome of the traced operation; when non-nil its text is attached
// to the final mark (with line breaks removed) and included in the log line.
// Every filesystem step is best-effort: failures are ignored so that tearing
// down diagnostics never changes the result of the restore itself. Calling
// Close on a nil receiver is a no-op.
func (t *restoreDeepTrace) Close(stage string, err error) {
	if t == nil {
		return
	}
	detail := ""
	if err != nil {
		detail = "error=" + sanitizeTraceMarkerValue(err.Error())
	}
	// Emit the final mark and sample before tracing is switched off so they are
	// still captured.
	t.Mark(stage, detail)
	t.Sample(stage)

	// Stop tracing first, then snapshot the buffer into trace.txt.
	_ = writeTracingFile(t.traceRoot, "tracing_on", "0")
	tracePath := filepath.Join(t.dir, "trace.txt")
	if data, readErr := os.ReadFile(filepath.Join(t.traceRoot, "trace")); readErr == nil {
		_ = os.WriteFile(tracePath, data, 0644)
	}

	if t.traceMarker != nil {
		_ = t.traceMarker.Close()
		t.traceMarker = nil
	}
	// Disable only the events this trace enabled.
	for _, event := range t.enabledEvents {
		_ = writeEventEnable(t.traceRoot, event, false)
	}
	// Put tracing_on back to the value captured by configureTracing, if one was
	// read.
	// NOTE(review): buffer_size_kb is raised by configureTracing but is not
	// restored here — confirm whether that is intentional.
	if t.previousTracingOn != "" {
		_ = writeTracingFile(t.traceRoot, "tracing_on", strings.TrimSpace(t.previousTracingOn))
	}

	// TracePath is recorded even when the copy above failed, so the file it
	// names may not exist.
	summary := restoreDeepTraceSummary{
		InstanceID:         t.instanceID,
		PID:                t.pid,
		TraceRoot:          t.traceRoot,
		TracePath:          tracePath,
		EnabledEvents:      t.enabledEvents,
		MissingEvents:      t.missingEvents,
		Marks:              t.marks,
		Samples:            t.samples,
		ProcessDeltas:      t.processDeltas(),
		IODeltas:           t.ioDeltas(),
		ThreadDeltaSummary: t.threadDeltaSummary(),
	}
	t.writeJSON("summary.json", summary)
	if t.log != nil {
		t.log.Info("restore deep trace finished", "instance_id", t.instanceID, "pid", t.pid, "dir", t.dir, "error", err)
	}
}
// findRestoreDeepTraceRoot locates the kernel tracing directory. It probes the
// two known mount points in order and returns the first one that contains a
// non-directory "available_events" entry. An error is returned when neither
// qualifies.
func findRestoreDeepTraceRoot() (string, error) {
	candidates := [...]string{"/sys/kernel/tracing", "/sys/kernel/debug/tracing"}
	for i := 0; i < len(candidates); i++ {
		root := candidates[i]
		info, statErr := os.Stat(filepath.Join(root, "available_events"))
		if statErr != nil || info.IsDir() {
			continue
		}
		return root, nil
	}
	return "", fmt.Errorf("kernel tracing filesystem is not available")
}
// writeEventEnable switches a single ftrace event on or off by writing "1" or
// "0" to events/<system>/<name>/enable beneath traceRoot.
//
// event must have the form "<system>:<name>" with both halves non-empty;
// anything else is rejected before the filesystem is touched.
func writeEventEnable(traceRoot, event string, enabled bool) error {
	sep := strings.Index(event, ":")
	if sep <= 0 || sep == len(event)-1 {
		return fmt.Errorf("invalid ftrace event %q", event)
	}
	subsystem, eventName := event[:sep], event[sep+1:]

	var toggle string
	switch enabled {
	case true:
		toggle = "1"
	default:
		toggle = "0"
	}
	return writeTracingFile(traceRoot, filepath.Join("events", subsystem, eventName, "enable"), toggle)
}

// writeTracingFile writes value to the file at rel, resolved against traceRoot.
func writeTracingFile(traceRoot, rel, value string) error {
	target := filepath.Join(traceRoot, rel)
	return os.WriteFile(target, []byte(value), 0644)
}
// readRestoreDeepTraceIO parses /proc/<pid>/io into a map from counter name to
// value. Lines without a ':' separator, or whose value is not a base-10 int64,
// are skipped. It returns nil when the file cannot be read.
func readRestoreDeepTraceIO(pid int) map[string]int64 {
	raw, readErr := os.ReadFile(filepath.Join("/proc", strconv.Itoa(pid), "io"))
	if readErr != nil {
		return nil
	}
	counters := make(map[string]int64)
	rest := string(raw)
	for rest != "" {
		var line string
		line, rest, _ = strings.Cut(rest, "\n")
		colon := strings.IndexByte(line, ':')
		if colon < 0 {
			continue
		}
		parsed, parseErr := strconv.ParseInt(strings.TrimSpace(line[colon+1:]), 10, 64)
		if parseErr != nil {
			continue
		}
		counters[strings.TrimSpace(line[:colon])] = parsed
	}
	return counters
}

// parseInt64Field returns fields[idx] parsed as a base-10 int64. An
// out-of-range index yields 0; a parse failure yields whatever value
// strconv.ParseInt reports alongside its error.
func parseInt64Field(fields []string, idx int) int64 {
	if idx >= 0 && idx < len(fields) {
		parsed, _ := strconv.ParseInt(fields[idx], 10, 64)
		return parsed
	}
	return 0
}
_, sample := range t.samples[1:] { + if len(sample.IO) == 0 { + continue + } + delta := make(map[string]int64) + for key, value := range sample.IO { + delta[key] = value - first[key] + } + out[sample.Stage] = delta + } + return out +} + +func (t *restoreDeepTrace) threadDeltaSummary() map[string]restoreDeepTraceThreadDelta { + if t == nil || len(t.samples) < 2 { + return nil + } + first := threadsByID(t.samples[0].Threads) + out := make(map[string]restoreDeepTraceThreadDelta) + for _, sample := range t.samples[1:] { + var sum restoreDeepTraceThreadDelta + for _, thread := range sample.Threads { + base, ok := first[thread.TID] + if !ok { + continue + } + delta := deltaRestoreDeepTraceThread(base, thread) + sum.MinFaults += delta.MinFaults + sum.MajFaults += delta.MajFaults + sum.UTimeTicks += delta.UTimeTicks + sum.STimeTicks += delta.STimeTicks + } + out[sample.Stage] = sum + } + return out +} + +func threadsByID(threads []restoreDeepTraceThread) map[int]restoreDeepTraceThread { + out := make(map[int]restoreDeepTraceThread, len(threads)) + for _, thread := range threads { + out[thread.TID] = thread + } + return out +} + +func deltaRestoreDeepTraceThread(start, end restoreDeepTraceThread) restoreDeepTraceThreadDelta { + return restoreDeepTraceThreadDelta{ + MinFaults: end.MinFaults - start.MinFaults, + MajFaults: end.MajFaults - start.MajFaults, + UTimeTicks: end.UTimeTicks - start.UTimeTicks, + STimeTicks: end.STimeTicks - start.STimeTicks, + } +} + +func (t *restoreDeepTrace) writeJSON(name string, value any) { + if t == nil || t.dir == "" { + return + } + data, err := json.MarshalIndent(value, "", " ") + if err != nil { + return + } + _ = os.WriteFile(filepath.Join(t.dir, name), append(data, '\n'), 0644) +} + +func sanitizeDeepTraceName(name string) string { + name = strings.ToLower(strings.TrimSpace(name)) + name = strings.Map(func(r rune) rune { + if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' || r == '-' { + return r + } + return '_' + }, 
// traceMarkerLineBreaks maps each carriage return and line feed to a single
// space. It is built once and shared; a strings.Replacer is safe for
// concurrent use.
var traceMarkerLineBreaks = strings.NewReplacer("\n", " ", "\r", " ")

// sanitizeTraceMarkerValue makes value safe to embed in a single-line
// trace_marker record by replacing every '\n' and '\r' with a space. All other
// bytes, including invalid UTF-8, pass through unchanged.
//
// The replacement runs in one pass over the input rather than one full scan
// per character being replaced.
func sanitizeTraceMarkerValue(value string) string {
	return traceMarkerLineBreaks.Replace(value)
}
"HYPEMAN_RESTORE_DEEP_TRACE_PERF_ITERS" +const guestResumeNetworkDebugStagesEnv = "HYPEMAN_RESUME_NETWORK_DEBUG_STAGES" + +func TestRestoreDeepTracePerf(t *testing.T) { + if os.Getenv(restoreDeepTracePerfEnv) != "1" { + t.Skipf("set %s=1 to run restore deep trace perf test", restoreDeepTracePerfEnv) + } + requireFirecrackerIntegrationPrereqs(t) + + recorder := tracetest.NewSpanRecorder() + provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder)) + previousProvider := otel.GetTracerProvider() + otel.SetTracerProvider(provider) + t.Cleanup(func() { + otel.SetTracerProvider(previousProvider) + _ = provider.Shutdown(context.Background()) + }) + + ctx := context.Background() + mgr, tmpDir := setupTestManagerForFirecracker(t) + traceDir := strings.TrimSpace(os.Getenv(restoreDeepTraceDirEnv)) + if traceDir == "" { + traceDir = filepath.Join(os.TempDir(), "hypeman-restore-debug-test") + t.Setenv(restoreDeepTraceDirEnv, traceDir) + } + require.NoError(t, os.RemoveAll(traceDir)) + + p := paths.New(tmpDir) + imageManager, err := images.NewManager(p, 1, nil) + require.NoError(t, err) + imageName := integrationTestImageRef(t, "docker.io/library/alpine:latest") + snapshottest.EnsureImageReady(t, ctx, p, imageManager, imageName) + + systemManager := system.NewManager(p) + require.NoError(t, systemManager.EnsureSystemFiles(ctx)) + require.NoError(t, mgr.networkManager.Initialize(ctx, nil)) + + env := map[string]string{ + guestResumeNetworkMailboxEnv: "1", + guestResumeNetworkMailboxTokenEnv: fmt.Sprintf("deep-%d", time.Now().UnixNano()), + guestResumeNetworkDebugStagesEnv: "1", + } + source, err := mgr.CreateInstance(ctx, CreateInstanceRequest{ + Name: "fc-deep-trace-src", + Image: imageName, + Size: 1024 * 1024 * 1024, + OverlaySize: 1024 * 1024 * 1024, + Vcpus: 1, + NetworkEnabled: true, + Hypervisor: hypervisor.TypeFirecracker, + Cmd: []string{"sleep", "infinity"}, + Env: env, + }) + require.NoError(t, err) + sourceID := source.Id + t.Cleanup(func() { _ = 
mgr.DeleteInstance(context.Background(), sourceID) }) + + source, err = waitForInstanceState(ctx, mgr, sourceID, StateRunning, integrationTestTimeout(45*time.Second)) + require.NoError(t, err) + require.NoError(t, waitForExecAgent(ctx, mgr, sourceID, 45*time.Second)) + + snapshot, err := mgr.CreateSnapshot(ctx, sourceID, CreateSnapshotRequest{ + Kind: SnapshotKindStandby, + Name: "fc-deep-trace-snap", + }) + require.NoError(t, err) + t.Cleanup(func() { _ = mgr.DeleteSnapshot(context.Background(), snapshot.Id) }) + + t.Setenv(restoreDeepTraceEnv, "1") + waitForNetwork := true + iterations := restoreDeepTracePerfIterations(t, 3) + for i := 1; i <= iterations; i++ { + beforeSpanCount := len(recorder.Ended()) + beforeTraceDirs := currentRestoreDeepTraceDirs(t, traceDir) + start := time.Now() + fork, err := mgr.ForkSnapshot(ctx, snapshot.Id, ForkSnapshotRequest{ + Name: fmt.Sprintf("fc-deep-trace-%02d", i), + TargetState: StateRunning, + WaitForNetwork: &waitForNetwork, + }) + forkElapsed := time.Since(start) + require.NoError(t, err) + require.Equal(t, StateRunning, fork.State) + + spans := append([]sdktrace.ReadOnlySpan(nil), recorder.Ended()[beforeSpanCount:]...) 
+ tracePath := newestRestoreDeepTraceDir(t, traceDir, beforeTraceDirs) + t.Log(formatRestoreDeepTracePerfLine(i, forkElapsed, spans, tracePath)) + + require.NoError(t, waitForExecAgent(ctx, mgr, fork.Id, 45*time.Second)) + _ = mgr.DeleteInstance(context.Background(), fork.Id) + } +} + +func restoreDeepTracePerfIterations(t *testing.T, fallback int) int { + t.Helper() + raw := strings.TrimSpace(os.Getenv(restoreDeepTracePerfItersEnv)) + if raw == "" { + return fallback + } + n, err := strconv.Atoi(raw) + require.NoError(t, err) + require.Positive(t, n) + return n +} + +func currentRestoreDeepTraceDirs(t *testing.T, traceDir string) map[string]struct{} { + t.Helper() + out := make(map[string]struct{}) + entries, err := os.ReadDir(traceDir) + if err != nil { + if os.IsNotExist(err) { + return out + } + require.NoError(t, err) + } + for _, entry := range entries { + if entry.IsDir() { + out[entry.Name()] = struct{}{} + } + } + return out +} + +func newestRestoreDeepTraceDir(t *testing.T, traceDir string, before map[string]struct{}) string { + t.Helper() + entries, err := os.ReadDir(traceDir) + require.NoError(t, err) + var newest string + var newestMod time.Time + for _, entry := range entries { + if !entry.IsDir() { + continue + } + if _, exists := before[entry.Name()]; exists { + continue + } + info, err := entry.Info() + require.NoError(t, err) + if newest == "" || info.ModTime().After(newestMod) { + newest = filepath.Join(traceDir, entry.Name()) + newestMod = info.ModTime() + } + } + return newest +} + +func formatRestoreDeepTracePerfLine(iter int, forkElapsed time.Duration, spans []sdktrace.ReadOnlySpan, tracePath string) string { + return fmt.Sprintf( + "DEEP_TRACE iter=%d fork_total_ms=%d restore_from_snapshot_ms=%d resume_vm_ms=%d fault_guest_memory_from_disk_ms=%d guest_resume_network_fault_guest_memory_from_disk_ms=%d trace_dir=%s", + iter, + forkElapsed.Milliseconds(), + restoreDeepTraceSpanDurationMS(restoreDeepTraceLastSpanNamed(spans, 
"restore_from_snapshot")), + restoreDeepTraceSpanDurationMS(restoreDeepTraceLastSpanNamed(spans, "resume_vm")), + restoreDeepTraceSpanDurationMS(restoreDeepTraceLastSpanNamed(spans, "fault_guest_memory_from_disk")), + restoreDeepTraceSpanDurationMS(restoreDeepTraceLastSpanNamed(spans, "guest.resume_network.fault_guest_memory_from_disk")), + tracePath, + ) +} + +func restoreDeepTraceLastSpanNamed(spans []sdktrace.ReadOnlySpan, name string) sdktrace.ReadOnlySpan { + for i := len(spans) - 1; i >= 0; i-- { + if spans[i].Name() == name { + return spans[i] + } + } + return nil +} + +func restoreDeepTraceSpanDurationMS(span sdktrace.ReadOnlySpan) int64 { + if span == nil { + return -1 + } + return span.EndTime().Sub(span.StartTime()).Milliseconds() +} diff --git a/lib/system/guest_agent/resume_network.go b/lib/system/guest_agent/resume_network.go index 060cd687..5b1fa260 100644 --- a/lib/system/guest_agent/resume_network.go +++ b/lib/system/guest_agent/resume_network.go @@ -24,6 +24,7 @@ import ( const resumeNetworkMailboxEnv = "HYPEMAN_RESUME_NETWORK_MAILBOX" const resumeNetworkMailboxTokenEnv = "HYPEMAN_RESUME_NETWORK_MAILBOX_TOKEN" +const resumeNetworkDebugStagesEnv = "HYPEMAN_RESUME_NETWORK_DEBUG_STAGES" const vmgenIDKmsgSignal = "crng reseeded due to virtual machine fork" const resumeNetworkMailboxSize = 4096 const resumeNetworkMailboxSeqOffset = 64 @@ -108,6 +109,7 @@ func resumeNetworkMailboxLoop(s *guestServer, mailbox []byte) { } func waitAndApplyResumeNetworkMailbox(s *guestServer, buf []byte) error { + signalSeen := time.Now() for { seq := atomicLoadUint32(buf[resumeNetworkMailboxSeqOffset:]) if seq == 0 { @@ -125,6 +127,13 @@ func waitAndApplyResumeNetworkMailbox(s *guestServer, buf []byte) error { return fmt.Errorf("decode mailbox payload: %w", err) } + debugStages := strings.TrimSpace(os.Getenv(resumeNetworkDebugStagesEnv)) == "1" + if debugStages { + elapsed := time.Since(signalSeen).Microseconds() + sendResumeNetworkAck(payload, "signal_seen", 
fmt.Sprintf("guest_signal_to_mailbox_us=%d", elapsed)) + sendResumeNetworkAck(payload, "mailbox_seen", fmt.Sprintf("guest_signal_to_mailbox_us=%d", elapsed)) + sendResumeNetworkAck(payload, "netlink_start", fmt.Sprintf("guest_signal_to_netlink_start_us=%d", time.Since(signalSeen).Microseconds())) + } _, err := s.ReconfigureNetwork(context.Background(), &pb.ReconfigureNetworkRequest{ InterfaceName: payload.InterfaceName, Mac: payload.MAC, @@ -133,15 +142,21 @@ func waitAndApplyResumeNetworkMailbox(s *guestServer, buf []byte) error { Gateway: payload.Gateway, }) if err != nil { + if debugStages { + sendResumeNetworkAck(payload, "netlink_error", fmt.Sprintf("guest_signal_to_netlink_error_us=%d", time.Since(signalSeen).Microseconds())) + } return err } - sendResumeNetworkAck(payload, "applied") + if debugStages { + sendResumeNetworkAck(payload, "netlink_done", fmt.Sprintf("guest_signal_to_netlink_done_us=%d", time.Since(signalSeen).Microseconds())) + } + sendResumeNetworkAck(payload, "applied", fmt.Sprintf("guest_signal_to_applied_us=%d", time.Since(signalSeen).Microseconds())) atomicStoreUint32(buf[resumeNetworkMailboxSeqOffset:], 0) return nil } } -func sendResumeNetworkAck(payload resumeNetworkPayload, stage string) { +func sendResumeNetworkAck(payload resumeNetworkPayload, stage string, fields ...string) { if payload.AckPort == 0 || payload.Gateway == "" { return } @@ -154,7 +169,11 @@ func sendResumeNetworkAck(payload resumeNetworkPayload, stage string) { } defer conn.Close() - _, _ = fmt.Fprintf(conn, "stage=%s mac=%s ip=%s\n", stage, payload.MAC, payload.IPv4) + extra := "" + if len(fields) > 0 { + extra = " " + strings.Join(fields, " ") + } + _, _ = fmt.Fprintf(conn, "stage=%s mac=%s ip=%s%s\n", stage, payload.MAC, payload.IPv4, extra) } func atomicLoadUint32(buf []byte) uint32 {