This commit is contained in:
tcely 2025-06-15 20:13:52 +00:00 committed by GitHub
commit 8766bb3b7b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 49 additions and 0 deletions

View File

@ -25,6 +25,47 @@ def h_q_tuple(q, /):
)
def h_q_reset_tasks(q, /, *, maint_func=None):
if isinstance(q, str):
from django_huey import get_queue
q = get_queue(q)
# revoke to prevent pending tasks from executing
for t in q._registry._registry.values():
q.revoke_all(t, revoke_until=delay_to_eta(600))
# clear scheduled tasks
q.storage.flush_schedule()
# clear pending tasks
q.storage.flush_queue()
# run the maintenance function
def default_maint_func(queue, /, exception=None, status=None):
if status is None:
return
if 'exception' == status and exception is not None:
# log, but do not raise an exception
from huey import logger
logger.error(
f'{queue.name}: maintenance function exception: {exception}'
)
return
return True
maint_result = None
if maint_func is None:
maint_func = default_maint_func
if maint_func and callable(maint_func):
try:
maint_result = maint_func(q, status='started')
except Exception as exc:
maint_result = maint_func(q, exception=exc, status='exception')
pass
finally:
maint_func(q, status='finished')
# clear everything now that we are done
q.storage.flush_all()
q.flush()
# return the results from the maintenance function
return maint_result
def sqlite_tasks(key, /, prefix=None):
name_fmt = 'huey_{}'
if prefix is None:

View File

@ -2,6 +2,8 @@ from django.core.management.base import BaseCommand, CommandError # noqa
from django.db.transaction import atomic
from django.utils.translation import gettext_lazy as _
from background_task.models import Task
from django_huey import DJANGO_HUEY
from common.huey import h_q_reset_tasks
from common.logger import log
from sync.models import Source
from sync.tasks import index_source_task, check_source_directory_exists
@ -13,6 +15,8 @@ class Command(BaseCommand):
def handle(self, *args, **options):
log.info('Resettings all tasks...')
for queue_name in (DJANGO_HUEY or {}).get('queues', {}):
h_q_reset_tasks(queue_name)
with atomic(durable=True):
# Delete all tasks
Task.objects.all().delete()

View File

@ -22,6 +22,8 @@ from django.utils.translation import gettext_lazy as _
from common.timestamp import timestamp_to_datetime
from common.utils import append_uri_params, mkdir_p, multi_key_sort
from background_task.models import Task, CompletedTask
from django_huey import DJANGO_HUEY
from common.huey import h_q_reset_tasks
from .models import Source, Media, MediaServer
from .forms import (ValidateSourceForm, ConfirmDeleteSourceForm, RedownloadMediaForm,
SkipMediaForm, EnableMediaForm, ResetTasksForm, ScheduleTaskForm,
@ -992,6 +994,8 @@ class ResetTasks(FormView):
def form_valid(self, form):
# Delete all tasks
Task.objects.all().delete()
for queue_name in (DJANGO_HUEY or {}).get('queues', {}):
h_q_reset_tasks(queue_name)
# Iter all tasks
for source in Source.objects.all():
verbose_name = _('Check download directory exists for source "{}"')