Port mtproto from grammers

This commit is contained in:
Lonami Exo
2023-07-09 21:16:55 +02:00
parent 9636ef35c1
commit 269ee4f05f
35 changed files with 1747 additions and 57 deletions

View File

@@ -5,8 +5,8 @@ def get_auth_key() -> AuthKey:
return AuthKey.from_bytes(bytes(range(256)))
def get_new_nonce() -> bytes:
return bytes(range(32))
def get_new_nonce() -> int:
return int.from_bytes(bytes(range(32)))
def test_auth_key_aux_hash() -> None:
@@ -28,7 +28,7 @@ def test_calc_new_nonce_hash1() -> None:
new_nonce = get_new_nonce()
assert (
auth_key.calc_new_nonce_hash(new_nonce, 1)
== b"\xc2\xce\xd2\xb3>Y:U\xd2\x7fJ]\xab\xee|g"
== 258944117842285651226187582903746985063
)
@@ -37,7 +37,7 @@ def test_calc_new_nonce_hash2() -> None:
new_nonce = get_new_nonce()
assert (
auth_key.calc_new_nonce_hash(new_nonce, 2)
== b"\xf41\x8e\x85\xbd/\xf3\xbe\x84\xd9\xfe\xfc\xe3\xdc\xe3\x9f"
== 324588944215647649895949797213421233055
)
@@ -46,5 +46,5 @@ def test_calc_new_nonce_hash3() -> None:
new_nonce = get_new_nonce()
assert (
auth_key.calc_new_nonce_hash(new_nonce, 3)
== b"K\xf9\xd7\xb3}\xb4\x13\xeeC\x1d(Qv1\xcb="
== 100989356540453064705070297823778556733
)

View File

