mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-06-18 19:16:43 +00:00
Remove ability to TcpClient.cancel_read()
This simplifies the process of sending and receiving data, and makes use of Python's socket.settimeout instead a hand-crafted version with a sort-of arbitrary self.delay = 0.1 (seconds), which should improve the speed of the method
This commit is contained in:
parent
cc280a129d
commit
36f51e1e3f
@ -14,11 +14,6 @@ class TcpClient:
|
|||||||
self._proxy = proxy
|
self._proxy = proxy
|
||||||
self._socket = None
|
self._socket = None
|
||||||
|
|
||||||
# Support for multi-threading advantages and safety
|
|
||||||
self.cancelled = Event() # Has the read operation been cancelled?
|
|
||||||
self.delay = 0.1 # Read delay when there was no data available
|
|
||||||
self._lock = Lock()
|
|
||||||
|
|
||||||
def _recreate_socket(self, mode):
|
def _recreate_socket(self, mode):
|
||||||
if self._proxy is None:
|
if self._proxy is None:
|
||||||
self._socket = socket.socket(mode, socket.SOCK_STREAM)
|
self._socket = socket.socket(mode, socket.SOCK_STREAM)
|
||||||
@ -36,14 +31,13 @@ class TcpClient:
|
|||||||
"""
|
"""
|
||||||
if not self.connected:
|
if not self.connected:
|
||||||
if ':' in ip: # IPv6
|
if ':' in ip: # IPv6
|
||||||
self._recreate_socket(socket.AF_INET6)
|
mode, address = socket.AF_INET6, (ip, port, 0, 0)
|
||||||
self._socket.settimeout(timeout)
|
|
||||||
self._socket.connect((ip, port, 0, 0))
|
|
||||||
else:
|
else:
|
||||||
self._recreate_socket(socket.AF_INET)
|
mode, address = socket.AF_INET, (ip, port)
|
||||||
|
|
||||||
|
self._recreate_socket(mode)
|
||||||
self._socket.settimeout(timeout)
|
self._socket.settimeout(timeout)
|
||||||
self._socket.connect((ip, port))
|
self._socket.connect(address)
|
||||||
self._socket.setblocking(False)
|
|
||||||
|
|
||||||
def _get_connected(self):
|
def _get_connected(self):
|
||||||
return self._socket is not None
|
return self._socket is not None
|
||||||
@ -68,20 +62,11 @@ class TcpClient:
|
|||||||
# TODO Check whether the code using this has multiple threads calling
|
# TODO Check whether the code using this has multiple threads calling
|
||||||
# .write() on the very same socket. If so, have two locks, one for
|
# .write() on the very same socket. If so, have two locks, one for
|
||||||
# .write() and another for .read().
|
# .write() and another for .read().
|
||||||
|
#
|
||||||
|
# TODO Timeout may be an issue when sending the data, Changed in v3.5:
|
||||||
|
# The socket timeout is now the maximum total duration to send all data.
|
||||||
try:
|
try:
|
||||||
view = memoryview(data)
|
self._socket.sendall(data)
|
||||||
total_sent, total = 0, len(data)
|
|
||||||
while total_sent < total:
|
|
||||||
try:
|
|
||||||
sent = self._socket.send(view[total_sent:])
|
|
||||||
if sent == 0:
|
|
||||||
self.close()
|
|
||||||
raise ConnectionResetError(
|
|
||||||
'The server has closed the connection.')
|
|
||||||
total_sent += sent
|
|
||||||
|
|
||||||
except BlockingIOError:
|
|
||||||
time.sleep(self.delay)
|
|
||||||
except BrokenPipeError:
|
except BrokenPipeError:
|
||||||
self.close()
|
self.close()
|
||||||
raise
|
raise
|
||||||
@ -95,23 +80,10 @@ class TcpClient:
|
|||||||
and it's waiting for more, the timeout will NOT cancel the
|
and it's waiting for more, the timeout will NOT cancel the
|
||||||
operation. Set to None for no timeout
|
operation. Set to None for no timeout
|
||||||
"""
|
"""
|
||||||
|
# TODO Remove the timeout from this method, always use previous one
|
||||||
# Ensure it is not cancelled at first, so we can enter the loop
|
|
||||||
self.cancelled.clear()
|
|
||||||
|
|
||||||
# Set the starting time so we can
|
|
||||||
# calculate whether the timeout should fire
|
|
||||||
start_time = datetime.now() if timeout is not None else None
|
|
||||||
|
|
||||||
with BufferedWriter(BytesIO(), buffer_size=size) as buffer:
|
with BufferedWriter(BytesIO(), buffer_size=size) as buffer:
|
||||||
bytes_left = size
|
bytes_left = size
|
||||||
while bytes_left != 0:
|
while bytes_left != 0:
|
||||||
# Only do cancel if no data was read yet
|
|
||||||
# Otherwise, carry on reading and finish
|
|
||||||
if self.cancelled.is_set() and bytes_left == size:
|
|
||||||
raise ReadCancelledError()
|
|
||||||
|
|
||||||
try:
|
|
||||||
partial = self._socket.recv(bytes_left)
|
partial = self._socket.recv(bytes_left)
|
||||||
if len(partial) == 0:
|
if len(partial) == 0:
|
||||||
self.close()
|
self.close()
|
||||||
@ -121,22 +93,6 @@ class TcpClient:
|
|||||||
buffer.write(partial)
|
buffer.write(partial)
|
||||||
bytes_left -= len(partial)
|
bytes_left -= len(partial)
|
||||||
|
|
||||||
except BlockingIOError as error:
|
|
||||||
# No data available yet, sleep a bit
|
|
||||||
time.sleep(self.delay)
|
|
||||||
|
|
||||||
# Check if the timeout finished
|
|
||||||
if timeout is not None:
|
|
||||||
time_passed = datetime.now() - start_time
|
|
||||||
if time_passed > timeout:
|
|
||||||
raise TimeoutError(
|
|
||||||
'The read operation exceeded the timeout.') from error
|
|
||||||
|
|
||||||
# If everything went fine, return the read bytes
|
# If everything went fine, return the read bytes
|
||||||
buffer.flush()
|
buffer.flush()
|
||||||
return buffer.raw.getvalue()
|
return buffer.raw.getvalue()
|
||||||
|
|
||||||
def cancel_read(self):
|
|
||||||
"""Cancels the read operation IF it hasn't yet
|
|
||||||
started, raising a ReadCancelledError"""
|
|
||||||
self.cancelled.set()
|
|
||||||
|
@ -96,11 +96,6 @@ class Connection:
|
|||||||
def close(self):
|
def close(self):
|
||||||
self.conn.close()
|
self.conn.close()
|
||||||
|
|
||||||
def cancel_receive(self):
|
|
||||||
"""Cancels (stops) trying to receive from the
|
|
||||||
remote peer and raises a ReadCancelledError"""
|
|
||||||
self.conn.cancel_read()
|
|
||||||
|
|
||||||
def get_client_delay(self):
|
def get_client_delay(self):
|
||||||
"""Gets the client read delay"""
|
"""Gets the client read delay"""
|
||||||
return self.conn.delay
|
return self.conn.delay
|
||||||
|
Loading…
Reference in New Issue
Block a user