Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/forkvm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ resume once the mailbox has been patched and the guest finishes the network
handoff asynchronously. If the mailbox path is unavailable, restore falls back
to the older host-initiated guest network reconfigure path.

The post-resume wait is usually not netlink time. On cold restores, sampled
Firecracker traces show the resumed vCPU faulting guest memory back in from the
snapshot backing file before the guest-agent can observe VMGenID, read the
mailbox, and send the network ack. In host traces this wait is labeled as
`fault_guest_memory_from_disk`.

## Fork data copy behavior

- Guest directory copy is **sparse-only** for regular files.
Expand Down
125 changes: 125 additions & 0 deletions lib/instances/fork_phase_breakdown_perf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//go:build linux

package instances

import (
"context"
"fmt"
"os"
"sort"
"strconv"
"strings"
"testing"
"time"

"github.com/kernel/hypeman/lib/hypervisor"
"github.com/kernel/hypeman/lib/images"
"github.com/kernel/hypeman/lib/paths"
snapshottest "github.com/kernel/hypeman/lib/snapshot/testsupport"
"github.com/kernel/hypeman/lib/system"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

const forkPhaseBreakdownPerfEnv = "HYPEMAN_RUN_FORK_PHASE_BREAKDOWN_PERF"
const forkPhaseBreakdownPerfItersEnv = "HYPEMAN_FORK_PHASE_BREAKDOWN_PERF_ITERS"

func TestForkSnapshotPhaseBreakdownPerf(t *testing.T) {
if os.Getenv(forkPhaseBreakdownPerfEnv) != "1" {
t.Skipf("set %s=1 to run fork snapshot phase breakdown perf test", forkPhaseBreakdownPerfEnv)
}
requireFirecrackerIntegrationPrereqs(t)

recorder := tracetest.NewSpanRecorder()
provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder))
previousProvider := otel.GetTracerProvider()
otel.SetTracerProvider(provider)
t.Cleanup(func() {
otel.SetTracerProvider(previousProvider)
_ = provider.Shutdown(context.Background())
})

ctx := context.Background()
mgr, tmpDir := setupTestManagerForFirecracker(t)
p := paths.New(tmpDir)
imageManager, err := images.NewManager(p, 1, nil)
require.NoError(t, err)
imageName := integrationTestImageRef(t, "docker.io/library/alpine:latest")
snapshottest.EnsureImageReady(t, ctx, p, imageManager, imageName)

systemManager := system.NewManager(p)
require.NoError(t, systemManager.EnsureSystemFiles(ctx))
require.NoError(t, mgr.networkManager.Initialize(ctx, nil))

env := map[string]string{
guestResumeNetworkMailboxEnv: "1",
guestResumeNetworkMailboxTokenEnv: fmt.Sprintf("phase-%d", time.Now().UnixNano()),
guestResumeNetworkDebugStagesEnv: "1",
}
source, err := mgr.CreateInstance(ctx, CreateInstanceRequest{
Name: "fc-phase-src",
Image: imageName,
Size: 1024 * 1024 * 1024,
OverlaySize: 1024 * 1024 * 1024,
Vcpus: 1,
NetworkEnabled: true,
Hypervisor: hypervisor.TypeFirecracker,
Cmd: []string{"sleep", "infinity"},
Env: env,
})
require.NoError(t, err)
t.Cleanup(func() { _ = mgr.DeleteInstance(context.Background(), source.Id) })

source, err = waitForInstanceState(ctx, mgr, source.Id, StateRunning, integrationTestTimeout(45*time.Second))
require.NoError(t, err)
require.NoError(t, waitForExecAgent(ctx, mgr, source.Id, 45*time.Second))

snapshot, err := mgr.CreateSnapshot(ctx, source.Id, CreateSnapshotRequest{
Kind: SnapshotKindStandby,
Name: "fc-phase-snap",
})
require.NoError(t, err)
t.Cleanup(func() { _ = mgr.DeleteSnapshot(context.Background(), snapshot.Id) })

waitForNetwork := true
iterations := forkPhaseBreakdownPerfIterations(t, 3)
for i := 1; i <= iterations; i++ {
beforeSpanCount := len(recorder.Ended())
start := time.Now()
fork, err := mgr.ForkSnapshot(ctx, snapshot.Id, ForkSnapshotRequest{
Name: fmt.Sprintf("fc-phase-%02d", i),
TargetState: StateRunning,
WaitForNetwork: &waitForNetwork,
})
forkElapsed := time.Since(start)
require.NoError(t, err)
require.Equal(t, StateRunning, fork.State)

spans := append([]sdktrace.ReadOnlySpan(nil), recorder.Ended()[beforeSpanCount:]...)
sort.Slice(spans, func(i, j int) bool {
return spans[i].StartTime().Before(spans[j].StartTime())
})
parts := make([]string, 0, len(spans))
for _, span := range spans {
parts = append(parts, fmt.Sprintf("%s=%d", span.Name(), span.EndTime().Sub(span.StartTime()).Milliseconds()))
}
t.Logf("FORK_PHASE_BREAKDOWN iter=%d fork_total_ms=%d spans=%s", i, forkElapsed.Milliseconds(), strings.Join(parts, ","))

require.NoError(t, waitForExecAgent(ctx, mgr, fork.Id, 45*time.Second))
_ = mgr.DeleteInstance(context.Background(), fork.Id)
}
}

