
k8s-cache: canceled subscribers can remain registered forever when the outbound queue is empty #1829

@MrAlias

Description


A cache-service subscriber can remain registered indefinitely if its stream context is canceled while its outbound message queue is empty. handleMessagesQueue checks ctx.Done() before calling Dequeue(), but once it is inside Dequeue() there is no context-aware wakeup path. If no further event is enqueued, the goroutine stays blocked forever, Subscribe never returns, and the observer is never unsubscribed.
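To make the failure mode concrete, here is a minimal, self-contained sketch of the same pattern. blockingQueue, loop, and demo are hypothetical stand-ins, not the project's sync.Queue or handleMessagesQueue. The sketch only assumes that Dequeue parks on something the context cannot reach (a sync.Cond here) and that the loop checks ctx.Done() only between items.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// blockingQueue is a hypothetical queue whose Dequeue waits on a sync.Cond
// and has no way to observe a context.
type blockingQueue[T any] struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []T
}

func newBlockingQueue[T any]() *blockingQueue[T] {
	q := &blockingQueue[T]{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *blockingQueue[T]) Enqueue(v T) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal()
}

// Dequeue blocks until an item is available; only Enqueue can wake it.
func (q *blockingQueue[T]) Dequeue() T {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

// loop mirrors the problematic shape: ctx is checked only between items.
func loop(ctx context.Context, q *blockingQueue[string], done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		_ = q.Dequeue() // a cancel that happens while parked here goes unnoticed
	}
}

// demo reports whether loop stayed parked after cancel and whether it
// exited once an item was enqueued.
func demo() (blockedAfterCancel, exitedAfterEnqueue bool) {
	ctx, cancel := context.WithCancel(context.Background())
	q := newBlockingQueue[string]()
	done := make(chan struct{})
	go loop(ctx, q, done)

	time.Sleep(20 * time.Millisecond) // give loop time to park in Dequeue
	cancel()
	select {
	case <-done:
		return false, false
	case <-time.After(100 * time.Millisecond):
		blockedAfterCancel = true
	}

	q.Enqueue("event") // the only thing that wakes Dequeue
	select {
	case <-done:
		exitedAfterEnqueue = true
	case <-time.After(time.Second):
	}
	return blockedAfterCancel, exitedAfterEnqueue
}

func main() {
	blocked, exited := demo()
	fmt.Println("blocked after cancel:", blocked, "exited after enqueue:", exited)
}
```

The result depends on the 20 ms sleep being long enough for the goroutine to reach Dequeue, which is the same timing assumption the reproduction test below makes.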

Impact

  • Dead subscribers can remain in the informer observer list.
  • Future informer events continue enqueueing into an unconsumed queue.
  • Memory usage can grow over time for abandoned connections.


Expected behavior

When the client stream context is canceled, the subscriber loop should stop promptly even if there are no pending events, unwind back through Subscribe, and always unregister the observer.

Minimal reproduction

Add and run a test like this:

```go
// Place in a _test.go file in pkg/kube/kubecache/service. Besides the
// package's existing informer, sync (NewQueue), and instrument imports, it
// uses context, log/slog, testing, time, and google.golang.org/grpc/metadata.

// fakeEventStream is a minimal server-side Subscribe stream. Send reports
// context.Canceled, as a stream whose client has gone away would.
type fakeEventStream struct {
	ctx context.Context
}

func (f *fakeEventStream) Send(*informer.Event) error { return context.Canceled }
func (f *fakeEventStream) SetHeader(metadata.MD) error { return nil }
func (f *fakeEventStream) SendHeader(metadata.MD) error { return nil }
func (f *fakeEventStream) SetTrailer(metadata.MD) {}
func (f *fakeEventStream) Context() context.Context { return f.ctx }
func (f *fakeEventStream) SendMsg(any) error { return nil }
func (f *fakeEventStream) RecvMsg(any) error { return nil }

func TestCanceledConnectionStaysBlockedUntilQueueGetsAnEvent(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())

	o := &connection{
		log:      slog.Default(),
		server:   &fakeEventStream{ctx: ctx},
		messages: sync.NewQueue[*informer.Event](),
		metrics:  instrument.FromContext(context.Background()),
	}

	done := make(chan struct{})
	go func() {
		o.handleMessagesQueue(ctx)
		close(done)
	}()

	// Let the goroutine pass its ctx.Done() check and park in Dequeue on the
	// empty queue, then cancel the stream context.
	time.Sleep(50 * time.Millisecond)
	cancel()

	// Cancellation alone does not wake the loop.
	select {
	case <-done:
		t.Fatal("handleMessagesQueue exited after cancel, expected it to stay blocked in Dequeue")
	case <-time.After(100 * time.Millisecond):
	}

	// Only a newly enqueued event wakes Dequeue and lets the loop return.
	o.messages.Enqueue(&informer.Event{Type: informer.EventType_SYNC_FINISHED})

	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("handleMessagesQueue stayed blocked even after an event was enqueued")
	}
}
```

Then run:

```shell
go test ./pkg/kube/kubecache/service -run TestCanceledConnectionStaysBlockedUntilQueueGetsAnEvent
```

The test should show that canceling the context does not unblock handleMessagesQueue while the queue is empty. The goroutine only exits after a new event is enqueued.

Verification

I reproduced this locally with equivalent test logic.

Suggested fix direction

  • Replace the current blocking dequeue with a context-aware queue or channel-based delivery path.
  • Alternatively, add a close signal that wakes Dequeue() when the stream context is canceled.
  • Ensure the subscriber loop always returns on stream cancellation so Unsubscribe is guaranteed to run.

Metadata

Labels: bug