Add error handling in Sender class; reset transport and clear buffers on exception

This commit is contained in:
Jahongir Qurbonov 2025-06-01 20:17:35 +05:00
parent 05af28d5e1
commit 66e9b53791
No known key found for this signature in database
GPG Key ID: 256976CED13D5F2D

View File

@ -238,17 +238,20 @@ class Sender:
return rx.result() return rx.result()
async def step(self) -> None: async def step(self) -> None:
if not self._writing: try:
self._writing = True if not self._writing:
await self._do_write() self._writing = True
self._writing = False await self._do_write()
self._writing = False
if not self._reading: if not self._reading:
self._reading = True self._reading = True
await self._do_read() await self._do_read()
self._reading = False self._reading = False
else: else:
await self._step_done.wait() await self._step_done.wait()
except Exception:
self._
def pop_updates(self) -> list[Updates]: def pop_updates(self) -> list[Updates]:
updates = self._updates[:] updates = self._updates[:]
@ -322,6 +325,30 @@ class Sender:
del self._read_buffer[:n] del self._read_buffer[:n]
self._process_mtp_buffer() self._process_mtp_buffer()
def _on_error(self, error: Exception):
logging.info(f"Handling error: {error}")
self._transport.reset()
self._mtp.reset()
logging.info(
"Resetting sender state from read_buffer {}, mtp_buffer {}".format(
len(self._read_buffer),
len(self._mtp_buffer),
)
)
self._read_buffer.clear()
self._mtp_buffer.clear()
# TODO: reset
logging.warning(
f"marking all {len(self._requests)} request(s) as failed: {error}"
)
for req in self._requests:
req.result.set_exception(error)
raise error
def _process_mtp_buffer(self) -> None: def _process_mtp_buffer(self) -> None:
results = self._mtp.deserialize(self._mtp_buffer) results = self._mtp.deserialize(self._mtp_buffer)