Skip to content

Commit 4fb032f

Browse files
author
Dorian Birraux
committed
Kernel controller is more robust to terminated process
When the kernel is crashing or when it quits (e.g. due to Quit[]) the kernel controller on the python side was not checking the state. While waiting for a response (recv), regularly check for the process state. More robust future manipulation, checking for aborted future before setting its value.
1 parent 26f7fa3 commit 4fb032f

2 files changed

Lines changed: 36 additions & 7 deletions

File tree

wolframclient/evaluation/kernel/kernelcontroller.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,13 @@ def terminate(self):
483483
if not self.started:
484484
future.set_result(True)
485485
return future
486-
self.enqueue_task(self.STOP, future, None)
486+
# only enqueue task if the Event is not triggered.
487+
# when trigger_termination_requested is set, the run function is
488+
# already dealing with the exception and is about to terminate.
489+
if self.trigger_termination_requested.is_set():
490+
future.set_result(True)
491+
else:
492+
self.enqueue_task(self.STOP, future, None)
487493
self._state_terminated = True
488494
self.trigger_termination_requested.set()
489495
return future
@@ -496,13 +502,26 @@ def join(self, timeout=None):
496502
def evaluate_future(self, wxf, future, result_update_callback=None, **kwargs):
497503
self.enqueue_task(wxf, future, result_update_callback)
498504

505+
def _recv_check_process(self, copy=False):
506+
"""
507+
Call recv on the kernel input socket. Regularly check that the kernel process
508+
is running.
509+
:param copy: whether or not to copy the socket data. Default is False.
510+
:return:
511+
"""
512+
try:
513+
return self.kernel_socket_in.recv_abortable(copy=copy, abort_event=_KernelProcessDied(self.kernel_proc))
514+
except SocketAborted:
515+
logger.info("Kernel process is not running anymore.")
516+
raise WolframKernelException("Kernel is not running anymore.")
517+
499518
def _do_evaluate(self, wxf, future, result_update_callback):
500519
start = time.perf_counter()
501520
self.kernel_socket_out.send(zmq.Frame(wxf))
502521
if logger.isEnabledFor(logging.DEBUG):
503522
logger.debug("Expression sent to kernel in %.06fsec", time.perf_counter() - start)
504523
start = time.perf_counter()
505-
wxf_eval_data = self.kernel_socket_in.recv_abortable(copy=False)
524+
wxf_eval_data = self._recv_check_process()
506525
if logger.isEnabledFor(logging.DEBUG):
507526
logger.debug(
508527
"Expression received from kernel after %.06fsec", time.perf_counter() - start
@@ -577,11 +596,11 @@ def run(self):
577596
else:
578597
self._kernel_stop()
579598
except Exception as e:
580-
if future:
599+
if future and not future.cancelled():
581600
future.set_exception(e)
582601
future = None
583602
finally:
584-
if future:
603+
if future and not future.cancelled():
585604
future.set_result(True)
586605

587606
def _cancel_tasks(self):
@@ -611,3 +630,10 @@ def __init__(self, subprocess, abort_event):
611630

612631
def is_set(self):
613632
return self.subprocess.poll() is not None or self.abort_event.is_set()
633+
634+
class _KernelProcessDied(object):
635+
def __init__(self, subprocess):
636+
self.subprocess = subprocess
637+
638+
def is_set(self):
639+
return self.subprocess.poll() is not None

wolframclient/evaluation/kernel/localsession.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,16 @@ def start_future(self):
170170
return future
171171

172172
def stop(self):
173-
""" Request the Wolfram kernel to stop gracefully. """
173+
""" Request the Wolfram kernel to stop gracefully.
174+
When recovering from an error state, it's recommended to use terminate() which
175+
is more resilient.
176+
"""
174177
self._stop(gracefully=True)
175178

176179
def terminate(self):
177180
""" Request the Wolfram kernel to stop immediately.
178-
179-
Ongoing evaluations may be cancelled. """
181+
Ongoing evaluations are likely to be cancelled.
182+
"""
180183
self._stop(gracefully=False)
181184

182185
def _stop(self, gracefully=True):

0 commit comments

Comments
 (0)