Skip to content

Commit 27cffef

Browse files
author
Dorian Birraux
committed
Merge pull request #250 in LCL/wolframclientforpython from bugfix/github-30-Quit-handling to master
* commit '0c30a10433232a4b532302475d3fc58148d3ad37': code formating Add a test for a time constrained computation followed with a terminate call update changelog update version to 1.1.7 Future check for done instead of cancelled recv also check for termination event Remove pip as a dependency Add tests running Quit Kernel controller is more robust to terminated process
2 parents 26f7fa3 + 0c30a10 commit 27cffef

8 files changed

Lines changed: 113 additions & 18 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
# Version 1.1.7
2+
- kernel controller class detects terminated kernel process while waiting for an evaluation result instead of hanging.
3+
14
# Version 1.1.5 and 1.1.6
2-
- update Wolfram Language code with latest addition from 12.3.
3-
- fix various deprecated naming convention in class names.
4-
5+
- update Wolfram Language code with latest addition from 12.3.
6+
- fix various deprecated naming convention in class names.
7+
58
# Version 1.1.4
69
- Async cloud evaluator based on `aiohttp` now use `certifi` to create a default `SSLContext` if none is provided. Other cloud evaluator are based on the `requests` module which also uses this library.
710
- Updating dependency list accordingly in `setup.py`. `certifi` was already listed as a `requests` dependency, so this should have no direct impact on user site package.

