diff --git a/telethon/tl/custom/conversation.py b/telethon/tl/custom/conversation.py index 9df1bf58..5ff92bb6 100644 --- a/telethon/tl/custom/conversation.py +++ b/telethon/tl/custom/conversation.py @@ -285,11 +285,12 @@ class Conversation(ChatGetter): return await self._get_result(future, start_time, timeout, self._custom, counter) async def _check_custom(self, built): - for i, (ev, fut) in self._custom.items(): + for key, (ev, fut) in list(self._custom.items()): ev_type = type(ev) inst = built[ev_type] if inst and ev.filter(inst): fut.set_result(inst) + del self._custom[key] def _on_new_message(self, response): response = response.message @@ -302,22 +303,28 @@ class Conversation(ChatGetter): self._incoming.append(response) - # Note: we don't remove from pending here, that's done on get result - for msg_id, future in self._pending_responses.items(): + # Most of the time, these dictionaries will contain just one item + # TODO In fact, why not make it be that way? Force one item only. + # How often will people want to wait for two responses at + # the same time? It's impossible, first one will arrive + # and then another, so they can do that. + for msg_id, future in list(self._pending_responses.items()): self._response_indices[msg_id] = len(self._incoming) future.set_result(response) + del self._pending_responses[msg_id] - for msg_id, future in self._pending_replies.items(): + for msg_id, future in list(self._pending_replies.items()): if msg_id == response.reply_to_msg_id: self._reply_indices[msg_id] = len(self._incoming) future.set_result(response) + del self._pending_replies[msg_id] def _on_edit(self, message): message = message.message if message.chat_id != self.chat_id or message.out: return - for msg_id, future in self._pending_edits.items(): + for msg_id, future in list(self._pending_edits.items()): if msg_id < message.id: edit_ts = message.edit_date.timestamp() @@ -330,6 +337,7 @@ class Conversation(ChatGetter): self._edit_dates[msg_id] = message.edit_date.timestamp() future.set_result(message) + del self._pending_edits[msg_id] def _on_read(self, event): if event.chat_id != self.chat_id or event.inbox: @@ -338,10 +346,11 @@ class Conversation(ChatGetter): self._last_read = event.max_id remove_reads = [] - for msg_id, pending in self._pending_reads.items(): + for msg_id, pending in list(self._pending_reads.items()): if msg_id >= self._last_read: remove_reads.append(msg_id) pending.set_result(True) + del self._pending_reads[msg_id] for to_remove in remove_reads: del self._pending_reads[to_remove] @@ -365,14 +374,16 @@ class Conversation(ChatGetter): if timeout is not None: due = min(due, start_time + timeout) - try: - return await asyncio.wait_for( - future, - timeout=None if due == float('inf') else due - time.time(), - loop=self._client.loop - ) - finally: - del pending[target_id] + # NOTE: We can't try/finally to pop from pending here because + # the event loop needs to get back to us, but it might + # dispatch another update before, and in that case a + # response could be set twice. So responses must be + # cleared when their futures are set to a result. + return await asyncio.wait_for( + future, + timeout=None if due == float('inf') else due - time.time(), + loop=self._client.loop + ) def _cancel_all(self, exception=None): self._cancelled = True