diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/run b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/run
index 03b75ea8..9fbcbc95 100755
--- a/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/run
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-db-worker/run
@@ -2,4 +2,5 @@
 
 exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
     /usr/bin/python3 /app/manage.py process_tasks \
-    --queue database
+    --queue database --duration 86400 \
+    --sleep "30.${RANDOM}"
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/run b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/run
index 0642054d..c0a9fb79 100755
--- a/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/run
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-fs-worker/run
@@ -2,4 +2,5 @@
 
 exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
     /usr/bin/python3 /app/manage.py process_tasks \
-    --queue filesystem
+    --queue filesystem --duration 43200 \
+    --sleep "20.${RANDOM}"
diff --git a/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/run b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/run
index a9c17d49..7f7bcd26 100755
--- a/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/run
+++ b/config/root/etc/s6-overlay/s6-rc.d/tubesync-network-worker/run
@@ -2,4 +2,5 @@
 
 exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
     /usr/bin/python3 /app/manage.py process_tasks \
-    --queue network
+    --queue network --duration 43200 \
+    --sleep "10.${RANDOM}"
diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py
index 7117da6b..653b08e6 100644
--- a/tubesync/sync/tasks.py
+++ b/tubesync/sync/tasks.py
@@ -17,7 +17,7 @@ from PIL import Image
 from django.conf import settings
 from django.core.files.base import ContentFile
 from django.core.files.uploadedfile import SimpleUploadedFile
-from django.db import DatabaseError, IntegrityError
+from django.db import reset_queries, DatabaseError, IntegrityError
 from django.db.transaction import atomic
 from django.utils import timezone
 from django.utils.translation import gettext_lazy as _
@@ -123,8 +123,7 @@ def update_task_status(task, status):
     else:
         task.verbose_name = f'[{status}] {task._verbose_name}'
     try:
-        with atomic():
-            task.save(update_fields={'verbose_name'})
+        task.save(update_fields={'verbose_name'})
     except DatabaseError as e:
         if 'Save with update_fields did not affect any rows.' == str(e):
             pass
@@ -202,25 +201,29 @@ def migrate_queues():
     return qs.update(queue=Val(TaskQueue.NET))
 
 
+@atomic(durable=False)
 def schedule_media_servers_update():
-    with atomic():
-        # Schedule a task to update media servers
-        log.info(f'Scheduling media server updates')
-        verbose_name = _('Request media server rescan for "{}"')
-        for mediaserver in MediaServer.objects.all():
-            rescan_media_server(
-                str(mediaserver.pk),
-                priority=10,
-                verbose_name=verbose_name.format(mediaserver),
-                remove_existing_tasks=True,
-            )
+    # Schedule a task to update media servers
+    log.info(f'Scheduling media server updates')
+    verbose_name = _('Request media server rescan for "{}"')
+    for mediaserver in MediaServer.objects.all():
+        rescan_media_server(
+            str(mediaserver.pk),
+            verbose_name=verbose_name.format(mediaserver),
+        )
 
 
 def cleanup_old_media():
     with atomic():
         for source in Source.objects.filter(delete_old_media=True, days_to_keep__gt=0):
             delta = timezone.now() - timedelta(days=source.days_to_keep)
-            for media in source.media_source.filter(downloaded=True, download_date__lt=delta):
+            mqs = source.media_source.defer(
+                'metadata',
+            ).filter(
+                downloaded=True,
+                download_date__lt=delta,
+            )
+            for media in mqs:
                 log.info(f'Deleting expired media: {source} / {media} '
                          f'(now older than {source.days_to_keep} days / '
                          f'download_date before {delta})')
@@ -234,8 +237,12 @@ def cleanup_removed_media(source, videos):
     if not source.delete_removed_media:
         return
     log.info(f'Cleaning up media no longer in source: {source}')
-    media_objects = Media.objects.filter(source=source)
-    for media in media_objects:
+    mqs = Media.objects.defer(
+        'metadata',
+    ).filter(
+        source=source,
+    )
+    for media in mqs:
         matching_source_item = [video['id'] for video in videos if video['id'] == media.key]
         if not matching_source_item:
             log.info(f'{media.name} is no longer in source, removing')
@@ -244,11 +251,12 @@
             media.delete()
     schedule_media_servers_update()
 
 
