Skip to content

Commit 30376df

Browse files
authored
fix: taint optimistic locking (#180)
1 parent 608364a commit 30376df

File tree

2 files changed

+311
-21
lines changed

2 files changed

+311
-21
lines changed

internal/controller/node_controller.go

Lines changed: 63 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -241,37 +241,79 @@ func (r *RuleReadinessController) hasTaintBySpec(node *corev1.Node, taintSpec co
241241
}
242242

243243
// addTaintBySpec adds a taint to a node.
244+
// We use client.MergeFromWithOptimisticLock because patching a list with a
245+
// JSON merge patch can cause races due to the fact that it fully replaces
246+
// the list on a change. Optimistic locking ensures the patch fails with a
247+
// conflict error if the node was modified concurrently, allowing the
248+
// controller to retry with fresh state.
244249
func (r *RuleReadinessController) addTaintBySpec(ctx context.Context, node *corev1.Node, taintSpec corev1.Taint, ruleName string) error {
245-
patch := client.StrategicMergeFrom(node.DeepCopy())
246-
node.Spec.Taints = append(node.Spec.Taints, taintSpec)
247-
if err := r.Patch(ctx, node, patch); err != nil {
248-
return err
249-
}
250+
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
251+
// Fetch latest node state
252+
latestNode := &corev1.Node{}
253+
if err := r.Get(ctx, client.ObjectKey{Name: node.Name}, latestNode); err != nil {
254+
return err
255+
}
256+
257+
// Check if taint already exists
258+
if r.hasTaintBySpec(latestNode, taintSpec) {
259+
return nil
260+
}
261+
262+
stored := latestNode.DeepCopy()
263+
latestNode.Spec.Taints = append(latestNode.Spec.Taints, taintSpec)
264+
if err := r.Patch(ctx, latestNode, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
265+
return err
266+
}
267+
268+
message := fmt.Sprintf("Taint '%s:%s' added by rule '%s'", taintSpec.Key, taintSpec.Effect, ruleName)
269+
r.EventRecorder.Event(latestNode, corev1.EventTypeNormal, "TaintAdded", message)
250270

251-
message := fmt.Sprintf("Taint '%s:%s' added by rule '%s'", taintSpec.Key, taintSpec.Effect, ruleName)
252-
r.EventRecorder.Event(node, corev1.EventTypeNormal, "TaintAdded", message)
271+
// Update the original node reference with the latest state
272+
*node = *latestNode
253273

254-
return nil
274+
return nil
275+
})
255276
}
256277

257278
// removeTaintBySpec removes a taint from a node.
279+
// We use client.MergeFromWithOptimisticLock because patching a list with a
280+
// JSON merge patch can cause races due to the fact that it fully replaces
281+
// the list on a change. Optimistic locking ensures the patch fails with a
282+
// conflict error if the node was modified concurrently, allowing the
283+
// controller to retry with fresh state.
258284
func (r *RuleReadinessController) removeTaintBySpec(ctx context.Context, node *corev1.Node, taintSpec corev1.Taint, ruleName string) error {
259-
patch := client.StrategicMergeFrom(node.DeepCopy())
260-
var newTaints []corev1.Taint
261-
for _, taint := range node.Spec.Taints {
262-
if taint.Key != taintSpec.Key || taint.Effect != taintSpec.Effect {
263-
newTaints = append(newTaints, taint)
285+
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
286+
// Fetch latest node state
287+
latestNode := &corev1.Node{}
288+
if err := r.Get(ctx, client.ObjectKey{Name: node.Name}, latestNode); err != nil {
289+
return err
264290
}
265-
}
266-
node.Spec.Taints = newTaints
267-
if err := r.Patch(ctx, node, patch); err != nil {
268-
return err
269-
}
270291

271-
message := fmt.Sprintf("Taint '%s:%s' removed by rule '%s'", taintSpec.Key, taintSpec.Effect, ruleName)
272-
r.EventRecorder.Event(node, corev1.EventTypeNormal, "TaintRemoved", message)
292+
// Check if taint is already absent
293+
if !r.hasTaintBySpec(latestNode, taintSpec) {
294+
return nil
295+
}
273296

274-
return nil
297+
stored := latestNode.DeepCopy()
298+
var newTaints []corev1.Taint
299+
for _, taint := range latestNode.Spec.Taints {
300+
if taint.Key != taintSpec.Key || taint.Effect != taintSpec.Effect {
301+
newTaints = append(newTaints, taint)
302+
}
303+
}
304+
latestNode.Spec.Taints = newTaints
305+
if err := r.Patch(ctx, latestNode, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
306+
return err
307+
}
308+
309+
message := fmt.Sprintf("Taint '%s:%s' removed by rule '%s'", taintSpec.Key, taintSpec.Effect, ruleName)
310+
r.EventRecorder.Event(latestNode, corev1.EventTypeNormal, "TaintRemoved", message)
311+
312+
// Update the original node reference with the latest state
313+
*node = *latestNode
314+
315+
return nil
316+
})
275317
}
276318

