Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion kafka/net/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,10 @@ def _retire_task(self, task):
task.close()

def unschedule(self, task):
was_scheduled = task.scheduled_at is not None
self._remove_scheduled(task)
self._retire_task(task)
if was_scheduled:
self._retire_task(task)

def reschedule(self, when, task):
self._remove_scheduled(task)
Expand Down
17 changes: 17 additions & 0 deletions test/net/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,23 @@ async def resolver():
net.poll(timeout_ms=1000, future=done)
assert results == [('a', 'b')]

def test_unschedule_does_not_close_task_in_ready(self):
"""Regression: a timer that has already fired -- popped from the heap
into _ready must survive unschedule(). """
net = NetworkSelector()
fired = []
timer = net.call_at(time.monotonic() - 1, lambda: fired.append(True))
net._schedule_tasks() # move the due timer from the heap into _ready
assert timer in net._ready
assert timer.scheduled_at is None

net.unschedule(timer)

assert not timer.is_done, \
'unschedule() closed a task already queued in _ready'
net.drain() # must drive the queued timer without raising
assert fired == [True]


class TestSlowTaskMonitor:
"""Detection for tasks that hog the event loop (livelock guard).
Expand Down
Loading