diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 87963726..1624d793 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -25,6 +25,47 @@ def h_q_tuple(q, /): ) +def h_q_reset_tasks(q, /, *, maint_func=None): + if isinstance(q, str): + from django_huey import get_queue + q = get_queue(q) + # revoke to prevent pending tasks from executing + for t in q._registry._registry.values(): + q.revoke_all(t, revoke_until=delay_to_eta(600)) + # clear scheduled tasks + q.storage.flush_schedule() + # clear pending tasks + q.storage.flush_queue() + # run the maintenance function + def default_maint_func(queue, /, exception=None, status=None): + if status is None: + return + if 'exception' == status and exception is not None: + # log, but do not raise an exception + from huey import logger + logger.error( + f'{queue.name}: maintenance function exception: {exception}' + ) + return + return True + maint_result = None + if maint_func is None: + maint_func = default_maint_func + if maint_func and callable(maint_func): + try: + maint_result = maint_func(q, status='started') + except Exception as exc: + maint_result = maint_func(q, exception=exc, status='exception') + pass + finally: + maint_func(q, status='finished') + # clear everything now that we are done + q.storage.flush_all() + q.flush() + # return the results from the maintenance function + return maint_result + + def sqlite_tasks(key, /, prefix=None): name_fmt = 'huey_{}' if prefix is None: diff --git a/tubesync/sync/management/commands/reset-tasks.py b/tubesync/sync/management/commands/reset-tasks.py index ae38a464..dc0029db 100644 --- a/tubesync/sync/management/commands/reset-tasks.py +++ b/tubesync/sync/management/commands/reset-tasks.py @@ -2,6 +2,8 @@ from django.core.management.base import BaseCommand, CommandError # noqa from django.db.transaction import atomic from django.utils.translation import gettext_lazy as _ from background_task.models import Task +from django_huey import DJANGO_HUEY +from common.huey import h_q_reset_tasks from common.logger import log from sync.models import Source from sync.tasks import index_source_task, check_source_directory_exists @@ -13,6 +15,8 @@ class Command(BaseCommand): def handle(self, *args, **options): log.info('Resettings all tasks...') + for queue_name in (DJANGO_HUEY or {}).get('queues', {}): + h_q_reset_tasks(queue_name) with atomic(durable=True): # Delete all tasks Task.objects.all().delete() diff --git a/tubesync/sync/views.py b/tubesync/sync/views.py index 493098cd..145098f2 100644 --- a/tubesync/sync/views.py +++ b/tubesync/sync/views.py @@ -22,6 +22,8 @@ from django.utils.translation import gettext_lazy as _ from common.timestamp import timestamp_to_datetime from common.utils import append_uri_params, mkdir_p, multi_key_sort from background_task.models import Task, CompletedTask +from django_huey import DJANGO_HUEY +from common.huey import h_q_reset_tasks from .models import Source, Media, MediaServer from .forms import (ValidateSourceForm, ConfirmDeleteSourceForm, RedownloadMediaForm, SkipMediaForm, EnableMediaForm, ResetTasksForm, ScheduleTaskForm, @@ -992,6 +994,8 @@ class ResetTasks(FormView): def form_valid(self, form): # Delete all tasks Task.objects.all().delete() + for queue_name in (DJANGO_HUEY or {}).get('queues', {}): + h_q_reset_tasks(queue_name) # Iter all tasks for source in Source.objects.all(): verbose_name = _('Check download directory exists for source "{}"')