Call catch_up on reconnect (WIP for #1125)

This commit is contained in:
Lonami Exo 2019-03-28 12:32:02 +01:00
parent ad963fd23e
commit 5377169db2
3 changed files with 104 additions and 27 deletions

View File

@ -378,8 +378,8 @@ class AuthMethods(MessageParseMethods, UserMethods):
# By setting state.pts = 1 after logging in, the user or bot can # By setting state.pts = 1 after logging in, the user or bot can
# `catch_up` on all updates (and obtain necessary access hashes) # `catch_up` on all updates (and obtain necessary access hashes)
# if they desire. The date parameter is ignored when pts = 1. # if they desire. The date parameter is ignored when pts = 1.
self._state.pts = 1 self._old_state = types.updates.State(
self._state.date = datetime.datetime.now(tz=datetime.timezone.utc) 1, 0, datetime.datetime.now(tz=datetime.timezone.utc), 0, 0)
return user return user
@ -437,8 +437,8 @@ class AuthMethods(MessageParseMethods, UserMethods):
self._bot = None self._bot = None
self._self_input_peer = None self._self_input_peer = None
self._authorized = False self._authorized = False
self._state = types.updates.State( self._old_state = None
0, 0, datetime.datetime.now(tz=datetime.timezone.utc), 0, 0) self._new_state = None
await self.disconnect() await self.disconnect()
self.session.delete() self.session.delete()

View File

@ -304,10 +304,17 @@ class TelegramBaseClient(abc.ABC):
self._dispatching_updates_queue = None self._dispatching_updates_queue = None
self._authorized = None # None = unknown, False = no, True = yes self._authorized = None # None = unknown, False = no, True = yes
self._state = self.session.get_update_state(0)
if not self._state: # Update state (for catching up after a disconnection)
self._state = types.updates.State( self._old_state = self.session.get_update_state(0)
0, 0, datetime.now(tz=timezone.utc), 0, 0) self._new_state = None
# If we catch up, while we don't get disconnected,
# the old state will be the same as the new one.
#
# If we do get disconnected, then the old and new
# state may differ.
self._old_state_is_new = False
# Some further state for subclasses # Some further state for subclasses
self._event_builders = [] self._event_builders = []
@ -389,7 +396,13 @@ class TelegramBaseClient(abc.ABC):
async def _disconnect_coro(self): async def _disconnect_coro(self):
await self._disconnect() await self._disconnect()
self.session.set_update_state(0, self._state)
# If we disconnect, the old state is the last one we are aware of
self._old_state_is_new = True
if self._new_state:
self.session.set_update_state(0, self._new_state)
self.session.close() self.session.close()
async def _disconnect(self): async def _disconnect(self):

View File

@ -135,10 +135,19 @@ class UpdateMethods(UserMethods):
This can also be used to forcibly fetch new updates if there are any. This can also be used to forcibly fetch new updates if there are any.
""" """
state = self._state state = self._new_state if self._old_state_is_new else self._old_state
if state.pts == 0: if not self._old_state_is_new and self._new_state:
# pts = 0 is invalid, pts = 1 will catch up since the beginning max_pts = self._new_state.pts
state.pts = 1 else:
max_pts = float('inf')
print('catching up since', state, 'up to', max_pts)
# No known state -> catch up since the beginning (date is ignored).
# Note: pts = 0 is invalid (and so is no date/unix timestamp = 0).
if not state:
state = types.updates.State(
1, 0, datetime.datetime.now(tz=datetime.timezone.utc), 0, 0)
self.session.catching_up = True self.session.catching_up = True
try: try:
@ -163,6 +172,22 @@ class UpdateMethods(UserMethods):
for m in d.new_messages for m in d.new_messages
] ]
)) ))
# We don't want to fetch updates we already know about.
#
# We may still get duplicates because the Difference
# contains a lot of updates and presumably only has
# the state for the last one, but at least we don't
# unnecessarily fetch too many.
#
# updates.getDifference's pts_total_limit seems to mean
# "how many pts is the request allowed to return", and
# if there is more than that, it returns "too long" (so
# there would be duplicate updates since we know about
# some). This can be used to detect collisions (i.e.
# it would return an update we have already seen).
if state.pts >= max_pts:
break
else: else:
if isinstance(d, types.updates.DifferenceEmpty): if isinstance(d, types.updates.DifferenceEmpty):
state.date = d.date state.date = d.date
@ -171,7 +196,9 @@ class UpdateMethods(UserMethods):
state.pts = d.pts state.pts = d.pts
break break
finally: finally:
self._state = state self._old_state = None
self._new_state = state
self._old_state_is_new = True
self.session.set_update_state(0, state) self.session.set_update_state(0, state)
self.session.catching_up = False self.session.catching_up = False
@ -201,17 +228,26 @@ class UpdateMethods(UserMethods):
self._dispatching_updates_queue.set() self._dispatching_updates_queue.set()
self._loop.create_task(self._dispatch_queue_updates()) self._loop.create_task(self._dispatch_queue_updates())
need_diff = False
if hasattr(update, 'pts') and update.pts is not None:
if self._state.pts and (update.pts - self._state.pts) > 1:
need_diff = True
self._state.pts = update.pts
if hasattr(update, 'date'):
self._state.date = update.date
if hasattr(update, 'seq'):
self._state.seq = update.seq
# TODO make use of need_diff # TODO make use of need_diff
need_diff = False
if getattr(update, 'pts', None):
if not self._new_state:
self._new_state = types.updates.State(
update.pts,
0,
getattr(update, 'date', datetime.datetime.now(tz=datetime.timezone.utc)),
getattr(update, 'seq', 0),
0
)
else:
if self._new_state.pts and (update.pts - self._new_state.pts) > 1:
need_diff = True
self._new_state.pts = update.pts
if hasattr(update, 'date'):
self._new_state.date = update.date
if hasattr(update, 'seq'):
self._new_state.seq = update.seq
async def _update_loop(self): async def _update_loop(self):
# Pings' ID don't really need to be secure, just "random" # Pings' ID don't really need to be secure, just "random"
@ -307,10 +343,38 @@ class UpdateMethods(UserMethods):
try: try:
self._log[__name__].info( self._log[__name__].info(
'Asking for the current state after reconnect...') 'Asking for the current state after reconnect...')
state = await self(functions.updates.GetStateRequest())
self._log[__name__].info('Got new state! %s', state) # TODO consider:
# If there aren't many updates while the client is disconnected
# (I tried with up to 20), Telegram seems to send them without
# asking for them (via updates.getDifference).
#
# On disconnection, the library should probably set a "need
# difference" or "catching up" flag so that any new updates are
# ignored, and then the library should call updates.getDifference
# itself to fetch them.
#
# In any case (either there are too many updates and Telegram
# didn't send them, or there isn't a lot and Telegram sent them
# but we dropped them), we fetch the new difference to get all
# missed updates. I feel like this would be the best solution.
# If a disconnection occurs, the old known state will be
# the latest one we were aware of, so we can catch up since
# the most recent state we were aware of.
# TODO Ideally we set _old_state = _new_state *on* disconnect,
# not *after* we managed to reconnect since perhaps an update
# arrives just before we can get started.
self._old_state_is_new = True
await self.catch_up()
self._log[__name__].info('Successfully fetched missed updates')
except errors.RPCError as e: except errors.RPCError as e:
self._log[__name__].info('Failed to get current state: %r', e) self._log[__name__].warning('Failed to get missed updates after '
'reconnect: %r', e)
except Exception:
self._log[__name__].exception('Unhandled exception while getting '
'update difference after reconnect')
# endregion # endregion