From eab0ad9d7c0a9f2ba1a4b826cdf29bff06b93fb7 Mon Sep 17 00:00:00 2001 From: tcely Date: Thu, 3 Apr 2025 01:50:20 -0400 Subject: [PATCH] Review of tasks.py --- tubesync/sync/tasks.py | 147 +++++++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 71 deletions(-) diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index 6cf0fc2d..8e35f7ac 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -21,6 +21,7 @@ from django.db.transaction import atomic from django.utils import timezone from django.utils.translation import gettext_lazy as _ from background_task import background +from background_task.exceptions import InvalidTaskError from background_task.models import Task, CompletedTask from common.logger import log from common.errors import NoMediaException, NoMetadataException, DownloadFailedException @@ -123,7 +124,8 @@ def update_task_status(task, status): except DatabaseError as e: if 'Save with update_fields did not affect any rows.' == str(e): pass - raise + else: + raise return True @@ -136,6 +138,7 @@ def get_source_completed_tasks(source_id, only_errors=False): q['failed_at__isnull'] = False return CompletedTask.objects.filter(**q).order_by('-failed_at') + def get_tasks(task_name, id=None, /, instance=None): assert not (id is None and instance is None) arg = str(id or instance.pk) @@ -160,6 +163,7 @@ def get_source_check_task(source_id): def get_source_index_task(source_id): return get_first_task('sync.tasks.index_source_task', source_id) + def delete_task_by_source(task_name, source_id): now = timezone.now() unlocked = Task.objects.unlocked(now) @@ -191,7 +195,7 @@ def schedule_media_servers_update(): for mediaserver in MediaServer.objects.all(): rescan_media_server( str(mediaserver.pk), - priority=30, + priority=10, verbose_name=verbose_name.format(mediaserver), remove_existing_tasks=True, ) @@ -225,7 +229,7 @@ def cleanup_removed_media(source, videos): schedule_media_servers_update() -@background(schedule=300, remove_existing_tasks=True) +@background(schedule=dict(run_at=300), remove_existing_tasks=True) def index_source_task(source_id): ''' Indexes media available from a Source object. @@ -235,18 +239,20 @@ def index_source_task(source_id): cleanup_old_media() try: source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the Source has been deleted, delete the task - return + raise InvalidTaskError(_('no such source')) from e # An inactive Source would return an empty list for videos anyway if not source.is_active: return # Reset any errors + # TODO: determine if this affects anything source.has_failed = False source.save() # Index the source videos = source.index_media() if not videos: + # TODO: Record this error in source.has_failed ? raise NoMediaException(f'Source "{source}" (ID: {source_id}) returned no ' f'media to index, is the source key valid? Check the ' f'source configuration is correct and that the source ' @@ -310,7 +316,7 @@ def index_source_task(source_id): cleanup_removed_media(source, videos) -@background(schedule=0) +@background(schedule=dict(run_at=0)) def check_source_directory_exists(source_id): ''' Checks the output directory for a source exists and is writable, if it does @@ -319,17 +325,17 @@ def check_source_directory_exists(source_id): ''' try: source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the Source has been deleted, delete the task - return + raise InvalidTaskError(_('no such source')) from e # Check the source output directory exists if not source.directory_exists(): - # Try and create it + # Try to create it log.info(f'Creating directory: {source.directory_path}') source.make_directory() -@background(schedule=0) +@background(schedule=dict(run_at=0)) def download_source_images(source_id): ''' Downloads an image and save it as a local thumbnail attached to a @@ -337,11 +343,11 @@ def download_source_images(source_id): ''' try: source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the source no longer exists, do nothing log.error(f'Task download_source_images(pk={source_id}) called but no ' f'source exists with ID: {source_id}') - return + raise InvalidTaskError(_('no such source')) from e avatar, banner = source.get_image_url log.info(f'Thumbnail URL for source with ID: {source_id} / {source} ' f'Avatar: {avatar} ' @@ -379,18 +385,18 @@ def download_source_images(source_id): log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}') -@background(schedule=60, remove_existing_tasks=True) +@background(schedule=dict(run_at=60), remove_existing_tasks=True) def download_media_metadata(media_id): ''' Downloads the metadata for a media item. ''' try: media = Media.objects.get(pk=media_id) - except Media.DoesNotExist: + except Media.DoesNotExist as e: # Task triggered but the media no longer exists, do nothing log.error(f'Task download_media_metadata(pk={media_id}) called but no ' f'media exists with ID: {media_id}') - return + raise InvalidTaskError(_('no such media')) from e if media.manual_skip: log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.') return @@ -466,7 +472,7 @@ def download_media_metadata(media_id): f'{source} / {media}: {media_id}') -@background(schedule=60, remove_existing_tasks=True) +@background(schedule=dict(run_at=60), remove_existing_tasks=True) def download_media_thumbnail(media_id, url): ''' Downloads an image from a URL and save it as a local thumbnail attached to a @@ -474,10 +480,10 @@ def download_media_thumbnail(media_id, url): ''' try: media = Media.objects.get(pk=media_id) - except Media.DoesNotExist: + except Media.DoesNotExist as e: # Task triggered but the media no longer exists, do nothing - return - if media.skip: + raise InvalidTaskError(_('no such media')) from e + if media.skip or media.manual_skip: # Media was toggled to be skipped after the task was scheduled log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' f'it is now marked to be skipped, not downloading thumbnail') @@ -504,38 +510,43 @@ def download_media_thumbnail(media_id, url): return True -@background(schedule=60, remove_existing_tasks=True) +@background(schedule=dict(run_at=60), remove_existing_tasks=True) def download_media(media_id): ''' Downloads the media to disk and attaches it to the Media instance. ''' try: media = Media.objects.get(pk=media_id) - except Media.DoesNotExist: + except Media.DoesNotExist as e: # Task triggered but the media no longer exists, do nothing - return - if not media.has_metadata: - raise NoMetadataException('Metadata is not yet available.') - if media.skip: - # Media was toggled to be skipped after the task was scheduled - log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' - f'it is now marked to be skipped, not downloading') - return - downloaded_file_exists = ( - media.media_file_exists or - media.filepath.exists() - ) - if media.downloaded and downloaded_file_exists: - # Media has been marked as downloaded before the download_media task was fired, - # skip it - log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' - f'it has already been marked as downloaded, not downloading again') - return + raise InvalidTaskError(_('no such media')) from e if not media.source.download_media: log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' f'the source {media.source} has since been marked to not download, ' f'not downloading') return + if media.skip or media.manual_skip: + # Media was toggled to be skipped after the task was scheduled + log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' + f'it is now marked to be skipped, not downloading') + return + # metadata is required to generate the proper filepath + if not media.has_metadata: + raise NoMetadataException('Metadata is not yet available.') + downloaded_file_exists = ( + media.downloaded and + media.has_metadata and + ( + media.media_file_exists or + media.filepath.exists() + ) + ) + if downloaded_file_exists: + # Media has been marked as downloaded before the download_media task was fired, + # skip it + log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' + f'it has already been marked as downloaded, not downloading again') + return max_cap_age = media.source.download_cap_date published = media.published if max_cap_age and published: @@ -608,16 +619,7 @@ def download_media(media_id): log.warn(f'A permissions problem occured when writing the new media NFO file: {e.msg}') pass # Schedule a task to update media servers - for mediaserver in MediaServer.objects.all(): - log.info(f'Scheduling media server updates') - verbose_name = _('Request media server rescan for "{}"') - rescan_media_server( - str(mediaserver.pk), - queue=str(media.source.pk), - priority=0, - verbose_name=verbose_name.format(mediaserver), - remove_existing_tasks=True - ) + schedule_media_servers_update() else: # Expected file doesn't exist on disk err = (f'Failed to download media: {media} (UUID: {media.pk}) to disk, ' @@ -630,22 +632,22 @@ def download_media(media_id): raise DownloadFailedException(err) -@background(schedule=300, remove_existing_tasks=True) +@background(schedule=dict(run_at=300), remove_existing_tasks=True) def rescan_media_server(mediaserver_id): ''' Attempts to request a media rescan on a remote media server. ''' try: mediaserver = MediaServer.objects.get(pk=mediaserver_id) - except MediaServer.DoesNotExist: + except MediaServer.DoesNotExist as e: # Task triggered but the media server no longer exists, do nothing - return + raise InvalidTaskError(_('no such server')) from e # Request an rescan / update log.info(f'Updating media server: {mediaserver}') mediaserver.update() -@background(schedule=300, remove_existing_tasks=True) +@background(schedule=dict(run_at=300), remove_existing_tasks=True) def save_all_media_for_source(source_id): ''' Iterates all media items linked to a source and saves them to @@ -655,11 +657,11 @@ def save_all_media_for_source(source_id): ''' try: source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the source no longer exists, do nothing log.error(f'Task save_all_media_for_source(pk={source_id}) called but no ' f'source exists with ID: {source_id}') - return + raise InvalidTaskError(_('no such source')) from e already_saved = set() mqs = Media.objects.filter(source=source) @@ -694,41 +696,41 @@ def save_all_media_for_source(source_id): # flags may need to be recalculated tvn_format = '2/{:,}' + f'/{mqs.count():,}' for mn, media in enumerate(mqs, start=1): - update_task_status(task, tvn_format.format(mn)) if media.uuid not in already_saved: + update_task_status(task, tvn_format.format(mn)) with atomic(): media.save() # Reset task.verbose_name to the saved value update_task_status(task, None) -@background(schedule=60, remove_existing_tasks=True) +@background(schedule=dict(run_at=60), remove_existing_tasks=True) def rename_media(media_id): try: media = Media.objects.defer('metadata', 'thumb').get(pk=media_id) - except Media.DoesNotExist: - return + except Media.DoesNotExist as e: + raise InvalidTaskError(_('no such media')) from e media.rename_files() -@background(schedule=300, remove_existing_tasks=True) +@background(schedule=dict(run_at=300), remove_existing_tasks=True) @atomic(durable=True) def rename_all_media_for_source(source_id): try: source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the source no longer exists, do nothing log.error(f'Task rename_all_media_for_source(pk={source_id}) called but no ' f'source exists with ID: {source_id}') - return + raise InvalidTaskError(_('no such source')) from e # Check that the settings allow renaming - rename_sources_setting = settings.RENAME_SOURCES or list() + rename_sources_setting = getattr(settings, 'RENAME_SOURCES', list()) create_rename_tasks = ( ( source.directory and source.directory in rename_sources_setting ) or - settings.RENAME_ALL_SOURCES + getattr(settings, 'RENAME_ALL_SOURCES', False) ) if not create_rename_tasks: return @@ -744,15 +746,15 @@ def rename_all_media_for_source(source_id): media.rename_files() -@background(schedule=60, remove_existing_tasks=True) +@background(schedule=dict(run_at=60), remove_existing_tasks=True) def wait_for_media_premiere(media_id): hours = lambda td: 1+int((24*td.days)+(td.seconds/(60*60))) try: media = Media.objects.get(pk=media_id) - except Media.DoesNotExist: - return - if media.metadata: + except Media.DoesNotExist as e: + raise InvalidTaskError(_('no such media')) from e + if media.has_metadata: return now = timezone.now() if media.published < now: @@ -764,17 +766,20 @@ def wait_for_media_premiere(media_id): media.manual_skip = True media.title = _(f'Premieres in {hours(media.published - now)} hours') media.save() + task = get_media_premiere_task(media_id) + if task: + update_task_status(task, f'available in {hours(media.published - now)} hours') -@background(schedule=300, remove_existing_tasks=False) +@background(schedule=dict(run_at=300), remove_existing_tasks=False) def delete_all_media_for_source(source_id, source_name): source = None try: source = Source.objects.get(pk=source_id) - except Source.DoesNotExist: + except Source.DoesNotExist as e: # Task triggered but the source no longer exists, do nothing log.error(f'Task delete_all_media_for_source(pk={source_id}) called but no ' f'source exists with ID: {source_id}') - pass + raise InvalidTaskError(_('no such source')) from e mqs = Media.objects.all().defer( 'metadata', ).filter(