Merge branch 'main' into patch-10

commit d31f29aa7c
tcely 2025-03-19 01:38:31 -04:00, committed by GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 134 additions and 94 deletions

Dockerfile

@@ -2,6 +2,7 @@
 # check=error=true
+ARG FFMPEG_VERSION="N"
 ARG S6_VERSION="3.2.0.2"
 ARG SHA256_S6_AMD64="59289456ab1761e277bd456a95e737c06b03ede99158beb24f12b165a904f478"

tubesync/sync/models.py

@@ -1202,7 +1202,8 @@ class Media(models.Model):
     @property
     def thumbnail(self):
-        return self.get_metadata_first_value('thumbnail', '')
+        default = f'https://i.ytimg.com/vi/{self.key}/maxresdefault.jpg'
+        return self.get_metadata_first_value('thumbnail', default)
 
     @property
     def name(self):
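A Media row with no stored 'thumbnail' metadata now falls back to YouTube's predictable maxresdefault URL instead of an empty string. A minimal sketch of the fallback outside the model (the helper name and key are hypothetical):

    def thumbnail_url(metadata, key):
        default = f'https://i.ytimg.com/vi/{key}/maxresdefault.jpg'
        return metadata.get('thumbnail') or default

    thumbnail_url({}, 'dQw4w9WgXcQ')
    # 'https://i.ytimg.com/vi/dQw4w9WgXcQ/maxresdefault.jpg'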

tubesync/sync/signals.py

@@ -374,17 +374,3 @@ def media_post_delete(sender, instance, **kwargs):
         log.info(f'Deleting file for: {instance} path: {file}')
         delete_file(file)
-    if not instance.source.is_active:
-        return
-    # Schedule a task to update media servers
-    for mediaserver in MediaServer.objects.all():
-        log.info(f'Scheduling media server updates')
-        verbose_name = _('Request media server rescan for "{}"')
-        rescan_media_server(
-            str(mediaserver.pk),
-            schedule=5,
-            priority=0,
-            verbose_name=verbose_name.format(mediaserver),
-            remove_existing_tasks=True
-        )
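This hunk only deletes: the post_delete signal stops scheduling media-server rescans (with schedule=5, priority=0) itself. The equivalent scheduling reappears in tasks.py below as schedule_media_servers_update(), now with priority=30 and invoked from the cleanup helpers, so rescans are scheduled once per cleanup pass rather than from every delete signal.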

tubesync/sync/tasks.py

@@ -115,27 +115,29 @@ def get_source_completed_tasks(source_id, only_errors=False):
         q['failed_at__isnull'] = False
     return CompletedTask.objects.filter(**q).order_by('-failed_at')
 
+def get_tasks(task_name, id=None, /, instance=None):
+    assert not (id is None and instance is None)
+    arg = str(id or instance.pk)
+    return Task.objects.get_task(str(task_name), args=(arg,),)
+
+def get_first_task(task_name, id=None, /, *, instance=None):
+    tqs = get_tasks(task_name, id, instance).order_by('run_at')
+    return tqs[0] if tqs.count() else False
+
 def get_media_download_task(media_id):
-    try:
-        return Task.objects.get_task('sync.tasks.download_media',
-            args=(str(media_id),))[0]
-    except IndexError:
-        return False
+    return get_first_task('sync.tasks.download_media', media_id)
 
 def get_media_metadata_task(media_id):
-    try:
-        return Task.objects.get_task('sync.tasks.download_media_metadata',
-            args=(str(media_id),))[0]
-    except IndexError:
-        return False
+    return get_first_task('sync.tasks.download_media_metadata', media_id)
 
 def get_media_premiere_task(media_id):
-    try:
-        return Task.objects.get_task('sync.tasks.wait_for_media_premiere',
-            args=(str(media_id),))[0]
-    except IndexError:
-        return False
+    return get_first_task('sync.tasks.wait_for_media_premiere', media_id)
+
+def get_source_check_task(source_id):
+    return get_first_task('sync.tasks.save_all_media_for_source', source_id)
+
+def get_source_index_task(source_id):
+    return get_first_task('sync.tasks.index_source_task', source_id)
 
 def delete_task_by_source(task_name, source_id):
     now = timezone.now()
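The three copy-pasted try/except IndexError lookups above collapse into the shared get_tasks()/get_first_task() helpers, which also order by run_at explicitly rather than relying on the manager's default ordering, and still return False when nothing matches. A usage sketch (the media object is hypothetical):

    # by primary key, or by instance via the keyword-only parameter
    task = get_first_task('sync.tasks.download_media', media.pk)
    task = get_first_task('sync.tasks.download_media', instance=media)
    if task:
        log.info(f'Download queued to run at {task.run_at}')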
@@ -160,24 +162,46 @@ def cleanup_completed_tasks():
     CompletedTask.objects.filter(run_at__lt=delta).delete()
 
+def schedule_media_servers_update():
+    with atomic():
+        # Schedule a task to update media servers
+        log.info(f'Scheduling media server updates')
+        verbose_name = _('Request media server rescan for "{}"')
+        for mediaserver in MediaServer.objects.all():
+            rescan_media_server(
+                str(mediaserver.pk),
+                priority=30,
+                verbose_name=verbose_name.format(mediaserver),
+                remove_existing_tasks=True,
+            )
+
 def cleanup_old_media():
-    for source in Source.objects.filter(delete_old_media=True, days_to_keep__gt=0):
-        delta = timezone.now() - timedelta(days=source.days_to_keep)
-        for media in source.media_source.filter(downloaded=True, download_date__lt=delta):
-            log.info(f'Deleting expired media: {source} / {media} '
-                     f'(now older than {source.days_to_keep} days / '
-                     f'download_date before {delta})')
-            # .delete() also triggers a pre_delete signal that removes the files
-            media.delete()
+    with atomic():
+        for source in Source.objects.filter(delete_old_media=True, days_to_keep__gt=0):
+            delta = timezone.now() - timedelta(days=source.days_to_keep)
+            for media in source.media_source.filter(downloaded=True, download_date__lt=delta):
+                log.info(f'Deleting expired media: {source} / {media} '
+                         f'(now older than {source.days_to_keep} days / '
+                         f'download_date before {delta})')
+                with atomic():
+                    # .delete() also triggers a pre_delete/post_delete signals that remove files
+                    media.delete()
+    schedule_media_servers_update()
 
 def cleanup_removed_media(source, videos):
+    if not source.delete_removed_media:
+        return
+    log.info(f'Cleaning up media no longer in source: {source}')
     media_objects = Media.objects.filter(source=source)
     for media in media_objects:
         matching_source_item = [video['id'] for video in videos if video['id'] == media.key]
         if not matching_source_item:
             log.info(f'{media.name} is no longer in source, removing')
-            media.delete()
+            with atomic():
+                media.delete()
+    schedule_media_servers_update()
 
 @background(schedule=300, remove_existing_tasks=True)
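Two structural changes here: media-server rescans are batched behind schedule_media_servers_update(), and each media.delete() now runs inside its own nested atomic() block. In Django, atomic() nested inside an open transaction creates a savepoint, so a failure while deleting one item need not discard the whole cleanup pass. A minimal sketch of that property (queryset and log are hypothetical stand-ins):

    from django.db import IntegrityError
    from django.db.transaction import atomic

    with atomic():                  # outer transaction for the sweep
        for obj in queryset:
            try:
                with atomic():      # savepoint: a failure undoes only this object
                    obj.delete()
            except IntegrityError as e:
                log.error(f'Skipping {obj}: {e}')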
@@ -185,11 +209,17 @@ def index_source_task(source_id):
     '''
         Indexes media available from a Source object.
     '''
+    cleanup_completed_tasks()
+    # deleting expired media should happen any time an index task is requested
+    cleanup_old_media()
     try:
         source = Source.objects.get(pk=source_id)
     except Source.DoesNotExist:
         # Task triggered but the Source has been deleted, delete the task
         return
+    # An inactive Source would return an empty list for videos anyway
+    if not source.is_active:
+        return
     # Reset any errors
     source.has_failed = False
     source.save()
@@ -203,56 +233,61 @@ def index_source_task(source_id):
     # Got some media, update the last crawl timestamp
     source.last_crawl = timezone.now()
     source.save()
-    log.info(f'Found {len(videos)} media items for source: {source}')
+    num_videos = len(videos)
+    log.info(f'Found {num_videos} media items for source: {source}')
     fields = lambda f, m: m.get_metadata_field(f)
-    with atomic(durable=True):
-        for video in videos:
-            # Create or update each video as a Media object
-            key = video.get(source.key_field, None)
-            if not key:
-                # Video has no unique key (ID), it can't be indexed
-                continue
-            try:
-                media = Media.objects.get(key=key, source=source)
-            except Media.DoesNotExist:
-                media = Media(key=key)
-            media.source = source
-            media.duration = float(video.get(fields('duration', media), None) or 0) or None
-            media.title = str(video.get(fields('title', media), ''))[:200]
-            timestamp = video.get(fields('timestamp', media), None)
-            published_dt = media.metadata_published(timestamp)
-            if published_dt is not None:
-                media.published = published_dt
-            try:
-                with atomic():
-                    media.save()
-            except IntegrityError as e:
-                log.error(f'Index media failed: {source} / {media} with "{e}"')
-            else:
-                log.debug(f'Indexed media: {source} / {media}')
-                # log the new media instances
-                new_media_instance = (
-                    media.created and
-                    source.last_crawl and
-                    media.created >= source.last_crawl
-                )
-                if new_media_instance:
-                    log.info(f'Indexed new media: {source} / {media}')
-                    log.info(f'Scheduling task to download metadata for: {media.url}')
-                    verbose_name = _('Downloading metadata for "{}"')
-                    download_media_metadata(
-                        str(media.pk),
-                        priority=20,
-                        verbose_name=verbose_name.format(media.pk),
-                    )
-    # Tack on a cleanup of old completed tasks
-    cleanup_completed_tasks()
-    with atomic(durable=True):
-        # Tack on a cleanup of old media
-        cleanup_old_media()
-        if source.delete_removed_media:
-            log.info(f'Cleaning up media no longer in source: {source}')
-            cleanup_removed_media(source, videos)
+    task = get_source_index_task(source_id)
+    if task:
+        verbose_name = task.verbose_name
+        tvn_format = '[{}' + f'/{num_videos}] {verbose_name}'
+    for vn, video in enumerate(videos, start=1):
+        # Create or update each video as a Media object
+        key = video.get(source.key_field, None)
+        if not key:
+            # Video has no unique key (ID), it can't be indexed
+            continue
+        try:
+            media = Media.objects.get(key=key, source=source)
+        except Media.DoesNotExist:
+            media = Media(key=key)
+        media.source = source
+        media.duration = float(video.get(fields('duration', media), None) or 0) or None
+        media.title = str(video.get(fields('title', media), ''))[:200]
+        timestamp = video.get(fields('timestamp', media), None)
+        published_dt = media.metadata_published(timestamp)
+        if published_dt is not None:
+            media.published = published_dt
+        if task:
+            task.verbose_name = tvn_format.format(vn)
+            with atomic():
+                task.save(update_fields={'verbose_name'})
+        try:
+            media.save()
+        except IntegrityError as e:
+            log.error(f'Index media failed: {source} / {media} with "{e}"')
+        else:
+            log.debug(f'Indexed media: {source} / {media}')
+            # log the new media instances
+            new_media_instance = (
+                media.created and
+                source.last_crawl and
+                media.created >= source.last_crawl
+            )
+            if new_media_instance:
+                log.info(f'Indexed new media: {source} / {media}')
+                log.info(f'Scheduling task to download metadata for: {media.url}')
+                verbose_name = _('Downloading metadata for "{}"')
+                download_media_metadata(
+                    str(media.pk),
+                    priority=20,
+                    verbose_name=verbose_name.format(media.pk),
+                )
+    if task:
+        task.verbose_name = verbose_name
+        with atomic():
+            task.save(update_fields={'verbose_name'})
+    # Cleanup of media no longer available from the source
+    cleanup_removed_media(source, videos)
 
 @background(schedule=0)
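The tvn_format expression is easy to misread: the plain '[{}' literal keeps its {} placeholder through the concatenation, while the f-string half bakes in the total and the task's original verbose_name immediately. Re-saving the task each iteration with update_fields={'verbose_name'} is what surfaces the [n/total] progress in the task list, and the final if task: block restores the saved name once indexing finishes (note that verbose_name is reassigned inside the new-media branch, so the restored value can differ if any new media was scheduled). A worked example with hypothetical values:

    num_videos = 120
    verbose_name = 'Index media from source "Example"'
    tvn_format = '[{}' + f'/{num_videos}] {verbose_name}'
    # tvn_format == '[{}/120] Index media from source "Example"'
    tvn_format.format(3)  # '[3/120] Index media from source "Example"'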
@@ -422,8 +457,6 @@ def download_media_thumbnail(media_id, url):
     except Media.DoesNotExist:
         # Task triggered but the media no longer exists, do nothing
         return
-    if not media.has_metadata:
-        raise NoMetadataException('Metadata is not yet available.')
     if media.skip:
         # Media was toggled to be skipped after the task was scheduled
         log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
@@ -609,6 +642,7 @@ def save_all_media_for_source(source_id):
     already_saved = set()
     mqs = Media.objects.filter(source=source)
+    task = get_source_check_task(source_id)
     refresh_qs = mqs.filter(
         can_download=False,
         skip=False,
@@ -616,22 +650,40 @@ def save_all_media_for_source(source_id):
         downloaded=False,
         metadata__isnull=False,
     )
-    for media in refresh_qs:
+    if task:
+        verbose_name = task.verbose_name
+        tvn_format = '[{}' + f'/{refresh_qs.count()}] {verbose_name}'
+    for mn, media in enumerate(refresh_qs, start=1):
+        if task:
+            task.verbose_name = tvn_format.format(mn)
+            with atomic():
+                task.save(update_fields={'verbose_name'})
         try:
             media.refresh_formats
         except YouTubeError as e:
             log.debug(f'Failed to refresh formats for: {source} / {media.key}: {e!s}')
             pass
         else:
-            media.save()
+            with atomic():
+                media.save()
             already_saved.add(media.uuid)
     # Trigger the post_save signal for each media item linked to this source as various
     # flags may need to be recalculated
-    with atomic():
-        for media in mqs:
-            if media.uuid not in already_saved:
-                media.save()
+    if task:
+        tvn_format = '[{}' + f'/{mqs.count()}] {verbose_name}'
+    for mn, media in enumerate(mqs, start=1):
+        if task:
+            task.verbose_name = tvn_format.format(mn)
+            with atomic():
+                task.save(update_fields={'verbose_name'})
+        if media.uuid not in already_saved:
+            with atomic():
+                media.save()
+    if task:
+        task.verbose_name = verbose_name
+        with atomic():
+            task.save(update_fields={'verbose_name'})
 
 @background(schedule=60, remove_existing_tasks=True)
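One detail worth noting in the refresh loop: media.refresh_formats is a bare attribute access, so it only does work if refresh_formats is a property on the Media model, which is consistent with it raising YouTubeError without call parentheses. A compact illustration of that pattern (class and method names hypothetical):

    class Example:
        @property
        def refresh_formats(self):
            # runs on attribute access; no parentheses at the call site
            return self._refresh()

        def _refresh(self):
            return True

    Example().refresh_formats  # triggers the refresh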