Skip to content

Commit f2282e2

Browse files
committed
refactor: replace global sharedResourceLock with a fine-grained resource-specific mutex pool for GCE load balancer operations
1 parent 9bff285 commit f2282e2

File tree

4 files changed

+293
-19
lines changed

4 files changed

+293
-19
lines changed

providers/gce/gce.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,9 @@ type Cloud struct {
176176
// sharedResourceLock is used to serialize GCE operations that may mutate shared state to
177177
// prevent inconsistencies. For example, load balancers manipulation methods will take the
178178
// lock to prevent shared resources from being prematurely deleted while the operation is
179-
// in progress.
180179
sharedResourceLock sync.Mutex
180+
// sharedResourceLocks is a concurrent map used for resource-specific fine-grained locking of shared resources (e.g. InstanceGroups, shared HealthChecks).
181+
sharedResourceLocks sync.Map // map[string]*sync.Mutex
181182
// AlphaFeatureGate gates gce alpha features in Cloud instance.
182183
// Related wrapper functions that interact with the gce alpha api should examine whether
183184
// the corresponding api is enabled.
@@ -219,6 +220,19 @@ type Cloud struct {
219220
enableL4DenyFirewallRollbackCleanup bool
220221
}
221222

223+
type SharedResourceType string
224+
225+
const (
226+
ResourceTypeHealthCheck SharedResourceType = "hc"
227+
ResourceTypeInstanceGroup SharedResourceType = "ig"
228+
)
229+
230+
func (g *Cloud) getLockForResource(resType SharedResourceType, name string) *sync.Mutex {
231+
key := string(resType) + ":" + name
232+
v, _ := g.sharedResourceLocks.LoadOrStore(key, &sync.Mutex{})
233+
return v.(*sync.Mutex)
234+
}
235+
222236
// ConfigGlobal is the in memory representation of the gce.conf config data
223237
// TODO: replace gcfg with json
224238
type ConfigGlobal struct {

providers/gce/gce_loadbalancer_internal.go

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"sort"
2727
"strconv"
2828
"strings"
29+
"sync"
2930

3031
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
3132
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
@@ -128,10 +129,6 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
128129
}
129130
}
130131

131-
// Lock the sharedResourceLock to prevent any deletions of shared resources while assembling shared resources here
132-
g.sharedResourceLock.Lock()
133-
defer g.sharedResourceLock.Unlock()
134-
135132
// Ensure health check exists before creating the backend service. The health check is shared
136133
// if externalTrafficPolicy=Cluster.
137134
sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc)
@@ -354,9 +351,6 @@ func (g *Cloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v
354351
klog.V(2).Infof("Skipped updateInternalLoadBalancer for service %s/%s as service contains %q loadBalancerClass.", svc.Namespace, svc.Name, *svc.Spec.LoadBalancerClass)
355352
return cloudprovider.ImplementedElsewhere
356353
}
357-
g.sharedResourceLock.Lock()
358-
defer g.sharedResourceLock.Unlock()
359-
360354
igName := makeInstanceGroupName(clusterID)
361355
igLinks, err := g.ensureInternalInstanceGroups(igName, nodes)
362356
if err != nil {
@@ -393,9 +387,6 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string,
393387
sharedBackend := shareBackendService(svc)
394388
sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc)
395389

396-
g.sharedResourceLock.Lock()
397-
defer g.sharedResourceLock.Unlock()
398-
399390
klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): attempting delete of region internal address", loadBalancerName)
400391
ensureAddressDeleted(g, loadBalancerName, g.region)
401392

@@ -608,17 +599,43 @@ func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedN
608599
}
609600

610601
if hc == nil {
611-
klog.V(2).Infof("ensureInternalHealthCheck: did not find health check %v, creating one with port %v path %v", name, port, path)
612-
if err = g.CreateHealthCheck(expectedHC); err != nil {
613-
return nil, err
614-
}
615-
hc, err = g.GetHealthCheck(name)
602+
var created bool
603+
err = func() error {
604+
var lock *sync.Mutex
605+
if shared {
606+
lock = g.getLockForResource(ResourceTypeHealthCheck, name)
607+
lock.Lock()
608+
}
609+
defer func() {
610+
if lock != nil {
611+
lock.Unlock()
612+
}
613+
}()
614+
615+
hc, err = g.GetHealthCheck(name)
616+
if err != nil && !isNotFound(err) {
617+
return err
618+
}
619+
620+
if hc == nil {
621+
klog.V(2).Infof("ensureInternalHealthCheck: did not find health check %v, creating one with port %v path %v", name, port, path)
622+
if err = g.CreateHealthCheck(expectedHC); err != nil {
623+
return err
624+
}
625+
hc, err = g.GetHealthCheck(name)
626+
if err != nil {
627+
return err
628+
}
629+
created = true
630+
}
631+
return nil
632+
}()
616633
if err != nil {
617-
klog.Errorf("Failed to get http health check %v", err)
618634
return nil, err
619635
}
620-
klog.V(2).Infof("ensureInternalHealthCheck: created health check %v", name)
621-
return hc, nil
636+
if created {
637+
return hc, nil
638+
}
622639
}
623640

