mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-08-08 21:10:29 +00:00
Port crypto from grammers
This commit is contained in:
110
client/src/telethon/_impl/crypto/__init__.py
Normal file
110
client/src/telethon/_impl/crypto/__init__.py
Normal file
@@ -0,0 +1,110 @@
|
||||
import os
|
||||
from collections import namedtuple
|
||||
from enum import IntEnum
|
||||
from hashlib import sha1, sha256
|
||||
|
||||
from .aes import ige_decrypt, ige_encrypt
|
||||
from .auth_key import AuthKey
|
||||
|
||||
|
||||
# "where x = 0 for messages from client to server and x = 8 for those from server to client"
|
||||
class Side(IntEnum):
|
||||
CLIENT = 0
|
||||
SERVER = 8
|
||||
|
||||
|
||||
CalcKey = namedtuple("CalcKey", ("key", "iv"))
|
||||
|
||||
|
||||
# https://core.telegram.org/mtproto/description#defining-aes-key-and-initialization-vector
|
||||
def calc_key(auth_key: AuthKey, msg_key: bytes, side: Side) -> CalcKey:
|
||||
x = int(side)
|
||||
|
||||
# sha256_a = SHA256 (msg_key + substr (auth_key, x, 36))
|
||||
sha256_a = sha256(msg_key + auth_key.data[x : x + 36]).digest()
|
||||
|
||||
# sha256_b = SHA256 (substr (auth_key, 40+x, 36) + msg_key)
|
||||
sha256_b = sha256(auth_key.data[x + 40 : x + 76] + msg_key).digest()
|
||||
|
||||
# aes_key = substr (sha256_a, 0, 8) + substr (sha256_b, 8, 16) + substr (sha256_a, 24, 8)
|
||||
aes_key = sha256_a[:8] + sha256_b[8:24] + sha256_a[24:32]
|
||||
|
||||
# aes_iv = substr (sha256_b, 0, 8) + substr (sha256_a, 8, 16) + substr (sha256_b, 24, 8)
|
||||
aes_iv = sha256_b[:8] + sha256_a[8:24] + sha256_b[24:32]
|
||||
|
||||
return CalcKey(aes_key, aes_iv)
|
||||
|
||||
|
||||
def determine_padding_v2_length(length: int) -> int:
|
||||
return 16 + (16 - (length % 16))
|
||||
|
||||
|
||||
def _do_encrypt_data_v2(
|
||||
plaintext: bytes, auth_key: AuthKey, random_padding: bytes
|
||||
) -> bytes:
|
||||
padded_plaintext = (
|
||||
plaintext + random_padding[: determine_padding_v2_length(len(plaintext))]
|
||||
)
|
||||
|
||||
side = Side.CLIENT
|
||||
x = int(side)
|
||||
|
||||
# msg_key_large = SHA256 (substr (auth_key, 88+x, 32) + plaintext + random_padding)
|
||||
msg_key_large = sha256(auth_key.data[x + 88 : x + 120] + padded_plaintext).digest()
|
||||
|
||||
# msg_key = substr (msg_key_large, 8, 16)
|
||||
msg_key = msg_key_large[8:24]
|
||||
|
||||
key, iv = calc_key(auth_key, msg_key, side)
|
||||
ciphertext = ige_encrypt(padded_plaintext, key, iv)
|
||||
|
||||
return auth_key.key_id + msg_key + ciphertext
|
||||
|
||||
|
||||
def encrypt_data_v2(plaintext: bytes, auth_key: AuthKey) -> bytes:
|
||||
random_padding = os.urandom(32)
|
||||
return _do_encrypt_data_v2(plaintext, auth_key, random_padding)
|
||||
|
||||
|
||||
def decrypt_data_v2(ciphertext: bytes, auth_key: AuthKey) -> bytes:
|
||||
side = Side.SERVER
|
||||
x = int(side)
|
||||
|
||||
if len(ciphertext) < 24 or (len(ciphertext) - 24) % 16 != 0:
|
||||
raise ValueError("invalid ciphertext buffer length")
|
||||
|
||||
# TODO Check salt, session_id and sequence_number
|
||||
key_id = ciphertext[:8]
|
||||
if auth_key.key_id != key_id:
|
||||
raise ValueError("server authkey mismatches with ours")
|
||||
|
||||
msg_key = ciphertext[8:24]
|
||||
key, iv = calc_key(auth_key, msg_key, side)
|
||||
plaintext = ige_decrypt(ciphertext[24:], key, iv)
|
||||
|
||||
# https://core.telegram.org/mtproto/security_guidelines#mtproto-encrypted-messages
|
||||
our_key = sha256(auth_key.data[x + 88 : x + 120] + plaintext).digest()
|
||||
if msg_key != our_key[8:24]:
|
||||
raise ValueError("server msgkey mismatches with ours")
|
||||
|
||||
return plaintext
|
||||
|
||||
|
||||
def generate_key_data_from_nonce(server_nonce: bytes, new_nonce: bytes) -> CalcKey:
|
||||
hash1 = sha1(new_nonce + server_nonce).digest()
|
||||
hash2 = sha1(server_nonce + new_nonce).digest()
|
||||
hash3 = sha1(new_nonce + new_nonce).digest()
|
||||
|
||||
key = hash1 + hash2[:12]
|
||||
iv = hash2[12:20] + hash3 + new_nonce[:4]
|
||||
return CalcKey(key, iv)
|
||||
|
||||
|
||||
def encrypt_ige(plaintext: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
if len(plaintext) % 16 != 0:
|
||||
plaintext += os.urandom((16 - (len(plaintext) % 16)) % 16)
|
||||
return ige_encrypt(plaintext, key, iv)
|
||||
|
||||
|
||||
def decrypt_ige(padded_ciphertext: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
return ige_decrypt(padded_ciphertext, key, iv)
|
53
client/src/telethon/_impl/crypto/aes.py
Normal file
53
client/src/telethon/_impl/crypto/aes.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import pyaes
|
||||
|
||||
|
||||
def ige_encrypt(plaintext: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
assert len(plaintext) % 16 == 0
|
||||
assert len(iv) == 32
|
||||
|
||||
aes = pyaes.AES(key)
|
||||
iv1 = iv[:16]
|
||||
iv2 = iv[16:]
|
||||
|
||||
ciphertext = bytearray()
|
||||
|
||||
for block_offset in range(0, len(plaintext), 16):
|
||||
plaintext_block = plaintext[block_offset : block_offset + 16]
|
||||
ciphertext_block = bytes(
|
||||
a ^ b
|
||||
for a, b in zip(
|
||||
aes.encrypt([a ^ b for a, b in zip(plaintext_block, iv1)]), iv2
|
||||
)
|
||||
)
|
||||
iv1 = ciphertext_block
|
||||
iv2 = plaintext_block
|
||||
|
||||
ciphertext += ciphertext_block
|
||||
|
||||
return bytes(ciphertext)
|
||||
|
||||
|
||||
def ige_decrypt(ciphertext: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
assert len(ciphertext) % 16 == 0
|
||||
assert len(iv) == 32
|
||||
|
||||
aes = pyaes.AES(key)
|
||||
iv1 = iv[:16]
|
||||
iv2 = iv[16:]
|
||||
|
||||
plaintext = bytearray()
|
||||
|
||||
for block_offset in range(0, len(ciphertext), 16):
|
||||
ciphertext_block = ciphertext[block_offset : block_offset + 16]
|
||||
plaintext_block = bytes(
|
||||
a ^ b
|
||||
for a, b in zip(
|
||||
aes.decrypt([a ^ b for a, b in zip(ciphertext_block, iv2)]), iv1
|
||||
)
|
||||
)
|
||||
iv1 = ciphertext_block
|
||||
iv2 = plaintext_block
|
||||
|
||||
plaintext += plaintext_block
|
||||
|
||||
return bytes(plaintext)
|
23
client/src/telethon/_impl/crypto/auth_key.py
Normal file
23
client/src/telethon/_impl/crypto/auth_key.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from dataclasses import dataclass
|
||||
from hashlib import sha1
|
||||
from typing import Self
|
||||
|
||||
|
||||
@dataclass
|
||||
class AuthKey:
|
||||
data: bytes
|
||||
aux_hash: bytes
|
||||
key_id: bytes
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes) -> Self:
|
||||
sha = sha1(data).digest()
|
||||
aux_hash = sha[:8]
|
||||
key_id = sha[12:]
|
||||
return cls(data=data, aux_hash=aux_hash, key_id=key_id)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.data
|
||||
|
||||
def calc_new_nonce_hash(self, new_nonce: bytes, number: int) -> bytes:
|
||||
return sha1(new_nonce + bytes((number,)) + self.aux_hash).digest()[4:]
|
49
client/src/telethon/_impl/crypto/factorize.py
Normal file
49
client/src/telethon/_impl/crypto/factorize.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from math import gcd
|
||||
from random import randrange
|
||||
from typing import Tuple
|
||||
|
||||
|
||||
def factorize(pq: int) -> Tuple[int, int]:
|
||||
"""
|
||||
Factorize the given number into its two prime factors.
|
||||
|
||||
The algorithm here is a faster variant of [Pollard's rho algorithm],
|
||||
published by [Richard Brent], based on
|
||||
<https://comeoncodeon.wordpress.com/2010/09/18/pollard-rho-brent-integer-factorization/>.
|
||||
|
||||
[Pollard's rho algorithm]: <https://en.wikipedia.org/wiki/Pollard%27s_rho_algorithm>
|
||||
[Richard Brent]: <https://maths-people.anu.edu.au/~brent/pd/rpb051i.pdf>
|
||||
"""
|
||||
if pq % 2 == 0:
|
||||
return 2, pq // 2
|
||||
|
||||
y, c, m = randrange(1, pq), randrange(1, pq), randrange(1, pq)
|
||||
g = r = q = 1
|
||||
x = ys = 0
|
||||
|
||||
while g == 1:
|
||||
x = y
|
||||
for _ in range(r):
|
||||
y = (pow(y, 2, pq) + c) % pq
|
||||
|
||||
k = 0
|
||||
while k < r and g == 1:
|
||||
ys = y
|
||||
for _ in range(min(m, r - k)):
|
||||
y = (pow(y, 2, pq) + c) % pq
|
||||
q = q * (abs(x - y)) % pq
|
||||
|
||||
g = gcd(q, pq)
|
||||
k += m
|
||||
|
||||
r *= 2
|
||||
|
||||
if g == pq:
|
||||
while True:
|
||||
ys = (pow(ys, 2, pq) + c) % pq
|
||||
g = gcd(abs(x - ys), pq)
|
||||
if g > 1:
|
||||
break
|
||||
|
||||
p, q = g, pq // g
|
||||
return (p, q) if p < q else (q, p)
|
48
client/src/telethon/_impl/crypto/rsa.py
Normal file
48
client/src/telethon/_impl/crypto/rsa.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import struct
|
||||
from hashlib import sha1
|
||||
|
||||
from rsa import PublicKey, encrypt
|
||||
|
||||
from ..tl.core import serialize_bytes_to
|
||||
|
||||
|
||||
def compute_fingerprint(key: PublicKey) -> int:
|
||||
buffer = bytearray()
|
||||
serialize_bytes_to(buffer, int.to_bytes(key.n, (key.n.bit_length() + 7) // 8))
|
||||
serialize_bytes_to(buffer, int.to_bytes(key.e, (key.e.bit_length() + 7) // 8))
|
||||
fingerprint = struct.unpack("<q", sha1(buffer).digest()[-8:])[0]
|
||||
assert isinstance(fingerprint, int)
|
||||
return fingerprint
|
||||
|
||||
|
||||
def encrypt_hashed(data: bytes, key: PublicKey) -> bytes:
|
||||
return encrypt(sha1(data).digest() + data, key)
|
||||
|
||||
|
||||
# From my.telegram.org.
|
||||
PRODUCTION_RSA_KEY = PublicKey.load_pkcs1(
|
||||
b"""-----BEGIN RSA PUBLIC KEY-----
|
||||
MIIBCgKCAQEA6LszBcC1LGzyr992NzE0ieY+BSaOW622Aa9Bd4ZHLl+TuFQ4lo4g
|
||||
5nKaMBwK/BIb9xUfg0Q29/2mgIR6Zr9krM7HjuIcCzFvDtr+L0GQjae9H0pRB2OO
|
||||
62cECs5HKhT5DZ98K33vmWiLowc621dQuwKWSQKjWf50XYFw42h21P2KXUGyp2y/
|
||||
+aEyZ+uVgLLQbRA1dEjSDZ2iGRy12Mk5gpYc397aYp438fsJoHIgJ2lgMv5h7WY9
|
||||
t6N/byY9Nw9p21Og3AoXSL2q/2IJ1WRUhebgAdGVMlV1fkuOQoEzR7EdpqtQD9Cs
|
||||
5+bfo3Nhmcyvk5ftB0WkJ9z6bNZ7yxrP8wIDAQAB
|
||||
-----END RSA PUBLIC KEY-----"""
|
||||
)
|
||||
|
||||
TESTMODE_RSA_KEY = PublicKey.load_pkcs1(
|
||||
b"""-----BEGIN RSA PUBLIC KEY-----
|
||||
MIIBCgKCAQEAyMEdY1aR+sCR3ZSJrtztKTKqigvO/vBfqACJLZtS7QMgCGXJ6XIR
|
||||
yy7mx66W0/sOFa7/1mAZtEoIokDP3ShoqF4fVNb6XeqgQfaUHd8wJpDWHcR2OFwv
|
||||
plUUI1PLTktZ9uW2WE23b+ixNwJjJGwBDJPQEQFBE+vfmH0JP503wr5INS1poWg/
|
||||
j25sIWeYPHYeOrFp/eXaqhISP6G+q2IeTaWTXpwZj4LzXq5YOpk4bYEQ6mvRq7D1
|
||||
aHWfYmlEGepfaYR8Q0YqvvhYtMte3ITnuSJs171+GDqpdKcSwHnd6FudwGO4pcCO
|
||||
j4WcDuXc2CTHgH8gFTNhp/Y8/SpDOhvn9QIDAQAB
|
||||
-----END RSA PUBLIC KEY-----"""
|
||||
)
|
||||
|
||||
|
||||
RSA_KEYS = {
|
||||
compute_fingerprint(key): key for key in (PRODUCTION_RSA_KEY, TESTMODE_RSA_KEY)
|
||||
}
|
140
client/src/telethon/_impl/crypto/two_factor_auth.py
Normal file
140
client/src/telethon/_impl/crypto/two_factor_auth.py
Normal file
@@ -0,0 +1,140 @@
|
||||
# Ported from https://github.com/Lonami/grammers/blob/d91dc82/lib/grammers-crypto/src/two_factor_auth.rs
|
||||
from collections import namedtuple
|
||||
from hashlib import pbkdf2_hmac, sha256
|
||||
|
||||
from .factorize import factorize
|
||||
|
||||
TwoFactorAuth = namedtuple("TwoFactorAuth", ("m1", "g_a"))
|
||||
|
||||
|
||||
def pad_to_256(data: bytes) -> bytes:
|
||||
return bytes(256 - len(data)) + data
|
||||
|
||||
|
||||
# H(data) := sha256(data)
|
||||
def h(*data: bytes) -> bytes:
|
||||
return sha256(b"".join(data)).digest()
|
||||
|
||||
|
||||
# SH(data, salt) := H(salt | data | salt)
|
||||
def sh(data: bytes, salt: bytes) -> bytes:
|
||||
return h(salt, data, salt)
|
||||
|
||||
|
||||
# PH1(password, salt1, salt2) := SH(SH(password, salt1), salt2)
|
||||
def ph1(password: bytes, salt1: bytes, salt2: bytes) -> bytes:
|
||||
return sh(sh(password, salt1), salt2)
|
||||
|
||||
|
||||
# PH2(password, salt1, salt2) := SH(pbkdf2(sha512, PH1(password, salt1, salt2), salt1, 100000), salt2)
|
||||
def ph2(password: bytes, salt1: bytes, salt2: bytes) -> bytes:
|
||||
return sh(pbkdf2_hmac("sha512", ph1(password, salt1, salt2), salt1, 100000), salt2)
|
||||
|
||||
|
||||
# https://core.telegram.org/api/srp
|
||||
def calculate_2fa(
|
||||
*,
|
||||
salt1: bytes,
|
||||
salt2: bytes,
|
||||
g: int,
|
||||
p: bytes,
|
||||
g_b: bytes,
|
||||
a: bytes,
|
||||
password: bytes,
|
||||
) -> TwoFactorAuth:
|
||||
big_p = int.from_bytes(p)
|
||||
|
||||
g_b = pad_to_256(g_b)
|
||||
a = pad_to_256(a)
|
||||
|
||||
g_for_hash = g.to_bytes(256)
|
||||
|
||||
big_g_b = int.from_bytes(g_b)
|
||||
|
||||
big_g = g
|
||||
big_a = int.from_bytes(a)
|
||||
|
||||
# k := H(p | g)
|
||||
k = h(p, g_for_hash)
|
||||
big_k = int.from_bytes(k)
|
||||
|
||||
# g_a := pow(g, a) mod p
|
||||
g_a = pow(big_g, big_a, big_p).to_bytes(256)
|
||||
|
||||
# u := H(g_a | g_b)
|
||||
u = int.from_bytes(h(g_a, g_b))
|
||||
|
||||
# x := PH2(password, salt1, salt2)
|
||||
x = int.from_bytes(ph2(password, salt1, salt2))
|
||||
|
||||
# v := pow(g, x) mod p
|
||||
big_v = pow(big_g, x, big_p)
|
||||
|
||||
# k_v := (k * v) mod p
|
||||
k_v = (big_k * big_v) % big_p
|
||||
|
||||
# t := (g_b - k_v) mod p (positive modulo, if the result is negative increment by p)
|
||||
if big_g_b > k_v:
|
||||
sub = big_g_b - k_v
|
||||
else:
|
||||
sub = k_v - big_g_b
|
||||
|
||||
big_t = sub % big_p
|
||||
|
||||
# s_a := pow(t, a + u * x) mod p
|
||||
first = u * x
|
||||
second = big_a + first
|
||||
big_s_a = pow(big_t, second, big_p)
|
||||
|
||||
# k_a := H(s_a)
|
||||
k_a = h(big_s_a.to_bytes(256))
|
||||
|
||||
# M1 := H(H(p) xor H(g) | H(salt1) | H(salt2) | g_a | g_b | k_a)
|
||||
h_p = h(p)
|
||||
h_g = h(g_for_hash)
|
||||
|
||||
p_xor_g = bytes(hpi ^ hgi for hpi, hgi in zip(h_p, h_g))
|
||||
|
||||
m1 = h(p_xor_g, h(salt1), h(salt2), g_a, g_b, k_a)
|
||||
|
||||
return TwoFactorAuth(m1, g_a)
|
||||
|
||||
|
||||
def check_p_len(p: bytes) -> bool:
|
||||
return len(p) == 256
|
||||
|
||||
|
||||
def check_known_prime(p: bytes, g: int) -> bool:
|
||||
good_prime = b"\xc7\x1c\xae\xb9\xc6\xb1\xc9\x04\x8elR/p\xf1?s\x98\r@#\x8e>!\xc1I4\xd07V=\x93\x0fH\x19\x8a\n\xa7\xc1@X\"\x94\x93\xd2%0\xf4\xdb\xfa3on\n\xc9%\x13\x95C\xae\xd4L\xce|7 \xfdQ\xf6\x94XpZ\xc6\x8c\xd4\xfekk\x13\xab\xdc\x97FQ)i2\x84T\xf1\x8f\xaf\x8cY_d$w\xfe\x96\xbb*\x94\x1d[\xcd\x1dJ\xc8\xccI\x88\x07\x08\xfa\x9b7\x8e<O:\x90`\xbe\xe6|\xf9\xa4\xa4\xa6\x95\x81\x10Q\x90~\x16'S\xb5k\x0fkA\r\xbat\xd8\xa8K*\x14\xb3\x14N\x0e\xf1(GT\xfd\x17\xed\x95\rYe\xb4\xb9\xddFX-\xb1\x17\x8d\x16\x9ck\xc4e\xb0\xd6\xff\x9c\xa3\x92\x8f\xef[\x9a\xe4\xe4\x18\xfc\x15\xe8>\xbe\xa0\xf8\x7f\xa9\xff^\xedp\x05\r\xed(I\xf4{\xf9Y\xd9V\x85\x0c\xe9)\x85\x1f\r\x81\x15\xf65\xb1\x05\xee.N\x15\xd0K$T\xbfoO\xad\xf04\xb1\x04\x03\x11\x9c\xd8\xe3\xb9/\xcc["
|
||||
return p == good_prime and g in (3, 4, 5, 7)
|
||||
|
||||
|
||||
def check_p_prime_and_subgroup(p: bytes, g: int) -> bool:
|
||||
if check_known_prime(p, g):
|
||||
return True
|
||||
|
||||
big_p = int.from_bytes(p)
|
||||
|
||||
if g == 2:
|
||||
candidate = big_p % 8 == 7
|
||||
elif g == 3:
|
||||
candidate = big_p % 3 == 2
|
||||
elif g == 4:
|
||||
candidate = True
|
||||
elif g == 5:
|
||||
candidate = (big_p % 5) in (1, 4)
|
||||
elif g == 6:
|
||||
candidate = (big_p % 24) in (19, 23)
|
||||
elif g == 7:
|
||||
candidate = (big_p % 7) in (3, 5, 6)
|
||||
else:
|
||||
raise ValueError(f"bad g: {g}")
|
||||
|
||||
return candidate and factorize((big_p - 1) // 2)[0] == 1
|
||||
|
||||
|
||||
def check_p_and_g(p: bytes, g: int) -> bool:
|
||||
if not check_p_len(p):
|
||||
return False
|
||||
|
||||
return check_p_prime_and_subgroup(p, g)
|
Reference in New Issue
Block a user