Telethon doesn't come with scheduling builtin because it would be unnecessary bloat. If you don't want more dependencies, you can just use asyncio
to schedule functions to run later or at a given time.
For example, if you want to run foo
after 10 minutes:
async def foo():
print('hi')
def foo_cb():
client.loop.create_task(foo())
client.loop.call_later(10 * 60, foo_cb)
loop.call_later()
can only schedule synchronous functions (foo_cb
), but you can use create_task
to spawn a background, asynchronous task.
You can also run something "every N seconds". For example, to repeat foo
every 10 minutes:
async def foo():
print('hi')
def foo_cb():
client.loop.create_task(foo())
client.loop.call_later(10 * 60, foo_cb)
foo_cb()
This schedules itself again when it runs, so it is always running. It runs, 10 minutes pass, it schedules itself again, and repeat. This is similar to JavaScript's setTimeout
.
Important note. Do not call the synchronous function when you schedule it, or it will recurse forever. loop.call_later(delay, func())
won't work, but loop.call_later(delay, func)
will.
You can also use while
loops:
async def foo():
while True:
print('hi')
await sleep(10 * 60)
client.loop.create_task(foo())
When you're done setting everything up, the event loop must be running or things won't work. You can do this with:
client.run_until_disconnected()
or:
loop.run_forever()
Also if you encounter this error ConnectionError: Cannot send requests while disconnected
that means you first need to start the client client.start()
then call foo_cb()
:
client.start()
foo_cb()
client.run_until_disconnected()
If you don't mind using other libraries, aiocron
is a good option. See https://crontab.guru/ to learn its time syntax.
You can also use apscheduler
, which is much simpler and much efficient.
Here is a example for running a job (Function) everyday at 6am.
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def greet_me():
print("Good morning Midhun.")
await client.send_message('me', 'Good Morning To Myself !')
scheduler = AsyncIOScheduler()
scheduler.add_job(greet_me, trigger="cron", hour=6)
scheduler.start()
You can also run a job at specific intervals, here is a example for it.
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def this_will_run_at_every_one_hour():
await client.send_message('me', 'One hour has passed.')
scheduler = AsyncIOScheduler()
scheduler.add_job(this_will_run_at_every_one_hour, 'interval', minutes=60)
scheduler.start()
Note that for actually being able to use this scheduler you will need to import asyncio.run
and write an async main loop function to run your script and avoid errors. example is given below(edit according to your own code if needed):
async def main():
await client.start()
scheduler.start()
print("Scheduler started...\n(Press Ctrl+C to stop this)")
await client.run_until_disconnected()
if __name__ == "__main__":
try:
asyncio.run(main())
except (KeyboardInterrupt, SystemExit):
pass
You can also set a custom time zone and can also use a database for running these functions.
This is more advanced and works smoothly with Telethon.
Read more about this Here