Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions pkg/noderesourcetopology/cache/overreserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func (ov *OverReserve) ReserveNodeResources(nodeName string, pod *corev1.Pod) {
lh := ov.lh.WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
ov.lock.Lock()
defer ov.lock.Unlock()
if !ov.nrts.Contains(nodeName) {
lh.V(5).Info("ignoring reserve", "nrtinfo", "missing")
return
}
nodeAssumedResources, ok := ov.assumedResources[nodeName]
if !ok {
nodeAssumedResources = newResourceStore(ov.lh)
Expand All @@ -157,9 +161,6 @@ func (ov *OverReserve) ReserveNodeResources(nodeName string, pod *corev1.Pod) {

nodeAssumedResources.AddPod(pod)
lh.V(2).Info("post reserve", logging.KeyNode, nodeName, "assumedResources", nodeAssumedResources.String())

ov.nodesMaybeOverreserved.Delete(nodeName)
lh.V(6).Info("reset discard counter", logging.KeyNode, nodeName)
}

func (ov *OverReserve) UnreserveNodeResources(nodeName string, pod *corev1.Pod) {
Expand Down Expand Up @@ -365,8 +366,18 @@ func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.Node
}

// to be used only in tests
func (ov *OverReserve) Store() *nrtStore {
return ov.nrts
// Inject stores the given NodeResourceTopology object directly into the
// cache's NRT store (overwriting any existing entry via nrts.Update),
// holding the cache lock for the duration. To be used only in tests.
func (ov *OverReserve) Inject(nrt *topologyv1alpha2.NodeResourceTopology) {
	ov.lock.Lock()
	defer ov.lock.Unlock()
	ov.nrts.Update(nrt)
}

// HasAssumedResources reports whether the cache currently tracks any
// assumed (reserved) pod resources for the given node name.
// To be used only in tests.
func (ov *OverReserve) HasAssumedResources(nodeName string) bool {
	ov.lock.Lock()
	defer ov.lock.Unlock()
	_, ok := ov.assumedResources[nodeName]
	return ok
}

func makeNodeToPodDataMap(lh logr.Logger, podLister podlisterv1.PodLister, isPodRelevant podprovider.PodFilterFunc) (map[string][]podData, error) {
Expand Down
142 changes: 128 additions & 14 deletions pkg/noderesourcetopology/cache/overreserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestDirtyNodesMarkDiscarded(t *testing.T) {
}
}

func TestDirtyNodesUnmarkedOnReserve(t *testing.T) {
func TestDirtyNodesNotUnmarkedOnReserve(t *testing.T) {
fakeClient, err := tu.NewFakeClient()
if err != nil {
t.Fatal(err)
Expand All @@ -183,6 +183,11 @@ func TestDirtyNodesUnmarkedOnReserve(t *testing.T) {
"node-4",
}

// NRTs must be in the store for Reserve to track assumed resources
for _, nodeName := range availNodes {
nrtCache.Inject(makeTestNRT(nodeName))
}

for _, nodeName := range availNodes {
nrtCache.ReserveNodeResources(nodeName, &corev1.Pod{})
}
Expand All @@ -196,17 +201,31 @@ func TestDirtyNodesUnmarkedOnReserve(t *testing.T) {
nrtCache.NodeMaybeOverReserved(nodeName, &corev1.Pod{})
}

// assume noe update which unblocks node-4
// Reserve does NOT clear the dirty flag; only FlushNodes does.
nrtCache.ReserveNodeResources("node-4", &corev1.Pod{})

expectedNodes := []string{
"node-1",
dirtyNodes = nrtCache.GetDesyncedNodes(klog.Background())

if dirtyNodes.DirtyCount() != 2 {
t.Errorf("both nodes should still be dirty after Reserve, got: %v", dirtyNodes.MaybeOverReserved)
}
}

dirtyNodes = nrtCache.GetDesyncedNodes(klog.Background())
func TestReserveSkipsWithoutNRT(t *testing.T) {
fakeClient, err := tu.NewFakeClient()
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(dirtyNodes.MaybeOverReserved, expectedNodes) {
t.Errorf("got=%v expected=%v", dirtyNodes.MaybeOverReserved, expectedNodes)
nrtCache := mustOverReserve(t, fakeClient, &fakePodLister{})

// Reserve on a node with no NRT in the store should be a no-op
nrtCache.ReserveNodeResources("ghost-node", &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"},
})

if nrtCache.HasAssumedResources("ghost-node") {
t.Errorf("Reserve should not accumulate assumedResources for a node without NRT in the store")
}
}

Expand Down Expand Up @@ -248,7 +267,7 @@ func TestGetCachedNRTCopyReserve(t *testing.T) {

nodeTopologies := makeDefaultTestTopology()
for _, obj := range nodeTopologies {
nrtCache.Store().Update(obj)
nrtCache.Inject(obj)
}

testPod := &corev1.Pod{
Expand Down Expand Up @@ -300,7 +319,7 @@ func TestGetCachedNRTCopyReleaseNone(t *testing.T) {

nodeTopologies := makeDefaultTestTopology()
for _, obj := range nodeTopologies {
nrtCache.Store().Update(obj)
nrtCache.Inject(obj)
}

testPod := &corev1.Pod{
Expand Down Expand Up @@ -341,7 +360,7 @@ func TestGetCachedNRTCopyReserveRelease(t *testing.T) {

nodeTopologies := makeDefaultTestTopology()
for _, obj := range nodeTopologies {
nrtCache.Store().Update(obj)
nrtCache.Inject(obj)
}

testPod := &corev1.Pod{
Expand Down Expand Up @@ -383,7 +402,7 @@ func TestFlush(t *testing.T) {

nodeTopologies := makeDefaultTestTopology()
for _, obj := range nodeTopologies {
nrtCache.Store().Update(obj)
nrtCache.Inject(obj)
}

testPod := &corev1.Pod{
Expand Down Expand Up @@ -478,7 +497,7 @@ func TestResyncNoPodFingerprint(t *testing.T) {

nodeTopologies := makeDefaultTestTopology()
for _, obj := range nodeTopologies {
nrtCache.Store().Update(obj)
nrtCache.Inject(obj)
}

testPod := &corev1.Pod{
Expand Down Expand Up @@ -556,7 +575,7 @@ func TestResyncMatchFingerprint(t *testing.T) {

nodeTopologies := makeDefaultTestTopology()
for _, obj := range nodeTopologies {
nrtCache.Store().Update(obj)
nrtCache.Inject(obj)
}

testPod := &corev1.Pod{
Expand Down Expand Up @@ -649,6 +668,101 @@ func isNRTEqual(a, b *topologyv1alpha2.NodeResourceTopology) bool {
equality.Semantic.DeepDerivative(a.Attributes, b.Attributes)
}

// TestResyncFingerprintMismatchKeepsNodeDirty checks that Resync() does not
// pick up NRT data whose pod fingerprint disagrees with the pods reported by
// the lister, and that the affected node therefore stays marked dirty
// (maybe-over-reserved) in GetDesyncedNodes.
func TestResyncFingerprintMismatchKeepsNodeDirty(t *testing.T) {
	fakeClient, err := tu.NewFakeClient()
	if err != nil {
		t.Fatal(err)
	}

	podLister := &fakePodLister{}
	nrtCache := mustOverReserve(t, fakeClient, podLister)

	// seed the cache store with the default topology objects
	for _, obj := range makeDefaultTestTopology() {
		nrtCache.Inject(obj)
	}

	testPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod1",
			Namespace: "namespace1",
		},
		Spec: corev1.PodSpec{
			NodeName: "node1",
			Containers: []corev1.Container{
				{
					Resources: corev1.ResourceRequirements{
						Limits: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("8"),
							corev1.ResourceMemory: resource.MustParse("16Gi"),
						},
						Requests: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("8"),
							corev1.ResourceMemory: resource.MustParse("16Gi"),
						},
					},
				},
			},
		},
	}

	nrtCache.ReserveNodeResources("node1", testPod)
	nrtCache.NodeMaybeOverReserved("node1", testPod)

	// The NRT object published on the API server carries a fingerprint that
	// cannot match the pods in the lister, forcing ErrSignatureMismatch
	// inside Resync().
	badPFP := "pfp0vFFFFdeadbeef000000"
	staleNRT := &topologyv1alpha2.NodeResourceTopology{
		ObjectMeta: metav1.ObjectMeta{
			Name: "node1",
			Annotations: map[string]string{
				podfingerprint.Annotation: badPFP,
			},
		},
		Attributes: topologyv1alpha2.AttributeList{
			{
				Name:  podfingerprint.Attribute,
				Value: badPFP,
			},
		},
		TopologyPolicies: []string{string(topologyv1alpha2.SingleNUMANodeContainerLevel)},
		Zones: topologyv1alpha2.ZoneList{
			{
				Name: "node-0",
				Type: "Node",
				Resources: topologyv1alpha2.ResourceInfoList{
					MakeTopologyResInfo(cpu, "32", "30"),
					MakeTopologyResInfo(memory, "64Gi", "60Gi"),
					MakeTopologyResInfo(nicResourceName, "16", "16"),
				},
			},
			{
				Name: "node-1",
				Type: "Node",
				Resources: topologyv1alpha2.ResourceInfoList{
					MakeTopologyResInfo(cpu, "32", "22"),
					MakeTopologyResInfo(memory, "64Gi", "44Gi"),
					MakeTopologyResInfo(nicResourceName, "16", "16"),
				},
			},
		},
	}

	if err := fakeClient.Create(context.Background(), staleNRT); err != nil {
		t.Fatal(err)
	}

	// the lister reports the reserved pod as running on node1
	runningPod := testPod.DeepCopy()
	runningPod.Status.Phase = corev1.PodRunning
	podLister.AddPod(runningPod)

	nrtCache.Resync()

	dirtyNodes := nrtCache.GetDesyncedNodes(klog.Background())
	if dirtyNodes.Len() != 1 || dirtyNodes.MaybeOverReserved[0] != "node1" {
		t.Errorf("node should stay dirty after fingerprint mismatch, got: %v", dirtyNodes.MaybeOverReserved)
	}
}

func TestUnknownNodeWithForeignPods(t *testing.T) {
fakeClient, err := tu.NewFakeClient()
if err != nil {
Expand Down Expand Up @@ -728,7 +842,7 @@ func TestNodeWithForeignPods(t *testing.T) {
},
}
for _, obj := range nodeTopologies {
nrtCache.Store().Update(obj)
nrtCache.Inject(obj)
}

target := "node2"
Expand Down
2 changes: 1 addition & 1 deletion pkg/noderesourcetopology/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// nrtStore maps the NRT data by node name. It is not thread safe and needs to be protected by a lock.
// data is intentionally copied each time it enters and exists the store. E.g, no pointer sharing.
// data is intentionally copied each time it enters and exits the store. E.g, no pointer sharing.
type nrtStore struct {
data map[string]*topologyv1alpha2.NodeResourceTopology
lh logr.Logger
Expand Down