diff --git a/client/src/telethon/_impl/mtsender/sender.py b/client/src/telethon/_impl/mtsender/sender.py index 469b85db..ac49a531 100644 --- a/client/src/telethon/_impl/mtsender/sender.py +++ b/client/src/telethon/_impl/mtsender/sender.py @@ -238,17 +238,20 @@ class Sender: return rx.result() async def step(self) -> None: - if not self._writing: - self._writing = True - await self._do_write() - self._writing = False + try: + if not self._writing: + self._writing = True + await self._do_write() + self._writing = False - if not self._reading: - self._reading = True - await self._do_read() - self._reading = False - else: - await self._step_done.wait() + if not self._reading: + self._reading = True + await self._do_read() + self._reading = False + else: + await self._step_done.wait() + except Exception: + self._ def pop_updates(self) -> list[Updates]: updates = self._updates[:] @@ -322,6 +325,30 @@ class Sender: del self._read_buffer[:n] self._process_mtp_buffer() + def _on_error(self, error: Exception): + logging.info(f"Handling error: {error}") + self._transport.reset() + self._mtp.reset() + logging.info( + "Resetting sender state from read_buffer {}, mtp_buffer {}".format( + len(self._read_buffer), + len(self._mtp_buffer), + ) + ) + self._read_buffer.clear() + self._mtp_buffer.clear() + + # TODO: reset + + logging.warning( + f"marking all {len(self._requests)} request(s) as failed: {error}" + ) + + for req in self._requests: + req.result.set_exception(error) + + raise error + def _process_mtp_buffer(self) -> None: results = self._mtp.deserialize(self._mtp_buffer)