Complete all methods under MTProtoSender and document them

This commit is contained in:
Lonami Exo
2018-06-07 11:51:09 +02:00
parent 56b09c0c9d
commit a3687b8bb5
4 changed files with 177 additions and 612 deletions

View File

@@ -3,9 +3,10 @@ import logging
from .connection import ConnectionTcpFull
from .. import helpers
from ..errors import rpc_message_to_error
from ..errors import BadMessageError, rpc_message_to_error
from ..extensions import BinaryReader
from ..tl import TLMessage, MessageContainer, GzipPacked
from ..tl.functions.auth import LogOutRequest
from ..tl.types import (
MsgsAck, Pong, BadServerSalt, BadMsgNotification, FutureSalts,
MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo
@@ -20,9 +21,30 @@ __log__ = logging.getLogger(__name__)
# loss? Should we try reconnecting forever? A certain amount of times?
# A timeout? What about recoverable errors, like connection reset?
class MTProtoSender:
"""
MTProto Mobile Protocol sender
(https://core.telegram.org/mtproto/description).
This class is responsible for wrapping requests into `TLMessage`'s,
sending them over the network and receiving them in a safe manner.
Automatic reconnection due to temporary network issues is a concern
for this class as well, including retry of messages that could not
be sent successfully.
A new authorization key will be generated on connection if no other
key exists yet.
"""
def __init__(self, session):
self.session = session
self._connection = ConnectionTcpFull()
# Whether the user has explicitly connected or disconnected.
#
# If a disconnection happens for any other reason and it
# was *not* user action then the pending messages won't
# be cleared but on explicit user disconnection all the
# pending futures should be cancelled.
self._user_connected = False
# Send and receive calls must be atomic
@@ -61,6 +83,15 @@ class MTProtoSender:
# Public API
async def connect(self, ip, port):
"""
Connects to the specified ``ip:port``, and generates a new
authorization key for the `MTProtoSender.session` if it does
not exist yet.
"""
if self._user_connected:
return
# TODO Generate auth_key if needed
async with self._send_lock:
await self._connection.connect(ip, port)
self._user_connected = True
@@ -68,6 +99,13 @@ class MTProtoSender:
self._recv_loop_handle = asyncio.ensure_future(self._recv_loop())
async def disconnect(self):
"""
Cleanly disconnects the instance from the network, cancels
all pending requests, and closes the send and receive loops.
"""
if not self._user_connected:
return
self._user_connected = False
try:
async with self._send_lock:
@@ -75,6 +113,10 @@ class MTProtoSender:
except:
__log__.exception('Ignoring exception upon disconnection')
finally:
for message in self._pending_messages.values():
message.future.cancel()
self._pending_messages.clear()
self._send_loop_handle.cancel()
self._recv_loop_handle.cancel()
@@ -111,6 +153,12 @@ class MTProtoSender:
# Loops
async def _send_loop(self):
"""
This loop is responsible for popping items off the send
queue, encrypting them, and sending them over the network.
Besides `connect`, only this method ever sends data.
"""
while self._user_connected:
# TODO If there's more than one item, send them all at once
body = helpers.pack_message(
@@ -121,6 +169,12 @@ class MTProtoSender:
await self._connection.send(body)
async def _recv_loop(self):
"""
This loop is responsible for reading all incoming responses
from the network, decrypting and handling or dispatching them.
Besides `connect`, only this method ever receives data.
"""
while self._user_connected:
# TODO Handle exceptions
async with self._recv_lock:
@@ -136,6 +190,12 @@ class MTProtoSender:
# Response Handlers
async def _process_message(self, msg_id, seq, reader):
"""
Adds the given message to the list of messages that must be
acknowledged and dispatches control to different ``_handle_*``
method based on its type.
"""
# TODO Send pending ack
self._pending_ack.add(msg_id)
code = reader.read_int(signed=False)
reader.seek(-4)
@@ -146,7 +206,14 @@ class MTProtoSender:
pass # TODO Process updates and their entities
async def _handle_rpc_result(self, msg_id, seq, reader):
# TODO Don't make this a special case
"""
Handles the result for Remote Procedure Calls:
rpc_result#f35c6d01 req_msg_id:long result:bytes = RpcResult;
This is where the future results for sent requests are set.
"""
# TODO Don't make this a special cased object
reader.read_int(signed=False) # code
message_id = reader.read_long()
inner_code = reader.read_int(signed=False)
@@ -186,49 +253,137 @@ class MTProtoSender:
# TODO Try reading an object
async def _handle_container(self, msg_id, seq, reader):
"""
Processes the inner messages of a container with many of them:
msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
"""
for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader):
next_position = reader.tell_position() + inner_len
await self._process_message(inner_msg_id, seq, reader)
reader.set_position(next_position) # Ensure reading correctly
async def _handle_gzip_packed(self, msg_id, seq, reader):
raise NotImplementedError
"""
Unpacks the data from a gzipped object and processes it:
gzip_packed#3072cfa1 packed_data:bytes = Object;
"""
with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
await self._process_message(msg_id, seq, compressed_reader)
async def _handle_pong(self, msg_id, seq, reader):
raise NotImplementedError
"""
Handles pong results, which don't come inside a ``rpc_result``
but are still sent through a request:
pong#347773c5 msg_id:long ping_id:long = Pong;
"""
pong = reader.tgread_object()
message = self._pending_messages.pop(pong.msg_id, None)
if message:
message.future.set_result(pong)
async def _handle_bad_server_salt(self, msg_id, seq, reader):
"""
Corrects the currently used server salt to use the right value
before enqueuing the rejected message to be re-sent:
bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int
error_code:int new_server_salt:long = BadMsgNotification;
"""
bad_salt = reader.tgread_object()
self.session.salt = bad_salt.new_server_salt
self.session.save()
# "the bad_server_salt response is received with the
# correct salt, and the message is to be re-sent with it"
# TODO Will this work properly for containers?
await self._send_queue.put(self._pending_messages[bad_salt.bad_msg_id])
async def _handle_bad_notification(self, msg_id, seq, reader):
raise NotImplementedError
"""
Adjusts the current state to be correct based on the
received bad message notification whenever possible:
bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int
error_code:int = BadMsgNotification;
"""
bad_msg = reader.tgread_object()
if bad_msg.error_code in (16, 17):
# Sent msg_id too low or too high (respectively).
# Use the current msg_id to determine the right time offset.
self.session.update_time_offset(correct_msg_id=msg_id)
elif bad_msg.error_code == 32:
# msg_seqno too low, so just pump it up by some "large" amount
# TODO A better fix would be to start with a new fresh session ID
self.session.sequence += 64
elif bad_msg.error_code == 33:
# msg_seqno too high never seems to happen but just in case
self.session.sequence -= 16
else:
msg = self._pending_messages.pop(bad_msg.bad_msg_id, None)
if msg:
msg.future.set_exception(BadMessageError(bad_msg.error_code))
return
# Messages are to be re-sent once we've corrected the issue
await self._send_queue.put(self._pending_messages[bad_msg.bad_msg_id])
async def _handle_detailed_info(self, msg_id, seq, reader):
raise NotImplementedError
"""
Updates the current status with the received detailed information:
msg_detailed_info#276d3ec6 msg_id:long answer_msg_id:long
bytes:int status:int = MsgDetailedInfo;
"""
# TODO https://goo.gl/VvpCC6
self._pending_ack.add(reader.tgread_object().answer_msg_id)
async def _handle_new_detailed_info(self, msg_id, seq, reader):
raise NotImplementedError
"""
Updates the current status with the received detailed information:
msg_new_detailed_info#809db6df answer_msg_id:long
bytes:int status:int = MsgDetailedInfo;
"""
# TODO https://goo.gl/G7DPsR
self._pending_ack.add(reader.tgread_object().answer_msg_id)
async def _handle_new_session_created(self, msg_id, seq, reader):
"""
Updates the current status with the received session information:
new_session_created#9ec20908 first_msg_id:long unique_id:long
server_salt:long = NewSession;
"""
# TODO https://goo.gl/LMyN7A
new_session = reader.tgread_object()
self.session.salt = new_session.server_salt
self.session.salt = reader.tgread_object().server_salt
async def _handle_ack(self, msg_id, seq, reader):
# Ignore every ack request *unless* when logging out, when it's
# when it seems to only make sense. We also need to set a non-None
# result since Telegram doesn't send the response for these.
for msg_id in reader.tgread_object().msg_ids:
# TODO pop msg_id if of type LogOutRequest, and confirm it
pass
"""
Handles a server acknowledge about our messages. Normally
these can be ignored except in the case of ``auth.logOut``:
return True
auth.logOut#5717da40 = Bool;
Telegram doesn't seem to send its result so we need to confirm
it manually. No other request is known to have this behaviour.
"""
for msg_id in reader.tgread_object().msg_ids:
msg = self._pending_messages.get(msg_id, None)
if msg and isinstance(msg.request, LogOutRequest):
del self._pending_messages[msg_id]
msg.future.set_result(True)
async def _handle_future_salts(self, msg_id, seq, reader):
raise NotImplementedError
"""
Handles future salt results, which don't come inside a
``rpc_result`` but are still sent through a request:
future_salts#ae500895 req_msg_id:long now:int
salts:vector<future_salt> = FutureSalts;
"""
# TODO save these salts and automatically adjust to the
# correct one whenever the salt in use expires.
salts = reader.tgread_object()
msg = self._pending_messages.pop(msg_id, None)
if msg:
msg.future.set_result(salts)