Actually fix invalid state in Conversation (1354bf6 followup)

This commit is contained in:
Lonami Exo 2019-06-15 21:36:06 +02:00
parent 40d32cee95
commit 8d28d1145a

View File

@ -285,11 +285,12 @@ class Conversation(ChatGetter):
return await self._get_result(future, start_time, timeout, self._custom, counter) return await self._get_result(future, start_time, timeout, self._custom, counter)
async def _check_custom(self, built): async def _check_custom(self, built):
for i, (ev, fut) in self._custom.items(): for key, (ev, fut) in list(self._custom.items()):
ev_type = type(ev) ev_type = type(ev)
inst = built[ev_type] inst = built[ev_type]
if inst and ev.filter(inst): if inst and ev.filter(inst):
fut.set_result(inst) fut.set_result(inst)
del self._custom[key]
def _on_new_message(self, response): def _on_new_message(self, response):
response = response.message response = response.message
@ -302,22 +303,28 @@ class Conversation(ChatGetter):
self._incoming.append(response) self._incoming.append(response)
# Note: we don't remove from pending here, that's done on get result # Most of the time, these dictionaries will contain just one item
for msg_id, future in self._pending_responses.items(): # TODO In fact, why not make it be that way? Force one item only.
# How often will people want to wait for two responses at
# the same time? It's impossible, first one will arrive
# and then another, so they can do that.
for msg_id, future in list(self._pending_responses.items()):
self._response_indices[msg_id] = len(self._incoming) self._response_indices[msg_id] = len(self._incoming)
future.set_result(response) future.set_result(response)
del self._pending_responses[msg_id]
for msg_id, future in self._pending_replies.items(): for msg_id, future in list(self._pending_replies.items()):
if msg_id == response.reply_to_msg_id: if msg_id == response.reply_to_msg_id:
self._reply_indices[msg_id] = len(self._incoming) self._reply_indices[msg_id] = len(self._incoming)
future.set_result(response) future.set_result(response)
del self._pending_replies[msg_id]
def _on_edit(self, message): def _on_edit(self, message):
message = message.message message = message.message
if message.chat_id != self.chat_id or message.out: if message.chat_id != self.chat_id or message.out:
return return
for msg_id, future in self._pending_edits.items(): for msg_id, future in list(self._pending_edits.items()):
if msg_id < message.id: if msg_id < message.id:
edit_ts = message.edit_date.timestamp() edit_ts = message.edit_date.timestamp()
@ -330,6 +337,7 @@ class Conversation(ChatGetter):
self._edit_dates[msg_id] = message.edit_date.timestamp() self._edit_dates[msg_id] = message.edit_date.timestamp()
future.set_result(message) future.set_result(message)
del self._pending_edits[msg_id]
def _on_read(self, event): def _on_read(self, event):
if event.chat_id != self.chat_id or event.inbox: if event.chat_id != self.chat_id or event.inbox:
@ -338,10 +346,11 @@ class Conversation(ChatGetter):
self._last_read = event.max_id self._last_read = event.max_id
remove_reads = [] remove_reads = []
for msg_id, pending in self._pending_reads.items(): for msg_id, pending in list(self._pending_reads.items()):
if msg_id >= self._last_read: if msg_id >= self._last_read:
remove_reads.append(msg_id) remove_reads.append(msg_id)
pending.set_result(True) pending.set_result(True)
del self._pending_reads[msg_id]
for to_remove in remove_reads: for to_remove in remove_reads:
del self._pending_reads[to_remove] del self._pending_reads[to_remove]
@ -365,14 +374,16 @@ class Conversation(ChatGetter):
if timeout is not None: if timeout is not None:
due = min(due, start_time + timeout) due = min(due, start_time + timeout)
try: # NOTE: We can't try/finally to pop from pending here because
return await asyncio.wait_for( # the event loop needs to get back to us, but it might
future, # dispatch another update before, and in that case a
timeout=None if due == float('inf') else due - time.time(), # response could be set twice. So responses must be
loop=self._client.loop # cleared when their futures are set to a result.
) return await asyncio.wait_for(
finally: future,
del pending[target_id] timeout=None if due == float('inf') else due - time.time(),
loop=self._client.loop
)
def _cancel_all(self, exception=None): def _cancel_all(self, exception=None):
self._cancelled = True self._cancelled = True