Completely overhaul errors to be generated dynamically

This commit is contained in:
Lonami Exo
2021-09-24 20:07:34 +02:00
parent cfe47a0434
commit debde6e856
26 changed files with 345 additions and 318 deletions

View File

@@ -1,46 +1,48 @@
"""
This module holds all the base and automatically generated errors that the
Telegram API has. See telethon_generator/errors.json for more.
"""
import re
import sys
from .common import (
ReadCancelledError, TypeNotFoundError, InvalidChecksumError,
InvalidBufferError, SecurityError, CdnFileTamperedError,
BadMessageError, MultiError
from ._custom import (
ReadCancelledError,
TypeNotFoundError,
InvalidChecksumError,
InvalidBufferError,
SecurityError,
CdnFileTamperedError,
BadMessageError,
MultiError,
)
from ._rpcbase import (
RpcError,
InvalidDcError,
BadRequestError,
UnauthorizedError,
ForbiddenError,
NotFoundError,
AuthKeyError,
FloodError,
ServerError,
BotTimeout,
TimedOutError,
_mk_error_type
)
# This imports the base errors too, as they're imported there
from .rpcbaseerrors import *
from .rpcerrorlist import *
if sys.version_info < (3, 7):
# https://stackoverflow.com/a/7668273/
class _TelethonErrors:
def __init__(self, _mk_error_type, everything):
self._mk_error_type = _mk_error_type
self.__dict__.update({
k: v
for k, v in everything.items()
if isinstance(v, type) and issubclass(v, Exception)
})
def __getattr__(self, name):
return self._mk_error_type(name=name)
def rpc_message_to_error(rpc_error, request):
"""
Converts a Telegram's RPC Error to a Python error.
sys.modules[__name__] = _TelethonErrors(_mk_error_type, globals())
else:
# https://www.python.org/dev/peps/pep-0562/
def __getattr__(name):
return _mk_error_type(name=name)
:param rpc_error: the RpcError instance.
:param request: the request that caused this error.
:return: the RPCError as a Python exception that represents this error.
"""
# Try to get the error by direct look-up, otherwise regex
# Case-insensitive, for things like "timeout" which don't conform.
cls = rpc_errors_dict.get(rpc_error.error_message.upper(), None)
if cls:
return cls(request=request)
for msg_regex, cls in rpc_errors_re:
m = re.match(msg_regex, rpc_error.error_message)
if m:
capture = int(m.group(1)) if m.groups() else None
return cls(request=request, capture=capture)
# Some errors are negative:
# * -500 for "No workers running",
# * -503 for "Timeout"
#
# We treat them as if they were positive, so -500 will be treated
# as a `ServerError`, etc.
cls = base_errors.get(abs(rpc_error.error_code), RPCError)
return cls(request=request, message=rpc_error.error_message,
code=rpc_error.error_code)
del sys