diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 07dc5597..81b448e1 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -47,6 +47,16 @@ class MtProtoSender: self._constant_read = constant_read self._recv_thread = None + # Every unhandled result gets passed to these callbacks, which + # should be functions accepting a single parameter: a TLObject. + # This should only be Update(s), although it can actually be any type. + # + # The thread from which these callbacks are called can be any. + # + # The creator of the MtProtoSender is responsible for setting this + # to point to the list wherever their callbacks reside. + self.unhandled_callbacks = None + def connect(self): """Connects to the server""" if not self.is_connected(): @@ -239,16 +249,15 @@ class MtProtoSender: return True - # If the code is not parsed manually, then it was parsed by the code generator! - # In this case, we will simply treat the incoming TLObject as an Update, - # if we can first find a matching TLObject + # If the code is not parsed manually then it should be a TLObject. if code in tlobjects: result = reader.tgread_object() - if updates is None: - self._logger.debug('Ignored update for %s', repr(result)) + if self.unhandled_callbacks: + self._logger.debug('Passing TLObject to callbacks %s', repr(result)) + for callback in self.unhandled_callbacks: + callback(result) else: - self._logger.debug('Read update for %s', repr(result)) - updates.append(result) + self._logger.debug('Ignoring unhandled TLObject %s', repr(result)) return True diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 9a9fbf69..20b5efec 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -83,6 +83,12 @@ class TelegramBareClient: # the time since it's a (somewhat expensive) process. self._cached_clients = {} + # Update callbacks (functions accepting a single TLObject) go here + # + # Note that changing the list to which this variable points to + # will not reflect the changes on the existing senders. + self._update_callbacks = [] + # These will be set later self.dc_options = None self._sender = None @@ -136,6 +142,7 @@ class TelegramBareClient: self._sender = MtProtoSender( connection, self.session, constant_read=constant_read ) + self._sender.unhandled_callbacks = self._update_callbacks self._sender.connect() # Now it's time to send an InitConnectionRequest diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 8c34ebc6..c85fd0f7 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -98,14 +98,6 @@ class TelegramClient(TelegramBareClient): # Safety across multiple threads (for the updates thread) self._lock = RLock() - # Updates-related members - self._update_handlers = [] - self._updates_thread_running = Event() - self._updates_thread_receiving = Event() - - self._next_ping_at = 0 - self.ping_interval = 60 # Seconds - # Used on connection - the user may modify these and reconnect kwargs['app_version'] = kwargs.get('app_version', self.__version__) for name, value in kwargs.items(): @@ -145,7 +137,6 @@ class TelegramClient(TelegramBareClient): def disconnect(self): """Disconnects from the Telegram server and stops all the spawned threads""" - self._set_updates_thread(running=False) super().disconnect() # Also disconnect all the cached senders @@ -193,18 +184,8 @@ class TelegramClient(TelegramBareClient): try: self._lock.acquire() - updates = [] if self._update_handlers else None - result = super().invoke( - request, updates=updates - ) - - if updates: - for update in updates: - for handler in self._update_handlers: - handler(update) - # TODO Retry if 'result' is None? - return result + return super().invoke(request) except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: self._logger.debug('DC error when invoking request, ' @@ -394,8 +375,8 @@ class TelegramClient(TelegramBareClient): no_webpage=not link_preview ) result = self(request) - for handler in self._update_handlers: - handler(result) + for callback in self._update_callbacks: + callback(result) return request.random_id def get_message_history(self, @@ -886,109 +867,12 @@ class TelegramClient(TelegramBareClient): def add_update_handler(self, handler): """Adds an update handler (a function which takes a TLObject, an update, as its parameter) and listens for updates""" - if not self._sender: - raise RuntimeError("You can't add update handlers until you've " - "successfully connected to the server.") - - first_handler = not self._update_handlers - self._update_handlers.append(handler) - if first_handler: - self._set_updates_thread(running=True) + self._update_callbacks.append(handler) def remove_update_handler(self, handler): - self._update_handlers.remove(handler) - if not self._update_handlers: - self._set_updates_thread(running=False) + self._update_callbacks.remove(handler) def list_update_handlers(self): - return self._update_handlers[:] - - def _set_updates_thread(self, running): - """Sets the updates thread status (running or not)""" - if running == self._updates_thread_running.is_set(): - return - - # Different state, update the saved value and behave as required - self._logger.debug('Changing updates thread running status to %s', running) - if running: - self._updates_thread_running.set() - if not self._updates_thread: - self._updates_thread = Thread( - name='UpdatesThread', daemon=True, - target=self._updates_thread_method) - - self._updates_thread.start() - else: - self._updates_thread_running.clear() - if self._updates_thread_receiving.is_set(): - # self._sender.cancel_receive() - pass - - def _updates_thread_method(self): - """This method will run until specified and listen for incoming updates""" - - while self._updates_thread_running.is_set(): - # Always sleep a bit before each iteration to relax the CPU, - # since it's possible to early 'continue' the loop to reach - # the next iteration, but we still should to sleep. - sleep(0.1) - - with self._lock: - self._logger.debug('Updates thread acquired the lock') - try: - self._updates_thread_receiving.set() - self._logger.debug( - 'Trying to receive updates from the updates thread' - ) - - if time() > self._next_ping_at: - self._next_ping_at = time() + self.ping_interval - self(PingRequest(utils.generate_random_long())) - - #updates = self._sender.receive_updates(timeout=timeout) - updates = [] - - self._updates_thread_receiving.clear() - self._logger.debug( - 'Received {} update(s) from the updates thread' - .format(len(updates)) - ) - for update in updates: - for handler in self._update_handlers: - handler(update) - - except ConnectionResetError: - self._logger.debug('Server disconnected us. Reconnecting...') - self.reconnect() - - except TimeoutError: - self._logger.debug('Receiving updates timed out') - - except ReadCancelledError: - self._logger.debug('Receiving updates cancelled') - - except BrokenPipeError: - self._logger.debug('Tcp session is broken. Reconnecting...') - self.reconnect() - - except InvalidChecksumError: - self._logger.debug('MTProto session is broken. Reconnecting...') - self.reconnect() - - except OSError: - self._logger.debug('OSError on updates thread, %s logging out', - 'was' if self._sender.logging_out else 'was not') - - if self._sender.logging_out: - # This error is okay when logging out, means we got disconnected - # TODO Not sure why this happens because we call disconnect()... - self._set_updates_thread(running=False) - else: - raise - - self._logger.debug('Updates thread released the lock') - - # Thread is over, so clean unset its variable - self._updates_thread = None + return self._update_callbacks[:] # endregion