Create events.InlineQuery

This commit is contained in:
Lonami Exo 2018-07-15 11:31:14 +02:00
parent 5017a9d1da
commit 2d7c8908eb
8 changed files with 593 additions and 83 deletions

View File

@ -4,7 +4,7 @@ from .. import utils, events
class ButtonMethods(UpdateMethods): class ButtonMethods(UpdateMethods):
def _build_reply_markup(self, buttons): def _build_reply_markup(self, buttons, inline_only=False):
if buttons is None: if buttons is None:
return None return None
@ -45,7 +45,9 @@ class ButtonMethods(UpdateMethods):
if current: if current:
rows.append(types.KeyboardButtonRow(current)) rows.append(types.KeyboardButtonRow(current))
if is_inline == is_normal and is_normal: if inline_only and is_normal:
raise ValueError('You cannot use non-inline buttons here')
elif is_inline == is_normal and is_normal:
raise ValueError('You cannot mix inline with normal buttons') raise ValueError('You cannot mix inline with normal buttons')
elif is_inline: elif is_inline:
return types.ReplyInlineMarkup(rows) return types.ReplyInlineMarkup(rows)

View File

@ -13,13 +13,6 @@ from .buttons import ButtonMethods
from .. import utils, helpers from .. import utils, helpers
from ..tl import types, functions, custom from ..tl import types, functions, custom
try:
import hachoir
import hachoir.metadata
import hachoir.parser
except ImportError:
hachoir = None
__log__ = logging.getLogger(__name__) __log__ = logging.getLogger(__name__)
@ -224,8 +217,11 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
caption, msg_entities = captions.pop() caption, msg_entities = captions.pop()
else: else:
caption, msg_entities = '', None caption, msg_entities = '', None
media.append(types.InputSingleMedia(types.InputMediaPhoto(fh), message=caption, media.append(types.InputSingleMedia(
entities=msg_entities)) types.InputMediaPhoto(fh),
message=caption,
entities=msg_entities
))
# Now we can construct the multi-media request # Now we can construct the multi-media request
result = await self(functions.messages.SendMultiMediaRequest( result = await self(functions.messages.SendMultiMediaRequest(
@ -420,75 +416,14 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
elif as_image: elif as_image:
media = types.InputMediaUploadedPhoto(file_handle) media = types.InputMediaUploadedPhoto(file_handle)
else: else:
mime_type = None attributes, mime_type = utils.get_attributes(
if isinstance(file, str): file,
# Determine mime-type and attributes attributes=attributes,
# Take the first element by using [0] since it returns a tuple force_document=force_document,
mime_type = guess_type(file)[0] voice_note=voice_note,
attr_dict = { video_note=video_note
types.DocumentAttributeFilename:
types.DocumentAttributeFilename(
os.path.basename(file))
}
if utils.is_audio(file) and hachoir:
with hachoir.parser.createParser(file) as parser:
m = hachoir.metadata.extractMetadata(parser)
attr_dict[types.DocumentAttributeAudio] = \
types.DocumentAttributeAudio(
voice=voice_note,
title=m.get('title') if m.has(
'title') else None,
performer=m.get('author') if m.has(
'author') else None,
duration=int(m.get('duration').seconds
if m.has('duration') else 0)
) )
if not force_document and utils.is_video(file):
if hachoir:
with hachoir.parser.createParser(file) as parser:
m = hachoir.metadata.extractMetadata(parser)
doc = types.DocumentAttributeVideo(
round_message=video_note,
w=m.get('width') if m.has('width') else 0,
h=m.get('height') if m.has('height') else 0,
duration=int(m.get('duration').seconds
if m.has('duration') else 0)
)
else:
doc = types.DocumentAttributeVideo(
0, 1, 1, round_message=video_note)
attr_dict[types.DocumentAttributeVideo] = doc
else:
attr_dict = {
types.DocumentAttributeFilename:
types.DocumentAttributeFilename(
os.path.basename(
getattr(file, 'name',
None) or 'unnamed'))
}
if voice_note:
if types.DocumentAttributeAudio in attr_dict:
attr_dict[types.DocumentAttributeAudio].voice = True
else:
attr_dict[types.DocumentAttributeAudio] = \
types.DocumentAttributeAudio(0, voice=True)
# Now override the attributes if any. As we have a dict of
# {cls: instance}, we can override any class with the list
# of attributes provided by the user easily.
if attributes:
for a in attributes:
attr_dict[type(a)] = a
# Ensure we have a mime type, any; but it cannot be None
# 'The "octet-stream" subtype is used to indicate that a body
# contains arbitrary binary data.'
if not mime_type:
mime_type = 'application/octet-stream'
input_kw = {} input_kw = {}
if thumb: if thumb:
input_kw['thumb'] = await self.upload_file(thumb) input_kw['thumb'] = await self.upload_file(thumb)
@ -496,7 +431,7 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
media = types.InputMediaUploadedDocument( media = types.InputMediaUploadedDocument(
file=file_handle, file=file_handle,
mime_type=mime_type, mime_type=mime_type,
attributes=list(attr_dict.values()), attributes=attributes,
**input_kw **input_kw
) )
return file_handle, media return file_handle, media

View File

@ -6,6 +6,7 @@ from .messageread import MessageRead
from .newmessage import NewMessage from .newmessage import NewMessage
from .userupdate import UserUpdate from .userupdate import UserUpdate
from .callbackquery import CallbackQuery from .callbackquery import CallbackQuery
from .inlinequery import InlineQuery
class StopPropagation(Exception): class StopPropagation(Exception):

View File

@ -43,8 +43,8 @@ class EventBuilder(abc.ABC):
Args: Args:
chats (`entity`, optional): chats (`entity`, optional):
May be one or more entities (username/peer/etc.). By default, May be one or more entities (username/peer/etc.), preferably IDs.
only matching chats will be handled. By default, only matching chats will be handled.
blacklist_chats (`bool`, optional): blacklist_chats (`bool`, optional):
Whether to treat the chats as a blacklist instead of Whether to treat the chats as a blacklist instead of

View File

@ -0,0 +1,190 @@
import inspect
import re
import asyncio
from .common import EventBuilder, EventCommon, name_inner_event
from .. import utils
from ..tl import types, functions, custom
from ..tl.custom.sendergetter import SenderGetter
@name_inner_event
class InlineQuery(EventBuilder):
"""
Represents an inline query event (when someone writes ``'@my_bot query'``).
Args:
users (`entity`, optional):
May be one or more entities (username/peer/etc.), preferably IDs.
By default, only inline queries from these users will be handled.
blacklist_users (`bool`, optional):
Whether to treat the users as a blacklist instead of
as a whitelist (default). This means that every chat
will be handled *except* those specified in ``users``
which will be ignored if ``blacklist_users=True``.
pattern (`str`, `callable`, `Pattern`, optional):
If set, only queries matching this pattern will be handled.
You can specify a regex-like string which will be matched
against the message, a callable function that returns ``True``
if a message is acceptable, or a compiled regex pattern.
"""
def __init__(self, users=None, *, blacklist_users=False, pattern=None):
super().__init__(chats=users, blacklist_chats=blacklist_users)
if isinstance(pattern, str):
self.pattern = re.compile(pattern).match
elif not pattern or callable(pattern):
self.pattern = pattern
elif hasattr(pattern, 'match') and callable(pattern.match):
self.pattern = pattern.match
else:
raise TypeError('Invalid pattern type given')
@staticmethod
def build(update):
if isinstance(update, types.UpdateBotInlineQuery):
event = InlineQuery.Event(update)
else:
return
event._entities = update._entities
return event
def filter(self, event):
if self.pattern:
match = self.pattern(event.text)
if not match:
return
event.pattern_match = match
return super().filter(event)
class Event(EventCommon, SenderGetter):
"""
Represents the event of a new callback query.
Members:
query (:tl:`UpdateBotCallbackQuery`):
The original :tl:`UpdateBotCallbackQuery`.
pattern_match (`obj`, optional):
The resulting object from calling the passed ``pattern``
function, which is ``re.compile(...).match`` by default.
"""
def __init__(self, query):
super().__init__(chat_peer=types.PeerUser(query.user_id))
self.query = query
self.pattern_match = None
self._answered = False
self._sender_id = query.user_id
self._input_sender = None
self._sender = None
@property
def id(self):
"""
Returns the unique identifier for the query ID.
"""
return self.query.query_id
@property
def text(self):
"""
Returns the text the user used to make the inline query.
"""
return self.query.query
@property
def offset(self):
"""
???
"""
return self.query.offset
@property
def geo(self):
"""
If the user location is requested when using inline mode
and the user's device is able to send it, this will return
the :tl:`GeoPoint` with the position of the user.
"""
return
@property
def builder(self):
"""
Returns a new `inline result builder
<telethon.tl.custom.inline.InlineBuilder>`.
"""
return custom.InlineBuilder(self._client)
async def answer(
self, results=None, cache_time=0, *,
gallery=False, private=False,
switch_pm=None, switch_pm_param=''):
"""
Answers the inline query with the given results.
Args:
results (`list`, optional):
A list of :tl:`InputBotInlineResult` to use.
You should use `builder` to create these:
.. code-block: python
builder = inline.builder
r1 = builder.article('Be nice', text='Have a nice day')
r2 = builder.article('Be bad', text="I don't like you")
await inline.answer([r1, r2])
cache_time (`int`, optional):
For how long this result should be cached on
the user's client. Defaults to 0 for no cache.
gallery (`bool`, optional):
Whether the results should show as a gallery (grid) or not.
private (`bool`, optional):
Whether the results should be cached by Telegram
(not private) or by the user's client (private).
switch_pm (`str`, optional):
If set, this text will be shown in the results
to allow the user to switch to private messages.
switch_pm_param (`str`, optional):
Optional parameter to start the bot with if
`switch_pm` was used.
"""
if self._answered:
return
results = [self._as_awaitable(x) for x in results]
done, _ = await asyncio.wait(results)
results = [x.result() for x in done]
if switch_pm:
switch_pm = types.InlineBotSwitchPM(switch_pm, switch_pm_param)
return await self._client(
functions.messages.SetInlineBotResultsRequest(
query_id=self.query.query_id,
results=results,
cache_time=cache_time,
gallery=gallery,
private=private,
switch_pm=switch_pm
)
)
@staticmethod
def _as_awaitable(obj):
if inspect.isawaitable(obj):
return obj
f = asyncio.Future()
f.set_result(obj)
return f

View File

@ -5,3 +5,4 @@ from .messagebutton import MessageButton
from .forward import Forward from .forward import Forward
from .message import Message from .message import Message
from .button import Button from .button import Button
from .inline import InlineBuilder

View File

@ -0,0 +1,302 @@
import hashlib
from .. import functions, types
from ... import utils
class InlineBuilder:
"""
Helper class to allow defining inline queries ``results``.
Common arguments to all methods are
explained here to avoid repetition:
text (`str`, optional):
If present, the user will send a text
message with this text upon being clicked.
link_preview (`bool`, optional):
Whether to show a link preview in the sent
text message or not.
geo (:tl:`InputGeoPoint`, :tl:`GeoPoint`,
:tl:`InputMediaVenue`, :tl:`MessageMediaVenue`,
optional):
If present, it may either be a geo point or a venue.
period (int, optional):
The period in seconds to be used for geo points.
contact (:tl:`InputMediaContact`, :tl:`MessageMediaContact`,
optional):
If present, it must be the contact information to send.
game (`bool`, optional):
May be ``True`` to indicate that the game will be sent.
buttons (`list`, `custom.Button <telethon.tl.custom.button.Button>`,
:tl:`KeyboardButton`, optional):
Same as ``buttons`` for `client.send_message
<telethon.client.messages.MessageMethods.send_message>`.
parse_mode (`str`, optional):
Same as ``parse_mode`` for `client.send_message
<telethon.client.messageparse.MessageParseMethods.parse_mode>`.
id (`str`, optional):
The string ID to use for this result. If not present, it
will be the SHA256 hexadecimal digest of converting the
request with empty ID to ``bytes()``, so that the ID will
be deterministic for the same input.
"""
def __init__(self, client):
self._client = client
async def article(
self, title, description=None,
*, url=None, thumb=None, content=None,
id=None, text=None, parse_mode=utils.Default, link_preview=True,
geo=None, period=60, contact=None, game=False, buttons=None,
):
"""
Creates new inline result of article type.
Args:
title (`str`):
The title to be shown for this result.
description (`str`, optional):
Further explanation of what this result means.
url (`str`, optional):
The URL to be shown for this result.
thumb (:tl:`InputWebDocument`, optional):
The thumbnail to be shown for this result.
For now it has to be a :tl:`InputWebDocument` if present.
content (:tl:`InputWebDocument`, optional):
The content to be shown for this result.
For now it has to be a :tl:`InputWebDocument` if present.
"""
# TODO Does 'article' work always?
# article, photo, gif, mpeg4_gif, video, audio,
# voice, document, location, venue, contact, game
result = types.InputBotInlineResult(
id=id or '',
type='article',
send_message=await self._message(
text=text, parse_mode=parse_mode, link_preview=link_preview,
geo=geo, period=period,
contact=contact,
game=game,
buttons=buttons
),
title=title,
description=description,
url=url,
thumb=thumb,
content=content
)
if id is None:
result.id = hashlib.sha256(bytes(result)).hexdigest()
return result
async def photo(
self, file, *, id=None,
text=None, parse_mode=utils.Default, link_preview=True,
geo=None, period=60, contact=None, game=False, buttons=None
):
"""
Creates a new inline result of photo type.
Args:
file (`obj`, optional):
Same as ``file`` for `client.send_file
<telethon.client.uploads.UploadMethods.send_file>`.
"""
fh = self._client.upload_file(file, use_cache=types.InputPhoto)
if not isinstance(fh, types.InputPhoto):
r = await self._client(functions.messages.UploadMediaRequest(
types.InputPeerEmpty(), media=types.InputMediaUploadedPhoto(fh)
))
fh = utils.get_input_photo(r.photo)
result = types.InputBotInlineResultPhoto(
id=id or '',
type='photo',
photo=fh,
send_message=await self._message(
text=text, parse_mode=parse_mode, link_preview=link_preview,
geo=geo, period=period,
contact=contact,
game=game,
buttons=buttons
)
)
if id is None:
result.id = hashlib.sha256(bytes(result)).hexdigest()
return result
async def document(
self, file, title=None, *, description=None, type=None,
mime_type=None, attributes=None, force_document=False,
voice_note=False, video_note=False, use_cache=True, id=None,
text=None, parse_mode=utils.Default, link_preview=True,
geo=None, period=60, contact=None, game=False, buttons=None
):
"""
Creates a new inline result of document type.
`use_cache`, `mime_type`, `attributes`, `force_document`,
`voice_note` and `video_note` are described in `client.send_file
<telethon.client.uploads.UploadMethods.send_file>`.
Args:
file (`obj`):
Same as ``file`` for `<client.send_file>
telethon.client.uploads.UploadMethods.send_file`.
title (`str`, optional):
The title to be shown for this result.
description (`str`, optional):
Further explanation of what this result means.
type (`str`, optional):
The type of the document. May be one of: photo, gif,
mpeg4_gif, video, audio, voice, document, sticker.
See "Type of the result" in https://core.telegram.org/bots/api.
"""
if type is None:
if voice_note:
type = 'voice'
else:
type = 'document'
use_cache = types.InputDocument if use_cache else None
fh = self._client.upload_file(file, use_cache=use_cache)
if not isinstance(fh, types.InputDocument):
attributes, mime_type = utils.get_attributes(
file,
mime_type=mime_type,
attributes=attributes,
force_document=force_document,
voice_note=voice_note,
video_note=video_note
)
r = await self._client(functions.messages.UploadMediaRequest(
types.InputPeerEmpty(), media=types.InputMediaUploadedDocument(
fh,
mime_type=mime_type,
attributes=attributes,
nosound_video=None,
thumb=None
)))
fh = utils.get_input_document(r.document)
result = types.InputBotInlineResultDocument(
id=id or '',
type=type,
document=fh,
send_message=await self._message(
text=text, parse_mode=parse_mode, link_preview=link_preview,
geo=geo, period=period,
contact=contact,
game=game,
buttons=buttons
),
title=title,
description=description
)
if id is None:
result.id = hashlib.sha256(bytes(result)).hexdigest()
return result
async def game(
self, short_name, *, id=None,
text=None, parse_mode=utils.Default, link_preview=True,
geo=None, period=60, contact=None, game=False, buttons=None
):
"""
Creates a new inline result of game type.
Args:
short_name (`str`):
The short name of the game to use.
"""
result = types.InputBotInlineResultGame(
id=id or '',
short_name=short_name,
send_message=await self._message(
text=text, parse_mode=parse_mode, link_preview=link_preview,
geo=geo, period=period,
contact=contact,
game=game,
buttons=buttons
)
)
if id is None:
result.id = hashlib.sha256(bytes(result)).hexdigest()
return result
async def _message(
self, *,
text=None, parse_mode=utils.Default, link_preview=True,
geo=None, period=60, contact=None, game=False, buttons=None
):
if sum(1 for x in (text, geo, contact, game) if x) != 1:
raise ValueError('Can only use one of text, geo, contact or game')
markup = self._client._build_reply_markup(buttons, inline_only=True)
if text:
text, msg_entities = await self._client._parse_message_text(
text, parse_mode
)
return types.InputBotInlineMessageText(
message=text,
no_webpage=not link_preview,
entities=msg_entities,
reply_markup=markup
)
elif isinstance(geo, (types.InputGeoPoint, types.GeoPoint)):
return types.InputBotInlineMessageMediaGeo(
geo_point=utils.get_input_geo(geo),
period=period,
reply_markup=markup
)
elif isinstance(geo, (types.InputMediaVenue, types.MessageMediaVenue)):
if isinstance(geo, types.InputMediaVenue):
geo_point = geo.geo_point
else:
geo_point = geo.geo
return types.InputBotInlineMessageMediaVenue(
geo_point=geo_point,
title=geo.title,
address=geo.address,
provider=geo.provider,
venue_id=geo.venue_id,
venue_type=geo.venue_type,
reply_markup=markup
)
elif isinstance(contact, (
types.InputMediaContact, types.MessageMediaContact)):
return types.InputBotInlineMessageMediaContact(
phone_number=contact.phone_number,
first_name=contact.first_name,
last_name=contact.last_name,
vcard=contact.vcard,
reply_markup=markup
)
elif game:
return types.InputBotInlineMessageGame(
reply_markup=markup
)
else:
raise ValueError('No text, game or valid geo or contact given')

View File

@ -29,10 +29,18 @@ from .tl.types import (
FileLocationUnavailable, InputMediaUploadedDocument, ChannelFull, FileLocationUnavailable, InputMediaUploadedDocument, ChannelFull,
InputMediaUploadedPhoto, DocumentAttributeFilename, photos, InputMediaUploadedPhoto, DocumentAttributeFilename, photos,
TopPeer, InputNotifyPeer, InputMessageID, InputFileLocation, TopPeer, InputNotifyPeer, InputMessageID, InputFileLocation,
InputDocumentFileLocation, PhotoSizeEmpty, InputDialogPeer InputDocumentFileLocation, PhotoSizeEmpty, InputDialogPeer,
DocumentAttributeAudio, DocumentAttributeVideo
) )
from .tl.types.contacts import ResolvedPeer from .tl.types.contacts import ResolvedPeer
try:
import hachoir
import hachoir.metadata
import hachoir.parser
except ImportError:
hachoir = None
USERNAME_RE = re.compile( USERNAME_RE = re.compile(
r'@|(?:https?://)?(?:www\.)?(?:telegram\.(?:me|dog)|t\.me)/(joinchat/)?' r'@|(?:https?://)?(?:www\.)?(?:telegram\.(?:me|dog)|t\.me)/(joinchat/)?'
) )
@ -424,6 +432,77 @@ def get_message_id(message):
raise TypeError('Invalid message type: {}'.format(type(message))) raise TypeError('Invalid message type: {}'.format(type(message)))
def get_attributes(file, *, attributes=None, mime_type=None,
force_document=False, voice_note=False, video_note=False):
"""
Get a list of attributes for the given file and
the mime type as a tuple ([attribute], mime_type).
"""
if isinstance(file, str):
# Determine mime-type and attributes
# Take the first element by using [0] since it returns a tuple
if mime_type is None:
mime_type = mimetypes.guess_type(file)[0]
attr_dict = {DocumentAttributeFilename:
DocumentAttributeFilename(os.path.basename(file))}
if is_audio(file) and hachoir is not None:
with hachoir.parser.createParser(file) as parser:
m = hachoir.metadata.extractMetadata(parser)
attr_dict[DocumentAttributeAudio] = \
DocumentAttributeAudio(
voice=voice_note,
title=m.get('title') if m.has('title') else None,
performer=m.get('author') if m.has('author') else None,
duration=int(m.get('duration').seconds
if m.has('duration') else 0)
)
if not force_document and is_video(file):
if hachoir:
with hachoir.parser.createParser(file) as parser:
m = hachoir.metadata.extractMetadata(parser)
doc = DocumentAttributeVideo(
round_message=video_note,
w=m.get('width') if m.has('width') else 0,
h=m.get('height') if m.has('height') else 0,
duration=int(m.get('duration').seconds
if m.has('duration') else 0)
)
else:
doc = DocumentAttributeVideo(
0, 1, 1, round_message=video_note)
attr_dict[DocumentAttributeVideo] = doc
else:
attr_dict = {DocumentAttributeFilename:
DocumentAttributeFilename(
os.path.basename(getattr(file, 'name', None) or 'unnamed'))}
if voice_note:
if DocumentAttributeAudio in attr_dict:
attr_dict[DocumentAttributeAudio].voice = True
else:
attr_dict[DocumentAttributeAudio] = \
DocumentAttributeAudio(0, voice=True)
# Now override the attributes if any. As we have a dict of
# {cls: instance}, we can override any class with the list
# of attributes provided by the user easily.
if attributes:
for a in attributes:
attr_dict[type(a)] = a
# Ensure we have a mime type, any; but it cannot be None
# 'The "octet-stream" subtype is used to indicate that a body
# contains arbitrary binary data.'
if not mime_type:
mime_type = 'application/octet-stream'
return list(attr_dict.values()), mime_type
def sanitize_parse_mode(mode): def sanitize_parse_mode(mode):
""" """
Converts the given parse mode into an object with Converts the given parse mode into an object with