Use a safer reconnect behaviour (respect multithread too)

This commit is contained in:
Lonami Exo 2017-09-22 12:20:38 +02:00
parent ffadcd029f
commit bc15b451b5
2 changed files with 47 additions and 25 deletions

View File

@ -3,6 +3,7 @@ from datetime import timedelta
from hashlib import md5 from hashlib import md5
from io import BytesIO from io import BytesIO
from os import path from os import path
from threading import RLock
from . import helpers as utils from . import helpers as utils
from .crypto import rsa, CdnDecrypter from .crypto import rsa, CdnDecrypter
@ -81,6 +82,10 @@ class TelegramBareClient:
self._logger = logging.getLogger(__name__) self._logger = logging.getLogger(__name__)
# Two threads may be calling reconnect() when the connection is lost,
# we only want one to actually perform the reconnection.
self._connect_lock = RLock()
# Cache "exported" senders 'dc_id: TelegramBareClient' and # Cache "exported" senders 'dc_id: TelegramBareClient' and
# their corresponding sessions not to recreate them all # their corresponding sessions not to recreate them all
# the time since it's a (somewhat expensive) process. # the time since it's a (somewhat expensive) process.
@ -177,22 +182,29 @@ class TelegramBareClient:
self._sender.disconnect() self._sender.disconnect()
def reconnect(self, new_dc=None): def reconnect(self, new_dc=None):
"""Disconnects and connects again (effectively reconnecting). """If 'new_dc' is not set, only a call to .connect() will be made
since it's assumed that the connection has been lost and the
library is reconnecting.
If 'new_dc' is not None, the current authorization key is If 'new_dc' is set, the client is first disconnected from the
removed, the DC used is switched, and a new connection is made. current data center, clears the auth key for the old DC, and
connects to the new data center.
""" """
self.disconnect() if new_dc is None:
# Assume we are disconnected due to some error, so connect again
if new_dc is not None: with self._connect_lock:
# Another thread may have connected again, so check that first
if not self.is_connected():
self.connect()
else:
self.disconnect()
self.session.auth_key = None # Force creating new auth_key self.session.auth_key = None # Force creating new auth_key
dc = self._get_dc(new_dc) dc = self._get_dc(new_dc)
ip = dc.ip_address ip = dc.ip_address
self._sender.connection.ip = self.session.server_address = ip self._sender.connection.ip = self.session.server_address = ip
self._sender.connection.port = self.session.port = dc.port self._sender.connection.port = self.session.port = dc.port
self.session.save() self.session.save()
self.connect()
self.connect()
# endregion # endregion

View File

@ -126,6 +126,15 @@ class TelegramClient(TelegramBareClient):
self._phone_code_hash = None self._phone_code_hash = None
self._phone = None self._phone = None
# Despite the state of the real connection, keep track of whether
# the user has explicitly called .connect() or .disconnect() here.
# This information is required by the read thread, who will be the
# one attempting to reconnect on the background *while* the user
# doesn't explicitly call .disconnect(), thus telling it to stop
# retrying. The main thread, knowing there is a background thread
# attempting reconnection as soon as it happens, will just sleep.
self._user_connected = False
# Save whether the user is authorized here (a.k.a. logged in) # Save whether the user is authorized here (a.k.a. logged in)
self._authorized = False self._authorized = False
@ -167,6 +176,7 @@ class TelegramClient(TelegramBareClient):
if not ok: if not ok:
return False return False
self._user_connected = True
try: try:
self.sync_updates() self.sync_updates()
self._set_connected_and_authorized() self._set_connected_and_authorized()
@ -178,8 +188,7 @@ class TelegramClient(TelegramBareClient):
def disconnect(self): def disconnect(self):
"""Disconnects from the Telegram server """Disconnects from the Telegram server
and stops all the spawned threads""" and stops all the spawned threads"""
# The existing thread will close eventually, since it's self._user_connected = False
# only running while the MtProtoSender.is_connected()
self._recv_thread = None self._recv_thread = None
# This will trigger a "ConnectionResetError", usually, the background # This will trigger a "ConnectionResetError", usually, the background
@ -255,9 +264,20 @@ class TelegramClient(TelegramBareClient):
'attempting to reconnect at DC {}' 'attempting to reconnect at DC {}'
.format(e.new_dc)) .format(e.new_dc))
# TODO What happens with the background thread here?
# For normal use cases, this won't happen, because this will only
# be on the very first connection (not authorized, not running),
# but may be an issue for people who actually travel?
self.reconnect(new_dc=e.new_dc) self.reconnect(new_dc=e.new_dc)
return self.invoke(request) return self.invoke(request)
except ConnectionResetError:
if self._connect_lock.locked():
# We are connecting and we don't want to reconnect there...
raise
while self._user_connected and not self.reconnect():
pass # Retry forever until we finally can send the request
# Let people use client(SomeRequest()) instead client.invoke(...) # Let people use client(SomeRequest()) instead client.invoke(...)
__call__ = invoke __call__ = invoke
@ -1031,7 +1051,7 @@ class TelegramClient(TelegramBareClient):
# #
# This way, sending and receiving will be completely independent. # This way, sending and receiving will be completely independent.
def _recv_thread_impl(self): def _recv_thread_impl(self):
while self._sender.is_connected(): while self._user_connected:
try: try:
if datetime.now() > self._last_ping + self._ping_delay: if datetime.now() > self._last_ping + self._ping_delay:
self._sender.send(PingRequest( self._sender.send(PingRequest(
@ -1040,24 +1060,14 @@ class TelegramClient(TelegramBareClient):
self._last_ping = datetime.now() self._last_ping = datetime.now()
self._sender.receive(update_state=self.updates) self._sender.receive(update_state=self.updates)
except AttributeError:
# 'NoneType' object has no attribute 'receive'.
# The only moment when this can happen is reconnection
# was triggered from another thread and the ._sender
# was set to None, so close this thread and exit by return.
self._recv_thread = None
return
except TimeoutError: except TimeoutError:
# No problem. # No problem.
pass pass
except ConnectionResetError: except ConnectionResetError:
if self._recv_thread is not None: self._logger.debug('Server disconnected us. Reconnecting...')
# Do NOT attempt reconnecting unless the connection was while self._user_connected and not self.reconnect():
# finished by the user -> ._recv_thread is None pass # Retry forever, this is instant messaging
self._logger.debug('Server disconnected us. Reconnecting...')
self._recv_thread = None # Not running anymore
self.reconnect()
return
except Exception as e: except Exception as e:
# Unknown exception, pass it to the main thread # Unknown exception, pass it to the main thread
self.updates.set_error(e) self.updates.set_error(e)