624641
if needToUpdateHealthChecks(hc, expectedHC) {
@@ -638,6 +655,10 @@ func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedN
638655
}
639656

640657
func (g *Cloud) ensureInternalInstanceGroup(name, zone string, nodes []*v1.Node, emptyZoneNodes []*v1.Node) (string, error) {
658+
lock := g.getLockForResource(ResourceTypeInstanceGroup, name+"-"+zone)
659+
lock.Lock()
660+
defer lock.Unlock()
661+
641662
klog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): checking group that it contains %v nodes [node names limited, total number of nodes: %d], the following nodes have empty string in the zone field and won't be deleted: %v", name, zone, loggableNodeNames(nodes), len(nodes), loggableNodeNames(emptyZoneNodes))
642663
ig, err := g.GetInstanceGroup(name, zone)
643664
if err != nil && !isNotFound(err) {

providers/gce/gce_loadbalancer_internal_test.go

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,23 @@ package gce
2222
import (
2323
"context"
2424
"fmt"
25+
"net/http"
2526
"reflect"
2627
"sort"
2728
"strings"
29+
"sync/atomic"
2830
"testing"
31+
"time"
2932

3033
"github.com/stretchr/testify/assert"
3134
"github.com/stretchr/testify/require"
35+
"golang.org/x/sync/errgroup"
3236

3337
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
3438
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
3539
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock"
3640
"google.golang.org/api/compute/v1"
41+
"google.golang.org/api/googleapi"
3742
v1 "k8s.io/api/core/v1"
3843
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3944
"k8s.io/apimachinery/pkg/types"
@@ -2502,3 +2507,204 @@ func TestEnsureInternalLoadBalancerClass(t *testing.T) {
25022507
}
25032508
}
25042509
}
2510+
2511+
func TestEnsureInternalBackendServiceConflict(t *testing.T) {
2512+
t.Parallel()
2513+
2514+
vals := DefaultTestClusterValues()
2515+
nodeNames := []string{"test-node-1"}
2516+
2517+
gce, err := fakeGCECloud(vals)
2518+
require.NoError(t, err)
2519+
2520+
svc := fakeLoadbalancerService(string(LBTypeInternal))
2521+
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
2522+
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
2523+
require.NoError(t, err)
2524+
igName := makeInstanceGroupName(vals.ClusterID)
2525+
igLinks, err := gce.ensureInternalInstanceGroups(igName, nodes)
2526+
require.NoError(t, err)
2527+
2528+
sharedBackend := shareBackendService(svc)
2529+
bsName := makeBackendServiceName(lbName, vals.ClusterID, sharedBackend, cloud.SchemeInternal, "TCP", svc.Spec.SessionAffinity)
2530+
2531+
// Create backend initially
2532+
err = gce.ensureInternalBackendService(bsName, "description", svc.Spec.SessionAffinity, cloud.SchemeInternal, "TCP", igLinks, "")
2533+
require.NoError(t, err)
2534+
2535+
// Mock 412 error
2536+
c := gce.c.(*cloud.MockGCE)
2537+
c.MockRegionBackendServices.UpdateHook = func(ctx context.Context, key *meta.Key, obj *compute.BackendService, m *cloud.MockRegionBackendServices, options ...cloud.Option) error {
2538+
return &googleapi.Error{Code: http.StatusPreconditionFailed, Message: "Precondition Failed"}
2539+
}
2540+
2541+
// Update the Backend Service to trigger the update hook
2542+
err = gce.ensureInternalBackendService(bsName, "description", v1.ServiceAffinityNone, cloud.SchemeInternal, "TCP", igLinks, "")
2543+
2544+
// Verify that the error is propagated
2545+
require.Error(t, err)
2546+
assert.Contains(t, err.Error(), "Precondition Failed")
2547+
assert.IsType(t, &googleapi.Error{}, err)
2548+
if gErr, ok := err.(*googleapi.Error); ok {
2549+
assert.Equal(t, http.StatusPreconditionFailed, gErr.Code)
2550+
}
2551+
}
2552+
2553+
func TestResourceLockErrorRecovery(t *testing.T) {
2554+
t.Parallel()
2555+
vals := DefaultTestClusterValues()
2556+
gce, _ := fakeGCECloud(vals)
2557+
c := gce.c.(*cloud.MockGCE)
2558+
2559+
svc := fakeLoadbalancerService(string(LBTypeInternal))
2560+
svcName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
2561+
2562+
var calls int32
2563+
c.MockHealthChecks.InsertHook = func(ctx context.Context, key *meta.Key, obj *compute.HealthCheck, m *cloud.MockHealthChecks, options ...cloud.Option) (bool, error) {
2564+
if atomic.AddInt32(&calls, 1) == 1 {
2565+
return true, &googleapi.Error{Code: http.StatusInternalServerError, Message: "Simulated GCP Error"}
2566+
}
2567+
return false, nil
2568+
}
2569+
2570+
// 1st request should error out and release lock
2571+
_, err := gce.ensureInternalHealthCheck("hc-lock-test", svcName, true, "/", 80)
2572+
require.Error(t, err)
2573+
assert.Contains(t, err.Error(), "Simulated GCP Error")
2574+
2575+
// 2nd request should successfully acquire the lock, create the health check, and succeed.
2576+
// We use a channel to ensure that if the lock was leaked, the test fails quickly instead of timing out.
2577+
errCh := make(chan error, 1)
2578+
hcCh := make(chan *compute.HealthCheck, 1)
2579+
go func() {
2580+
hc, err := gce.ensureInternalHealthCheck("hc-lock-test", svcName, true, "/", 80)
2581+
errCh <- err
2582+
hcCh <- hc
2583+
}()
2584+
2585+
select {
2586+
case err := <-errCh:
2587+
hc := <-hcCh
2588+
require.NoError(t, err)
2589+
assert.NotNil(t, hc)
2590+
case <-time.After(2 * time.Second):
2591+
t.Fatal("Deadlock detected: Second request timed out trying to acquire lock. The lock was likely leaked.")
2592+
}
2593+
2594+
assert.Equal(t, int32(2), atomic.LoadInt32(&calls))
2595+
}
2596+
2597+
func TestEnsureInternalInstanceGroupNodeSyncScaling(t *testing.T) {
2598+
t.Parallel()
2599+
vals := DefaultTestClusterValues()
2600+
gce, _ := fakeGCECloud(vals)
2601+
c := gce.c.(*cloud.MockGCE)
2602+
2603+
igName := "test-ig-node-scale"
2604+
zone := vals.ZoneName
2605+
2606+
// Inject a small sleep in Get and Insert to widen the race window.
2607+
c.MockInstanceGroups.GetHook = func(ctx context.Context, key *meta.Key, m *cloud.MockInstanceGroups, options ...cloud.Option) (bool, *compute.InstanceGroup, error) {
2608+
time.Sleep(2 * time.Millisecond)
2609+
return false, nil, nil
2610+
}
2611+
c.MockInstanceGroups.InsertHook = func(ctx context.Context, key *meta.Key, obj *compute.InstanceGroup, m *cloud.MockInstanceGroups, options ...cloud.Option) (bool, error) {
2612+
time.Sleep(2 * time.Millisecond)
2613+
return false, nil
2614+
}
2615+
2616+
var eg errgroup.Group
2617+
workers := 20
2618+
2619+
for i := 0; i < workers; i++ {
2620+
workerID := i
2621+
eg.Go(func() error {
2622+
var nodes []*v1.Node
2623+
for j := 0; j < (workerID%5)+1; j++ {
2624+
nodeName := fmt.Sprintf("node-%d", j)
2625+
nodes = append(nodes, &v1.Node{
2626+
ObjectMeta: metav1.ObjectMeta{Name: nodeName},
2627+
})
2628+
}
2629+
2630+
_, err := gce.ensureInternalInstanceGroup(igName, zone, nodes, nil)
2631+
return err
2632+
})
2633+
}
2634+
2635+
err := eg.Wait()
2636+
require.NoError(t, err, "All workers should complete without error")
2637+
2638+
// We verify that the final state precisely matches one of the expected valid subsets.
2639+
instances, err := gce.ListInstancesInInstanceGroup(igName, zone, "ALL")
2640+
require.NoError(t, err)
2641+
2642+
actualNodes := make(map[string]bool)
2643+
for _, ins := range instances {
2644+
parts := strings.Split(ins.Instance, "/")
2645+
actualNodes[parts[len(parts)-1]] = true
2646+
}
2647+
2648+
validStates := []map[string]bool{
2649+
{"node-0": true},
2650+
{"node-0": true, "node-1": true},
2651+
{"node-0": true, "node-1": true, "node-2": true},
2652+
{"node-0": true, "node-1": true, "node-2": true, "node-3": true},
2653+
{"node-0": true, "node-1": true, "node-2": true, "node-3": true, "node-4": true},
2654+
}
2655+
2656+
isValid := false
2657+
for _, state := range validStates {
2658+
if reflect.DeepEqual(actualNodes, state) {
2659+
isValid = true
2660+
break
2661+
}
2662+
}
2663+
assert.True(t, isValid, "Final InstanceGroup count should precisely match exactly one of the known synchronized states, got: %v", actualNodes)
2664+
}
2665+
2666+
func TestSharedVsNonSharedHealthCheckContention(t *testing.T) {
2667+
t.Parallel()
2668+
vals := DefaultTestClusterValues()
2669+
gce, _ := fakeGCECloud(vals)
2670+
c := gce.c.(*cloud.MockGCE)
2671+
2672+
svc := fakeLoadbalancerService(string(LBTypeInternal))
2673+
svcName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
2674+
2675+
var sharedInsertCount int32
2676+
2677+
c.MockHealthChecks.InsertHook = func(ctx context.Context, key *meta.Key, obj *compute.HealthCheck, m *cloud.MockHealthChecks, options ...cloud.Option) (bool, error) {
2678+
time.Sleep(5 * time.Millisecond)
2679+
if obj.Name == "shared-hc" {
2680+
atomic.AddInt32(&sharedInsertCount, 1)
2681+
}
2682+
return false, nil
2683+
}
2684+
c.MockHealthChecks.GetHook = func(ctx context.Context, key *meta.Key, m *cloud.MockHealthChecks, options ...cloud.Option) (bool, *compute.HealthCheck, error) {
2685+
time.Sleep(5 * time.Millisecond)
2686+
return false, nil, nil
2687+
}
2688+
2689+
var eg errgroup.Group
2690+
workers := 50
2691+
2692+
for i := 0; i < workers; i++ {
2693+
workerID := i
2694+
eg.Go(func() error {
2695+
if workerID%2 == 0 {
2696+
_, err := gce.ensureInternalHealthCheck("shared-hc", svcName, true, "/", 80)
2697+
return err
2698+
} else {
2699+
hcName := fmt.Sprintf("unique-hc-%d", workerID)
2700+
_, err := gce.ensureInternalHealthCheck(hcName, svcName, false, "/", 80)
2701+
return err
2702+
}
2703+
})
2704+
}
2705+
2706+
err := eg.Wait()
2707+
require.NoError(t, err, "All health check routines should complete without error")
2708+
2709+
assert.Equal(t, int32(1), atomic.LoadInt32(&sharedInsertCount), "Shared health check should only be inserted exactly once")
2710+
}

