Document the network/ module

This commit is contained in:
Lonami Exo
2017-11-30 13:20:51 +01:00
parent 7509ba9067
commit 9046b46fcd
5 changed files with 300 additions and 46 deletions

View File

@@ -1,3 +1,7 @@
"""
This module contains the class used to communicate with Telegram's servers
encrypting every packet, and relies on a valid AuthKey in the used Session.
"""
import gzip
import logging
import struct
@@ -31,8 +35,14 @@ class MtProtoSender:
"""
def __init__(self, session, connection):
"""Creates a new MtProtoSender configured to send messages through
'connection' and using the parameters from 'session'.
"""
Initializes a new MTProto sender.
:param session:
the Session to be used with this sender. Must contain the IP and
port of the server, salt, ID, and AuthKey,
:param connection:
the Connection to be used.
"""
self.session = session
self.connection = connection
@@ -45,28 +55,36 @@ class MtProtoSender:
self._pending_receive = {}
def connect(self):
"""Connects to the server"""
"""Connects to the server."""
self.connection.connect(self.session.server_address, self.session.port)
def is_connected(self):
"""
Determines whether the sender is connected or not.
:return: true if the sender is connected.
"""
return self.connection.is_connected()
def disconnect(self):
"""Disconnects from the server"""
"""Disconnects from the server."""
self.connection.close()
self._need_confirmation.clear()
self._clear_all_pending()
def clone(self):
"""Creates a copy of this MtProtoSender as a new connection"""
"""Creates a copy of this MtProtoSender as a new connection."""
return MtProtoSender(self.session, self.connection.clone())
# region Send and receive
def send(self, *requests):
"""Sends the specified MTProtoRequest, previously sending any message
which needed confirmation."""
"""
Sends the specified TLObject(s) (which must be requests),
and acknowledging any message which needed confirmation.
:param requests: the requests to be sent.
"""
# Finally send our packed request(s)
messages = [TLMessage(self.session, r) for r in requests]
self._pending_receive.update({m.msg_id: m for m in messages})
@@ -91,18 +109,23 @@ class MtProtoSender:
self._send_message(message)
def _send_acknowledge(self, msg_id):
"""Sends a message acknowledge for the given msg_id"""
"""Sends a message acknowledge for the given msg_id."""
self._send_message(TLMessage(self.session, MsgsAck([msg_id])))
def receive(self, update_state):
"""Receives a single message from the connected endpoint.
"""
Receives a single message from the connected endpoint.
This method returns nothing, and will only affect other parts
of the MtProtoSender such as the updates callback being fired
or a pending request being confirmed.
This method returns nothing, and will only affect other parts
of the MtProtoSender such as the updates callback being fired
or a pending request being confirmed.
Any unhandled object (likely updates) will be passed to
update_state.process(TLObject).
Any unhandled object (likely updates) will be passed to
update_state.process(TLObject).
:param update_state:
the UpdateState that will process all the received
Update and Updates objects.
"""
try:
body = self.connection.recv()
@@ -126,8 +149,11 @@ class MtProtoSender:
# region Low level processing
def _send_message(self, message):
"""Sends the given Message(TLObject) encrypted through the network"""
"""
Sends the given encrypted through the network.
:param message: the TLMessage to be sent.
"""
plain_text = \
struct.pack('<QQ', self.session.salt, self.session.id) \
+ bytes(message)
@@ -141,7 +167,12 @@ class MtProtoSender:
self.connection.send(result)
def _decode_msg(self, body):
"""Decodes an received encrypted message body bytes"""
"""
Decodes the body of the payload received from the network.
:param body: the body to be decoded.
:return: a tuple of (decoded message, remote message id, remote seq).
"""
message = None
remote_msg_id = None
remote_sequence = None
@@ -172,12 +203,15 @@ class MtProtoSender:
return message, remote_msg_id, remote_sequence
def _process_msg(self, msg_id, sequence, reader, state):
"""Processes and handles a Telegram message.
Returns True if the message was handled correctly and doesn't
need to be skipped. Returns False otherwise.
"""
Processes the message read from the network inside reader.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the BinaryReader that contains the message.
:param state: the current UpdateState.
:return: true if the message was handled correctly, false otherwise.
"""
# TODO Check salt, session_id and sequence_number
self._need_confirmation.add(msg_id)
@@ -249,24 +283,34 @@ class MtProtoSender:
# region Message handling
def _pop_request(self, msg_id):
"""Pops a pending REQUEST from self._pending_receive, or
returns None if it's not found.
"""
Pops a pending **request** from self._pending_receive.
:param msg_id: the ID of the message that belongs to the request.
:return: the request, or None if it wasn't found.
"""
message = self._pending_receive.pop(msg_id, None)
if message:
return message.request
def _pop_request_of_type(self, msg_id, t):
"""Pops a pending REQUEST from self._pending_receive if it matches
the given type, or returns None if it's not found/doesn't match.
"""
Pops a pending **request** from self._pending_receive.
:param msg_id: the ID of the message that belongs to the request.
:param t: the type of the desired request.
:return: the request matching the type t, or None if it wasn't found.
"""
message = self._pending_receive.get(msg_id, None)
if message and isinstance(message.request, t):
return self._pending_receive.pop(msg_id).request
def _pop_requests_of_container(self, container_msg_id):
"""Pops the pending requests (plural) from self._pending_receive if
they were sent on a container that matches container_msg_id.
"""
Pops pending **requests** from self._pending_receive.
:param container_msg_id: the ID of the container.
:return: the requests that belong to the given container. May be empty.
"""
msgs = [msg for msg in self._pending_receive.values()
if msg.container_msg_id == container_msg_id]
@@ -277,13 +321,19 @@ class MtProtoSender:
return requests
def _clear_all_pending(self):
"""
Clears all pending requests, and flags them all as received.
"""
for r in self._pending_receive.values():
r.request.confirm_received.set()
self._pending_receive.clear()
def _resend_request(self, msg_id):
"""Re-sends the request that belongs to a certain msg_id. This may
also be the msg_id of a container if they were sent in one.
"""
Re-sends the request that belongs to a certain msg_id. This may
also be the msg_id of a container if they were sent in one.
:param msg_id: the ID of the request to be resent.
"""
request = self._pop_request(msg_id)
if request:
@@ -293,6 +343,14 @@ class MtProtoSender:
return self.send(*requests)
def _handle_pong(self, msg_id, sequence, reader):
"""
Handles a Pong response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the Pong.
:return: true, as it always succeeds.
"""
self._logger.debug('Handling pong')
pong = reader.tgread_object()
assert isinstance(pong, Pong)
@@ -306,6 +364,14 @@ class MtProtoSender:
return True
def _handle_container(self, msg_id, sequence, reader, state):
"""
Handles a MessageContainer response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the MessageContainer.
:return: true, as it always succeeds.
"""
self._logger.debug('Handling container')
for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader):
begin_position = reader.tell_position()
@@ -323,6 +389,14 @@ class MtProtoSender:
return True
def _handle_bad_server_salt(self, msg_id, sequence, reader):
"""
Handles a BadServerSalt response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the BadServerSalt.
:return: true, as it always succeeds.
"""
self._logger.debug('Handling bad server salt')
bad_salt = reader.tgread_object()
assert isinstance(bad_salt, BadServerSalt)
@@ -339,6 +413,14 @@ class MtProtoSender:
return True
def _handle_bad_msg_notification(self, msg_id, sequence, reader):
"""
Handles a BadMessageError response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the BadMessageError.
:return: true, as it always succeeds.
"""
self._logger.debug('Handling bad message notification')
bad_msg = reader.tgread_object()
assert isinstance(bad_msg, BadMsgNotification)
@@ -367,6 +449,14 @@ class MtProtoSender:
raise error
def _handle_msg_detailed_info(self, msg_id, sequence, reader):
"""
Handles a MsgDetailedInfo response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the MsgDetailedInfo.
:return: true, as it always succeeds.
"""
msg_new = reader.tgread_object()
assert isinstance(msg_new, MsgDetailedInfo)
@@ -376,6 +466,14 @@ class MtProtoSender:
return True
def _handle_msg_new_detailed_info(self, msg_id, sequence, reader):
"""
Handles a MsgNewDetailedInfo response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the MsgNewDetailedInfo.
:return: true, as it always succeeds.
"""
msg_new = reader.tgread_object()
assert isinstance(msg_new, MsgNewDetailedInfo)
@@ -385,12 +483,29 @@ class MtProtoSender:
return True
def _handle_new_session_created(self, msg_id, sequence, reader):
"""
Handles a NewSessionCreated response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the NewSessionCreated.
:return: true, as it always succeeds.
"""
new_session = reader.tgread_object()
assert isinstance(new_session, NewSessionCreated)
# TODO https://goo.gl/LMyN7A
return True
def _handle_rpc_result(self, msg_id, sequence, reader):
"""
Handles a RPCResult response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the RPCResult.
:return: true if the request ID to which this result belongs is found,
false otherwise (meaning nothing was read).
"""
self._logger.debug('Handling RPC result')
reader.read_int(signed=False) # code
request_id = reader.read_long()
@@ -440,6 +555,14 @@ class MtProtoSender:
return False
def _handle_gzip_packed(self, msg_id, sequence, reader, state):
"""
Handles a GzipPacked response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the GzipPacked.
:return: the result of processing the packed message.
"""
self._logger.debug('Handling gzip packed data')
with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
# We are reentering process_msg, which seemingly the same msg_id