mirror of
https://github.com/meeb/tubesync.git
synced 2025-06-25 22:46:34 +00:00
Move more work into huey
queues
This commit is contained in:
parent
e221361549
commit
ce63b2806c
@ -328,7 +328,7 @@ def wait_for_errors(model, /, *, queue_name=None, task_name=None):
|
||||
raise BgTaskWorkerError(_('queue worker stopped'))
|
||||
|
||||
|
||||
@db_task(queue=Val(TaskQueue.FS))
|
||||
@db_task(priority=90, queue=Val(TaskQueue.FS))
|
||||
def cleanup_old_media(durable=True):
|
||||
with atomic(durable=durable):
|
||||
for source in qs_gen(Source.objects.filter(delete_old_media=True, days_to_keep__gt=0)):
|
||||
@ -349,7 +349,7 @@ def cleanup_old_media(durable=True):
|
||||
schedule_media_servers_update()
|
||||
|
||||
|
||||
@db_task(queue=Val(TaskQueue.FS))
|
||||
@db_task(priority=90, queue=Val(TaskQueue.FS))
|
||||
def cleanup_removed_media(source_id, video_keys):
|
||||
try:
|
||||
source = Source.objects.get(pk=source_id)
|
||||
@ -451,8 +451,8 @@ def wait_for_database_queue():
|
||||
)
|
||||
|
||||
|
||||
@background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
|
||||
def index_source_task(source_id):
|
||||
@db_task(delay=30, priority=80, queue=Val(TaskQueue.LIMIT))
|
||||
def index_source(source_id):
|
||||
'''
|
||||
Indexes media available from a Source object.
|
||||
'''
|
||||
@ -464,10 +464,10 @@ def index_source_task(source_id):
|
||||
source = Source.objects.get(pk=source_id)
|
||||
except Source.DoesNotExist as e:
|
||||
# Task triggered but the Source has been deleted, delete the task
|
||||
raise InvalidTaskError(_('no such source')) from e
|
||||
raise CancelExecution(_('no such source'), retry=False) from e
|
||||
# An inactive Source would return an empty list for videos anyway
|
||||
if not source.is_active:
|
||||
return
|
||||
return False
|
||||
# update the target schedule column
|
||||
source.task_run_at_dt
|
||||
# Reset any errors
|
||||
@ -609,6 +609,16 @@ def index_source_task(source_id):
|
||||
schedule=dict(run_at=60),
|
||||
verbose_name=vn_fmt.format(source.name),
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
@background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
|
||||
def index_source_task(source_id):
|
||||
try:
|
||||
res = index_source(source_id)
|
||||
return res.get(blocking=True)
|
||||
except CancelExecution as e:
|
||||
raise InvalidTaskError(str(e)) from e
|
||||
|
||||
|
||||
@dynamic_retry(db_task, priority=100, retries=15, queue=Val(TaskQueue.FS))
|
||||
@ -682,8 +692,53 @@ def download_source_images(source_id):
|
||||
log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}')
|
||||
|
||||
|
||||
@background(schedule=dict(priority=40, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
|
||||
def download_media_metadata(media_id):
|
||||
@db_task(delay=60, priority=90, retries=5, retry_delay=60, queue=Val(TaskQueue.FS))
|
||||
@atomic(durable=True)
|
||||
def delete_media(media_id):
|
||||
try:
|
||||
media = Media.objects.get(pk=media_id)
|
||||
except Media.DoesNotExist as e:
|
||||
raise CancelExecution(_('no such media'), retry=False) from e
|
||||
else:
|
||||
media.delete()
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@db_task(delay=60, priority=70, retries=5, retry_delay=60, queue=Val(TaskQueue.FS))
|
||||
@atomic(durable=True)
|
||||
def rename_media(media_id):
|
||||
try:
|
||||
media = Media.objects.get(pk=media_id)
|
||||
except Media.DoesNotExist as e:
|
||||
raise CancelExecution(_('no such media'), retry=False) from e
|
||||
else:
|
||||
with huey_lock_task(
|
||||
f'media:{media.uuid}',
|
||||
queue=Val(TaskQueue.DB),
|
||||
):
|
||||
media.rename_files()
|
||||
|
||||
|
||||
@db_task(delay=60, priority=80, retries=5, retry_delay=60, queue=Val(TaskQueue.FS))
|
||||
@atomic(durable=True)
|
||||
def save_media(media_id):
|
||||
try:
|
||||
media = Media.objects.get(pk=media_id)
|
||||
except Media.DoesNotExist as e:
|
||||
raise CancelExecution(_('no such media'), retry=False) from e
|
||||
else:
|
||||
with huey_lock_task(
|
||||
f'media:{media.uuid}',
|
||||
queue=Val(TaskQueue.DB),
|
||||
):
|
||||
media.save()
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@db_task(delay=60, priority=60, queue=Val(TaskQueue.LIMIT))
|
||||
def download_metadata(media_id):
|
||||
'''
|
||||
Downloads the metadata for a media item.
|
||||
'''
|
||||
@ -693,10 +748,10 @@ def download_media_metadata(media_id):
|
||||
# Task triggered but the media no longer exists, do nothing
|
||||
log.error(f'Task download_media_metadata(pk={media_id}) called but no '
|
||||
f'media exists with ID: {media_id}')
|
||||
raise InvalidTaskError(_('no such media')) from e
|
||||
raise CancelExecution(_('no such media'), retry=False) from e
|
||||
if media.manual_skip:
|
||||
log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.')
|
||||
return
|
||||
return False
|
||||
source = media.source
|
||||
wait_for_errors(
|
||||
media,
|
||||
@ -744,7 +799,7 @@ def download_media_metadata(media_id):
|
||||
if raise_exception:
|
||||
raise
|
||||
log.debug(str(e))
|
||||
return
|
||||
return False
|
||||
response = metadata
|
||||
if getattr(settings, 'SHRINK_NEW_MEDIA_METADATA', False):
|
||||
response = filter_response(metadata, True)
|
||||
@ -779,6 +834,16 @@ def download_media_metadata(media_id):
|
||||
save_model(media)
|
||||
log.info(f'Saved {len(media.metadata_dumps())} bytes of metadata for: '
|
||||
f'{source} / {media}: {media_id}')
|
||||
return True
|
||||
|
||||
|
||||
@background(schedule=dict(priority=40, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
|
||||
def download_media_metadata(media_id):
|
||||
try:
|
||||
res = download_metadata(media_id)
|
||||
return res.get(blocking=True)
|
||||
except CancelExecution as e:
|
||||
raise InvalidTaskError(str(e)) from e
|
||||
|
||||
|
||||
@dynamic_retry(db_task, delay=10, priority=90, retries=15, queue=Val(TaskQueue.NET))
|
||||
@ -796,7 +861,7 @@ def download_media_image(media_id, url):
|
||||
# Media was toggled to be skipped after the task was scheduled
|
||||
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
|
||||
f'it is now marked to be skipped, not downloading thumbnail')
|
||||
return
|
||||
return False
|
||||
width = getattr(settings, 'MEDIA_THUMBNAIL_WIDTH', 430)
|
||||
height = getattr(settings, 'MEDIA_THUMBNAIL_HEIGHT', 240)
|
||||
try:
|
||||
@ -856,8 +921,8 @@ def download_media_thumbnail(media_id, url):
|
||||
except CancelExecution as e:
|
||||
raise InvalidTaskError(str(e)) from e
|
||||
|
||||
@background(schedule=dict(priority=30, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
|
||||
def download_media(media_id, override=False):
|
||||
@db_task(delay=60, priority=70, queue=Val(TaskQueue.LIMIT))
|
||||
def download_media_file(media_id, override=False):
|
||||
'''
|
||||
Downloads the media to disk and attaches it to the Media instance.
|
||||
'''
|
||||
@ -865,12 +930,12 @@ def download_media(media_id, override=False):
|
||||
media = Media.objects.get(pk=media_id)
|
||||
except Media.DoesNotExist as e:
|
||||
# Task triggered but the media no longer exists, do nothing
|
||||
raise InvalidTaskError(_('no such media')) from e
|
||||
raise CancelExecution(_('no such media'), retry=False) from e
|
||||
else:
|
||||
if not media.download_checklist(override):
|
||||
# any condition that needs to reschedule the task
|
||||
# should raise an exception to avoid this
|
||||
return
|
||||
return False
|
||||
|
||||
wait_for_errors(
|
||||
media,
|
||||
@ -909,6 +974,16 @@ def download_media(media_id, override=False):
|
||||
media.write_nfo_file()
|
||||
# Schedule a task to update media servers
|
||||
schedule_media_servers_update()
|
||||
return True
|
||||
|
||||
|
||||
@background(schedule=dict(priority=30, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
|
||||
def download_media(media_id, override=False):
|
||||
try:
|
||||
res = download_media_file(media_id, override)
|
||||
return res.get(blocking=True)
|
||||
except CancelExecution as e:
|
||||
raise InvalidTaskError(str(e)) from e
|
||||
|
||||
|
||||
@db_task(delay=30, expires=210, priority=100, queue=Val(TaskQueue.NET))
|
||||
@ -986,13 +1061,18 @@ def save_all_media_for_source(source_id):
|
||||
|
||||
# Trigger the post_save signal for each media item linked to this source as various
|
||||
# flags may need to be recalculated
|
||||
saved_now = set()
|
||||
tvn_format = '2/{:,}' + f'/{save_qs.count():,}'
|
||||
for mn, media in enumerate(qs_gen(save_qs), start=1):
|
||||
if media.uuid not in saved_later:
|
||||
update_task_status(task, tvn_format.format(mn))
|
||||
save_model(media)
|
||||
saved_now.add(str(media.pk))
|
||||
#save_model(media)
|
||||
# Reset task.verbose_name to the saved value
|
||||
update_task_status(task, None)
|
||||
# wait for tasks to complete
|
||||
res = save_media.map(saved_now)
|
||||
res.get(blocking=True)
|
||||
|
||||
|
||||
@dynamic_retry(db_task, backoff_func=lambda n: (n*3600)+600, priority=50, retries=15, queue=Val(TaskQueue.LIMIT))
|
||||
@ -1031,21 +1111,6 @@ def refresh_formats(media_id):
|
||||
save_model(media)
|
||||
|
||||
|
||||
@db_task(delay=60, priority=80, retries=5, retry_delay=60, queue=Val(TaskQueue.FS))
|
||||
@atomic(durable=True)
|
||||
def rename_media(media_id):
|
||||
try:
|
||||
media = Media.objects.get(pk=media_id)
|
||||
except Media.DoesNotExist as e:
|
||||
raise CancelExecution(_('no such media'), retry=False) from e
|
||||
else:
|
||||
with huey_lock_task(
|
||||
f'media:{media.uuid}',
|
||||
queue=Val(TaskQueue.DB),
|
||||
):
|
||||
media.rename_files()
|
||||
|
||||
|
||||
@db_task(delay=300, priority=80, retries=5, retry_delay=600, queue=Val(TaskQueue.FS))
|
||||
@atomic(durable=True)
|
||||
def rename_all_media_for_source(source_id):
|
||||
@ -1067,7 +1132,7 @@ def rename_all_media_for_source(source_id):
|
||||
)
|
||||
if not create_rename_tasks:
|
||||
return None
|
||||
mqs = Media.objects.all().filter(
|
||||
mqs = Media.objects.filter(
|
||||
source=source,
|
||||
downloaded=True,
|
||||
)
|
||||
@ -1076,7 +1141,7 @@ def rename_all_media_for_source(source_id):
|
||||
f'media:{media.uuid}',
|
||||
queue=Val(TaskQueue.DB),
|
||||
):
|
||||
with atomic():
|
||||
with atomic(durable=False):
|
||||
media.rename_files()
|
||||
|
||||
|
||||
@ -1116,6 +1181,7 @@ def delete_all_media_for_source(source_id, source_name, source_directory):
|
||||
).filter(
|
||||
source=source or source_id,
|
||||
)
|
||||
deleted_now = set()
|
||||
with atomic(durable=True):
|
||||
for media in qs_gen(mqs):
|
||||
log.info(f'Deleting media for source: {source_name} item: {media.name}')
|
||||
@ -1124,7 +1190,10 @@ def delete_all_media_for_source(source_id, source_name, source_directory):
|
||||
media.skip = True
|
||||
media.manual_skip = True
|
||||
media.save()
|
||||
media.delete()
|
||||
deleted_now.add(str(media.pk))
|
||||
#media.delete()
|
||||
res = delete_media.map(deleted_now)
|
||||
res.get(blocking=True)
|
||||
# Remove the directory, if the user requested that
|
||||
directory_path = Path(source_directory)
|
||||
remove = (
|
||||
|
Loading…
Reference in New Issue
Block a user