From ce63b2806c5f6c9a4c36d17be7e30ad94a3f3f99 Mon Sep 17 00:00:00 2001 From: tcely Date: Mon, 23 Jun 2025 21:48:07 -0400 Subject: [PATCH] Move more work into `huey` queues --- tubesync/sync/tasks.py | 139 ++++++++++++++++++++++++++++++----------- 1 file changed, 104 insertions(+), 35 deletions(-) diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index ff0c202e..84c1f752 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -328,7 +328,7 @@ def wait_for_errors(model, /, *, queue_name=None, task_name=None): raise BgTaskWorkerError(_('queue worker stopped')) -@db_task(queue=Val(TaskQueue.FS)) +@db_task(priority=90, queue=Val(TaskQueue.FS)) def cleanup_old_media(durable=True): with atomic(durable=durable): for source in qs_gen(Source.objects.filter(delete_old_media=True, days_to_keep__gt=0)): @@ -349,7 +349,7 @@ def cleanup_old_media(durable=True): schedule_media_servers_update() -@db_task(queue=Val(TaskQueue.FS)) +@db_task(priority=90, queue=Val(TaskQueue.FS)) def cleanup_removed_media(source_id, video_keys): try: source = Source.objects.get(pk=source_id) @@ -451,8 +451,8 @@ def wait_for_database_queue(): ) -@background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True) -def index_source_task(source_id): +@db_task(delay=30, priority=80, queue=Val(TaskQueue.LIMIT)) +def index_source(source_id): ''' Indexes media available from a Source object. ''' @@ -464,10 +464,10 @@ def index_source_task(source_id): source = Source.objects.get(pk=source_id) except Source.DoesNotExist as e: # Task triggered but the Source has been deleted, delete the task - raise InvalidTaskError(_('no such source')) from e + raise CancelExecution(_('no such source'), retry=False) from e # An inactive Source would return an empty list for videos anyway if not source.is_active: - return + return False # update the target schedule column source.task_run_at_dt # Reset any errors @@ -609,6 +609,16 @@ def index_source_task(source_id): schedule=dict(run_at=60), verbose_name=vn_fmt.format(source.name), ) + return True + + +@background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True) +def index_source_task(source_id): + try: + res = index_source(source_id) + return res.get(blocking=True) + except CancelExecution as e: + raise InvalidTaskError(str(e)) from e @dynamic_retry(db_task, priority=100, retries=15, queue=Val(TaskQueue.FS)) @@ -682,8 +692,53 @@ def download_source_images(source_id): log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}') -@background(schedule=dict(priority=40, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True) -def download_media_metadata(media_id): +@db_task(delay=60, priority=90, retries=5, retry_delay=60, queue=Val(TaskQueue.FS)) +@atomic(durable=True) +def delete_media(media_id): + try: + media = Media.objects.get(pk=media_id) + except Media.DoesNotExist as e: + raise CancelExecution(_('no such media'), retry=False) from e + else: + media.delete() + return True + return False + + +@db_task(delay=60, priority=70, retries=5, retry_delay=60, queue=Val(TaskQueue.FS)) +@atomic(durable=True) +def rename_media(media_id): + try: + media = Media.objects.get(pk=media_id) + except Media.DoesNotExist as e: + raise CancelExecution(_('no such media'), retry=False) from e + else: + with huey_lock_task( + f'media:{media.uuid}', + queue=Val(TaskQueue.DB), + ): + media.rename_files() + + +@db_task(delay=60, priority=80, retries=5, retry_delay=60, queue=Val(TaskQueue.FS)) 
+@atomic(durable=True) +def save_media(media_id): + try: + media = Media.objects.get(pk=media_id) + except Media.DoesNotExist as e: + raise CancelExecution(_('no such media'), retry=False) from e + else: + with huey_lock_task( + f'media:{media.uuid}', + queue=Val(TaskQueue.DB), + ): + media.save() + return True + return False + + +@db_task(delay=60, priority=60, queue=Val(TaskQueue.LIMIT)) +def download_metadata(media_id): ''' Downloads the metadata for a media item. ''' @@ -693,10 +748,10 @@ def download_media_metadata(media_id): # Task triggered but the media no longer exists, do nothing log.error(f'Task download_media_metadata(pk={media_id}) called but no ' f'media exists with ID: {media_id}') - raise InvalidTaskError(_('no such media')) from e + raise CancelExecution(_('no such media'), retry=False) from e if media.manual_skip: log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.') - return + return False source = media.source wait_for_errors( media, @@ -744,7 +799,7 @@ def download_media_metadata(media_id): if raise_exception: raise log.debug(str(e)) - return + return False response = metadata if getattr(settings, 'SHRINK_NEW_MEDIA_METADATA', False): response = filter_response(metadata, True) @@ -779,6 +834,16 @@ def download_media_metadata(media_id): save_model(media) log.info(f'Saved {len(media.metadata_dumps())} bytes of metadata for: ' f'{source} / {media}: {media_id}') + return True + + +@background(schedule=dict(priority=40, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True) +def download_media_metadata(media_id): + try: + res = download_metadata(media_id) + return res.get(blocking=True) + except CancelExecution as e: + raise InvalidTaskError(str(e)) from e @dynamic_retry(db_task, delay=10, priority=90, retries=15, queue=Val(TaskQueue.NET)) @@ -796,7 +861,7 @@ def download_media_image(media_id, url): # Media was toggled to be skipped after the task was scheduled log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' f'it is now marked to be skipped, not downloading thumbnail') - return + return False width = getattr(settings, 'MEDIA_THUMBNAIL_WIDTH', 430) height = getattr(settings, 'MEDIA_THUMBNAIL_HEIGHT', 240) try: @@ -856,8 +921,8 @@ def download_media_thumbnail(media_id, url): except CancelExecution as e: raise InvalidTaskError(str(e)) from e -@background(schedule=dict(priority=30, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True) -def download_media(media_id, override=False): +@db_task(delay=60, priority=70, queue=Val(TaskQueue.LIMIT)) +def download_media_file(media_id, override=False): ''' Downloads the media to disk and attaches it to the Media instance. 
''' @@ -865,12 +930,12 @@ def download_media(media_id, override=False): media = Media.objects.get(pk=media_id) except Media.DoesNotExist as e: # Task triggered but the media no longer exists, do nothing - raise InvalidTaskError(_('no such media')) from e + raise CancelExecution(_('no such media'), retry=False) from e else: if not media.download_checklist(override): # any condition that needs to reschedule the task # should raise an exception to avoid this - return + return False wait_for_errors( media, @@ -909,6 +974,16 @@ def download_media(media_id, override=False): media.write_nfo_file() # Schedule a task to update media servers schedule_media_servers_update() + return True + + +@background(schedule=dict(priority=30, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True) +def download_media(media_id, override=False): + try: + res = download_media_file(media_id, override) + return res.get(blocking=True) + except CancelExecution as e: + raise InvalidTaskError(str(e)) from e @db_task(delay=30, expires=210, priority=100, queue=Val(TaskQueue.NET)) @@ -986,13 +1061,18 @@ def save_all_media_for_source(source_id): # Trigger the post_save signal for each media item linked to this source as various # flags may need to be recalculated + saved_now = set() tvn_format = '2/{:,}' + f'/{save_qs.count():,}' for mn, media in enumerate(qs_gen(save_qs), start=1): if media.uuid not in saved_later: update_task_status(task, tvn_format.format(mn)) - save_model(media) + saved_now.add(str(media.pk)) + #save_model(media) # Reset task.verbose_name to the saved value update_task_status(task, None) + # wait for tasks to complete + res = save_media.map(saved_now) + res.get(blocking=True) @dynamic_retry(db_task, backoff_func=lambda n: (n*3600)+600, priority=50, retries=15, queue=Val(TaskQueue.LIMIT)) @@ -1031,21 +1111,6 @@ def refresh_formats(media_id): save_model(media) -@db_task(delay=60, priority=80, retries=5, retry_delay=60, queue=Val(TaskQueue.FS)) -@atomic(durable=True) -def rename_media(media_id): - try: - media = Media.objects.get(pk=media_id) - except Media.DoesNotExist as e: - raise CancelExecution(_('no such media'), retry=False) from e - else: - with huey_lock_task( - f'media:{media.uuid}', - queue=Val(TaskQueue.DB), - ): - media.rename_files() - - @db_task(delay=300, priority=80, retries=5, retry_delay=600, queue=Val(TaskQueue.FS)) @atomic(durable=True) def rename_all_media_for_source(source_id): @@ -1067,7 +1132,7 @@ def rename_all_media_for_source(source_id): ) if not create_rename_tasks: return None - mqs = Media.objects.all().filter( + mqs = Media.objects.filter( source=source, downloaded=True, ) @@ -1076,7 +1141,7 @@ def rename_all_media_for_source(source_id): f'media:{media.uuid}', queue=Val(TaskQueue.DB), ): - with atomic(): + with atomic(durable=False): media.rename_files() @@ -1116,6 +1181,7 @@ def delete_all_media_for_source(source_id, source_name, source_directory): ).filter( source=source or source_id, ) + deleted_now = set() with atomic(durable=True): for media in qs_gen(mqs): log.info(f'Deleting media for source: {source_name} item: {media.name}') @@ -1124,7 +1190,10 @@ def delete_all_media_for_source(source_id, source_name, source_directory): media.skip = True media.manual_skip = True media.save() - media.delete() + deleted_now.add(str(media.pk)) + #media.delete() + res = delete_media.map(deleted_now) + res.get(blocking=True) # Remove the directory, if the user requested that directory_path = Path(source_directory) remove = (
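
The old background-task entry points (index_source_task, download_media_metadata, download_media) survive as thin shims: they enqueue the new huey task, block on its result, and translate huey's CancelExecution into the InvalidTaskError that the legacy framework expects. A minimal sketch of that shape, assuming a stock huey/Django setup; do_work and legacy_entry_point are hypothetical names, and the delay= and queue= arguments seen in the patch depend on the project's own huey configuration, so the sketch sticks to options stock huey accepts.

    from huey.contrib.djhuey import db_task
    from huey.exceptions import CancelExecution, TaskException

    @db_task(priority=80)
    def do_work(pk):
        # Worker-side task: give up without retrying when the row is gone.
        if pk is None:
            raise CancelExecution('no such object', retry=False)
        return True

    def legacy_entry_point(pk):
        # Old-style callable kept for compatibility: enqueue the huey task,
        # then block until a worker has finished it.
        result = do_work(pk)  # returns a huey Result handle
        try:
            return result.get(blocking=True)
        except TaskException as exc:
            # With the default result store, worker-side errors surface here.
            raise RuntimeError(str(exc)) from exc

Blocking inside the shim keeps the legacy scheduler's ordering intact while the actual work already runs on the huey worker pool.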
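
save_all_media_for_source and delete_all_media_for_source no longer save or delete each row inline; they collect primary keys, fan the per-item work out with save_media.map(...) / delete_media.map(...), and wait on the returned result group. A short sketch of that fan-out, assuming stock huey; touch_item and touch_many are hypothetical stand-ins for the per-media tasks.

    from huey.contrib.djhuey import db_task

    @db_task(priority=80)
    def touch_item(pk):
        # Per-item work now runs on a queue worker instead of in the caller.
        return pk

    def touch_many(pks):
        # map() enqueues one task per key and returns a ResultGroup;
        # get(blocking=True) waits until every worker has reported back.
        group = touch_item.map(str(pk) for pk in pks)
        return group.get(blocking=True)

Because the caller still blocks on the group, the old code's overall ordering is preserved while the per-item saves and deletes pick up the FS queue's priority and retry settings.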
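
save_media, rename_media, and rename_all_media_for_source serialize access to a single media item by holding a lock named after its UUID while touching files or rows. The patch uses a project helper (huey_lock_task) that also selects a queue; a rough equivalent with huey's built-in lock_task, where update_item is a hypothetical task, looks like this.

    from huey.contrib.djhuey import db_task, lock_task

    @db_task(retries=5, retry_delay=60)
    def update_item(pk):
        # lock_task as a context manager raises TaskLockedException when the
        # lock is already held; with retries configured, huey runs the task
        # again later instead of letting two workers touch the same item.
        with lock_task(f'media:{pk}'):
            ...  # rename files, save the model, and so on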