Skip to content

Commit d3c8417

Browse files
author
Dorian Birraux
committed
Rename list of tasks in pool.py
Names were pretty confusing.
1 parent 876ad1d commit d3c8417

1 file changed

Lines changed: 20 additions & 23 deletions

File tree

wolframclient/evaluation/pool.py

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ def __init__(
8888
break
8989
self._add_evaluator(evaluator)
9090

91-
self._started_tasks = []
92-
self._pending_init_tasks = ()
91+
self._kernel_start_tasks = ()
92+
self._kernel_evaluation_loop_tasks = []
9393
self.last = 0
9494
self.eval_count = 0
9595
self.requestedsize = poolsize
@@ -179,11 +179,11 @@ async def _async_start_kernel(self, kernel):
179179
if logger.isEnabledFor(logging.INFO):
180180
logger.info("New kernel started in pool: %s.", kernel)
181181
# register the task. The loop is not always started at this point.
182-
self._started_tasks.append(task)
182+
self._kernel_evaluation_loop_tasks.append(task)
183183

184184
@property
185185
def started(self):
186-
return len(self._started_tasks) > 0
186+
return len(self._kernel_evaluation_loop_tasks) > 0
187187

188188
async def start(self):
189189
""" Start a pool of kernels and wait for at least one of them to
@@ -194,21 +194,21 @@ async def start(self):
194194
"""
195195
self.stopped = False
196196
# keep track of the init tasks. We have to wait before terminating.
197-
self._pending_init_tasks = {
197+
self._kernel_start_tasks = {
198198
(asyncio.ensure_future(self._async_start_kernel(kernel), loop=self._loop))
199199
for kernel in self._evaluators
200200
}
201201
# uninitialized kernels are removed if they failed to start
202-
# if they do start the task (the loop) is added to _started_tasks.
202+
# if they do start, the task (the loop) is added to _kernel_evaluation_loop_tasks.
203203
# we need at least one working kernel.
204204
# we also need to keep track of start kernel tasks in case of early termination.
205-
while len(self._started_tasks) == 0:
206-
if len(self._pending_init_tasks) == 0:
205+
while len(self._kernel_evaluation_loop_tasks) == 0:
206+
if len(self._kernel_start_tasks) == 0:
207207
raise WolframKernelException("Failed to start any kernel.")
208-
_, self._pending_init_tasks = await asyncio.wait(
209-
self._pending_init_tasks, return_when=asyncio.FIRST_COMPLETED
208+
_, self._kernel_start_tasks = await asyncio.wait(
209+
self._kernel_start_tasks, return_when=asyncio.FIRST_COMPLETED
210210
)
211-
logger.info("Pool initialized with %i running kernels", len(self._started_tasks))
211+
logger.info("Pool initialized with %i running kernels", len(self._kernel_evaluation_loop_tasks))
212212

213213
async def stop(self):
214214

@@ -217,17 +217,17 @@ async def stop(self):
217217

218218
self.stopped = True
219219
# make sure all init tasks are finished.
220-
if len(self._pending_init_tasks) > 0:
221-
for task in self._pending_init_tasks:
220+
if len(self._kernel_start_tasks) > 0:
221+
for task in self._kernel_start_tasks:
222222
task.cancel()
223-
await asyncio.wait(self._pending_init_tasks)
224-
if len(self._started_tasks) > 0:
223+
await asyncio.wait(self._kernel_start_tasks)
224+
if len(self._kernel_evaluation_loop_tasks) > 0:
225225
try:
226226
# request for loop termination.
227-
for _ in range(len(self._started_tasks)):
227+
for _ in range(len(self._kernel_evaluation_loop_tasks)):
228228
await self._queue.put(None)
229229
# wait for loop to finish before terminating the kernels
230-
await asyncio.wait(self._started_tasks, loop=self._loop)
230+
await asyncio.wait(self._kernel_evaluation_loop_tasks, loop=self._loop)
231231
except CancelledError:
232232
pass
233233
except Exception as e:
@@ -266,23 +266,20 @@ async def evaluate_wrap(self, expr, **kwargs):
266266
await self._put_evaluation_task(future, "evaluate_wrap", expr, **kwargs)
267267
return await future
268268

269-
def evaluate_all(self, iterable):
270-
return self._loop.run_until_complete(self._evaluate_all(iterable))
271-
272-
async def _evaluate_all(self, iterable):
269+
async def evaluate_all(self, iterable):
273270
tasks = [asyncio.create_task(self.evaluate(expr)) for expr in iterable]
274271
return await asyncio.gather(*tasks)
275272

276273
def __repr__(self):
277274
return "<%s %i/%i started evaluators, cumulating %i evaluations>" % (
278275
self.__class__.__name__,
279-
len(self._started_tasks),
276+
len(self._kernel_evaluation_loop_tasks),
280277
self.requestedsize,
281278
self.eval_count,
282279
)
283280

284281
def __len__(self):
285-
return len(self._started_tasks)
282+
return len(self._kernel_evaluation_loop_tasks)
286283

287284

288285
def parallel_evaluate(expressions, evaluator_spec=None, max_evaluators=4, loop=None):

0 commit comments

Comments
 (0)