Merge branch 'meeb:main' into patch-10

This commit is contained in:
tcely 2025-04-07 06:19:29 -04:00 committed by GitHub
commit 3549bbf9f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 348 additions and 186 deletions

View File

@ -22,7 +22,9 @@ jobs:
if: ${{ !cancelled() && 'pull_request' != github.event_name }}
runs-on: ubuntu-latest
outputs:
ffmpeg-date: ${{ steps.jq.outputs.FFMPEG_DATE }}
ffmpeg-releases: ${{ steps.ffmpeg.outputs.releases }}
ffmpeg-version: ${{ steps.jq.outputs.FFMPEG_VERSION }}
lowercase-github-actor: ${{ steps.github-actor.outputs.lowercase }}
lowercase-github-repository_owner: ${{ steps.github-repository_owner.outputs.lowercase }}
ytdlp-latest-release: ${{ steps.yt-dlp.outputs.latest-release }}
@ -45,43 +47,11 @@ jobs:
- name: Retrieve yt-dlp/yt-dlp releases with GitHub CLI
id: yt-dlp
uses: ./.github/actions/yt-dlp
test:
if: ${{ !cancelled() && ( 'pull_request' != github.event_name || (! github.event.pull_request.draft) ) }}
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
steps:
- uses: actions/checkout@v4
- name: Install Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pipenv
pipenv install --system --skip-lock
- name: Set up Django environment
run: |
cp -v -p tubesync/tubesync/local_settings.py.example tubesync/tubesync/local_settings.py
cp -v -a -t "${Python3_ROOT_DIR}"/lib/python3.*/site-packages/background_task/ patches/background_task/*
cp -v -a -t "${Python3_ROOT_DIR}"/lib/python3.*/site-packages/yt_dlp/ patches/yt_dlp/*
- name: Run Django tests
run: cd tubesync && python3 manage.py test --verbosity=2
containerise:
if: ${{ !cancelled() && 'success' == needs.info.result }}
needs: ['info', 'test']
runs-on: ubuntu-latest
timeout-minutes: 120
steps:
- name: Set environment variables with jq
- name: Set outputs with jq
id: jq
run: |
cat >| .ffmpeg.releases.json <<'EOF'
${{ needs.info.outputs.ffmpeg-releases }}
${{ steps.ffmpeg.outputs.releases }}
EOF
mk_delim() { local f='%s_EOF_%d_' ; printf -- "${f}" "$1" "${RANDOM}" ; } ;
open_ml_var() { local f=''\%'s<<'\%'s\n' ; printf -- "${f}" "$2" "$1" ; } ;
@ -103,9 +73,42 @@ jobs:
jq -r --arg date "${ffmpeg_date}" "${jq_arg}" -- .ffmpeg.releases.json ;
close_ml_var "${delim}" "${var}" ;
unset -v delim jq_arg var ;
} >> "${GITHUB_ENV}"
cat -v "${GITHUB_ENV}"
} >> "${GITHUB_OUTPUT}"
cat -v "${GITHUB_OUTPUT}"
rm -v -f .ffmpeg.releases.json
test:
if: ${{ !cancelled() && ( 'pull_request' != github.event_name || (! github.event.pull_request.draft) ) }}
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
steps:
- uses: actions/checkout@v4
- name: Install Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pipenv
PIPENV_VERBOSITY=64 pipenv install --system --skip-lock
- name: Set up Django environment
run: |
cp -v -p tubesync/tubesync/local_settings.py.example tubesync/tubesync/local_settings.py
cp -v -a -t "${Python3_ROOT_DIR}"/lib/python3.*/site-packages/background_task/ patches/background_task/*
cp -v -a -t "${Python3_ROOT_DIR}"/lib/python3.*/site-packages/yt_dlp/ patches/yt_dlp/*
- name: Run Django tests
run: cd tubesync && python3 manage.py test --verbosity=2
containerise:
if: ${{ !cancelled() && 'success' == needs.info.result }}
needs: ['info', 'test']
runs-on: ubuntu-latest
timeout-minutes: 120
steps:
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
@ -117,7 +120,31 @@ jobs:
DOCKER_USERNAME: ${{ github.actor }}
DOCKER_TOKEN: ${{ 'meeb' == github.repository_owner && secrets.REGISTRY_ACCESS_TOKEN || secrets.GITHUB_TOKEN }}
run: echo "${DOCKER_TOKEN}" | docker login --password-stdin --username "${DOCKER_USERNAME}" "${DOCKER_REGISTRY}"
- name: Build image for `dive`
id: build-dive-image
uses: docker/build-push-action@v6
with:
build-args: |
IMAGE_NAME=${{ env.IMAGE_NAME }}
FFMPEG_DATE=${{ needs.info.outputs.ffmpeg-date }}
FFMPEG_VERSION=${{ needs.info.outputs.ffmpeg-version }}
YTDLP_DATE=${{ fromJSON(needs.info.outputs.ytdlp-latest-release).tag.name }}
cache-from: type=gha
load: true
platforms: linux/amd64
push: false
tags: ghcr.io/${{ needs.info.outputs.lowercase-github-actor }}/${{ env.IMAGE_NAME }}:dive
- name: Analysis with `dive`
run: |
docker run --rm \
-v /var/run/docker.sock:/var/run/docker.sock \
'ghcr.io/wagoodman/dive' \
'ghcr.io/${{ needs.info.outputs.lowercase-github-actor }}/${{ env.IMAGE_NAME }}:dive' \
--ci \
--highestUserWastedPercent '0.03' \
--highestWastedBytes '10M'
- name: Build and push
id: build-push
timeout-minutes: 60
uses: docker/build-push-action@v6
with:
@ -133,6 +160,6 @@ jobs:
${{ 'meeb' == github.repository_owner && 'pull_request' != github.event_name && 'type=inline' || '' }}
build-args: |
IMAGE_NAME=${{ env.IMAGE_NAME }}
FFMPEG_DATE=${{ env.FFMPEG_DATE }}
FFMPEG_VERSION=${{ env.FFMPEG_VERSION }}
FFMPEG_DATE=${{ needs.info.outputs.ffmpeg-date }}
FFMPEG_VERSION=${{ needs.info.outputs.ffmpeg-version }}
YTDLP_DATE=${{ fromJSON(needs.info.outputs.ytdlp-latest-release).tag.name }}

View File

@ -47,7 +47,8 @@ RUN --mount=type=cache,id=apt-lib-cache-${TARGETARCH},sharing=private,target=/va
locale-gen en_US.UTF-8 && \
# Clean up
apt-get -y autopurge && \
apt-get -y autoclean
apt-get -y autoclean && \
rm -f /var/cache/debconf/*.dat-old
FROM alpine:${ALPINE_VERSION} AS ffmpeg-download
ARG FFMPEG_DATE
@ -289,7 +290,8 @@ RUN --mount=type=cache,id=apt-lib-cache-${TARGETARCH},sharing=private,target=/va
useradd -M -d /app -s /bin/false -g app app && \
# Clean up
apt-get -y autopurge && \
apt-get -y autoclean
apt-get -y autoclean && \
rm -v -f /var/cache/debconf/*.dat-old
# Install third party software
COPY --from=s6-overlay / /
@ -310,7 +312,8 @@ RUN --mount=type=cache,id=apt-lib-cache-${TARGETARCH},sharing=private,target=/va
apt-get -y autoremove --purge file && \
# Clean up
apt-get -y autopurge && \
apt-get -y autoclean
apt-get -y autoclean && \
rm -v -f /var/cache/debconf/*.dat-old
# Switch workdir to the the app
WORKDIR /app
@ -362,6 +365,7 @@ RUN --mount=type=tmpfs,target=/cache \
&& \
apt-get -y autopurge && \
apt-get -y autoclean && \
rm -v -f /var/cache/debconf/*.dat-old && \
rm -v -rf /tmp/*
# Copy root

View File

@ -7,7 +7,7 @@ verify_ssl = true
autopep8 = "*"
[packages]
django = "*"
django = "<5.2"
django-sass-processor = {extras = ["management-command"], version = "*"}
pillow = "*"
whitenoise = "*"

View File

@ -0,0 +1,5 @@
#!/command/with-contenv bash
exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
/usr/bin/python3 /app/manage.py process_tasks \
--queue database

View File

@ -0,0 +1 @@
gunicorn

View File

@ -0,0 +1 @@
SIGINT

View File

@ -0,0 +1,5 @@
#!/command/with-contenv bash
exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
/usr/bin/python3 /app/manage.py process_tasks \
--queue filesystem

View File

@ -0,0 +1 @@
longrun

View File

@ -0,0 +1 @@
gunicorn

View File

@ -0,0 +1 @@
SIGINT

View File

@ -1,4 +1,5 @@
#!/command/with-contenv bash
exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
/usr/bin/python3 /app/manage.py process_tasks
/usr/bin/python3 /app/manage.py process_tasks \
--queue network

View File

@ -0,0 +1 @@
longrun

View File

@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
import logging
import random
import sys
import time
from django import VERSION
from django.core.management.base import BaseCommand
from django.utils import autoreload
from background_task.tasks import tasks, autodiscover
from background_task.utils import SignalManager
from django.db import close_old_connections as close_connection
logger = logging.getLogger(__name__)
def _configure_log_std():
class StdOutWrapper(object):
def write(self, s):
logger.info(s)
class StdErrWrapper(object):
def write(self, s):
logger.error(s)
sys.stdout = StdOutWrapper()
sys.stderr = StdErrWrapper()
class Command(BaseCommand):
help = 'Run tasks that are scheduled to run on the queue'
# Command options are specified in an abstract way to enable Django < 1.8 compatibility
OPTIONS = (
(('--duration', ), {
'action': 'store',
'dest': 'duration',
'type': int,
'default': 0,
'help': 'Run task for this many seconds (0 or less to run forever) - default is 0',
}),
(('--sleep', ), {
'action': 'store',
'dest': 'sleep',
'type': float,
'default': 5.0,
'help': 'Sleep for this many seconds before checking for new tasks (if none were found) - default is 5',
}),
(('--queue', ), {
'action': 'store',
'dest': 'queue',
'help': 'Only process tasks on this named queue',
}),
(('--log-std', ), {
'action': 'store_true',
'dest': 'log_std',
'help': 'Redirect stdout and stderr to the logging system',
}),
(('--dev', ), {
'action': 'store_true',
'dest': 'dev',
'help': 'Auto-reload your code on changes. Use this only for development',
}),
)
if VERSION < (1, 8):
from optparse import make_option
option_list = BaseCommand.option_list + tuple([make_option(*args, **kwargs) for args, kwargs in OPTIONS])
# Used in Django >= 1.8
def add_arguments(self, parser):
for (args, kwargs) in self.OPTIONS:
parser.add_argument(*args, **kwargs)
def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args, **kwargs)
self.sig_manager = None
self._tasks = tasks
def run(self, *args, **options):
duration = options.get('duration', 0)
sleep = options.get('sleep', 5.0)
queue = options.get('queue', None)
log_std = options.get('log_std', False)
is_dev = options.get('dev', False)
sig_manager = self.sig_manager
if is_dev:
# raise last Exception is exist
autoreload.raise_last_exception()
if log_std:
_configure_log_std()
autodiscover()
start_time = time.time()
while (duration <= 0) or (time.time() - start_time) <= duration:
if sig_manager.kill_now:
# shutting down gracefully
break
if not self._tasks.run_next_task(queue):
# there were no tasks in the queue, let's recover.
close_connection()
logger.debug('waiting for tasks')
time.sleep(sleep)
else:
# there were some tasks to process, let's check if there is more work to do after a little break.
time.sleep(random.uniform(sig_manager.time_to_wait[0], sig_manager.time_to_wait[1]))
self._tasks._pool_runner._pool.close()
def handle(self, *args, **options):
is_dev = options.get('dev', False)
self.sig_manager = SignalManager()
if is_dev:
reload_func = autoreload.run_with_reloader
if VERSION < (2, 2):
reload_func = autoreload.main
reload_func(self.run, *args, **options)
else:
self.run(*args, **options)

0
tubesync/full_playlist.sh Normal file → Executable file
View File

View File

@ -160,6 +160,12 @@ class SponsorBlock_Category(models.TextChoices):
MUSIC_OFFTOPIC = 'music_offtopic', _( 'Non-Music Section' )
class TaskQueue(models.TextChoices):
DB = 'database', _('Database')
FS = 'filesystem', _('Filesystem')
NET = 'network', _('Networking')
class YouTube_SourceType(models.TextChoices):
CHANNEL = 'c', _('YouTube channel')
CHANNEL_ID = 'i', _('YouTube channel by ID')

View File

@ -6,7 +6,7 @@ from django.db.models import signals
from common.logger import log
from sync.models import Source, Media, MediaServer
from sync.signals import media_post_delete
from sync.tasks import rescan_media_server
from sync.tasks import schedule_media_servers_update
class Command(BaseCommand):
@ -37,15 +37,6 @@ class Command(BaseCommand):
log.info(f'Source directory: {source.directory_path}')
source.delete()
# Update any media servers
for mediaserver in MediaServer.objects.all():
log.info(f'Scheduling media server updates')
verbose_name = _('Request media server rescan for "{}"')
rescan_media_server(
str(mediaserver.pk),
priority=0,
schedule=30,
verbose_name=verbose_name.format(mediaserver),
remove_existing_tasks=True
)
schedule_media_servers_update()
# All done
log.info('Done')

View File

@ -1,4 +1,5 @@
from django.core.management.base import BaseCommand, CommandError
from django.db.transaction import atomic
from django.utils.translation import gettext_lazy as _
from background_task.models import Task
from sync.models import Source
@ -12,6 +13,7 @@ class Command(BaseCommand):
help = 'Resets all tasks'
@atomic(durable=True)
def handle(self, *args, **options):
log.info('Resettings all tasks...')
# Delete all tasks
@ -24,8 +26,6 @@ class Command(BaseCommand):
index_source_task(
str(source.pk),
repeat=source.index_schedule,
queue=str(source.pk),
priority=10,
verbose_name=verbose_name.format(source.name)
)
# This also chains down to call each Media objects .save() as well

View File

@ -236,7 +236,7 @@ def get_best_video_format(media):
break
if not best_match:
for fmt in video_formats:
# Check for codec and resolution match bot drop 60fps
# Check for codec and resolution match but drop 60fps
if (source_resolution == fmt['format'] and
source_vcodec == fmt['vcodec'] and
not fmt['is_hdr']):
@ -294,7 +294,7 @@ def get_best_video_format(media):
break
if not best_match:
for fmt in video_formats:
# Check for codec and resolution match bot drop hdr
# Check for codec and resolution match but drop hdr
if (source_resolution == fmt['format'] and
source_vcodec == fmt['vcodec'] and
not fmt['is_60fps']):

View File

@ -237,7 +237,7 @@ class Source(models.Model):
_('source video codec'),
max_length=8,
db_index=True,
choices=list(reversed(YouTube_VideoCodec.choices[1:])),
choices=list(reversed(YouTube_VideoCodec.choices)),
default=YouTube_VideoCodec.VP9,
help_text=_('Source video codec, desired video encoding format to download (ignored if "resolution" is audio only)')
)

View File

@ -30,6 +30,8 @@ def source_pre_save(sender, instance, **kwargs):
log.debug(f'source_pre_save signal: no existing source: {sender} - {instance}')
return
args = ( str(instance.pk), )
check_source_directory_exists.now(*args)
existing_dirpath = existing_source.directory_path.resolve(strict=True)
new_dirpath = instance.directory_path.resolve(strict=False)
if existing_dirpath != new_dirpath:
@ -90,12 +92,9 @@ def source_pre_save(sender, instance, **kwargs):
verbose_name = _('Index media from source "{}"')
index_source_task(
str(instance.pk),
schedule=instance.index_schedule,
repeat=instance.index_schedule,
queue=str(instance.pk),
priority=10,
schedule=instance.index_schedule,
verbose_name=verbose_name.format(instance.name),
remove_existing_tasks=True
)
@ -106,14 +105,12 @@ def source_post_save(sender, instance, created, **kwargs):
verbose_name = _('Check download directory exists for source "{}"')
check_source_directory_exists(
str(instance.pk),
priority=0,
verbose_name=verbose_name.format(instance.name)
verbose_name=verbose_name.format(instance.name),
)
if instance.source_type != Val(YouTube_SourceType.PLAYLIST) and instance.copy_channel_images:
download_source_images(
str(instance.pk),
priority=5,
verbose_name=verbose_name.format(instance.name)
verbose_name=verbose_name.format(instance.name),
)
if instance.index_schedule > 0:
delete_task_by_source('sync.tasks.index_source_task', instance.pk)
@ -121,20 +118,15 @@ def source_post_save(sender, instance, created, **kwargs):
verbose_name = _('Index media from source "{}"')
index_source_task(
str(instance.pk),
schedule=600,
repeat=instance.index_schedule,
queue=str(instance.pk),
priority=10,
schedule=600,
verbose_name=verbose_name.format(instance.name),
remove_existing_tasks=True
)
verbose_name = _('Checking all media for source "{}"')
save_all_media_for_source(
str(instance.pk),
priority=25,
verbose_name=verbose_name.format(instance.name),
remove_existing_tasks=True
)
@ -155,7 +147,6 @@ def source_pre_delete(sender, instance, **kwargs):
delete_all_media_for_source(
str(instance.pk),
str(instance.name),
priority=1,
verbose_name=verbose_name.format(instance.name),
)
# Try to do it all immediately
@ -242,10 +233,7 @@ def media_post_save(sender, instance, created, **kwargs):
verbose_name = _('Renaming media for: {}: "{}"')
rename_media(
str(media.pk),
queue=str(media.pk),
priority=20,
verbose_name=verbose_name.format(media.key, media.name),
remove_existing_tasks=True
)
# If the media is missing metadata schedule it to be downloaded
@ -254,9 +242,7 @@ def media_post_save(sender, instance, created, **kwargs):
verbose_name = _('Downloading metadata for "{}"')
download_media_metadata(
str(instance.pk),
priority=20,
verbose_name=verbose_name.format(instance.pk),
remove_existing_tasks=True
)
# If the media is missing a thumbnail schedule it to be downloaded (unless we are skipping this media)
if not instance.thumb_file_exists:
@ -270,10 +256,7 @@ def media_post_save(sender, instance, created, **kwargs):
download_media_thumbnail(
str(instance.pk),
thumbnail_url,
queue=str(instance.source.pk),
priority=15,
verbose_name=verbose_name.format(instance.name),
remove_existing_tasks=True
)
# If the media has not yet been downloaded schedule it to be downloaded
if not (instance.media_file_exists or instance.filepath.exists() or existing_media_download_task):
@ -287,10 +270,7 @@ def media_post_save(sender, instance, created, **kwargs):
verbose_name = _('Downloading media for "{}"')
download_media(
str(instance.pk),
queue=str(instance.source.pk),
priority=15,
verbose_name=verbose_name.format(instance.name),
remove_existing_tasks=True
)
# Save the instance if any changes were required
if skip_changed or can_download_changed:

View File

@ -21,10 +21,12 @@ from django.db.transaction import atomic
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from background_task import background
from background_task.exceptions import InvalidTaskError
from background_task.models import Task, CompletedTask
from common.logger import log
from common.errors import NoMediaException, NoMetadataException, DownloadFailedException
from common.utils import json_serial, remove_enclosed
from .choices import Val, TaskQueue
from .models import Source, Media, MediaServer
from .utils import (get_remote_image, resize_image_to_height, delete_file,
write_text_file, filter_response)
@ -123,7 +125,8 @@ def update_task_status(task, status):
except DatabaseError as e:
if 'Save with update_fields did not affect any rows.' == str(e):
pass
raise
else:
raise
return True
@ -131,11 +134,12 @@ def get_source_completed_tasks(source_id, only_errors=False):
'''
Returns a queryset of CompletedTask objects for a source by source ID.
'''
q = {'queue': source_id}
q = {'task_params__istartswith': f'[["{source_id}"'}
if only_errors:
q['failed_at__isnull'] = False
return CompletedTask.objects.filter(**q).order_by('-failed_at')
def get_tasks(task_name, id=None, /, instance=None):
assert not (id is None and instance is None)
arg = str(id or instance.pk)
@ -160,10 +164,15 @@ def get_source_check_task(source_id):
def get_source_index_task(source_id):
return get_first_task('sync.tasks.index_source_task', source_id)
def delete_task_by_source(task_name, source_id):
now = timezone.now()
unlocked = Task.objects.unlocked(now)
return unlocked.filter(task_name=task_name, queue=str(source_id)).delete()
qs = unlocked.filter(
task_name=task_name,
task_params__istartswith=f'[["{source_id}"',
)
return qs.delete()
def delete_task_by_media(task_name, args):
@ -183,6 +192,13 @@ def cleanup_completed_tasks():
CompletedTask.objects.filter(run_at__lt=delta).delete()
@atomic(durable=False)
def migrate_queues():
tqs = Task.objects.all()
qs = tqs.exclude(queue__in=TaskQueue.values)
return qs.update(queue=Val(TaskQueue.NET))
def schedule_media_servers_update():
with atomic():
# Schedule a task to update media servers
@ -191,7 +207,7 @@ def schedule_media_servers_update():
for mediaserver in MediaServer.objects.all():
rescan_media_server(
str(mediaserver.pk),
priority=30,
priority=10,
verbose_name=verbose_name.format(mediaserver),
remove_existing_tasks=True,
)
@ -225,7 +241,7 @@ def cleanup_removed_media(source, videos):
schedule_media_servers_update()
@background(schedule=300, remove_existing_tasks=True)
@background(schedule=dict(priority=10, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def index_source_task(source_id):
'''
Indexes media available from a Source object.
@ -235,18 +251,20 @@ def index_source_task(source_id):
cleanup_old_media()
try:
source = Source.objects.get(pk=source_id)
except Source.DoesNotExist:
except Source.DoesNotExist as e:
# Task triggered but the Source has been deleted, delete the task
return
raise InvalidTaskError(_('no such source')) from e
# An inactive Source would return an empty list for videos anyway
if not source.is_active:
return
# Reset any errors
# TODO: determine if this affects anything
source.has_failed = False
source.save()
# Index the source
videos = source.index_media()
if not videos:
# TODO: Record this error in source.has_failed ?
raise NoMediaException(f'Source "{source}" (ID: {source_id}) returned no '
f'media to index, is the source key valid? Check the '
f'source configuration is correct and that the source '
@ -310,7 +328,7 @@ def index_source_task(source_id):
cleanup_removed_media(source, videos)
@background(schedule=0)
@background(schedule=dict(priority=0, run_at=0), queue=Val(TaskQueue.FS))
def check_source_directory_exists(source_id):
'''
Checks the output directory for a source exists and is writable, if it does
@ -319,17 +337,17 @@ def check_source_directory_exists(source_id):
'''
try:
source = Source.objects.get(pk=source_id)
except Source.DoesNotExist:
except Source.DoesNotExist as e:
# Task triggered but the Source has been deleted, delete the task
return
raise InvalidTaskError(_('no such source')) from e
# Check the source output directory exists
if not source.directory_exists():
# Try and create it
# Try to create it
log.info(f'Creating directory: {source.directory_path}')
source.make_directory()
@background(schedule=0)
@background(schedule=dict(priority=5, run_at=10), queue=Val(TaskQueue.NET))
def download_source_images(source_id):
'''
Downloads an image and save it as a local thumbnail attached to a
@ -337,11 +355,11 @@ def download_source_images(source_id):
'''
try:
source = Source.objects.get(pk=source_id)
except Source.DoesNotExist:
except Source.DoesNotExist as e:
# Task triggered but the source no longer exists, do nothing
log.error(f'Task download_source_images(pk={source_id}) called but no '
f'source exists with ID: {source_id}')
return
raise InvalidTaskError(_('no such source')) from e
avatar, banner = source.get_image_url
log.info(f'Thumbnail URL for source with ID: {source_id} / {source} '
f'Avatar: {avatar} '
@ -379,18 +397,18 @@ def download_source_images(source_id):
log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}')
@background(schedule=60, remove_existing_tasks=True)
@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def download_media_metadata(media_id):
'''
Downloads the metadata for a media item.
'''
try:
media = Media.objects.get(pk=media_id)
except Media.DoesNotExist:
except Media.DoesNotExist as e:
# Task triggered but the media no longer exists, do nothing
log.error(f'Task download_media_metadata(pk={media_id}) called but no '
f'media exists with ID: {media_id}')
return
raise InvalidTaskError(_('no such media')) from e
if media.manual_skip:
log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.')
return
@ -428,12 +446,9 @@ def download_media_metadata(media_id):
verbose_name = _('Waiting for the premiere of "{}" at: {}')
wait_for_media_premiere(
str(media.pk),
priority=0,
queue=str(media.pk),
repeat=Task.HOURLY,
repeat_until = published_datetime + timedelta(hours=1),
verbose_name=verbose_name.format(media.key, published_datetime.isoformat(' ', 'seconds')),
remove_existing_tasks=True,
)
raise_exception = False
if raise_exception:
@ -466,7 +481,7 @@ def download_media_metadata(media_id):
f'{source} / {media}: {media_id}')
@background(schedule=60, remove_existing_tasks=True)
@background(schedule=dict(priority=15, run_at=10), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def download_media_thumbnail(media_id, url):
'''
Downloads an image from a URL and save it as a local thumbnail attached to a
@ -474,10 +489,10 @@ def download_media_thumbnail(media_id, url):
'''
try:
media = Media.objects.get(pk=media_id)
except Media.DoesNotExist:
except Media.DoesNotExist as e:
# Task triggered but the media no longer exists, do nothing
return
if media.skip:
raise InvalidTaskError(_('no such media')) from e
if media.skip or media.manual_skip:
# Media was toggled to be skipped after the task was scheduled
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
f'it is now marked to be skipped, not downloading thumbnail')
@ -504,38 +519,43 @@ def download_media_thumbnail(media_id, url):
return True
@background(schedule=60, remove_existing_tasks=True)
@background(schedule=dict(priority=15, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def download_media(media_id):
'''
Downloads the media to disk and attaches it to the Media instance.
'''
try:
media = Media.objects.get(pk=media_id)
except Media.DoesNotExist:
except Media.DoesNotExist as e:
# Task triggered but the media no longer exists, do nothing
return
if not media.has_metadata:
raise NoMetadataException('Metadata is not yet available.')
if media.skip:
# Media was toggled to be skipped after the task was scheduled
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
f'it is now marked to be skipped, not downloading')
return
downloaded_file_exists = (
media.media_file_exists or
media.filepath.exists()
)
if media.downloaded and downloaded_file_exists:
# Media has been marked as downloaded before the download_media task was fired,
# skip it
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
f'it has already been marked as downloaded, not downloading again')
return
raise InvalidTaskError(_('no such media')) from e
if not media.source.download_media:
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
f'the source {media.source} has since been marked to not download, '
f'not downloading')
return
if media.skip or media.manual_skip:
# Media was toggled to be skipped after the task was scheduled
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
f'it is now marked to be skipped, not downloading')
return
# metadata is required to generate the proper filepath
if not media.has_metadata:
raise NoMetadataException('Metadata is not yet available.')
downloaded_file_exists = (
media.downloaded and
media.has_metadata and
(
media.media_file_exists or
media.filepath.exists()
)
)
if downloaded_file_exists:
# Media has been marked as downloaded before the download_media task was fired,
# skip it
log.warn(f'Download task triggered for media: {media} (UUID: {media.pk}) but '
f'it has already been marked as downloaded, not downloading again')
return
max_cap_age = media.source.download_cap_date
published = media.published
if max_cap_age and published:
@ -608,16 +628,7 @@ def download_media(media_id):
log.warn(f'A permissions problem occured when writing the new media NFO file: {e.msg}')
pass
# Schedule a task to update media servers
for mediaserver in MediaServer.objects.all():
log.info(f'Scheduling media server updates')
verbose_name = _('Request media server rescan for "{}"')
rescan_media_server(
str(mediaserver.pk),
queue=str(media.source.pk),
priority=0,
verbose_name=verbose_name.format(mediaserver),
remove_existing_tasks=True
)
schedule_media_servers_update()
else:
# Expected file doesn't exist on disk
err = (f'Failed to download media: {media} (UUID: {media.pk}) to disk, '
@ -630,22 +641,22 @@ def download_media(media_id):
raise DownloadFailedException(err)
@background(schedule=300, remove_existing_tasks=True)
@background(schedule=dict(priority=0, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def rescan_media_server(mediaserver_id):
'''
Attempts to request a media rescan on a remote media server.
'''
try:
mediaserver = MediaServer.objects.get(pk=mediaserver_id)
except MediaServer.DoesNotExist:
except MediaServer.DoesNotExist as e:
# Task triggered but the media server no longer exists, do nothing
return
raise InvalidTaskError(_('no such server')) from e
# Request an rescan / update
log.info(f'Updating media server: {mediaserver}')
mediaserver.update()
@background(schedule=300, remove_existing_tasks=True)
@background(schedule=dict(priority=25, run_at=600), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def save_all_media_for_source(source_id):
'''
Iterates all media items linked to a source and saves them to
@ -655,11 +666,11 @@ def save_all_media_for_source(source_id):
'''
try:
source = Source.objects.get(pk=source_id)
except Source.DoesNotExist:
except Source.DoesNotExist as e:
# Task triggered but the source no longer exists, do nothing
log.error(f'Task save_all_media_for_source(pk={source_id}) called but no '
f'source exists with ID: {source_id}')
return
raise InvalidTaskError(_('no such source')) from e
already_saved = set()
mqs = Media.objects.filter(source=source)
@ -694,41 +705,41 @@ def save_all_media_for_source(source_id):
# flags may need to be recalculated
tvn_format = '2/{:,}' + f'/{mqs.count():,}'
for mn, media in enumerate(mqs, start=1):
update_task_status(task, tvn_format.format(mn))
if media.uuid not in already_saved:
update_task_status(task, tvn_format.format(mn))
with atomic():
media.save()
# Reset task.verbose_name to the saved value
update_task_status(task, None)
@background(schedule=60, remove_existing_tasks=True)
@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
def rename_media(media_id):
try:
media = Media.objects.defer('metadata', 'thumb').get(pk=media_id)
except Media.DoesNotExist:
return
except Media.DoesNotExist as e:
raise InvalidTaskError(_('no such media')) from e
media.rename_files()
@background(schedule=300, remove_existing_tasks=True)
@background(schedule=dict(priority=20, run_at=300), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
@atomic(durable=True)
def rename_all_media_for_source(source_id):
try:
source = Source.objects.get(pk=source_id)
except Source.DoesNotExist:
except Source.DoesNotExist as e:
# Task triggered but the source no longer exists, do nothing
log.error(f'Task rename_all_media_for_source(pk={source_id}) called but no '
f'source exists with ID: {source_id}')
return
raise InvalidTaskError(_('no such source')) from e
# Check that the settings allow renaming
rename_sources_setting = settings.RENAME_SOURCES or list()
rename_sources_setting = getattr(settings, 'RENAME_SOURCES', list())
create_rename_tasks = (
(
source.directory and
source.directory in rename_sources_setting
) or
settings.RENAME_ALL_SOURCES
getattr(settings, 'RENAME_ALL_SOURCES', False)
)
if not create_rename_tasks:
return
@ -744,15 +755,15 @@ def rename_all_media_for_source(source_id):
media.rename_files()
@background(schedule=60, remove_existing_tasks=True)
@background(schedule=dict(priority=0, run_at=60), queue=Val(TaskQueue.DB), remove_existing_tasks=True)
def wait_for_media_premiere(media_id):
hours = lambda td: 1+int((24*td.days)+(td.seconds/(60*60)))
try:
media = Media.objects.get(pk=media_id)
except Media.DoesNotExist:
return
if media.metadata:
except Media.DoesNotExist as e:
raise InvalidTaskError(_('no such media')) from e
if media.has_metadata:
return
now = timezone.now()
if media.published < now:
@ -764,17 +775,20 @@ def wait_for_media_premiere(media_id):
media.manual_skip = True
media.title = _(f'Premieres in {hours(media.published - now)} hours')
media.save()
task = get_media_premiere_task(media_id)
if task:
update_task_status(task, f'available in {hours(media.published - now)} hours')
@background(schedule=300, remove_existing_tasks=False)
@background(schedule=dict(priority=1, run_at=300), queue=Val(TaskQueue.FS), remove_existing_tasks=False)
def delete_all_media_for_source(source_id, source_name):
source = None
try:
source = Source.objects.get(pk=source_id)
except Source.DoesNotExist:
except Source.DoesNotExist as e:
# Task triggered but the source no longer exists, do nothing
log.error(f'Task delete_all_media_for_source(pk={source_id}) called but no '
f'source exists with ID: {source_id}')
pass
raise InvalidTaskError(_('no such source')) from e
mqs = Media.objects.all().defer(
'metadata',
).filter(

View File

@ -17,14 +17,14 @@
{% if task.has_error %}
<span class="collection-item">
<i class="fas fa-exclamation-triangle"></i> <strong>{{ task.verbose_name }}</strong><br>
Source: &quot;{{ task.queue }}&quot;<br>
Queue: &quot;{{ task.queue }}&quot;<br>
Error: &quot;{{ task.error_message }}&quot;<br>
<i class="far fa-clock"></i> Task ran at <strong>{{ task.run_at|date:'Y-m-d H:i:s' }}</strong>
</span>
{% else %}
<span class="collection-item">
<i class="fas fa-check"></i> <strong>{{ task.verbose_name }}</strong><br>
Source: &quot;{{ task.queue }}&quot;<br>
Queue: &quot;{{ task.queue }}&quot;<br>
<i class="far fa-clock"></i> Task ran at <strong>{{ task.run_at|date:'Y-m-d H:i:s' }}</strong>
</span>
{% endif %}

View File

@ -20,7 +20,7 @@ from .tasks import cleanup_old_media, check_source_directory_exists
from .filtering import filter_media
from .utils import filter_response
from .choices import (Val, Fallback, IndexSchedule, SourceResolution,
YouTube_AudioCodec, YouTube_VideoCodec,
TaskQueue, YouTube_AudioCodec, YouTube_VideoCodec,
YouTube_SourceType, youtube_long_source_types)
@ -138,7 +138,7 @@ class FrontEndTestCase(TestCase):
else:
# Invalid source tests should reload the page with an error
self.assertEqual(response.status_code, 200)
self.assertIn('<ul class="errorlist">',
self.assertIn('<ul class="errorlist"',
response.content.decode())
def test_add_source_prepopulation(self):
@ -211,7 +211,7 @@ class FrontEndTestCase(TestCase):
source_uuid = str(source.pk)
task = Task.objects.get_task('sync.tasks.index_source_task',
args=(source_uuid,))[0]
self.assertEqual(task.queue, source_uuid)
self.assertEqual(task.queue, Val(TaskQueue.NET))
# Run the check_source_directory_exists task
check_source_directory_exists.now(source_uuid)
# Check the source is now on the source overview page
@ -420,8 +420,7 @@ class FrontEndTestCase(TestCase):
found_download_task1 = False
found_download_task2 = False
found_download_task3 = False
q = {'queue': str(test_source.pk),
'task_name': 'sync.tasks.download_media_thumbnail'}
q = {'task_name': 'sync.tasks.download_media_thumbnail'}
for task in Task.objects.filter(**q):
if test_media1_pk in task.task_params:
found_thumbnail_task1 = True
@ -429,8 +428,7 @@ class FrontEndTestCase(TestCase):
found_thumbnail_task2 = True
if test_media3_pk in task.task_params:
found_thumbnail_task3 = True
q = {'queue': str(test_source.pk),
'task_name': 'sync.tasks.download_media'}
q = {'task_name': 'sync.tasks.download_media'}
for task in Task.objects.filter(**q):
if test_media1_pk in task.task_params:
found_download_task1 = True

View File

@ -29,7 +29,7 @@ from .forms import (ValidateSourceForm, ConfirmDeleteSourceForm, RedownloadMedia
from .utils import validate_url, delete_file, multi_key_sort
from .tasks import (map_task_to_instance, get_error_message,
get_source_completed_tasks, get_media_download_task,
delete_task_by_media, index_source_task)
delete_task_by_media, index_source_task, migrate_queues)
from .choices import (Val, MediaServerType, SourceResolution,
YouTube_SourceType, youtube_long_source_types,
youtube_help, youtube_validation_urls)
@ -118,15 +118,15 @@ class SourcesView(ListView):
if sobj is None:
return HttpResponseNotFound()
source = sobj
verbose_name = _('Index media from source "{}" once')
index_source_task(
str(sobj.pk),
queue=str(sobj.pk),
repeat=0,
priority=10,
schedule=30,
str(source.pk),
remove_existing_tasks=False,
verbose_name=verbose_name.format(sobj.name))
repeat=0,
schedule=30,
verbose_name=verbose_name.format(source.name),
)
url = reverse_lazy('sync:sources')
url = append_uri_params(url, {'message': 'source-refreshed'})
return HttpResponseRedirect(url)
@ -768,7 +768,8 @@ class TasksView(ListView):
def get_queryset(self):
qs = Task.objects.all()
if self.filter_source:
qs = qs.filter(queue=str(self.filter_source.pk))
params_prefix=f'[["{self.filter_source.pk}"'
qs = qs.filter(task_params__istartswith=params_prefix)
order = getattr(settings,
'BACKGROUND_TASK_PRIORITY_ORDERING',
'DESC'
@ -796,6 +797,7 @@ class TasksView(ListView):
data['total_errors'] = errors_qs.count()
data['scheduled'] = list()
data['total_scheduled'] = scheduled_qs.count()
data['migrated'] = migrate_queues()
def add_to_task(task):
obj, url = map_task_to_instance(task)
@ -896,7 +898,8 @@ class CompletedTasksView(ListView):
def get_queryset(self):
qs = CompletedTask.objects.all()
if self.filter_source:
qs = qs.filter(queue=str(self.filter_source.pk))
params_prefix=f'[["{self.filter_source.pk}"'
qs = qs.filter(task_params__istartswith=params_prefix)
return qs.order_by('-run_at')
def get_context_data(self, *args, **kwargs):
@ -933,8 +936,6 @@ class ResetTasks(FormView):
index_source_task(
str(source.pk),
repeat=source.index_schedule,
queue=str(source.pk),
priority=10,
verbose_name=verbose_name.format(source.name)
)
# This also chains down to call each Media objects .save() as well

View File

@ -170,16 +170,16 @@ def get_media_info(url, /, *, days=None, info_json=None):
})
default_postprocessors = user_set('postprocessors', default_opts.__dict__, list())
postprocessors = user_set('postprocessors', opts, default_postprocessors)
postprocessors.extend((dict(
postprocessors.append(dict(
key='Exec',
when='playlist',
exec_cmd="/usr/bin/env bash /app/full_playlist.sh '%(id)s' '%(playlist_count)d'",
),))
))
cache_directory_path = Path(user_set('cachedir', opts, '/dev/shm'))
playlist_infojson = 'postprocessor_[%(id)s]_%(n_entries)d_%(playlist_count)d_temp'
outtmpl = dict(
default='',
infojson='%(id)s.%(ext)s' if paths.get('infojson') else '',
infojson='%(extractor)s/%(id)s.%(ext)s' if paths.get('infojson') else '',
pl_infojson=f'{cache_directory_path}/infojson/playlist/{playlist_infojson}.%(ext)s',
)
for k in OUTTMPL_TYPES.keys():

View File

@ -136,7 +136,7 @@ HEALTHCHECK_ALLOWED_IPS = ('127.0.0.1',)
MAX_ATTEMPTS = 15 # Number of times tasks will be retried
MAX_RUN_TIME = 1*(24*60*60) # Maximum amount of time in seconds a task can run
BACKGROUND_TASK_RUN_ASYNC = True # Run tasks async in the background
BACKGROUND_TASK_RUN_ASYNC = False # Run tasks async in the background
BACKGROUND_TASK_ASYNC_THREADS = 1 # Number of async tasks to run at once
MAX_BACKGROUND_TASK_ASYNC_THREADS = 8 # For sanity reasons
BACKGROUND_TASK_PRIORITY_ORDERING = 'ASC' # Use 'niceness' task priority ordering