diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 1dfd1698..2f3957f2 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -44,9 +44,14 @@ class Connection(abc.ABC): Disconnects from the server. """ self._disconnected.set() - self._send_task.cancel() - self._recv_task.cancel() - self._writer.close() + if self._send_task: + self._send_task.cancel() + + if self._recv_task: + self._recv_task.cancel() + + if self._writer: + self._writer.close() @property def disconnected(self): diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 582945ec..03da7f6b 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -126,41 +126,6 @@ class MTProtoSender: await self._disconnect() - # TODO Move this out of the "Public API" section - async def _disconnect(self, error=None): - __log__.info('Disconnecting from %s...', self._connection) - self._user_connected = False - try: - __log__.debug('Closing current connection...') - self._connection.disconnect() - finally: - __log__.debug('Cancelling {} pending message(s)...' - .format(len(self._pending_state))) - for state in self._pending_state.values(): - if error and not state.future.done(): - state.future.set_exception(error) - else: - state.future.cancel() - - self._pending_state.clear() - self._pending_ack.clear() - self._last_ack = None - - if self._send_loop_handle: - __log__.debug('Cancelling the send loop...') - self._send_loop_handle.cancel() - - if self._recv_loop_handle: - __log__.debug('Cancelling the receive loop...') - self._recv_loop_handle.cancel() - - __log__.info('Disconnection from %s complete!', self._connection) - if self._disconnected and not self._disconnected.done(): - if error: - self._disconnected.set_exception(error) - else: - self._disconnected.set_result(None) - def send(self, request, ordered=False): """ This method enqueues the given request to be sent. Its send @@ -224,12 +189,14 @@ class MTProtoSender: authorization key if necessary, and starting the send and receive loops. """ + # TODO With ``asyncio.open_connection``, no timeout occurs + # However, these are probably desirable in some circumstances. __log__.info('Connecting to %s...', self._connection) for retry in range(1, self._retries + 1): try: __log__.debug('Connection attempt {}...'.format(retry)) await self._connection.connect() - except (asyncio.TimeoutError, OSError) as e: + except OSError as e: __log__.warning('Attempt {} at connecting failed: {}: {}' .format(retry, type(e).__name__, e)) else: @@ -274,6 +241,40 @@ class MTProtoSender: self._disconnected = self._loop.create_future() __log__.info('Connection to %s complete!', self._connection) + async def _disconnect(self, error=None): + __log__.info('Disconnecting from %s...', self._connection) + self._user_connected = False + try: + __log__.debug('Closing current connection...') + self._connection.disconnect() + finally: + __log__.debug('Cancelling {} pending message(s)...' + .format(len(self._pending_state))) + for state in self._pending_state.values(): + if error and not state.future.done(): + state.future.set_exception(error) + else: + state.future.cancel() + + self._pending_state.clear() + self._pending_ack.clear() + self._last_ack = None + + if self._send_loop_handle: + __log__.debug('Cancelling the send loop...') + self._send_loop_handle.cancel() + + if self._recv_loop_handle: + __log__.debug('Cancelling the receive loop...') + self._recv_loop_handle.cancel() + + __log__.info('Disconnection from %s complete!', self._connection) + if self._disconnected and not self._disconnected.done(): + if error: + self._disconnected.set_exception(error) + else: + self._disconnected.set_result(None) + async def _reconnect(self): """ Cleanly disconnects and then reconnects. @@ -340,7 +341,7 @@ class MTProtoSender: # TODO Debug logs to notify which messages are being sent # TODO Try sending them while no future was cancelled? - # TODO Handle timeout, cancelled, arbitrary errors + # TODO Handle cancelled?, arbitrary errors await self._connection.send(state_list) for state in state_list: if not isinstance(state, list): @@ -411,10 +412,6 @@ class MTProtoSender: if not state.future.cancelled(): state.future.set_exception(error) else: - # TODO Would be nice to avoid accessing a per-obj read_result - # Instead have a variable that indicated how the result should - # be read (an enum) and dispatch to read the result, mostly - # always it's just a normal TLObject. with BinaryReader(rpc_result.body) as reader: result = state.request.read_result(reader)