Merge branch 'master' into asyncio

This commit is contained in:
Lonami Exo
2017-10-16 10:03:01 +02:00
17 changed files with 450 additions and 245 deletions

View File

@@ -11,7 +11,10 @@ from ..errors import (
from ..extensions import BinaryReader
from ..tl import TLMessage, MessageContainer, GzipPacked
from ..tl.all_tlobjects import tlobjects
from ..tl.types import MsgsAck
from ..tl.types import (
MsgsAck, Pong, BadServerSalt, BadMsgNotification,
MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo
)
from ..tl.functions.auth import LogOutRequest
logging.getLogger(__name__).addHandler(logging.NullHandler())
@@ -180,24 +183,33 @@ class MtProtoSender:
if code == 0xf35c6d01: # rpc_result, (response of an RPC call)
return await self._handle_rpc_result(msg_id, sequence, reader)
if code == 0x347773c5: # pong
if code == Pong.CONSTRUCTOR_ID:
return await self._handle_pong(msg_id, sequence, reader)
if code == 0x73f1f8dc: # msg_container
if code == MessageContainer.CONSTRUCTOR_ID:
return await self._handle_container(msg_id, sequence, reader, state)
if code == 0x3072cfa1: # gzip_packed
if code == GzipPacked.CONSTRUCTOR_ID:
return await self._handle_gzip_packed(msg_id, sequence, reader, state)
if code == 0xedab447b: # bad_server_salt
if code == BadServerSalt.CONSTRUCTOR_ID:
return await self._handle_bad_server_salt(msg_id, sequence, reader)
if code == 0xa7eff811: # bad_msg_notification
if code == BadMsgNotification.CONSTRUCTOR_ID:
return await self._handle_bad_msg_notification(msg_id, sequence, reader)
# msgs_ack, it may handle the request we wanted
if code == 0x62d6b459:
if code == MsgDetailedInfo.CONSTRUCTOR_ID:
return await self._handle_msg_detailed_info(msg_id, sequence, reader)
if code == MsgNewDetailedInfo.CONSTRUCTOR_ID:
return await self._handle_msg_new_detailed_info(msg_id, sequence, reader)
if code == NewSessionCreated.CONSTRUCTOR_ID:
return await self._handle_new_session_created(msg_id, sequence, reader)
if code == MsgsAck.CONSTRUCTOR_ID: # may handle the request we wanted
ack = reader.tgread_object()
assert isinstance(ack, MsgsAck)
# Ignore every ack request *unless* when logging out, when it's
# when it seems to only make sense. We also need to set a non-None
# result since Telegram doesn't send the response for these.
@@ -219,7 +231,12 @@ class MtProtoSender:
return True
self._logger.debug('Unknown message: {}'.format(hex(code)))
self._logger.debug(
'[WARN] Unknown message: {}, data left in the buffer: {}'
.format(
hex(code), repr(reader.get_bytes()[reader.tell_position():])
)
)
return False
# endregion
@@ -239,7 +256,7 @@ class MtProtoSender:
the given type, or returns None if it's not found/doesn't match.
"""
message = self._pending_receive.get(msg_id, None)
if isinstance(message.request, t):
if message and isinstance(message.request, t):
return self._pending_receive.pop(msg_id).request
def _clear_all_pending(self):
@@ -249,12 +266,13 @@ class MtProtoSender:
async def _handle_pong(self, msg_id, sequence, reader):
self._logger.debug('Handling pong')
reader.read_int(signed=False) # code
received_msg_id = reader.read_long()
pong = reader.tgread_object()
assert isinstance(pong, Pong)
request = self._pop_request(received_msg_id)
request = self._pop_request(pong.msg_id)
if request:
self._logger.debug('Pong confirmed a request')
request.result = pong
request.confirm_received.set()
return True
@@ -278,14 +296,15 @@ class MtProtoSender:
async def _handle_bad_server_salt(self, msg_id, sequence, reader):
self._logger.debug('Handling bad server salt')
reader.read_int(signed=False) # code
bad_msg_id = reader.read_long()
reader.read_int() # bad_msg_seq_no
reader.read_int() # error_code
new_salt = reader.read_long(signed=False)
self.session.salt = new_salt
bad_salt = reader.tgread_object()
assert isinstance(bad_salt, BadServerSalt)
request = self._pop_request(bad_msg_id)
# Our salt is unsigned, but the objects work with signed salts
self.session.salt = struct.unpack(
'<Q', struct.pack('<q', bad_salt.new_server_salt)
)[0]
request = self._pop_request(bad_salt.bad_msg_id)
if request:
await self.send(request)
@@ -293,31 +312,53 @@ class MtProtoSender:
async def _handle_bad_msg_notification(self, msg_id, sequence, reader):
self._logger.debug('Handling bad message notification')
reader.read_int(signed=False) # code
reader.read_long() # request_id
reader.read_int() # request_sequence
bad_msg = reader.tgread_object()
assert isinstance(bad_msg, BadMsgNotification)
error_code = reader.read_int()
error = BadMessageError(error_code)
if error_code in (16, 17):
error = BadMessageError(bad_msg.error_code)
if bad_msg.error_code in (16, 17):
# sent msg_id too low or too high (respectively).
# Use the current msg_id to determine the right time offset.
self.session.update_time_offset(correct_msg_id=msg_id)
self._logger.debug('Read Bad Message error: ' + str(error))
self._logger.debug('Attempting to use the correct time offset.')
return True
elif error_code == 32:
elif bad_msg.error_code == 32:
# msg_seqno too low, so just pump it up by some "large" amount
# TODO A better fix would be to start with a new fresh session ID
self.session._sequence += 64
return True
elif error_code == 33:
elif bad_msg.error_code == 33:
# msg_seqno too high never seems to happen but just in case
self.session._sequence -= 16
return True
else:
raise error
async def _handle_msg_detailed_info(self, msg_id, sequence, reader):
msg_new = reader.tgread_object()
assert isinstance(msg_new, MsgDetailedInfo)
# TODO For now, simply ack msg_new.answer_msg_id
# Relevant tdesktop source code: https://goo.gl/VvpCC6
await self._send_acknowledge(msg_new.answer_msg_id)
return True
async def _handle_msg_new_detailed_info(self, msg_id, sequence, reader):
msg_new = reader.tgread_object()
assert isinstance(msg_new, MsgNewDetailedInfo)
# TODO For now, simply ack msg_new.answer_msg_id
# Relevant tdesktop source code: https://goo.gl/G7DPsR
await self._send_acknowledge(msg_new.answer_msg_id)
return True
async def _handle_new_session_created(self, msg_id, sequence, reader):
new_session = reader.tgread_object()
assert isinstance(new_session, NewSessionCreated)
# TODO https://goo.gl/LMyN7A
return True
async def _handle_rpc_result(self, msg_id, sequence, reader):
self._logger.debug('Handling RPC result')
reader.read_int(signed=False) # code
@@ -346,25 +387,26 @@ class MtProtoSender:
# else TODO Where should this error be reported?
# Read may be async. Can an error not-belong to a request?
self._logger.debug('Read RPC error: %s', str(error))
else:
if request:
self._logger.debug('Reading request response')
if inner_code == 0x3072cfa1: # GZip packed
unpacked_data = gzip.decompress(reader.tgread_bytes())
with BinaryReader(unpacked_data) as compressed_reader:
request.on_response(compressed_reader)
else:
reader.seek(-4)
request.on_response(reader)
return True # All contents were read okay
self.session.process_entities(request.result)
request.confirm_received.set()
return True
elif request:
self._logger.debug('Reading request response')
if inner_code == 0x3072cfa1: # GZip packed
unpacked_data = gzip.decompress(reader.tgread_bytes())
with BinaryReader(unpacked_data) as compressed_reader:
request.on_response(compressed_reader)
else:
# If it's really a result for RPC from previous connection
# session, it will be skipped by the handle_container()
self._logger.debug('Lost request will be skipped.')
return False
reader.seek(-4)
request.on_response(reader)
self.session.process_entities(request.result)
request.confirm_received.set()
return True
# If it's really a result for RPC from previous connection
# session, it will be skipped by the handle_container()
self._logger.debug('Lost request will be skipped.')
return False
async def _handle_gzip_packed(self, msg_id, sequence, reader, state):
self._logger.debug('Handling gzip packed data')