Configurable per-client request/connection retries

This commit is contained in:
Lonami Exo 2018-06-18 18:11:16 +02:00
parent 09ea1179ca
commit ebfe8ebf40
3 changed files with 54 additions and 13 deletions

View File

@ -2,6 +2,7 @@ import abc
import asyncio import asyncio
import logging import logging
import platform import platform
import sys
import time import time
import warnings import warnings
from datetime import timedelta, datetime from datetime import timedelta, datetime
@ -66,8 +67,33 @@ class TelegramBaseClient(abc.ABC):
See https://github.com/Anorov/PySocks#usage-1 for more. See https://github.com/Anorov/PySocks#usage-1 for more.
timeout (`int` | `float` | `timedelta`, optional): timeout (`int` | `float` | `timedelta`, optional):
The timeout to be used when receiving responses from The timeout to be used when connecting, sending and receiving
the network. Defaults to 5 seconds. responses from the network. This is **not** the timeout to
be used when ``await``'ing for invoked requests, and you
should use ``asyncio.wait`` or ``asyncio.wait_for`` for that.
request_retries (`int`, optional):
How many times a request should be retried. Request are retried
when Telegram is having internal issues (due to either
``errors.ServerError`` or ``errors.RpcCallFailError``),
when there is a ``errors.FloodWaitError`` less than
``session.flood_sleep_threshold``, or when there's a
migrate error.
May set to a false-y value (``0`` or ``None``) for infinite
retries, but this is not recommended, since some requests can
always trigger a call fail (such as searching for messages).
connection_retries (`int`, optional):
How many times the reconnection should retry, either on the
initial connection or when Telegram disconnects us. May be
set to a false-y value (``0`` or ``None``) for infinite
retries, but this is not recommended, since the program can
get stuck in an infinite loop.
auto_reconnect (`bool`, optional):
Whether reconnection should be retried `connection_retries`
times automatically if Telegram disconnects us or not.
report_errors (`bool`, optional): report_errors (`bool`, optional):
Whether to report RPC errors or not. Defaults to ``True``, Whether to report RPC errors or not. Defaults to ``True``,
@ -109,6 +135,9 @@ class TelegramBaseClient(abc.ABC):
use_ipv6=False, use_ipv6=False,
proxy=None, proxy=None,
timeout=timedelta(seconds=10), timeout=timedelta(seconds=10),
request_retries=5,
connection_retries=5,
auto_reconnect=True,
report_errors=True, report_errors=True,
device_model=None, device_model=None,
system_version=None, system_version=None,
@ -116,7 +145,6 @@ class TelegramBaseClient(abc.ABC):
lang_code='en', lang_code='en',
system_lang_code='en', system_lang_code='en',
loop=None): loop=None):
"""Refer to TelegramClient.__init__ for docs on this method"""
if not api_id or not api_hash: if not api_id or not api_hash:
raise ValueError( raise ValueError(
"Your API ID or Hash cannot be empty or None. " "Your API ID or Hash cannot be empty or None. "
@ -147,6 +175,10 @@ class TelegramBaseClient(abc.ABC):
self.api_id = int(api_id) self.api_id = int(api_id)
self.api_hash = api_hash self.api_hash = api_hash
self._request_retries = request_retries or sys.maxsize
self._connection_retries = connection_retries or sys.maxsize
self._auto_reconnect = auto_reconnect
if isinstance(connection, type): if isinstance(connection, type):
connection = connection( connection = connection(
proxy=proxy, timeout=timeout, loop=self._loop) proxy=proxy, timeout=timeout, loop=self._loop)
@ -171,6 +203,8 @@ class TelegramBaseClient(abc.ABC):
self._connection = connection self._connection = connection
self._sender = MTProtoSender( self._sender = MTProtoSender(
state, connection, self._loop, state, connection, self._loop,
retries=self._connection_retries,
auto_reconnect=self._auto_reconnect,
update_callback=self._handle_update update_callback=self._handle_update
) )
@ -361,7 +395,7 @@ class TelegramBaseClient(abc.ABC):
# region Invoking Telegram requests # region Invoking Telegram requests
@abc.abstractmethod @abc.abstractmethod
def __call__(self, request, retries=5, ordered=False): def __call__(self, request, ordered=False):
""" """
Invokes (sends) one or more MTProtoRequests and returns (receives) Invokes (sends) one or more MTProtoRequests and returns (receives)
their result. their result.

View File

@ -12,14 +12,14 @@ _NOT_A_REQUEST = TypeError('You can only invoke requests, not types!')
class UserMethods(TelegramBaseClient): class UserMethods(TelegramBaseClient):
async def __call__(self, request, retries=5, ordered=False): async def __call__(self, request, ordered=False):
for r in (request if utils.is_list_like(request) else (request,)): for r in (request if utils.is_list_like(request) else (request,)):
if not isinstance(r, TLRequest): if not isinstance(r, TLRequest):
raise _NOT_A_REQUEST raise _NOT_A_REQUEST
await r.resolve(self, utils) await r.resolve(self, utils)
self._last_request = time.time() self._last_request = time.time()
for _ in range(retries): for _ in range(self._request_retries):
try: try:
future = self._sender.send(request, ordered=ordered) future = self._sender.send(request, ordered=ordered)
if isinstance(future, list): if isinstance(future, list):
@ -40,6 +40,7 @@ class UserMethods(TelegramBaseClient):
raise raise
except (errors.PhoneMigrateError, errors.NetworkMigrateError, except (errors.PhoneMigrateError, errors.NetworkMigrateError,
errors.UserMigrateError) as e: errors.UserMigrateError) as e:
__log__.info('Phone migrated to %d', e.new_dc)
should_raise = isinstance(e, ( should_raise = isinstance(e, (
errors.PhoneMigrateError, errors.NetworkMigrateError errors.PhoneMigrateError, errors.NetworkMigrateError
)) ))

View File

@ -46,13 +46,14 @@ class MTProtoSender:
key exists yet. key exists yet.
""" """
def __init__(self, state, connection, loop, *, def __init__(self, state, connection, loop, *,
retries=5, update_callback=None): retries=5, auto_reconnect=True, update_callback=None):
self.state = state self.state = state
self._connection = connection self._connection = connection
self._loop = loop self._loop = loop
self._ip = None self._ip = None
self._port = None self._port = None
self._retries = retries self._retries = retries
self._auto_reconnect = auto_reconnect
self._update_callback = update_callback self._update_callback = update_callback
# Whether the user has explicitly connected or disconnected. # Whether the user has explicitly connected or disconnected.
@ -287,12 +288,17 @@ class MTProtoSender:
await self._connection.close() await self._connection.close()
self._reconnecting = False self._reconnecting = False
retries = self._retries if self._auto_reconnect else 0
for retry in range(1, retries + 1):
try: try:
await self._connect() await self._connect()
except ConnectionError as e: break
__log__.error('Failed to reconnect automatically, ' except ConnectionError:
'disconnecting with error {}'.format(e)) __log__.info('Failed reconnection retry %d/%d', retry, retries)
await self._disconnect(error=e) else:
__log__.error('Failed to reconnect automatically.')
await self._disconnect(error=ConnectionError())
def _clean_containers(self, msg_ids): def _clean_containers(self, msg_ids):
""" """