Merge branch 'meeb:main' into patch-7

This commit is contained in:
tcely 2025-06-15 11:11:11 -04:00 committed by GitHub
commit 4078308b97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 106 additions and 50 deletions

View File

@@ -1,3 +1,28 @@
from functools import wraps
def delay_to_eta(delay, /):
    """Translate a delay in seconds into the ETA value huey schedules by."""
    # Imported lazily so loading this module does not require huey upfront.
    from huey.utils import normalize_time
    eta = normalize_time(delay=delay)
    return eta
def h_q_dict(q, /):
    """Summarize a huey queue as ``{section: (count, entries)}``.

    Sections cover the scheduled tasks, the pending tasks, and the keys of
    all stored results.
    """
    scheduled_pair = (q.scheduled_count(), q.scheduled())
    pending_pair = (q.pending_count(), q.pending())
    result_pair = (q.result_count(), list(q.all_results().keys()))
    return {
        'scheduled': scheduled_pair,
        'pending': pending_pair,
        'result': result_pair,
    }
def h_q_tuple(q, /):
    """Describe a huey queue (or its name) as ``(name, task_names, state)``.

    Accepts either a queue instance or a queue name string; a name is
    resolved to the queue via ``django_huey.get_queue``.
    """
    if isinstance(q, str):
        # Resolve a queue name into the actual queue instance.
        from django_huey import get_queue
        q = get_queue(q)
    registered_tasks = list(q._registry._registry.keys())
    return (q.name, registered_tasks, h_q_dict(q))
def sqlite_tasks(key, /, prefix=None):
@@ -35,3 +60,46 @@ def sqlite_tasks(key, /, prefix=None):
),
)
def dynamic_retry(task_func=None, /, *args, **kwargs):
    """Decorator factory wrapping a huey task so each failure raises
    ``task.retry_delay`` according to a backoff schedule.

    ``task_func`` defaults to ``django_huey.task``; all other positional and
    keyword arguments are forwarded to it. A ``backoff_func`` keyword may
    supply the schedule; non-callable or missing values fall back to the
    quartic default below.
    """
    if task_func is None:
        # Default to the django_huey task decorator (imported lazily).
        from django_huey import task as huey_task
        task_func = huey_task
    backoff_func = kwargs.pop('backoff_func', None)
    def default_backoff(attempt, /):
        # Quartic growth in seconds: 6, 21, 86, 261, ...
        return (5+(attempt**4))
    if backoff_func is None or not callable(backoff_func):
        backoff_func = default_backoff
    def deco(fn):
        @wraps(fn)
        def inner(*a, **kwa):
            backoff = backoff_func
            # the scoping becomes complicated when reusing functions
            try:
                _task = kwa.pop('task')
            except KeyError:
                pass
            else:
                task = _task
            try:
                return fn(*a, **kwa)
            except Exception as exc:
                # If no 'task' kwarg arrived above, 'task' was never bound in
                # this scope, so referencing it raises NameError: there is
                # nothing to adjust and the original exception is re-raised.
                try:
                    task is not None
                except NameError:
                    raise exc
                # Raise the delay to the first backoff value that exceeds the
                # task's current retry_delay.
                for attempt in range(1, 240):
                    if backoff(attempt) > task.retry_delay:
                        task.retry_delay = backoff(attempt)
                        break
                # insanity, but handle it anyway
                if 239 == attempt:
                    task.retry_delay = backoff(attempt)
                raise exc
        # context=True makes huey pass the task instance into the wrapped
        # function; the initial retry delay is seeded from the schedule.
        kwargs.update(dict(
            context=True,
            retry_delay=backoff_func(1),
        ))
        return task_func(*args, **kwargs)(inner)
    return deco

View File

@@ -1162,6 +1162,7 @@ class Media(models.Model):
log.info(f'Collected {len(other_paths)} other paths for: {self!s}')
# adopt orphaned files, if possible
fuzzy_paths = list()
media_format = str(self.source.media_format)
top_dir_path = Path(self.source.directory_path)
if '{key}' in media_format:

View File

@@ -112,10 +112,7 @@ def source_post_save(sender, instance, created, **kwargs):
verbose_name=verbose_name.format(instance.name),
)
if instance.source_type != Val(YouTube_SourceType.PLAYLIST) and instance.copy_channel_images:
download_source_images(
str(instance.pk),
verbose_name=verbose_name.format(instance.name),
)
download_source_images(str(instance.pk))
if instance.index_schedule > 0:
delete_task_by_source('sync.tasks.index_source_task', instance.pk)
log.info(f'Scheduling first media indexing for source: {instance.name}')