providers/gce/gce_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,3 +638,36 @@ func TestGetProjectsBasePath(t *testing.T) {
638638
}
639639
}
640640
}
641+
642+
func TestSharedResourceLocksScoping(t *testing.T) {
643+
vals := DefaultTestClusterValues()
644+
gce := NewFakeGCECloud(vals)
645+
646+
// Use the exact same resource name across different ResourceTypes
647+
// to check that the namespace isolation prevents collisions.
648+
const sharedName = "collide-test"
649+
650+
gce.getLockForResource(ResourceTypeHealthCheck, sharedName).Lock()
651+
gce.getLockForResource(ResourceTypeHealthCheck, sharedName).Unlock()
652+
653+
gce.getLockForResource(ResourceTypeInstanceGroup, sharedName).Lock()
654+
gce.getLockForResource(ResourceTypeInstanceGroup, sharedName).Unlock()
655+
656+
var keys []string
657+
gce.sharedResourceLocks.Range(func(key, value any) bool {
658+
keys = append(keys, key.(string))
659+
return true
660+
})
661+
662+
// This check proves that namespace scoping prevented a key collision.
663+
if len(keys) != 2 {
664+
t.Fatalf("Expected exactly 2 locks (scoping failed to prevent collision), got %d", len(keys))
665+
}
666+
667+
for _, k := range keys {
668+
// Validating the internal prefix grammar.
669+
if !strings.HasPrefix(k, string(ResourceTypeHealthCheck)+":") && !strings.HasPrefix(k, string(ResourceTypeInstanceGroup)+":") {
670+
t.Errorf("Unexpected lock scoped in sharedResourceLocks: %s", k)
671+
}
672+
}
673+
}

0 commit comments

Comments
 (0)