diff --git a/kafka/net/selector.py b/kafka/net/selector.py index 0b6af0196..a4440da4d 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -396,8 +396,10 @@ def _retire_task(self, task): task.close() def unschedule(self, task): + was_scheduled = task.scheduled_at is not None self._remove_scheduled(task) - self._retire_task(task) + if was_scheduled: + self._retire_task(task) def reschedule(self, when, task): self._remove_scheduled(task) diff --git a/test/net/test_selector.py b/test/net/test_selector.py index 34042cdd6..dada57ce4 100644 --- a/test/net/test_selector.py +++ b/test/net/test_selector.py @@ -507,6 +507,23 @@ async def resolver(): net.poll(timeout_ms=1000, future=done) assert results == [('a', 'b')] + def test_unschedule_does_not_close_task_in_ready(self): + """Regression: a timer that has already fired -- popped from the heap + into _ready must survive unschedule(). """ + net = NetworkSelector() + fired = [] + timer = net.call_at(time.monotonic() - 1, lambda: fired.append(True)) + net._schedule_tasks() # move the due timer from the heap into _ready + assert timer in net._ready + assert timer.scheduled_at is None + + net.unschedule(timer) + + assert not timer.is_done, \ + 'unschedule() closed a task already queued in _ready' + net.drain() # must drive the queued timer without raising + assert fired == [True] + class TestSlowTaskMonitor: """Detection for tasks that hog the event loop (livelock guard).