View File

@@ -27,6 +27,9 @@ from django.utils.translation import gettext_lazy as _
from background_task import background
from background_task.exceptions import InvalidTaskError
from background_task.models import Task, CompletedTask
from django_huey import db_periodic_task, db_task, task as huey_task # noqa
from huey import CancelExecution
from common.huey import dynamic_retry
from common.logger import log
from common.errors import ( BgTaskWorkerError, DownloadFailedException,
NoFormatException, NoMediaException,
@@ -63,7 +66,6 @@ def map_task_to_instance(task):
'sync.tasks.download_media': Media,
'sync.tasks.download_media_metadata': Media,
'sync.tasks.save_all_media_for_source': Source,
'sync.tasks.refresh_formats': Media,
'sync.tasks.rename_media': Media,
'sync.tasks.rename_all_media_for_source': Source,
'sync.tasks.wait_for_media_premiere': Media,
@@ -212,29 +214,22 @@ def migrate_queues():
def save_model(instance):
with atomic(durable=False):
instance.save()
if 'sqlite' != db_vendor:
with atomic(durable=False):
instance.save()
return
# work around for SQLite and its many
# "database is locked" errors
with atomic(durable=False):
instance.save()
arg = getattr(settings, 'SQLITE_DELAY_FLOAT', 1.5)
time.sleep(random.expovariate(arg))
@atomic(durable=False)
def schedule_media_servers_update():
# Schedule a task to update media servers
log.info('Scheduling media server updates')
verbose_name = _('Request media server rescan for "{}"')
for mediaserver in MediaServer.objects.all():
rescan_media_server(
str(mediaserver.pk),
verbose_name=verbose_name.format(mediaserver),
)
rescan_media_server(str(mediaserver.pk))
def wait_for_errors(model, /, *, task_name=None):
@@ -269,8 +264,9 @@ def wait_for_errors(model, /, *, task_name=None):
update_task_status(task, None)
def cleanup_old_media():
with atomic():
@db_task(queue=Val(TaskQueue.FS))
def cleanup_old_media(durable=True):
with atomic(durable=durable):
for source in qs_gen(Source.objects.filter(delete_old_media=True, days_to_keep__gt=0)):
delta = timezone.now() - timedelta(days=source.days_to_keep)
mqs = source.media_source.defer(
@@ -283,13 +279,19 @@ def cleanup_old_media():
log.info(f'Deleting expired media: {source} / {media} '
f'(now older than {source.days_to_keep} days / '
f'download_date before {delta})')
with atomic():
with atomic(durable=False):
# .delete() also triggers a pre_delete/post_delete signals that remove files
media.delete()
schedule_media_servers_update()
def cleanup_removed_media(source, video_keys):
@db_task(queue=Val(TaskQueue.FS))
def cleanup_removed_media(source_id, video_keys):
try:
source = Source.objects.get(pk=source_id)
except Source.DoesNotExist as e:
# Task triggered but the Source has been deleted, delete the task
raise CancelExecution(_('no such source'), retry=False) from e
if not source.delete_removed_media:
return
log.info(f'Cleaning up media no longer in source: {source}')
@@ -298,11 +300,12 @@ def cleanup_removed_media(source, video_keys):
).filter(
source=source,
)
for media in qs_gen(mqs):
if media.key not in video_keys:
log.info(f'{media.name} is no longer in source, removing')
with atomic():
media.delete()
with atomic(durable=True):
for media in qs_gen(mqs):
if media.key not in video_keys:
log.info(f'{media.name} is no longer in source, removing')
with atomic(durable=False):
media.delete()
schedule_media_servers_update()
@@ -522,7 +525,7 @@ def index_source_task(source_id):
save_db_batch(Metadata.objects, db_batch_data, db_fields_data)
save_db_batch(Media.objects, db_batch_media, db_fields_media)
# Cleanup of media no longer available from the source
cleanup_removed_media(source, video_keys)
cleanup_removed_media(str(source.pk), video_keys)
# Clear references to indexed data
videos = video = None
db_batch_data.clear()
@@ -555,7 +558,7 @@ def check_source_directory_exists(source_id):
source.make_directory()
@background(schedule=dict(priority=10, run_at=10), queue=Val(TaskQueue.NET))
@dynamic_retry(db_task, delay=10, priority=90, retries=15, queue=Val(TaskQueue.NET))
def download_source_images(source_id):
'''
Downloads an image and save it as a local thumbnail attached to a
@@ -567,7 +570,7 @@ def download_source_images(source_id):
# Task triggered but the source no longer exists, do nothing
log.error(f'Task download_source_images(pk={source_id}) called but no '
f'source exists with ID: {source_id}')
raise InvalidTaskError(_('no such source')) from e
raise CancelExecution(_('no such source'), retry=False) from e
avatar, banner = source.get_image_url
log.info(f'Thumbnail URL for source with ID: {source_id} / {source} '
f'Avatar: {avatar} '
@@ -786,10 +789,7 @@ def download_media(media_id, override=False):
# Try refreshing formats
if media.has_metadata:
log.debug(f'Scheduling a task to refresh metadata for: {media.key}: "{media.name}"')
refresh_formats(
str(media.pk),
verbose_name=f'Refreshing metadata formats for: {media.key}: "{media.name}"',
)
refresh_formats(str(media.pk))
log.exception(str(e))
raise
else:
@@ -797,10 +797,7 @@ def download_media(media_id, override=False):
# Try refreshing formats
if media.has_metadata:
log.debug(f'Scheduling a task to refresh metadata for: {media.key}: "{media.name}"')
refresh_formats(
str(media.pk),
verbose_name=f'Refreshing metadata formats for: {media.key}: "{media.name}"',
)
refresh_formats(str(media.pk))
# Expected file doesn't exist on disk
err = (f'Failed to download media: {media} (UUID: {media.pk}) to disk, '
f'expected outfile does not exist: {filepath}')
@@ -817,7 +814,7 @@ def download_media(media_id, override=False):
schedule_media_servers_update()
@background(schedule=dict(priority=0, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
@db_task(delay=30, expires=210, priority=100, queue=Val(TaskQueue.NET))
def rescan_media_server(mediaserver_id):
'''
Attempts to request a media rescan on a remote media server.
@@ -826,7 +823,7 @@ def rescan_media_server(mediaserver_id):
mediaserver = MediaServer.objects.get(pk=mediaserver_id)
except MediaServer.DoesNotExist as e:
# Task triggered but the media server no longer exists, do nothing
raise InvalidTaskError(_('no such server')) from e
raise CancelExecution(_('no such server'), retry=False) from e
# Request an rescan / update
log.info(f'Updating media server: {mediaserver}')
mediaserver.update()
@@ -879,10 +876,7 @@ def save_all_media_for_source(source_id):
tvn_format = '1/{:,}' + f'/{refresh_qs.count():,}'
for mn, media in enumerate(qs_gen(refresh_qs), start=1):
update_task_status(task, tvn_format.format(mn))
refresh_formats(
str(media.pk),
verbose_name=f'Refreshing metadata formats for: {media.key}: "{media.name}"',
)
refresh_formats(str(media.pk))
saved_later.add(media.uuid)
# Keep out of the way of the index task!
@@ -904,19 +898,15 @@ def save_all_media_for_source(source_id):
update_task_status(task, None)
@background(schedule=dict(priority=50, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
@dynamic_retry(db_task, backoff_func=lambda n: (n*3600)+600, priority=50, retries=15, queue=Val(TaskQueue.LIMIT))
def refresh_formats(media_id):
try:
media = Media.objects.get(pk=media_id)
except Media.DoesNotExist as e:
raise InvalidTaskError(_('no such media')) from e
try:
media.refresh_formats
except YouTubeError as e:
log.debug(f'Failed to refresh formats for: {media.source} / {media.key}: {e!s}')
pass
raise CancelExecution(_('no such media'), retry=False) from e
else:
save_model(media)
if media.refresh_formats:
save_model(media)
@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.FS), remove_existing_tasks=True)

View File

@@ -1833,7 +1833,7 @@ class TasksTestCase(TestCase):
self.assertEqual(src1.media_source.all().count(), 3)
self.assertEqual(src2.media_source.all().count(), 3)
cleanup_old_media()
cleanup_old_media.call_local(durable=False)
self.assertEqual(src1.media_source.all().count(), 3)
self.assertEqual(src2.media_source.all().count(), 3)