From eb206f9f4d4d46cbc9482e79c425f3aa6a1c9f84 Mon Sep 17 00:00:00 2001
From: tcely
Date: Sun, 13 Apr 2025 10:17:09 -0400
Subject: [PATCH 1/6] Sketch for `django_queryset_generator` function

---
 tubesync/common/utils.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tubesync/common/utils.py b/tubesync/common/utils.py
index 5894f0fc..c0e77cf4 100644
--- a/tubesync/common/utils.py
+++ b/tubesync/common/utils.py
@@ -222,3 +222,7 @@ def remove_enclosed(haystack, /, open='[', close=']', sep=' ', *, valid=None, st
             return haystack
     return haystack[:o] + haystack[len(n)+c:]
 
+
+def django_queryset_generator(query_set, /):
+    pass
+

From 29e94427c9ecf27911d096a0016c6990d8e870d6 Mon Sep 17 00:00:00 2001
From: tcely
Date: Sun, 13 Apr 2025 11:13:32 -0400
Subject: [PATCH 2/6] Fill in the `django_queryset_generator` function

---
 tubesync/common/utils.py | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/tubesync/common/utils.py b/tubesync/common/utils.py
index c0e77cf4..b74f0f76 100644
--- a/tubesync/common/utils.py
+++ b/tubesync/common/utils.py
@@ -1,11 +1,13 @@
 import cProfile
 import emoji
+import gc
 import io
 import os
 import pstats
 import string
 import time
 from datetime import datetime
+from django.core.paginator import Paginator
 from urllib.parse import urlunsplit, urlencode, urlparse
 from yt_dlp.utils import LazyList
 from .errors import DatabaseConnectionError
@@ -223,6 +225,19 @@ def remove_enclosed(haystack, /, open='[', close=']', sep=' ', *, valid=None, st
     return haystack[:o] + haystack[len(n)+c:]
 
 
-def django_queryset_generator(query_set, /):
-    pass
+def django_queryset_generator(query_set, /, *, page_size=100):
+    collecting = gc.isenabled()
+    gc.disable()
+    paginator = Paginator(
+        query_set.values_list('pk', flat=True),
+        page_size,
+    )
+    for page_num in paginator.page_range:
+        page = paginator.page(page_num)
+        for key in page.object_list:
+            yield query_set.filter(pk=key)[0]
+        gc.collect(generation=1)
+    gc.collect()
+    if collecting:
+        gc.enable()
 

From 950cdbd8484120a58747ca5f7431645fcc3706ab Mon Sep 17 00:00:00 2001
From: tcely
Date: Sun, 13 Apr 2025 11:24:15 -0400
Subject: [PATCH 3/6] Use `django_queryset_generator` in `cleanup_old_media`

---
 tubesync/sync/tasks.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py
index 2ec1101a..3a9c43a6 100644
--- a/tubesync/sync/tasks.py
+++ b/tubesync/sync/tasks.py
@@ -27,7 +27,8 @@ from background_task.models import Task, CompletedTask
 from common.logger import log
 from common.errors import ( NoFormatException, NoMediaException,
     NoMetadataException, DownloadFailedException, )
-from common.utils import json_serial, remove_enclosed
+from common.utils import ( django_queryset_generator as qa_gen,
+    json_serial, remove_enclosed, )
 from .choices import Val, TaskQueue
 from .models import Source, Media, MediaServer
 from .utils import ( get_remote_image, resize_image_to_height, delete_file,
@@ -215,7 +216,7 @@ def schedule_media_servers_update():
 
 def cleanup_old_media():
     with atomic():
-        for source in Source.objects.filter(delete_old_media=True, days_to_keep__gt=0):
+        for source in qs_gen(Source.objects.filter(delete_old_media=True, days_to_keep__gt=0)):
             delta = timezone.now() - timedelta(days=source.days_to_keep)
             mqs = source.media_source.defer(
                 'metadata',
@@ -223,7 +224,7 @@ def cleanup_old_media():
                 downloaded=True,
                 download_date__lt=delta,
             )
-            for media in mqs:
+            for media in qs_gen(mqs):
                 log.info(f'Deleting expired media: {source} / {media} '
                          f'(now older than {source.days_to_keep} days / '
                          f'download_date before {delta})')

From cfceae2eb98c6ee1cbb2a5473aca66630ad21e31 Mon Sep 17 00:00:00 2001
From: tcely
Date: Sun, 13 Apr 2025 11:26:27 -0400
Subject: [PATCH 4/6] fixup: typo

---
 tubesync/sync/tasks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py
index 3a9c43a6..3b8c6b86 100644
--- a/tubesync/sync/tasks.py
+++ b/tubesync/sync/tasks.py
@@ -27,7 +27,7 @@ from background_task.models import Task, CompletedTask
 from common.logger import log
 from common.errors import ( NoFormatException, NoMediaException,
     NoMetadataException, DownloadFailedException, )
-from common.utils import ( django_queryset_generator as qa_gen,
+from common.utils import ( django_queryset_generator as qs_gen,
     json_serial, remove_enclosed, )
 from .choices import Val, TaskQueue
 from .models import Source, Media, MediaServer

From 7c293a0444a2d9dbfc9b2492932baf55f00e5300 Mon Sep 17 00:00:00 2001
From: tcely
Date: Sun, 13 Apr 2025 12:14:36 -0400
Subject: [PATCH 5/6] Avoid `UnorderedObjectListWarning`

---
 tubesync/common/utils.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/tubesync/common/utils.py b/tubesync/common/utils.py
index b74f0f76..eddcfe86 100644
--- a/tubesync/common/utils.py
+++ b/tubesync/common/utils.py
@@ -227,16 +227,22 @@ def remove_enclosed(haystack, /, open='[', close=']', sep=' ', *, valid=None, st
 
 def django_queryset_generator(query_set, /, *, page_size=100):
     collecting = gc.isenabled()
+    qs = query_set.values_list('pk', flat=True)
+    if not qs.ordered:
+        qs = qs.order_by('pk')
+    paginator = Paginator(qs, page_size)
     gc.disable()
-    paginator = Paginator(
-        query_set.values_list('pk', flat=True),
-        page_size,
-    )
     for page_num in paginator.page_range:
         page = paginator.page(page_num)
-        for key in page.object_list:
+        keys = list(page.object_list)
+        for key in keys:
             yield query_set.filter(pk=key)[0]
         gc.collect(generation=1)
+        page = None
+        keys = list()
+        gc.collect()
+    paginator = None
+    qs = None
     gc.collect()
     if collecting:
         gc.enable()

From 465abe23d022d856853761e93c82d383bd4008ea Mon Sep 17 00:00:00 2001
From: tcely
Date: Sun, 13 Apr 2025 13:30:26 -0400
Subject: [PATCH 6/6] Use `qs_gen` in tasks.py

---
 tubesync/sync/tasks.py | 81 ++++++++++++++++++++++--------------------
 1 file changed, 42 insertions(+), 39 deletions(-)

diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py
index 3b8c6b86..b42542fe 100644
--- a/tubesync/sync/tasks.py
+++ b/tubesync/sync/tasks.py
@@ -243,7 +243,7 @@ def cleanup_removed_media(source, videos):
     ).filter(
         source=source,
     )
-    for media in mqs:
+    for media in qs_gen(mqs):
         matching_source_item = [video['id'] for video in videos if video['id'] == media.key]
         if not matching_source_item:
             log.info(f'{media.name} is no longer in source, removing')
@@ -337,6 +337,7 @@ def index_source_task(source_id):
         update_task_status(task, None)
     # Cleanup of media no longer available from the source
     cleanup_removed_media(source, videos)
+    videos = video = None
 
 
 @background(schedule=dict(priority=0, run_at=0), queue=Val(TaskQueue.FS))
@@ -389,6 +390,7 @@ def download_source_images(source_id):
         file_path = source.directory_path / file_name
         with open(file_path, 'wb') as f:
             f.write(django_file.read())
+        i = image_file = None
 
     if avatar != None:
         url = avatar
@@ -404,6 +406,7 @@ def download_source_images(source_id):
         file_path = source.directory_path / file_name
         with open(file_path, 'wb') as f:
             f.write(django_file.read())
+        i = image_file = None
 
     log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}')
 
@@ -527,6 +530,7 @@ def download_media_thumbnail(media_id, url):
         ),
         save=True
     )
+    i = image_file = None
     log.info(f'Saved thumbnail for: {media} from: {url}')
     return True
 
@@ -702,7 +706,6 @@ def save_all_media_for_source(source_id):
                   f'source exists with ID: {source_id}')
         raise InvalidTaskError(_('no such source')) from e
 
-    saved_later = set()
     refresh_qs = Media.objects.all().only(
         'pk',
         'uuid',
@@ -716,12 +719,13 @@ def save_all_media_for_source(source_id):
         downloaded=False,
         metadata__isnull=False,
     )
-    uuid_qs = Media.objects.all().only(
+    save_qs = Media.objects.all().only(
         'pk',
         'uuid',
     ).filter(
         source=source,
-    ).values_list('uuid', flat=True)
+    )
+    saved_later = set()
     task = get_source_check_task(source_id)
     if task:
         task._verbose_name = remove_enclosed(
@@ -730,7 +734,7 @@ def save_all_media_for_source(source_id):
             end=task.verbose_name.find('Check'),
         )
     tvn_format = '1/{:,}' + f'/{refresh_qs.count():,}'
-    for mn, media in enumerate(refresh_qs, start=1):
+    for mn, media in enumerate(qs_gen(refresh_qs), start=1):
         update_task_status(task, tvn_format.format(mn))
         refresh_formats(
             str(media.pk),
@@ -740,18 +744,12 @@ def save_all_media_for_source(source_id):
 
     # Trigger the post_save signal for each media item linked to this source as various
     # flags may need to be recalculated
-    tvn_format = '2/{:,}' + f'/{uuid_qs.count():,}'
-    for mn, media_uuid in enumerate(uuid_qs, start=1):
-        if media_uuid not in saved_later:
+    tvn_format = '2/{:,}' + f'/{save_qs.count():,}'
+    for mn, media in enumerate(qs_gen(save_qs), start=1):
+        if media.uuid not in saved_later:
             update_task_status(task, tvn_format.format(mn))
-            try:
-                media = Media.objects.get(pk=str(media_uuid))
-            except Media.DoesNotExist as e:
-                log.exception(str(e))
-                pass
-            else:
-                with atomic():
-                    media.save()
+            with atomic():
+                media.save()
 
     # Reset task.verbose_name to the saved value
     update_task_status(task, None)
@@ -775,10 +773,12 @@ def refresh_formats(media_id):
 
 
 @background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
 def rename_media(media_id):
     try:
-        media = Media.objects.defer('metadata', 'thumb').get(pk=media_id)
+        media = Media.objects.get(pk=media_id)
     except Media.DoesNotExist as e:
         raise InvalidTaskError(_('no such media')) from e
-    media.rename_files()
+    else:
+        with atomic():
+            media.rename_files()
 
@@ -802,13 +802,11 @@ def rename_all_media_for_source(source_id):
     )
     if not create_rename_tasks:
         return
-    mqs = Media.objects.all().defer(
-        'thumb',
-    ).filter(
+    mqs = Media.objects.all().filter(
         source=source,
         downloaded=True,
     )
-    for media in mqs:
+    for media in qs_gen(mqs):
         with atomic():
             media.rename_files()
 
@@ -821,40 +819,45 @@ def wait_for_media_premiere(media_id):
         media = Media.objects.get(pk=media_id)
     except Media.DoesNotExist as e:
         raise InvalidTaskError(_('no such media')) from e
-    if media.has_metadata:
-        return
-    now = timezone.now()
-    if media.published < now:
-        media.manual_skip = False
-        media.skip = False
-        # start the download tasks
-        media.save()
     else:
-        media.manual_skip = True
-        media.title = _(f'Premieres in {hours(media.published - now)} hours')
-        media.save()
-        task = get_media_premiere_task(media_id)
-        if task:
-            update_task_status(task, f'available in {hours(media.published - now)} hours')
+        if media.has_metadata:
+            return
+        now = timezone.now()
+        if media.published < now:
+            media.manual_skip = False
+            media.skip = False
+            # start the download tasks after save
+        else:
+            media.manual_skip = True
+            media.title = _(f'Premieres in {hours(media.published - now)} hours')
+        task = get_media_premiere_task(media_id)
+        if task:
+            update_task_status(task, f'available in {hours(media.published - now)} hours')
+        with atomic():
+            media.save()
 
 
 @background(schedule=dict(priority=1, run_at=90), queue=Val(TaskQueue.FS), remove_existing_tasks=False)
 def delete_all_media_for_source(source_id, source_name, source_directory):
     source = None
+    assert source_id
+    assert source_name
+    assert source_directory
     try:
         source = Source.objects.get(pk=source_id)
     except Source.DoesNotExist as e:
         # Task triggered but the source no longer exists, do nothing
-        log.error(f'Task delete_all_media_for_source(pk={source_id}) called but no '
+        log.warn(f'Task delete_all_media_for_source(pk={source_id}) called but no '
                   f'source exists with ID: {source_id}')
-        raise InvalidTaskError(_('no such source')) from e
+        #raise InvalidTaskError(_('no such source')) from e
+        pass # this task can run after a source was deleted
     mqs = Media.objects.all().defer(
         'metadata',
     ).filter(
         source=source or source_id,
     )
     with atomic(durable=True):
-        for media in mqs:
+        for media in qs_gen(mqs):
             log.info(f'Deleting media for source: {source_name} item: {media.name}')
             with atomic():
                 media.delete()
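
Usage note: the finished `django_queryset_generator` trades one extra
primary-key lookup per row for bounded memory. It paginates over the
queryset's `pk` values and re-fetches each row individually, so a large
result set is never materialised at once, and because each yielded object
comes from `query_set.filter(pk=key)[0]`, any `defer()`/`only()` clauses
on the original queryset still apply to the per-key fetch. A minimal
caller sketch follows, assuming the `Media` model and `qs_gen` alias from
the patches above; `process_media` is a hypothetical placeholder, not
part of tubesync:

    from common.utils import django_queryset_generator as qs_gen
    from sync.models import Media

    def process_media(media):
        # hypothetical per-row work, standing in for whatever a task does
        print(media.pk)

    # Stream rows one at a time instead of filling the queryset cache;
    # the defer('metadata') still applies to every per-key fetch.
    mqs = Media.objects.defer('metadata').filter(downloaded=True)
    for media in qs_gen(mqs):
        process_media(media)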