setup.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ def load_tests():
6060
test_suite='setup.load_tests',
6161
python_requires='>=3.5.3',
6262
install_requires = [
63-
'pip',
6463
'numpy',
6564
'pytz',
6665
'requests',

wolframclient/about.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
__name__ = "wolframclient"
44
__description__ = "A Python library with various tools to interact with the Wolfram Language and the Wolfram Cloud."
5-
__version__ = "1.1.6"
5+
__version__ = "1.1.7"
66
__author__ = "Wolfram Research"
77
__author_email__ = "support@wolfram.com, dorianb@wolfram.com, riccardod@wolfram.com"

wolframclient/evaluation/kernel/kernelcontroller.py

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ def _kernel_start(self):
434434
# on the kernel side.
435435
response = self.kernel_socket_in.recv_abortable(
436436
timeout=self.get_parameter("STARTUP_TIMEOUT"),
437-
abort_event=_StartEvent(self.kernel_proc, self.trigger_termination_requested),
437+
abort_event=self._new_running_event(),
438438
)
439439
if response == self._KERNEL_OK:
440440
if logger.isEnabledFor(logging.INFO):
@@ -483,9 +483,15 @@ def terminate(self):
483483
if not self.started:
484484
future.set_result(True)
485485
return future
486-
self.enqueue_task(self.STOP, future, None)
486+
# only enqueue task if the Event is not triggered.
487+
# when trigger_termination_requested is set, the run function is
488+
# already dealing with the exception and is about to terminate.
489+
if not self.trigger_termination_requested.is_set():
490+
self.enqueue_task(self.STOP, future, None)
491+
self.trigger_termination_requested.set()
492+
493+
future.set_result(True)
487494
self._state_terminated = True
488-
self.trigger_termination_requested.set()
489495
return future
490496

491497
def join(self, timeout=None):
@@ -496,13 +502,37 @@ def join(self, timeout=None):
496502
def evaluate_future(self, wxf, future, result_update_callback=None, **kwargs):
497503
self.enqueue_task(wxf, future, result_update_callback)
498504

505+
def _new_running_event(self):
506+
"""
507+
Create a new event that triggers when the kernel process has terminated or when termination was requested.
508+
:return:
509+
"""
510+
return _ProcessAliveNotAbortedEvent(
511+
self.kernel_proc, self.trigger_termination_requested
512+
)
513+
514+
def _recv_check_process(self, copy=False):
515+
"""
516+
Call recv on the kernel input socket. Regularly check that the kernel process
517+
is running.
518+
:param copy: whether or not to copy the socket data. Default is False.
519+
:return:
520+
"""
521+
try:
522+
return self.kernel_socket_in.recv_abortable(
523+
copy=copy, abort_event=self._new_running_event()
524+
)
525+
except SocketAborted:
526+
logger.info("Kernel process is not running anymore.")
527+
raise WolframKernelException("Kernel is not running anymore.")
528+
499529
def _do_evaluate(self, wxf, future, result_update_callback):
500530
start = time.perf_counter()
501531
self.kernel_socket_out.send(zmq.Frame(wxf))
502532
if logger.isEnabledFor(logging.DEBUG):
503533
logger.debug("Expression sent to kernel in %.06fsec", time.perf_counter() - start)
504534
start = time.perf_counter()
505-
wxf_eval_data = self.kernel_socket_in.recv_abortable(copy=False)
535+
wxf_eval_data = self._recv_check_process()
506536
if logger.isEnabledFor(logging.DEBUG):
507537
logger.debug(
508538
"Expression received from kernel after %.06fsec", time.perf_counter() - start
@@ -562,7 +592,7 @@ def run(self):
562592
raise e
563593
except Exception as e:
564594
self.trigger_termination_requested.set()
565-
if future and not future.cancelled():
595+
if future and not future.done():
566596
future.set_exception(e)
567597
future = None
568598
else:
@@ -577,11 +607,11 @@ def run(self):
577607
else:
578608
self._kernel_stop()
579609
except Exception as e:
580-
if future:
610+
if future and not future.done():
581611
future.set_exception(e)
582612
future = None
583613
finally:
584-
if future:
614+
if future and not future.done():
585615
future.set_result(True)
586616

587617
def _cancel_tasks(self):
@@ -604,10 +634,18 @@ def __repr__(self):
604634
return '<%s[%s ❌], "%s">' % (self.__class__.__name__, self.name, self.kernel)
605635

606636

607-
class _StartEvent(object):
637+
class _ProcessAliveNotAbortedEvent(object):
608638
def __init__(self, subprocess, abort_event):
609639
self.subprocess = subprocess
610640
self.abort_event = abort_event
611641

612642
def is_set(self):
613643
return self.subprocess.poll() is not None or self.abort_event.is_set()
644+
645+
646+
class _KernelProcessDied(object):
647+
def __init__(self, subprocess):
648+
self.subprocess = subprocess
649+
650+
def is_set(self):
651+
return self.subprocess.poll() is not None

wolframclient/evaluation/kernel/localsession.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,16 @@ def start_future(self):
170170
return future
171171

172172
def stop(self):
173-
""" Request the Wolfram kernel to stop gracefully. """
173+
""" Request the Wolfram kernel to stop gracefully.
174+
When recovering from an error state, it's recommended to use terminate() which
175+
is more resilient.
176+
"""
174177
self._stop(gracefully=True)
175178

176179
def terminate(self):
177180
""" Request the Wolfram kernel to stop immediately.
178-
179-
Ongoing evaluations may be cancelled. """
181+
Ongoing evaluations are likely to be cancelled.
182+
"""
180183
self._stop(gracefully=False)
181184

182185
def _stop(self, gracefully=True):

wolframclient/tests/evaluation/test_coroutine.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
WolframLanguageAsyncSession,
1010
parallel_evaluate,
1111
)
12+
from wolframclient.exception import WolframKernelException
1213
from wolframclient.language import wl, wlexpr
1314
from wolframclient.tests.configure import (
1415
kernel_path,
@@ -111,6 +112,25 @@ async def test_eval_parallel(self):
111112
res = await asyncio.gather(*tasks)
112113
self.assertEqual(res, list(range(1, 11)))
113114

115+
@run_in_loop
116+
async def test_quit_restart(self):
117+
try:
118+
async_session = WolframLanguageAsyncSession(
119+
kernel_path, kernel_loglevel=logging.INFO
120+
)
121+
pid1 = await async_session.evaluate("$ProcessID")
122+
self.assertEqual(pid1, async_session.kernel_controller.pid)
123+
with self.assertRaises(WolframKernelException):
124+
await async_session.evaluate("Quit[]")
125+
await async_session.terminate()
126+
pid2 = await async_session.evaluate("$ProcessID")
127+
self.assertEqual(pid2, async_session.kernel_controller.pid)
128+
self.assertNotEqual(pid1, pid2)
129+
finally:
130+
if async_session:
131+
await async_session.terminate()
132+
self.assertTrue(async_session.stopped)
133+
114134
def test_kwargs_parameters(self):
115135
TestKernelBase.class_kwargs_parameters(self, WolframLanguageAsyncSession)
116136

@@ -207,7 +227,7 @@ async def test_pool_from_mixed_kernel_cloud_path(self):
207227
kernel_path,
208228
)
209229
async with WolframEvaluatorPool(
210-
sessions, kernel_loglevel=logging.INFO, STARTUP_TIMEOUT=5, TERMINATE_TIMEOUT=3
230+
sessions, kernel_loglevel=logging.INFO, STARTUP_TIMEOUT=20, TERMINATE_TIMEOUT=3
211231
) as pool:
212232
await self._pool_evaluation_check(pool)
213233
for session in sessions:

wolframclient/tests/evaluation/test_kernel.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from __future__ import absolute_import, print_function, unicode_literals
22

33
import logging
4+
from concurrent.futures import TimeoutError
5+
from time import time
46

57
from wolframclient.deserializers import WXFConsumer, binary_deserialize
68
from wolframclient.evaluation import WolframLanguageSession
@@ -178,6 +180,35 @@ def test_auto_start_session(self):
178180
session.terminate()
179181
self.assertTrue(session.stopped)
180182

183+
def test_kernel_dies_restart(self):
184+
try:
185+
session = WolframLanguageSession(kernel_path)
186+
pid1 = session.evaluate("$ProcessID")
187+
self.assertEqual(pid1, session.kernel_controller.pid)
188+
with self.assertRaises(WolframKernelException):
189+
session.evaluate("Quit[]")
190+
session.terminate()
191+
pid2 = session.evaluate("$ProcessID")
192+
self.assertEqual(pid2, session.kernel_controller.pid)
193+
self.assertNotEqual(pid1, pid2)
194+
finally:
195+
session.terminate()
196+
self.assertTrue(session.stopped)
197+
198+
def test_kernel_abort_restart(self):
199+
try:
200+
session = WolframLanguageSession(kernel_path)
201+
session.start()
202+
start = time()
203+
future = session.evaluate_future("Pause[10]")
204+
with self.assertRaises(TimeoutError):
205+
future.result(timeout=1.0)
206+
session.terminate()
207+
self.assertTrue((time() - start) < 10)
208+
finally:
209+
session.terminate()
210+
self.assertTrue(session.stopped)
211+
181212
def test_pure_function_inputform(self):
182213
f = self.kernel_session.function("#+1&")
183214
self.assertEqual(f(3), 4)
@@ -298,7 +329,7 @@ def test_throw(self):
298329
class TestSessionTimeout(TestCaseSettings):
299330
def test_evaluate_async_basic_inputform(self):
300331
future = self.kernel_session.evaluate_future("1+1")
301-
self.assertEqual(future.result(timeout=2), 2)
332+
self.assertEqual(future.result(timeout=5), 2)
302333

303334
def test_evaluate_async_basic_wl(self):
304335
future = self.kernel_session.evaluate_future(wl.Plus(1, 2))

wolframclient/tests/externalevaluate/ev_loop.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from wolframclient.utils.externalevaluate import EXPORT_KWARGS, start_zmq_loop
1010
from wolframclient.utils.tests import TestCase as BaseTestCase
1111

12+
1213
class TestCase(BaseTestCase):
1314
def compare(self, string_version, result):
1415
self.assertEqual(string_version, export(result, **EXPORT_KWARGS))

0 commit comments

Comments
 (0)