diff --git a/tubesync/common/huey.py b/tubesync/common/huey.py index 66d13fc2..87963726 100644 --- a/tubesync/common/huey.py +++ b/tubesync/common/huey.py @@ -1,3 +1,28 @@ +from functools import wraps + + +def delay_to_eta(delay, /): + from huey.utils import normalize_time + return normalize_time(delay=delay) + + +def h_q_dict(q, /): + return dict( + scheduled=(q.scheduled_count(), q.scheduled(),), + pending=(q.pending_count(), q.pending(),), + result=(q.result_count(), list(q.all_results().keys()),), + ) + + +def h_q_tuple(q, /): + if isinstance(q, str): + from django_huey import get_queue + q = get_queue(q) + return ( + q.name, + list(q._registry._registry.keys()), + h_q_dict(q), + ) def sqlite_tasks(key, /, prefix=None): @@ -35,3 +60,46 @@ def sqlite_tasks(key, /, prefix=None): ), ) + +def dynamic_retry(task_func=None, /, *args, **kwargs): + if task_func is None: + from django_huey import task as huey_task + task_func = huey_task + backoff_func = kwargs.pop('backoff_func', None) + def default_backoff(attempt, /): + return (5+(attempt**4)) + if backoff_func is None or not callable(backoff_func): + backoff_func = default_backoff + def deco(fn): + @wraps(fn) + def inner(*a, **kwa): + backoff = backoff_func + # the scoping becomes complicated when reusing functions + try: + _task = kwa.pop('task') + except KeyError: + pass + else: + task = _task + try: + return fn(*a, **kwa) + except Exception as exc: + try: + task is not None + except NameError: + raise exc + for attempt in range(1, 240): + if backoff(attempt) > task.retry_delay: + task.retry_delay = backoff(attempt) + break + # insanity, but handle it anyway + if 239 == attempt: + task.retry_delay = backoff(attempt) + raise exc + kwargs.update(dict( + context=True, + retry_delay=backoff_func(1), + )) + return task_func(*args, **kwargs)(inner) + return deco + diff --git a/tubesync/sync/signals.py b/tubesync/sync/signals.py index 78329804..998ab3a3 100644 --- a/tubesync/sync/signals.py +++ b/tubesync/sync/signals.py @@ -111,10 +111,7 @@ def source_post_save(sender, instance, created, **kwargs): verbose_name=verbose_name.format(instance.name), ) if instance.source_type != Val(YouTube_SourceType.PLAYLIST) and instance.copy_channel_images: - download_source_images( - str(instance.pk), - verbose_name=verbose_name.format(instance.name), - ) + download_source_images(str(instance.pk)) if instance.index_schedule > 0: delete_task_by_source('sync.tasks.index_source_task', instance.pk) log.info(f'Scheduling first media indexing for source: {instance.name}') diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index 8a2ea9a5..97a63515 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -27,6 +27,9 @@ from django.utils.translation import gettext_lazy as _ from background_task import background from background_task.exceptions import InvalidTaskError from background_task.models import Task, CompletedTask +from django_huey import db_periodic_task, db_task, task as huey_task # noqa +from huey import CancelExecution +from common.huey import dynamic_retry from common.logger import log from common.errors import ( BgTaskWorkerError, DownloadFailedException, NoFormatException, NoMediaException, @@ -63,7 +66,6 @@ def map_task_to_instance(task): 'sync.tasks.download_media': Media, 'sync.tasks.download_media_metadata': Media, 'sync.tasks.save_all_media_for_source': Source, - 'sync.tasks.refresh_formats': Media, 'sync.tasks.rename_media': Media, 'sync.tasks.rename_all_media_for_source': Source, 'sync.tasks.wait_for_media_premiere': Media, @@ -209,29 +211,22 @@ def migrate_queues(): def save_model(instance): + with atomic(durable=False): + instance.save() if 'sqlite' != db_vendor: - with atomic(durable=False): - instance.save() return # work around for SQLite and its many # "database is locked" errors - with atomic(durable=False): - instance.save() arg = getattr(settings, 'SQLITE_DELAY_FLOAT', 1.5) time.sleep(random.expovariate(arg)) -@atomic(durable=False) def schedule_media_servers_update(): # Schedule a task to update media servers log.info('Scheduling media server updates') - verbose_name = _('Request media server rescan for "{}"') for mediaserver in MediaServer.objects.all(): - rescan_media_server( - str(mediaserver.pk), - verbose_name=verbose_name.format(mediaserver), - ) + rescan_media_server(str(mediaserver.pk)) def wait_for_errors(model, /, *, task_name=None): @@ -266,8 +261,9 @@ def wait_for_errors(model, /, *, task_name=None): update_task_status(task, None) -def cleanup_old_media(): - with atomic(): +@db_task(queue=Val(TaskQueue.FS)) +def cleanup_old_media(durable=True): + with atomic(durable=durable): for source in qs_gen(Source.objects.filter(delete_old_media=True, days_to_keep__gt=0)): delta = timezone.now() - timedelta(days=source.days_to_keep) mqs = source.media_source.defer( @@ -280,13 +276,19 @@ def cleanup_old_media(): log.info(f'Deleting expired media: {source} / {media} ' f'(now older than {source.days_to_keep} days / ' f'download_date before {delta})') - with atomic(): + with atomic(durable=False): # .delete() also triggers a pre_delete/post_delete signals that remove files media.delete() schedule_media_servers_update() -def cleanup_removed_media(source, video_keys): +@db_task(queue=Val(TaskQueue.FS)) +def cleanup_removed_media(source_id, video_keys): + try: + source = Source.objects.get(pk=source_id) + except Source.DoesNotExist as e: + # Task triggered but the Source has been deleted, delete the task + raise CancelExecution(_('no such source'), retry=False) from e if not source.delete_removed_media: return log.info(f'Cleaning up media no longer in source: {source}') @@ -295,11 +297,12 @@ def cleanup_removed_media(source, video_keys): ).filter( source=source, ) - for media in qs_gen(mqs): - if media.key not in video_keys: - log.info(f'{media.name} is no longer in source, removing') - with atomic(): - media.delete() + with atomic(durable=True): + for media in qs_gen(mqs): + if media.key not in video_keys: + log.info(f'{media.name} is no longer in source, removing') + with atomic(durable=False): + media.delete() schedule_media_servers_update() @@ -519,7 +522,7 @@ def index_source_task(source_id): save_db_batch(Metadata.objects, db_batch_data, db_fields_data) save_db_batch(Media.objects, db_batch_media, db_fields_media) # Cleanup of media no longer available from the source - cleanup_removed_media(source, video_keys) + cleanup_removed_media(str(source.pk), video_keys) # Clear references to indexed data videos = video = None db_batch_data.clear() @@ -552,7 +555,7 @@ def check_source_directory_exists(source_id): source.make_directory() -@background(schedule=dict(priority=10, run_at=10), queue=Val(TaskQueue.NET)) +@dynamic_retry(db_task, delay=10, priority=90, retries=15, queue=Val(TaskQueue.NET)) def download_source_images(source_id): ''' Downloads an image and save it as a local thumbnail attached to a @@ -564,7 +567,7 @@ def download_source_images(source_id): # Task triggered but the source no longer exists, do nothing log.error(f'Task download_source_images(pk={source_id}) called but no ' f'source exists with ID: {source_id}') - raise InvalidTaskError(_('no such source')) from e + raise CancelExecution(_('no such source'), retry=False) from e avatar, banner = source.get_image_url log.info(f'Thumbnail URL for source with ID: {source_id} / {source} ' f'Avatar: {avatar} ' @@ -783,10 +786,7 @@ def download_media(media_id, override=False): # Try refreshing formats if media.has_metadata: log.debug(f'Scheduling a task to refresh metadata for: {media.key}: "{media.name}"') - refresh_formats( - str(media.pk), - verbose_name=f'Refreshing metadata formats for: {media.key}: "{media.name}"', - ) + refresh_formats(str(media.pk)) log.exception(str(e)) raise else: @@ -794,10 +794,7 @@ def download_media(media_id, override=False): # Try refreshing formats if media.has_metadata: log.debug(f'Scheduling a task to refresh metadata for: {media.key}: "{media.name}"') - refresh_formats( - str(media.pk), - verbose_name=f'Refreshing metadata formats for: {media.key}: "{media.name}"', - ) + refresh_formats(str(media.pk)) # Expected file doesn't exist on disk err = (f'Failed to download media: {media} (UUID: {media.pk}) to disk, ' f'expected outfile does not exist: {filepath}') @@ -814,7 +811,7 @@ def download_media(media_id, override=False): schedule_media_servers_update() -@background(schedule=dict(priority=0, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True) +@db_task(delay=30, expires=210, priority=100, queue=Val(TaskQueue.NET)) def rescan_media_server(mediaserver_id): ''' Attempts to request a media rescan on a remote media server. @@ -823,7 +820,7 @@ def rescan_media_server(mediaserver_id): mediaserver = MediaServer.objects.get(pk=mediaserver_id) except MediaServer.DoesNotExist as e: # Task triggered but the media server no longer exists, do nothing - raise InvalidTaskError(_('no such server')) from e + raise CancelExecution(_('no such server'), retry=False) from e # Request an rescan / update log.info(f'Updating media server: {mediaserver}') mediaserver.update() @@ -876,10 +873,7 @@ def save_all_media_for_source(source_id): tvn_format = '1/{:,}' + f'/{refresh_qs.count():,}' for mn, media in enumerate(qs_gen(refresh_qs), start=1): update_task_status(task, tvn_format.format(mn)) - refresh_formats( - str(media.pk), - verbose_name=f'Refreshing metadata formats for: {media.key}: "{media.name}"', - ) + refresh_formats(str(media.pk)) saved_later.add(media.uuid) # Keep out of the way of the index task! @@ -901,19 +895,15 @@ def save_all_media_for_source(source_id): update_task_status(task, None) -@background(schedule=dict(priority=50, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=True) +@dynamic_retry(db_task, backoff_func=lambda n: (n*3600)+600, priority=50, retries=15, queue=Val(TaskQueue.LIMIT)) def refresh_formats(media_id): try: media = Media.objects.get(pk=media_id) except Media.DoesNotExist as e: - raise InvalidTaskError(_('no such media')) from e - try: - media.refresh_formats - except YouTubeError as e: - log.debug(f'Failed to refresh formats for: {media.source} / {media.key}: {e!s}') - pass + raise CancelExecution(_('no such media'), retry=False) from e else: - save_model(media) + if media.refresh_formats: + save_model(media) @background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.FS), remove_existing_tasks=True) diff --git a/tubesync/sync/tests.py b/tubesync/sync/tests.py index 9d5ce991..bf1e4932 100644 --- a/tubesync/sync/tests.py +++ b/tubesync/sync/tests.py @@ -1833,7 +1833,7 @@ class TasksTestCase(TestCase): self.assertEqual(src1.media_source.all().count(), 3) self.assertEqual(src2.media_source.all().count(), 3) - cleanup_old_media() + cleanup_old_media.call_local(durable=False) self.assertEqual(src1.media_source.all().count(), 3) self.assertEqual(src2.media_source.all().count(), 3)