From e319fa3aa9bd09d29399dbd289847864fc1bd65f Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Wed, 3 Oct 2018 14:46:10 +0200 Subject: [PATCH] Handle IncompleteReadError and InvalidChecksumError --- telethon/network/connection/connection.py | 38 +++++++++++++++-------- telethon/network/mtprotosender.py | 14 ++++++++- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 2f3957f2..661d2de2 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -1,5 +1,9 @@ import abc import asyncio +import logging + + +__log__ = logging.getLogger(__name__) class Connection(abc.ABC): @@ -74,36 +78,44 @@ class Connection(abc.ABC): """ return self._send_queue.put(data) - def recv(self): + async def recv(self): """ Receives a packet of data through this connection mode. This method returns a coroutine. """ - return self._recv_queue.get() + ok, result = await self._recv_queue.get() + if ok: + return result + else: + raise result from None # TODO Get/put to the queue with cancellation async def _send_loop(self): """ This loop is constantly popping items off the queue to send them. """ - while not self._disconnected.is_set(): - self._send(await self._send_queue.get()) - await self._writer.drain() + try: + while not self._disconnected.is_set(): + self._send(await self._send_queue.get()) + await self._writer.drain() + except asyncio.CancelledError: + pass + except Exception: + logging.exception('Unhandled exception in the sending loop') + self.disconnect() - # TODO Handle IncompleteReadError and InvalidChecksumError async def _recv_loop(self): """ This loop is constantly putting items on the queue as they're read. """ - while not self._disconnected.is_set(): - try: + try: + while not self._disconnected.is_set(): data = await self._recv() - except asyncio.IncompleteReadError: - if not self._disconnected.is_set(): - raise - else: - await self._recv_queue.put(data) + await self._recv_queue.put((True, data)) + except Exception as e: + await self._recv_queue.put((False, e)) + self.disconnect() @abc.abstractmethod def _send(self, data): diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 5dc5a530..d398eccc 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -9,7 +9,7 @@ from .requeststate import RequestState from .. import utils from ..errors import ( BadMessageError, BrokenAuthKeyError, SecurityError, TypeNotFoundError, - rpc_message_to_error + InvalidChecksumError, rpc_message_to_error ) from ..extensions import BinaryReader from ..helpers import _ReadyQueue @@ -376,6 +376,11 @@ class MTProtoSender: __log__.warning('Security error while unpacking a ' 'received message: %s', e) continue + except InvalidChecksumError as e: + __log__.warning( + 'Invalid checksum on the read packet (was %s expected %s)', + e.checksum, e.valid_checksum + ) except asyncio.CancelledError: return except (BrokenAuthKeyError, BufferError): @@ -383,6 +388,13 @@ class MTProtoSender: self._connection._state.auth_key = None self._start_reconnect() return + except asyncio.IncompleteReadError: + # TODO Handle packets that are too big and trigger this + # If it's not a packet that triggered this, just reconnect + __log__.info('Telegram closed the connection') + self._pending_state.clear() + self._start_reconnect() + return except Exception: __log__.exception('Unhandled error while receiving data') self._start_reconnect()