Merge branch 'v2'

v2 is still not complete. A lot of cleanup still needs to be done.
In particular, entities still need some care. However, most of it
is there, and keeping up with two branches is annoying.
This also lets me close a lot of issues to reduce noise
and focus on the important ones.

Closes #354 (input entities have been reworked).
Closes #902 (sessions were overhauled).
Closes #1125, #3253, #1589, #1634, #3150, #3668 (updates are reworked, gaps are properly handled now).
Closes #1169 (2.0 is now merged).
Closes #1311 (proper usage should not trigger this issue on the reworked connection code).
Closes #1327 (there have been some stringify changes).
Closes #1330 (gaps are now detected).
Closes #1366 (sessions are now async).
Closes #1476, #1484 (asyncio open connection is no longer used).
Closes #1529 (commonmark is now used).
Closes #1721 (update gaps are now properly handled).
Closes #1724 (an update gap, which fixes this, will eventually trigger).
Closes #3006 (force_sms is gone).
Closes #3041 (a clean implementation to get difference now exists).
Closes #3049 (commonmark is now used).
Closes #3111 (to_dict has changed).
Closes #3117 (SMS is no longer an option).
Closes #3171 (connectivity bug is unlikely to be a bug in the library).
Closes #3206 (Telethon cannot really fix broken SSL).
Closes #3214, #3257, #3661 (not enough information).
Closes #3215 (this had already been fixed).
Closes #3230, #3674 (entities were reworked).
Closes #3234, #3238, #3245, #3258, #3264 (the layer has been updated).
Closes #3242 (bot-API file IDs have been removed).
Closes #3244 (the error is now documented).
Closes #3249 (errors have been reworked).
This commit is contained in:
Lonami Exo
2022-01-24 13:24:35 +01:00
172 changed files with 13073 additions and 14097 deletions

View File

@@ -0,0 +1,5 @@
"""
Several extensions Python is missing, such as a proper class to handle a TCP
communication with support for cancelling the operation, and a utility class
to read arbitrary binary data in a more comfortable way, with int/strings/etc.
"""

View File