-@background(schedule=dict(priority=10, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
+@background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def index_source_task(source_id):
     '''
         Indexes media available from a Source object.
     '''
+    reset_queries()
     cleanup_completed_tasks()
     # deleting expired media should happen any time an index task is requested
     cleanup_old_media()
@@ -322,7 +330,6 @@
             verbose_name = _('Downloading metadata for "{}"')
             download_media_metadata(
                 str(media.pk),
-                priority=20,
                 verbose_name=verbose_name.format(media.pk),
             )
     # Reset task.verbose_name to the saved value
@@ -350,7 +357,7 @@ def check_source_directory_exists(source_id):
     source.make_directory()
 
 
-@background(schedule=dict(priority=5, run_at=10), queue=Val(TaskQueue.NET))
+@background(schedule=dict(priority=10, run_at=10), queue=Val(TaskQueue.NET))
 def download_source_images(source_id):
     '''
         Downloads an image and save it as a local thumbnail attached to a
@@ -400,7 +407,7 @@
     log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}')
 
 
-@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
+@background(schedule=dict(priority=40, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def download_media_metadata(media_id):
     '''
         Downloads the metadata for a media item.
@@ -484,7 +491,7 @@ def download_media_metadata(media_id):
              f'{source} / {media}: {media_id}')
 
 
-@background(schedule=dict(priority=15, run_at=10), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
+@background(schedule=dict(priority=10, run_at=10), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
 def download_media_thumbnail(media_id, url):
     '''
         Downloads an image from a URL and save it as a local thumbnail attached to a
@@ -522,7 +529,7 @@
     return True
 
 
-@background(schedule=dict(priority=15, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
+@background(schedule=dict(priority=30, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def download_media(media_id):
     '''
         Downloads the media to disk and attaches it to the Media instance.
@@ -676,7 +683,7 @@ def rescan_media_server(mediaserver_id):
     mediaserver.update()
 
 
-@background(schedule=dict(priority=25, run_at=600), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
+@background(schedule=dict(priority=30, run_at=600), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
 def save_all_media_for_source(source_id):
     '''
         Iterates all media items linked to a source and saves them to
@@ -684,6 +691,7 @@
         source has its parameters changed and all media needs to be checked to
         see if its download status has changed.
     '''
+    reset_queries()
     try:
         source = Source.objects.get(pk=source_id)
     except Source.DoesNotExist as e:
@@ -693,15 +701,26 @@
         raise InvalidTaskError(_('no such source')) from e
 
     saved_later = set()
-    mqs = Media.objects.filter(source=source)
-    task = get_source_check_task(source_id)
-    refresh_qs = mqs.filter(
+    refresh_qs = Media.objects.all().only(
+        'pk',
+        'uuid',
+        'key',
+        'title', # for name property
+    ).filter(
+        source=source,
         can_download=False,
         skip=False,
         manual_skip=False,
         downloaded=False,
         metadata__isnull=False,
     )
+    uuid_qs = Media.objects.all().only(
+        'pk',
+        'uuid',
+    ).filter(
+        source=source,
+    ).values_list('uuid', flat=True)
+    task = get_source_check_task(source_id)
     if task:
         task._verbose_name = remove_enclosed(
             task.verbose_name, '[', ']', ' ',
@@ -719,17 +738,23 @@
 
     # Trigger the post_save signal for each media item linked to this source as various
     # flags may need to be recalculated
-    tvn_format = '2/{:,}' + f'/{mqs.count():,}'
-    for mn, media in enumerate(mqs, start=1):
-        if media.uuid not in saved_later:
+    tvn_format = '2/{:,}' + f'/{uuid_qs.count():,}'
+    for mn, media_uuid in enumerate(uuid_qs, start=1):
+        if media_uuid not in saved_later:
             update_task_status(task, tvn_format.format(mn))
-            with atomic():
-                media.save()
+            try:
+                media = Media.objects.get(pk=str(media_uuid))
+            except Media.DoesNotExist as e:
+                log.exception(str(e))
+                pass
+            else:
+                with atomic():
+                    media.save()
     # Reset task.verbose_name to the saved value
     update_task_status(task, None)
 
 
-@background(schedule=dict(priority=10, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
+@background(schedule=dict(priority=50, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def refresh_formats(media_id):
     try:
         media = Media.objects.get(pk=media_id)
@@ -776,7 +801,6 @@ def rename_all_media_for_source(source_id):
     if not create_rename_tasks:
         return
     mqs = Media.objects.all().defer(
-        'metadata',
         'thumb',
     ).filter(
         source=source,
diff --git a/tubesync/sync/youtube.py b/tubesync/sync/youtube.py
index 145e4c5d..61f9a489 100644
--- a/tubesync/sync/youtube.py
+++ b/tubesync/sync/youtube.py
@@ -205,10 +205,14 @@
         'paths': paths,
         'postprocessors': postprocessors,
         'skip_unavailable_fragments': False,
-        'sleep_interval_requests': 2 * settings.BACKGROUND_TASK_ASYNC_THREADS,
+        'sleep_interval_requests': 1,
         'verbose': True if settings.DEBUG else False,
         'writeinfojson': True,
     })
+    if settings.BACKGROUND_TASK_RUN_ASYNC:
+        opts.update({
+            'sleep_interval_requests': 2 * settings.BACKGROUND_TASK_ASYNC_THREADS,
+        })
     if start:
         log.debug(f'get_media_info: used date range: {opts["daterange"]} for URL: {url}')
     response = {}
diff --git a/tubesync/tubesync/local_settings.py.container b/tubesync/tubesync/local_settings.py.container
index cc20f73b..4f386b66 100644
--- a/tubesync/tubesync/local_settings.py.container
+++ b/tubesync/tubesync/local_settings.py.container
@@ -62,6 +62,8 @@
 else:
     DEFAULT_THREADS = 1
 BACKGROUND_TASK_ASYNC_THREADS = getenv('TUBESYNC_WORKERS', DEFAULT_THREADS, integer=True)
+if BACKGROUND_TASK_ASYNC_THREADS > 1:
+    BACKGROUND_TASK_RUN_ASYNC = True
 
 
 MEDIA_ROOT = CONFIG_BASE_DIR / 'media'