Complete migration guide from other bot libraries

This commit is contained in:
Lonami Exo
2023-10-01 13:37:28 +02:00
parent 18895748c4
commit 4df1f4537b
15 changed files with 521 additions and 49 deletions

View File

@@ -125,14 +125,20 @@ async def sign_in(
return await complete_login(self, result)
async def interactive_login(self: Client) -> User:
async def interactive_login(
self: Client,
phone_or_token: Optional[str] = None,
*,
password: Optional[str] = None,
) -> User:
if me := await self.get_me():
return me
phone_or_token = ""
while not re.match(r"\+?[\s()]*\d", phone_or_token):
print("Please enter your phone (+1 23...) or bot token (12:abcd...)")
phone_or_token = input(": ").strip()
if not phone_or_token:
phone_or_token = ""
while not re.match(r"\+?[\s()]*\d", phone_or_token):
print("Please enter your phone (+1 23...) or bot token (12:abcd...)")
phone_or_token = input(": ").strip()
# Bot flow
if re.match(r"\d+:", phone_or_token):
@@ -160,16 +166,21 @@ async def interactive_login(self: Client) -> User:
break
if isinstance(user_or_token, PasswordToken):
while True:
print("Please enter your password (prompt is hidden; type and press enter)")
password = getpass.getpass(": ")
try:
user = await self.check_password(user_or_token, password)
except RpcError as e:
if e.name.startswith("PASSWORD"):
print("Invalid password:", e)
else:
raise
if password:
user = await self.check_password(user_or_token, password)
else:
while True:
print(
"Please enter your password (prompt is hidden; type and press enter)"
)
password = getpass.getpass(": ")
try:
user = await self.check_password(user_or_token, password)
except RpcError as e:
if e.name.startswith("PASSWORD"):
print("Invalid password:", e)
else:
raise
else:
user = user_or_token

View File

@@ -168,6 +168,14 @@ class Client:
:param update_queue_limit:
Maximum amount of updates to keep in memory before dropping them.
:param check_all_handlers:
Whether to always check all event handlers or stop early.
The library will call event handlers in the order they were added.
By default, the library stops checking handlers as soon as a filter returns :data:`True`.
By setting ``check_all_handlers=True``, the library will keep calling handlers after the first match.
"""
def __init__(
@@ -175,6 +183,7 @@ class Client:
session: Optional[Union[str, Path, Storage]],
api_id: int,
api_hash: Optional[str] = None,
check_all_handlers: bool = False,
) -> None:
self._sender: Optional[Sender] = None
self._sender_lock = asyncio.Lock()
@@ -190,6 +199,7 @@ class Client:
api_id=api_id,
api_hash=api_hash or "",
)
self._message_box = MessageBox()
self._chat_hashes = ChatHashCache(None)
self._last_update_limit_warn: Optional[float] = None
@@ -197,10 +207,10 @@ class Client:
Tuple[abcs.Update, Dict[int, Chat]]
] = asyncio.Queue(maxsize=self._config.update_queue_limit or 0)
self._dispatcher: Optional[asyncio.Task[None]] = None
self._downloader_map = object()
self._handlers: Dict[
Type[Event], List[Tuple[Callable[[Any], Awaitable[Any]], Optional[Filter]]]
] = {}
self._shortcircuit_handlers = not check_all_handlers
if self_user := self._config.session.user:
self._dc_id = self_user.dc
@@ -795,11 +805,19 @@ class Client:
"""
return await inline_query(self, bot, query, chat=chat)
async def interactive_login(self) -> User:
async def interactive_login(
self, phone_or_token: Optional[str] = None, *, password: Optional[str] = None
) -> User:
"""
Begin an interactive login if needed.
If the account was already logged-in, this method simply returns :term:`yourself`.
:param phone_or_token:
Bypass the phone number or bot token prompt, and use this value instead.
:param password:
Bypass the 2FA password prompt, and use this value instead.
:return: The user corresponding to :term:`yourself`.
.. rubric:: Example
@@ -809,11 +827,14 @@ class Client:
me = await client.interactive_login()
print('Logged in as:', me.full_name)
# or, to make sure you're logged-in as a bot
await client.interactive_login('1234:ab56cd78ef90)
.. seealso::
In-depth explanation for :doc:`/basic/signing-in`.
"""
return await interactive_login(self)
return await interactive_login(self, phone_or_token, password=password)
async def is_authorized(self) -> bool:
"""
@@ -1210,7 +1231,7 @@ class Client:
async def send_message(
self,
chat: ChatLike,
text: Optional[str] = None,
text: Optional[Union[str, Message]] = None,
*,
markdown: Optional[str] = None,
html: Optional[str] = None,
@@ -1225,6 +1246,8 @@ class Client:
:param text:
Message text, with no formatting.
When given a :class:`Message` instance, a copy of the message will be sent.
:param text_markdown:
Message text, parsed as CommonMark.

View File

@@ -124,16 +124,21 @@ def extend_update_queue(
async def dispatcher(client: Client) -> None:
while client.connected:
update, chat_map = await client._updates.get()
for event_cls, handlers in client._handlers.items():
if event := event_cls._try_from_update(client, update, chat_map):
for handler, filter in handlers:
if not filter or filter(event):
try:
await handler(event)
except asyncio.CancelledError:
raise
except Exception:
# TODO proper logger
name = getattr(handler, "__name__", repr(handler))
logging.exception("Unhandled exception on %s", name)
try:
await dispatch_next(client)
except asyncio.CancelledError:
raise
except Exception:
# TODO proper logger
logging.exception("Unhandled exception in event handler")
async def dispatch_next(client: Client) -> None:
update, chat_map = await client._updates.get()
for event_cls, handlers in client._handlers.items():
if event := event_cls._try_from_update(client, update, chat_map):
for handler, filter in handlers:
if not filter or filter(event):
await handler(event)
if client._shortcircuit_handlers:
return

View File

@@ -15,6 +15,13 @@ class Event(metaclass=NoPublicConstructor):
The base type of all events.
"""
@property
def client(self) -> Client:
"""
The :class:`~telethon.Client` that received this update.
"""
return self._client
@classmethod
@abc.abstractmethod
def _try_from_update(

View File

@@ -1,6 +1,6 @@
from .combinators import All, Any, Not
from .common import Chats, Filter, Senders
from .messages import Command, Forward, Incoming, Outgoing, Reply, Text
from .messages import Command, Forward, Incoming, Media, Outgoing, Reply, Text, TextOnly
__all__ = [
"All",
@@ -12,7 +12,9 @@ __all__ = [
"Command",
"Forward",
"Incoming",
"Media",
"Outgoing",
"Reply",
"Text",
"TextOnly",
]

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import re
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Literal, Optional, Union
from ..event import Event
@@ -18,6 +18,9 @@ class Text:
The match, if any, is discarded. If you need to access captured groups,
you need to manually perform the check inside the handler instead.
Note that the caption text in messages with media is also searched.
If you want to filter based on media, use :class:`TextOnly` or :class:`Media`.
"""
__slots__ = ("_pattern",)
@@ -131,3 +134,50 @@ class Reply:
def __call__(self, event: Event) -> bool:
return getattr(event, "reply", None) is not None
class TextOnly:
"""
Filter by messages with some text and no media.
Note that link previews are only considered media if they have a photo or document.
"""
MediaTypes = Union[Literal["photo"], Literal["audio"], Literal["video"]]
class Media:
"""
Filter by the media type in the message.
By default, this filter will pass if the message has any media.
Note that link previews are only considered media if they have a photo or document.
When you specify one or more media types, *only* those types will be considered.
You can use literal strings or the constants defined by the filter.
"""
PHOTO = "photo"
AUDIO = "audio"
VIDEO = "video"
__slots__ = "_types"
def __init__(self, types: Optional[MediaTypes] = None) -> None:
self._types = types
@property
def types(self) -> MediaTypes:
"""
The media types being checked.
"""
return self._types
def __call__(self, event: Event) -> bool:
if self._types is None:
return getattr(event, "file", None) is not None
else:
return any(getattr(event, ty, None) is not None for ty in self._types)

View File

@@ -21,7 +21,7 @@ def del_surrogate(text: str) -> str:
def within_surrogate(text: str, index: int, *, length: Optional[int] = None) -> bool:
"""
`True` if ``index`` is within a surrogate (before and after it, not at!).
:data:`True` if ``index`` is within a surrogate (before and after it, not at!).
"""
if length is None:
length = len(text)

View File

@@ -15,11 +15,13 @@ from .._impl.client.events.filters import (
Filter,
Forward,
Incoming,
Media,
Not,
Outgoing,
Reply,
Senders,
Text,
TextOnly,
)
__all__ = [
@@ -30,9 +32,11 @@ __all__ = [
"Filter",
"Forward",
"Incoming",
"Media",
"Not",
"Outgoing",
"Reply",
"Senders",
"Text",
"TextOnly",
]