func forkPhaseBreakdownPerfIterations(t *testing.T, fallback int) int {
t.Helper()
raw := strings.TrimSpace(os.Getenv(forkPhaseBreakdownPerfItersEnv))
if raw == "" {
return fallback
}
n, err := strconv.Atoi(raw)
require.NoError(t, err)
require.Positive(t, n)
return n
}
55 changes: 46 additions & 9 deletions lib/instances/guest_resume_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ type guestResumeNetworkUDPAck struct {
text string
}

type guestResumeNetworkUDPWaitResult struct {
appliedElapsed time.Duration
appliedAck string
stageElapsed map[string]time.Duration
stageAck map[string]string
}

type guestResumeNetworkUDPWaiter struct {
conn *stdnet.UDPConn
ch chan guestResumeNetworkUDPAck
Expand Down Expand Up @@ -117,25 +124,53 @@ func (w *guestResumeNetworkUDPWaiter) readLoop() {
}
}

func (w *guestResumeNetworkUDPWaiter) WaitApplied(ctx context.Context, mac, ip string) (time.Duration, string, error) {
func (w *guestResumeNetworkUDPWaiter) WaitApplied(ctx context.Context, mac, ip string) (guestResumeNetworkUDPWaitResult, error) {
if w == nil {
return 0, "", fmt.Errorf("guest resume network UDP waiter is nil")
return guestResumeNetworkUDPWaitResult{}, fmt.Errorf("guest resume network UDP waiter is nil")
}

start := time.Now()
wantMAC := "mac=" + strings.ToLower(mac)
wantIP := "ip=" + ip
result := guestResumeNetworkUDPWaitResult{
stageElapsed: make(map[string]time.Duration),
stageAck: make(map[string]string),
}
for {
select {
case ack := <-w.ch:
text := strings.ToLower(ack.text)
if strings.Contains(text, "stage=applied") && strings.Contains(text, wantMAC) && strings.Contains(text, wantIP) {
return ack.received.Sub(start), ack.text, nil
if !strings.Contains(text, wantMAC) || !strings.Contains(text, wantIP) {
continue
}
if stage, ok := guestResumeNetworkAckStage(text); ok {
if _, exists := result.stageElapsed[stage]; !exists {
result.stageElapsed[stage] = ack.received.Sub(start)
result.stageAck[stage] = ack.text
if deepTrace := restoreDeepTraceFromContext(ctx); deepTrace != nil {
deepTrace.Mark("guest_"+stage+"_received", ack.text)
deepTrace.Sample("guest_" + stage + "_received")
}
}
}
if strings.Contains(text, "stage=applied") {
result.appliedElapsed = ack.received.Sub(start)
result.appliedAck = ack.text
return result, nil
}
case <-ctx.Done():
return 0, "", ctx.Err()
return guestResumeNetworkUDPWaitResult{}, ctx.Err()
}
}
}

func guestResumeNetworkAckStage(text string) (string, bool) {
for _, field := range strings.Fields(text) {
if stage, ok := strings.CutPrefix(field, "stage="); ok && stage != "" {
return stage, true
}
}
return "", false
}

func (m *manager) waitForGuestResumeNetworkUDPAck(ctx context.Context, waiter *guestResumeNetworkUDPWaiter, stored *StoredMetadata, cfg *guestNetworkConfig) error {
Expand All @@ -144,20 +179,22 @@ func (m *manager) waitForGuestResumeNetworkUDPAck(ctx context.Context, waiter *g
}

log := logger.FromContext(ctx)
waitCtx, waitSpanEnd := m.startLifecycleStep(ctx, "guest.resume_network.udp_ack_wait",
waitCtx, waitSpanEnd := m.startLifecycleStep(ctx, "guest.resume_network.fault_guest_memory_from_disk",
attribute.String("instance_id", stored.Id),
attribute.String("hypervisor", string(stored.HypervisorType)),
attribute.String("operation", "guest_resume_network_udp_ack_wait"),
attribute.String("operation", "guest_resume_network_fault_guest_memory_from_disk"),
attribute.String("wait_for", "guest_network_applied_ack"),
attribute.String("observed_dominant_wait", "fault_guest_memory_from_disk"),
)
waitCtx, cancel := context.WithTimeout(waitCtx, 2*time.Second)
defer cancel()

elapsed, ack, err := waiter.WaitApplied(waitCtx, cfg.mac, cfg.ip)
result, err := waiter.WaitApplied(waitCtx, cfg.mac, cfg.ip)
waitSpanEnd(err)
if err != nil {
return err
}
log.InfoContext(ctx, "guest resume network UDP ack received", "instance_id", stored.Id, "elapsed", elapsed, "ack", ack)
log.InfoContext(ctx, "guest resume network UDP ack received", "instance_id", stored.Id, "elapsed", result.appliedElapsed, "ack", result.appliedAck, "stages", result.stageElapsed)
return nil
}

Expand Down
52 changes: 49 additions & 3 deletions lib/instances/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,21 +293,41 @@ func (m *manager) restoreInstance(
// Store the PID for later cleanup
stored.HypervisorPID = &pid

deepTrace, traceErr := newRestoreDeepTrace(ctx, stored, pid, snapshotDir)
if traceErr != nil {
log.WarnContext(ctx, "failed to start restore deep trace", "instance_id", id, "pid", pid, "error", traceErr)
} else if deepTrace != nil {
defer func() { deepTrace.Close("restore_done", retErr) }()
ctx = withRestoreDeepTrace(ctx, deepTrace)
}

// 6. Transition: Paused → Running (resume)
resumeCtx, resumeSpanEnd := m.startLifecycleStep(ctx, "resume_vm",
attribute.String("instance_id", id),
attribute.String("hypervisor", string(stored.HypervisorType)),
attribute.String("operation", "resume_vm"),
)
log.InfoContext(ctx, "resuming VM", "instance_id", id)
if deepTrace != nil {
deepTrace.Mark("resume_call_start", "")
deepTrace.Sample("resume_call_start")
}
if err := hv.Resume(resumeCtx); err != nil {
if deepTrace != nil {
deepTrace.Mark("resume_error", err.Error())
deepTrace.Sample("resume_error")
}
resumeSpanEnd(err)
log.ErrorContext(ctx, "failed to resume VM", "instance_id", id, "error", err)
// Cleanup on failure
hv.Shutdown(ctx)
releaseNetwork()
return nil, fmt.Errorf("resume vm failed: %w", err)
}
if deepTrace != nil {
deepTrace.Mark("resume_returned", "")
deepTrace.Sample("resume_returned")
}
resumeSpanEnd(nil)
// Mark the instance visible before releasing its pending reservation so we
// never create an undercount window. The tiny overlap is intentionally
Expand All @@ -323,19 +343,45 @@ func (m *manager) restoreInstance(
// still has the source VM's old IP configuration. Reconfigure guest networking after
// resume so host ingress to the new private IP works reliably.
if allocatedNet != nil && !stored.SkipGuestAgent {
reconfigureCtx, reconfigureSpanEnd := m.startLifecycleStep(ctx, "reconfigure_guest_network",
guestNetworkStep := "reconfigure_guest_network"
guestNetworkOperation := "reconfigure_guest_network"
guestNetworkAttrs := []attribute.KeyValue{
attribute.String("instance_id", id),
attribute.String("hypervisor", string(stored.HypervisorType)),
attribute.String("operation", "reconfigure_guest_network"),
)
}
if resumeNetworkMailboxPatched && waitForGuestNetwork {
guestNetworkStep = "fault_guest_memory_from_disk"
guestNetworkOperation = "fault_guest_memory_from_disk"
guestNetworkAttrs = append(guestNetworkAttrs,
attribute.String("wait_for", "guest_network_applied_ack"),
attribute.String("observed_dominant_wait", "fault_guest_memory_from_disk"),
)
} else if resumeNetworkMailboxPatched {
guestNetworkStep = "guest_network_mailbox_handoff"
guestNetworkOperation = "guest_network_mailbox_handoff"
}
guestNetworkAttrs = append(guestNetworkAttrs, attribute.String("operation", guestNetworkOperation))
reconfigureCtx, reconfigureSpanEnd := m.startLifecycleStep(ctx, guestNetworkStep, guestNetworkAttrs...)
var reconfigureErr error
if resumeNetworkMailboxPatched && waitForGuestNetwork {
if deepTrace != nil {
deepTrace.Mark("wait_guest_network_start", "")
deepTrace.Sample("wait_guest_network_start")
}
reconfigureErr = m.waitForGuestResumeNetworkUDPAck(reconfigureCtx, resumeNetworkAckWaiter, stored, resumeNetworkAckCfg)
} else if resumeNetworkMailboxPatched {
log.InfoContext(ctx, "guest resume network mailbox patched", "instance_id", id)
} else {
reconfigureErr = reconfigureGuestNetwork(reconfigureCtx, stored, allocatedNet)
}
if deepTrace != nil {
stage := "reconfigure_guest_network_done"
if reconfigureErr != nil {
stage = "reconfigure_guest_network_error"
}
deepTrace.Mark(stage, "")
deepTrace.Sample(stage)
}
reconfigureSpanEnd(reconfigureErr)
if reconfigureErr != nil {
log.ErrorContext(ctx, "failed to configure guest network after restore", "instance_id", id, "error", reconfigureErr)
Expand Down
Loading
Loading