Fix-up 771c573 to properly stop background update workers

The "special" StopIteration object didn't actually make any sense.
Instead looping forever, workers now loop while there are workers,
so that they stop looping once the count is cleared.

Dummy values are still inserted so that they don't need to timeout
on the queue before exiting (these values are None) so in essence,
this keeps the best of both of worlds.
This commit is contained in:
Lonami Exo 2018-03-01 20:13:21 +01:00
parent 3a3f221bd1
commit f09ab6c6b6

View File

@ -39,19 +39,15 @@ class UpdateState:
return not self._updates.empty() return not self._updates.empty()
def poll(self, timeout=None): def poll(self, timeout=None):
"""Polls an update or blocks until an update object is available. """
Polls an update or blocks until an update object is available.
If 'timeout is not None', it should be a floating point value, If 'timeout is not None', it should be a floating point value,
and the method will 'return None' if waiting times out. and the method will 'return None' if waiting times out.
""" """
try: try:
update = self._updates.get(timeout=timeout) return self._updates.get(timeout=timeout)
except Empty: except Empty:
return return None
if isinstance(update, Exception):
raise update # Some error was set through (surely StopIteration)
return update
def get_workers(self): def get_workers(self):
return self._workers return self._workers
@ -60,27 +56,24 @@ class UpdateState:
"""Changes the number of workers running. """Changes the number of workers running.
If 'n is None', clears all pending updates from memory. If 'n is None', clears all pending updates from memory.
""" """
if n is None:
self.stop_workers() self.stop_workers()
else:
self._workers = n self._workers = n
if n is not None:
self.setup_workers() self.setup_workers()
workers = property(fget=get_workers, fset=set_workers) workers = property(fget=get_workers, fset=set_workers)
def stop_workers(self): def stop_workers(self):
""" """
Raises "StopIterationException" on the worker threads to stop Waits for all the worker threads to stop.
them, and also clears all the workers/updates from the lists.
""" """
if self._workers: # Put dummy ``None`` objects so that they don't need to timeout.
n = self._workers
self._workers = None
with self._updates_lock: with self._updates_lock:
# Insert at the beginning so the very next poll causes an error for _ in range(n):
# on all the worker threads self._updates.put(None)
# TODO Should this reset the pts and such?
while self._updates:
self._updates.get()
for _ in range(self._workers):
self._updates.put(StopIteration())
for t in self._worker_threads: for t in self._worker_threads:
t.join() t.join()
@ -103,7 +96,7 @@ class UpdateState:
thread.start() thread.start()
def _worker_loop(self, wid): def _worker_loop(self, wid):
while True: while self._workers is not None:
try: try:
update = self.poll(timeout=UpdateState.WORKER_POLL_TIMEOUT) update = self.poll(timeout=UpdateState.WORKER_POLL_TIMEOUT)
if update and self.handler: if update and self.handler: