Merge pull request #1151 from tcely/patch-5
Some checks are pending
CI / info (push) Waiting to run
CI / test (3.10) (push) Waiting to run
CI / test (3.11) (push) Waiting to run
CI / test (3.12) (push) Waiting to run
CI / test (3.13) (push) Waiting to run
CI / containerise (push) Blocked by required conditions

Move more work into `huey` queues
This commit is contained in:
meeb 2025-06-24 17:03:59 +10:00 committed by GitHub
commit fd67be5e5b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -328,7 +328,7 @@ def wait_for_errors(model, /, *, queue_name=None, task_name=None):
raise BgTaskWorkerError(_('queue worker stopped')) raise BgTaskWorkerError(_('queue worker stopped'))
@db_task(queue=Val(TaskQueue.FS)) @db_task(priority=90, queue=Val(TaskQueue.FS))
def cleanup_old_media(durable=True): def cleanup_old_media(durable=True):
with atomic(durable=durable): with atomic(durable=durable):
for source in qs_gen(Source.objects.filter(delete_old_media=True, days_to_keep__gt=0)): for source in qs_gen(Source.objects.filter(delete_old_media=True, days_to_keep__gt=0)):
@ -349,7 +349,7 @@ def cleanup_old_media(durable=True):
schedule_media_servers_update() schedule_media_servers_update()
@db_task(queue=Val(TaskQueue.FS)) @db_task(priority=90, queue=Val(TaskQueue.FS))
def cleanup_removed_media(source_id, video_keys): def cleanup_removed_media(source_id, video_keys):
try: try:
source = Source.objects.get(pk=source_id) source = Source.objects.get(pk=source_id)
@ -451,8 +451,8 @@ def wait_for_database_queue():
) )
@background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True) @db_task(delay=30, priority=80, queue=Val(TaskQueue.LIMIT))
def index_source_task(source_id): def index_source(source_id):
''' '''
Indexes media available from a Source object. Indexes media available from a Source object.
''' '''
@ -464,10 +464,10 @@ def index_source_task(source_id):
source = Source.objects.get(pk=source_id) source = Source.objects.get(pk=source_id)
except Source.DoesNotExist as e: except Source.DoesNotExist as e:
# Task triggered but the Source has been deleted, delete the task # Task triggered but the Source has been deleted, delete the task
raise InvalidTaskError(_('no such source')) from e raise CancelExecution(_('no such source'), retry=False) from e
# An inactive Source would return an empty list for videos anyway # An inactive Source would return an empty list for videos anyway
if not source.is_active: if not source.is_active:
return return False
# update the target schedule column # update the target schedule column
source.task_run_at_dt source.task_run_at_dt
# Reset any errors # Reset any errors
@ -609,6 +609,16 @@ def index_source_task(source_id):
schedule=dict(run_at=60), schedule=dict(run_at=60),
verbose_name=vn_fmt.format(source.name), verbose_name=vn_fmt.format(source.name),
) )
return True
@background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def index_source_task(source_id):
try:
res = index_source(source_id)
return res.get(blocking=True)
except CancelExecution as e:
raise InvalidTaskError(str(e)) from e
@dynamic_retry(db_task, priority=100, retries=15, queue=Val(TaskQueue.FS)) @dynamic_retry(db_task, priority=100, retries=15, queue=Val(TaskQueue.FS))
@ -682,8 +692,53 @@ def download_source_images(source_id):
log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}') log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}')
@background(schedule=dict(priority=40, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True) @db_task(delay=60, priority=90, retries=5, retry_delay=60, queue=Val(TaskQueue.FS))
def download_media_metadata(media_id): @atomic(durable=True)
def delete_media(media_id):
try:
media = Media.objects.get(pk=media_id)
except Media.DoesNotExist as e:
raise CancelExecution(_('no such media'), retry=False) from e
else:
media.delete()
return True
return False
@db_task(delay=60, priority=70, retries=5, retry_delay=60, queue=Val(TaskQueue.FS))
@atomic(durable=True)
def rename_media(media_id):
try:
media = Media.objects.get(pk=media_id)
except Media.DoesNotExist as e:
raise CancelExecution(_('no such media'), retry=False) from e
else:
with huey_lock_task(
f'media:{media.uuid}',
queue=Val(TaskQueue.DB),
):
media.rename_files()
@db_task(delay=60, priority=80, retries=5, retry_delay=60, queue=Val(TaskQueue.FS))
@atomic(durable=True)
def save_media(media_id):
try:
media = Media.objects.get(pk=media_id)
except Media.DoesNotExist as e:
raise CancelExecution(_('no such media'), retry=False) from e
else:
with huey_lock_task(
f'media:{media.uuid}',
queue=Val(TaskQueue.DB),
):
media.save()
return True
return False
@db_task(delay=60, priority=60, queue=Val(TaskQueue.LIMIT))
def download_metadata(media_id):
''' '''
Downloads the metadata for a media item. Downloads the metadata for a media item.
''' '''
@ -693,10 +748,10 @@ def download_media_metadata(media_id):
# Task triggered but the media no longer exists, do nothing # Task triggered but the media no longer exists, do nothing
log.error(f'Task download_media_metadata(pk={media_id}) called but no ' log.error(f'Task download_media_metadata(pk={media_id}) called but no '
f'media exists with ID: {media_id}') f'media exists with ID: {media_id}')
raise InvalidTaskError(_('no such media')) from e raise CancelExecution(_('no such media'), retry=False) from e
if media.manual_skip: if media.manual_skip:
log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.') log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.')
return return False
source = media.source source = media.source
wait_for_errors( wait_for_errors(
media, media,
@ -744,7 +799,7 @@ def download_media_metadata(media_id):
if raise_exception: if raise_exception:
raise raise
log.debug(str(e)) log.debug(str(e))
return return False
response = metadata response = metadata
if getattr(settings, 'SHRINK_NEW_MEDIA_METADATA', False): if getattr(settings, 'SHRINK_NEW_MEDIA_METADATA', False):
response = filter_response(metadata, True) response = filter_response(metadata, True)
@ -779,6 +834,16 @@ def download_media_metadata(media_id):
save_model(media) save_model(media)
log.info(f'Saved {len(media.metadata_dumps())} bytes of metadata for: ' log.info(f'Saved {len(media.metadata_dumps())} bytes of metadata for: '
f'{source} / {media}: {media_id}') f'{source} / {media}: {media_id}')
return True
@background(schedule=dict(priority=40, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def download_media_metadata(media_id):
try:
res = download_metadata(media_id)
return res.get(blocking=True)
except CancelExecution as e:
raise InvalidTaskError(str(e)) from e
@dynamic_retry(db_task, delay=10, priority=90, retries=15, queue=Val(TaskQueue.NET)) @dynamic_retry(db_task, delay=10, priority=90, retries=15, queue=Val(TaskQueue.NET))
@ -796,7 +861,7 @@ def download_media_image(media_id, url):
# Media was toggled to be skipped after the task was scheduled # Media was toggled to be skipped after the task was scheduled
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but ' log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
f'it is now marked to be skipped, not downloading thumbnail') f'it is now marked to be skipped, not downloading thumbnail')
return return False
width = getattr(settings, 'MEDIA_THUMBNAIL_WIDTH', 430) width = getattr(settings, 'MEDIA_THUMBNAIL_WIDTH', 430)
height = getattr(settings, 'MEDIA_THUMBNAIL_HEIGHT', 240) height = getattr(settings, 'MEDIA_THUMBNAIL_HEIGHT', 240)
try: try:
@ -856,8 +921,8 @@ def download_media_thumbnail(media_id, url):
except CancelExecution as e: except CancelExecution as e:
raise InvalidTaskError(str(e)) from e raise InvalidTaskError(str(e)) from e
@background(schedule=dict(priority=30, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True) @db_task(delay=60, priority=70, queue=Val(TaskQueue.LIMIT))
def download_media(media_id, override=False): def download_media_file(media_id, override=False):
''' '''
Downloads the media to disk and attaches it to the Media instance. Downloads the media to disk and attaches it to the Media instance.
''' '''
@ -865,12 +930,12 @@ def download_media(media_id, override=False):
media = Media.objects.get(pk=media_id) media = Media.objects.get(pk=media_id)
except Media.DoesNotExist as e: except Media.DoesNotExist as e:
# Task triggered but the media no longer exists, do nothing # Task triggered but the media no longer exists, do nothing
raise InvalidTaskError(_('no such media')) from e raise CancelExecution(_('no such media'), retry=False) from e
else: else:
if not media.download_checklist(override): if not media.download_checklist(override):
# any condition that needs to reschedule the task # any condition that needs to reschedule the task
# should raise an exception to avoid this # should raise an exception to avoid this
return return False
wait_for_errors( wait_for_errors(
media, media,
@ -909,6 +974,16 @@ def download_media(media_id, override=False):
media.write_nfo_file() media.write_nfo_file()
# Schedule a task to update media servers # Schedule a task to update media servers
schedule_media_servers_update() schedule_media_servers_update()
return True
@background(schedule=dict(priority=30, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def download_media(media_id, override=False):
try:
res = download_media_file(media_id, override)
return res.get(blocking=True)
except CancelExecution as e:
raise InvalidTaskError(str(e)) from e
@db_task(delay=30, expires=210, priority=100, queue=Val(TaskQueue.NET)) @db_task(delay=30, expires=210, priority=100, queue=Val(TaskQueue.NET))
@ -986,13 +1061,18 @@ def save_all_media_for_source(source_id):
# Trigger the post_save signal for each media item linked to this source as various # Trigger the post_save signal for each media item linked to this source as various
# flags may need to be recalculated # flags may need to be recalculated
saved_now = set()
tvn_format = '2/{:,}' + f'/{save_qs.count():,}' tvn_format = '2/{:,}' + f'/{save_qs.count():,}'
for mn, media in enumerate(qs_gen(save_qs), start=1): for mn, media in enumerate(qs_gen(save_qs), start=1):
if media.uuid not in saved_later: if media.uuid not in saved_later:
update_task_status(task, tvn_format.format(mn)) update_task_status(task, tvn_format.format(mn))
save_model(media) saved_now.add(str(media.pk))
#save_model(media)
# Reset task.verbose_name to the saved value # Reset task.verbose_name to the saved value
update_task_status(task, None) update_task_status(task, None)
# wait for tasks to complete
res = save_media.map(saved_now)
res.get(blocking=True)
@dynamic_retry(db_task, backoff_func=lambda n: (n*3600)+600, priority=50, retries=15, queue=Val(TaskQueue.LIMIT)) @dynamic_retry(db_task, backoff_func=lambda n: (n*3600)+600, priority=50, retries=15, queue=Val(TaskQueue.LIMIT))
@ -1031,21 +1111,6 @@ def refresh_formats(media_id):
save_model(media) save_model(media)
@db_task(delay=60, priority=80, retries=5, retry_delay=60, queue=Val(TaskQueue.FS))
@atomic(durable=True)
def rename_media(media_id):
try:
media = Media.objects.get(pk=media_id)
except Media.DoesNotExist as e:
raise CancelExecution(_('no such media'), retry=False) from e
else:
with huey_lock_task(
f'media:{media.uuid}',
queue=Val(TaskQueue.DB),
):
media.rename_files()
@db_task(delay=300, priority=80, retries=5, retry_delay=600, queue=Val(TaskQueue.FS)) @db_task(delay=300, priority=80, retries=5, retry_delay=600, queue=Val(TaskQueue.FS))
@atomic(durable=True) @atomic(durable=True)
def rename_all_media_for_source(source_id): def rename_all_media_for_source(source_id):
@ -1067,7 +1132,7 @@ def rename_all_media_for_source(source_id):
) )
if not create_rename_tasks: if not create_rename_tasks:
return None return None
mqs = Media.objects.all().filter( mqs = Media.objects.filter(
source=source, source=source,
downloaded=True, downloaded=True,
) )
@ -1076,7 +1141,7 @@ def rename_all_media_for_source(source_id):
f'media:{media.uuid}', f'media:{media.uuid}',
queue=Val(TaskQueue.DB), queue=Val(TaskQueue.DB),
): ):
with atomic(): with atomic(durable=False):
media.rename_files() media.rename_files()
@ -1116,6 +1181,7 @@ def delete_all_media_for_source(source_id, source_name, source_directory):
).filter( ).filter(
source=source or source_id, source=source or source_id,
) )
deleted_now = set()
with atomic(durable=True): with atomic(durable=True):
for media in qs_gen(mqs): for media in qs_gen(mqs):
log.info(f'Deleting media for source: {source_name} item: {media.name}') log.info(f'Deleting media for source: {source_name} item: {media.name}')
@ -1124,7 +1190,10 @@ def delete_all_media_for_source(source_id, source_name, source_directory):
media.skip = True media.skip = True
media.manual_skip = True media.manual_skip = True
media.save() media.save()
media.delete() deleted_now.add(str(media.pk))
#media.delete()
res = delete_media.map(deleted_now)
res.get(blocking=True)
# Remove the directory, if the user requested that # Remove the directory, if the user requested that
directory_path = Path(source_directory) directory_path = Path(source_directory)
remove = ( remove = (