@@ -0,0 +1,64 @@
from contextlib import contextmanager
from typing import Iterator
from rsa import PublicKey
from telethon._impl.crypto.auth_key import AuthKey
from telethon._impl.mtproto.authentication import (
CreatedKey,
_do_step1,
_do_step2,
_do_step3,
create_key,
)
@contextmanager
def old_rsa_keys() -> Iterator[None]:
import telethon._impl.mtproto.authentication
# Temporarily replace the imported global.
orig_rsa = telethon._impl.mtproto.authentication.RSA_KEYS # type: ignore [attr-defined]
try:
telethon._impl.mtproto.authentication.RSA_KEYS = { # type: ignore [attr-defined]
-4344800451088585951: PublicKey(
n=24403446649145068056824081744112065346446136066297307473868293895086332508101251964919587745984311372853053253457835208829824428441874946556659953519213382748319518214765985662663680818277989736779506318868003755216402538945900388706898101286548187286716959100102939636333452457308619454821845196109544157601096359148241435922125602449263164512290854366930013825808102403072317738266383237191313714482187326643144603633877219028262697593882410403273959074350849923041765639673335775605842311578109726403165298875058941765362622936097839775380070572921007586266115476975819175319995527916042178582540628652481530373407,
e=65537,
)
}
yield
finally:
telethon._impl.mtproto.authentication.RSA_KEYS = orig_rsa # type: ignore [attr-defined]
def test_successful_auth_key_gen_flow() -> None:
step1_random = b"\x86\xd4%\xe6F\r\xe2\xa0H&3\x11_\x8fw\xf1"
step1_request = b"\xf1\x8e~\xbe\x86\xd4%\xe6F\r\xe2\xa0H&3\x11_\x8fw\xf1"
step1_response = b"c$\x16\x05\x86\xd4%\xe6F\r\xe2\xa0H&3\x11_\x8fw\xf1\xe4\xb1\xfeR+vIQh\x91t#W\xc9j\x1a\x08 \xcd<\xb0X{\xddq\x00\x00\x00\x15\xc4\xb5\x1c\x02\x00\x00\x00\x02\x9fK\xa1m\x10\x92\x96!k\xe8l\x02+\xb4\xc3"
step2_random = b"\xc3\x94\x9bxu\xf2\xf6\x13\xadn\x01\x12%\x85\x1f\x84u\xa2\xe2{\x06\xd4~\xec\x94v(\xba\x15\xaazX{\xfe.\xd2\x92\x0b\xa6\xcd\xe7\x92\x84\x87\xcc\xf57\xee;\x9f\xc9c\xde\xbdE\xdc+\x85q\xa3\xbe\x83_@\x04W)\x16\xc5\xe7\xac`\xc4y\xd3lj\xba\xda\xe7\xb2\xd6os:\xd9F,\r4K\x86\x96\xfc\x95\xaag\x80)\xbf\x1a\x11\xad\xc8:^\xfcx)\xa1\xa3r\xcd\xda>)-\xf2\x87\x06\xee\x10U\x17\xd2\xd7\x9c\r\x9fB\x9ef\xd0\x8e3\xc0\x10\x06~\xca\x9b\x87\x83\x998#\xb4\x19mF\xf6\xd2\x10\x1a\x07\xf6\x83\x03\x07R\xef\x83\xab\xde\x98\xe8\xbd\xa3\xb8x\x84\xf6{+f\xeb\x031\xa5\x16A\xec\x1a\x90Xe(<\x8c\xb6\xbe\xcaNm\xe0\x8b\xf3\xf4\x83\x85\xfc=\xfdv8%\x9f50=\xb5\xca\xd9\x7f\xb3\x93\x8f\x00\xd9N\xaa\x1f\x88@<G\x1e\x13V(1;5\x9bg@\x94[\x82\xab\x9d\xa6\x1f\xfaz^\xec\x94jd\x869G\x91\xa0\x9c\n\xb9T\x0e\x01V\xb4\x1c\xfa\xbft\xf1G\x7f.\x92n6\x06\x928~\xba\x7f\xe9\x1f\x18\xf4a\xb63\x87"
step2_request = b'\xbe\xe4\x12\xd7\x86\xd4%\xe6F\r\xe2\xa0H&3\x11_\x8fw\xf1\xe4\xb1\xfeR+vIQh\x91t#W\xc9j\x1a\x04Z\x89\x9d\x9b\x00\x00\x00\x04\\\xbf\xa7\xe3\x00\x00\x00!k\xe8l\x02+\xb4\xc3\xfe\x00\x01\x00l\x0f\xb7\xdfb\xfd4\xfe!3\x07\xffx.b\x19\xc8\x98\x99\x7f\x81\xc05\xfa7cZ[XX\x91\x8a\xc9\x94\xad/wF\x18\x1f|9yBL\x1d\xdc\xe1|\x17N\x9d4\x01.I\'a\xe3B|\xce\x9b\x8e\xd0\xbd\xbcF*\xebS\xc6\x9d\x90/\x06\xa2\x1d\xac[\x99\xc1\xe3\xad\xaf\xc5\xa4\xc8\x1a?\x03\xc4fu\xe6\x1e\xd0\xbc$>6\x83\xccZ%\x0b\xa6\xff\xe2Yl\x9f\xbbkB\r>\xf0\x92\x00\x15\x1d\x83\xb4X\x98+\xe7\x80\xea\\F-6\x9b@\xfdi\xcf\x87\xb9\xf2\xfeW\xec\x03ck?\xf8\x05\x90\r\xb4\xd9<\x1f@R\xe7g\xd9\xba\xb0\x8b\x9b\x93\x0b\xa87UV\x08\xe5C>\xe33\xa8!\xc0\xd6\xe4\xa1\x11\xb8&x\xc1\x1fWt\xe0\x9f\xd2z@\t\xf3w\x9e\x04\x1ao\xff,\x85L\xd9\x8e\xe7\x1d\xf3=\x91h\xf3U~\xce\x17\x18!\xe5-\xb5qxU\x86\xb3\xd2k\x08\r\xa2\xf8E3E\xda\xc7\x0ch3F@\xfed"\x0e\x8ay\xe4\xf0"'
step2_response = b'\\\x07\xe8\xd0\x86\xd4%\xe6F\r\xe2\xa0H&3\x11_\x8fw\xf1\xe4\xb1\xfeR+vIQh\x91t#W\xc9j\x1a\xfeP\x02\x00:MC$\x99\x1b\xa8]\xf7\'[\x91\xe6\x01\xec\x04$\xa7`&Y1\xe5\x13PG\x1f3r\x05\xa0\x02\xf8+\xc7;\xd9\x1d\x84\x1ewo(\x1f\x16\xb4\x93I\xabpZ\xebVB\xce\xaf\xa9\x1c?\x8f\x18\xadq\x06\xa8=[\xe2\x9d\x187\xd8s\x0c\xf1\xa3\xfd\xdeSfw\x88$X\xbf\x14\x17\x14&\x04\xc0\xe4\x85\x00\x94\x186w2\xc6\xd0\xc7c\xe2S9<l\xb9C7}\xc1\xd8\xb5\x8eEy",\x90\xbfWd35\xdeV\xe1\xa4;\x89\x93\xd4\\\xdc\x17e\xdfsa|\xd9\xbb[*\xff\xdd;\xa7\x0b%\xe2F\x0e\xdf\xef\xaf\x80\xa1@\x05\x0c\xc4#\xb9\x10I\x0f)\xa6\x01f\xd9\xd8\xe8\xaa\xf2]\xfa\x9b\x85\xb3)Y\x1a\n\xa9\xd3\xe1\xdf\xc4s\xe1\xd3~t\xe3\xfe\xf9W\xbb\x0b5\xd2\xc3\xa3T\r\xb9\x92\xa6I\xc6\t\x91\xc5]a\x89\xd8P!\xf2\xa2\x0f\x16\tJ\x9e\x0b,\x88\xd0\xa5>\xc4\x05\xc7Ws\xf4/\xd8\x10\xfemK\x01dB\xd5q\x93T\xd6\x04\xcaW\x00\x19\x07\x92\xca8h\x13J\xf2\xae\xf9\x1d\xba\xc8\xcc\x8f\xdc\x82\xe6"^\x83\xd0\xa4\xd2\x9a\x99\x92<\x15\x1e#J \x88\x82Y\x11\xd6\x14>j\xcc\x10e\x9f1\xf3{~\xa8\xfb\xdd\x82\xd1\x81`,0LAN\x87\x8c\xcc\tZ2\x07\x18\x81\x9f\\h\x8c$\xc9\x1e^\n\rJ\x94,\xb6L\x06L\x9d\xd6\xed4}W\xb5\xcd\xbcu\xd2MI\x98\x03\xac\xa8\xebZ\xed\xc3#\x1c\x8e\xc2\x16\x89\xe3\xfc=\xd1x\x9cR\xf6\x1e\xe7\xb3\xbfS\xc0\xef\xf4!\xdeH:7SSi\x17Z\xc3-2B&\xa9\xd4,4%~@\x12\'j}p\xfay\x8bU\xab\xa1;\xb7\xfc\xf9\x9fM\xf6:\xc0\xfe\xa4T\xe2\xf2\xf1E}\x0e-\xc66\x00\x91\xd4\xf7\xd6\x0b6L\xc8\xd7\x07\xefQ\x18\xce=\xf1\x88\xdf\xd64\r\xfb\xce\xb4\xb0\xf3w~\xda\x80\xc0\xc5\xca\xa3\x02\xb2A\xd21\xbc\xefH\xf217\x00\xc1Z\x85p\xa1\x87V/\x88\xe0\x10y\x07bO"\xe1\xe0b@\x80\xf8\x9f\xe1cc5\xc2\xa8\xd0\xb0&\x97\xdb\x14<0.\xfa$B\x9a\xca\x0c\xef3\x01@+\xeeQ\x9a\xe1\x1d\x8b6\x02%\xe8\x1c\xa5\xb9N9\xc1\xce\x93\xcc\xe4\xde\xc8%\x9c\x14\xce\xf6\xaa\x02\x1a\xa1\xc3\xe3\xb5v\xd2n%=c\x0e\xa8\xbd\xfa(\xefE\xe8i\xcf\xeb'
step3_random = b'G\x92z\n\x1b\x15b\x91\x14,\xf8\xf7;\x03)\xa1\xccl\xde\x9f\xb6\xed\x89\x135\xaf\xfa\xc3>\x08:\x7f\xee\xda_o\xe9\xd5\xe1W\xe1\x9d\xc9\x13b<6\xcc\x1e\xd3\x87ac\x08\xe8\xf93\xf2\x07\x98\x98$|\xe9]\xf0+\xc4alt\x84)\x17\x18\xbc\xdd\x03\x94\x1bfR\xaf\xe2\x94_B\x1d\xd7C\xd5\x1fr\xe9\xae\xea\xcc\x0cu\xd9\x1fK2 \xcc\xc9\xda\xf5\xec=q\x8f^*\xf5_\x18"\xbd\x06\x1c\xee\xc1\xc0^H\xb0\t\xfe\x14\xba\x16\xb4\x84O\xa4!+\xca\xf1U]\x1fV[\xfa\x86-w\xfamL\xf4U\xf2]\xacS}\x05\xbbF\x83C-z\xec\xa8Mq?\xda"\xbe\xe3\x0b\xd6d\xbe\x8b9\x1e\x96\xf0\x9c,q\x92\xb6\x1d\xd1\xb2\xeb0\xc0\x06\x19S\xc2\x8cE\x0eob\x1ed\x93:\x13\xf9Ai\xe8\xa9N\n\xea\xda}\xb4\x0c\xb1\x01\xcfw\xa2`\xaeij\t\xa5n@\x9diq\x84m@\xe6\'\xec\x82\x18\x97\x89,\\\xef<\x14\xe3\x12+\x83\xb6 \x82\x16\x00Q\xff\xce\x93\xdb\xea]\xaf\x87\xd6\x1e'
step3_request = b"\x1f_\x04\xf5\x86\xd4%\xe6F\r\xe2\xa0H&3\x11_\x8fw\xf1\xe4\xb1\xfeR+vIQh\x91t#W\xc9j\x1a\xfeP\x01\x00%t5m\x89\x96p\x89\xb4\x92\xd8W\x97\xeb\x07J\xc0\x9c\xa9\xbcF\xe1\tbV\x98\x0fU\xa2\x1b\xf2\xe7\xe4\xbb$V\x97X7$\xc3\x0bx\xa2-N(\x1d&\xf7\xce\xfd\xd2\x0f\x13\xb4%\xd4\xca\xf9h>\x01\x0f\xf4y\xc2\xee\x87\x86\x8c\xaf\xc1\xf4u\\\xedee\x9ag\x8f\x0f'h\r`\xe9\x1f\x9a\x0f\x80?%d\xa5\xa7A\x9d\xd4{\xf1!\x82\xc6\xd4\x8c\xf9\xfaT\x9f\x89\x0f\x8ac\xaf\xfcpK\x07q\x05/H\x18\xd3\xe5\xd29\xe8\xbb\xf8`\x8e\\\xee\x9b\xef\xb2\xbcM\xd9\x0f\x17\xc0\x7f`\x8c\xae\xdfZN\\\xfe\x94\xffMvRN0\xcf\n\x02\x8a\x17\xe3q\xfb\xf5\xf7v\xe2\xf2\xf5\xf4\x07`#\xc3\r5\xd3\xb3\x7f\xc3\xb9\x03z@\xb6t\xdeg\xbc\xce\xb6J\x02^\xa1\x95\xa1\xf8\xad\\\xe1Y\xe9n}\x99\xac\x9cu\xd6\xfb\xebMn\x16\xe6\xb8\xe9\xda3\x0c%+\x0bd\xa4N\x19p\x83L\xd3cq\x16\xceS3\xa4\x05\x9b<S\xf8dGh\xbd\x8dr\xf8<\x06J\x97\x8d]\x88\xb3k\xb6N\x86>w\x05\x13e\x9b\xb8\x8d)D\xe9I\xffd\x87u\xe6\xee\xe7\x8d\xa2\xe1ve\xcc\xb6$\x9bwFx\xd3u\xad\xd9\xf0\x04\xc8\xaf\x97A\x03\xff4\x01\xd2\x12\x82\xc2\xbf\x1b\xf7\xc2\x8f\x98\xc3\xef\xac\xf1yVc\x14Y\xef\xe4\xc1"
step3_response = b"4\xf7\xcb;\x86\xd4%\xe6F\r\xe2\xa0H&3\x11_\x8fw\xf1\xe4\xb1\xfeR+vIQh\x91t#W\xc9j\x1a\x16G\x8884\xca{\xfd\xe2s\xa2N8\xc8\xc8\xb3"
expected_auth_key = b"\x0b\x1aJ\xd1\xb0\xa7\x91\x8bv?\xb7'<#\xca@\x08\xdc\x10z\x8c\x88\x8a}\xe7V\xeb\x93\x85\x03\x88\xe5\xc0R\xa0\xed6\x81fv\x84\xcc\x97|:\xf8\xf5\xc1\xbe+\xa2yT\xa0\x93\n:\xe3F330S\x82\xb8\x05\xc0\x87\x8a\xa7)\xcb+\xe4\xb6\x8br\t]\x96\xdc-5\xfa`R\xab\x98\xa5\xe7\x03\x04\xd8\x8dj\x96\x06\xe6$\xc5\xe6\xde\x84\x94\xad\x8b\x0eW\xc8\xb7\xc6b\x90\xd0\x1a\x9f\xfdm\x11o\xb7X^o\xf2\x05X\xfd\x9a@\x1b/\x92R\xf1\x9e\xf5\xe8J\xa3\x84\x8d\x1a\x9d\x10\x14P\x13\xa6v\x8c\xf8Q\xdf\xda\xe9\x00\xc7\xf51\x8e&#\xa8\xa9\xab\xcdo;\xe5\n\xa7\x8b\x9f\xd9@\xa4\x9d.\xfa\xc4=\xdd\x84\x9c\xd0&\xf6\x18V\xd0\x12\x1e\x13\x15\xd7\xc1\x91\xd2\xb3\nc\xdb\xed\x16\x0b_GNj\x8cpO\xf4L\xb9\x10\xd8\x97\x02\xd5\xcb\xd1\xe8\x82\x16\xedC\x15\n\xef\x0b\xaa\x89$\xb7\x1c6}\xacad\xd7\x9f0\x18\xf3\xdd\x06\x8e4\xbd\xb3\x12?\xe0"
request, step1 = _do_step1(step1_random)
assert request == step1_request
response = step1_response
with old_rsa_keys():
request, step2 = _do_step2(step1, response, step2_random)
assert request == step2_request
response = step2_response
step3_now = 1580236449
request, step3 = _do_step3(step2, response, step3_random, step3_now)
assert request == step3_request
response = step3_response
finished = create_key(step3, response)
assert finished == CreatedKey(
auth_key=AuthKey.from_bytes(expected_auth_key),
time_offset=0,
first_salt=4809708467028043047,
)

