From f455350e1362c97607807042ca7a563b792b3c9e Mon Sep 17 00:00:00 2001 From: tcely Date: Thu, 12 Jun 2025 16:21:29 -0400 Subject: [PATCH 01/20] Migrate some tasks to `huey` --- tubesync/sync/tasks.py | 71 +++++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 39 deletions(-) diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index 8a2ea9a5..741a8468 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -25,6 +25,8 @@ from django.db.transaction import atomic from django.utils import timezone from django.utils.translation import gettext_lazy as _ from background_task import background +from django_huey import db_periodic_task, db_task, task # noqa +from huey import CancelExecution from background_task.exceptions import InvalidTaskError from background_task.models import Task, CompletedTask from common.logger import log @@ -63,7 +65,6 @@ def map_task_to_instance(task): 'sync.tasks.download_media': Media, 'sync.tasks.download_media_metadata': Media, 'sync.tasks.save_all_media_for_source': Source, - 'sync.tasks.refresh_formats': Media, 'sync.tasks.rename_media': Media, 'sync.tasks.rename_all_media_for_source': Source, 'sync.tasks.wait_for_media_premiere': Media, @@ -209,29 +210,22 @@ def migrate_queues(): def save_model(instance): + with atomic(durable=False): + instance.save() if 'sqlite' != db_vendor: - with atomic(durable=False): - instance.save() return # work around for SQLite and its many # "database is locked" errors - with atomic(durable=False): - instance.save() arg = getattr(settings, 'SQLITE_DELAY_FLOAT', 1.5) time.sleep(random.expovariate(arg)) -@atomic(durable=False) def schedule_media_servers_update(): # Schedule a task to update media servers log.info('Scheduling media server updates') - verbose_name = _('Request media server rescan for "{}"') for mediaserver in MediaServer.objects.all(): - rescan_media_server( - str(mediaserver.pk), - verbose_name=verbose_name.format(mediaserver), - ) + rescan_media_server(str(mediaserver.pk)) def wait_for_errors(model, /, *, task_name=None): @@ -266,8 +260,9 @@ def wait_for_errors(model, /, *, task_name=None): update_task_status(task, None) -def cleanup_old_media(): - with atomic(): +@db_task(queue=Val(TaskQueue.FS)) +def cleanup_old_media(durable=True): + with atomic(durable=durable): for source in qs_gen(Source.objects.filter(delete_old_media=True, days_to_keep__gt=0)): delta = timezone.now() - timedelta(days=source.days_to_keep) mqs = source.media_source.defer( @@ -280,13 +275,19 @@ def cleanup_old_media(): log.info(f'Deleting expired media: {source} / {media} ' f'(now older than {source.days_to_keep} days / ' f'download_date before {delta})') - with atomic(): + with atomic(durable=False): # .delete() also triggers a pre_delete/post_delete signals that remove files media.delete() schedule_media_servers_update() -def cleanup_removed_media(source, video_keys): +@db_task(queue=Val(TaskQueue.FS)) +def cleanup_removed_media(source_id, video_keys): + try: + source = Source.objects.get(pk=source_id) + except Source.DoesNotExist as e: + # Task triggered but the Source has been deleted, delete the task + raise CancelExecution(_('no such source'), retry=False) from e if not source.delete_removed_media: return log.info(f'Cleaning up media no longer in source: {source}') @@ -295,11 +296,12 @@ def cleanup_removed_media(source, video_keys): ).filter( source=source, ) - for media in qs_gen(mqs): - if media.key not in video_keys: - log.info(f'{media.name} is no longer in source, removing') - with atomic(): - media.delete() + with atomic(durable=True): + for media in qs_gen(mqs): + if media.key not in video_keys: + log.info(f'{media.name} is no longer in source, removing') + with atomic(durable=False): + media.delete() schedule_media_servers_update() @@ -519,7 +521,7 @@ def index_source_task(source_id): save_db_batch(Metadata.objects, db_batch_data, db_fields_data) save_db_batch(Media.objects, db_batch_media, db_fields_media) # Cleanup of media no longer available from the source - cleanup_removed_media(source, video_keys) + cleanup_removed_media(str(source.pk), video_keys) # Clear references to indexed data videos = video = None db_batch_data.clear() @@ -552,7 +554,7 @@ def check_source_directory_exists(source_id): source.make_directory() -@background(schedule=dict(priority=10, run_at=10), queue=Val(TaskQueue.NET)) +@db_task(delay=10, priority=90, queue=Val(TaskQueue.NET)) def download_source_images(source_id): ''' Downloads an image and save it as a local thumbnail attached to a @@ -564,7 +566,7 @@ def download_source_images(source_id): # Task triggered but the source no longer exists, do nothing log.error(f'Task download_source_images(pk={source_id}) called but no ' f'source exists with ID: {source_id}') - raise InvalidTaskError(_('no such source')) from e + raise CancelExecution(_('no such source'), retry=False) from e avatar, banner = source.get_image_url log.info(f'Thumbnail URL for source with ID: {source_id} / {source} ' f'Avatar: {avatar} ' @@ -783,10 +785,7 @@ def download_media(media_id, override=False): # Try refreshing formats if media.has_metadata: log.debug(f'Scheduling a task to refresh metadata for: {media.key}: "{media.name}"') - refresh_formats( - str(media.pk), - verbose_name=f'Refreshing metadata formats for: {media.key}: "{media.name}"', - ) + refresh_formats(str(media.pk)) log.exception(str(e)) raise else: @@ -794,10 +793,7 @@ def download_media(media_id, override=False): # Try refreshing formats if media.has_metadata: log.debug(f'Scheduling a task to refresh metadata for: {media.key}: "{media.name}"') - refresh_formats( - str(media.pk), - verbose_name=f'Refreshing metadata formats for: {media.key}: "{media.name}"', - ) + refresh_formats(str(media.pk)) # Expected file doesn't exist on disk err = (f'Failed to download media: {media} (UUID: {media.pk}) to disk, ' f'expected outfile does not exist: {filepath}') @@ -814,7 +810,7 @@ def download_media(media_id, override=False): schedule_media_servers_update() -@background(schedule=dict(priority=0, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True) +@db_task(delay=30, expire=60, priority=100, queue=Val(TaskQueue.NET)) def rescan_media_server(mediaserver_id): ''' Attempts to request a media rescan on a remote media server. @@ -823,7 +819,7 @@ def rescan_media_server(mediaserver_id): mediaserver = MediaServer.objects.get(pk=mediaserver_id) except MediaServer.DoesNotExist as e: # Task triggered but the media server no longer exists, do nothing - raise InvalidTaskError(_('no such server')) from e + raise CancelExecution(_('no such server'), retry=False) from e # Request an rescan / update log.info(f'Updating media server: {mediaserver}') mediaserver.update() @@ -876,10 +872,7 @@ def save_all_media_for_source(source_id): tvn_format = '1/{:,}' + f'/{refresh_qs.count():,}' for mn, media in enumerate(qs_gen(refresh_qs), start=1): update_task_status(task, tvn_format.format(mn)) - refresh_formats( - str(media.pk), - verbose_name=f'Refreshing metadata formats for: {media.key}: "{media.name}"', - ) + refresh_formats(str(media.pk)) saved_later.add(media.uuid) # Keep out of the way of the index task! @@ -901,12 +894,12 @@ def save_all_media_for_source(source_id): update_task_status(task, None) -@background(schedule=dict(priority=50, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=True) +@db_task(priority=50, queue=Val(TaskQueue.LIMIT)) def refresh_formats(media_id): try: media = Media.objects.get(pk=media_id) except Media.DoesNotExist as e: - raise InvalidTaskError(_('no such media')) from e + raise CancelExecution(_('no such media'), retry=False) from e try: media.refresh_formats except YouTubeError as e: From 1fad2902977cef5d5e0e976fcec5db997104f3e0 Mon Sep 17 00:00:00 2001 From: tcely Date: Thu, 12 Jun 2025 16:35:59 -0400 Subject: [PATCH 02/20] Call `cleanup_old_media` function directly --- tubesync/sync/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubesync/sync/tests.py b/tubesync/sync/tests.py index 9d5ce991..bf1e4932 100644 --- a/tubesync/sync/tests.py +++ b/tubesync/sync/tests.py @@ -1833,7 +1833,7 @@ class TasksTestCase(TestCase): self.assertEqual(src1.media_source.all().count(), 3) self.assertEqual(src2.media_source.all().count(), 3) - cleanup_old_media() + cleanup_old_media.call_local(durable=False) self.assertEqual(src1.media_source.all().count(), 3) self.assertEqual(src2.media_source.all().count(), 3) From bcd758c0288f2f207eaa5194aff72d1783ed0ced Mon Sep 17 00:00:00 2001 From: tcely Date: Thu, 12 Jun 2025 16:49:40 -0400 Subject: [PATCH 03/20] Set `fuzzy_paths` to an empty list by default --- tubesync/sync/models/media.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tubesync/sync/models/media.py b/tubesync/sync/models/media.py index 42d2d891..0a2e06b5 100644 --- a/tubesync/sync/models/media.py +++ b/tubesync/sync/models/media.py @@ -1162,6 +1162,7 @@ class Media(models.Model): log.info(f'Collected {len(other_paths)} other paths for: {self!s}') # adopt orphaned files, if possible + fuzzy_paths = list() media_format = str(self.source.media_format) top_dir_path = Path(self.source.directory_path) if '{key}' in media_format: From 0ddbb80422f3f505ae97d6834d23e61d020b87c4 Mon Sep 17 00:00:00 2001 From: tcely Date: Thu, 12 Jun 2025 16:56:28 -0400 Subject: [PATCH 04/20] Update signals.py --- tubesync/sync/signals.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tubesync/sync/signals.py b/tubesync/sync/signals.py index 78329804..998ab3a3 100644 --- a/tubesync/sync/signals.py +++ b/tubesync/sync/signals.py @@ -111,10 +111,7 @@ def source_post_save(sender, instance, created, **kwargs): verbose_name=verbose_name.format(instance.name), ) if instance.source_type != Val(YouTube_SourceType.PLAYLIST) and instance.copy_channel_images: - download_source_images( - str(instance.pk), - verbose_name=verbose_name.format(instance.name), - ) + download_source_images(str(instance.pk)) if instance.index_schedule > 0: delete_task_by_source('sync.tasks.index_source_task', instance.pk) log.info(f'Scheduling first media indexing for source: {instance.name}') From 6147d877bd177ae19ed10964f0df9bfa221e4458 Mon Sep 17 00:00:00 2001 From: tcely Date: Thu, 12 Jun 2025 17:10:18 -0400 Subject: [PATCH 05/20] Update tasks.py --- tubesync/sync/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index 741a8468..3b26111a 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -25,7 +25,7 @@ from django.db.transaction import atomic from django.utils import timezone from django.utils.translation import gettext_lazy as _ from background_task import background -from django_huey import db_periodic_task, db_task, task # noqa +from django_huey import db_periodic_task, db_task, task as huey_task # noqa from huey import CancelExecution from background_task.exceptions import InvalidTaskError from background_task.models import Task, CompletedTask From be6189bde05228af016b6a75439c62e82b5e4498 Mon Sep 17 00:00:00 2001 From: tcely Date: Thu, 12 Jun 2025 17:45:36 -0400 Subject: [PATCH 06/20] Expires media server update attempts after 3 minutes --- tubesync/sync/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index 3b26111a..4e9b41c1 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -810,7 +810,7 @@ def download_media(media_id, override=False): schedule_media_servers_update() -@db_task(delay=30, expire=60, priority=100, queue=Val(TaskQueue.NET)) +@db_task(delay=30, expires=210, priority=100, queue=Val(TaskQueue.NET)) def rescan_media_server(mediaserver_id): ''' Attempts to request a media rescan on a remote media server. From 66695a5f0aecafc8f051d62ad94bd6dce59e02ff Mon Sep 17 00:00:00 2001 From: tcely Date: Fri, 13 Jun 2025 05:56:40 -0400 Subject: [PATCH 07/20] Add my silly functions for examining queue state --- tubesync/common/huey.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 66d13fc2..34c757ae 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -1,5 +1,20 @@ +def h_q_dict(q, /): + return dict( + scheduled=(q.scheduled_count(), q.scheduled(),), + pending=(q.pending_count(), q.pending(),), + result=(q.result_count(), q.all_results(),), + ) + + +def h_q_tuple(q, /): + return ( + q.name, + h_q_dict(q), + ) + + def sqlite_tasks(key, /, prefix=None): name_fmt = 'huey_{}' if prefix is None: From 2378ee8649003fbb25c23cdddded1d6125b8f385 Mon Sep 17 00:00:00 2001 From: tcely Date: Fri, 13 Jun 2025 06:15:10 -0400 Subject: [PATCH 08/20] Also accept the queue name --- tubesync/common/huey.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 34c757ae..543790aa 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -9,6 +9,9 @@ def h_q_dict(q, /): def h_q_tuple(q, /): + if isinstance(q, str): + from django_huey import get_queue + q = get_queue(q) return ( q.name, h_q_dict(q), From 19983f06cbf5b9faee57996875fd61bb615e4d1a Mon Sep 17 00:00:00 2001 From: tcely Date: Sat, 14 Jun 2025 12:03:55 -0400 Subject: [PATCH 09/20] Add the `delay_to_eta` function --- tubesync/common/huey.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 543790aa..e080d97f 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -1,5 +1,10 @@ +def delay_to_eta(delay, /): + from huey.utils import normalize_time + return normalize_time(delay=delay) + + def h_q_dict(q, /): return dict( scheduled=(q.scheduled_count(), q.scheduled(),), From 897aa9ce7aec9b777becf67a015001474c1fc957 Mon Sep 17 00:00:00 2001 From: tcely Date: Sat, 14 Jun 2025 12:45:10 -0400 Subject: [PATCH 10/20] Display which functions are registered --- tubesync/common/huey.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index e080d97f..fa36c619 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -19,6 +19,7 @@ def h_q_tuple(q, /): q = get_queue(q) return ( q.name, + list(q._registry._registry.keys()), h_q_dict(q), ) From 3b754622f5f4fd63bef678878f775d19cc514926 Mon Sep 17 00:00:00 2001 From: tcely Date: Sat, 14 Jun 2025 13:56:15 -0400 Subject: [PATCH 11/20] Add a `exponential_backoff` decorator to mimic the `background` delays --- tubesync/common/huey.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index fa36c619..3ea849df 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -1,3 +1,4 @@ +from functools import wraps def delay_to_eta(delay, /): @@ -59,3 +60,29 @@ def sqlite_tasks(key, /, prefix=None): ), ) + +def exponential_backoff(task_func=None, /, *args, *, **kwargs): + if task_func is None: + from django_huey import task as huey_task + task_func = huey_task + def backoff(attempt, /): + return (5+(attempt**4)) + def deco(fn): + @wraps(fn) + def inner(*a, **kwa): + task = kwa.pop('task') + try: + return fn(*a, **kwa) + except Exception as exc: + attempt = 1 + while task.retry_delay <= backoff(attempt): + attempt += 1 + task.retry_delay = backoff(attempt) + raise exc + kwargs.update(dict( + context=True, + retry_delay=backoff(1), + )) + return task_func(*args, **kwargs)(inner) + return deco + From 93b5ef7d43b28861b962d1881d3a372542afca05 Mon Sep 17 00:00:00 2001 From: tcely Date: Sat, 14 Jun 2025 13:59:15 -0400 Subject: [PATCH 12/20] fixup: syntax --- tubesync/common/huey.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 3ea849df..4fc44c84 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -61,7 +61,7 @@ def sqlite_tasks(key, /, prefix=None): ) -def exponential_backoff(task_func=None, /, *args, *, **kwargs): +def exponential_backoff(task_func=None, /, *args, **kwargs): if task_func is None: from django_huey import task as huey_task task_func = huey_task From 328682160e3e7b35b261b38b21a016719db0be29 Mon Sep 17 00:00:00 2001 From: tcely Date: Sat, 14 Jun 2025 14:58:53 -0400 Subject: [PATCH 13/20] Better handling of large retries --- tubesync/common/huey.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 4fc44c84..a4c7866a 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -74,10 +74,13 @@ def exponential_backoff(task_func=None, /, *args, **kwargs): try: return fn(*a, **kwa) except Exception as exc: - attempt = 1 - while task.retry_delay <= backoff(attempt): - attempt += 1 - task.retry_delay = backoff(attempt) + for attempt in range(1, 1_001): + if backoff(attempt) > task.retry_delay: + task.retry_delay = backoff(attempt) + break + # insanity, but handle it anyway + if 1_000 == attempt: + task.retry_delay = backoff(attempt) raise exc kwargs.update(dict( context=True, From 8b31be124256336df3fcb3bd6a57ceec12e04e49 Mon Sep 17 00:00:00 2001 From: tcely Date: Sat, 14 Jun 2025 15:16:18 -0400 Subject: [PATCH 14/20] Retry `refresh_formats` using `exponential_backoff` --- tubesync/sync/tasks.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index 4e9b41c1..a356ac95 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -25,10 +25,11 @@ from django.db.transaction import atomic from django.utils import timezone from django.utils.translation import gettext_lazy as _ from background_task import background -from django_huey import db_periodic_task, db_task, task as huey_task # noqa -from huey import CancelExecution from background_task.exceptions import InvalidTaskError from background_task.models import Task, CompletedTask +from django_huey import db_periodic_task, db_task, task as huey_task # noqa +from huey import CancelExecution +from common.huey import exponential_backoff from common.logger import log from common.errors import ( BgTaskWorkerError, DownloadFailedException, NoFormatException, NoMediaException, @@ -894,7 +895,7 @@ def save_all_media_for_source(source_id): update_task_status(task, None) -@db_task(priority=50, queue=Val(TaskQueue.LIMIT)) +@exponential_backoff(db_task, priority=50, retries=15, queue=Val(TaskQueue.LIMIT)) def refresh_formats(media_id): try: media = Media.objects.get(pk=media_id) From 7c3404573c2f635327b1777c657fbb3098a83632 Mon Sep 17 00:00:00 2001 From: tcely Date: Sat, 14 Jun 2025 15:33:32 -0400 Subject: [PATCH 15/20] Retry `download_source_images` using `exponential_backoff` --- tubesync/sync/tasks.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index a356ac95..37465ca0 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -555,7 +555,7 @@ def check_source_directory_exists(source_id): source.make_directory() -@db_task(delay=10, priority=90, queue=Val(TaskQueue.NET)) +@exponential_backoff(db_task, delay=10, priority=90, retries=15, queue=Val(TaskQueue.NET)) def download_source_images(source_id): ''' Downloads an image and save it as a local thumbnail attached to a @@ -901,13 +901,9 @@ def refresh_formats(media_id): media = Media.objects.get(pk=media_id) except Media.DoesNotExist as e: raise CancelExecution(_('no such media'), retry=False) from e - try: - media.refresh_formats - except YouTubeError as e: - log.debug(f'Failed to refresh formats for: {media.source} / {media.key}: {e!s}') - pass else: - save_model(media) + if media.refresh_formats: + save_model(media) @background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.FS), remove_existing_tasks=True) From be8a2035ba87fb3bbbe15e2bc772b6abff1e7bf2 Mon Sep 17 00:00:00 2001 From: tcely Date: Sat, 14 Jun 2025 16:15:19 -0400 Subject: [PATCH 16/20] Best wishes to the next century --- tubesync/common/huey.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index a4c7866a..73062fd3 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -74,12 +74,12 @@ def exponential_backoff(task_func=None, /, *args, **kwargs): try: return fn(*a, **kwa) except Exception as exc: - for attempt in range(1, 1_001): + for attempt in range(1, 240): if backoff(attempt) > task.retry_delay: task.retry_delay = backoff(attempt) break # insanity, but handle it anyway - if 1_000 == attempt: + if 239 == attempt: task.retry_delay = backoff(attempt) raise exc kwargs.update(dict( From f73ac4e59a4f7d90d9026b44b4a73fc3ed35d0fa Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 05:55:08 -0400 Subject: [PATCH 17/20] The encoded bytes are not useful to print --- tubesync/common/huey.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 73062fd3..528dfe2b 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -10,7 +10,7 @@ def h_q_dict(q, /): return dict( scheduled=(q.scheduled_count(), q.scheduled(),), pending=(q.pending_count(), q.pending(),), - result=(q.result_count(), q.all_results(),), + result=(q.result_count(), q.all_results().keys(),), ) From 94b71cbfb9d85d47112a38769db8928e6640de30 Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 06:22:18 -0400 Subject: [PATCH 18/20] Allow custom back off functions --- tubesync/common/huey.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 528dfe2b..3f02c2a0 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -65,15 +65,29 @@ def exponential_backoff(task_func=None, /, *args, **kwargs): if task_func is None: from django_huey import task as huey_task task_func = huey_task - def backoff(attempt, /): + backoff_func = kwargs.pop('backoff_func', None) + def default_backoff(attempt, /): return (5+(attempt**4)) + if backoff_func is None or not callable(backoff_func): + backoff_func = default_backoff def deco(fn): @wraps(fn) def inner(*a, **kwa): - task = kwa.pop('task') + backoff = backoff_func + # the scoping becomes complicated when reusing functions + try: + _task = kwa.pop('task') + except KeyError: + pass + else: + task = _task try: return fn(*a, **kwa) except Exception as exc: + try: + task is not None + except NameError: + raise exc for attempt in range(1, 240): if backoff(attempt) > task.retry_delay: task.retry_delay = backoff(attempt) @@ -84,7 +98,7 @@ def exponential_backoff(task_func=None, /, *args, **kwargs): raise exc kwargs.update(dict( context=True, - retry_delay=backoff(1), + retry_delay=backoff_func(1), )) return task_func(*args, **kwargs)(inner) return deco From a4040928be0687cc320e971d8b0bac10dfbaece8 Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 06:27:34 -0400 Subject: [PATCH 19/20] Print only a list of keys --- tubesync/common/huey.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 3f02c2a0..4bdd24b5 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -10,7 +10,7 @@ def h_q_dict(q, /): return dict( scheduled=(q.scheduled_count(), q.scheduled(),), pending=(q.pending_count(), q.pending(),), - result=(q.result_count(), q.all_results().keys(),), + result=(q.result_count(), list(q.all_results().keys()),), ) From 2c203572a26a2e631e429afd0b505224b4a101ed Mon Sep 17 00:00:00 2001 From: tcely Date: Sun, 15 Jun 2025 07:27:47 -0400 Subject: [PATCH 20/20] Rename `exponential_backoff` to `dynamic_retry` --- tubesync/common/huey.py | 2 +- tubesync/sync/tasks.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 4bdd24b5..87963726 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -61,7 +61,7 @@ def sqlite_tasks(key, /, prefix=None): ) -def exponential_backoff(task_func=None, /, *args, **kwargs): +def dynamic_retry(task_func=None, /, *args, **kwargs): if task_func is None: from django_huey import task as huey_task task_func = huey_task diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index 37465ca0..97a63515 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -29,7 +29,7 @@ from background_task.exceptions import InvalidTaskError from background_task.models import Task, CompletedTask from django_huey import db_periodic_task, db_task, task as huey_task # noqa from huey import CancelExecution -from common.huey import exponential_backoff +from common.huey import dynamic_retry from common.logger import log from common.errors import ( BgTaskWorkerError, DownloadFailedException, NoFormatException, NoMediaException, @@ -555,7 +555,7 @@ def check_source_directory_exists(source_id): source.make_directory() -@exponential_backoff(db_task, delay=10, priority=90, retries=15, queue=Val(TaskQueue.NET)) +@dynamic_retry(db_task, delay=10, priority=90, retries=15, queue=Val(TaskQueue.NET)) def download_source_images(source_id): ''' Downloads an image and save it as a local thumbnail attached to a @@ -895,7 +895,7 @@ def save_all_media_for_source(source_id): update_task_status(task, None) -@exponential_backoff(db_task, priority=50, retries=15, queue=Val(TaskQueue.LIMIT)) +@dynamic_retry(db_task, backoff_func=lambda n: (n*3600)+600, priority=50, retries=15, queue=Val(TaskQueue.LIMIT)) def refresh_formats(media_id): try: media = Media.objects.get(pk=media_id)