From 56b09c0c9df16276153fa8ee38a664d1b7877ee3 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 7 Jun 2018 10:30:20 +0200 Subject: [PATCH] Properly set future results --- telethon/network/mtprotosender.py | 45 +++++++++++++++++++---- telethon/tl/tl_message.py | 17 ++++++++- telethon/tl/tlobject.py | 35 +++--------------- telethon_generator/generators/tlobject.py | 9 +++-- 4 files changed, 63 insertions(+), 43 deletions(-) diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index f03d5beb..b2012757 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -79,11 +79,34 @@ class MTProtoSender: self._recv_loop_handle.cancel() async def send(self, request): - # TODO Should the asyncio.Future creation belong here? - request.result = asyncio.Future() + """ + This method enqueues the given request to be sent. + + The request will be wrapped inside a `TLMessage` until its + response arrives, and the `Future` response of the `TLMessage` + is immediately returned so that one can further ``await`` it: + + .. code-block:: python + + async def method(): + # Sending (enqueued for the send loop) + future = await sender.send(request) + # Receiving (waits for the receive loop to read the result) + result = await future + + Designed like this because Telegram may send the response at + any point, and it can send other items while one waits for it. + Once the response for this future arrives, it is set with the + received result, quite similar to how a ``receive()`` call + would otherwise work. + + Since the receiving part is "built in" the future, it's + impossible to await receive a result that was never sent. + """ message = TLMessage(self.session, request) self._pending_messages[message.msg_id] = message await self._send_queue.put(message) + return message.future # Loops @@ -129,7 +152,7 @@ class MTProtoSender: inner_code = reader.read_int(signed=False) reader.seek(-4) - message = self._pending_messages.pop(message_id) + message = self._pending_messages.pop(message_id, None) if inner_code == 0x2144ca19: # RPC Error reader.seek(4) if self.session.report_errors and message: @@ -142,17 +165,23 @@ class MTProtoSender: reader.read_int(), reader.tgread_string() ) - # TODO Acknowledge that we received the error request_id - # TODO Set message.request exception + await self._send_queue.put( + TLMessage(self.session, MsgsAck([msg_id]))) + + if not message.future.cancelled(): + message.future.set_exception(error) + return elif message: - # TODO Make on_response result.set_result() instead replacing it if inner_code == GzipPacked.CONSTRUCTOR_ID: with BinaryReader(GzipPacked.read(reader)) as compressed_reader: - message.on_response(compressed_reader) + result = message.request.read_result(compressed_reader) else: - message.on_response(reader) + result = message.request.read_result(reader) # TODO Process possible entities + if not message.future.cancelled(): + message.future.set_result(result) + return # TODO Try reading an object diff --git a/telethon/tl/tl_message.py b/telethon/tl/tl_message.py index f6246de2..da144a91 100644 --- a/telethon/tl/tl_message.py +++ b/telethon/tl/tl_message.py @@ -1,3 +1,4 @@ +import asyncio import struct from . import TLObject, GzipPacked @@ -5,7 +6,20 @@ from ..tl.functions import InvokeAfterMsgRequest class TLMessage(TLObject): - """https://core.telegram.org/mtproto/service_messages#simple-container""" + """ + https://core.telegram.org/mtproto/service_messages#simple-container. + + Messages are what's ultimately sent to Telegram: + message msg_id:long seqno:int bytes:int body:bytes = Message; + + Each message has its own unique identifier, and the body is simply + the serialized request that should be executed on the server. Then + Telegram will, at some point, respond with the result for this msg. + + Thus it makes sense that requests and their result are bound to a + sent `TLMessage`, and this result can be represented as a `Future` + that will eventually be set with either a result, error or cancelled. + """ def __init__(self, session, request, after_id=None): super().__init__() del self.content_related @@ -13,6 +27,7 @@ class TLMessage(TLObject): self.seq_no = session.generate_sequence(request.content_related) self.request = request self.container_msg_id = None + self.future = asyncio.Future() # After which message ID this one should run. We do this so # InvokeAfterMsgRequest is transparent to the user and we can diff --git a/telethon/tl/tlobject.py b/telethon/tl/tlobject.py index 08a4fb70..900aae9b 100644 --- a/telethon/tl/tlobject.py +++ b/telethon/tl/tlobject.py @@ -5,35 +5,9 @@ from threading import Event class TLObject: def __init__(self): - self.rpc_error = None - self.result = None # An asyncio.Future set later - - # These should be overrode + # TODO Perhaps content_related makes more sense as another type? + # Something like class TLRequest(TLObject), request inherit this self.content_related = False # Only requests/functions/queries are - - # Internal parameter to tell pickler in which state Event object was - self._event_is_set = False - self._set_event() - - def _set_event(self): - self.confirm_received = Event() - - # Set Event state to 'set' if needed - if self._event_is_set: - self.confirm_received.set() - - def __getstate__(self): - # Save state of the Event object - self._event_is_set = self.confirm_received.is_set() - - # Exclude Event object from dict and return new state - new_dct = dict(self.__dict__) - del new_dct["confirm_received"] - return new_dct - - def __setstate__(self, state): - self.__dict__ = state - self._set_event() # These should not be overrode @staticmethod @@ -164,8 +138,9 @@ class TLObject: raise TypeError('Cannot interpret "{}" as a date.'.format(dt)) # These are nearly always the same for all subclasses - def on_response(self, reader): - self.result = reader.tgread_object() + @staticmethod + def read_result(reader): + return reader.tgread_object() def __eq__(self, o): return isinstance(o, type(self)) and self.to_dict() == o.to_dict() diff --git a/telethon_generator/generators/tlobject.py b/telethon_generator/generators/tlobject.py index 1058a1ae..d37c5d5c 100644 --- a/telethon_generator/generators/tlobject.py +++ b/telethon_generator/generators/tlobject.py @@ -142,7 +142,7 @@ def _write_source_code(tlobject, builder, type_constructors): _write_to_dict(tlobject, builder) _write_to_bytes(tlobject, builder) _write_from_reader(tlobject, builder) - _write_on_response(tlobject, builder) + _write_read_result(tlobject, builder) def _write_class_init(tlobject, type_constructors, builder): @@ -333,7 +333,7 @@ def _write_from_reader(tlobject, builder): '{0}=_{0}'.format(a.name) for a in tlobject.real_args)) -def _write_on_response(tlobject, builder): +def _write_read_result(tlobject, builder): # Only requests can have a different response that's not their # serialized body, that is, we'll be setting their .result. # @@ -354,9 +354,10 @@ def _write_on_response(tlobject, builder): return builder.end_block() - builder.writeln('def on_response(self, reader):') + builder.writeln('@staticmethod') + builder.writeln('def read_result(reader):') builder.writeln('reader.read_int() # Vector ID') - builder.writeln('self.result = [reader.read_{}() ' + builder.writeln('return [reader.read_{}() ' 'for _ in range(reader.read_int())]', m.group(1))