A cache-service subscriber can remain registered indefinitely if its stream context is canceled while its outbound message queue is empty. handleMessagesQueue checks ctx.Done() before calling Dequeue(), but once it enters Dequeue() there is no context-aware wakeup path. In that case the goroutine can remain blocked forever, Subscribe never returns, and the observer is never unsubscribed.
Impact
- Dead subscribers can remain in the informer observer list.
- Future informer events continue enqueueing into an unconsumed queue.
- Memory usage can grow over time for abandoned connections.
Related code
- Subscribe
- handleMessagesQueue
- Queue.Dequeue
Expected behavior
When the client stream context is canceled, the subscriber loop should stop promptly even if there are no pending events, unwind back through Subscribe, and always unregister the observer.
Minimal reproduction
Add and run a test like this:
// fakeEventStream is a minimal stream stub whose Context() returns a
// cancelable context supplied by the test.
type fakeEventStream struct {
	ctx context.Context
}

func (f *fakeEventStream) Send(*informer.Event) error   { return context.Canceled }
func (f *fakeEventStream) SetHeader(metadata.MD) error  { return nil }
func (f *fakeEventStream) SendHeader(metadata.MD) error { return nil }
func (f *fakeEventStream) SetTrailer(metadata.MD)       {}
func (f *fakeEventStream) Context() context.Context     { return f.ctx }
func (f *fakeEventStream) SendMsg(any) error            { return nil }
func (f *fakeEventStream) RecvMsg(any) error            { return nil }

func TestCanceledConnectionStaysBlockedUntilQueueGetsAnEvent(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	o := &connection{
		log:      slog.Default(),
		server:   &fakeEventStream{ctx: ctx},
		messages: sync.NewQueue[*informer.Event](),
		metrics:  instrument.FromContext(context.Background()),
	}

	done := make(chan struct{})
	go func() {
		o.handleMessagesQueue(ctx)
		close(done)
	}()

	// Give the loop time to park in Dequeue, then cancel with an empty queue.
	time.Sleep(50 * time.Millisecond)
	cancel()

	// With the current code the goroutine is expected to still be blocked.
	select {
	case <-done:
		t.Fatal("handleMessagesQueue exited immediately, expected it to stay blocked in Dequeue")
	case <-time.After(100 * time.Millisecond):
	}

	// Only a newly enqueued event wakes Dequeue and lets the loop exit.
	o.messages.Enqueue(&informer.Event{Type: informer.EventType_SYNC_FINISHED})
	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("handleMessagesQueue stayed blocked even after an event was enqueued")
	}
}
Then run:
go test ./pkg/kube/kubecache/service -run TestCanceledConnectionStaysBlockedUntilQueueGetsAnEvent
The test should show that canceling the context does not unblock handleMessagesQueue while the queue is empty. The goroutine only exits after a new event is enqueued.
Verification
I reproduced this locally with equivalent test logic.
Suggested fix direction
- Replace the current blocking dequeue with a context-aware queue or channel-based delivery path.
- Alternatively, add a close signal that wakes Dequeue() when the stream context is canceled.
- Ensure the subscriber loop always returns on stream cancellation so Unsubscribe is guaranteed to run.