Fix mtsender enqueuer and complete test

This commit is contained in:
Lonami Exo
2023-08-31 12:27:43 +02:00
parent dfc540c472
commit 7166059132
2 changed files with 30 additions and 14 deletions

View File

@@ -4,6 +4,7 @@ import logging
from pytest import LogCaptureFixture
from telethon._impl.mtproto.transport.full import Full
from telethon._impl.mtsender.sender import connect
from telethon._impl.tl import LAYER, abcs, functions, types
TELEGRAM_TEST_DC_2 = "149.154.167.40:443"
@@ -25,7 +26,29 @@ def test_invoke_encrypted_method(caplog: LogCaptureFixture) -> None:
connect(Full(), TELEGRAM_DEFAULT_TEST_DC), timeout()
)
# TODO test enqueuer
sender, enqueuer
rx = enqueuer.enqueue(
functions.invoke_with_layer(
layer=LAYER,
query=functions.init_connection(
api_id=1,
device_model="Test",
system_version="0.1",
app_version="0.1",
system_lang_code="en",
lang_pack="",
lang_code="",
proxy=None,
params=None,
query=functions.help.get_nearest_dc(),
),
)
)
while True:
await asyncio.wait_for(sender.step(), timeout=timeout())
if rx.done():
nearest = abcs.NearestDc.from_bytes(rx.result())
assert isinstance(nearest, types.NearestDc)
break
asyncio.run(func())