From 314e3596d2cf4599f7e38e48fbbdca0ac7635231 Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 11:03:54 -0400 Subject: [PATCH 1/7] Add the `h_q_reset_tasks` function This is a better way than just removing the data while the consumer is running. --- tubesync/common/huey.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 87963726..71d4f8a4 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -25,6 +25,42 @@ def h_q_tuple(q, /): ) +def h_q_reset_tasks(q, /, *, maint_func=None): + # revoke to prevent pending tasks from executing + for t in q._registry._registry: + q.revoke_all(t, revoke_until=delay_to_eta(600)) + # clear scheduled tasks + q.storage.flush_schedule() + # clear pending tasks + q.storage.flush_queue() + # run the maintenance function + def default_maint_func(queue, /, exception=None, status=None): + if status is None: + return + if 'exception' == status and exception is not None: + # log, but do not raise an exception + from huey import logger + logger.error( + f'{queue.name}: maintenance function exception: {exception}' + ) + return + return True + maint_result = None + if maint_func is None: + maint_func = default_maint_func + if maint_func and callable(maint_func): + try: + maint_result = maint_func(q, status='started') + except Exception as exc: + maint_result = maint_func(q, exc=exc, status='exception') + pass + finally: + maint_func(q, status='finished') + # clear everything now that we are done + q.storage.flush_all() + q.flush() + + def sqlite_tasks(key, /, prefix=None): name_fmt = 'huey_{}' if prefix is None: From 97ef9e2f27f857d3745cda3919b0187dd296c651 Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 11:07:58 -0400 Subject: [PATCH 2/7] fixup: return the results --- tubesync/common/huey.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 71d4f8a4..8f9cea5a 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -59,6 +59,8 @@ def h_q_reset_tasks(q, /, *, maint_func=None): # clear everything now that we are done q.storage.flush_all() q.flush() + # return the results from the maintenance function + return maint_result def sqlite_tasks(key, /, prefix=None): From 98f5d4539181ca65c631c74b1f66792b0694f72d Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 11:29:12 -0400 Subject: [PATCH 3/7] Accept queue name strings as well --- tubesync/common/huey.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 8f9cea5a..a7fb2403 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -26,6 +26,9 @@ def h_q_tuple(q, /): def h_q_reset_tasks(q, /, *, maint_func=None): + if isinstance(q, str): + from django_huey import get_queue + q = get_queue(q) # revoke to prevent pending tasks from executing for t in q._registry._registry: q.revoke_all(t, revoke_until=delay_to_eta(600)) From 969848938872d2ec47b0e9fb12b5894a74321ca4 Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 11:49:10 -0400 Subject: [PATCH 4/7] Reset Huey tasks from the CLI --- tubesync/sync/management/commands/reset-tasks.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tubesync/sync/management/commands/reset-tasks.py b/tubesync/sync/management/commands/reset-tasks.py index ae38a464..dc0029db 100644 --- a/tubesync/sync/management/commands/reset-tasks.py +++ b/tubesync/sync/management/commands/reset-tasks.py @@ -2,6 +2,8 @@ from django.core.management.base import BaseCommand, CommandError # noqa from django.db.transaction import atomic from django.utils.translation import gettext_lazy as _ from background_task.models import Task +from django_huey import DJANGO_HUEY +from common.huey import h_q_reset_tasks from common.logger import log from sync.models import Source from sync.tasks import index_source_task, check_source_directory_exists @@ -13,6 +15,8 @@ class Command(BaseCommand): def handle(self, *args, **options): log.info('Resettings all tasks...') + for queue_name in (DJANGO_HUEY or {}).get('queues', {}): + h_q_reset_tasks(queue_name) with atomic(durable=True): # Delete all tasks Task.objects.all().delete() From bda66c6df767191d2fa53950fd33f6484dc75dec Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 11:54:25 -0400 Subject: [PATCH 5/7] Reset Huey tasks from the web UI --- tubesync/sync/views.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tubesync/sync/views.py b/tubesync/sync/views.py index 493098cd..145098f2 100644 --- a/tubesync/sync/views.py +++ b/tubesync/sync/views.py @@ -22,6 +22,8 @@ from django.utils.translation import gettext_lazy as _ from common.timestamp import timestamp_to_datetime from common.utils import append_uri_params, mkdir_p, multi_key_sort from background_task.models import Task, CompletedTask +from django_huey import DJANGO_HUEY +from common.huey import h_q_reset_tasks from .models import Source, Media, MediaServer from .forms import (ValidateSourceForm, ConfirmDeleteSourceForm, RedownloadMediaForm, SkipMediaForm, EnableMediaForm, ResetTasksForm, ScheduleTaskForm, @@ -992,6 +994,8 @@ class ResetTasks(FormView): def form_valid(self, form): # Delete all tasks Task.objects.all().delete() + for queue_name in (DJANGO_HUEY or {}).get('queues', {}): + h_q_reset_tasks(queue_name) # Iter all tasks for source in Source.objects.all(): verbose_name = _('Check download directory exists for source "{}"') From 9dc1376acc32751344f1724ed8e740d638bcc3ec Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 11:58:51 -0400 Subject: [PATCH 6/7] fixup: match up the arguments --- tubesync/common/huey.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index a7fb2403..f33d946d 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -55,7 +55,7 @@ def h_q_reset_tasks(q, /, *, maint_func=None): try: maint_result = maint_func(q, status='started') except Exception as exc: - maint_result = maint_func(q, exc=exc, status='exception') + maint_result = maint_func(q, exception=exc, status='exception') pass finally: maint_func(q, status='finished') From 5db94c08d1fe124e8f40828ae5a86304fa2d5853 Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 16:13:49 -0400 Subject: [PATCH 7/7] fixup: revoke the task objects --- tubesync/common/huey.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index f33d946d..1624d793 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -30,7 +30,7 @@ def h_q_reset_tasks(q, /, *, maint_func=None): from django_huey import get_queue q = get_queue(q) # revoke to prevent pending tasks from executing - for t in q._registry._registry: + for t in q._registry._registry.values(): q.revoke_all(t, revoke_until=delay_to_eta(600)) # clear scheduled tasks q.storage.flush_schedule()