Replace most raw API usage with new location

This commit is contained in:
Lonami Exo
2021-09-12 12:16:02 +02:00
parent a901d43a6d
commit d48649602b
53 changed files with 918 additions and 984 deletions

View File

@@ -6,17 +6,10 @@ import os
import time
from hashlib import sha1
from ..tl.types import (
ResPQ, PQInnerData, ServerDHParamsFail, ServerDHParamsOk,
ServerDHInnerData, ClientDHInnerData, DhGenOk, DhGenRetry, DhGenFail
)
from .. import helpers
from .. import helpers, _tl
from ..crypto import AES, AuthKey, Factorization, rsa
from ..errors import SecurityError
from ..extensions import BinaryReader
from ..tl.functions import (
ReqPqMultiRequest, ReqDHParamsRequest, SetClientDHParamsRequest
)
async def do_authentication(sender):
@@ -28,8 +21,8 @@ async def do_authentication(sender):
"""
# Step 1 sending: PQ Request, endianness doesn't matter since it's random
nonce = int.from_bytes(os.urandom(16), 'big', signed=True)
res_pq = await sender.send(ReqPqMultiRequest(nonce))
assert isinstance(res_pq, ResPQ), 'Step 1 answer was %s' % res_pq
res_pq = await sender.send(_tl.fn.ReqPqMulti(nonce))
assert isinstance(res_pq, _tl.ResPQ), 'Step 1 answer was %s' % res_pq
if res_pq.nonce != nonce:
raise SecurityError('Step 1 invalid nonce from server')
@@ -41,7 +34,7 @@ async def do_authentication(sender):
p, q = rsa.get_byte_array(p), rsa.get_byte_array(q)
new_nonce = int.from_bytes(os.urandom(32), 'little', signed=True)
pq_inner_data = bytes(PQInnerData(
pq_inner_data = bytes(_tl.PQInnerData(
pq=rsa.get_byte_array(pq), p=p, q=q,
nonce=res_pq.nonce,
server_nonce=res_pq.server_nonce,
@@ -72,7 +65,7 @@ async def do_authentication(sender):
)
)
server_dh_params = await sender.send(ReqDHParamsRequest(
server_dh_params = await sender.send(_tl.fn.ReqDHParams(
nonce=res_pq.nonce,
server_nonce=res_pq.server_nonce,
p=p, q=q,
@@ -81,7 +74,7 @@ async def do_authentication(sender):
))
assert isinstance(
server_dh_params, (ServerDHParamsOk, ServerDHParamsFail)),\
server_dh_params, (_tl.ServerDHParamsOk, _tl.ServerDHParamsFail)),\
'Step 2.1 answer was %s' % server_dh_params
if server_dh_params.nonce != res_pq.nonce:
@@ -90,7 +83,7 @@ async def do_authentication(sender):
if server_dh_params.server_nonce != res_pq.server_nonce:
raise SecurityError('Step 2 invalid server nonce from server')
if isinstance(server_dh_params, ServerDHParamsFail):
if isinstance(server_dh_params, _tl.ServerDHParamsFail):
nnh = int.from_bytes(
sha1(new_nonce.to_bytes(32, 'little', signed=True)).digest()[4:20],
'little', signed=True
@@ -98,7 +91,7 @@ async def do_authentication(sender):
if server_dh_params.new_nonce_hash != nnh:
raise SecurityError('Step 2 invalid DH fail nonce from server')
assert isinstance(server_dh_params, ServerDHParamsOk),\
assert isinstance(server_dh_params, _tl.ServerDHParamsOk),\
'Step 2.2 answer was %s' % server_dh_params
# Step 3 sending: Complete DH Exchange
@@ -116,7 +109,7 @@ async def do_authentication(sender):
with BinaryReader(plain_text_answer) as reader:
reader.read(20) # hash sum
server_dh_inner = reader.tgread_object()
assert isinstance(server_dh_inner, ServerDHInnerData),\
assert isinstance(server_dh_inner, _tl.ServerDHInnerData),\
'Step 3 answer was %s' % server_dh_inner
if server_dh_inner.nonce != res_pq.nonce:
@@ -157,7 +150,7 @@ async def do_authentication(sender):
raise SecurityError('g_b is not within (2^{2048-64}, dh_prime - 2^{2048-64})')
# Prepare client DH Inner Data
client_dh_inner = bytes(ClientDHInnerData(
client_dh_inner = bytes(_tl.ClientDHInnerData(
nonce=res_pq.nonce,
server_nonce=res_pq.server_nonce,
retry_id=0, # TODO Actual retry ID
@@ -170,13 +163,13 @@ async def do_authentication(sender):
client_dh_encrypted = AES.encrypt_ige(client_dh_inner_hashed, key, iv)
# Prepare Set client DH params
dh_gen = await sender.send(SetClientDHParamsRequest(
dh_gen = await sender.send(_tl.fn.SetClientDHParams(
nonce=res_pq.nonce,
server_nonce=res_pq.server_nonce,
encrypted_data=client_dh_encrypted,
))
nonce_types = (DhGenOk, DhGenRetry, DhGenFail)
nonce_types = (_tl.DhGenOk, _tl.DhGenRetry, _tl.DhGenFail)
assert isinstance(dh_gen, nonce_types), 'Step 3.1 answer was %s' % dh_gen
name = dh_gen.__class__.__name__
if dh_gen.nonce != res_pq.nonce:
@@ -194,7 +187,7 @@ async def do_authentication(sender):
if dh_hash != new_nonce_hash:
raise SecurityError('Step 3 invalid new nonce hash')
if not isinstance(dh_gen, DhGenOk):
if not isinstance(dh_gen, _tl.DhGenOk):
raise AssertionError('Step 3.2 answer was %s' % dh_gen)
return auth_key, time_offset