diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index ef8b4794..674a4e03 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -28,8 +28,10 @@ class MtProtoSender: self._need_confirmation = [] # Message IDs that need confirmation self._pending_receive = [] # Requests sent waiting to be received - # Store an RLock instance to make this class safely multi-threaded - self._lock = RLock() + # Sending and receiving are independent, but two threads cannot + # send or receive at the same time no matter what. + self._send_lock = RLock() + self._recv_lock = RLock() # Used when logging out, the only request that seems to use 'ack' # TODO There might be a better way to handle msgs_ack requests @@ -52,23 +54,17 @@ class MtProtoSender: """Sends the specified MTProtoRequest, previously sending any message which needed confirmation.""" - # Now only us can be using this method - with self._lock: - self._logger.debug('send() acquired the lock') + # If any message needs confirmation send an AckRequest first + self._send_acknowledges() - # If any message needs confirmation send an AckRequest first - self._send_acknowledges() + # Finally send our packed request + with BinaryWriter() as writer: + request.on_send(writer) + self._send_packet(writer.get_bytes(), request) + self._pending_receive.append(request) - # Finally send our packed request - with BinaryWriter() as writer: - request.on_send(writer) - self._send_packet(writer.get_bytes(), request) - self._pending_receive.append(request) - - # And update the saved session - self.session.save() - - self._logger.debug('send() released the lock') + # And update the saved session + self.session.save() def _send_acknowledges(self): """Sends a messages acknowledge for all those who _need_confirmation""" @@ -90,16 +86,13 @@ class MtProtoSender: Any unhandled object (likely updates) will be passed to update_state.process(TLObject). """ - # TODO Don't ignore updates - self._logger.debug('Receiving a message...') - body = self.connection.recv() - message, remote_msg_id, remote_seq = self._decode_msg(body) + with self._recv_lock: + body = self.connection.recv() + message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: self._process_msg(remote_msg_id, remote_seq, reader, update_state) - self._logger.debug('Received message.') - # endregion # region Low level processing @@ -107,8 +100,6 @@ class MtProtoSender: def _send_packet(self, packet, request): """Sends the given packet bytes with the additional information of the original request. - - This does NOT lock the threads! """ request.request_msg_id = self.session.get_new_msg_id() @@ -134,7 +125,8 @@ class MtProtoSender: self.session.auth_key.key_id, signed=False) cipher_writer.write(msg_key) cipher_writer.write(cipher_text) - self.connection.send(cipher_writer.get_bytes()) + with self._send_lock: + self.connection.send(cipher_writer.get_bytes()) def _decode_msg(self, body): """Decodes an received encrypted message body bytes""" diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 3ac6e65f..17e2ccb4 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -2,7 +2,7 @@ import os import threading from datetime import datetime, timedelta from mimetypes import guess_type -from threading import RLock, Thread +from threading import Thread from . import TelegramBareClient from . import helpers as utils @@ -50,9 +50,6 @@ class TelegramClient(TelegramBareClient): As opposed to the TelegramBareClient, this one features downloading media from different data centers, starting a second thread to handle updates, and some very common functionality. - - This should be used when the (slight) overhead of having locks, - threads, and possibly multiple connections is not an issue. """ # region Initialization @@ -118,9 +115,6 @@ class TelegramClient(TelegramBareClient): timeout=timeout ) - # Safety across multiple threads (for the updates thread) - self._lock = RLock() - # Used on connection - the user may modify these and reconnect kwargs['app_version'] = kwargs.get('app_version', self.__version__) for name, value in kwargs.items(): @@ -239,8 +233,6 @@ class TelegramClient(TelegramBareClient): raise AssertionError('Cannot invoke requests from the ReadThread') try: - self._lock.acquire() - # Users may call this method from within some update handler. # If this is the case, then the thread invoking the request # will be the one which should be reading (but is invoking the @@ -259,9 +251,6 @@ class TelegramClient(TelegramBareClient): self.reconnect(new_dc=e.new_dc) return self.invoke(request) - finally: - self._lock.release() - # Let people use client(SomeRequest()) instead client.invoke(...) __call__ = invoke