Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ download-ch-binaries:
@echo "Binaries downloaded successfully"

# Firecracker version to embed
FIRECRACKER_VERSION := v1.14.2
FIRECRACKER_VERSION := v1.15.1

# Download Firecracker binaries
download-firecracker-binaries:
Expand Down
4 changes: 3 additions & 1 deletion lib/hypervisor/firecracker/binaries.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ type Version string

const (
V1_14_2 Version = "v1.14.2"
V1_15_1 Version = "v1.15.1"
)

const defaultVersion = V1_14_2
const defaultVersion = V1_15_1

var supportedVersions = []Version{
V1_15_1,
V1_14_2,
}

Expand Down
97 changes: 90 additions & 7 deletions lib/instances/guest_resume_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
stdnet "net"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/kernel/hypeman/lib/hypervisor"
"github.com/kernel/hypeman/lib/logger"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sys/unix"
)

const guestResumeNetworkMailboxEnv = "HYPEMAN_RESUME_NETWORK_MAILBOX"
const guestResumeNetworkMailboxTokenEnv = "HYPEMAN_RESUME_NETWORK_MAILBOX_TOKEN"
const guestResumeNetworkPrefetchBytesEnv = "HYPEMAN_RESUME_NETWORK_PREFETCH_BYTES"
const firecrackerSnapshotMemoryFile = "memory"

const guestResumeNetworkMailboxSeqOffset = 64
Expand All @@ -44,6 +48,12 @@ type guestResumeNetworkUDPAck struct {
text string
}

// guestResumeNetworkUDPWaitResult is what WaitApplied reports once the guest
// has acknowledged the resume-network configuration over UDP.
type guestResumeNetworkUDPWaitResult struct {
// appliedElapsed is the time between the start of the wait and the receipt
// of the matching "stage=applied" ack.
appliedElapsed time.Duration
// appliedAck is the text of that "stage=applied" ack as received (not
// lowercased).
appliedAck string
// stageElapsed maps each stage name seen in a matching ack to the elapsed
// time of the first ack for that stage. Stage names are parsed from the
// lowercased ack text.
stageElapsed map[string]time.Duration
}

type guestResumeNetworkUDPWaiter struct {
conn *stdnet.UDPConn
ch chan guestResumeNetworkUDPAck
Expand Down Expand Up @@ -117,27 +127,49 @@ func (w *guestResumeNetworkUDPWaiter) readLoop() {
}
}

