transform: process-event decorator can block store notifications after shutdown #1830

@MrAlias

Description

The Kubernetes process-event decorator can remain subscribed to the store after its main loop exits. Its On method sends pod events into an unbuffered channel, so after shutdown there is no receiver left. The next store notification can then block inside the dead observer and stall synchronous notifier fanout.

Impact

  • A stopped decorator can still receive store notifications.
  • Store.Notify can block behind a dead observer.
  • Other metadata consumers can stall because notifier delivery is synchronous.

Expected behavior

When the decorator shuts down, it should stop receiving store events. Store fanout should not be able to block on a stale decorator instance whose receive loop has already exited.

Minimal reproduction

Add and run a test like this:

func TestProcEventDecoratorLeavesBlockingObserverAfterShutdown(t *testing.T) {
	inf := &fakeInformer{}
	store := kube.NewStore(inf, kube.ResourceLabels{}, nil, imetrics.NoopReporter{})
	input := make(chan exec.ProcessEvent)
	output := msg.NewQueue[exec.ProcessEvent](msg.ChannelBufferLen(1))
	defer output.Close()

	dec := procEventMetadataDecorator{
		store:       store,
		log:         slog.With("test", "TestProcEventDecoratorLeavesBlockingObserverAfterShutdown"),
		input:       input,
		output:      output,
		podsInfoCh:  make(chan Event[*informer.ObjectMeta]),
		tracker:     newPidContainerTracker(),
		clusterName: "cluster",
	}

	store.Subscribe(&dec)

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		dec.k8sLoop(ctx)
		close(done)
	}()

	cancel()

	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("k8sLoop did not stop after context cancellation")
	}

	notifyDone := make(chan struct{})
	go func() {
		store.Notify(&informer.Event{
			Type: informer.EventType_CREATED,
			Resource: &informer.ObjectMeta{
				Name: "pod",
				Kind: "Pod",
				Pod:  &informer.PodInfo{Containers: []*informer.ContainerInfo{{Id: "container-1"}}},
			},
		})
		close(notifyDone)
	}()

	select {
	case <-notifyDone:
		t.Fatal("expected Notify to block because the decorator observer stayed subscribed after shutdown")
	case <-time.After(100 * time.Millisecond):
	}
}

Then run:

go test ./pkg/transform -run TestProcEventDecoratorLeavesBlockingObserverAfterShutdown

The test should show that after k8sLoop exits, a later store.Notify(...) call can still block because the stale observer remains subscribed and tries to send into an unbuffered channel with no receiver.

Verification

I reproduced this locally with equivalent test logic.

Suggested fix direction

  • Subscribe synchronously and defer an Unsubscribe.
  • Ensure shutdown removes the decorator from the store before the receive loop exits.
  • Avoid using an unbuffered cross-goroutine handoff for notifier delivery unless shutdown unblocks it safely.

Metadata

Labels: bug