Skip to content

Commit 5e171a2

Browse files
author
Dorian Birraux
committed
Removing loop parameter in asynchronous evaluators
1 parent 838780f commit 5e171a2

4 files changed

Lines changed: 32 additions & 42 deletions

File tree

wolframclient/evaluation/base.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,21 +143,20 @@ def __exit__(self, type, value, traceback):
143143

144144
class WolframAsyncEvaluator(WolframEvaluatorBase):
145145
""" Asynchronous evaluators are similar to synchronous ones except that they make heavy use of coroutines
146-
and need an event loop.
146+
and need to run in an event loop.
147147
148-
Most methods from this class are similar to their counterpart from :class:`~wolframclient.evaluation.base.WolframEvaluator`,
149-
except that they are coroutines. """
148+
Most methods from this class are similar to their counterpart from
149+
:class:`~wolframclient.evaluation.base.WolframEvaluator`, except that they are coroutines. """
150150

151-
def __init__(self, loop=None, **kwargs):
152-
self._loop = loop or asyncio.get_event_loop()
151+
def __init__(self, **kwargs):
153152
super().__init__(**kwargs)
154153

155154
async def evaluate(self, expr):
156155
result = await self.evaluate_wrap(expr)
157156
return await result.get()
158157

159158
async def evaluate_many(self, expr_list):
160-
return await asyncio.gather(*map(self.evaluate, expr_list), loop=self._loop)
159+
return await asyncio.gather(*map(self.evaluate, expr_list))
161160

162161
async def evaluate_wrap(self, expr):
163162
raise NotImplementedError
@@ -215,7 +214,13 @@ async def __aexit__(self, type, value, traceback):
215214

216215
def __del__(self, _warnings=warnings):
217216
super().__del__(_warnings=warnings)
218-
if not self.stopped and self._loop and not self._loop.is_closed():
219-
self._loop.call_exception_handler(
220-
{self.__class__.__name__: self, "message": "Unclosed evaluator."}
221-
)
217+
if not self.stopped:
218+
loop = None
219+
try:
220+
loop = asyncio.get_running_loop()
221+
except RuntimeError:
222+
pass
223+
if loop and not loop.is_closed():
224+
loop.call_exception_handler(
225+
{self.__class__.__name__: self, "message": "Unclosed evaluator."}
226+
)

wolframclient/evaluation/cloud/asynccloudsession.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ class WolframCloudAsyncSession(WolframAsyncEvaluator):
3838
async with WolframCloudAsyncSession() as session:
3939
await session.call(...)
4040
41-
An event loop can be explicitly passed using the named parameter `loop`; otherwise, the one
42-
returned by :func:`~asyncio.get_event_loop` is used.
4341
The initialization options of the class :class:`~wolframclient.evaluation.WolframCloudSession` are also supported by
4442
this class.
4543
"""
@@ -48,14 +46,13 @@ def __init__(
4846
self,
4947
credentials=None,
5048
server=None,
51-
loop=None,
5249
inputform_string_evaluation=True,
5350
oauth_session_class=None,
5451
xauth_session_class=None,
5552
http_sessionclass=None,
5653
ssl_context_class=None,
5754
):
58-
super().__init__(loop, inputform_string_evaluation=inputform_string_evaluation)
55+
super().__init__(inputform_string_evaluation=inputform_string_evaluation)
5956
self.server = server or WOLFRAM_PUBLIC_CLOUD_SERVER
6057
self.http_session = None
6158
self.http_sessionclass = http_sessionclass or aiohttp.ClientSession
@@ -76,7 +73,6 @@ def duplicate(self):
7673
return self.__class__(
7774
credentials=self.credentials,
7875
server=self.server,
79-
loop=self._loop,
8076
inputform_string_evaluation=self.inputform_string_evaluation,
8177
oauth_session_class=self.oauth_session_class,
8278
xauth_session_class=self.xauth_session_class,
@@ -90,8 +86,7 @@ async def start(self):
9086
if not self.started:
9187
if self.http_session is None or self.http_session.closed:
9288
self.http_session = self.http_sessionclass(
93-
headers={"User-Agent": "WolframClientForPython/1.0"}, loop=self._loop
94-
)
89+
headers={"User-Agent": "WolframClientForPython/1.0"})
9590
if not self.anonymous():
9691
await self._authenticate()
9792
except Exception as e:

