diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index a6d78c14..38fd1555 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -1,21 +1,24 @@ import logging -from datetime import timedelta +import os +import threading +from datetime import timedelta, datetime from hashlib import md5 from io import BytesIO -from os import path from threading import Lock +from time import sleep from . import helpers as utils from .crypto import rsa, CdnDecrypter from .errors import ( RPCError, BrokenAuthKeyError, - FloodWaitError, FileMigrateError, TypeNotFoundError + FloodWaitError, FileMigrateError, TypeNotFoundError, + UnauthorizedError, PhoneMigrateError, NetworkMigrateError, UserMigrateError ) from .network import authenticator, MtProtoSender, Connection, ConnectionMode from .tl import TLObject, Session from .tl.all_tlobjects import LAYER from .tl.functions import ( - InitConnectionRequest, InvokeWithLayerRequest + InitConnectionRequest, InvokeWithLayerRequest, PingRequest ) from .tl.functions.auth import ( ImportAuthorizationRequest, ExportAuthorizationRequest @@ -23,6 +26,7 @@ from .tl.functions.auth import ( from .tl.functions.help import ( GetCdnConfigRequest, GetConfigRequest ) +from .tl.functions.updates import GetStateRequest from .tl.functions.upload import ( GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest ) @@ -64,11 +68,22 @@ class TelegramBareClient: connection_mode=ConnectionMode.TCP_FULL, proxy=None, process_updates=False, - timeout=timedelta(seconds=5)): - """Initializes the Telegram client with the specified API ID and Hash. - Session must always be a Session instance, and an optional proxy - can also be specified to be used on the connection. - """ + timeout=timedelta(seconds=5), + **kwargs): + """Refer to TelegramClient.__init__ for docs on this method""" + if not api_id or not api_hash: + raise PermissionError( + "Your API ID or Hash cannot be empty or None. " + "Refer to Telethon's README.rst for more information.") + + # Determine what session object we have + if isinstance(session, str) or session is None: + session = Session.try_load_or_create_new(session) + elif not isinstance(session, Session): + raise ValueError( + 'The given session must be a str or a Session instance.' + ) + self.session = session self.api_id = int(api_id) self.api_hash = api_hash @@ -95,6 +110,39 @@ class TelegramBareClient: # One may change self.updates.enabled at any later point. self.updates = UpdateState(process_updates) + # Used on connection - the user may modify these and reconnect + kwargs['app_version'] = kwargs.get('app_version', self.__version__) + for name, value in kwargs.items(): + if hasattr(self.session, name): + setattr(self.session, name, value) + + # Despite the state of the real connection, keep track of whether + # the user has explicitly called .connect() or .disconnect() here. + # This information is required by the read thread, who will be the + # one attempting to reconnect on the background *while* the user + # doesn't explicitly call .disconnect(), thus telling it to stop + # retrying. The main thread, knowing there is a background thread + # attempting reconnection as soon as it happens, will just sleep. + self._user_connected = False + + # Save whether the user is authorized here (a.k.a. logged in) + self._authorized = False + + # Uploaded files cache so subsequent calls are instant + self._upload_cache = {} + + # Constantly read for results and updates from within the main client + self._recv_thread = None + + # Identifier of the main thread (the one that called .connect()). + # This will be used to create new connections from any other thread, + # so that requests can be sent in parallel. + self._main_thread_ident = None + + # Default PingRequest delay + self._last_ping = datetime.now() + self._ping_delay = timedelta(minutes=1) + # endregion # region Connecting @@ -108,6 +156,8 @@ class TelegramBareClient: If 'exported_auth' is not None, it will be used instead to determine the authorization key for the current session. """ + self._main_thread_ident = threading.get_ident() + try: self._sender.connect() if not self.session.auth_key: @@ -143,6 +193,15 @@ class TelegramBareClient: TelegramBareClient._dc_options = \ self(GetConfigRequest()).dc_options + # Connection was successful! Try syncing the update state + # to also assert whether the user is logged in or not. + self._user_connected = True + try: + self.sync_updates() + self._set_connected_and_authorized() + except UnauthorizedError: + self._authorized = False + return True except TypeNotFoundError as e: @@ -178,9 +237,23 @@ class TelegramBareClient: return result def disconnect(self): - """Disconnects from the Telegram server""" + """Disconnects from the Telegram server + and stops all the spawned threads""" + self._user_connected = False + self._recv_thread = None + + # This will trigger a "ConnectionResetError", for subsequent calls + # to read or send (from another thread) and usually, the background + # thread would try restarting the connection but since the + # ._recv_thread = None, it knows it doesn't have to. self._sender.disconnect() + # Also disconnect all the cached senders + for sender in self._cached_clients.values(): + sender.disconnect() + + self._cached_clients.clear() + def _reconnect(self, new_dc=None): """If 'new_dc' is not set, only a call to .connect() will be made since it's assumed that the connection has been lost and the @@ -210,7 +283,11 @@ class TelegramBareClient: # endregion - # region Working with different Data Centers + # region Working with different connections/Data Centers + + def _on_read_thread(self): + return self._recv_thread is not None and \ + threading.get_ident() == self._recv_thread.ident def _get_dc(self, dc_id, ipv6=False, cdn=False): """Gets the Data Center (DC) associated to 'dc_id'""" @@ -290,16 +367,21 @@ class TelegramBareClient: # region Invoking Telegram requests - def invoke(self, *requests, call_receive=True, retries=5, sender=None): + def invoke(self, *requests, call_receive=True, retries=5): """Invokes (sends) a MTProtoRequest and returns (receives) its result. - If 'updates' is not None, all read update object will be put - in such list. Otherwise, update objects will be ignored. - - If 'call_receive' is set to False, then there should be another - thread calling to 'self._sender.receive()' running or this method - will lock forever. + The invoke will be retried up to 'retries' times before raising + ValueError(). """ + # This is only valid when the read thread is reconnecting, + # that is, the connection lock is locked. + on_read_thread = self._on_read_thread() + if on_read_thread and not self._connect_lock.locked(): + return # Just ignore, we would be raising and crashing the thread + + # Any error from a background thread will be "posted" and checked here + self.updates.check_error() + if not all(isinstance(x, TLObject) and x.content_related for x in requests): raise ValueError('You can only invoke requests, not types!') @@ -307,8 +389,20 @@ class TelegramBareClient: if retries <= 0: raise ValueError('Number of retries reached 0.') - if sender is None: + # Determine the sender to be used (main or a new connection) + # TODO Polish this so it's nicer + on_main_thread = threading.get_ident() == self._main_thread_ident + if on_main_thread or on_read_thread: sender = self._sender + else: + conn = Connection( + self.session.server_address, self.session.port, + mode=self._sender.connection._mode, + proxy=self._sender.connection.conn.proxy, + timeout=self._sender.connection.get_timeout() + ) + sender = MtProtoSender(self.session, conn) + sender.connect() try: # Ensure that we start with no previous errors (i.e. resending) @@ -317,6 +411,14 @@ class TelegramBareClient: x.rpc_error = None sender.send(*requests) + + # We should call receive from this thread if there's no background + # thread reading or if the server disconnected us and we're trying + # to reconnect. This is because the read thread may either be + # locked also trying to reconnect or we may be said thread already. + call_receive = not on_main_thread or \ + self._recv_thread is None or self._connect_lock.locked() + if not call_receive: # TODO This will be slightly troublesome if we allow # switching between constant read or not on the fly. @@ -330,22 +432,49 @@ class TelegramBareClient: while not all(x.confirm_received.is_set() for x in requests): sender.receive(update_state=self.updates) + except (PhoneMigrateError, NetworkMigrateError, + UserMigrateError) as e: + self._logger.debug( + 'DC error when invoking request, ' + 'attempting to reconnect at DC {}'.format(e.new_dc) + ) + + # TODO What happens with the background thread here? + # For normal use cases, this won't happen, because this will only + # be on the very first connection (not authorized, not running), + # but may be an issue for people who actually travel? + self._reconnect(new_dc=e.new_dc) + return self.invoke( + *requests, call_receive=call_receive, retries=(retries - 1) + ) + except TimeoutError: pass # We will just retry except ConnectionResetError: + if self._connect_lock.locked(): + # We are connecting and we don't want to reconnect there... + raise + self._logger.debug('Server disconnected us. Reconnecting and ' 'resending request...') + if sender != self._sender: + # TODO Try reconnecting forever too? sender.connect() else: - self._reconnect() + while self._user_connected and not self._reconnect(): + sleep(0.1) # Retry forever until we can send the request except FloodWaitError: sender.disconnect() self.disconnect() raise + finally: + if sender != self._sender: + sender.disconnect() + try: raise next(x.rpc_error for x in requests if x.rpc_error) except StopIteration: @@ -363,6 +492,13 @@ class TelegramBareClient: # Let people use client(SomeRequest()) instead client.invoke(...) __call__ = invoke + # Some really basic functionality + + def is_user_authorized(self): + """Has the user been authorized yet + (code request sent and confirmed)?""" + return self._authorized + # endregion # region Uploading media @@ -388,10 +524,10 @@ class TelegramBareClient: Default values for the optional parameters if left as None are: part_size_kb = get_appropriated_part_size(file_size) - file_name = path.basename(file_path) + file_name = os.path.basename(file_path) """ if isinstance(file, str): - file_size = path.getsize(file) + file_size = os.path.getsize(file) elif isinstance(file, bytes): file_size = len(file) else: @@ -447,7 +583,7 @@ class TelegramBareClient: # Set a default file name if None was specified if not file_name: if isinstance(file, str): - file_name = path.basename(file) + file_name = os.path.basename(file) else: file_name = str(file_id) @@ -544,3 +680,73 @@ class TelegramBareClient: f.close() # endregion + + # region Updates handling + + def sync_updates(self): + """Synchronizes self.updates to their initial state. Will be + called automatically on connection if self.updates.enabled = True, + otherwise it should be called manually after enabling updates. + """ + self.updates.process(self(GetStateRequest())) + + def add_update_handler(self, handler): + """Adds an update handler (a function which takes a TLObject, + an update, as its parameter) and listens for updates""" + sync = not self.updates.handlers + self.updates.handlers.append(handler) + if sync: + self.sync_updates() + + def remove_update_handler(self, handler): + self.updates.handlers.remove(handler) + + def list_update_handlers(self): + return self.updates.handlers[:] + + # endregion + + # Constant read + + def _set_connected_and_authorized(self): + self._authorized = True + if self._recv_thread is None: + self._recv_thread = threading.Thread( + name='ReadThread', daemon=True, + target=self._recv_thread_impl + ) + self._recv_thread.start() + + # By using this approach, another thread will be + # created and started upon connection to constantly read + # from the other end. Otherwise, manual calls to .receive() + # must be performed. The MtProtoSender cannot be connected, + # or an error will be thrown. + # + # This way, sending and receiving will be completely independent. + def _recv_thread_impl(self): + while self._user_connected: + try: + if datetime.now() > self._last_ping + self._ping_delay: + self._sender.send(PingRequest( + int.from_bytes(os.urandom(8), 'big', signed=True) + )) + self._last_ping = datetime.now() + + self._sender.receive(update_state=self.updates) + except TimeoutError: + # No problem. + pass + except ConnectionResetError: + self._logger.debug('Server disconnected us. Reconnecting...') + while self._user_connected and not self._reconnect(): + sleep(0.1) # Retry forever, this is instant messaging + + except Exception as e: + # Unknown exception, pass it to the main thread + self.updates.set_error(e) + break + + self._recv_thread = None + + # endregion diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 9db87684..a28d2c62 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1,10 +1,7 @@ import os -import threading from datetime import datetime, timedelta from functools import lru_cache from mimetypes import guess_type -from threading import Thread -from time import sleep try: import socks @@ -15,12 +12,10 @@ from . import TelegramBareClient from . import helpers as utils from .errors import ( RPCError, UnauthorizedError, InvalidParameterError, PhoneCodeEmptyError, - PhoneMigrateError, NetworkMigrateError, UserMigrateError, PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError ) -from .network import Connection, ConnectionMode, MtProtoSender -from .tl import Session, TLObject -from .tl.functions import PingRequest +from .network import ConnectionMode +from .tl import TLObject from .tl.functions.account import ( GetPasswordRequest ) @@ -35,9 +30,6 @@ from .tl.functions.messages import ( GetDialogsRequest, GetHistoryRequest, ReadHistoryRequest, SendMediaRequest, SendMessageRequest ) -from .tl.functions.updates import ( - GetStateRequest -) from .tl.functions.users import ( GetUsersRequest ) @@ -98,18 +90,6 @@ class TelegramClient(TelegramBareClient): system_lang_code = lang_code report_errors = True """ - if not api_id or not api_hash: - raise PermissionError( - "Your API ID or Hash cannot be empty or None. " - "Refer to Telethon's README.rst for more information.") - - # Determine what session object we have - if isinstance(session, str) or session is None: - session = Session.try_load_or_create_new(session) - elif not isinstance(session, Session): - raise ValueError( - 'The given session must be a str or a Session instance.') - super().__init__( session, api_id, api_hash, connection_mode=connection_mode, @@ -118,187 +98,16 @@ class TelegramClient(TelegramBareClient): timeout=timeout ) - # Used on connection - the user may modify these and reconnect - kwargs['app_version'] = kwargs.get('app_version', self.__version__) - for name, value in kwargs.items(): - if hasattr(self.session, name): - setattr(self.session, name, value) - - self._updates_thread = None + # Some fields to easy signing in self._phone_code_hash = None self._phone = None - # Despite the state of the real connection, keep track of whether - # the user has explicitly called .connect() or .disconnect() here. - # This information is required by the read thread, who will be the - # one attempting to reconnect on the background *while* the user - # doesn't explicitly call .disconnect(), thus telling it to stop - # retrying. The main thread, knowing there is a background thread - # attempting reconnection as soon as it happens, will just sleep. - self._user_connected = False - - # Save whether the user is authorized here (a.k.a. logged in) - self._authorized = False - - # Uploaded files cache so subsequent calls are instant - self._upload_cache = {} - - # Constantly read for results and updates from within the main client - self._recv_thread = None - - # Identifier of the main thread (the one that called .connect()). - # This will be used to create new connections from any other thread, - # so that requests can be sent in parallel. - self._main_thread_ident = None - - # Default PingRequest delay - self._last_ping = datetime.now() - self._ping_delay = timedelta(minutes=1) - - # endregion - - # region Connecting - - def connect(self, exported_auth=None): - """Connects to the Telegram servers, executing authentication if - required. Note that authenticating to the Telegram servers is - not the same as authenticating the desired user itself, which - may require a call (or several) to 'sign_in' for the first time. - - exported_auth is meant for internal purposes and can be ignored. - """ - self._main_thread_ident = threading.get_ident() - - if socks and self._recv_thread: - # Treat proxy errors specially since they're not related to - # Telegram itself, but rather to the proxy. If any happens on - # the read thread forward it to the main thread. - try: - ok = super().connect(exported_auth=exported_auth) - except socks.ProxyConnectionError as e: - ok = False - # Report the exception to the main thread - self.updates.set_error(e) - else: - ok = super().connect(exported_auth=exported_auth) - - if not ok: - return False - - self._user_connected = True - try: - self.sync_updates() - self._set_connected_and_authorized() - except UnauthorizedError: - self._authorized = False - - return True - - def disconnect(self): - """Disconnects from the Telegram server - and stops all the spawned threads""" - self._user_connected = False - self._recv_thread = None - - # This will trigger a "ConnectionResetError", usually, the background - # thread would try restarting the connection but since the - # ._recv_thread = None, it knows it doesn't have to. - super().disconnect() - - # Also disconnect all the cached senders - for sender in self._cached_clients.values(): - sender.disconnect() - - self._cached_clients.clear() - - # endregion - - # region Working with different connections - - def _on_read_thread(self): - return self._recv_thread is not None and \ - threading.get_ident() == self._recv_thread.ident - # endregion # region Telegram requests functions - def invoke(self, *requests, **kwargs): - """Invokes (sends) one or several MTProtoRequest and returns - (receives) their result. An optional named 'retries' parameter - can be used, indicating how many times it should retry. - """ - # This is only valid when the read thread is reconnecting, - # that is, the connection lock is locked. - on_read_thread = self._on_read_thread() - if on_read_thread and not self._connect_lock.locked(): - return # Just ignore, we would be raising and crashing the thread - - self.updates.check_error() - - # Determine the sender to be used (main or a new connection) - # TODO Polish this so it's nicer - on_main_thread = threading.get_ident() == self._main_thread_ident - if on_main_thread or on_read_thread: - sender = self._sender - else: - conn = Connection( - self.session.server_address, self.session.port, - mode=self._sender.connection._mode, - proxy=self._sender.connection.conn.proxy, - timeout=self._sender.connection.get_timeout() - ) - sender = MtProtoSender(self.session, conn) - sender.connect() - - try: - # We should call receive from this thread if there's no background - # thread reading or if the server disconnected us and we're trying - # to reconnect. This is because the read thread may either be - # locked also trying to reconnect or we may be said thread already. - call_receive = not on_main_thread or \ - self._recv_thread is None or self._connect_lock.locked() - - return super().invoke( - *requests, - call_receive=call_receive, - retries=kwargs.get('retries', 5), - sender=sender - ) - - except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: - self._logger.debug('DC error when invoking request, ' - 'attempting to reconnect at DC {}' - .format(e.new_dc)) - - # TODO What happens with the background thread here? - # For normal use cases, this won't happen, because this will only - # be on the very first connection (not authorized, not running), - # but may be an issue for people who actually travel? - self._reconnect(new_dc=e.new_dc) - return self.invoke(*requests) - - except ConnectionResetError as e: - if self._connect_lock.locked(): - # We are connecting and we don't want to reconnect there... - raise - while self._user_connected and not self._reconnect(): - sleep(0.1) # Retry forever until we can send the request - - finally: - if sender != self._sender: - sender.disconnect() - - # Let people use client(SomeRequest()) instead client.invoke(...) - __call__ = invoke - # region Authorization requests - def is_user_authorized(self): - """Has the user been authorized yet - (code request sent and confirmed)?""" - return self._authorized - def send_code_request(self, phone): """Sends a code request to the specified phone number""" if isinstance(phone, int): @@ -992,73 +801,3 @@ class TelegramClient(TelegramBareClient): ) # endregion - - # region Updates handling - - def sync_updates(self): - """Synchronizes self.updates to their initial state. Will be - called automatically on connection if self.updates.enabled = True, - otherwise it should be called manually after enabling updates. - """ - self.updates.process(self(GetStateRequest())) - - def add_update_handler(self, handler): - """Adds an update handler (a function which takes a TLObject, - an update, as its parameter) and listens for updates""" - sync = not self.updates.handlers - self.updates.handlers.append(handler) - if sync: - self.sync_updates() - - def remove_update_handler(self, handler): - self.updates.handlers.remove(handler) - - def list_update_handlers(self): - return self.updates.handlers[:] - - # endregion - - # Constant read - - def _set_connected_and_authorized(self): - self._authorized = True - if self._recv_thread is None: - self._recv_thread = Thread( - name='ReadThread', daemon=True, - target=self._recv_thread_impl - ) - self._recv_thread.start() - - # By using this approach, another thread will be - # created and started upon connection to constantly read - # from the other end. Otherwise, manual calls to .receive() - # must be performed. The MtProtoSender cannot be connected, - # or an error will be thrown. - # - # This way, sending and receiving will be completely independent. - def _recv_thread_impl(self): - while self._user_connected: - try: - if datetime.now() > self._last_ping + self._ping_delay: - self._sender.send(PingRequest( - int.from_bytes(os.urandom(8), 'big', signed=True) - )) - self._last_ping = datetime.now() - - self._sender.receive(update_state=self.updates) - except TimeoutError: - # No problem. - pass - except ConnectionResetError: - self._logger.debug('Server disconnected us. Reconnecting...') - while self._user_connected and not self._reconnect(): - sleep(0.1) # Retry forever, this is instant messaging - - except Exception as e: - # Unknown exception, pass it to the main thread - self.updates.set_error(e) - break - - self._recv_thread = None - - # endregion