diff --git a/telethon/utils.py b/telethon/utils.py index d2dce173..18723dc7 100644 --- a/telethon/utils.py +++ b/telethon/utils.py @@ -20,7 +20,7 @@ from mimetypes import guess_extension from types import GeneratorType from .extensions import markdown, html -from .helpers import add_surrogate, del_surrogate +from .helpers import add_surrogate, del_surrogate, strip_text from .tl import types try: @@ -1384,6 +1384,101 @@ def decode_waveform(waveform): return bytes(result) +def split_text(text, entities, *, limit=4096, max_entities=100, split_at=(r'\n', r'\s', '.')): + """ + Split a message text and entities into multiple messages, each with their + own set of entities. This allows sending a very large message as multiple + messages while respecting the formatting. + + Arguments + text (`str`): + The message text. + + entities (List[:tl:`MessageEntity`]) + The formatting entities. + + limit (`int`): + The maximum message length of each individual message. + + max_entities (`int`): + The maximum amount of entities that will be present in each + individual message. + + split_at (Tuplel[`str`]): + The list of regular expressions that will determine where to split + the text. By default, a newline is searched. If no newline is + present, a space is searched. If no space is found, the split will + be made at any character. + + The last expression should always match a character, or else the + text will stop being splitted and the resulting text may be larger + than the limit. + + Yields + Pairs of ``(str, entities)`` with the split message. + + Example + .. code-block:: python + + from telethon import utils + from telethon.extensions import markdown + + very_long_markdown_text = "..." + text, entities = markdown.parse(very_long_markdown_text) + + for text, entities in utils.split_text(text, entities): + await client.send_message(chat, text, formatting_entities=entities) + """ + # TODO add test cases (multiple entities beyond cutoff, at cutoff, splitting at emoji) + # TODO try to optimize this a bit more? (avoid new_ent, smarter update method) + def update(ent, **updates): + kwargs = ent.to_dict() + del kwargs['_'] + kwargs.update(updates) + return ent.__class__(**kwargs) + + text = add_surrogate(text) + split_at = tuple(map(re.compile, split_at)) + + while True: + if len(entities) > max_entities: + last_ent = entities[max_entities - 1] + cur_limit = min(limit, last_ent.offset + last_ent.length) + else: + cur_limit = limit + + if len(text) <= cur_limit: + break + + for split in split_at: + for i in reversed(range(cur_limit)): + m = split.match(text, pos=i) + if m: + cur_text, new_text = text[:m.end()], text[m.end():] + cur_ent, new_ent = [], [] + for ent in entities: + if ent.offset < m.end(): + if ent.offset + ent.length > m.end(): + cur_ent.append(update(ent, length=m.end() - ent.offset)) + new_ent.append(update(ent, offset=0, length=ent.offset + ent.length - m.end())) + else: + cur_ent.append(ent) + else: + new_ent.append(update(ent, offset=ent.offset - m.end())) + + yield del_surrogate(cur_text), cur_ent + text, entities = new_text, new_ent + break + else: + continue + break + else: + # Can't find where to split, just return the remaining text and entities + break + + yield del_surrogate(text), entities + + class AsyncClassWrapper: def __init__(self, wrapped): self.wrapped = wrapped