diff --git a/readthedocs/misc/v2-migration-guide.rst b/readthedocs/misc/v2-migration-guide.rst index f5e5fe90..a2c32f70 100644 --- a/readthedocs/misc/v2-migration-guide.rst +++ b/readthedocs/misc/v2-migration-guide.rst @@ -117,6 +117,7 @@ they no longer make sense: And the following, which were inherited from ``MemorySession``: * ``delete``. You can ``os.remove`` the file instead (preferably after ``client.log_out()``). + ``client.log_out()`` also no longer deletes the session file (it can't as there's no method). * ``set_dc``. * ``dc_id``. * ``server_address``. diff --git a/telethon/_client/auth.py b/telethon/_client/auth.py index 05dd05c3..296656f7 100644 --- a/telethon/_client/auth.py +++ b/telethon/_client/auth.py @@ -372,7 +372,6 @@ async def log_out(self: 'TelegramClient') -> bool: self._state_cache.reset() await self.disconnect() - self.session.delete() return True async def edit_2fa( diff --git a/telethon/_client/downloads.py b/telethon/_client/downloads.py index 9ae1aec8..99932cdc 100644 --- a/telethon/_client/downloads.py +++ b/telethon/_client/downloads.py @@ -39,26 +39,17 @@ class _DirectDownloadIter(requestiter.RequestIter): self._msg_data = msg_data self._timed_out = False - self._exported = dc_id and self.client.session.dc_id != dc_id + # TODO should cache current session state + state = await self.client.session.get_state() + + self._exported = dc_id and state.dc_id != dc_id if not self._exported: # The used sender will also change if ``FileMigrateError`` occurs self._sender = self.client._sender else: - try: - self._sender = await self.client._borrow_exported_sender(dc_id) - except errors.DcIdInvalidError: - # Can't export a sender for the ID we are currently in - config = await self.client(_tl.fn.help.GetConfig()) - for option in config.dc_options: - if option.ip_address == self.client.session.server_address: - self.client.session.set_dc( - option.id, option.ip_address, option.port) - self.client.session.save() - break - - # TODO Figure out why the session may have the wrong DC ID - self._sender = self.client._sender - self._exported = False + # If this raises DcIdInvalidError, it means we tried exporting the same DC we're in. + # This should not happen, but if it does, it's a bug. + self._sender = await self.client._borrow_exported_sender(dc_id) async def _load_next_chunk(self): cur = await self._request() diff --git a/telethon/_client/messages.py b/telethon/_client/messages.py index e5290742..69f00a0c 100644 --- a/telethon/_client/messages.py +++ b/telethon/_client/messages.py @@ -589,7 +589,10 @@ async def edit_message( ) # Invoke `messages.editInlineBotMessage` from the right datacenter. # Otherwise, Telegram will error with `MESSAGE_ID_INVALID` and do nothing. - exported = self.session.dc_id != entity.dc_id + # TODO should cache current session state + state = await self.session.get_state() + + exported = state.dc_id != entity.dc_id if exported: try: sender = await self._borrow_exported_sender(entity.dc_id) diff --git a/telethon/_client/telegrambaseclient.py b/telethon/_client/telegrambaseclient.py index f9541e44..1050c2a5 100644 --- a/telethon/_client/telegrambaseclient.py +++ b/telethon/_client/telegrambaseclient.py @@ -6,12 +6,14 @@ import logging import platform import time import typing +import ipaddress from .. import version, helpers, __name__ as __base_name__, _tl from .._crypto import rsa from .._misc import markdown, entitycache, statecache, enums from .._network import MTProtoSender, Connection, ConnectionTcpFull, connection as conns from ..sessions import Session, SQLiteSession, MemorySession +from ..sessions.types import DataCenter, SessionState DEFAULT_DC_ID = 2 DEFAULT_IPV4_IP = '149.154.167.51' @@ -129,15 +131,6 @@ def init( 'The given session must be a str or a Session instance.' ) - # ':' in session.server_address is True if it's an IPv6 address - if (not session.server_address or - (':' in session.server_address) != use_ipv6): - session.set_dc( - DEFAULT_DC_ID, - DEFAULT_IPV6_IP if self._use_ipv6 else DEFAULT_IPV4_IP, - DEFAULT_PORT - ) - self.flood_sleep_threshold = flood_sleep_threshold # TODO Use AsyncClassWrapper(session) @@ -230,13 +223,11 @@ def init( ) self._sender = MTProtoSender( - self.session.auth_key, loggers=self._log, retries=self._connection_retries, delay=self._retry_delay, auto_reconnect=self._auto_reconnect, connect_timeout=self._timeout, - auth_key_callback=self._auth_key_callback, update_callback=self._handle_update, auto_reconnect_callback=self._handle_auto_reconnect ) @@ -264,11 +255,6 @@ def init( self._authorized = None # None = unknown, False = no, True = yes - # Update state (for catching up after a disconnection) - # TODO Get state from channels too - self._state_cache = statecache.StateCache( - self.session.get_update_state(0), self._log) - # Some further state for subclasses self._event_builders = [] @@ -310,10 +296,33 @@ def set_flood_sleep_threshold(self, value): async def connect(self: 'TelegramClient') -> None: + all_dc = await self.session.get_all_dc() + state = await self.session.get_state() + + dc = None + if state: + for d in all_dc: + if d.id == state.dc_id: + dc = d + break + + if dc is None: + dc = DataCenter( + id=DEFAULT_DC_ID, + ipv4=None if self._use_ipv6 else int(ipaddress.ip_address(DEFAULT_IPV4_IP)), + ipv6=int(ipaddress.ip_address(DEFAULT_IPV6_IP)) if self._use_ipv6 else None, + port=DEFAULT_PORT, + auth=b'', + ) + + # Update state (for catching up after a disconnection) + # TODO Get state from channels too + self._state_cache = statecache.StateCache(state, self._log) + if not await self._sender.connect(self._connection( - self.session.server_address, - self.session.port, - self.session.dc_id, + str(ipaddress.ip_address(dc.ipv6 or dc.ipv4)), + dc.port, + dc.id, loggers=self._log, proxy=self._proxy, local_addr=self._local_addr @@ -321,8 +330,10 @@ async def connect(self: 'TelegramClient') -> None: # We don't want to init or modify anything if we were already connected return - self.session.auth_key = self._sender.auth_key - self.session.save() + if self._sender.auth_key.key != dc.key: + dc.key = self._sender.auth_key.key + await self.session.insert_dc(dc) + await self.session.save() self._init_request.query = _tl.fn.help.GetConfig() @@ -388,15 +399,12 @@ async def _disconnect_coro(self: 'TelegramClient'): pts, date = self._state_cache[None] if pts and date: - self.session.set_update_state(0, _tl.updates.State( - pts=pts, - qts=0, - date=date, - seq=0, - unread_count=0 - )) - - self.session.close() + state = await self.session.get_state() + if state: + state.pts = pts + state.date = date + await self.session.set_state(state) + await self.session.save() async def _disconnect(self: 'TelegramClient'): """ @@ -414,31 +422,59 @@ async def _switch_dc(self: 'TelegramClient', new_dc): Permanently switches the current connection to the new data center. """ self._log[__name__].info('Reconnecting to new data center %s', new_dc) - dc = await _get_dc(self, new_dc) + dc = await _refresh_and_get_dc(self, new_dc) + + state = await self.session.get_state() + if state is None: + state = SessionState( + user_id=0, + dc_id=dc.id, + bot=False, + pts=0, + qts=0, + date=0, + seq=0, + takeout_id=None, + ) + else: + state.dc_id = dc.id + + await self.session.set_state(dc.id) + await self.session.save() - self.session.set_dc(dc.id, dc.ip_address, dc.port) - # auth_key's are associated with a server, which has now changed - # so it's not valid anymore. Set to None to force recreating it. - self._sender.auth_key.key = None - self.session.auth_key = None - self.session.save() await _disconnect(self) return await self.connect() -def _auth_key_callback(self: 'TelegramClient', auth_key): - """ - Callback from the sender whenever it needed to generate a - new authorization key. This means we are not authorized. - """ - self.session.auth_key = auth_key - self.session.save() +async def _refresh_and_get_dc(self: 'TelegramClient', dc_id): + """ + Gets the Data Center (DC) associated to `dc_id`. -async def _get_dc(self: 'TelegramClient', dc_id): - """Gets the Data Center (DC) associated to 'dc_id'""" + Also take this opportunity to refresh the addresses stored in the session if needed. + """ cls = self.__class__ if not cls._config: cls._config = await self(_tl.fn.help.GetConfig()) + all_dc = {dc.id: dc for dc in await self.session.get_all_dc()} + for dc in cls._config.dc_options: + if dc.media_only or dc.tcpo_only or dc.cdn: + continue + + ip = int(ipaddress.ip_address(dc.ip_address)) + if dc.id in all_dc: + all_dc[dc.id].port = dc.port + if dc.ipv6: + all_dc[dc.id].ipv6 = ip + else: + all_dc[dc.id].ipv4 = ip + elif dc.ipv6: + all_dc[dc.id] = DataCenter(dc.id, None, ip, dc.port, b'') + else: + all_dc[dc.id] = DataCenter(dc.id, ip, None, dc.port, b'') + + for dc in all_dc.values(): + await self.session.insert_dc(dc) + await self.session.save() try: return next( @@ -463,12 +499,12 @@ async def _create_exported_sender(self: 'TelegramClient', dc_id): """ # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt # for clearly showing how to export the authorization - dc = await _get_dc(self, dc_id) + dc = await _refresh_and_get_dc(self, dc_id) # Can't reuse self._sender._connection as it has its own seqno. # # If one were to do that, Telegram would reset the connection # with no further clues. - sender = MTProtoSender(None, loggers=self._log) + sender = MTProtoSender(loggers=self._log) await sender.connect(self._connection( dc.ip_address, dc.port, @@ -503,7 +539,7 @@ async def _borrow_exported_sender(self: 'TelegramClient', dc_id): self._borrowed_senders[dc_id] = (state, sender) elif state.need_connect(): - dc = await _get_dc(self, dc_id) + dc = await _refresh_and_get_dc(self, dc_id) await sender.connect(self._connection( dc.ip_address, dc.port, diff --git a/telethon/_client/telegramclient.py b/telethon/_client/telegramclient.py index 3d2360ad..e5dc6f8f 100644 --- a/telethon/_client/telegramclient.py +++ b/telethon/_client/telegramclient.py @@ -206,8 +206,7 @@ class TelegramClient: it's `True` then the takeout will be finished, and if no exception occurred during it, then `True` will be considered as a result. Otherwise, the takeout will not be finished and its ID will be - preserved for future usage as `client.session.takeout_id - `. + preserved for future usage in the session. Arguments finalize (`bool`): @@ -3599,9 +3598,6 @@ class TelegramClient: async def _clean_exported_senders(self: 'TelegramClient'): return await telegrambaseclient._clean_exported_senders(**locals()) - def _auth_key_callback(self: 'TelegramClient', auth_key): - return telegrambaseclient._auth_key_callback - def _handle_update(self: 'TelegramClient', update): return updates._handle_update(**locals()) diff --git a/telethon/_client/updates.py b/telethon/_client/updates.py index b319f8e2..0ae8b299 100644 --- a/telethon/_client/updates.py +++ b/telethon/_client/updates.py @@ -79,7 +79,7 @@ async def catch_up(self: 'TelegramClient'): if not pts: return - self.session.catching_up = True + self._catching_up = True try: while True: d = await self(_tl.fn.updates.GetDifference( @@ -129,16 +129,13 @@ async def catch_up(self: 'TelegramClient'): finally: # TODO Save new pts to session self._state_cache._pts_date = (pts, date) - self.session.catching_up = False + self._catching_up = False # It is important to not make _handle_update async because we rely on # the order that the updates arrive in to update the pts and date to # be always-increasing. There is also no need to make this async. def _handle_update(self: 'TelegramClient', update): - self.session.process_entities(update) - self._entity_cache.add(update) - if isinstance(update, (_tl.Updates, _tl.UpdatesCombined)): entities = {utils.get_peer_id(x): x for x in itertools.chain(update.users, update.chats)} @@ -203,11 +200,10 @@ async def _update_loop(self: 'TelegramClient'): except (ConnectionError, asyncio.CancelledError): return - # Entities and cached files are not saved when they are - # inserted because this is a rather expensive operation - # (default's sqlite3 takes ~0.1s to commit changes). Do - # it every minute instead. No-op if there's nothing new. - self.session.save() + # Entities are not saved when they are inserted because this is a rather expensive + # operation (default's sqlite3 takes ~0.1s to commit changes). Do it every minute + # instead. No-op if there's nothing new. + await self.session.save() # We need to send some content-related request at least hourly # for Telegram to keep delivering updates, otherwise they will @@ -232,6 +228,10 @@ async def _dispatch_queue_updates(self: 'TelegramClient'): self._dispatching_updates_queue.clear() async def _dispatch_update(self: 'TelegramClient', update, others, channel_id, pts_date): + entities = self._entity_cache.add(list((update._entities or {}).values())) + if entities: + await self.session.insert_entities(entities) + if not self._entity_cache.ensure_cached(update): # We could add a lock to not fetch the same pts twice if we are # already fetching it. However this does not happen in practice, diff --git a/telethon/_client/users.py b/telethon/_client/users.py index 72786fe5..394baee9 100644 --- a/telethon/_client/users.py +++ b/telethon/_client/users.py @@ -7,6 +7,7 @@ import typing from .. import errors, hints, _tl from .._misc import helpers, utils from ..errors import MultiError, RPCError +from ..sessions.types import Entity _NOT_A_REQUEST = lambda: TypeError('You can only invoke requests, not types!') @@ -70,8 +71,9 @@ async def _call(self: 'TelegramClient', sender, request, ordered=False, flood_sl exceptions.append(e) results.append(None) continue - self.session.process_entities(result) - self._entity_cache.add(result) + entities = self._entity_cache.add(result) + if entities: + await self.session.insert_entities(entities) exceptions.append(None) results.append(result) request_index += 1 @@ -81,8 +83,9 @@ async def _call(self: 'TelegramClient', sender, request, ordered=False, flood_sl return results else: result = await future - self.session.process_entities(result) - self._entity_cache.add(result) + entities = self._entity_cache.add(result) + if entities: + await self.session.insert_entities(entities) return result except (errors.ServerError, errors.RpcCallFailError, errors.RpcMcgetFailError, errors.InterdcCallErrorError, @@ -266,9 +269,19 @@ async def get_input_entity( if peer in ('me', 'self'): return _tl.InputPeerSelf() - # No InputPeer, cached peer, or known string. Fetch from disk cache + # No InputPeer, cached peer, or known string. Fetch from session cache try: - return self.session.get_input_entity(peer) + peer = utils.get_peer(peer) + if isinstance(peer, _tl.PeerUser): + entity = await self.session.get_entity(Entity.USER, peer.user_id) + if entity: + return _tl.InputPeerUser(entity.id, entity.access_hash) + elif isinstance(peer, _tl.PeerChat): + return _tl.InputPeerChat(peer.chat_id) + elif isinstance(peer, _tl.PeerChannel): + entity = await self.session.get_entity(Entity.CHANNEL, peer.user_id) + if entity: + return _tl.InputPeerChannel(entity.id, entity.access_hash) except ValueError: pass @@ -387,12 +400,6 @@ async def _get_entity_from_string(self: 'TelegramClient', string): return next(x for x in result.chats if x.id == pid) except StopIteration: pass - try: - # Nobody with this username, maybe it's an exact name/title - return await self.get_entity( - self.session.get_input_entity(string)) - except ValueError: - pass raise ValueError( 'Cannot find any entity corresponding to "{}"'.format(string) diff --git a/telethon/_misc/entitycache.py b/telethon/_misc/entitycache.py index a191dc6f..685aa411 100644 --- a/telethon/_misc/entitycache.py +++ b/telethon/_misc/entitycache.py @@ -3,6 +3,7 @@ import itertools from .._misc import utils from .. import _tl +from ..sessions.types import Entity # Which updates have the following fields? _has_field = { @@ -51,27 +52,60 @@ class EntityCache: """ In-memory input entity cache, defaultdict-like behaviour. """ - def add(self, entities): + def add(self, entities, _mappings={ + _tl.User.CONSTRUCTOR_ID: lambda e: (Entity.BOT if e.bot else Entity.USER, e.id, e.access_hash), + _tl.UserFull.CONSTRUCTOR_ID: lambda e: (Entity.BOT if e.user.bot else Entity.USER, e.user.id, e.user.access_hash), + _tl.Chat.CONSTRUCTOR_ID: lambda e: (Entity.GROUP, e.id, 0), + _tl.ChatFull.CONSTRUCTOR_ID: lambda e: (Entity.GROUP, e.id, 0), + _tl.ChatEmpty.CONSTRUCTOR_ID: lambda e: (Entity.GROUP, e.id, 0), + _tl.ChatForbidden.CONSTRUCTOR_ID: lambda e: (Entity.GROUP, e.id, 0), + _tl.Channel.CONSTRUCTOR_ID: lambda e: ( + Entity.MEGAGROUP if e.megagroup else (Entity.GIGAGROUP if e.gigagroup else Entity.CHANNEL), + e.id, + e.access_hash, + ), + _tl.ChannelForbidden.CONSTRUCTOR_ID: lambda e: (Entity.MEGAGROUP if e.megagroup else Entity.CHANNEL, e.id, e.access_hash), + }): """ Adds the given entities to the cache, if they weren't saved before. + + Returns a list of Entity that can be saved in the session. """ if not utils.is_list_like(entities): # Invariant: all "chats" and "users" are always iterables, - # and "user" never is (so we wrap it inside a list). + # and "user" and "chat" never are (so we wrap them inside a list). + # + # Itself may be already the entity we want to cache. entities = itertools.chain( + [entities], getattr(entities, 'chats', []), getattr(entities, 'users', []), - (hasattr(entities, 'user') and [entities.user]) or [] + (hasattr(entities, 'user') and [entities.user]) or [], + (hasattr(entities, 'chat') and [entities.user]) or [], ) - for entity in entities: + rows = [] + for e in entities: try: - pid = utils.get_peer_id(entity) - if pid not in self.__dict__: - # Note: `get_input_peer` already checks for `access_hash` - self.__dict__[pid] = utils.get_input_peer(entity) - except TypeError: - pass + mapper = _mappings[e.CONSTRUCTOR_ID] + except (AttributeError, KeyError): + continue + + ty, id, access_hash = mapper(e) + + # Need to check for non-zero access hash unless it's a group (#354 and #392). + # Also check it's not `min` (`access_hash` usage is limited since layer 102). + if not getattr(e, 'min', False) and (access_hash or ty == Entity.GROUP): + rows.append(Entity(ty, id, access_hash)) + if id not in self.__dict__: + if ty in (Entity.USER, Entity.BOT): + self.__dict__[id] = _tl.InputPeerUser(id, access_hash) + elif ty in (Entity.GROUP): + self.__dict__[id] = _tl.InputPeerChat(id) + elif ty in (Entity.CHANNEL, Entity.MEGAGROUP, Entity.GIGAGROUP): + self.__dict__[id] = _tl.InputPeerChannel(id, access_hash) + + return rows def __getitem__(self, item): """ diff --git a/telethon/_network/mtprotosender.py b/telethon/_network/mtprotosender.py index 9a873fc8..16bd6e32 100644 --- a/telethon/_network/mtprotosender.py +++ b/telethon/_network/mtprotosender.py @@ -34,9 +34,8 @@ class MTProtoSender: A new authorization key will be generated on connection if no other key exists yet. """ - def __init__(self, auth_key, *, loggers, + def __init__(self, *, loggers, retries=5, delay=1, auto_reconnect=True, connect_timeout=None, - auth_key_callback=None, update_callback=None, auto_reconnect_callback=None): self._connection = None self._loggers = loggers @@ -45,7 +44,6 @@ class MTProtoSender: self._delay = delay self._auto_reconnect = auto_reconnect self._connect_timeout = connect_timeout - self._auth_key_callback = auth_key_callback self._update_callback = update_callback self._auto_reconnect_callback = auto_reconnect_callback self._connect_lock = asyncio.Lock() @@ -67,7 +65,7 @@ class MTProtoSender: self._recv_loop_handle = None # Preserving the references of the AuthKey and state is important - self.auth_key = auth_key or AuthKey(None) + self.auth_key = AuthKey(None) self._state = MTProtoState(self.auth_key, loggers=self._loggers) # Outgoing messages are put in a queue and sent in a batch. @@ -283,13 +281,6 @@ class MTProtoSender: self.auth_key.key, self._state.time_offset = \ await authenticator.do_authentication(plain) - # This is *EXTREMELY* important since we don't control - # external references to the authorization key, we must - # notify whenever we change it. This is crucial when we - # switch to different data centers. - if self._auth_key_callback: - self._auth_key_callback(self.auth_key) - self._log.debug('auth_key generation success!') return True except (SecurityError, AssertionError) as e: @@ -372,8 +363,6 @@ class MTProtoSender: if isinstance(e, InvalidBufferError) and e.code == 404: self._log.info('Broken authorization key; resetting') self.auth_key.key = None - if self._auth_key_callback: - self._auth_key_callback(None) ok = False break @@ -516,8 +505,6 @@ class MTProtoSender: if isinstance(e, InvalidBufferError) and e.code == 404: self._log.info('Broken authorization key; resetting') self.auth_key.key = None - if self._auth_key_callback: - self._auth_key_callback(None) await self._disconnect(error=e) else: