From a3687b8bb53acdc1037e77720cb09224d153fe9c Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 7 Jun 2018 11:51:09 +0200 Subject: [PATCH] Complete all methods under MTProtoSender and document them --- telethon/network/__init__.py | 2 +- telethon/network/mtproto_sender.py | 590 ----------------------------- telethon/network/mtprotosender.py | 195 +++++++++- telethon/telegram_bare_client.py | 2 +- 4 files changed, 177 insertions(+), 612 deletions(-) delete mode 100644 telethon/network/mtproto_sender.py diff --git a/telethon/network/__init__.py b/telethon/network/__init__.py index 756df771..6d8584bc 100644 --- a/telethon/network/__init__.py +++ b/telethon/network/__init__.py @@ -4,7 +4,7 @@ with Telegram's servers and the protocol used (TCP full, abridged, etc.). """ from .mtproto_plain_sender import MtProtoPlainSender from .authenticator import do_authentication -from .mtproto_sender import MtProtoSender +from .mtprotosender import MTProtoSender from .connection import ( ConnectionTcpFull, ConnectionTcpAbridged, ConnectionTcpObfuscated, ConnectionTcpIntermediate diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py deleted file mode 100644 index e282abea..00000000 --- a/telethon/network/mtproto_sender.py +++ /dev/null @@ -1,590 +0,0 @@ -""" -This module contains the class used to communicate with Telegram's servers -encrypting every packet, and relies on a valid AuthKey in the used Session. -""" -import logging -from threading import Lock - -from .. import helpers, utils -from ..errors import ( - BadMessageError, InvalidChecksumError, BrokenAuthKeyError, - rpc_message_to_error -) -from ..extensions import BinaryReader -from ..tl import TLMessage, MessageContainer, GzipPacked -from ..tl.all_tlobjects import tlobjects -from ..tl.functions import InvokeAfterMsgRequest -from ..tl.functions.auth import LogOutRequest -from ..tl.types import ( - MsgsAck, Pong, BadServerSalt, BadMsgNotification, FutureSalts, - MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo -) - -__log__ = logging.getLogger(__name__) - - -class MtProtoSender: - """ - MTProto Mobile Protocol sender - (https://core.telegram.org/mtproto/description). - - Note that this class is not thread-safe, and calling send/receive - from two or more threads at the same time is undefined behaviour. - Rationale: - a new connection should be spawned to send/receive requests - in parallel, so thread-safety (hence locking) isn't needed. - """ - - def __init__(self, session, connection): - """ - Initializes a new MTProto sender. - - :param session: - the Session to be used with this sender. Must contain the IP and - port of the server, salt, ID, and AuthKey, - :param connection: - the Connection to be used. - """ - self.session = session - self.connection = connection - - # Message IDs that need confirmation - self._need_confirmation = set() - - # Requests (as msg_id: Message) sent waiting to be received - self._pending_receive = {} - - # Multithreading - self._send_lock = Lock() - - # If we're invoking something from an update thread but we're also - # receiving other request from the main thread (e.g. an update arrives - # and we need to process it) we must ensure that only one is calling - # receive at a given moment, since the receive step is fragile. - self._recv_lock = Lock() - - def connect(self): - """Connects to the server.""" - self.connection.connect(self.session.server_address, self.session.port) - - def is_connected(self): - """ - Determines whether the sender is connected or not. - - :return: true if the sender is connected. - """ - return self.connection.is_connected() - - def disconnect(self): - """Disconnects from the server.""" - __log__.info('Disconnecting MtProtoSender...') - self.connection.close() - self._clear_all_pending() - - # region Send and receive - - def send(self, requests, ordered=False): - """ - Sends the specified TLObject(s) (which must be requests), - and acknowledging any message which needed confirmation. - - :param requests: the requests to be sent. - :param ordered: whether the requests should be invoked in the - order in which they appear or they can be executed - in arbitrary order in the server. - """ - if not utils.is_list_like(requests): - requests = (requests,) - - if ordered: - requests = iter(requests) - messages = [TLMessage(self.session, next(requests))] - for r in requests: - messages.append(TLMessage(self.session, r, - after_id=messages[-1].msg_id)) - else: - messages = [TLMessage(self.session, r) for r in requests] - - self._pending_receive.update({m.msg_id: m for m in messages}) - - __log__.debug('Sending requests with IDs: %s', ', '.join( - '{}: {}'.format(m.request.__class__.__name__, m.msg_id) - for m in messages - )) - - # Pack everything in the same container if we need to send AckRequests - if self._need_confirmation: - messages.append( - TLMessage(self.session, MsgsAck(list(self._need_confirmation))) - ) - self._need_confirmation.clear() - - if len(messages) == 1: - message = messages[0] - else: - message = TLMessage(self.session, MessageContainer(messages)) - # On bad_msg_salt errors, Telegram will reply with the ID of - # the container and not the requests it contains, so in case - # this happens we need to know to which container they belong. - for m in messages: - m.container_msg_id = message.msg_id - - self._send_message(message) - - def _send_acknowledge(self, msg_id): - """Sends a message acknowledge for the given msg_id.""" - self._send_message(TLMessage(self.session, MsgsAck([msg_id]))) - - def receive(self, update_state): - """ - Receives a single message from the connected endpoint. - - This method returns nothing, and will only affect other parts - of the MtProtoSender such as the updates callback being fired - or a pending request being confirmed. - - Any unhandled object (likely updates) will be passed to - update_state.process(TLObject). - - :param update_state: - the UpdateState that will process all the received - Update and Updates objects. - """ - if self._recv_lock.locked(): - with self._recv_lock: - # Don't busy wait, acquire it but return because there's - # already a receive running and we don't want another one. - # It would lock until Telegram sent another update even if - # the current receive already received the expected response. - return - - try: - with self._recv_lock: - body = self.connection.recv() - except (BufferError, InvalidChecksumError): - # TODO BufferError, we should spot the cause... - # "No more bytes left"; something wrong happened, clear - # everything to be on the safe side, or: - # - # "This packet should be skipped"; since this may have - # been a result for a request, invalidate every request - # and just re-invoke them to avoid problems - __log__.exception('Error while receiving server response. ' - '%d pending request(s) will be ignored', - len(self._pending_receive)) - self._clear_all_pending() - return - - message, remote_msg_id, remote_seq = self._decode_msg(body) - with BinaryReader(message) as reader: - self._process_msg(remote_msg_id, remote_seq, reader, update_state) - - # endregion - - # region Low level processing - - def _send_message(self, message): - """ - Sends the given encrypted through the network. - - :param message: the TLMessage to be sent. - """ - with self._send_lock: - self.connection.send(helpers.pack_message(self.session, message)) - - def _decode_msg(self, body): - """ - Decodes the body of the payload received from the network. - - :param body: the body to be decoded. - :return: a tuple of (decoded message, remote message id, remote seq). - """ - if len(body) < 8: - if body == b'l\xfe\xff\xff': - raise BrokenAuthKeyError() - else: - raise BufferError("Can't decode packet ({})".format(body)) - - with BinaryReader(body) as reader: - return helpers.unpack_message(self.session, reader) - - def _process_msg(self, msg_id, sequence, reader, state): - """ - Processes the message read from the network inside reader. - - :param msg_id: the ID of the message. - :param sequence: the sequence of the message. - :param reader: the BinaryReader that contains the message. - :param state: the current UpdateState. - :return: true if the message was handled correctly, false otherwise. - """ - # TODO Check salt, session_id and sequence_number - self._need_confirmation.add(msg_id) - - code = reader.read_int(signed=False) - reader.seek(-4) - - # These are a bit of special case, not yet generated by the code gen - if code == 0xf35c6d01: # rpc_result, (response of an RPC call) - __log__.debug('Processing Remote Procedure Call result') - return self._handle_rpc_result(msg_id, sequence, reader) - - if code == MessageContainer.CONSTRUCTOR_ID: - __log__.debug('Processing container result') - return self._handle_container(msg_id, sequence, reader, state) - - if code == GzipPacked.CONSTRUCTOR_ID: - __log__.debug('Processing gzipped result') - return self._handle_gzip_packed(msg_id, sequence, reader, state) - - if code not in tlobjects: - __log__.warning( - 'Unknown message with ID %d, data left in the buffer %s', - hex(code), repr(reader.get_bytes()[reader.tell_position():]) - ) - return False - - obj = reader.tgread_object() - __log__.debug('Processing %s result', type(obj).__name__) - - if isinstance(obj, Pong): - return self._handle_pong(msg_id, sequence, obj) - - if isinstance(obj, BadServerSalt): - return self._handle_bad_server_salt(msg_id, sequence, obj) - - if isinstance(obj, BadMsgNotification): - return self._handle_bad_msg_notification(msg_id, sequence, obj) - - if isinstance(obj, MsgDetailedInfo): - return self._handle_msg_detailed_info(msg_id, sequence, obj) - - if isinstance(obj, MsgNewDetailedInfo): - return self._handle_msg_new_detailed_info(msg_id, sequence, obj) - - if isinstance(obj, NewSessionCreated): - return self._handle_new_session_created(msg_id, sequence, obj) - - if isinstance(obj, MsgsAck): # may handle the request we wanted - # Ignore every ack request *unless* when logging out, when it's - # when it seems to only make sense. We also need to set a non-None - # result since Telegram doesn't send the response for these. - for msg_id in obj.msg_ids: - r = self._pop_request_of_type(msg_id, LogOutRequest) - if r: - r.result = True # Telegram won't send this value - r.confirm_received.set() - __log__.debug('Confirmed %s through ack', type(r).__name__) - - return True - - if isinstance(obj, FutureSalts): - r = self._pop_request(obj.req_msg_id) - if r: - r.result = obj - r.confirm_received.set() - __log__.debug('Confirmed %s through salt', type(r).__name__) - - # If the object isn't any of the above, then it should be an Update. - self.session.process_entities(obj) - if state: - state.process(obj) - - return True - - # endregion - - # region Message handling - - def _pop_request(self, msg_id): - """ - Pops a pending **request** from self._pending_receive. - - :param msg_id: the ID of the message that belongs to the request. - :return: the request, or None if it wasn't found. - """ - message = self._pending_receive.pop(msg_id, None) - if message: - return message.request - - def _pop_request_of_type(self, msg_id, t): - """ - Pops a pending **request** from self._pending_receive. - - :param msg_id: the ID of the message that belongs to the request. - :param t: the type of the desired request. - :return: the request matching the type t, or None if it wasn't found. - """ - message = self._pending_receive.get(msg_id, None) - if message and isinstance(message.request, t): - return self._pending_receive.pop(msg_id).request - - def _pop_requests_of_container(self, container_msg_id): - """ - Pops pending **requests** from self._pending_receive. - - :param container_msg_id: the ID of the container. - :return: the requests that belong to the given container. May be empty. - """ - msgs = [msg for msg in self._pending_receive.values() - if msg.container_msg_id == container_msg_id] - - requests = [msg.request for msg in msgs] - for msg in msgs: - self._pending_receive.pop(msg.msg_id, None) - return requests - - def _clear_all_pending(self): - """ - Clears all pending requests, and flags them all as received. - """ - for r in self._pending_receive.values(): - r.request.confirm_received.set() - __log__.info('Abruptly confirming %s', type(r).__name__) - self._pending_receive.clear() - - def _resend_request(self, msg_id): - """ - Re-sends the request that belongs to a certain msg_id. This may - also be the msg_id of a container if they were sent in one. - - :param msg_id: the ID of the request to be resent. - """ - request = self._pop_request(msg_id) - if request: - return self.send(request) - requests = self._pop_requests_of_container(msg_id) - if requests: - return self.send(*requests) - - def _handle_pong(self, msg_id, sequence, pong): - """ - Handles a Pong response. - - :param msg_id: the ID of the message. - :param sequence: the sequence of the message. - :param reader: the reader containing the Pong. - :return: true, as it always succeeds. - """ - request = self._pop_request(pong.msg_id) - if request: - request.result = pong - request.confirm_received.set() - __log__.debug('Confirmed %s through pong', type(request).__name__) - - return True - - def _handle_container(self, msg_id, sequence, reader, state): - """ - Handles a MessageContainer response. - - :param msg_id: the ID of the message. - :param sequence: the sequence of the message. - :param reader: the reader containing the MessageContainer. - :return: true, as it always succeeds. - """ - for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader): - begin_position = reader.tell_position() - - # Note that this code is IMPORTANT for skipping RPC results of - # lost requests (i.e., ones from the previous connection session) - try: - if not self._process_msg(inner_msg_id, sequence, reader, state): - reader.set_position(begin_position + inner_len) - except: - # If any error is raised, something went wrong; skip the packet - reader.set_position(begin_position + inner_len) - raise - - return True - - def _handle_bad_server_salt(self, msg_id, sequence, bad_salt): - """ - Handles a BadServerSalt response. - - :param msg_id: the ID of the message. - :param sequence: the sequence of the message. - :param reader: the reader containing the BadServerSalt. - :return: true, as it always succeeds. - """ - self.session.salt = bad_salt.new_server_salt - self.session.save() - - # "the bad_server_salt response is received with the - # correct salt, and the message is to be re-sent with it" - self._resend_request(bad_salt.bad_msg_id) - return True - - def _handle_bad_msg_notification(self, msg_id, sequence, bad_msg): - """ - Handles a BadMessageError response. - - :param msg_id: the ID of the message. - :param sequence: the sequence of the message. - :param reader: the reader containing the BadMessageError. - :return: true, as it always succeeds. - """ - error = BadMessageError(bad_msg.error_code) - __log__.warning('Read bad msg notification %s: %s', bad_msg, error) - if bad_msg.error_code in (16, 17): - # sent msg_id too low or too high (respectively). - # Use the current msg_id to determine the right time offset. - self.session.update_time_offset(correct_msg_id=msg_id) - __log__.info('Attempting to use the correct time offset') - self._resend_request(bad_msg.bad_msg_id) - return True - elif bad_msg.error_code == 32: - # msg_seqno too low, so just pump it up by some "large" amount - # TODO A better fix would be to start with a new fresh session ID - self.session.sequence += 64 - __log__.info('Attempting to set the right higher sequence') - self._resend_request(bad_msg.bad_msg_id) - return True - elif bad_msg.error_code == 33: - # msg_seqno too high never seems to happen but just in case - self.session.sequence -= 16 - __log__.info('Attempting to set the right lower sequence') - self._resend_request(bad_msg.bad_msg_id) - return True - else: - raise error - - def _handle_msg_detailed_info(self, msg_id, sequence, msg_new): - """ - Handles a MsgDetailedInfo response. - - :param msg_id: the ID of the message. - :param sequence: the sequence of the message. - :param reader: the reader containing the MsgDetailedInfo. - :return: true, as it always succeeds. - """ - # TODO For now, simply ack msg_new.answer_msg_id - # Relevant tdesktop source code: https://goo.gl/VvpCC6 - self._send_acknowledge(msg_new.answer_msg_id) - return True - - def _handle_msg_new_detailed_info(self, msg_id, sequence, msg_new): - """ - Handles a MsgNewDetailedInfo response. - - :param msg_id: the ID of the message. - :param sequence: the sequence of the message. - :param reader: the reader containing the MsgNewDetailedInfo. - :return: true, as it always succeeds. - """ - # TODO For now, simply ack msg_new.answer_msg_id - # Relevant tdesktop source code: https://goo.gl/G7DPsR - self._send_acknowledge(msg_new.answer_msg_id) - return True - - def _handle_new_session_created(self, msg_id, sequence, new_session): - """ - Handles a NewSessionCreated response. - - :param msg_id: the ID of the message. - :param sequence: the sequence of the message. - :param reader: the reader containing the NewSessionCreated. - :return: true, as it always succeeds. - """ - self.session.salt = new_session.server_salt - # TODO https://goo.gl/LMyN7A - return True - - def _handle_rpc_result(self, msg_id, sequence, reader): - """ - Handles a RPCResult response. - - :param msg_id: the ID of the message. - :param sequence: the sequence of the message. - :param reader: the reader containing the RPCResult. - :return: true if the request ID to which this result belongs is found, - false otherwise (meaning nothing was read). - """ - reader.read_int(signed=False) # code - request_id = reader.read_long() - inner_code = reader.read_int(signed=False) - reader.seek(-4) - - __log__.debug('Received response for request with ID %d', request_id) - request = self._pop_request(request_id) - - if inner_code == 0x2144ca19: # RPC Error - reader.seek(4) - if self.session.report_errors and request: - error = rpc_message_to_error( - reader.read_int(), reader.tgread_string(), - report_method=type(request).CONSTRUCTOR_ID - ) - else: - error = rpc_message_to_error( - reader.read_int(), reader.tgread_string() - ) - - # Acknowledge that we received the error - self._send_acknowledge(request_id) - - if request: - request.rpc_error = error - request.confirm_received.set() - - __log__.debug('Confirmed %s through error %s', - type(request).__name__, error) - # else TODO Where should this error be reported? - # Read may be async. Can an error not-belong to a request? - return True # All contents were read okay - - elif request: - if inner_code == GzipPacked.CONSTRUCTOR_ID: - with BinaryReader(GzipPacked.read(reader)) as compressed_reader: - request.on_response(compressed_reader) - else: - request.on_response(reader) - - self.session.process_entities(request.result) - request.confirm_received.set() - __log__.debug( - 'Confirmed %s through normal result %s', - type(request).__name__, type(request.result).__name__ - ) - return True - - # If it's really a result for RPC from previous connection - # session, it will be skipped by the handle_container(). - # For some reason this also seems to happen when downloading - # photos, where the server responds with FileJpeg(). - def _try_read(r): - try: - return r.tgread_object() - except Exception as e: - return '(failed to read: {})'.format(e) - - if inner_code == GzipPacked.CONSTRUCTOR_ID: - with BinaryReader(GzipPacked.read(reader)) as compressed_reader: - obj = _try_read(compressed_reader) - else: - obj = _try_read(reader) - - __log__.warning( - 'Lost request (ID %d) with code %s will be skipped, contents: %s', - request_id, hex(inner_code), obj - ) - return False - - def _handle_gzip_packed(self, msg_id, sequence, reader, state): - """ - Handles a GzipPacked response. - - :param msg_id: the ID of the message. - :param sequence: the sequence of the message. - :param reader: the reader containing the GzipPacked. - :return: the result of processing the packed message. - """ - with BinaryReader(GzipPacked.read(reader)) as compressed_reader: - # We are reentering process_msg, which seemingly the same msg_id - # to the self._need_confirmation set. Remove it from there first - # to avoid any future conflicts (i.e. if we "ignore" messages - # that we are already aware of, see 1a91c02 and old 63dfb1e) - self._need_confirmation -= {msg_id} - return self._process_msg(msg_id, sequence, compressed_reader, state) - - # endregion diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index b2012757..e6a7ba9e 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -3,9 +3,10 @@ import logging from .connection import ConnectionTcpFull from .. import helpers -from ..errors import rpc_message_to_error +from ..errors import BadMessageError, rpc_message_to_error from ..extensions import BinaryReader from ..tl import TLMessage, MessageContainer, GzipPacked +from ..tl.functions.auth import LogOutRequest from ..tl.types import ( MsgsAck, Pong, BadServerSalt, BadMsgNotification, FutureSalts, MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo @@ -20,9 +21,30 @@ __log__ = logging.getLogger(__name__) # loss? Should we try reconnecting forever? A certain amount of times? # A timeout? What about recoverable errors, like connection reset? class MTProtoSender: + """ + MTProto Mobile Protocol sender + (https://core.telegram.org/mtproto/description). + + This class is responsible for wrapping requests into `TLMessage`'s, + sending them over the network and receiving them in a safe manner. + + Automatic reconnection due to temporary network issues is a concern + for this class as well, including retry of messages that could not + be sent successfully. + + A new authorization key will be generated on connection if no other + key exists yet. + """ def __init__(self, session): self.session = session self._connection = ConnectionTcpFull() + + # Whether the user has explicitly connected or disconnected. + # + # If a disconnection happens for any other reason and it + # was *not* user action then the pending messages won't + # be cleared but on explicit user disconnection all the + # pending futures should be cancelled. self._user_connected = False # Send and receive calls must be atomic @@ -61,6 +83,15 @@ class MTProtoSender: # Public API async def connect(self, ip, port): + """ + Connects to the specified ``ip:port``, and generates a new + authorization key for the `MTProtoSender.session` if it does + not exist yet. + """ + if self._user_connected: + return + + # TODO Generate auth_key if needed async with self._send_lock: await self._connection.connect(ip, port) self._user_connected = True @@ -68,6 +99,13 @@ class MTProtoSender: self._recv_loop_handle = asyncio.ensure_future(self._recv_loop()) async def disconnect(self): + """ + Cleanly disconnects the instance from the network, cancels + all pending requests, and closes the send and receive loops. + """ + if not self._user_connected: + return + self._user_connected = False try: async with self._send_lock: @@ -75,6 +113,10 @@ class MTProtoSender: except: __log__.exception('Ignoring exception upon disconnection') finally: + for message in self._pending_messages.values(): + message.future.cancel() + + self._pending_messages.clear() self._send_loop_handle.cancel() self._recv_loop_handle.cancel() @@ -111,6 +153,12 @@ class MTProtoSender: # Loops async def _send_loop(self): + """ + This loop is responsible for popping items off the send + queue, encrypting them, and sending them over the network. + + Besides `connect`, only this method ever sends data. + """ while self._user_connected: # TODO If there's more than one item, send them all at once body = helpers.pack_message( @@ -121,6 +169,12 @@ class MTProtoSender: await self._connection.send(body) async def _recv_loop(self): + """ + This loop is responsible for reading all incoming responses + from the network, decrypting and handling or dispatching them. + + Besides `connect`, only this method ever receives data. + """ while self._user_connected: # TODO Handle exceptions async with self._recv_lock: @@ -136,6 +190,12 @@ class MTProtoSender: # Response Handlers async def _process_message(self, msg_id, seq, reader): + """ + Adds the given message to the list of messages that must be + acknowledged and dispatches control to different ``_handle_*`` + method based on its type. + """ + # TODO Send pending ack self._pending_ack.add(msg_id) code = reader.read_int(signed=False) reader.seek(-4) @@ -146,7 +206,14 @@ class MTProtoSender: pass # TODO Process updates and their entities async def _handle_rpc_result(self, msg_id, seq, reader): - # TODO Don't make this a special case + """ + Handles the result for Remote Procedure Calls: + + rpc_result#f35c6d01 req_msg_id:long result:bytes = RpcResult; + + This is where the future results for sent requests are set. + """ + # TODO Don't make this a special cased object reader.read_int(signed=False) # code message_id = reader.read_long() inner_code = reader.read_int(signed=False) @@ -186,49 +253,137 @@ class MTProtoSender: # TODO Try reading an object async def _handle_container(self, msg_id, seq, reader): + """ + Processes the inner messages of a container with many of them: + + msg_container#73f1f8dc messages:vector<%Message> = MessageContainer; + """ for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader): next_position = reader.tell_position() + inner_len await self._process_message(inner_msg_id, seq, reader) reader.set_position(next_position) # Ensure reading correctly async def _handle_gzip_packed(self, msg_id, seq, reader): - raise NotImplementedError + """ + Unpacks the data from a gzipped object and processes it: + + gzip_packed#3072cfa1 packed_data:bytes = Object; + """ + with BinaryReader(GzipPacked.read(reader)) as compressed_reader: + await self._process_message(msg_id, seq, compressed_reader) async def _handle_pong(self, msg_id, seq, reader): - raise NotImplementedError + """ + Handles pong results, which don't come inside a ``rpc_result`` + but are still sent through a request: + + pong#347773c5 msg_id:long ping_id:long = Pong; + """ + pong = reader.tgread_object() + message = self._pending_messages.pop(pong.msg_id, None) + if message: + message.future.set_result(pong) async def _handle_bad_server_salt(self, msg_id, seq, reader): + """ + Corrects the currently used server salt to use the right value + before enqueuing the rejected message to be re-sent: + + bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int + error_code:int new_server_salt:long = BadMsgNotification; + """ bad_salt = reader.tgread_object() self.session.salt = bad_salt.new_server_salt self.session.save() - - # "the bad_server_salt response is received with the - # correct salt, and the message is to be re-sent with it" + # TODO Will this work properly for containers? await self._send_queue.put(self._pending_messages[bad_salt.bad_msg_id]) async def _handle_bad_notification(self, msg_id, seq, reader): - raise NotImplementedError + """ + Adjusts the current state to be correct based on the + received bad message notification whenever possible: + + bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int + error_code:int = BadMsgNotification; + """ + bad_msg = reader.tgread_object() + if bad_msg.error_code in (16, 17): + # Sent msg_id too low or too high (respectively). + # Use the current msg_id to determine the right time offset. + self.session.update_time_offset(correct_msg_id=msg_id) + elif bad_msg.error_code == 32: + # msg_seqno too low, so just pump it up by some "large" amount + # TODO A better fix would be to start with a new fresh session ID + self.session.sequence += 64 + elif bad_msg.error_code == 33: + # msg_seqno too high never seems to happen but just in case + self.session.sequence -= 16 + else: + msg = self._pending_messages.pop(bad_msg.bad_msg_id, None) + if msg: + msg.future.set_exception(BadMessageError(bad_msg.error_code)) + return + + # Messages are to be re-sent once we've corrected the issue + await self._send_queue.put(self._pending_messages[bad_msg.bad_msg_id]) async def _handle_detailed_info(self, msg_id, seq, reader): - raise NotImplementedError + """ + Updates the current status with the received detailed information: + + msg_detailed_info#276d3ec6 msg_id:long answer_msg_id:long + bytes:int status:int = MsgDetailedInfo; + """ + # TODO https://goo.gl/VvpCC6 + self._pending_ack.add(reader.tgread_object().answer_msg_id) async def _handle_new_detailed_info(self, msg_id, seq, reader): - raise NotImplementedError + """ + Updates the current status with the received detailed information: + + msg_new_detailed_info#809db6df answer_msg_id:long + bytes:int status:int = MsgDetailedInfo; + """ + # TODO https://goo.gl/G7DPsR + self._pending_ack.add(reader.tgread_object().answer_msg_id) async def _handle_new_session_created(self, msg_id, seq, reader): + """ + Updates the current status with the received session information: + + new_session_created#9ec20908 first_msg_id:long unique_id:long + server_salt:long = NewSession; + """ # TODO https://goo.gl/LMyN7A - new_session = reader.tgread_object() - self.session.salt = new_session.server_salt + self.session.salt = reader.tgread_object().server_salt async def _handle_ack(self, msg_id, seq, reader): - # Ignore every ack request *unless* when logging out, when it's - # when it seems to only make sense. We also need to set a non-None - # result since Telegram doesn't send the response for these. - for msg_id in reader.tgread_object().msg_ids: - # TODO pop msg_id if of type LogOutRequest, and confirm it - pass + """ + Handles a server acknowledge about our messages. Normally + these can be ignored except in the case of ``auth.logOut``: - return True + auth.logOut#5717da40 = Bool; + + Telegram doesn't seem to send its result so we need to confirm + it manually. No other request is known to have this behaviour. + """ + for msg_id in reader.tgread_object().msg_ids: + msg = self._pending_messages.get(msg_id, None) + if msg and isinstance(msg.request, LogOutRequest): + del self._pending_messages[msg_id] + msg.future.set_result(True) async def _handle_future_salts(self, msg_id, seq, reader): - raise NotImplementedError + """ + Handles future salt results, which don't come inside a + ``rpc_result`` but are still sent through a request: + + future_salts#ae500895 req_msg_id:long now:int + salts:vector = FutureSalts; + """ + # TODO save these salts and automatically adjust to the + # correct one whenever the salt in use expires. + salts = reader.tgread_object() + msg = self._pending_messages.pop(msg_id, None) + if msg: + msg.future.set_result(salts) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index f3db342b..070ed49e 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -14,7 +14,7 @@ from .errors import ( PhoneMigrateError, NetworkMigrateError, UserMigrateError, AuthKeyError, RpcCallFailError ) -from .network import authenticator, MtProtoSender, ConnectionTcpFull +from .network import authenticator, MTProtoSender, ConnectionTcpFull from .sessions import Session, SQLiteSession from .tl import TLObject from .tl.all_tlobjects import LAYER