From e47344c0f0ee2805bb011660f0c3d37ba3457f5e Mon Sep 17 00:00:00 2001 From: Lonami Date: Sat, 10 Sep 2016 10:17:15 +0200 Subject: [PATCH] Added a thread lock to the TcpClient This gives multi-threading safety without giving up on speed (now there's no need for additional sleeps) --- network/mtproto_sender.py | 19 +++++++----- network/tcp_client.py | 64 ++++++++++++++++++++++----------------- network/tcp_transport.py | 4 +-- 3 files changed, 51 insertions(+), 36 deletions(-) diff --git a/network/mtproto_sender.py b/network/mtproto_sender.py index 9e4bfef0..5d42a537 100755 --- a/network/mtproto_sender.py +++ b/network/mtproto_sender.py @@ -14,22 +14,27 @@ from tl.all_tlobjects import tlobjects class MtProtoSender: """MTProto Mobile Protocol sender (https://core.telegram.org/mtproto/description)""" - def __init__(self, transport, session): + def __init__(self, transport, session, check_updates_delay=0.1): + """If check_updates_delay is None, no updates will be checked. + Otherwise, specifies every how often updates should be checked""" + self.transport = transport self.session = session self.need_confirmation = [] # Message IDs that need confirmation self.on_update_handlers = [] - # Set up updates thread - self.updates_thread = Thread(target=self.updates_thread_method, name='Updates thread') - self.updates_thread_running = True - self.updates_thread_paused = True + # Set up updates thread, if the delay is not None + self.check_updates_delay = check_updates_delay + if check_updates_delay: + self.updates_thread = Thread(target=self.updates_thread_method, name='Updates thread') + self.updates_thread_running = True + self.updates_thread_paused = True - self.updates_thread.start() + self.updates_thread.start() def disconnect(self): - """Disconnects and **stops all the running threads**""" + """Disconnects and **stops all the running threads** if any""" self.updates_thread_running = False self.transport.cancel_receive() self.transport.close() diff --git a/network/tcp_client.py b/network/tcp_client.py index 034662f8..1beda5a5 100755 --- a/network/tcp_client.py +++ b/network/tcp_client.py @@ -1,6 +1,7 @@ # Python rough implementation of a C# TCP client import socket import time +from threading import Lock from errors import ReadCancelledError from utils import BinaryWriter @@ -10,8 +11,11 @@ class TcpClient: def __init__(self): self.connected = False self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Support for multi-threading advantages and safety self.cancelled = False # Has the read operation been cancelled? self.delay = 0.1 # Read delay when there was no data available + self.lock = Lock() def connect(self, ip, port): """Connects to the specified IP and port number""" @@ -27,40 +31,46 @@ class TcpClient: def write(self, data): """Writes (sends) the specified bytes to the connected peer""" - self.socket.sendall(data) + + # Ensure that only one thread can send data at once + with self.lock: + self.socket.sendall(data) def read(self, buffer_size): """Reads (receives) the specified bytes from the connected peer""" - self.cancelled = False # Ensure it is not cancelled at first - with BinaryWriter() as writer: - while writer.written_count < buffer_size and not self.cancelled: - try: - # When receiving from the socket, we may not receive all the data at once - # This is why we need to keep checking to make sure that we receive it all - left_count = buffer_size - writer.written_count - partial = self.socket.recv(left_count) - writer.write(partial) + # Ensure that only one thread can receive data at once + with self.lock: + # Ensure it is not cancelled at first, so we can enter the loop + self.cancelled = False - except BlockingIOError: - # There was no data available for us to read. Sleep a bit - time.sleep(self.delay) + with BinaryWriter() as writer: + while writer.written_count < buffer_size and not self.cancelled: + try: + # When receiving from the socket, we may not receive all the data at once + # This is why we need to keep checking to make sure that we receive it all + left_count = buffer_size - writer.written_count + partial = self.socket.recv(left_count) + writer.write(partial) - # If the operation was cancelled *but* data was read, - # this will result on data loss so raise an exception - # TODO this could be solved by using an internal FIFO buffer (first in, first out) - if self.cancelled: - if writer.written_count == 0: - raise ReadCancelledError() - else: - raise NotImplementedError('The read operation was cancelled when some data ' - 'was already read. This has not yet implemented ' - 'an internal buffer, so cannot continue.') + except BlockingIOError: + # There was no data available for us to read. Sleep a bit + time.sleep(self.delay) - return writer.get_bytes() + # If the operation was cancelled *but* data was read, + # this will result on data loss so raise an exception + # TODO this could be solved by using an internal FIFO buffer (first in, first out) + if self.cancelled: + if writer.written_count == 0: + raise ReadCancelledError() + else: + raise NotImplementedError('The read operation was cancelled when some data ' + 'was already read. This has not yet implemented ' + 'an internal buffer, so cannot continue.') + + # If everything went fine, return the read bytes + return writer.get_bytes() def cancel_read(self): - """Cancels the read operation if it was blocking and stops - the current thread until it's cancelled""" + """Cancels the read operation raising a ReadCancelledError""" self.cancelled = True - time.sleep(self.delay) diff --git a/network/tcp_transport.py b/network/tcp_transport.py index f53aaebe..73672793 100755 --- a/network/tcp_transport.py +++ b/network/tcp_transport.py @@ -60,8 +60,8 @@ class TcpTransport: self.tcp_client.close() def cancel_receive(self): - """Cancels (stops) trying to receive from the remote peer and - stops the current thread until it's cancelled""" + """Cancels (stops) trying to receive from the + remote peer and raises a ReadCancelledError""" self.tcp_client.cancel_read() def get_client_delay(self):