277319
// Bootstrap completion tracking.

internal/controller/node_controller_test.go

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,21 @@ package controller
1818

1919
import (
2020
"context"
21+
"sync/atomic"
2122
"time"
2223

2324
. "github.com/onsi/ginkgo/v2"
2425
. "github.com/onsi/gomega"
2526
corev1 "k8s.io/api/core/v1"
2627
apierrors "k8s.io/apimachinery/pkg/api/errors"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/runtime"
2830
"k8s.io/apimachinery/pkg/types"
2931
"k8s.io/client-go/kubernetes/fake"
3032
"k8s.io/client-go/tools/record"
33+
"sigs.k8s.io/controller-runtime/pkg/client"
34+
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
35+
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
3136
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3237

3338
nodereadinessiov1alpha1 "sigs.k8s.io/node-readiness-controller/api/v1alpha1"
@@ -689,4 +694,247 @@ var _ = Describe("Node Controller", func() {
689694
}, time.Second*5).Should(BeTrue(), "NodeEvaluation should be updated with new condition and taint status")
690695
})
691696
})
697+
698+
// These tests use the controller-runtime fake client (not envtest's
699+
// k8sClient) with interceptors to simulate concurrent node modifications.
700+
// The fake client enforces resourceVersion checks, so when
701+
// MergeFromWithOptimisticLock is used and another write bumps the
702+
// resourceVersion, the patch fails with a Conflict error — the same
703+
// behavior a real API server would produce.
704+
// These tests use the controller-runtime fake client (not envtest's
// k8sClient) with interceptors to simulate concurrent node modifications.
// The fake client enforces resourceVersion checks, so when
// MergeFromWithOptimisticLock is used and another write bumps the
// resourceVersion, the patch fails with a Conflict error — the same
// behavior a real API server would produce.
Context("optimistic locking on taint operations", func() {
	var (
		ctx        context.Context
		testScheme *runtime.Scheme
	)

	BeforeEach(func() {
		ctx = context.Background()
		testScheme = runtime.NewScheme()
		Expect(corev1.AddToScheme(testScheme)).To(Succeed())
	})

	It("should retry and succeed when removeTaintBySpec encounters a conflict", func() {
		node := &corev1.Node{
			ObjectMeta: metav1.ObjectMeta{Name: "ol-remove-conflict"},
			Spec: corev1.NodeSpec{
				Taints: []corev1.Taint{
					{Key: "readiness.k8s.io/test", Effect: corev1.TaintEffectNoSchedule},
					{Key: "other-controller/taint", Effect: corev1.TaintEffectNoSchedule},
				},
			},
		}

		var patchCount atomic.Int32

		// The interceptor simulates a concurrent modification: on the
		// first Patch call it updates the node (bumping resourceVersion)
		// before delegating to the real Patch. Because
		// MergeFromWithOptimisticLock embeds the original resourceVersion,
		// the fake client detects the mismatch and returns a Conflict.
		// The retry logic should handle this and succeed on the second attempt.
		fc := fakeclient.NewClientBuilder().
			WithScheme(testScheme).
			WithObjects(node).
			WithInterceptorFuncs(interceptor.Funcs{
				Patch: func(ctx context.Context, c client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
					if obj.GetName() == "ol-remove-conflict" && patchCount.Add(1) == 1 {
						// Simulate concurrent modification by another controller.
						current := &corev1.Node{}
						Expect(c.Get(ctx, types.NamespacedName{Name: obj.GetName()}, current)).To(Succeed())
						current.Spec.Taints = append(current.Spec.Taints, corev1.Taint{
							Key: "concurrent-controller/new-taint", Effect: corev1.TaintEffectNoSchedule,
						})
						Expect(c.Update(ctx, current)).To(Succeed())
					}
					return c.Patch(ctx, obj, patch, opts...)
				},
			}).
			Build()

		controller := &RuleReadinessController{
			Client:        fc,
			Scheme:        testScheme,
			clientset:     fake.NewSimpleClientset(),
			ruleCache:     make(map[string]*nodereadinessiov1alpha1.NodeReadinessRule),
			EventRecorder: record.NewFakeRecorder(10),
		}

		Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, node)).To(Succeed())

		err := controller.removeTaintBySpec(ctx, node, corev1.Taint{
			Key:    "readiness.k8s.io/test",
			Effect: corev1.TaintEffectNoSchedule,
		}, "test-rule")

		// Should succeed after retry
		Expect(err).NotTo(HaveOccurred())

		// Verify the taint was removed and concurrent modification was preserved
		updated := &corev1.Node{}
		Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, updated)).To(Succeed())
		Expect(updated.Spec.Taints).To(HaveLen(2))

		// Check that our taint was removed but the others remain
		taintKeys := make(map[string]bool)
		for _, taint := range updated.Spec.Taints {
			taintKeys[taint.Key] = true
		}
		Expect(taintKeys).NotTo(HaveKey("readiness.k8s.io/test"))
		Expect(taintKeys).To(HaveKey("other-controller/taint"))
		Expect(taintKeys).To(HaveKey("concurrent-controller/new-taint"))

		// Verify that the patch was attempted twice (first failed, second succeeded)
		Expect(patchCount.Load()).To(BeNumerically(">=", 2))
	})

	It("should retry and succeed when addTaintBySpec encounters a conflict", func() {
		node := &corev1.Node{
			ObjectMeta: metav1.ObjectMeta{Name: "ol-add-conflict"},
			Spec: corev1.NodeSpec{
				Taints: []corev1.Taint{
					{Key: "other-controller/taint", Effect: corev1.TaintEffectNoSchedule},
				},
			},
		}

		var patchCount atomic.Int32

		// The interceptor simulates a concurrent modification on the first
		// patch attempt, which should trigger a retry that succeeds.
		fc := fakeclient.NewClientBuilder().
			WithScheme(testScheme).
			WithObjects(node).
			WithInterceptorFuncs(interceptor.Funcs{
				Patch: func(ctx context.Context, c client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
					if obj.GetName() == "ol-add-conflict" && patchCount.Add(1) == 1 {
						current := &corev1.Node{}
						Expect(c.Get(ctx, types.NamespacedName{Name: obj.GetName()}, current)).To(Succeed())
						current.Spec.Taints = append(current.Spec.Taints, corev1.Taint{
							Key: "concurrent-controller/new-taint", Effect: corev1.TaintEffectNoSchedule,
						})
						Expect(c.Update(ctx, current)).To(Succeed())
					}
					return c.Patch(ctx, obj, patch, opts...)
				},
			}).
			Build()

		controller := &RuleReadinessController{
			Client:        fc,
			Scheme:        testScheme,
			clientset:     fake.NewSimpleClientset(),
			ruleCache:     make(map[string]*nodereadinessiov1alpha1.NodeReadinessRule),
			EventRecorder: record.NewFakeRecorder(10),
		}

		Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, node)).To(Succeed())

		err := controller.addTaintBySpec(ctx, node, corev1.Taint{
			Key:    "readiness.k8s.io/test",
			Effect: corev1.TaintEffectNoSchedule,
		}, "test-rule")

		// Should succeed after retry
		Expect(err).NotTo(HaveOccurred())

		// Verify both taints are present (ours and the concurrent one)
		updated := &corev1.Node{}
		Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, updated)).To(Succeed())
		Expect(updated.Spec.Taints).To(HaveLen(3))

		// Check that all expected taints are present
		taintKeys := make(map[string]bool)
		for _, taint := range updated.Spec.Taints {
			taintKeys[taint.Key] = true
		}
		Expect(taintKeys).To(HaveKey("readiness.k8s.io/test"))
		Expect(taintKeys).To(HaveKey("other-controller/taint"))
		Expect(taintKeys).To(HaveKey("concurrent-controller/new-taint"))

		// Verify that the patch was attempted twice (first failed, second succeeded)
		Expect(patchCount.Load()).To(BeNumerically(">=", 2))
	})

	It("should succeed when no concurrent modification occurs", func() {
		node := &corev1.Node{
			ObjectMeta: metav1.ObjectMeta{Name: "ol-no-conflict"},
			Spec: corev1.NodeSpec{
				Taints: []corev1.Taint{
					{Key: "readiness.k8s.io/test", Effect: corev1.TaintEffectNoSchedule},
					{Key: "other/taint", Effect: corev1.TaintEffectNoSchedule},
				},
			},
		}

		fc := fakeclient.NewClientBuilder().
			WithScheme(testScheme).
			WithObjects(node).
			Build()

		controller := &RuleReadinessController{
			Client:        fc,
			Scheme:        testScheme,
			clientset:     fake.NewSimpleClientset(),
			ruleCache:     make(map[string]*nodereadinessiov1alpha1.NodeReadinessRule),
			EventRecorder: record.NewFakeRecorder(10),
		}

		Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, node)).To(Succeed())

		err := controller.removeTaintBySpec(ctx, node, corev1.Taint{
			Key:    "readiness.k8s.io/test",
			Effect: corev1.TaintEffectNoSchedule,
		}, "test-rule")
		Expect(err).NotTo(HaveOccurred())

		updated := &corev1.Node{}
		Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, updated)).To(Succeed())
		Expect(updated.Spec.Taints).To(HaveLen(1))
		Expect(updated.Spec.Taints[0].Key).To(Equal("other/taint"))
	})

	It("should skip patch when removing a taint that does not exist", func() {
		node := &corev1.Node{
			ObjectMeta: metav1.ObjectMeta{Name: "ol-noop"},
			Spec: corev1.NodeSpec{
				Taints: []corev1.Taint{
					{Key: "other/taint", Effect: corev1.TaintEffectNoSchedule},
				},
			},
		}

		var patchCalled atomic.Bool

		fc := fakeclient.NewClientBuilder().
			WithScheme(testScheme).
			WithObjects(node).
			WithInterceptorFuncs(interceptor.Funcs{
				Patch: func(ctx context.Context, c client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
					if obj.GetName() == "ol-noop" {
						patchCalled.Store(true)
					}
					return c.Patch(ctx, obj, patch, opts...)
				},
			}).
			Build()

		controller := &RuleReadinessController{
			Client:        fc,
			Scheme:        testScheme,
			clientset:     fake.NewSimpleClientset(),
			ruleCache:     make(map[string]*nodereadinessiov1alpha1.NodeReadinessRule),
			EventRecorder: record.NewFakeRecorder(10),
		}

		Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, node)).To(Succeed())

		err := controller.removeTaintBySpec(ctx, node, corev1.Taint{
			Key:    "readiness.k8s.io/nonexistent",
			Effect: corev1.TaintEffectNoSchedule,
		}, "test-rule")
		Expect(err).NotTo(HaveOccurred())
		Expect(patchCalled.Load()).To(BeFalse(),
			"Patch should not be called when taint removal is a no-op")
	})
})
692940
})

0 commit comments

Comments (0)