mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-08-08 12:59:46 +00:00
Use pytest-asyncio for client and sender tests
This commit is contained in:
@@ -13,42 +13,39 @@ TELEGRAM_DEFAULT_TEST_DC = TELEGRAM_TEST_DC_2
|
||||
TEST_TIMEOUT = 10000
|
||||
|
||||
|
||||
def test_invoke_encrypted_method(caplog: LogCaptureFixture) -> None:
|
||||
async def test_invoke_encrypted_method(caplog: LogCaptureFixture) -> None:
|
||||
caplog.set_level(logging.DEBUG)
|
||||
|
||||
async def func() -> None:
|
||||
deadline = asyncio.get_running_loop().time() + TEST_TIMEOUT
|
||||
deadline = asyncio.get_running_loop().time() + TEST_TIMEOUT
|
||||
|
||||
def timeout() -> float:
|
||||
return deadline - asyncio.get_running_loop().time()
|
||||
def timeout() -> float:
|
||||
return deadline - asyncio.get_running_loop().time()
|
||||
|
||||
sender = await asyncio.wait_for(
|
||||
connect(Full(), TELEGRAM_DEFAULT_TEST_DC), timeout()
|
||||
sender = await asyncio.wait_for(
|
||||
connect(Full(), TELEGRAM_DEFAULT_TEST_DC), timeout()
|
||||
)
|
||||
|
||||
rx = sender.enqueue(
|
||||
functions.invoke_with_layer(
|
||||
layer=LAYER,
|
||||
query=functions.init_connection(
|
||||
api_id=1,
|
||||
device_model="Test",
|
||||
system_version="0.1",
|
||||
app_version="0.1",
|
||||
system_lang_code="en",
|
||||
lang_pack="",
|
||||
lang_code="",
|
||||
proxy=None,
|
||||
params=None,
|
||||
query=functions.help.get_nearest_dc(),
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
rx = sender.enqueue(
|
||||
functions.invoke_with_layer(
|
||||
layer=LAYER,
|
||||
query=functions.init_connection(
|
||||
api_id=1,
|
||||
device_model="Test",
|
||||
system_version="0.1",
|
||||
app_version="0.1",
|
||||
system_lang_code="en",
|
||||
lang_pack="",
|
||||
lang_code="",
|
||||
proxy=None,
|
||||
params=None,
|
||||
query=functions.help.get_nearest_dc(),
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
while True:
|
||||
await asyncio.wait_for(sender.step(), timeout=timeout())
|
||||
if rx.done():
|
||||
nearest = abcs.NearestDc.from_bytes(rx.result())
|
||||
assert isinstance(nearest, types.NearestDc)
|
||||
break
|
||||
|
||||
asyncio.run(func())
|
||||
while True:
|
||||
await asyncio.wait_for(sender.step(), timeout=timeout())
|
||||
if rx.done():
|
||||
nearest = abcs.NearestDc.from_bytes(rx.result())
|
||||
assert isinstance(nearest, types.NearestDc)
|
||||
break
|
||||
|
Reference in New Issue
Block a user