@@ -0,0 +1,185 @@
"""
This module contains the BinaryReader utility class.
"""
import os
import time
from datetime import datetime, timezone, timedelta
from io import BytesIO
from struct import unpack
from ..errors._custom import TypeNotFoundError
from .. import _tl
from ..types import _core
# Unix epoch (1970-01-01) as a naive datetime, and as a UTC-aware one;
# used as the base when converting Telegram's Unix timestamps to datetimes.
_EPOCH_NAIVE = datetime(*time.gmtime(0)[:6])
_EPOCH = _EPOCH_NAIVE.replace(tzinfo=timezone.utc)
class BinaryReader:
    """
    Small utility class to read little-endian binary data from a buffer.
    """
    def __init__(self, data):
        self.stream = BytesIO(data)
        self._last = None  # last successful raw read; handy to spot -404 errors

    # region Reading

    # "All numbers are written as little endian."
    # https://core.telegram.org/mtproto

    def read_byte(self):
        """Reads a single byte value."""
        (value,) = self.read(1)
        return value

    def read_int(self, signed=True):
        """Reads an integer (4 bytes) value."""
        return self.read_large_int(32, signed=signed)

    def read_long(self, signed=True):
        """Reads a long integer (8 bytes) value."""
        return self.read_large_int(64, signed=signed)

    def read_float(self):
        """Reads a real floating point (4 bytes) value."""
        (value,) = unpack('<f', self.read(4))
        return value

    def read_double(self):
        """Reads a real floating point (8 bytes) value."""
        (value,) = unpack('<d', self.read(8))
        return value

    def read_large_int(self, bits, signed=True):
        """Reads an n-bits long little-endian integer value."""
        return int.from_bytes(self.read(bits // 8), 'little', signed=signed)

    def read(self, length=-1):
        """Read the given amount of bytes, or -1 to read all remaining."""
        result = self.stream.read(length)
        if 0 <= length != len(result):
            # Not enough bytes left to satisfy the request.
            raise BufferError(
                'No more data left to read (need {}, got {}: {}); last read {}'
                .format(length, len(result), repr(result), repr(self._last))
            )
        self._last = result
        return result

    def get_bytes(self):
        """Gets the byte array representing the current buffer as a whole."""
        return self.stream.getvalue()

    # endregion

    # region Telegram custom reading

    def tgread_bytes(self):
        """
        Reads a Telegram-encoded byte array, without the need of
        specifying its length.
        """
        marker = self.read_byte()
        if marker == 254:
            # Long form: 3-byte little-endian length follows the marker.
            length = int.from_bytes(self.read(3), 'little')
            padding = length % 4
        else:
            # Short form: the marker itself is the length.
            length = marker
            padding = (length + 1) % 4

        data = self.read(length)
        if padding:
            # Skip alignment bytes so the stream stays 4-byte aligned.
            self.read(4 - padding)

        return data

    def tgread_string(self):
        """Reads a Telegram-encoded string."""
        return self.tgread_bytes().decode('utf-8', errors='replace')

    def tgread_bool(self):
        """Reads a Telegram boolean value."""
        value = self.read_int(signed=False)
        if value == 0x997275b5:  # boolTrue
            return True
        if value == 0xbc799737:  # boolFalse
            return False
        raise RuntimeError('Invalid boolean code {}'.format(hex(value)))

    def tgread_date(self):
        """Reads and converts Unix time (used by Telegram)
           into a Python datetime object.
        """
        seconds = self.read_int()
        return _EPOCH + timedelta(seconds=seconds)

    def tgread_object(self):
        """Reads a Telegram object."""
        constructor_id = self.read_int(signed=False)
        clazz = _tl.tlobjects.get(constructor_id)
        if clazz is not None:
            return clazz.from_reader(self)

        # Unknown constructor; it may still be a manually-parsed value.
        if constructor_id == 0x997275b5:  # boolTrue
            return True
        if constructor_id == 0xbc799737:  # boolFalse
            return False
        if constructor_id == 0x1cb5c415:  # Vector
            return [self.tgread_object() for _ in range(self.read_int())]

        clazz = _core.core_objects.get(constructor_id)
        if clazz is None:
            # Give up; rewind so the error includes the offending ID,
            # then restore the position before raising.
            self.seek(-4)
            pos = self.tell_position()
            error = TypeNotFoundError(constructor_id, self.read())
            self.set_position(pos)
            raise error

        return clazz.from_reader(self)

    def tgread_vector(self):
        """Reads a vector (a list) of Telegram objects."""
        if self.read_int(signed=False) != 0x1cb5c415:
            raise RuntimeError('Invalid constructor code, vector was expected')

        return [self.tgread_object() for _ in range(self.read_int())]

    # endregion

    def close(self):
        """Closes the reader, freeing the BytesIO stream."""
        self.stream.close()

    # region Position related

    def tell_position(self):
        """Tells the current position on the stream."""
        return self.stream.tell()

    def set_position(self, position):
        """Sets the current position on the stream."""
        self.stream.seek(position)

    def seek(self, offset):
        """
        Seeks the stream position given an offset from the current position.
        The offset may be negative.
        """
        self.stream.seek(offset, os.SEEK_CUR)

    # endregion

    # region with block

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    # endregion

131
telethon/_misc/enums.py Normal file
View File

@@ -0,0 +1,131 @@
from enum import Enum
def _impl_op(which):
def op(self, other):
if not isinstance(other, type(self)):
return NotImplemented
return getattr(self._val(), which)(other._val())
return op
class ConnectionMode(Enum):
    """Mode to use when connecting to Telegram."""
    FULL = 'full'
    INTERMEDIATE = 'intermediate'
    ABRIDGED = 'abridged'
class Participant(Enum):
    """Kind of chat participant (presumably used as a participant filter; confirm against callers)."""
    ADMIN = 'admin'
    BOT = 'bot'
    KICKED = 'kicked'
    BANNED = 'banned'
    CONTACT = 'contact'
class Action(Enum):
    """Chat action (such as "user is typing") to display to other users."""
    TYPING = 'typing'
    CONTACT = 'contact'
    GAME = 'game'
    LOCATION = 'location'
    STICKER = 'sticker'
    RECORD_AUDIO = 'record-audio'
    RECORD_VOICE = RECORD_AUDIO  # alias of RECORD_AUDIO
    RECORD_ROUND = 'record-round'
    RECORD_VIDEO = 'record-video'
    AUDIO = 'audio'
    VOICE = AUDIO  # alias of AUDIO
    SONG = AUDIO  # alias of AUDIO
    ROUND = 'round'
    VIDEO = 'video'
    PHOTO = 'photo'
    DOCUMENT = 'document'
    FILE = DOCUMENT  # alias of DOCUMENT
    CANCEL = 'cancel'
class Size(Enum):
    """
    See https://core.telegram.org/api/files#image-thumbnail-types.

    * ``'s'``. The image fits within a box of 100x100.
    * ``'m'``. The image fits within a box of 320x320.
    * ``'x'``. The image fits within a box of 800x800.
    * ``'y'``. The image fits within a box of 1280x1280.
    * ``'w'``. The image fits within a box of 2560x2560.
    * ``'a'``. The image was cropped to be at most 160x160.
    * ``'b'``. The image was cropped to be at most 320x320.
    * ``'c'``. The image was cropped to be at most 640x640.
    * ``'d'``. The image was cropped to be at most 1280x1280.
    * ``'i'``. The image comes inline (no need to download anything).
    * ``'j'``. Only the image outline is present (for stickers).
    * ``'u'``. The image is actually a short MPEG4 animated video.
    * ``'v'``. The image is actually a short MPEG4 video preview.
    The sorting order is first dimensions, then ``cropped < boxed < video < other``.
    """
    SMALL = 's'
    MEDIUM = 'm'
    LARGE = 'x'
    EXTRA_LARGE = 'y'
    ORIGINAL = 'w'
    CROPPED_SMALL = 'a'
    CROPPED_MEDIUM = 'b'
    CROPPED_LARGE = 'c'
    CROPPED_EXTRA_LARGE = 'd'
    INLINE = 'i'
    OUTLINE = 'j'
    ANIMATED = 'u'
    VIDEO = 'v'
    # Defining __eq__ (below) would otherwise reset the inherited hash to
    # None; restore identity hashing so members remain usable in sets/dicts.
    def __hash__(self):
        return object.__hash__(self)
    # Comparison/arithmetic operators delegate to `_val()` (see _impl_op),
    # implementing the sorting order described in the class docstring.
    __sub__ = _impl_op('__sub__')
    __lt__ = _impl_op('__lt__')
    __le__ = _impl_op('__le__')
    __eq__ = _impl_op('__eq__')
    __ne__ = _impl_op('__ne__')
    __gt__ = _impl_op('__gt__')
    __ge__ = _impl_op('__ge__')
    def _val(self):
        # Category dominates; size rank breaks ties within a category.
        return self._category() * 100 + self._size()
    def _category(self):
        # cropped (1) < boxed (2) < video (3) < inline (4) < outline (5).
        return {
            Size.SMALL: 2,
            Size.MEDIUM: 2,
            Size.LARGE: 2,
            Size.EXTRA_LARGE: 2,
            Size.ORIGINAL: 2,
            Size.CROPPED_SMALL: 1,
            Size.CROPPED_MEDIUM: 1,
            Size.CROPPED_LARGE: 1,
            Size.CROPPED_EXTRA_LARGE: 1,
            Size.INLINE: 4,
            Size.OUTLINE: 5,
            Size.ANIMATED: 3,
            Size.VIDEO: 3,
        }[self]
    def _size(self):
        # Relative dimension rank within a category.
        return {
            Size.SMALL: 1,
            Size.MEDIUM: 3,
            Size.LARGE: 5,
            Size.EXTRA_LARGE: 6,
            Size.ORIGINAL: 7,
            Size.CROPPED_SMALL: 2,
            Size.CROPPED_MEDIUM: 3,
            Size.CROPPED_LARGE: 4,
            Size.CROPPED_EXTRA_LARGE: 6,
            # 0, since they're not the original photo at all
            Size.INLINE: 0,
            Size.OUTLINE: 0,
            # same size as original or extra large (videos are large)
            Size.ANIMATED: 7,
            Size.VIDEO: 6,
        }[self]

418
telethon/_misc/helpers.py Normal file
View File

@@ -0,0 +1,418 @@
"""Various helpers not related to the Telegram API itself"""
import asyncio
import io
import enum
import os
import struct
import inspect
import logging
import functools
from pathlib import Path
from hashlib import sha1
class _EntityType(enum.Enum):
    # Broad category of a Telegram entity; produced by `_entity_type` below.
    USER = 0
    CHAT = 1
    CHANNEL = 2
# Module-level logger for this helpers module.
_log = logging.getLogger(__name__)
# region Multiple utilities
def generate_random_long(signed=True):
    """Generates a random long integer (8 bytes), which is optionally signed"""
    raw = os.urandom(8)
    return int.from_bytes(raw, 'little', signed=signed)
def ensure_parent_dir_exists(file_path):
    """Creates the parent directory of `file_path` if it does not exist yet."""
    parent = os.path.dirname(file_path)
    if not parent:
        # Bare file name; nothing to create.
        return
    os.makedirs(parent, exist_ok=True)
def add_surrogate(text):
    """
    Replace every astral-plane character (SMP) in `text` with its UTF-16
    surrogate pair, since Telegram offsets are calculated over UTF-16 units.
    See https://en.wikipedia.org/wiki/Plane_(Unicode)#Overview for more.
    """
    chunks = []
    for char in text:
        if 0x10000 <= ord(char) <= 0x10FFFF:
            high, low = struct.unpack('<HH', char.encode('utf-16le'))
            chunks.append(chr(high))
            chunks.append(chr(low))
        else:
            chunks.append(char)
    return ''.join(chunks)
def del_surrogate(text):
    """Reverse of `add_surrogate`: collapse surrogate pairs back into their characters."""
    utf16 = text.encode('utf-16', 'surrogatepass')
    return utf16.decode('utf-16')
def within_surrogate(text, index, *, length=None):
    """
    `True` if ``index`` is within a surrogate (before and after it, not at!).
    """
    # NOTE(review): `length` is normalized here but never used below — the
    # bounds check reads len(text) instead. Confirm whether the condition
    # should be `1 < index < length` before changing anything, since
    # html.unparse passes an explicit length on recursive calls.
    if length is None:
        length = len(text)
    return (
        1 < index < len(text) and  # in bounds
        '\ud800' <= text[index - 1] <= '\udfff' and  # previous is
        '\ud800' <= text[index] <= '\udfff'  # current is
    )
def strip_text(text, entities):
    """
    Strips whitespace from the given text modifying the provided entities.

    This assumes that there are no overlapping entities, that their length
    is greater or equal to one, and that their length is not out of bounds.

    :param text: the text to strip; the stripped copy is returned.
    :param entities: list of entities, mutated in place so that offsets and
        lengths keep matching the stripped text; empty entities are removed.
    """
    if not entities:
        return text.strip()
    # Trim trailing whitespace one character at a time, shrinking (or
    # dropping) the last entity whenever it covers the removed character.
    while text and text[-1].isspace():
        e = entities[-1]
        if e.offset + e.length == len(text):
            if e.length == 1:
                del entities[-1]
                if not entities:
                    return text.strip()
            else:
                e.length -= 1
        text = text[:-1]
    # Trim leading whitespace; every entity shifts left by one per removed
    # character, and an entity already at offset 0 shrinks (or is dropped).
    while text and text[0].isspace():
        for i in reversed(range(len(entities))):
            e = entities[i]
            if e.offset != 0:
                e.offset -= 1
                continue
            if e.length == 1:
                del entities[0]
                if not entities:
                    return text.lstrip()
            else:
                e.length -= 1
        text = text[1:]
    return text
def retry_range(retries):
    """
    Generates an integer sequence starting from 1, always returning once, and adding the given retries.
    """
    # One guaranteed attempt plus the (non-negative) number of retries.
    attempts = 1 + max(retries, 0)
    return range(1, attempts + 1)
async def _maybe_await(value):
if inspect.isawaitable(value):
return await value
else:
return value
async def _cancel(log, **tasks):
    """
    Helper to cancel one or more tasks gracefully, logging exceptions.

    :param log: logger used to report unexpected errors.
    :param tasks: ``name=task`` keyword arguments; falsy values are skipped.
    """
    for name, task in tasks.items():
        if not task:
            continue
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected result of cancelling; nothing to report.
            pass
        except RuntimeError:
            # Probably: RuntimeError: await wasn't used with future
            #
            # See: https://github.com/python/cpython/blob/12d3061c7819a73d891dcce44327410eaf0e1bc2/Lib/asyncio/futures.py#L265
            #
            # Happens with _asyncio.Task instances (in "Task cancelling" state)
            # trying to SIGINT the program right during initial connection, on
            # _recv_loop coroutine (but we're creating its task explicitly with
            # a loop, so how can it bug out like this?).
            #
            # Since we're aware of this error there's no point in logging it.
            # *May* be https://bugs.python.org/issue37172
            pass
        except AssertionError as e:
            # In Python 3.6, the above RuntimeError is an AssertionError
            # See https://github.com/python/cpython/blob/7df32f844efed33ca781a016017eab7050263b90/Lib/asyncio/futures.py#L328
            if e.args != ("yield from wasn't used with future",):
                log.exception('Unhandled exception from %s after cancelling '
                              '%s (%s)', name, type(task), task)
        except Exception:
            log.exception('Unhandled exception from %s after cancelling '
                          '%s (%s)', name, type(task), task)
def _entity_type(entity):
# This could be a `utils` method that just ran a few `isinstance` on
# `utils.get_peer(...)`'s result. However, there are *a lot* of auto
# casts going on, plenty of calls and temporary short-lived objects.
#
# So we just check if a string is in the class name.
# Still, assert that it's the right type to not return false results.
try:
if entity.SUBCLASS_OF_ID not in (
0x2d45687, # crc32(b'Peer')
0xc91c90b6, # crc32(b'InputPeer')
0xe669bf46, # crc32(b'InputUser')
0x40f202fd, # crc32(b'InputChannel')
0x2da17977, # crc32(b'User')
0xc5af5d94, # crc32(b'Chat')
0x1f4661b9, # crc32(b'UserFull')
0xd49a2697, # crc32(b'ChatFull')
):
raise TypeError('{} does not have any entity type'.format(entity))
except AttributeError:
raise TypeError('{} is not a TLObject, cannot determine entity type'.format(entity))
name = entity.__class__.__name__
if 'User' in name:
return _EntityType.USER
elif 'Chat' in name:
return _EntityType.CHAT
elif 'Channel' in name:
return _EntityType.CHANNEL
elif 'Self' in name:
return _EntityType.USER
# 'Empty' in name or not found, we don't care, not a valid entity.
raise TypeError('{} does not have any entity type'.format(entity))
def pretty_print(obj, indent=None, max_depth=float('inf')):
    """
    Return a readable string representation of `obj`.

    Objects exposing a callable ``_to_dict``/``to_dict`` are converted to a
    dict first; dicts render as ``Name(key=value, ...)`` using their ``'_'``
    key as the name (falling back to ``'dict'``).

    :param obj: the object to format.
    :param indent: ``None`` for single-line output, or the current indent
        level (tabs) for multi-line output.
    :param max_depth: nesting levels to render before eliding with ``'...'``.
    """
    max_depth -= 1
    if max_depth < 0:
        return '...'
    to_d = getattr(obj, '_to_dict', None) or getattr(obj, 'to_dict', None)
    if callable(to_d):
        obj = to_d()
    if indent is None:
        # Single-line rendering.
        if isinstance(obj, dict):
            return '{}({})'.format(obj.get('_', 'dict'), ', '.join(
                '{}={}'.format(k, pretty_print(v, indent, max_depth))
                for k, v in obj.items() if k != '_'
            ))
        elif isinstance(obj, str) or isinstance(obj, bytes):
            return repr(obj)
        elif hasattr(obj, '__iter__'):
            return '[{}]'.format(
                ', '.join(pretty_print(x, indent, max_depth) for x in obj)
            )
        else:
            return repr(obj)
    else:
        # Multi-line rendering, one tab per `indent` level.
        result = []
        if isinstance(obj, dict):
            result.append(obj.get('_', 'dict'))
            result.append('(')
            if obj:
                result.append('\n')
                indent += 1
                for k, v in obj.items():
                    if k == '_':
                        continue
                    result.append('\t' * indent)
                    result.append(k)
                    result.append('=')
                    result.append(pretty_print(v, indent, max_depth))
                    result.append(',\n')
                result.pop()  # last ',\n'
                indent -= 1
                result.append('\n')
                result.append('\t' * indent)
            result.append(')')
        elif isinstance(obj, str) or isinstance(obj, bytes):
            result.append(repr(obj))
        elif hasattr(obj, '__iter__'):
            result.append('[\n')
            indent += 1
            for x in obj:
                result.append('\t' * indent)
                result.append(pretty_print(x, indent, max_depth))
                result.append(',\n')
            indent -= 1
            result.append('\t' * indent)
            result.append(']')
        else:
            result.append(repr(obj))
        return ''.join(result)
# endregion
# region Cryptographic related utils
def generate_key_data_from_nonce(server_nonce, new_nonce):
    """Generates the (key, iv) data corresponding to the given nonces."""
    server_bytes = server_nonce.to_bytes(16, 'little', signed=True)
    new_bytes = new_nonce.to_bytes(32, 'little', signed=True)
    hash1 = sha1(new_bytes + server_bytes).digest()
    hash2 = sha1(server_bytes + new_bytes).digest()
    hash3 = sha1(new_bytes + new_bytes).digest()
    return hash1 + hash2[:12], hash2[12:20] + hash3 + new_bytes[:4]
# endregion
# region Custom Classes
class TotalList(list):
    """
    A list with an extra `total` property, which may not match its `len`
    since the total represents the total amount of items *available*
    somewhere else, not the items *in this list*.

    For example, ``client.get_messages(chat, limit=10)`` may return ten
    messages while ``result.total`` reflects how many exist in the chat.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.total = 0

    def __str__(self):
        items = ', '.join(str(item) for item in self)
        return '[{}, total={}]'.format(items, self.total)

    def __repr__(self):
        items = ', '.join(repr(item) for item in self)
        return '[{}, total={}]'.format(items, self.total)
class _FileStream(io.IOBase):
    """
    Proxy around things that represent a file and need to be used as streams
    which may or not need to be closed.

    This will handle `pathlib.Path`, `str` paths, in-memory `bytes`, and
    anything IO-like (including `aiofiles`).

    It also provides access to the name and file size (also necessary).
    """
    def __init__(self, file, *, file_size=None):
        if isinstance(file, Path):
            # Normalize paths to their absolute string form from here on.
            file = str(file.absolute())
        self._file = file
        self._name = None
        self._size = file_size
        self._stream = None
        # Whether __aexit__ must close the stream (True iff we opened it).
        self._close_stream = None

    async def __aenter__(self):
        if isinstance(self._file, str):
            # Path on disk: we open the file, so we own (and close) it.
            self._name = os.path.basename(self._file)
            self._size = os.path.getsize(self._file)
            self._stream = open(self._file, 'rb')
            self._close_stream = True
        elif isinstance(self._file, bytes):
            # In-memory bytes: wrap in a BytesIO we own.
            self._size = len(self._file)
            self._stream = io.BytesIO(self._file)
            self._close_stream = True
        elif not callable(getattr(self._file, 'read', None)):
            raise TypeError('file description should have a `read` method')
        elif self._size is not None:
            # Caller supplied the size: use their stream as-is, don't close.
            self._name = getattr(self._file, 'name', None)
            self._stream = self._file
            self._close_stream = False
        else:
            # Unknown size: measure by seeking if possible, else read fully.
            if callable(getattr(self._file, 'seekable', None)):
                seekable = await _maybe_await(self._file.seekable())
            else:
                seekable = False
            if seekable:
                # Seek to the end to learn the size, then restore position.
                pos = await _maybe_await(self._file.tell())
                await _maybe_await(self._file.seek(0, os.SEEK_END))
                self._size = await _maybe_await(self._file.tell())
                await _maybe_await(self._file.seek(pos, os.SEEK_SET))
                self._stream = self._file
                self._close_stream = False
            else:
                _log.warning(
                    'Could not determine file size beforehand so the entire '
                    'file will be read in-memory')
                data = await _maybe_await(self._file.read())
                self._size = len(data)
                self._stream = io.BytesIO(data)
                self._close_stream = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._close_stream and self._stream:
            await _maybe_await(self._stream.close())

    @property
    def file_size(self):
        # May be None until __aenter__ runs, unless file_size was provided.
        return self._size

    @property
    def name(self):
        # Base file name when known (paths, or streams exposing `.name`).
        return self._name

    # Proxy all the methods. Doesn't need to be readable (makes multiline edits easier)
    def read(self, *args, **kwargs): return self._stream.read(*args, **kwargs)
    def readinto(self, *args, **kwargs): return self._stream.readinto(*args, **kwargs)
    def write(self, *args, **kwargs): return self._stream.write(*args, **kwargs)
    def fileno(self, *args, **kwargs): return self._stream.fileno(*args, **kwargs)
    def flush(self, *args, **kwargs): return self._stream.flush(*args, **kwargs)
    def isatty(self, *args, **kwargs): return self._stream.isatty(*args, **kwargs)
    def readable(self, *args, **kwargs): return self._stream.readable(*args, **kwargs)
    def readline(self, *args, **kwargs): return self._stream.readline(*args, **kwargs)
    def readlines(self, *args, **kwargs): return self._stream.readlines(*args, **kwargs)
    def seek(self, *args, **kwargs): return self._stream.seek(*args, **kwargs)
    def seekable(self, *args, **kwargs): return self._stream.seekable(*args, **kwargs)
    def tell(self, *args, **kwargs): return self._stream.tell(*args, **kwargs)
    def truncate(self, *args, **kwargs): return self._stream.truncate(*args, **kwargs)
    def writable(self, *args, **kwargs): return self._stream.writable(*args, **kwargs)
    def writelines(self, *args, **kwargs): return self._stream.writelines(*args, **kwargs)

    # close is special because it will be called by __del__ but we do NOT
    # want to close the file unless we have to (we're just a wrapper).
    # Instead, we do nothing (we should be used through the decorator which
    # has its own mechanism to close the file correctly).
    def close(self, *args, **kwargs):
        pass
# endregion

60
telethon/_misc/hints.py Normal file
View File

@@ -0,0 +1,60 @@
import datetime
import typing
from . import helpers
from .. import _tl
from ..types import _custom
# Basic identifiers that can stand in for an entity.
Phone = str
Username = str
PeerID = int
# Actual entity objects and their "full" counterparts.
Entity = typing.Union[_tl.User, _tl.Chat, _tl.Channel]
FullEntity = typing.Union[_tl.UserFull, _tl.messages.ChatFull, _tl.ChatFull, _tl.ChannelFull]
# Anything accepted wherever an entity is expected.
EntityLike = typing.Union[
    Phone,
    Username,
    PeerID,
    _tl.TypePeer,
    _tl.TypeInputPeer,
    Entity,
    FullEntity
]
EntitiesLike = typing.Union[EntityLike, typing.Sequence[EntityLike]]
# Inline/reply keyboard buttons and markup.
ButtonLike = typing.Union[_tl.TypeKeyboardButton, _custom.Button]
MarkupLike = typing.Union[
    _tl.TypeReplyMarkup,
    ButtonLike,
    typing.Sequence[ButtonLike],
    typing.Sequence[typing.Sequence[ButtonLike]]
]
TotalList = helpers.TotalList
DateLike = typing.Optional[typing.Union[float, datetime.datetime, datetime.date, datetime.timedelta]]
# Inputs that can describe a file to send/download.
LocalPath = str
ExternalUrl = str
BotFileID = str
FileLike = typing.Union[
    LocalPath,
    ExternalUrl,
    BotFileID,
    bytes,
    typing.BinaryIO,
    _tl.TypeMessageMedia,
    _tl.TypeInputFile,
    _tl.TypeInputFileLocation
]
# Where a download may be written to.
OutFileLike = typing.Union[
    str,
    typing.Type[bytes],
    typing.BinaryIO
]
MessageLike = typing.Union[str, _tl.Message]
MessageIDLike = typing.Union[int, _tl.Message, _tl.TypeInputMessage]
# Callback invoked as (transferred_bytes, total_bytes) during transfers.
ProgressCallback = typing.Callable[[int, int], None]

225
telethon/_misc/html.py Normal file
View File

@@ -0,0 +1,225 @@
"""
Simple HTML -> Telegram entity parser.
"""
import struct
from collections import deque
from html import escape
from html.parser import HTMLParser
from typing import Iterable, Optional, Tuple, List
from .._misc import helpers
from .. import _tl
# Helpers from markdown.py
def _add_surrogate(text):
return ''.join(
''.join(chr(y) for y in struct.unpack('<HH', x.encode('utf-16le')))
if (0x10000 <= ord(x) <= 0x10FFFF) else x for x in text
)
def _del_surrogate(text):
return text.encode('utf-16', 'surrogatepass').decode('utf-16')
class HTMLToTelegramParser(HTMLParser):
    """
    HTMLParser subclass that accumulates tag-free text in `self.text`
    and the corresponding _tl.MessageEntity list in `self.entities`.
    """
    def __init__(self):
        super().__init__()
        self.text = ''  # clean (tag-free) text collected so far
        self.entities = []  # finished entities, appended on tag close
        self._building_entities = {}  # tag name -> entity not yet closed
        self._open_tags = deque()  # stack of open tag names, newest first
        self._open_tags_meta = deque()  # parallel metadata stack (<a> href)

    def handle_starttag(self, tag, attrs):
        self._open_tags.appendleft(tag)
        self._open_tags_meta.appendleft(None)
        attrs = dict(attrs)
        EntityType = None
        args = {}
        if tag == 'strong' or tag == 'b':
            EntityType = _tl.MessageEntityBold
        elif tag == 'em' or tag == 'i':
            EntityType = _tl.MessageEntityItalic
        elif tag == 'u':
            EntityType = _tl.MessageEntityUnderline
        elif tag == 'del' or tag == 's':
            EntityType = _tl.MessageEntityStrike
        elif tag == 'tg-spoiler':
            EntityType = _tl.MessageEntitySpoiler
        elif tag == 'blockquote':
            EntityType = _tl.MessageEntityBlockquote
        elif tag == 'code':
            try:
                # If we're in the middle of a <pre> tag, this <code> tag is
                # probably intended for syntax highlighting.
                #
                # Syntax highlighting is set with
                #     <code class='language-...'>codeblock</code>
                # inside <pre> tags
                pre = self._building_entities['pre']
                try:
                    pre.language = attrs['class'][len('language-'):]
                except KeyError:
                    pass
            except KeyError:
                EntityType = _tl.MessageEntityCode
        elif tag == 'pre':
            EntityType = _tl.MessageEntityPre
            args['language'] = ''
        elif tag == 'a':
            try:
                url = attrs['href']
            except KeyError:
                return
            if url.startswith('mailto:'):
                url = url[len('mailto:'):]
                EntityType = _tl.MessageEntityEmail
            else:
                if self.get_starttag_text() == url:
                    EntityType = _tl.MessageEntityUrl
                else:
                    EntityType = _tl.MessageEntityTextUrl
                    args['url'] = url
                    url = None
            # Replace the placeholder metadata for this <a> with its URL
            # (None for text-URL entities, which keep the URL in the entity).
            self._open_tags_meta.popleft()
            self._open_tags_meta.appendleft(url)
        if EntityType and tag not in self._building_entities:
            self._building_entities[tag] = EntityType(
                offset=len(self.text),
                # The length will be determined when closing the tag.
                length=0,
                **args)

    def handle_data(self, text):
        previous_tag = self._open_tags[0] if len(self._open_tags) > 0 else ''
        if previous_tag == 'a':
            # For plain/email links, the visible text becomes the URL itself.
            url = self._open_tags_meta[0]
            if url:
                text = url
        # Every entity still being built grows to cover this data.
        for tag, entity in self._building_entities.items():
            entity.length += len(text)
        self.text += text

    def handle_endtag(self, tag):
        try:
            self._open_tags.popleft()
            self._open_tags_meta.popleft()
        except IndexError:
            pass
        entity = self._building_entities.pop(tag, None)
        if entity:
            self.entities.append(entity)
def parse(html: str) -> Tuple[str, List[_tl.TypeMessageEntity]]:
    """
    Parses the given HTML message and returns its stripped representation
    plus a list of the _tl.MessageEntity's that were found.

    :param html: the message with HTML to be parsed.
    :return: a tuple consisting of (clean message, [message entities]).
    """
    if html:
        parser = HTMLToTelegramParser()
        parser.feed(_add_surrogate(html))
        cleaned = helpers.strip_text(parser.text, parser.entities)
        return _del_surrogate(cleaned), parser.entities
    return html, []
def unparse(text: str, entities: Iterable[_tl.TypeMessageEntity], _offset: int = 0,
            _length: Optional[int] = None) -> str:
    """
    Performs the reverse operation to .parse(), effectively returning HTML
    given a normal text and its _tl.MessageEntity's.

    :param text: the text to be reconverted into HTML.
    :param entities: the _tl.MessageEntity's applied to the text.
    :return: a HTML representation of the combination of both inputs.

    ``_offset``/``_length`` are internal: recursive calls for nested
    entities pass the absolute offset and length of the current slice.
    """
    if not text:
        return text
    elif not entities:
        return escape(text)
    text = _add_surrogate(text)
    if _length is None:
        _length = len(text)
    html = []
    last_offset = 0
    for i, entity in enumerate(entities):
        # Stop once entities fall past the end of the current slice.
        if entity.offset >= _offset + _length:
            break
        relative_offset = entity.offset - _offset
        if relative_offset > last_offset:
            # Plain text between the previous entity and this one.
            html.append(escape(text[last_offset:relative_offset]))
        elif relative_offset < last_offset:
            # Nested entity; the recursive call below already rendered it.
            continue
        skip_entity = False
        length = entity.length
        # If we are in the middle of a surrogate nudge the position by +1.
        # Otherwise we would end up with malformed text and fail to encode.
        # For example of bad input: "Hi \ud83d\ude1c"
        # https://en.wikipedia.org/wiki/UTF-16#U+010000_to_U+10FFFF
        while helpers.within_surrogate(text, relative_offset, length=_length):
            relative_offset += 1
        while helpers.within_surrogate(text, relative_offset + length, length=_length):
            length += 1
        # Render this entity's inner text, recursing for nested entities.
        entity_text = unparse(text=text[relative_offset:relative_offset + length],
                              entities=entities[i + 1:],
                              _offset=entity.offset, _length=length)
        entity_type = type(entity)
        if entity_type == _tl.MessageEntityBold:
            html.append('<strong>{}</strong>'.format(entity_text))
        elif entity_type == _tl.MessageEntityItalic:
            html.append('<em>{}</em>'.format(entity_text))
        elif entity_type == _tl.MessageEntityCode:
            html.append('<code>{}</code>'.format(entity_text))
        elif entity_type == _tl.MessageEntityUnderline:
            html.append('<u>{}</u>'.format(entity_text))
        elif entity_type == _tl.MessageEntityStrike:
            html.append('<del>{}</del>'.format(entity_text))
        elif entity_type == _tl.MessageEntityBlockquote:
            html.append('<blockquote>{}</blockquote>'.format(entity_text))
        elif entity_type == _tl.MessageEntityPre:
            if entity.language:
                html.append(
                    "<pre>\n"
                    "    <code class='language-{}'>\n"
                    "        {}\n"
                    "    </code>\n"
                    "</pre>".format(entity.language, entity_text))
            else:
                html.append('<pre><code>{}</code></pre>'
                            .format(entity_text))
        elif entity_type == _tl.MessageEntityEmail:
            html.append('<a href="mailto:{0}">{0}</a>'.format(entity_text))
        elif entity_type == _tl.MessageEntityUrl:
            html.append('<a href="{0}">{0}</a>'.format(entity_text))
        elif entity_type == _tl.MessageEntityTextUrl:
            html.append('<a href="{}">{}</a>'
                        .format(escape(entity.url), entity_text))
        elif entity_type == _tl.MessageEntityMentionName:
            html.append('<a href="tg://user?id={}">{}</a>'
                        .format(entity.user_id, entity_text))
        else:
            # Unknown entity type: emit its text as plain content below.
            skip_entity = True
        last_offset = relative_offset + (0 if skip_entity else length)
    while helpers.within_surrogate(text, last_offset, length=_length):
        last_offset += 1
    html.append(escape(text[last_offset:]))
    return _del_surrogate(''.join(html))

169
telethon/_misc/markdown.py Normal file
View File

@@ -0,0 +1,169 @@
"""
Simple markdown parser which does not support nesting. Intended primarily
for use within the library, which attempts to handle emojies correctly,
since they seem to count as two characters and it's a bit strange.
"""
import re
import warnings
import markdown_it
from .helpers import add_surrogate, del_surrogate, within_surrogate, strip_text
from .. import _tl
from .._misc import tlobject
# markdown-it parser with ~~strikethrough~~ support enabled.
MARKDOWN = markdown_it.MarkdownIt().enable('strikethrough')
# Entity type -> (prefix, suffix) delimiters used when unparsing.
DELIMITERS = {
    _tl.MessageEntityBlockquote: ('> ', ''),
    _tl.MessageEntityBold: ('**', '**'),
    _tl.MessageEntityCode: ('`', '`'),
    _tl.MessageEntityItalic: ('_', '_'),
    _tl.MessageEntityStrike: ('~~', '~~'),
    _tl.MessageEntitySpoiler: ('||', '||'),
    _tl.MessageEntityUnderline: ('# ', ''),
}
# Not trying to be complete; just enough to have an alternative (mostly for inline underline).
# The fact headings are treated as underline is an implementation detail.
# Matches an (optionally closing) inline HTML tag name.
TAG_PATTERN = re.compile(r'<\s*(/?)\s*(\w+)')
# Inline HTML tag -> (close token type, open token type) pair; consumers
# only test membership in the pair and rely on `token.nesting` for open/close.
HTML_TO_TYPE = {
    'i': ('em_close', 'em_open'),
    'em': ('em_close', 'em_open'),
    'b': ('strong_close', 'strong_open'),
    'strong': ('strong_close', 'strong_open'),
    's': ('s_close', 's_open'),
    'del': ('s_close', 's_open'),
    'u': ('heading_open', 'heading_close'),
    'mark': ('heading_open', 'heading_close'),
}
def expand_inline_and_html(tokens):
    """
    Yield `tokens` with 'inline' containers flattened into their children
    and recognised inline HTML tags rewritten as markdown-style tokens.
    """
    for token in tokens:
        if token.type == 'inline':
            # Flatten; the container itself is not yielded.
            yield from expand_inline_and_html(token.children)
            continue
        if token.type == 'html_inline':
            match = TAG_PATTERN.match(token.content)
            if match is not None:
                closing, tag = match.groups()
                pair = HTML_TO_TYPE.get(tag.lower())
                if pair:
                    # Either element of the pair works for consumers, which
                    # check pair membership and use `nesting` for direction.
                    token.type = pair[bool(closing)]
                    token.nesting = -1 if closing else 1
        yield token
def parse(message):
    """
    Parses the given markdown message and returns its stripped representation
    plus a list of the _tl.MessageEntity's that were found.
    """
    if not message:
        return message, []

    def push(ty, **extra):
        # Open a new entity of type `ty` (token.nesting > 0), or close the
        # most recently opened one of that type by fixing its length.
        nonlocal message, entities, token
        if token.nesting > 0:
            entities.append(ty(offset=len(message), length=0, **extra))
        else:
            for entity in reversed(entities):
                if isinstance(entity, ty):
                    entity.length = len(message) - entity.offset
                    break

    parsed = MARKDOWN.parse(add_surrogate(message.strip()))
    message = ''
    entities = []
    last_map = [0, 0]
    for token in expand_inline_and_html(parsed):
        if token.map is not None and token.map != last_map:
            # paragraphs, quotes fences have a line mapping. Use it to determine how many newlines to insert.
            # But don't insert any (leading) new lines if we're yet to reach the first textual content, or
            # if the mappings are the same (e.g. a quote then opens a paragraph but the mapping is equal).
            if message:
                message += '\n' + '\n' * (token.map[0] - last_map[-1])
            last_map = token.map
        if token.type in ('blockquote_close', 'blockquote_open'):
            push(_tl.MessageEntityBlockquote)
        elif token.type == 'code_block':
            entities.append(_tl.MessageEntityPre(offset=len(message), length=len(token.content), language=''))
            message += token.content
        elif token.type == 'code_inline':
            entities.append(_tl.MessageEntityCode(offset=len(message), length=len(token.content)))
            message += token.content
        elif token.type in ('em_close', 'em_open'):
            push(_tl.MessageEntityItalic)
        elif token.type == 'fence':
            entities.append(_tl.MessageEntityPre(offset=len(message), length=len(token.content), language=token.info))
            message += token.content[:-1]  # remove a single trailing newline
        elif token.type == 'hardbreak':
            message += '\n'
        elif token.type in ('heading_close', 'heading_open'):
            push(_tl.MessageEntityUnderline)
        elif token.type == 'hr':
            message += '\u2015\n\n'
        elif token.type in ('link_close', 'link_open'):
            if token.markup != 'autolink':  # telegram already picks up on these automatically
                push(_tl.MessageEntityTextUrl, url=token.attrs.get('href'))
        elif token.type in ('s_close', 's_open'):
            push(_tl.MessageEntityStrike)
        elif token.type == 'softbreak':
            message += ' '
        elif token.type in ('strong_close', 'strong_open'):
            push(_tl.MessageEntityBold)
        elif token.type == 'text':
            message += token.content
    return del_surrogate(message), entities
def unparse(text, entities):
    """
    Performs the reverse operation to .parse(), effectively returning
    markdown-like syntax given a normal text and its _tl.MessageEntity's.

    Because there are many possible ways for markdown to produce a certain
    output, this function cannot invert .parse() perfectly.
    """
    if not text or not entities:
        return text

    # A single entity may be given directly instead of a sequence.
    if isinstance(entities, tlobject.TLObject):
        entities = (entities,)

    # Work in UTF-16 space (surrogate pairs) since entity offsets/lengths
    # are expressed in UTF-16 code units.
    text = add_surrogate(text)
    insert_at = []
    for entity in entities:
        s = entity.offset
        e = entity.offset + entity.length
        delimiter = DELIMITERS.get(type(entity), None)
        if delimiter:
            insert_at.append((s, delimiter[0]))
            insert_at.append((e, delimiter[1]))
        elif isinstance(entity, _tl.MessageEntityPre):
            insert_at.append((s, f'```{entity.language}\n'))
            insert_at.append((e, '```\n'))
        elif isinstance(entity, _tl.MessageEntityTextUrl):
            insert_at.append((s, '['))
            insert_at.append((e, f']({entity.url})'))
        elif isinstance(entity, _tl.MessageEntityMentionName):
            insert_at.append((s, '['))
            insert_at.append((e, f'](tg://user?id={entity.user_id})'))

    # Apply insertions from the highest offset to the lowest so earlier
    # offsets remain valid as the text grows.
    insert_at.sort(key=lambda t: t[0])
    while insert_at:
        at, what = insert_at.pop()

        # If we are in the middle of a surrogate pair, nudge the position
        # forward until we are past it. Otherwise we would end up with
        # malformed text and fail to encode.
        # For example of bad input: "Hi \ud83d\ude1c"
        # https://en.wikipedia.org/wiki/UTF-16#U+010000_to_U+10FFFF
        while within_surrogate(text, at):
            at += 1

        text = text[:at] + what + text[at:]

    return del_surrogate(text)

View File

@@ -0,0 +1,110 @@
import asyncio
import collections
import io
import struct
from .._tl import TLRequest
from ..types._core import MessageContainer, TLMessage
class MessagePacker:
    """
    This class packs `RequestState` as outgoing `TLMessages`.

    The purpose of this class is to support putting N `RequestState` into a
    queue, and then awaiting for "packed" `TLMessage` in the other end. The
    simplest case would be ``State -> TLMessage`` (1-to-1 relationship) but
    for efficiency purposes it's ``States -> Container`` (N-to-1).

    This addresses several needs: outgoing messages will be smaller, so the
    encryption and network overhead also is smaller. It's also a central
    point where outgoing requests are put, and where ready-messages are get.
    """

    def __init__(self, state, loggers):
        # `state` must offer `write_data_as_message`; presumably the MTProto
        # sender state that assigns msg_ids -- confirm against callers.
        self._state = state
        self._deque = collections.deque()
        # Set whenever the deque holds items, so `get` can await for work.
        self._ready = asyncio.Event()
        self._log = loggers[__name__]

    def append(self, state):
        """Enqueue a single request state and wake up any `get` waiter."""
        self._deque.append(state)
        self._ready.set()

    def extend(self, states):
        """Enqueue several request states and wake up any `get` waiter."""
        self._deque.extend(states)
        self._ready.set()

    async def get(self):
        """
        Returns (batch, data) if one or more items could be retrieved.

        If the cancellation occurs or only invalid items were in the
        queue, (None, None) will be returned instead.
        """
        if not self._deque:
            self._ready.clear()
            await self._ready.wait()

        buffer = io.BytesIO()
        batch = []
        size = 0

        # Fill a new batch to return while the size is small enough,
        # as long as we don't exceed the maximum length of messages.
        while self._deque and len(batch) <= MessageContainer.MAXIMUM_LENGTH:
            state = self._deque.popleft()
            size += len(state.data) + TLMessage.SIZE_OVERHEAD

            if size <= MessageContainer.MAXIMUM_SIZE:
                state.msg_id = self._state.write_data_as_message(
                    buffer, state.data, isinstance(state.request, TLRequest),
                    after_id=state.after.msg_id if state.after else None
                )
                batch.append(state)
                self._log.debug('Assigned msg_id = %d to %s (%x)',
                                state.msg_id, state.request.__class__.__name__,
                                id(state.request))
                continue

            if batch:
                # Put the item back since it can't be sent in this batch
                self._deque.appendleft(state)
                break

            # If a single message exceeds the maximum size, then the
            # message payload cannot be sent. Telegram would forcibly
            # close the connection; message would never be confirmed.
            #
            # We don't put the item back because it can never be sent.
            # If we did, we would loop again and reach this same path.
            # Setting the exception twice results in `InvalidStateError`
            # and this method should never return with error, which we
            # really want to avoid.
            self._log.warning(
                'Message payload for %s is too long (%d) and cannot be sent',
                state.request.__class__.__name__, len(state.data)
            )
            state.future.set_exception(
                ValueError('Request payload is too big'))

            size = 0
            continue

        if not batch:
            return None, None

        if len(batch) > 1:
            # Inlined code to pack several messages into a container
            data = struct.pack(
                '<Ii', MessageContainer.CONSTRUCTOR_ID, len(batch)
            ) + buffer.getvalue()
            buffer = io.BytesIO()
            container_id = self._state.write_data_as_message(
                buffer, data, content_related=False
            )

            # Every message in the container shares the container's id.
            for s in batch:
                s.container_id = container_id

        data = buffer.getvalue()
        return batch, data

194
telethon/_misc/password.py Normal file
View File

@@ -0,0 +1,194 @@
import hashlib
import os
from .._crypto import factorization
from .. import _tl
def check_prime_and_good_check(prime: int, g: int):
    """
    Validate that ``prime`` is a 2048-bit safe prime and that the
    generator ``g`` satisfies the residue conditions required for SRP.

    Raises `ValueError` on the first failed check; returns ``None`` when
    everything is good.
    """
    good_prime_bits_count = 2048
    if prime < 0 or prime.bit_length() != good_prime_bits_count:
        raise ValueError('bad prime count {}, expected {}'
                         .format(prime.bit_length(), good_prime_bits_count))

    # TODO This is awfully slow
    if factorization.Factorization.factorize(prime)[0] != 1:
        raise ValueError('given "prime" is not prime')

    # Each supported generator constrains which residues the prime may
    # leave for a given modulus; g == 4 carries no extra condition.
    residue_rules = {
        2: (8, (7,)),
        3: (3, (2,)),
        5: (5, (1, 4)),
        6: (24, (19, 23)),
        7: (7, (3, 5, 6)),
    }
    if g in residue_rules:
        modulus, allowed = residue_rules[g]
        if prime % modulus not in allowed:
            raise ValueError('bad g {}, mod{} {}'.format(g, modulus, prime % modulus))
    elif g != 4:
        raise ValueError('bad g {}'.format(g))

    # A safe prime additionally requires (prime - 1) / 2 to be prime.
    half = (prime - 1) // 2
    if factorization.Factorization.factorize(half)[0] != 1:
        raise ValueError('(prime - 1) // 2 is not prime')
    # Else it's good
def check_prime_and_good(prime_bytes: bytes, g: int):
    """
    Check that the given prime/generator pair is usable for SRP.

    Telegram's published known-good prime is accepted immediately for
    the generators it is known to work with; every other combination
    goes through the full (slow) validation.
    """
    known_good_prime = bytes((
        0xC7, 0x1C, 0xAE, 0xB9, 0xC6, 0xB1, 0xC9, 0x04, 0x8E, 0x6C, 0x52, 0x2F, 0x70, 0xF1, 0x3F, 0x73,
        0x98, 0x0D, 0x40, 0x23, 0x8E, 0x3E, 0x21, 0xC1, 0x49, 0x34, 0xD0, 0x37, 0x56, 0x3D, 0x93, 0x0F,
        0x48, 0x19, 0x8A, 0x0A, 0xA7, 0xC1, 0x40, 0x58, 0x22, 0x94, 0x93, 0xD2, 0x25, 0x30, 0xF4, 0xDB,
        0xFA, 0x33, 0x6F, 0x6E, 0x0A, 0xC9, 0x25, 0x13, 0x95, 0x43, 0xAE, 0xD4, 0x4C, 0xCE, 0x7C, 0x37,
        0x20, 0xFD, 0x51, 0xF6, 0x94, 0x58, 0x70, 0x5A, 0xC6, 0x8C, 0xD4, 0xFE, 0x6B, 0x6B, 0x13, 0xAB,
        0xDC, 0x97, 0x46, 0x51, 0x29, 0x69, 0x32, 0x84, 0x54, 0xF1, 0x8F, 0xAF, 0x8C, 0x59, 0x5F, 0x64,
        0x24, 0x77, 0xFE, 0x96, 0xBB, 0x2A, 0x94, 0x1D, 0x5B, 0xCD, 0x1D, 0x4A, 0xC8, 0xCC, 0x49, 0x88,
        0x07, 0x08, 0xFA, 0x9B, 0x37, 0x8E, 0x3C, 0x4F, 0x3A, 0x90, 0x60, 0xBE, 0xE6, 0x7C, 0xF9, 0xA4,
        0xA4, 0xA6, 0x95, 0x81, 0x10, 0x51, 0x90, 0x7E, 0x16, 0x27, 0x53, 0xB5, 0x6B, 0x0F, 0x6B, 0x41,
        0x0D, 0xBA, 0x74, 0xD8, 0xA8, 0x4B, 0x2A, 0x14, 0xB3, 0x14, 0x4E, 0x0E, 0xF1, 0x28, 0x47, 0x54,
        0xFD, 0x17, 0xED, 0x95, 0x0D, 0x59, 0x65, 0xB4, 0xB9, 0xDD, 0x46, 0x58, 0x2D, 0xB1, 0x17, 0x8D,
        0x16, 0x9C, 0x6B, 0xC4, 0x65, 0xB0, 0xD6, 0xFF, 0x9C, 0xA3, 0x92, 0x8F, 0xEF, 0x5B, 0x9A, 0xE4,
        0xE4, 0x18, 0xFC, 0x15, 0xE8, 0x3E, 0xBE, 0xA0, 0xF8, 0x7F, 0xA9, 0xFF, 0x5E, 0xED, 0x70, 0x05,
        0x0D, 0xED, 0x28, 0x49, 0xF4, 0x7B, 0xF9, 0x59, 0xD9, 0x56, 0x85, 0x0C, 0xE9, 0x29, 0x85, 0x1F,
        0x0D, 0x81, 0x15, 0xF6, 0x35, 0xB1, 0x05, 0xEE, 0x2E, 0x4E, 0x15, 0xD0, 0x4B, 0x24, 0x54, 0xBF,
        0x6F, 0x4F, 0xAD, 0xF0, 0x34, 0xB1, 0x04, 0x03, 0x11, 0x9C, 0xD8, 0xE3, 0xB9, 0x2F, 0xCC, 0x5B))
    if prime_bytes == known_good_prime and g in (3, 4, 5, 7):
        return  # It's good

    check_prime_and_good_check(int.from_bytes(prime_bytes, 'big'), g)
def is_good_large(number: int, p: int) -> bool:
    """Return ``True`` when ``number`` lies strictly between 0 and ``p``."""
    return 0 < number < p
# Every value fed into the SRP hashes is padded to this many bytes.
SIZE_FOR_HASH = 256


def num_bytes_for_hash(number: bytes) -> bytes:
    """Left-pad ``number`` with zero bytes up to `SIZE_FOR_HASH` bytes."""
    padding = bytes(SIZE_FOR_HASH - len(number))
    return padding + number


def big_num_for_hash(g: int) -> bytes:
    """Serialize the integer ``g`` as `SIZE_FOR_HASH` big-endian bytes."""
    return g.to_bytes(SIZE_FOR_HASH, 'big')
def sha256(*p: bytes) -> bytes:
    """
    Return the SHA-256 digest of all the ``p`` byte chunks concatenated.

    Equivalent to ``hashlib.sha256(b''.join(p)).digest()`` without
    building an intermediate concatenated buffer.
    """
    # Renamed the accumulator from `hash` to avoid shadowing the builtin.
    digest = hashlib.sha256()
    for chunk in p:
        digest.update(chunk)
    return digest.digest()
def is_good_mod_exp_first(modexp, prime) -> bool:
    """
    Return ``True`` if ``modexp`` is an acceptable public value for the
    given ``prime``: not too close to either end of the range, and small
    enough to serialize in 256 bytes.
    """
    min_diff_bits_count = 2048 - 64
    max_mod_exp_size = 256

    difference = prime - modexp
    if difference < 0:
        return False
    if difference.bit_length() < min_diff_bits_count:
        return False
    if modexp.bit_length() < min_diff_bits_count:
        return False
    return (modexp.bit_length() + 7) // 8 <= max_mod_exp_size
def xor(a: bytes, b: bytes) -> bytes:
    """Byte-wise XOR of ``a`` and ``b``, truncated to the shorter input."""
    out = bytearray()
    for left, right in zip(a, b):
        out.append(left ^ right)
    return bytes(out)
def pbkdf2sha512(password: bytes, salt: bytes, iterations: int):
    """Derive a key from ``password`` using PBKDF2 with HMAC-SHA512."""
    derived = hashlib.pbkdf2_hmac('sha512', password, salt, iterations)
    return derived
def compute_hash(algo: _tl.PasswordKdfAlgoSHA256SHA256PBKDF2HMACSHA512iter100000SHA256ModPow,
                 password: str):
    """
    Derive the SRP password hash ``x`` (as bytes) from the plaintext
    ``password`` using Telegram's salted SHA-256 / PBKDF2 scheme.
    """
    inner = sha256(algo.salt1, password.encode('utf-8'), algo.salt1)
    outer = sha256(algo.salt2, inner, algo.salt2)
    stretched = pbkdf2sha512(outer, algo.salt1, 100000)
    return sha256(algo.salt2, stretched, algo.salt2)
def compute_digest(algo: _tl.PasswordKdfAlgoSHA256SHA256PBKDF2HMACSHA512iter100000SHA256ModPow,
                   password: str):
    """
    Compute ``g ^ x mod p`` for the given ``password``, serialized as
    256 big-endian bytes, after validating the server's prime/generator.
    """
    try:
        check_prime_and_good(algo.p, algo.g)
    except ValueError:
        raise ValueError('bad p/g in password')

    exponent = int.from_bytes(compute_hash(algo, password), 'big')
    modulus = int.from_bytes(algo.p, 'big')
    return big_num_for_hash(pow(algo.g, exponent, modulus))
# https://github.com/telegramdesktop/tdesktop/blob/18b74b90451a7db2379a9d753c9cbaf8734b4d5d/Telegram/SourceFiles/core/core_cloud_password.cpp
def compute_check(request: _tl.account.Password, password: str):
    """
    Perform the client side of the SRP handshake to prove knowledge of
    the 2FA ``password``, returning the `InputCheckPasswordSRP` answer
    for the server parameters carried in ``request``.

    Raises `ValueError` if the KDF algorithm is unsupported or the
    server-provided parameters fail validation.
    """
    algo = request.current_algo
    if not isinstance(algo, _tl.PasswordKdfAlgoSHA256SHA256PBKDF2HMACSHA512iter100000SHA256ModPow):
        raise ValueError('unsupported password algorithm {}'
                         .format(algo.__class__.__name__))

    pw_hash = compute_hash(algo, password)

    p = int.from_bytes(algo.p, 'big')
    g = algo.g
    B = int.from_bytes(request.srp_B, 'big')
    try:
        check_prime_and_good(algo.p, g)
    except ValueError:
        raise ValueError('bad p/g in password')

    if not is_good_large(B, p):
        raise ValueError('bad b in check')

    # Standard SRP notation: x is the password-derived private value,
    # k the multiplier parameter, A/B the client/server public values.
    x = int.from_bytes(pw_hash, 'big')
    p_for_hash = num_bytes_for_hash(algo.p)
    g_for_hash = big_num_for_hash(g)
    b_for_hash = num_bytes_for_hash(request.srp_B)
    g_x = pow(g, x, p)
    k = int.from_bytes(sha256(p_for_hash, g_for_hash), 'big')
    kg_x = (k * g_x) % p

    def generate_and_check_random():
        # Keep drawing random ephemerals until both the public value A
        # and the scrambling parameter u are acceptable.
        random_size = 256
        while True:
            random = os.urandom(random_size)
            a = int.from_bytes(random, 'big')
            A = pow(g, a, p)
            if is_good_mod_exp_first(A, p):
                a_for_hash = big_num_for_hash(A)
                u = int.from_bytes(sha256(a_for_hash, b_for_hash), 'big')
                if u > 0:
                    return (a, a_for_hash, u)

    a, a_for_hash, u = generate_and_check_random()

    # g_b is B with the password-derived term removed; it must itself be
    # a valid value modulo p before being used as an exponentiation base.
    g_b = (B - kg_x) % p
    if not is_good_mod_exp_first(g_b, p):
        raise ValueError('bad g_b')

    ux = u * x
    a_ux = a + ux
    S = pow(g_b, a_ux, p)
    K = sha256(big_num_for_hash(S))

    # M1 is the client's proof that both sides derived the same key K.
    M1 = sha256(
        xor(sha256(p_for_hash), sha256(g_for_hash)),
        sha256(algo.salt1),
        sha256(algo.salt2),
        a_for_hash,
        b_for_hash,
        K
    )

    return _tl.InputCheckPasswordSRP(
        request.srp_id, bytes(a_for_hash), bytes(M1))

View File

@@ -0,0 +1,129 @@
import abc
import asyncio
import time
from . import helpers
class RequestIter(abc.ABC):
    """
    Helper class to deal with requests that need offsets to iterate.

    It has some facilities, such as automatically sleeping a desired
    amount of time between requests if needed (but not more).

    `limit` is the total amount of items that the iterator should return.
    This is handled on this base class, and will be always ``>= 0``.

    `left` will be reset every time the iterator is used and will indicate
    the amount of items that should be emitted left, so that subclasses can
    be more efficient and fetch only as many items as they need.

    Iterators may be used with ``reversed``, and their `reverse` flag will
    be set to `True` if that's the case. Note that if this flag is set,
    `buffer` should be filled in reverse too.
    """
    def __init__(self, client, limit, *, reverse=False, wait_time=None, **kwargs):
        self.client = client
        self.reverse = reverse
        self.wait_time = wait_time
        # Forwarded to `_init` on first iteration.
        self.kwargs = kwargs
        # NOTE(review): ``limit == ()`` appears to act as a sentinel for
        # "unlimited, but return a single item" (see `return_single`) --
        # confirm against callers.
        self.limit = max(float('inf') if limit is None or limit == () else limit, 0)
        self.left = self.limit
        self.buffer = None
        self.index = 0
        self.total = None
        self.last_load = 0
        self.return_single = limit == 1 or limit == ()

    async def _init(self, **kwargs):
        """
        Called when asynchronous initialization is necessary. All keyword
        arguments passed to `__init__` will be forwarded here, and it's
        preferable to use named arguments in the subclasses without defaults
        to avoid forgetting or misspelling any of them.

        This method may ``raise StopAsyncIteration`` if it cannot continue.

        This method may actually fill the initial buffer if it needs to,
        and similarly to `_load_next_chunk`, ``return True`` to indicate
        that this is the last iteration (just the initial load).
        """

    async def __anext__(self):
        if self.buffer is None:
            self.buffer = []
            # A truthy return from `_init` means the initial load was also
            # the last one: cap `left` to what is already buffered.
            if await self._init(**self.kwargs):
                self.left = len(self.buffer)

        if self.left <= 0:  # <= 0 because subclasses may change it
            raise StopAsyncIteration

        if self.index == len(self.buffer):
            # The buffer is exhausted; load the next chunk, sleeping
            # between requests when a wait time was configured.
            # asyncio will handle times <= 0 to sleep 0 seconds
            if self.wait_time:
                await asyncio.sleep(
                    self.wait_time - (time.time() - self.last_load)
                )
                self.last_load = time.time()

            self.index = 0
            self.buffer = []
            if await self._load_next_chunk():
                self.left = len(self.buffer)

        if not self.buffer:
            raise StopAsyncIteration

        result = self.buffer[self.index]
        self.left -= 1
        self.index += 1
        return result

    def __aiter__(self):
        # Reset all per-iteration state so the iterator can be reused.
        self.buffer = None
        self.index = 0
        self.last_load = 0
        self.left = self.limit
        return self

    async def collect(self, force_list=True):
        """
        Create a `self` iterator and collect it into a `TotalList`
        (a normal list with a `.total` attribute).

        If ``force_list`` is ``False`` and ``self.return_single`` is ``True``, no list
        will be returned. Instead, either a single item or ``None`` will be returned.
        """
        if not force_list and self.return_single:
            self.limit = 1
            async for message in self:
                return message
            return None

        result = helpers.TotalList()
        async for message in self:
            result.append(message)

        result.total = self.total
        return result

    @abc.abstractmethod
    async def _load_next_chunk(self):
        """
        Called when the next chunk is necessary.

        It should extend the `buffer` with new items.

        It should return `True` if it's the last chunk,
        after which moment the method won't be called again
        during the same iteration.
        """
        raise NotImplementedError

    def __reversed__(self):
        self.reverse = not self.reverse
        return self  # __aiter__ will be called after, too

    def __await__(self):
        # Awaiting the iterator collects it (a single item when the
        # iterator was built with a single-item limit).
        return self.collect(force_list=False).__await__()

158
telethon/_misc/tlobject.py Normal file
View File

@@ -0,0 +1,158 @@
import base64
import json
import struct
from datetime import datetime, date, timedelta, timezone
import time
from .helpers import pretty_print
_EPOCH_NAIVE = datetime(*time.gmtime(0)[:6])
_EPOCH_NAIVE_LOCAL = datetime(*time.localtime(0)[:6])
_EPOCH = _EPOCH_NAIVE.replace(tzinfo=timezone.utc)
def _datetime_to_timestamp(dt):
# If no timezone is specified, it is assumed to be in utc zone
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
# We use .total_seconds() method instead of simply dt.timestamp(),
# because on Windows the latter raises OSError on datetimes ~< datetime(1970,1,1)
secs = int((dt - _EPOCH).total_seconds())
# Make sure it's a valid signed 32 bit integer, as used by Telegram.
# This does make very large dates wrap around, but it's the best we
# can do with Telegram's limitations.
return struct.unpack('i', struct.pack('I', secs & 0xffffffff))[0]
def _json_default(value):
if isinstance(value, bytes):
return base64.b64encode(value).decode('ascii')
elif isinstance(value, datetime):
return value.isoformat()
else:
return repr(value)
class TLObject:
    """
    Base class for all the generated Telegram types and requests.

    Subclasses provide their own ``__slots__`` and fill in the class
    constants below; this base implements the common serialization and
    introspection helpers.
    """
    __slots__ = ()

    # Filled in by generated subclasses: the TL constructor number and
    # the ID of the abstract ("boxed") type the constructor belongs to.
    CONSTRUCTOR_ID = None
    SUBCLASS_OF_ID = None

    @staticmethod
    def serialize_bytes(data):
        """Write bytes by using Telegram guidelines"""
        if not isinstance(data, bytes):
            if isinstance(data, str):
                data = data.encode('utf-8')
            else:
                raise TypeError(
                    'bytes or str expected, not {}'.format(type(data)))

        r = []
        if len(data) < 254:
            # Short form: one length byte, then the payload, padded so
            # that the total (1 + len + padding) is a multiple of 4.
            padding = (len(data) + 1) % 4
            if padding != 0:
                padding = 4 - padding

            r.append(bytes([len(data)]))
            r.append(data)

        else:
            # Long form: marker byte 254, then the length as a 24-bit
            # little-endian integer, then the payload padded to 4 bytes.
            padding = len(data) % 4
            if padding != 0:
                padding = 4 - padding

            r.append(bytes([
                254,
                len(data) % 256,
                (len(data) >> 8) % 256,
                (len(data) >> 16) % 256
            ]))
            r.append(data)

        r.append(bytes(padding))
        return b''.join(r)

    @staticmethod
    def serialize_datetime(dt):
        """Serialize a datetime-like value as a little-endian 32-bit timestamp."""
        # Falsy values serialize as zero, except a zero timedelta, which
        # still means "now + 0" and must go through the conversion below.
        if not dt and not isinstance(dt, timedelta):
            return b'\0\0\0\0'

        if isinstance(dt, datetime):
            dt = _datetime_to_timestamp(dt)
        elif isinstance(dt, date):
            dt = _datetime_to_timestamp(datetime(dt.year, dt.month, dt.day))
        elif isinstance(dt, float):
            dt = int(dt)
        elif isinstance(dt, timedelta):
            # Timezones are tricky. datetime.utcnow() + ... timestamp() works
            dt = _datetime_to_timestamp(datetime.utcnow() + dt)

        if isinstance(dt, int):
            return struct.pack('<i', dt)

        raise TypeError('Cannot interpret "{}" as a date.'.format(dt))

    # NOTE(review): defining __eq__ without __hash__ makes instances
    # unhashable (Python sets __hash__ to None) -- presumably intended.
    def __eq__(self, o):
        return isinstance(o, type(self)) and self.to_dict() == o.to_dict()

    def __ne__(self, o):
        return not isinstance(o, type(self)) or self.to_dict() != o.to_dict()

    def __repr__(self):
        return pretty_print(self)

    def __str__(self):
        # Shallow representation; deeper nesting is elided.
        return pretty_print(self, max_depth=2)

    def stringify(self):
        """Return a multi-line, indented representation of the object."""
        return pretty_print(self, indent=0)

    def to_dict(self):
        """Recursively convert the object (and nested TLObjects) to a dict."""
        res = {}
        # Requests are prefixed with 'fn.' to tell them apart from types.
        pre = ('', 'fn.')[isinstance(self, TLRequest)]
        mod = self.__class__.__module__[self.__class__.__module__.rfind('.') + 1:]
        if mod in ('_tl', 'fn'):
            res['_'] = f'{pre}{self.__class__.__name__}'
        else:
            res['_'] = f'{pre}{mod}.{self.__class__.__name__}'

        for slot in self.__slots__:
            attr = getattr(self, slot)
            if isinstance(attr, list):
                res[slot] = [val.to_dict() if hasattr(val, 'to_dict') else val for val in attr]
            else:
                res[slot] = attr.to_dict() if hasattr(attr, 'to_dict') else attr

        return res

    def __bytes__(self):
        try:
            return self._bytes()
        except AttributeError:
            # If a type is wrong (e.g. expected `TLObject` but `int` was
            # provided) it will try to access `._bytes()`, which will fail
            # with `AttributeError`. This occurs in fact because the type
            # was wrong, so raise the correct error type.
            raise TypeError('a TLObject was expected but found something else')

    # Custom objects will call `(...)._bytes()` and not `bytes(...)` so that
    # if the wrong type is used (e.g. `int`) we won't try allocating a huge
    # amount of data, which would cause a `MemoryError`.
    def _bytes(self):
        raise NotImplementedError

    @classmethod
    def from_reader(cls, reader):
        """Deserialize an instance from a binary reader; generated per type."""
        raise NotImplementedError
class TLRequest(TLObject):
    """
    Represents a content-related `TLObject` (a request that can be sent).
    """
    @staticmethod
    def read_result(reader):
        # By default the response body is a single deserialized object.
        return reader.tgread_object()

    async def resolve(self, client, utils):
        # Subclasses override this to convert "friendly" input values
        # into raw types before sending. No-op by default.
        pass

1320
telethon/_misc/utils.py Normal file

File diff suppressed because it is too large Load Diff