mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-08-08 21:10:29 +00:00
Merge branch 'master' into asyncio
This commit is contained in:
@@ -30,6 +30,7 @@ from .tl.functions.upload import (
|
||||
GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest
|
||||
)
|
||||
from .tl.types import InputFile, InputFileBig
|
||||
from .tl.types.auth import ExportedAuthorization
|
||||
from .tl.types.upload import FileCdnRedirect
|
||||
from .update_state import UpdateState
|
||||
from .utils import get_appropriated_part_size
|
||||
@@ -59,7 +60,7 @@ class TelegramBareClient:
|
||||
__version__ = '0.15.3'
|
||||
|
||||
# TODO Make this thread-safe, all connections share the same DC
|
||||
_dc_options = None
|
||||
_config = None # Server configuration (with .dc_options)
|
||||
|
||||
# region Initialization
|
||||
|
||||
@@ -148,7 +149,7 @@ class TelegramBareClient:
|
||||
|
||||
# region Connecting
|
||||
|
||||
async def connect(self, _exported_auth=None, _sync_updates=True, _cdn=False):
|
||||
async def connect(self, _sync_updates=True):
|
||||
"""Connects to the Telegram servers, executing authentication if
|
||||
required. Note that authenticating to the Telegram servers is
|
||||
not the same as authenticating the desired user itself, which
|
||||
@@ -156,60 +157,21 @@ class TelegramBareClient:
|
||||
|
||||
Note that the optional parameters are meant for internal use.
|
||||
|
||||
If '_exported_auth' is not None, it will be used instead to
|
||||
determine the authorization key for the current session.
|
||||
|
||||
If '_sync_updates', sync_updates() will be called and a
|
||||
second thread will be started if necessary. Note that this
|
||||
will FAIL if the client is not connected to the user's
|
||||
native data center, raising a "UserMigrateError", and
|
||||
calling .disconnect() in the process.
|
||||
|
||||
If '_cdn' is False, methods that are not allowed on such data
|
||||
centers won't be invoked.
|
||||
"""
|
||||
try:
|
||||
await self._sender.connect()
|
||||
if not self.session.auth_key:
|
||||
# New key, we need to tell the server we're going to use
|
||||
# the latest layer
|
||||
try:
|
||||
self.session.auth_key, self.session.time_offset = \
|
||||
await authenticator.do_authentication(self._sender.connection)
|
||||
except BrokenAuthKeyError:
|
||||
self._user_connected = False
|
||||
return False
|
||||
|
||||
self.session.layer = LAYER
|
||||
self.session.save()
|
||||
init_connection = True
|
||||
else:
|
||||
init_connection = self.session.layer != LAYER
|
||||
|
||||
if init_connection:
|
||||
if _exported_auth is not None:
|
||||
await self._init_connection(ImportAuthorizationRequest(
|
||||
_exported_auth.id, _exported_auth.bytes
|
||||
))
|
||||
elif not _cdn:
|
||||
TelegramBareClient._dc_options = \
|
||||
(await self._init_connection(GetConfigRequest())).dc_options
|
||||
|
||||
elif _exported_auth is not None:
|
||||
await self(ImportAuthorizationRequest(
|
||||
_exported_auth.id, _exported_auth.bytes
|
||||
))
|
||||
|
||||
if TelegramBareClient._dc_options is None and not _cdn:
|
||||
TelegramBareClient._dc_options = \
|
||||
(await self(GetConfigRequest())).dc_options
|
||||
|
||||
# Connection was successful! Try syncing the update state
|
||||
# UNLESS '_sync_updates' is False (we probably are in
|
||||
# another data center and this would raise UserMigrateError)
|
||||
# to also assert whether the user is logged in or not.
|
||||
self._user_connected = True
|
||||
if self._authorized is None and _sync_updates and not _cdn:
|
||||
if self._authorized is None and _sync_updates:
|
||||
try:
|
||||
await self.sync_updates()
|
||||
self._set_connected_and_authorized()
|
||||
@@ -224,11 +186,7 @@ class TelegramBareClient:
|
||||
# This is fine, probably layer migration
|
||||
self._logger.debug('Found invalid item, probably migrating', e)
|
||||
self.disconnect()
|
||||
return await self.connect(
|
||||
_exported_auth=_exported_auth,
|
||||
_sync_updates=_sync_updates,
|
||||
_cdn=_cdn
|
||||
)
|
||||
return await self.connect(_sync_updates=_sync_updates)
|
||||
|
||||
except (RPCError, ConnectionError) as error:
|
||||
# Probably errors from the previous session, ignore them
|
||||
@@ -241,8 +199,9 @@ class TelegramBareClient:
|
||||
def is_connected(self):
|
||||
return self._sender.is_connected()
|
||||
|
||||
async def _init_connection(self, query=None):
|
||||
result = await self(InvokeWithLayerRequest(LAYER, InitConnectionRequest(
|
||||
def _wrap_init_connection(self, query):
|
||||
"""Wraps query around InvokeWithLayerRequest(InitConnectionRequest())"""
|
||||
return InvokeWithLayerRequest(LAYER, InitConnectionRequest(
|
||||
api_id=self.api_id,
|
||||
device_model=self.session.device_model,
|
||||
system_version=self.session.system_version,
|
||||
@@ -251,10 +210,7 @@ class TelegramBareClient:
|
||||
system_lang_code=self.session.system_lang_code,
|
||||
lang_pack='', # "langPacks are for official apps only"
|
||||
query=query
|
||||
)))
|
||||
self.session.layer = LAYER
|
||||
self.session.save()
|
||||
return result
|
||||
))
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnects from the Telegram server"""
|
||||
@@ -286,13 +242,18 @@ class TelegramBareClient:
|
||||
finally:
|
||||
self._reconnect_lock.release()
|
||||
else:
|
||||
self.disconnect()
|
||||
self.session.auth_key = None # Force creating new auth_key
|
||||
# Since we're reconnecting possibly due to a UserMigrateError,
|
||||
# we need to first know the Data Centers we can connect to. Do
|
||||
# that before disconnecting.
|
||||
dc = await self._get_dc(new_dc)
|
||||
ip = dc.ip_address
|
||||
self.session.server_address = ip
|
||||
|
||||
self.session.server_address = dc.ip_address
|
||||
self.session.port = dc.port
|
||||
# auth_key's are associated with a server, which has now changed
|
||||
# so it's not valid anymore. Set to None to force recreating it.
|
||||
self.session.auth_key = None
|
||||
self.session.save()
|
||||
self.disconnect()
|
||||
return await self.connect()
|
||||
|
||||
# endregion
|
||||
@@ -301,11 +262,8 @@ class TelegramBareClient:
|
||||
|
||||
async def _get_dc(self, dc_id, ipv6=False, cdn=False):
|
||||
"""Gets the Data Center (DC) associated to 'dc_id'"""
|
||||
if TelegramBareClient._dc_options is None:
|
||||
raise ConnectionError(
|
||||
'Cannot determine the required data center IP address. '
|
||||
'Stabilise a successful initial connection first.'
|
||||
)
|
||||
if not TelegramBareClient._config:
|
||||
TelegramBareClient._config = await self(GetConfigRequest())
|
||||
|
||||
try:
|
||||
if cdn:
|
||||
@@ -314,15 +272,15 @@ class TelegramBareClient:
|
||||
rsa.add_key(pk.public_key)
|
||||
|
||||
return next(
|
||||
dc for dc in TelegramBareClient._dc_options if dc.id == dc_id
|
||||
and bool(dc.ipv6) == ipv6 and bool(dc.cdn) == cdn
|
||||
dc for dc in TelegramBareClient._config.dc_options
|
||||
if dc.id == dc_id and bool(dc.ipv6) == ipv6 and bool(dc.cdn) == cdn
|
||||
)
|
||||
except StopIteration:
|
||||
if not cdn:
|
||||
raise
|
||||
|
||||
# New configuration, perhaps a new CDN was added?
|
||||
TelegramBareClient._dc_options = await (self(GetConfigRequest())).dc_options
|
||||
TelegramBareClient._config = await self(GetConfigRequest())
|
||||
return await self._get_dc(dc_id, ipv6=ipv6, cdn=cdn)
|
||||
|
||||
async def _get_exported_client(self, dc_id):
|
||||
@@ -363,7 +321,14 @@ class TelegramBareClient:
|
||||
timeout=self._sender.connection.get_timeout(),
|
||||
loop=self._loop
|
||||
)
|
||||
await client.connect(_exported_auth=export_auth, _sync_updates=False)
|
||||
await client.connect(_sync_updates=False)
|
||||
if isinstance(export_auth, ExportedAuthorization):
|
||||
await client(ImportAuthorizationRequest(
|
||||
id=export_auth.id, bytes=export_auth.bytes
|
||||
))
|
||||
elif export_auth is not None:
|
||||
self._logger.warning('Unknown return export_auth type', export_auth)
|
||||
|
||||
client._authorized = True # We exported the auth, so we got auth
|
||||
return client
|
||||
|
||||
@@ -386,9 +351,10 @@ class TelegramBareClient:
|
||||
|
||||
# This will make use of the new RSA keys for this specific CDN.
|
||||
#
|
||||
# This relies on the fact that TelegramBareClient._dc_options is
|
||||
# static and it won't be called from this DC (it would fail).
|
||||
await client.connect(_cdn=True) # Avoid invoking non-CDN methods
|
||||
# We won't be calling GetConfigRequest because it's only called
|
||||
# when needed by ._get_dc, and also it's static so it's likely
|
||||
# set already. Avoid invoking non-CDN methods by not syncing updates.
|
||||
await client.connect(_sync_updates=False)
|
||||
client._authorized = self._authorized
|
||||
return client
|
||||
|
||||
@@ -423,11 +389,33 @@ class TelegramBareClient:
|
||||
invoke = __call__
|
||||
|
||||
async def _invoke(self, call_receive, retry, *requests):
|
||||
# We need to specify the new layer (by initializing a new
|
||||
# connection) if it has changed from the latest known one.
|
||||
init_connection = self.session.layer != LAYER
|
||||
|
||||
try:
|
||||
# Ensure that we start with no previous errors (i.e. resending)
|
||||
for x in requests:
|
||||
x.rpc_error = None
|
||||
|
||||
if not self.session.auth_key:
|
||||
# New key, we need to tell the server we're going to use
|
||||
# the latest layer and initialize the connection doing so.
|
||||
self.session.auth_key, self.session.time_offset = \
|
||||
await authenticator.do_authentication(self._sender.connection)
|
||||
init_connection = True
|
||||
|
||||
if init_connection:
|
||||
if len(requests) == 1:
|
||||
requests = [self._wrap_init_connection(requests[0])]
|
||||
else:
|
||||
# We need a SINGLE request (like GetConfig) to init conn.
|
||||
# Once that's done, the N original requests will be
|
||||
# invoked.
|
||||
TelegramBareClient._config = await self(
|
||||
self._wrap_init_connection(GetConfigRequest())
|
||||
)
|
||||
|
||||
await self._sender.send(*requests)
|
||||
|
||||
if not call_receive:
|
||||
@@ -440,6 +428,13 @@ class TelegramBareClient:
|
||||
while not all(x.confirm_received.is_set() for x in requests):
|
||||
await self._sender.receive(update_state=self.updates)
|
||||
|
||||
except BrokenAuthKeyError:
|
||||
self._logger.error('Broken auth key, a new one will be generated')
|
||||
self.session.auth_key = None
|
||||
|
||||
except TimeoutError:
|
||||
pass # We will just retry
|
||||
|
||||
except ConnectionResetError:
|
||||
if not self._user_connected or self._reconnect_lock.locked():
|
||||
# Only attempt reconnecting if the user called connect and not
|
||||
@@ -453,6 +448,12 @@ class TelegramBareClient:
|
||||
await asyncio.sleep(retry + 1, loop=self._loop)
|
||||
return None
|
||||
|
||||
if init_connection:
|
||||
# We initialized the connection successfully, even if
|
||||
# a request had an RPC error we have invoked it fine.
|
||||
self.session.layer = LAYER
|
||||
self.session.save()
|
||||
|
||||
try:
|
||||
raise next(x.rpc_error for x in requests if x.rpc_error)
|
||||
except StopIteration:
|
||||
@@ -728,7 +729,8 @@ class TelegramBareClient:
|
||||
if need_reconnect:
|
||||
need_reconnect = False
|
||||
while self._user_connected and not await self._reconnect():
|
||||
await asyncio.sleep(0.1, loop=self._loop) # Retry forever, this is instant messaging
|
||||
# Retry forever, this is instant messaging
|
||||
await asyncio.sleep(0.1, loop=self._loop)
|
||||
|
||||
await self._sender.receive(update_state=self.updates)
|
||||
except TimeoutError:
|
||||
@@ -748,7 +750,8 @@ class TelegramBareClient:
|
||||
try:
|
||||
import socks
|
||||
if isinstance(error, (
|
||||
socks.GeneralProxyError, socks.ProxyConnectionError
|
||||
socks.GeneralProxyError,
|
||||
socks.ProxyConnectionError
|
||||
)):
|
||||
# This is a known error, and it's not related to
|
||||
# Telegram but rather to the proxy. Disconnect and
|
||||
@@ -764,6 +767,7 @@ class TelegramBareClient:
|
||||
# add a little sleep to avoid the CPU usage going mad.
|
||||
await asyncio.sleep(0.1, loop=self._loop)
|
||||
break
|
||||
|
||||
self._recv_loop = None
|
||||
|
||||
# endregion
|
||||
|
Reference in New Issue
Block a user