mirror of
https://github.com/meeb/tubesync.git
synced 2025-06-24 05:56:37 +00:00
Review of tasks.py
This commit is contained in:
parent
17a8cf036c
commit
eab0ad9d7c
@ -21,6 +21,7 @@ from django.db.transaction import atomic
|
||||
from django.utils import timezone
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from background_task import background
|
||||
from background_task.exceptions import InvalidTaskError
|
||||
from background_task.models import Task, CompletedTask
|
||||
from common.logger import log
|
||||
from common.errors import NoMediaException, NoMetadataException, DownloadFailedException
|
||||
@ -123,7 +124,8 @@ def update_task_status(task, status):
|
||||
except DatabaseError as e:
|
||||
if 'Save with update_fields did not affect any rows.' == str(e):
|
||||
pass
|
||||
raise
|
||||
else:
|
||||
raise
|
||||
return True
|
||||
|
||||
|
||||
@ -136,6 +138,7 @@ def get_source_completed_tasks(source_id, only_errors=False):
|
||||
q['failed_at__isnull'] = False
|
||||
return CompletedTask.objects.filter(**q).order_by('-failed_at')
|
||||
|
||||
|
||||
def get_tasks(task_name, id=None, /, instance=None):
|
||||
assert not (id is None and instance is None)
|
||||
arg = str(id or instance.pk)
|
||||
@ -160,6 +163,7 @@ def get_source_check_task(source_id):
|
||||
def get_source_index_task(source_id):
|
||||
return get_first_task('sync.tasks.index_source_task', source_id)
|
||||
|
||||
|
||||
def delete_task_by_source(task_name, source_id):
|
||||
now = timezone.now()
|
||||
unlocked = Task.objects.unlocked(now)
|
||||
@ -191,7 +195,7 @@ def schedule_media_servers_update():
|
||||
for mediaserver in MediaServer.objects.all():
|
||||
rescan_media_server(
|
||||
str(mediaserver.pk),
|
||||
priority=30,
|
||||
priority=10,
|
||||
verbose_name=verbose_name.format(mediaserver),
|
||||
remove_existing_tasks=True,
|
||||
)
|
||||
@ -225,7 +229,7 @@ def cleanup_removed_media(source, videos):
|
||||
schedule_media_servers_update()
|
||||
|
||||
|
||||
@background(schedule=300, remove_existing_tasks=True)
|
||||
@background(schedule=dict(run_at=300), remove_existing_tasks=True)
|
||||
def index_source_task(source_id):
|
||||
'''
|
||||
Indexes media available from a Source object.
|
||||
@ -235,18 +239,20 @@ def index_source_task(source_id):
|
||||
cleanup_old_media()
|
||||
try:
|
||||
source = Source.objects.get(pk=source_id)
|
||||
except Source.DoesNotExist:
|
||||
except Source.DoesNotExist as e:
|
||||
# Task triggered but the Source has been deleted, delete the task
|
||||
return
|
||||
raise InvalidTaskError(_('no such source')) from e
|
||||
# An inactive Source would return an empty list for videos anyway
|
||||
if not source.is_active:
|
||||
return
|
||||
# Reset any errors
|
||||
# TODO: determine if this affects anything
|
||||
source.has_failed = False
|
||||
source.save()
|
||||
# Index the source
|
||||
videos = source.index_media()
|
||||
if not videos:
|
||||
# TODO: Record this error in source.has_failed ?
|
||||
raise NoMediaException(f'Source "{source}" (ID: {source_id}) returned no '
|
||||
f'media to index, is the source key valid? Check the '
|
||||
f'source configuration is correct and that the source '
|
||||
@ -310,7 +316,7 @@ def index_source_task(source_id):
|
||||
cleanup_removed_media(source, videos)
|
||||
|
||||
|
||||
@background(schedule=0)
|
||||
@background(schedule=dict(run_at=0))
|
||||
def check_source_directory_exists(source_id):
|
||||
'''
|
||||
Checks the output directory for a source exists and is writable, if it does
|
||||
@ -319,17 +325,17 @@ def check_source_directory_exists(source_id):
|
||||
'''
|
||||
try:
|
||||
source = Source.objects.get(pk=source_id)
|
||||
except Source.DoesNotExist:
|
||||
except Source.DoesNotExist as e:
|
||||
# Task triggered but the Source has been deleted, delete the task
|
||||
return
|
||||
raise InvalidTaskError(_('no such source')) from e
|
||||
# Check the source output directory exists
|
||||
if not source.directory_exists():
|
||||
# Try and create it
|
||||
# Try to create it
|
||||
log.info(f'Creating directory: {source.directory_path}')
|
||||
source.make_directory()
|
||||
|
||||
|
||||
@background(schedule=0)
|
||||
@background(schedule=dict(run_at=0))
|
||||
def download_source_images(source_id):
|
||||
'''
|
||||
Downloads an image and save it as a local thumbnail attached to a
|
||||
@ -337,11 +343,11 @@ def download_source_images(source_id):
|
||||
'''
|
||||
try:
|
||||
source = Source.objects.get(pk=source_id)
|
||||
except Source.DoesNotExist:
|
||||
except Source.DoesNotExist as e:
|
||||
# Task triggered but the source no longer exists, do nothing
|
||||
log.error(f'Task download_source_images(pk={source_id}) called but no '
|
||||
f'source exists with ID: {source_id}')
|
||||
return
|
||||
raise InvalidTaskError(_('no such source')) from e
|
||||
avatar, banner = source.get_image_url
|
||||
log.info(f'Thumbnail URL for source with ID: {source_id} / {source} '
|
||||
f'Avatar: {avatar} '
|
||||
@ -379,18 +385,18 @@ def download_source_images(source_id):
|
||||
log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}')
|
||||
|
||||
|
||||
@background(schedule=60, remove_existing_tasks=True)
|
||||
@background(schedule=dict(run_at=60), remove_existing_tasks=True)
|
||||
def download_media_metadata(media_id):
|
||||
'''
|
||||
Downloads the metadata for a media item.
|
||||
'''
|
||||
try:
|
||||
media = Media.objects.get(pk=media_id)
|
||||
except Media.DoesNotExist:
|
||||
except Media.DoesNotExist as e:
|
||||
# Task triggered but the media no longer exists, do nothing
|
||||
log.error(f'Task download_media_metadata(pk={media_id}) called but no '
|
||||
f'media exists with ID: {media_id}')
|
||||
return
|
||||
raise InvalidTaskError(_('no such media')) from e
|
||||
if media.manual_skip:
|
||||
log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.')
|
||||
return
|
||||
@ -466,7 +472,7 @@ def download_media_metadata(media_id):
|
||||
f'{source} / {media}: {media_id}')
|
||||
|
||||
|
||||
@background(schedule=60, remove_existing_tasks=True)
|
||||
@background(schedule=dict(run_at=60), remove_existing_tasks=True)
|
||||
def download_media_thumbnail(media_id, url):
|
||||
'''
|
||||
Downloads an image from a URL and save it as a local thumbnail attached to a
|
||||
@ -474,10 +480,10 @@ def download_media_thumbnail(media_id, url):
|
||||
'''
|
||||
try:
|
||||
media = Media.objects.get(pk=media_id)
|
||||
except Media.DoesNotExist:
|
||||
except Media.DoesNotExist as e:
|
||||
# Task triggered but the media no longer exists, do nothing
|
||||
return
|
||||
if media.skip:
|
||||
raise InvalidTaskError(_('no such media')) from e
|
||||
if media.skip or media.manual_skip:
|
||||
# Media was toggled to be skipped after the task was scheduled
|
||||
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
|
||||
f'it is now marked to be skipped, not downloading thumbnail')
|
||||
@ -504,38 +510,43 @@ def download_media_thumbnail(media_id, url):
|
||||
return True
|
||||
|
||||
|
||||
@background(schedule=60, remove_existing_tasks=True)
|
||||
@background(schedule=dict(run_at=60), remove_existing_tasks=True)
|
||||
def download_media(media_id):
|
||||
'''
|
||||
Downloads the media to disk and attaches it to the Media instance.
|
||||
'''
|
||||
try:
|
||||
media = Media.objects.get(pk=media_id)
|
||||
except Media.DoesNotExist:
|
||||
except Media.DoesNotExist as e:
|
||||
# Task triggered but the media no longer exists, do nothing
|
||||
return
|
||||
if not media.has_metadata:
|
||||
raise NoMetadataException('Metadata is not yet available.')
|
||||
if media.skip:
|
||||
# Media was toggled to be skipped after the task was scheduled
|
||||
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
|
||||
f'it is now marked to be skipped, not downloading')
|
||||
return
|
||||
downloaded_file_exists = (
|
||||
media.media_file_exists or
|
||||
media.filepath.exists()
|
||||
)
|
||||
if media.downloaded and downloaded_file_exists:
|
||||
# Media has been marked as downloaded before the download_media task was fired,
|
||||
# skip it
|
||||
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
|
||||
f'it has already been marked as downloaded, not downloading again')
|
||||
return
|
||||
raise InvalidTaskError(_('no such media')) from e
|
||||
if not media.source.download_media:
|
||||
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
|
||||
f'the source {media.source} has since been marked to not download, '
|
||||
f'not downloading')
|
||||
return
|
||||
if media.skip or media.manual_skip:
|
||||
# Media was toggled to be skipped after the task was scheduled
|
||||
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
|
||||
f'it is now marked to be skipped, not downloading')
|
||||
return
|
||||
# metadata is required to generate the proper filepath
|
||||
if not media.has_metadata:
|
||||
raise NoMetadataException('Metadata is not yet available.')
|
||||
downloaded_file_exists = (
|
||||
media.downloaded and
|
||||
media.has_metadata and
|
||||
(
|
||||
media.media_file_exists or
|
||||
media.filepath.exists()
|
||||
)
|
||||
)
|
||||
if downloaded_file_exists:
|
||||
# Media has been marked as downloaded before the download_media task was fired,
|
||||
# skip it
|
||||
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
|
||||
f'it has already been marked as downloaded, not downloading again')
|
||||
return
|
||||
max_cap_age = media.source.download_cap_date
|
||||
published = media.published
|
||||
if max_cap_age and published:
|
||||
@ -608,16 +619,7 @@ def download_media(media_id):
|
||||
log.warn(f'A permissions problem occured when writing the new media NFO file: {e.msg}')
|
||||
pass
|
||||
# Schedule a task to update media servers
|
||||
for mediaserver in MediaServer.objects.all():
|
||||
log.info(f'Scheduling media server updates')
|
||||
verbose_name = _('Request media server rescan for "{}"')
|
||||
rescan_media_server(
|
||||
str(mediaserver.pk),
|
||||
queue=str(media.source.pk),
|
||||
priority=0,
|
||||
verbose_name=verbose_name.format(mediaserver),
|
||||
remove_existing_tasks=True
|
||||
)
|
||||
schedule_media_servers_update()
|
||||
else:
|
||||
# Expected file doesn't exist on disk
|
||||
err = (f'Failed to download media: {media} (UUID: {media.pk}) to disk, '
|
||||
@ -630,22 +632,22 @@ def download_media(media_id):
|
||||
raise DownloadFailedException(err)
|
||||
|
||||
|
||||
@background(schedule=300, remove_existing_tasks=True)
|
||||
@background(schedule=dict(run_at=300), remove_existing_tasks=True)
|
||||
def rescan_media_server(mediaserver_id):
|
||||
'''
|
||||
Attempts to request a media rescan on a remote media server.
|
||||
'''
|
||||
try:
|
||||
mediaserver = MediaServer.objects.get(pk=mediaserver_id)
|
||||
except MediaServer.DoesNotExist:
|
||||
except MediaServer.DoesNotExist as e:
|
||||
# Task triggered but the media server no longer exists, do nothing
|
||||
return
|
||||
raise InvalidTaskError(_('no such server')) from e
|
||||
# Request an rescan / update
|
||||
log.info(f'Updating media server: {mediaserver}')
|
||||
mediaserver.update()
|
||||
|
||||
|
||||
@background(schedule=300, remove_existing_tasks=True)
|
||||
@background(schedule=dict(run_at=300), remove_existing_tasks=True)
|
||||
def save_all_media_for_source(source_id):
|
||||
'''
|
||||
Iterates all media items linked to a source and saves them to
|
||||
@ -655,11 +657,11 @@ def save_all_media_for_source(source_id):
|
||||
'''
|
||||
try:
|
||||
source = Source.objects.get(pk=source_id)
|
||||
except Source.DoesNotExist:
|
||||
except Source.DoesNotExist as e:
|
||||
# Task triggered but the source no longer exists, do nothing
|
||||
log.error(f'Task save_all_media_for_source(pk={source_id}) called but no '
|
||||
f'source exists with ID: {source_id}')
|
||||
return
|
||||
raise InvalidTaskError(_('no such source')) from e
|
||||
|
||||
already_saved = set()
|
||||
mqs = Media.objects.filter(source=source)
|
||||
@ -694,41 +696,41 @@ def save_all_media_for_source(source_id):
|
||||
# flags may need to be recalculated
|
||||
tvn_format = '2/{:,}' + f'/{mqs.count():,}'
|
||||
for mn, media in enumerate(mqs, start=1):
|
||||
update_task_status(task, tvn_format.format(mn))
|
||||
if media.uuid not in already_saved:
|
||||
update_task_status(task, tvn_format.format(mn))
|
||||
with atomic():
|
||||
media.save()
|
||||
# Reset task.verbose_name to the saved value
|
||||
update_task_status(task, None)
|
||||
|
||||
|
||||
@background(schedule=60, remove_existing_tasks=True)
|
||||
@background(schedule=dict(run_at=60), remove_existing_tasks=True)
|
||||
def rename_media(media_id):
|
||||
try:
|
||||
media = Media.objects.defer('metadata', 'thumb').get(pk=media_id)
|
||||
except Media.DoesNotExist:
|
||||
return
|
||||
except Media.DoesNotExist as e:
|
||||
raise InvalidTaskError(_('no such media')) from e
|
||||
media.rename_files()
|
||||
|
||||
|
||||
@background(schedule=300, remove_existing_tasks=True)
|
||||
@background(schedule=dict(run_at=300), remove_existing_tasks=True)
|
||||
@atomic(durable=True)
|
||||
def rename_all_media_for_source(source_id):
|
||||
try:
|
||||
source = Source.objects.get(pk=source_id)
|
||||
except Source.DoesNotExist:
|
||||
except Source.DoesNotExist as e:
|
||||
# Task triggered but the source no longer exists, do nothing
|
||||
log.error(f'Task rename_all_media_for_source(pk={source_id}) called but no '
|
||||
f'source exists with ID: {source_id}')
|
||||
return
|
||||
raise InvalidTaskError(_('no such source')) from e
|
||||
# Check that the settings allow renaming
|
||||
rename_sources_setting = settings.RENAME_SOURCES or list()
|
||||
rename_sources_setting = getattr(settings, 'RENAME_SOURCES', list())
|
||||
create_rename_tasks = (
|
||||
(
|
||||
source.directory and
|
||||
source.directory in rename_sources_setting
|
||||
) or
|
||||
settings.RENAME_ALL_SOURCES
|
||||
getattr(settings, 'RENAME_ALL_SOURCES', False)
|
||||
)
|
||||
if not create_rename_tasks:
|
||||
return
|
||||
@ -744,15 +746,15 @@ def rename_all_media_for_source(source_id):
|
||||
media.rename_files()
|
||||
|
||||
|
||||
@background(schedule=60, remove_existing_tasks=True)
|
||||
@background(schedule=dict(run_at=60), remove_existing_tasks=True)
|
||||
def wait_for_media_premiere(media_id):
|
||||
hours = lambda td: 1+int((24*td.days)+(td.seconds/(60*60)))
|
||||
|
||||
try:
|
||||
media = Media.objects.get(pk=media_id)
|
||||
except Media.DoesNotExist:
|
||||
return
|
||||
if media.metadata:
|
||||
except Media.DoesNotExist as e:
|
||||
raise InvalidTaskError(_('no such media')) from e
|
||||
if media.has_metadata:
|
||||
return
|
||||
now = timezone.now()
|
||||
if media.published < now:
|
||||
@ -764,17 +766,20 @@ def wait_for_media_premiere(media_id):
|
||||
media.manual_skip = True
|
||||
media.title = _(f'Premieres in {hours(media.published - now)} hours')
|
||||
media.save()
|
||||
task = get_media_premiere_task(media_id)
|
||||
if task:
|
||||
update_task_status(task, f'available in {hours(media.published - now)} hours')
|
||||
|
||||
@background(schedule=300, remove_existing_tasks=False)
|
||||
@background(schedule=dict(run_at=300), remove_existing_tasks=False)
|
||||
def delete_all_media_for_source(source_id, source_name):
|
||||
source = None
|
||||
try:
|
||||
source = Source.objects.get(pk=source_id)
|
||||
except Source.DoesNotExist:
|
||||
except Source.DoesNotExist as e:
|
||||
# Task triggered but the source no longer exists, do nothing
|
||||
log.error(f'Task delete_all_media_for_source(pk={source_id}) called but no '
|
||||
f'source exists with ID: {source_id}')
|
||||
pass
|
||||
raise InvalidTaskError(_('no such source')) from e
|
||||
mqs = Media.objects.all().defer(
|
||||
'metadata',
|
||||
).filter(
|
||||
|
Loading…
Reference in New Issue
Block a user