From ab0fba29498fa74f336a67cf77ebe23b89cda022 Mon Sep 17 00:00:00 2001 From: Adrian Zdobylak Date: Thu, 18 Jun 2026 17:17:56 +0200 Subject: [PATCH 1/2] add test case: unschedule should not close ready task --- test/net/test_selector.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/test/net/test_selector.py b/test/net/test_selector.py index 34042cdd6..dada57ce4 100644 --- a/test/net/test_selector.py +++ b/test/net/test_selector.py @@ -507,6 +507,23 @@ async def resolver(): net.poll(timeout_ms=1000, future=done) assert results == [('a', 'b')] + def test_unschedule_does_not_close_task_in_ready(self): + """Regression: a timer that has already fired -- popped from the heap + into _ready must survive unschedule(). """ + net = NetworkSelector() + fired = [] + timer = net.call_at(time.monotonic() - 1, lambda: fired.append(True)) + net._schedule_tasks() # move the due timer from the heap into _ready + assert timer in net._ready + assert timer.scheduled_at is None + + net.unschedule(timer) + + assert not timer.is_done, \ + 'unschedule() closed a task already queued in _ready' + net.drain() # must drive the queued timer without raising + assert fired == [True] + class TestSlowTaskMonitor: """Detection for tasks that hog the event loop (livelock guard). From 75851e82971643cc0a64c42d0a6bdb474263695f Mon Sep 17 00:00:00 2001 From: Adrian Zdobylak Date: Thu, 18 Jun 2026 17:20:55 +0200 Subject: [PATCH 2/2] retire only if task was scheduled --- kafka/net/selector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka/net/selector.py b/kafka/net/selector.py index 0b6af0196..a4440da4d 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -396,8 +396,10 @@ def _retire_task(self, task): task.close() def unschedule(self, task): + was_scheduled = task.scheduled_at is not None self._remove_scheduled(task) - self._retire_task(task) + if was_scheduled: + self._retire_task(task) def reschedule(self, when, task): self._remove_scheduled(task)