Merge pull request #959 from tcely/patch-2

Create the `django_queryset_generator` function
meeb 2025-04-14 17:32:28 +10:00 committed by GitHub
commit f3c33e8743
2 changed files with 71 additions and 42 deletions

common/utils.py

@@ -1,11 +1,13 @@
 import cProfile
 import emoji
+import gc
 import io
 import os
 import pstats
 import string
 import time
 from datetime import datetime
+from django.core.paginator import Paginator
 from urllib.parse import urlunsplit, urlencode, urlparse
 from yt_dlp.utils import LazyList
 from .errors import DatabaseConnectionError
@@ -222,3 +224,26 @@ def remove_enclosed(haystack, /, open='[', close=']', sep=' ', *, valid=None, st
         return haystack
     return haystack[:o] + haystack[len(n)+c:]
+
+
+def django_queryset_generator(query_set, /, *, page_size=100):
+    collecting = gc.isenabled()
+    qs = query_set.values_list('pk', flat=True)
+    if not qs.ordered:
+        qs = qs.order_by('pk')
+    paginator = Paginator(qs, page_size)
+    gc.disable()
+    for page_num in paginator.page_range:
+        page = paginator.page(page_num)
+        keys = list(page.object_list)
+        for key in keys:
+            yield query_set.filter(pk=key)[0]
+            gc.collect(generation=1)
+        page = None
+        keys = list()
+        gc.collect()
+    paginator = None
+    qs = None
+    gc.collect()
+    if collecting:
+        gc.enable()
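
The new helper walks a queryset in fixed-size pages of primary keys, yields one freshly fetched object per key, and keeps garbage collection paused so it can run a cheap generation-1 pass after each object and a full pass after each page. A minimal usage sketch follows; the `Media` model and `archive()` function are illustrative stand-ins, not part of the commit:

    # Hedged usage sketch: iterate a large queryset without filling
    # its result cache. Any Django model queryset works here.
    from common.utils import django_queryset_generator as qs_gen

    def archive_downloaded_media():
        mqs = Media.objects.filter(downloaded=True)
        # Only one page of primary keys plus the current instance
        # are held in memory at a time.
        for media in qs_gen(mqs, page_size=100):
            archive(media)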

sync/tasks.py

@@ -27,7 +27,8 @@ from background_task.models import Task, CompletedTask
 from common.logger import log
 from common.errors import ( NoFormatException, NoMediaException,
                             NoMetadataException, DownloadFailedException, )
-from common.utils import json_serial, remove_enclosed
+from common.utils import ( django_queryset_generator as qs_gen,
+                           json_serial, remove_enclosed, )
 from .choices import Val, TaskQueue
 from .models import Source, Media, MediaServer
 from .utils import ( get_remote_image, resize_image_to_height, delete_file,
@@ -215,7 +216,7 @@ def schedule_media_servers_update():
 def cleanup_old_media():
     with atomic():
-        for source in Source.objects.filter(delete_old_media=True, days_to_keep__gt=0):
+        for source in qs_gen(Source.objects.filter(delete_old_media=True, days_to_keep__gt=0)):
             delta = timezone.now() - timedelta(days=source.days_to_keep)
             mqs = source.media_source.defer(
                 'metadata',
@ -223,7 +224,7 @@ def cleanup_old_media():
downloaded=True, downloaded=True,
download_date__lt=delta, download_date__lt=delta,
) )
for media in mqs: for media in qs_gen(mqs):
log.info(f'Deleting expired media: {source} / {media} ' log.info(f'Deleting expired media: {source} / {media} '
f'(now older than {source.days_to_keep} days / ' f'(now older than {source.days_to_keep} days / '
f'download_date before {delta})') f'download_date before {delta})')
@@ -242,7 +243,7 @@ def cleanup_removed_media(source, videos):
     ).filter(
         source=source,
     )
-    for media in mqs:
+    for media in qs_gen(mqs):
         matching_source_item = [video['id'] for video in videos if video['id'] == media.key]
         if not matching_source_item:
             log.info(f'{media.name} is no longer in source, removing')
@@ -336,6 +337,7 @@ def index_source_task(source_id):
     update_task_status(task, None)
     # Cleanup of media no longer available from the source
     cleanup_removed_media(source, videos)
+    videos = video = None


 @background(schedule=dict(priority=0, run_at=0), queue=Val(TaskQueue.FS))
@@ -388,6 +390,7 @@ def download_source_images(source_id):
         file_path = source.directory_path / file_name
         with open(file_path, 'wb') as f:
             f.write(django_file.read())
+        i = image_file = None

     if avatar != None:
         url = avatar
@@ -403,6 +406,7 @@ def download_source_images(source_id):
         file_path = source.directory_path / file_name
         with open(file_path, 'wb') as f:
             f.write(django_file.read())
+        i = image_file = None

     log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}')
@@ -526,6 +530,7 @@ def download_media_thumbnail(media_id, url):
         ),
         save=True
     )
+    i = image_file = None
     log.info(f'Saved thumbnail for: {media} from: {url}')
     return True
@@ -701,7 +706,6 @@ def save_all_media_for_source(source_id):
                   f'source exists with ID: {source_id}')
         raise InvalidTaskError(_('no such source')) from e
-    saved_later = set()
     refresh_qs = Media.objects.all().only(
         'pk',
         'uuid',
@ -715,12 +719,13 @@ def save_all_media_for_source(source_id):
downloaded=False, downloaded=False,
metadata__isnull=False, metadata__isnull=False,
) )
uuid_qs = Media.objects.all().only( save_qs = Media.objects.all().only(
'pk', 'pk',
'uuid', 'uuid',
).filter( ).filter(
source=source, source=source,
).values_list('uuid', flat=True) )
saved_later = set()
task = get_source_check_task(source_id) task = get_source_check_task(source_id)
if task: if task:
task._verbose_name = remove_enclosed( task._verbose_name = remove_enclosed(
@@ -729,7 +734,7 @@ def save_all_media_for_source(source_id):
             end=task.verbose_name.find('Check'),
         )
     tvn_format = '1/{:,}' + f'/{refresh_qs.count():,}'
-    for mn, media in enumerate(refresh_qs, start=1):
+    for mn, media in enumerate(qs_gen(refresh_qs), start=1):
         update_task_status(task, tvn_format.format(mn))
         refresh_formats(
             str(media.pk),
@@ -739,18 +744,12 @@ def save_all_media_for_source(source_id):
     # Trigger the post_save signal for each media item linked to this source as various
     # flags may need to be recalculated
-    tvn_format = '2/{:,}' + f'/{uuid_qs.count():,}'
-    for mn, media_uuid in enumerate(uuid_qs, start=1):
-        if media_uuid not in saved_later:
+    tvn_format = '2/{:,}' + f'/{save_qs.count():,}'
+    for mn, media in enumerate(qs_gen(save_qs), start=1):
+        if media.uuid not in saved_later:
             update_task_status(task, tvn_format.format(mn))
-            try:
-                media = Media.objects.get(pk=str(media_uuid))
-            except Media.DoesNotExist as e:
-                log.exception(str(e))
-                pass
-            else:
-                with atomic():
-                    media.save()
+            with atomic():
+                media.save()
     # Reset task.verbose_name to the saved value
     update_task_status(task, None)
@@ -774,10 +773,12 @@ def refresh_formats(media_id):
 @background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
 def rename_media(media_id):
     try:
-        media = Media.objects.defer('metadata', 'thumb').get(pk=media_id)
+        media = Media.objects.get(pk=media_id)
     except Media.DoesNotExist as e:
         raise InvalidTaskError(_('no such media')) from e
-    media.rename_files()
+    else:
+        with atomic():
+            media.rename_files()


 @background(schedule=dict(priority=20, run_at=300), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
@@ -801,13 +802,11 @@ def rename_all_media_for_source(source_id):
     )
     if not create_rename_tasks:
         return
-    mqs = Media.objects.all().defer(
-        'thumb',
-    ).filter(
+    mqs = Media.objects.all().filter(
         source=source,
         downloaded=True,
     )
-    for media in mqs:
+    for media in qs_gen(mqs):
         with atomic():
             media.rename_files()
@@ -820,40 +819,45 @@ def wait_for_media_premiere(media_id):
         media = Media.objects.get(pk=media_id)
     except Media.DoesNotExist as e:
         raise InvalidTaskError(_('no such media')) from e
-    if media.has_metadata:
-        return
-    now = timezone.now()
-    if media.published < now:
-        media.manual_skip = False
-        media.skip = False
-        # start the download tasks
-        media.save()
     else:
-        media.manual_skip = True
-        media.title = _(f'Premieres in {hours(media.published - now)} hours')
-        media.save()
-        task = get_media_premiere_task(media_id)
-        if task:
-            update_task_status(task, f'available in {hours(media.published - now)} hours')
+        if media.has_metadata:
+            return
+        now = timezone.now()
+        if media.published < now:
+            media.manual_skip = False
+            media.skip = False
+            # start the download tasks after save
+        else:
+            media.manual_skip = True
+            media.title = _(f'Premieres in {hours(media.published - now)} hours')
+            task = get_media_premiere_task(media_id)
+            if task:
+                update_task_status(task, f'available in {hours(media.published - now)} hours')
+        with atomic():
+            media.save()


 @background(schedule=dict(priority=1, run_at=90), queue=Val(TaskQueue.FS), remove_existing_tasks=False)
 def delete_all_media_for_source(source_id, source_name, source_directory):
     source = None
+    assert source_id
+    assert source_name
+    assert source_directory
     try:
         source = Source.objects.get(pk=source_id)
     except Source.DoesNotExist as e:
         # Task triggered but the source no longer exists, do nothing
-        log.error(f'Task delete_all_media_for_source(pk={source_id}) called but no '
+        log.warn(f'Task delete_all_media_for_source(pk={source_id}) called but no '
                  f'source exists with ID: {source_id}')
-        raise InvalidTaskError(_('no such source')) from e
+        #raise InvalidTaskError(_('no such source')) from e
+        pass # this task can run after a source was deleted
     mqs = Media.objects.all().defer(
         'metadata',
     ).filter(
         source=source or source_id,
     )
     with atomic(durable=True):
-        for media in mqs:
+        for media in qs_gen(mqs):
             log.info(f'Deleting media for source: {source_name} item: {media.name}')
             with atomic():
                 media.delete()
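
The tasks above all make the same trade: a plain `for media in mqs:` loop fills the queryset's internal result cache, so memory grows with the number of matching rows for the lifetime of the task, while `qs_gen(mqs)` holds only one page of primary keys plus the current object, at the cost of one extra SELECT per row. A rough before/after sketch, with an illustrative filter and loop body:

    # Before: iterating the queryset caches every fetched row on the
    # queryset object until the task finishes.
    for media in Media.objects.filter(source=source, downloaded=True):
        media.rename_files()

    # After: qs_gen() yields each object from pk-paginated lookups,
    # so memory use stays roughly constant regardless of row count.
    for media in qs_gen(Media.objects.filter(source=source, downloaded=True)):
        media.rename_files()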