From 9dc400915229fc81ead1338a0e50d2d2628dd6f9 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 19 Oct 2018 14:41:50 +0200 Subject: [PATCH] Handle the right errors --- telethon/extensions/messagepacker.py | 2 - telethon/network/connection/connection.py | 5 ++ telethon/network/mtprotosender.py | 64 +++++++++++++---------- 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/telethon/extensions/messagepacker.py b/telethon/extensions/messagepacker.py index c4b410fe..23df601b 100644 --- a/telethon/extensions/messagepacker.py +++ b/telethon/extensions/messagepacker.py @@ -111,6 +111,4 @@ class MessagePacker: s.container_id = container_id data = buffer.getvalue() - __log__.debug('Packed %d message(s) in %d bytes for sending', - len(batch), len(data)) return batch, data diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 75148af0..a83b87e1 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -4,6 +4,8 @@ import logging import socket import ssl as ssl_mod +from ...errors import InvalidChecksumError + __log__ = logging.getLogger(__name__) @@ -173,6 +175,9 @@ class Connection(abc.ABC): if isinstance(e, asyncio.IncompleteReadError): msg = 'The server closed the connection' logging.info(msg) + elif isinstance(e, InvalidChecksumError): + msg = 'The server response had an invalid checksum' + logging.info(msg) else: msg = 'Unexpected exception in the receive loop' logging.exception(msg) diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 9c4e0e55..0d2fa3f4 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -196,7 +196,7 @@ class MTProtoSender: try: __log__.debug('Connection attempt {}...'.format(retry)) await self._connection.connect(timeout=self._connect_timeout) - except (OSError, asyncio.TimeoutError) as e: + except (ConnectionError, asyncio.TimeoutError) as e: __log__.warning('Attempt {} at connecting failed: {}: {}' .format(retry, type(e).__name__, e)) else: @@ -287,9 +287,6 @@ class MTProtoSender: __log__.debug('Awaiting for the receive loop before reconnecting...') await self._recv_loop_handle - __log__.debug('Closing current connection...') - self._connection.disconnect() - self._reconnecting = False # Start with a clean state (and thus session ID) to avoid old msgs @@ -334,19 +331,31 @@ class MTProtoSender: self._last_acks.append(ack) self._pending_ack.clear() - batch, data = await self._send_queue.get( - self._connection.disconnected) + __log__.debug('Waiting for messages to send...') + # TODO Wait for the connection send queue to be empty? + # This means that while it's not empty we can wait for + # more messages to be added to the send queue. + try: + batch, data = await self._send_queue.get( + self._connection.disconnected) + except asyncio.CancelledError: + return if not data: continue + __log__.debug('Encrypting %d message(s) in %d bytes for sending', + len(batch), len(data)) + + data = self._state.encrypt_message_data(data) try: - # TODO Split except - data = self._state.encrypt_message_data(data) await self._connection.send(data) - except Exception: - __log__.exception('Unhandled error while sending data') - continue + except asyncio.CancelledError: + return + except ConnectionError: + __log__.info('Connection closed while sending data') + self._start_reconnect() + return for state in batch: if not isinstance(state, list): @@ -357,6 +366,8 @@ class MTProtoSender: if isinstance(s.request, TLRequest): self._pending_state[s.msg_id] = s + __log__.debug('Encrypted messages put in a queue to be sent') + async def _recv_loop(self): """ This loop is responsible for reading all incoming responses @@ -367,10 +378,18 @@ class MTProtoSender: while self._user_connected and not self._reconnecting: __log__.debug('Receiving items from the network...') try: - # TODO Split except body = await self._connection.recv() + except asyncio.CancelledError: + return + except ConnectionError: + __log__.info('Connection closed while receiving data') + self._start_reconnect() + return + + try: message = self._state.decrypt_message_data(body) except TypeNotFoundError as e: + # Received object which we don't know how to deserialize __log__.info('Type %08x not found, remaining data %r', e.invalid_constructor_id, e.remaining) continue @@ -380,13 +399,6 @@ class MTProtoSender: __log__.warning('Security error while unpacking a ' 'received message: %s', e) continue - except InvalidChecksumError as e: - __log__.warning( - 'Invalid checksum on the read packet (was %s expected %s)', - e.checksum, e.valid_checksum - ) - except asyncio.CancelledError: - return except BufferError as e: if isinstance(e, InvalidBufferError) and e.code == 404: __log__.info('Broken authorization key; resetting') @@ -396,19 +408,15 @@ class MTProtoSender: self._auth_key.key = None self._start_reconnect() return - except asyncio.IncompleteReadError: - __log__.info('Telegram closed the connection') - self._start_reconnect() - return except Exception: __log__.exception('Unhandled error while receiving data') self._start_reconnect() return - else: - try: - await self._process_message(message) - except Exception: - __log__.exception('Unhandled error while processing msgs') + + try: + await self._process_message(message) + except Exception: + __log__.exception('Unhandled error while processing msgs') # Response Handlers