diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index e9eae399..1a6cfebd 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -67,7 +67,7 @@ class TelegramBareClient: def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - process_updates=False, + update_workers=None, timeout=timedelta(seconds=5), **kwargs): """Refer to TelegramClient.__init__ for docs on this method""" @@ -108,7 +108,7 @@ class TelegramBareClient: # This member will process updates if enabled. # One may change self.updates.enabled at any later point. - self.updates = UpdateState(process_updates) + self.updates = UpdateState(workers=update_workers) # Used on connection - the user may modify these and reconnect kwargs['app_version'] = kwargs.get('app_version', self.__version__) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index a28d2c62..b78fc68d 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -57,7 +57,7 @@ class TelegramClient(TelegramBareClient): def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - process_updates=False, + update_workers=None, timeout=timedelta(seconds=5), **kwargs): """Initializes the Telegram client with the specified API ID and Hash. @@ -71,11 +71,11 @@ class TelegramClient(TelegramBareClient): This will only affect how messages are sent over the network and how much processing is required before sending them. - If 'process_updates' is set to True, incoming updates will be - processed and you must manually call 'self.updates.poll()' from - another thread to retrieve the saved update objects, or your - memory will fill with these. You may modify the value of - 'self.updates.polling' at any later point. + The integer 'update_workers' represents depending on its value: + is None: Updates will *not* be stored in memory. + = 0: Another thread is responsible for calling self.updates.poll() + > 0: 'update_workers' background threads will be spawned, any + any of them will invoke all the self.updates.handlers. Despite the value of 'process_updates', if you later call '.add_update_handler(...)', updates will also be processed @@ -94,7 +94,7 @@ class TelegramClient(TelegramBareClient): session, api_id, api_hash, connection_mode=connection_mode, proxy=proxy, - process_updates=process_updates, + update_workers=update_workers, timeout=timeout ) diff --git a/telethon/update_state.py b/telethon/update_state.py index 9239bd03..cced0e1c 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -1,5 +1,4 @@ import logging -import threading from collections import deque from datetime import datetime from threading import RLock, Event, Thread @@ -11,9 +10,15 @@ class UpdateState: """Used to hold the current state of processed updates. To retrieve an update, .poll() should be called. """ - def __init__(self, polling): - self._polling = polling - self._workers = 4 + def __init__(self, workers=None): + """ + :param workers: This integer parameter has three possible cases: + workers is None: Updates will *not* be stored on self. + workers = 0: Another thread is responsible for calling self.poll() + workers > 0: 'workers' background threads will be spawned, any + any of them will invoke all the self.handlers. + """ + self._workers = workers self._worker_threads = [] self.handlers = [] @@ -25,11 +30,7 @@ class UpdateState: # https://core.telegram.org/api/updates self._state = tl.updates.State(0, 0, datetime.now(), 0, 0) - - # TODO Rename "polling" to some other variable - # that signifies "running background threads". - if polling: - self._setup_workers() + self._setup_workers() def can_poll(self): """Returns True if a call to .poll() won't lock""" @@ -37,9 +38,6 @@ class UpdateState: def poll(self): """Polls an update or blocks until an update object is available""" - if not self._polling: - raise ValueError('Updates are not being polled hence not saved.') - self._updates_available.wait() with self._updates_lock: if not self._updates_available.is_set(): @@ -54,28 +52,19 @@ class UpdateState: return update - # TODO How should this be handled with background worker threads? - def get_polling(self): - return self._polling - - def set_polling(self, polling): - self._polling = polling - if polling: - self._setup_workers() - else: - with self._updates_lock: - self._updates.clear() - self._stop_workers() - - polling = property(fget=get_polling, fset=set_polling) - def get_workers(self): return self._workers def set_workers(self, n): + """Changes the number of workers running. + If 'n is None', clears all pending updates from memory. + """ self._stop_workers() self._workers = n - self._setup_workers() + if n is None: + self._updates.clear() + else: + self._setup_workers() workers = property(fget=get_workers, fset=set_workers) @@ -83,9 +72,6 @@ class UpdateState: """Raises "StopIterationException" on the worker threads to stop them, and also clears all of them off the list """ - if self._worker_threads: - pass - self.set_error(StopIteration()) for t in self._worker_threads: t.join() @@ -93,8 +79,8 @@ class UpdateState: self._worker_threads.clear() def _setup_workers(self): - if self._worker_threads: - # There already are workers + if self._worker_threads or not self._workers: + # There already are workers, or workers is None or 0. Do nothing. return for i in range(self._workers): @@ -141,8 +127,8 @@ class UpdateState: """Processes an update object. This method is normally called by the library itself. """ - if not self._polling and not self.handlers: - return + if self._workers is None: + return # No processing needs to be done if nobody's working with self._updates_lock: if isinstance(update, tl.updates.State): @@ -154,6 +140,5 @@ class UpdateState: return # We already handled this update self._state.pts = pts - if self._polling: - self._updates.append(update) - self._updates_available.set() + self._updates.append(update) + self._updates_available.set()