wolframclient/evaluation/kernel/asyncsession.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@ class WolframLanguageAsyncSession(WolframAsyncEvaluator, WolframLanguageSession)
2222
async with WolframLanguageAsyncSession() as session:
2323
await session.evaluate('Now')
2424
25-
An event loop can be explicitly passed using the named parameter `loop`; otherwise, the one
26-
returned by :func:`~asyncio.get_event_loop` is used.
27-
28-
Coroutines all run in a unique thread. Since a Wolfram kernel is single threaded, there can
25+
Coroutines all run in their own thread. Since a Wolfram kernel is single threaded, there can
2926
be only one evaluation at a time. In a sense, from the event loop point of view, evaluations
3027
are atomic operations. Even when many asynchronous sessions are started, the number of
3128
threads equals the number of kernel instances running and should not be problematic. Ensuring
@@ -37,7 +34,6 @@ def __init__(
3734
self,
3835
kernel=None,
3936
consumer=None,
40-
loop=None,
4137
initfile=None,
4238
kernel_loglevel=logging.NOTSET,
4339
stdin=PIPE,
@@ -55,14 +51,12 @@ def __init__(
5551
stdout=stdout,
5652
stderr=stderr,
5753
inputform_string_evaluation=inputform_string_evaluation,
58-
loop=loop,
5954
**kwargs
6055
)
6156

6257
def duplicate(self):
6358
return self.__class__(
6459
kernel=self.kernel,
65-
loop=self._loop,
6660
consumer=self.consumer,
6761
initfile=self.initfile,
6862
kernel_loglevel=self.kernel_loglevel,
@@ -77,7 +71,7 @@ async def do_evaluate_future(self, expr, result_update_callback=None, **kwargs):
7771
future = super().do_evaluate_future(
7872
expr, result_update_callback=result_update_callback, **kwargs
7973
)
80-
return asyncio.wrap_future(future, loop=self._loop)
74+
return asyncio.wrap_future(future)
8175

8276
async def evaluate_future(self, expr, **kwargs):
8377
await self.ensure_started()
@@ -138,7 +132,7 @@ async def start(self):
138132
139133
This method is a coroutine."""
140134
future = super().start_future()
141-
await asyncio.wrap_future(future, loop=self._loop)
135+
await asyncio.wrap_future(future)
142136

143137
async def stop(self):
144138
"""Asynchronously stop the session (graceful termination).
@@ -155,4 +149,4 @@ async def terminate(self):
155149
async def _async_terminate(self, gracefully):
156150
logger.info("Terminating asynchronous kernel session.")
157151
future = super().stop_future(gracefully=gracefully)
158-
await asyncio.wrap_future(future, loop=self._loop)
152+
await asyncio.wrap_future(future)

wolframclient/evaluation/pool.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ class WolframEvaluatorPool(WolframAsyncEvaluator):
5858
Set `load_factor` to specify how many workloads are queued per kernel before a new evaluation becomes a blocking
5959
operation. Values below or equal to 0 mean an infinite queue size.
6060
61-
Set `loop` to the event loop to use.
62-
6361
`kwargs` are passed to :class:`~wolframclient.evaluation.WolframLanguageAsyncSession` during initialization.
6462
"""
6563

@@ -68,14 +66,13 @@ def __init__(
6866
async_evaluators=None,
6967
poolsize=4,
7068
load_factor=0,
71-
loop=None,
7269
async_language_session_class=WolframLanguageAsyncSession,
7370
**kwargs
7471
):
75-
super().__init__(loop)
72+
super().__init__()
7673
if poolsize <= 0:
7774
raise ValueError("Invalid pool size value %i. Expecting a positive integer." % poolsize)
78-
self._queue = asyncio.Queue(load_factor * poolsize, loop=self._loop)
75+
self._queue = asyncio.Queue(load_factor * poolsize)
7976
self.async_language_session_class = async_language_session_class
8077
self._evaluators = set()
8178
if async_evaluators is None or isinstance(async_evaluators, six.string_types):
@@ -98,7 +95,7 @@ def __init__(
9895
def _add_evaluator(self, evaluator, **kwargs):
9996
if evaluator is None or isinstance(evaluator, six.string_types):
10097
self._evaluators.add(
101-
self.async_language_session_class(kernel=evaluator, loop=self._loop, **kwargs)
98+
self.async_language_session_class(kernel=evaluator, **kwargs)
10299
)
103100
elif isinstance(evaluator, WolframAsyncEvaluator):
104101
if evaluator in self._evaluators:
@@ -196,11 +193,10 @@ async def start(self):
196193
self.stopped = False
197194
# keep track of the init tasks. We have to wait before terminating.
198195
self._kernel_start_tasks = {
199-
(asyncio.ensure_future(self._async_start_kernel(kernel), loop=self._loop))
200-
for kernel in self._evaluators
196+
asyncio.ensure_future(self._async_start_kernel(kernel)) for kernel in self._evaluators
201197
}
202198
# uninitialized kernels are removed if they failed to start
203-
# if they do start, the task (the loop) is added to _kernel_evaluation_loop_tasks.
199+
# if they do start, the task is added to _kernel_evaluation_loop_tasks.
204200
# we need at least one working kernel.
205201
# we also need to keep track of start kernel tasks in case of early termination.
206202
while len(self._kernel_evaluation_loop_tasks) == 0:
@@ -228,15 +224,15 @@ async def stop(self):
228224
for _ in range(len(self._kernel_evaluation_loop_tasks)):
229225
await self._queue.put(None)
230226
# wait for loop to finish before terminating the kernels
231-
await asyncio.wait(self._kernel_evaluation_loop_tasks, loop=self._loop)
227+
await asyncio.wait(self._kernel_evaluation_loop_tasks)
232228
except CancelledError:
233229
pass
234230
except Exception as e:
235231
logger.warning("Exception raised while terminating loop: %s", e)
236232
# terminate the kernel instances, if any started.
237233
tasks = {asyncio.create_task(kernel.stop()) for kernel in self._evaluators}
238234
# `wait` raises the first exception, but wait for all tasks to finish.
239-
await asyncio.wait(tasks, loop=self._loop)
235+
await asyncio.wait(tasks)
240236

241237
async def terminate(self):
242238
await self.stop()
@@ -253,17 +249,17 @@ async def _put_evaluation_task(self, future, func, expr, **kwargs):
253249
self.eval_count += 1
254250

255251
async def evaluate(self, expr, **kwargs):
256-
future = asyncio.Future(loop=self._loop)
252+
future = asyncio.Future()
257253
await self._put_evaluation_task(future, "evaluate", expr, **kwargs)
258254
return await future
259255

260256
async def evaluate_wxf(self, expr, **kwargs):
261-
future = asyncio.Future(loop=self._loop)
257+
future = asyncio.Future()
262258
await self._put_evaluation_task(future, "evaluate_wxf", expr, **kwargs)
263259
return await future
264260

265261
async def evaluate_wrap(self, expr, **kwargs):
266-
future = asyncio.Future(loop=self._loop)
262+
future = asyncio.Future()
267263
await self._put_evaluation_task(future, "evaluate_wrap", expr, **kwargs)
268264
return await future
269265

@@ -283,7 +279,7 @@ def __len__(self):
283279
return len(self._kernel_evaluation_loop_tasks)
284280

285281

286-
def parallel_evaluate(expressions, evaluator_spec=None, max_evaluators=4, loop=None):
282+
def parallel_evaluate(expressions, evaluator_spec=None, max_evaluators=4):
287283
""" Start a kernel pool and evaluate the expressions in parallel.
288284
289285
The pool is created with the value of `evaluator_spec`. The pool is automatically stopped when it is no longer

0 commit comments

Comments
 (0)