func (w *guestResumeNetworkUDPWaiter) WaitApplied(ctx context.Context, mac, ip string) (time.Duration, string, error) {
func (w *guestResumeNetworkUDPWaiter) WaitApplied(ctx context.Context, mac, ip string) (guestResumeNetworkUDPWaitResult, error) {
if w == nil {
return 0, "", fmt.Errorf("guest resume network UDP waiter is nil")
return guestResumeNetworkUDPWaitResult{}, fmt.Errorf("guest resume network UDP waiter is nil")
}

start := time.Now()
wantMAC := "mac=" + strings.ToLower(mac)
wantIP := "ip=" + ip
result := guestResumeNetworkUDPWaitResult{
stageElapsed: make(map[string]time.Duration),
}
for {
select {
case ack := <-w.ch:
text := strings.ToLower(ack.text)
if strings.Contains(text, "stage=applied") && strings.Contains(text, wantMAC) && strings.Contains(text, wantIP) {
return ack.received.Sub(start), ack.text, nil
if !strings.Contains(text, wantMAC) || !strings.Contains(text, wantIP) {
continue
}
if stage, ok := guestResumeNetworkAckStage(text); ok {
if _, exists := result.stageElapsed[stage]; !exists {
result.stageElapsed[stage] = ack.received.Sub(start)
}
}
if strings.Contains(text, "stage=applied") {
result.appliedElapsed = ack.received.Sub(start)
result.appliedAck = ack.text
return result, nil
}
case <-ctx.Done():
return 0, "", ctx.Err()
return guestResumeNetworkUDPWaitResult{}, ctx.Err()
}
}
}

// guestResumeNetworkAckStage extracts the stage name from an ack message.
//
// The message is split on whitespace and the first token of the form
// "stage=<name>" with a non-empty name wins. The boolean result is false when
// no such token exists.
func guestResumeNetworkAckStage(text string) (string, bool) {
	const stagePrefix = "stage="

	tokens := strings.Fields(text)
	for i := range tokens {
		if !strings.HasPrefix(tokens[i], stagePrefix) {
			continue
		}
		// A bare "stage=" carries no name; keep scanning later tokens.
		if name := tokens[i][len(stagePrefix):]; name != "" {
			return name, true
		}
	}
	return "", false
}

func (m *manager) waitForGuestResumeNetworkUDPAck(ctx context.Context, waiter *guestResumeNetworkUDPWaiter, stored *StoredMetadata, cfg *guestNetworkConfig) error {
if waiter == nil || cfg == nil {
return nil
Expand All @@ -152,12 +184,16 @@ func (m *manager) waitForGuestResumeNetworkUDPAck(ctx context.Context, waiter *g
waitCtx, cancel := context.WithTimeout(waitCtx, 2*time.Second)
defer cancel()

elapsed, ack, err := waiter.WaitApplied(waitCtx, cfg.mac, cfg.ip)
result, err := waiter.WaitApplied(waitCtx, cfg.mac, cfg.ip)
span := trace.SpanFromContext(waitCtx)
for stage, elapsed := range result.stageElapsed {
span.SetAttributes(attribute.Int64("guest_resume_network_ack_"+stage+"_ms", elapsed.Milliseconds()))
}
waitSpanEnd(err)
if err != nil {
return err
}
log.InfoContext(ctx, "guest resume network UDP ack received", "instance_id", stored.Id, "elapsed", elapsed, "ack", ack)
log.InfoContext(ctx, "guest resume network UDP ack received", "instance_id", stored.Id, "elapsed", result.appliedElapsed, "ack", result.appliedAck, "stages", result.stageElapsed)
return nil
}

Expand Down Expand Up @@ -218,6 +254,53 @@ func patchGuestResumeNetworkMailbox(snapshotDir, token string, payload *guestRes
if _, err := file.WriteAt(u32[:], idx+int64(guestResumeNetworkMailboxSeqOffset)); err != nil {
return fmt.Errorf("write resume network mailbox sequence: %w", err)
}
if n := guestResumeNetworkPrefetchBytes(); n > 0 {
if err := prefetchSnapshotMemoryWindow(file, info.Size(), idx, n); err != nil {
return fmt.Errorf("prefetch resume network mailbox memory window: %w", err)
}
}
return nil
}

// guestResumeNetworkPrefetchBytes returns the prefetch window size, in bytes,
// configured through the guestResumeNetworkPrefetchBytesEnv environment
// variable.
//
// The value is whitespace-trimmed and parsed as a base-10 int64. An unset,
// blank, malformed, out-of-range, zero or negative value yields 0, which the
// caller treats as "do not prefetch".
func guestResumeNetworkPrefetchBytes() int64 {
	configured, err := strconv.ParseInt(
		strings.TrimSpace(os.Getenv(guestResumeNetworkPrefetchBytesEnv)),
		10,
		64,
	)
	switch {
	case err != nil:
		// Covers the empty string as well as malformed or overflowing input.
		return 0
	case configured <= 0:
		return 0
	}
	return configured
}

// prefetchSnapshotMemoryWindow reads, and discards, up to windowBytes of file
// starting half a window before center.
//
// The window start is clamped to [0, size] and the window end to size, so a
// window that would begin before the file start extends further to the right
// instead of shrinking. Data is read in chunks of at most 1 MiB. io.EOF from a
// read is tolerated; any other read error is returned unchanged. A
// non-positive size or windowBytes makes the call a no-op.
func prefetchSnapshotMemoryWindow(file *os.File, size int64, center int64, windowBytes int64) error {
	if size <= 0 || windowBytes <= 0 {
		return nil
	}

	const chunkSize = 1024 * 1024

	lo := center - windowBytes/2
	switch {
	case lo < 0:
		lo = 0
	case lo > size:
		lo = size
	}
	hi := lo + windowBytes
	if hi > size {
		hi = size
	}

	scratch := make([]byte, chunkSize)
	for pos := lo; pos < hi; {
		want := hi - pos
		if want > chunkSize {
			want = chunkSize
		}
		_, err := file.ReadAt(scratch[:want], pos)
		if err != nil && err != io.EOF {
			return err
		}
		pos += want
	}
	return nil
}

Expand Down
184 changes: 184 additions & 0 deletions lib/instances/resume_network_signal_perf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//go:build linux

package instances

import (
"context"
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/kernel/hypeman/lib/hypervisor"
"github.com/kernel/hypeman/lib/images"
"github.com/kernel/hypeman/lib/paths"
snapshottest "github.com/kernel/hypeman/lib/snapshot/testsupport"
"github.com/kernel/hypeman/lib/system"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

const resumeNetworkSignalPerfEnv = "HYPEMAN_RUN_RESUME_NETWORK_SIGNAL_PERF"
const resumeNetworkSignalPerfItersEnv = "HYPEMAN_RESUME_NETWORK_SIGNAL_PERF_ITERS"
const resumeNetworkSignalPerfWaitEnv = "HYPEMAN_RESUME_NETWORK_SIGNAL_PERF_WAIT_FOR_NETWORK"
const resumeNetworkSignalGuestEnv = "HYPEMAN_RESUME_NETWORK_SIGNAL"
const resumeNetworkAckStagesGuestEnv = "HYPEMAN_RESUME_NETWORK_ACK_STAGES"

// TestResumeNetworkSignalPerf is an opt-in perf probe for the guest
// resume-network signalling path on Firecracker.
//
// It only runs when resumeNetworkSignalPerfEnv is set to "1". The test boots a
// source instance with the resume-network mailbox enabled, takes a standby
// snapshot, then forks that snapshot repeatedly (10 times unless overridden by
// resumeNetworkSignalPerfItersEnv). For each fork it logs one PERF_SIGNAL line
// built from the wall-clock fork time and the spans that ended during the fork.
func TestResumeNetworkSignalPerf(t *testing.T) {
	if os.Getenv(resumeNetworkSignalPerfEnv) != "1" {
		t.Skipf("set %s=1 to run resume network signal perf test", resumeNetworkSignalPerfEnv)
	}
	requireFirecrackerIntegrationPrereqs(t)

	// Route spans into an in-memory recorder for the duration of the test and
	// restore the previous global provider afterwards.
	recorder := tracetest.NewSpanRecorder()
	provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder))
	previousProvider := otel.GetTracerProvider()
	otel.SetTracerProvider(provider)
	t.Cleanup(func() {
		otel.SetTracerProvider(previousProvider)
		_ = provider.Shutdown(context.Background())
	})

	ctx := context.Background()
	mgr, tmpDir := setupTestManagerForFirecracker(t)
	p := paths.New(tmpDir)
	imageManager, err := images.NewManager(p, 1, nil)
	require.NoError(t, err)
	imageName := integrationTestImageRef(t, "docker.io/library/alpine:latest")
	snapshottest.EnsureImageReady(t, ctx, p, imageManager, imageName)

	systemManager := system.NewManager(p)
	require.NoError(t, systemManager.EnsureSystemFiles(ctx))
	require.NoError(t, mgr.networkManager.Initialize(ctx, nil))

	// Guest signal mode defaults to "auto"; waiting for the network is on
	// unless the env var is explicitly "0".
	signal := strings.TrimSpace(os.Getenv(resumeNetworkSignalGuestEnv))
	if signal == "" {
		signal = "auto"
	}
	waitForNetwork := strings.TrimSpace(os.Getenv(resumeNetworkSignalPerfWaitEnv)) != "0"
	env := map[string]string{
		guestResumeNetworkMailboxEnv:      "1",
		guestResumeNetworkMailboxTokenEnv: fmt.Sprintf("perf-%d", time.Now().UnixNano()),
		resumeNetworkSignalGuestEnv:       signal,
		resumeNetworkAckStagesGuestEnv:    "1",
	}

	source, err := mgr.CreateInstance(ctx, CreateInstanceRequest{
		Name:           "fc-rn-signal-src",
		Image:          imageName,
		Size:           1024 * 1024 * 1024,
		OverlaySize:    1024 * 1024 * 1024,
		Vcpus:          1,
		NetworkEnabled: true,
		Hypervisor:     hypervisor.TypeFirecracker,
		Cmd:            []string{"sleep", "infinity"},
		Env:            env,
	})
	require.NoError(t, err)
	sourceID := source.Id
	t.Cleanup(func() { _ = mgr.DeleteInstance(context.Background(), sourceID) })

	source, err = waitForInstanceState(ctx, mgr, sourceID, StateRunning, integrationTestTimeout(45*time.Second))
	require.NoError(t, err)
	require.NoError(t, waitForExecAgent(ctx, mgr, sourceID, 45*time.Second))

	snapshot, err := mgr.CreateSnapshot(ctx, sourceID, CreateSnapshotRequest{
		Kind: SnapshotKindStandby,
		Name: "fc-rn-signal-snap",
	})
	require.NoError(t, err)
	t.Cleanup(func() { _ = mgr.DeleteSnapshot(context.Background(), snapshot.Id) })

	iterations := resumeNetworkSignalPerfIterations(t, 10)
	for i := 1; i <= iterations; i++ {
		// Remember how many spans had ended so only this fork's spans are
		// attributed to the iteration.
		before := len(recorder.Ended())
		start := time.Now()
		fork, err := mgr.ForkSnapshot(ctx, snapshot.Id, ForkSnapshotRequest{
			Name:           fmt.Sprintf("fc-rn-signal-%02d", i),
			TargetState:    StateRunning,
			WaitForNetwork: &waitForNetwork,
		})
		forkElapsed := time.Since(start)
		require.NoError(t, err)

		// A failing require below exits the test immediately and would skip
		// the eager delete at the end of the iteration, leaking the fork.
		// The cleanup is a fallback for that path only.
		forkID := fork.Id
		forkDeleted := false
		t.Cleanup(func() {
			if !forkDeleted {
				_ = mgr.DeleteInstance(context.Background(), forkID)
			}
		})

		require.Equal(t, StateRunning, fork.State)

		spans := append([]sdktrace.ReadOnlySpan(nil), recorder.Ended()[before:]...)
		t.Log(formatResumeNetworkSignalPerfLine(i, signal, waitForNetwork, forkElapsed, spans))

		require.NoError(t, waitForExecAgent(ctx, mgr, forkID, 45*time.Second))

		// Delete eagerly so forks do not pile up across iterations.
		_ = mgr.DeleteInstance(context.Background(), forkID)
		forkDeleted = true
	}
}

// resumeNetworkSignalPerfIterations returns the iteration count for the perf
// test. It is read from resumeNetworkSignalPerfItersEnv, and fallback is used
// when that variable is unset or blank. A value that is not a positive
// integer fails the test.
func resumeNetworkSignalPerfIterations(t *testing.T, fallback int) int {
	t.Helper()

	value := strings.TrimSpace(os.Getenv(resumeNetworkSignalPerfItersEnv))
	if value != "" {
		count, parseErr := strconv.Atoi(value)
		require.NoError(t, parseErr)
		require.Positive(t, count)
		return count
	}
	return fallback
}

// formatResumeNetworkSignalPerfLine renders one "PERF_SIGNAL ..." log line for
// a fork iteration.
//
// Durations come from the last span of each name in spans. The mailbox and
// applied ack timings come from int64 attributes on the
// "guest.resume_network.udp_ack_wait" span. Any span or attribute that is
// missing is reported as -1, and the apply-after-mailbox delta is only
// computed when both ack timings are present.
func formatResumeNetworkSignalPerfLine(iter int, signal string, waitForNetwork bool, forkElapsed time.Duration, spans []sdktrace.ReadOnlySpan) string {
	const lineFormat = "PERF_SIGNAL iter=%d signal=%s wait_for_network=%t fork_total_ms=%d restore_from_snapshot_ms=%d resume_vm_ms=%d reconfigure_guest_network_ms=%d guest_resume_network_udp_ack_wait_ms=%d guest_mailbox_ack_ms=%d guest_applied_ack_ms=%d guest_apply_after_mailbox_ms=%d"

	ackWaitSpan := lastSpanNamed(spans, "guest.resume_network.udp_ack_wait")
	mailboxMS := spanAttrInt64(ackWaitSpan, "guest_resume_network_ack_mailbox_ms")
	appliedMS := spanAttrInt64(ackWaitSpan, "guest_resume_network_ack_applied_ms")

	var deltaMS int64 = -1
	if !(mailboxMS < 0 || appliedMS < 0) {
		deltaMS = appliedMS - mailboxMS
	}

	restoreMS := spanDurationMS(lastSpanNamed(spans, "restore_from_snapshot"))
	resumeMS := spanDurationMS(lastSpanNamed(spans, "resume_vm"))
	reconfigureMS := spanDurationMS(lastSpanNamed(spans, "reconfigure_guest_network"))
	ackWaitMS := spanDurationMS(ackWaitSpan)

	return fmt.Sprintf(
		lineFormat,
		iter,
		signal,
		waitForNetwork,
		forkElapsed.Milliseconds(),
		restoreMS,
		resumeMS,
		reconfigureMS,
		ackWaitMS,
		mailboxMS,
		appliedMS,
		deltaMS,
	)
}

// lastSpanNamed returns the span closest to the end of spans whose name equals
// name, or nil when no span matches.
func lastSpanNamed(spans []sdktrace.ReadOnlySpan, name string) sdktrace.ReadOnlySpan {
	idx := len(spans)
	for idx > 0 {
		idx--
		if candidate := spans[idx]; candidate.Name() == name {
			return candidate
		}
	}
	return nil
}

// spanDurationMS returns the span's duration (end time minus start time) in
// whole milliseconds, or -1 when span is nil.
func spanDurationMS(span sdktrace.ReadOnlySpan) int64 {
	if span != nil {
		ended := span.EndTime()
		return ended.Sub(span.StartTime()).Milliseconds()
	}
	return -1
}

// spanAttrInt64 returns the value of the first attribute on span whose key is
// key and whose value type is INT64. It returns -1 when span is nil or no such
// attribute exists; an attribute with the right key but another type is
// skipped.
func spanAttrInt64(span sdktrace.ReadOnlySpan, key string) int64 {
	const missing = int64(-1)
	if span == nil {
		return missing
	}
	for _, kv := range span.Attributes() {
		if string(kv.Key) == key && kv.Value.Type() == attribute.INT64 {
			return kv.Value.AsInt64()
		}
	}
	return missing
}
Loading
Loading