View File

@@ -62,8 +62,8 @@ def test_decrypt_server_data_v2() -> None:
def test_key_from_nonce() -> None:
server_nonce = bytes(range(16))
new_nonce = bytes(range(32))
server_nonce = int.from_bytes(bytes(range(16)))
new_nonce = int.from_bytes(bytes(range(32)))
(key, iv) = generate_key_data_from_nonce(server_nonce, new_nonce)
assert (

View File

@@ -0,0 +1,193 @@
import struct
from pytest import raises
from telethon._impl.mtproto.mtp import Encrypted, Plain, RpcError
from telethon._impl.tl.mtproto.types import RpcError as GeneratedRpcError
def test_rpc_error_parsing() -> None:
assert RpcError.from_mtproto_error(
GeneratedRpcError(
error_code=400,
error_message="CHAT_INVALID",
)
) == RpcError(
code=400,
name="CHAT_INVALID",
value=None,
caused_by=None,
)
assert RpcError.from_mtproto_error(
GeneratedRpcError(
error_code=420,
error_message="FLOOD_WAIT_31",
)
) == RpcError(
code=420,
name="FLOOD_WAIT",
value=31,
caused_by=None,
)
assert RpcError.from_mtproto_error(
GeneratedRpcError(
error_code=500,
error_message="INTERDC_2_CALL_ERROR",
)
) == RpcError(
code=500,
name="INTERDC_CALL_ERROR",
value=2,
caused_by=None,
)
PLAIN_REQUEST = b"Hey!"
def test_plain_finalize_clears_buffer() -> None:
mtp = Plain()
mtp.push(PLAIN_REQUEST)
assert len(mtp.finalize()) == 24
mtp.push(PLAIN_REQUEST)
assert len(mtp.finalize()) == 24
def test_plain_only_one_push_allowed() -> None:
mtp = Plain()
assert mtp.push(PLAIN_REQUEST) is not None
assert mtp.push(PLAIN_REQUEST) is None
MESSAGE_PREFIX_LEN = 8 + 8 # salt + client_id
GZIP_PACKED_HEADER = b"\xa1\xcf\x72\x30"
MSG_CONTAINER_HEADER = b"\xdc\xf8\xf1\x73"
REQUEST = b"Hey!"
REQUEST_B = b"Bye!"
def auth_key() -> bytes:
return bytes(256)
def ensure_buffer_is_message(buffer: bytes, body: bytes, seq_no: int) -> None:
# msg_id, based on time
assert buffer[0:8] != bytes(8)
# seq_no, sequential odd number
assert buffer[8:12] == struct.pack("<i", seq_no)
# bytes, body length
assert buffer[12:16] == struct.pack("<i", len(body))
# body
assert buffer[16:] == body
def test_serialization_has_salt_client_id() -> None:
mtp = Encrypted(auth_key())
mtp.push(REQUEST)
buffer = mtp._finalize_plain()
# salt
assert buffer[0:8] == bytes(8)
# client_id
assert buffer[8:16] != bytes(8)
# message
ensure_buffer_is_message(buffer[MESSAGE_PREFIX_LEN:], REQUEST, 1)
def test_correct_single_serialization() -> None:
mtp = Encrypted(auth_key())
assert mtp.push(REQUEST) is not None
buffer = mtp._finalize_plain()
ensure_buffer_is_message(buffer[MESSAGE_PREFIX_LEN:], REQUEST, 1)
def test_correct_multi_serialization() -> None:
mtp = Encrypted(auth_key(), compression_threshold=None)
assert mtp.push(REQUEST) is not None
assert mtp.push(REQUEST_B) is not None
buffer = mtp._finalize_plain()
buffer = buffer[MESSAGE_PREFIX_LEN:]
# container msg_id
assert buffer[0:8] != bytes(8)
# seq_no (after 1, 3 content-related comes 4)
assert buffer[8:12] == b"\x04\0\0\0"
# body length
assert buffer[12:16] == b"\x30\0\0\0"
# container constructor_id
assert buffer[16:20] == MSG_CONTAINER_HEADER
# message count
assert buffer[20:24] == b"\x02\0\0\0"
ensure_buffer_is_message(buffer[24:44], REQUEST, 1)
ensure_buffer_is_message(buffer[44:], REQUEST_B, 3)
def test_correct_single_large_serialization() -> None:
mtp = Encrypted(auth_key(), compression_threshold=None)
data = bytes(0x7F for _ in range(768 * 1024))
assert mtp.push(data) is not None
buffer = mtp._finalize_plain()
buffer = buffer[MESSAGE_PREFIX_LEN:]
assert len(buffer) == 16 + len(data)
def test_correct_multi_large_serialization() -> None:
mtp = Encrypted(auth_key(), compression_threshold=None)
data = bytes(0x7F for _ in range(768 * 1024))
assert mtp.push(data) is not None
assert mtp.push(data) is None
buffer = mtp._finalize_plain()
buffer = buffer[MESSAGE_PREFIX_LEN:]
assert len(buffer) == 16 + len(data)
def test_large_payload_panics() -> None:
mtp = Encrypted(auth_key())
with raises(AssertionError):
mtp.push(bytes(2 * 1024 * 1024))
def test_non_padded_payload_panics() -> None:
mtp = Encrypted(auth_key())
with raises(AssertionError):
mtp.push(b"\x01\x02\x03")
def test_no_compression_is_honored() -> None:
mtp = Encrypted(auth_key(), compression_threshold=None)
mtp.push(bytes(512 * 1024))
buffer = mtp._finalize_plain()
assert GZIP_PACKED_HEADER not in buffer
def test_some_compression() -> None:
mtp = Encrypted(auth_key(), compression_threshold=768 * 1024)
mtp.push(bytes(512 * 1024))
buffer = mtp._finalize_plain()
assert GZIP_PACKED_HEADER not in buffer
mtp = Encrypted(auth_key(), compression_threshold=256 * 1024)
mtp.push(bytes(512 * 1024))
buffer = mtp._finalize_plain()
assert GZIP_PACKED_HEADER in buffer
mtp = Encrypted(auth_key())
mtp.push(bytes(512 * 1024))
buffer = mtp._finalize_plain()
assert GZIP_PACKED_HEADER in buffer

View File

@@ -0,0 +1,11 @@
from telethon._impl.mtproto.utils import gzip_decompress
from telethon._impl.tl.core import Reader
from telethon._impl.tl.mtproto.types import GzipPacked, RpcResult
def test_gzip_decompress() -> None:
rpc_result = Reader(
b'\x01m\\\xf3\x84)\x96E6K1^\xa1\xcfr0\xfe\x8c\x01\x00\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\x95\x93=K\xc3P\x14\x86O>\x14T\xaa\x01\x11\x1cD\x1c\x84n\xb7\xf7&\xb9\xf9\x9a:\n\x82tt\xaa6m\x12\x0b\xad\xa9mZp\xd2\xd1Qpp\x16\x7f\x838\xe8\xe8\xe0\x8fpp\xee\x9f\xa87\x1f\xeaM[R\x1a\xb89$O\xdes\xde\xfb&\t\xaa\x1b\xda\xd1>\x80qL\xea\xb5SR7\x1f\xaf\xdfe\x00\xd8\xf9x\xd9\xdbf\xb5\xf4\xf4\xba\xcb\n\x08lm\x12\xddF\x84\xea\x88\x98\x14Q\r\xde\x84\x94+\x0b\xb8\x90\xf1\xb2\x8a1q\xb0\xabZ\x8e\xafj-\xc7O.\xd9\x91?5~u\xf1\\1\xd7\xd70\x11\xc5\xb9\xb9\xb3\x9c\xfcq1\xe3[<\'\x94\xf0\xbe\xc4\x7f_\x86\xd9t\xb0\xeeY\xb1/\xb5\xc8\x97\xb4\x9c\xce\xe5\xf7#\xe5\xfc\xb0\x9c\x08\xc6\xfc~\x8a\xb8\x90\xf199j\x8br\x94\xa7s\xb2I.\xc7".d|\xce~\xf5\xa2\xb9b\xa6\xe3\xf27\x10Q1\x9f\xe3\x12}]\xde\xcf\xcaL\x0e~\xac\xa3E~\x94LW\xb2\t\x8b\xd5B\x94\xb9\xb10\x00\x9f\xd34\xa7v\xc6\x1b\xbd\x91\x86\x06\x91\xd7A\xcd\xb0\x0b\xf0\xc9\x9e\xab\x96$h\xb1Z\xd3%\xb8c?\x0b\x1e\x02|?\xc8I\xfdb\x8d\xce\xc7\xe9w\x12?\x0b\xcf"L&\x93\x9bx\xc1\xf80\xb9\x17\xcf\xbau\xd6\x92\x1a\xf798\x01\xb8?\x13\x12\xbdRf3/\xa2\xa87p*\x95\x08u\xbd\n{\xf7A\xdb_\xf7\xc3a\x7fp5l\xf4=\x90\xdd\xf6e\xc0t\xabA\x18\x06\x1d/\rSI\xcb\x0f:\xba\r\x8d\xd8\x03\x00\x00'
).read_serializable(RpcResult)
gzip_packed = Reader(rpc_result.result).read_serializable(GzipPacked)
assert len(gzip_decompress(gzip_packed)) == 984

View File

@@ -47,3 +47,13 @@ def test_generated_object(obj: Serializable) -> None:
assert bytes(obj)[:4] == struct.pack("<I", obj.constructor_id())
assert type(obj)._read_from(Reader(bytes(obj)[4:])) == obj
assert Reader(bytes(obj)).read_serializable(type(obj)) == obj
def test_repeated_read() -> None:
reader = Reader(bytes(range(8)))
assert reader.read(4) == bytes(range(4))
assert reader.read(4) == bytes(range(4, 8))
reader = Reader(bytes(range(8)))
assert reader.read_fmt("4b", 4) == tuple(range(4))
assert reader.read_fmt("4b", 4) == tuple(range(4, 8))

View File

@@ -1,7 +1,9 @@
from rsa import PublicKey
from telethon._impl.crypto.rsa import (
PRODUCTION_RSA_KEY,
TESTMODE_RSA_KEY,
compute_fingerprint,
encrypt_hashed,
)
@@ -13,3 +15,15 @@ def test_fingerprint_1() -> None:
def test_fingerprint_2() -> None:
fp = compute_fingerprint(TESTMODE_RSA_KEY)
assert fp == -5595554452916591101
def test_rsa_encryption() -> None:
key = PublicKey(
n=22081946531037833540524260580660774032207476521197121128740358761486364763467087828766873972338019078976854986531076484772771735399701424566177039926855356719497736439289455286277202113900509554266057302466528985253648318314129246825219640197356165626774276930672688973278712614800066037531599375044750753580126415613086372604312320014358994394131667022861767539879232149461579922316489532682165746762569651763794500923643656753278887871955676253526661694459370047843286685859688756429293184148202379356802488805862746046071921830921840273062124571073336369210703400985851431491295910187179045081526826572515473914151,
e=65537,
)
result = encrypt_hashed(b"Hello!", key, bytes(256))
assert (
result
== b"up-L\x88\xd2\x9bj\xb945Q$\xdd(\xd9\xb6*GU\x88A\xc8\x03\x14P\xf7I\x9b\x1c\x9ck\xd3\x9d'\xc1X\x1cQ4NQ\xc1y#pd\xa7#\xae\x93\x9dZ\xc3P\x14\xfd\x8bO\xe2Ou\xe3\x11\\2\xa1ci\xee+7:a\xec\x94F\xb9+.=\xf0v\x18\xdb\n\x8a\xfd\xa9\x99\xb6p+2\xb5\x81\x9b\xd6\xeaIp\xfb4|\xa8J`\xd0\xc3\x8a\xb7\x0cf\xe5\xed\x01@D\x88\x89\xa3\xb8\x82\xee\xa53\xba\xd0^\xfa E\xed\xa7\x17\x12<AJ\xbf\xde\xd4>\x1e\xb4\x83\xa0Ixn\xf5\x03\x1b\x12\xd5\x1a?\xf7\xec\xb7\xd8\x04\xd4A5\x94_\x98\xf7ZJl\xf1\xa1\xdf7U\x9e0\xbb\xe9*Kyf\xc3O\x078\xe6\xd10Y\x85wm&\xdf\xab|\x0f\xdf\xd7\xec ,\xc7\x8cT\xcf\x82\xac#\x86\xc7\x9d\x0e\x19u\x80\xa4\xfa\x940\n#\x82\xf9\xe1\x16\xfe\x82\xdf\x9b\xd8r\xe5\xb9\xda{Bb#\xbf\x1a\xd8X\x890\xb5\x1e\x16]l\xdd\x02"
)

View File

@@ -0,0 +1,60 @@
from typing import Tuple
from pytest import raises
from telethon._impl.mtproto.transport.abridged import Abridged
def setup_pack(n: int) -> Tuple[Abridged, bytes, bytearray]:
input = bytes(x & 0xFF for x in range(n))
return Abridged(), input, bytearray()
def test_pack_empty() -> None:
transport, input, output = setup_pack(0)
transport.pack(input, output)
assert output == b"\xef\0"
def test_pack_non_padded() -> None:
transport, input, output = setup_pack(7)
with raises(AssertionError):
transport.pack(input, output)
def test_pack_normal() -> None:
transport, input, output = setup_pack(128)
transport.pack(input, output)
assert output[:2] == b"\xef\x20"
assert output[2:] == input
def pack_large() -> None:
transport, input, output = setup_pack(1024)
transport.pack(input, output)
assert output[:5] == b"\xef\x7f\0\x01\0"
assert output[5:] == input
def test_unpack_small() -> None:
transport = Abridged()
input = b"\x01"
output = bytearray()
with raises(ValueError) as e:
transport.unpack(input, output)
e.match("missing bytes")
def test_unpack_normal() -> None:
transport, input, packed = setup_pack(128)
unpacked = bytearray()
transport.pack(input, packed)
transport.unpack(packed[1:], unpacked)
assert input == unpacked
def unpack_large() -> None:
transport, input, packed = setup_pack(1024)
unpacked = bytearray()
transport.pack(input, packed)
transport.unpack(packed[1:], unpacked)
assert input == unpacked

View File

@@ -0,0 +1,103 @@
from typing import Tuple
from pytest import raises
from telethon._impl.mtproto.transport.full import Full
def setup_pack(n: int) -> Tuple[Full, bytes, bytearray]:
input = bytes(x & 0xFF for x in range(n))
return Full(), input, bytearray()
def setup_unpack(n: int) -> Tuple[bytes, Full, bytes, bytearray]:
transport, expected_output, input = setup_pack(n)
transport.pack(expected_output, input)
return expected_output, Full(), input, bytearray()
def test_pack_empty() -> None:
transport, input, output = setup_pack(0)
transport.pack(input, output)
assert output == b"\x0c\x00\x00\x00\x00\x00\x00\x00&\xca\x8d2"
def test_pack_non_padded() -> None:
transport, input, output = setup_pack(7)
with raises(AssertionError):
transport.pack(input, output)
def test_pack_normal() -> None:
transport, input, output = setup_pack(128)
transport.pack(input, output)
assert output[:4] == b"\x8c\0\0\0"
assert output[4:8] == b"\0\0\0\0"
assert output[8 : 8 + len(input)] == input
assert output[8 + len(input) :] == b"\x86s\x957"
def test_pack_twice() -> None:
transport, input, output = setup_pack(128)
transport.pack(input, output)
output.clear()
transport.pack(input, output)
assert output[:4] == b"\x8c\0\0\0"
assert output[4:8] == b"\x01\0\0\0"
assert output[8 : 8 + len(input)] == input
assert output[8 + len(input) :] == b"\x96\t\xf0J"
def test_unpack_small() -> None:
transport = Full()
input = b"\0\x01\x02"
output = bytearray()
with raises(ValueError) as e:
transport.unpack(input, output)
e.match("missing bytes")
def test_unpack_normal() -> None:
expected_output, transport, input, output = setup_unpack(128)
transport.unpack(input, output)
assert output == expected_output
def test_unpack_twice() -> None:
transport, input, packed = setup_pack(128)
unpacked = bytearray()
transport.pack(input, packed)
transport.unpack(packed, unpacked)
assert input == unpacked
packed.clear()
unpacked.clear()
transport.pack(input, packed)
transport.unpack(packed, unpacked)
assert input == unpacked
def test_unpack_bad_crc() -> None:
_, transport, input, output = setup_unpack(128)
input = input[:-1] + bytes((input[-1] ^ 0xFF,))
with raises(ValueError) as e:
transport.unpack(input, output)
e.match("bad crc")
e.match("expected: 932541318")
e.match("got: 3365237638")
def test_unpack_bad_seq() -> None:
transport, input, packed = setup_pack(128)
unpacked = bytearray()
transport.pack(input, packed)
packed.clear()
transport.pack(input, packed)
with raises(ValueError) as e:
transport.unpack(packed, unpacked)
e.match("bad seq")
e.match("expected: 0")
e.match("got: 1")

View File

@@ -0,0 +1,45 @@
from typing import Tuple
from pytest import raises
from telethon._impl.mtproto.transport.intermediate import Intermediate
def setup_pack(n: int) -> Tuple[Intermediate, bytes, bytearray]:
input = bytes(x & 0xFF for x in range(n))
return Intermediate(), input, bytearray()
def test_pack_empty() -> None:
transport, input, output = setup_pack(0)
transport.pack(input, output)
assert output == b"\xee\xee\xee\xee\0\0\0\0"
def test_pack_non_padded() -> None:
transport, input, output = setup_pack(7)
with raises(AssertionError):
transport.pack(input, output)
def test_pack_normal() -> None:
transport, input, output = setup_pack(128)
transport.pack(input, output)
assert output[:8] == b"\xee\xee\xee\xee\x80\0\0\0"
assert output[8:] == input
def test_unpack_small() -> None:
transport = Intermediate()
input = b"\x01"
output = bytearray()
with raises(ValueError) as e:
transport.unpack(input, output)
e.match("missing bytes")
def test_unpack_normal() -> None:
transport, input, packed = setup_pack(128)
unpacked = bytearray()
transport.pack(input, packed)
transport.unpack(packed[4:], unpacked)
assert input == unpacked